推拉函数与queue
This commit is contained in:
@ -47,4 +47,20 @@ class CustomerChangeInfoLogModel extends CustomerBaseModel
|
||||
return $this->addItem($aItem);
|
||||
}
|
||||
|
||||
function addUserActiveStatusLog($uid,$beforeValue,$value,$remark_key): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Builder|array|null
|
||||
{
|
||||
$aLogInsert = [
|
||||
'type' => CustomerChangeInfoLogModel::TYPE_CHANG_USER_ACTIVE_STATUS,
|
||||
'uid' => $uid,
|
||||
'column' => CustomerUserExtendModel::COL_IS_ACTIVE,
|
||||
'before_value' => $beforeValue,
|
||||
'pid' => CustomerChangeInfoLogModel::PID_SYSTEM,
|
||||
'value' => $value,
|
||||
'after_value' => $value,
|
||||
'remark_key' => $remark_key,
|
||||
'remark_desc' => CustomerChangeInfoLogModel::REMARK[$remark_key],
|
||||
];
|
||||
return $this->addLog($aLogInsert);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -2,9 +2,13 @@
|
||||
|
||||
namespace App\Models\Customer;
|
||||
|
||||
use App\Const\Queue;
|
||||
use App\Exceptions\ModelException;
|
||||
use App\Jobs\UserActiveStatusQueue;
|
||||
use App\Models\Base\CustomerBaseModel;
|
||||
use App\Models\Post\PostPushBoxModel;
|
||||
use App\Models\WebSocket\CustomerWsHistoryModel;
|
||||
use App\Structs\QueueUserActiveStatusStruct;
|
||||
use Carbon\Carbon;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
@ -36,8 +40,8 @@ class CustomerUserExtendModel extends CustomerBaseModel
|
||||
//增加用户扩展信息
|
||||
function addExtend($aItem): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Builder|array|null
|
||||
{
|
||||
if(empty($aItem['uid'])) throw new ModelException('uid error');
|
||||
if($this->findItem($aItem['uid'],['uid'])) return null; //已存在
|
||||
if (empty($aItem['uid'])) throw new ModelException('uid error');
|
||||
if ($this->findItem($aItem['uid'], ['uid'])) return null; //已存在
|
||||
$sDateTime = date('Y-m-d H:i:s');
|
||||
$aItem['updated_at'] = $sDateTime;
|
||||
return $this->addItem($aItem);
|
||||
@ -46,33 +50,33 @@ class CustomerUserExtendModel extends CustomerBaseModel
|
||||
//增加当前粉丝总数
|
||||
function incrFansNum($uid): int
|
||||
{
|
||||
$oExtend = $this->findItem($uid,'uid');
|
||||
if(!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid',$uid)->increment('fans_num');
|
||||
$oExtend = $this->findItem($uid, 'uid');
|
||||
if (!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid', $uid)->increment('fans_num');
|
||||
}
|
||||
|
||||
//减去当前追随者总数
|
||||
function decrFansNum($uid): int
|
||||
{
|
||||
$oExtend = $this->findItem($uid,'uid');
|
||||
if(!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid',$uid)->decrement('fans_num');
|
||||
$oExtend = $this->findItem($uid, 'uid');
|
||||
if (!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid', $uid)->decrement('fans_num');
|
||||
}
|
||||
|
||||
//增加当前订阅总数
|
||||
function incrFollowNum($uid): int
|
||||
{
|
||||
$oExtend = $this->findItem($uid,'uid');
|
||||
if(!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid',$uid)->increment('follow_num');
|
||||
$oExtend = $this->findItem($uid, 'uid');
|
||||
if (!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid', $uid)->increment('follow_num');
|
||||
}
|
||||
|
||||
//减去当前订阅总数
|
||||
function decrFollowNum($uid): int
|
||||
{
|
||||
$oExtend = $this->findItem($uid,'uid');
|
||||
if(!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid',$uid)->decrement('follow_num');
|
||||
$oExtend = $this->findItem($uid, 'uid');
|
||||
if (!$oExtend) throw new ModelException('user extend not found');
|
||||
return $this->newQuery()->where('uid', $uid)->decrement('follow_num');
|
||||
}
|
||||
|
||||
//获取用户活跃信息
|
||||
@ -86,69 +90,92 @@ class CustomerUserExtendModel extends CustomerBaseModel
|
||||
}
|
||||
|
||||
//检测所有用户活跃状态
|
||||
function updateUserActiveStatus($date = null): void
|
||||
function updateAllUserActiveStatus($date = null): void
|
||||
{
|
||||
if(empty($date)) $date = Carbon::yesterday()->toDateString();
|
||||
if (empty($date)) $date = Carbon::yesterday()->toDateString();
|
||||
|
||||
$oCustomerWsHistoryModel = new CustomerWsHistoryModel();
|
||||
$aActiveUserIdList = $oCustomerWsHistoryModel->getActiveUserIdList($date); //三日内活跃用户
|
||||
if(empty($aActiveUserIdList)) return;
|
||||
if (empty($aActiveUserIdList)) return;
|
||||
$oCustomerChangeInfoLogModel = new CustomerChangeInfoLogModel();
|
||||
|
||||
$nowUid = 0;
|
||||
while (true){
|
||||
try{
|
||||
while (true) {
|
||||
try {
|
||||
Db::beginTransaction();
|
||||
$aUserList = $this->getUserActiveListLimit($nowUid, 500);
|
||||
if(empty($aUserList)) break;
|
||||
$nowUid = max($aUserList->pluck('uid')->toArray());
|
||||
$aUserExtendList = $this->getUserActiveListLimit($nowUid, 500);
|
||||
if (empty($aUserExtendList)) break;
|
||||
$nowUid = max($aUserExtendList->pluck('uid')->toArray());
|
||||
|
||||
foreach ($aUserList as $oUser){
|
||||
$isChanged = false;
|
||||
$aLogInsert = [
|
||||
'type' => CustomerChangeInfoLogModel::TYPE_CHANG_USER_ACTIVE_STATUS,
|
||||
'uid' => $oUser->uid,
|
||||
'column' => self::COL_IS_ACTIVE,
|
||||
'before_value' => $oUser->is_active,
|
||||
'pid' => CustomerChangeInfoLogModel::PID_SYSTEM,
|
||||
];
|
||||
|
||||
if(in_array($oUser->uid, $aActiveUserIdList)){ //在活跃列表中
|
||||
if($oUser->is_active == self::IS_ACTIVE_YES) continue; //已经是活跃用户
|
||||
foreach ($aUserExtendList as $oUserExtend) {
|
||||
if (in_array($oUserExtend->uid, $aActiveUserIdList)) { //在活跃列表中
|
||||
if ($oUserExtend->is_active == self::IS_ACTIVE_YES) continue; //已经是活跃用户
|
||||
//变更用户状态
|
||||
$res = $this->newQuery()->where('uid',$oUser->uid)->update(['is_active'=>self::IS_ACTIVE_YES]);
|
||||
if($res) {
|
||||
$isChanged = true;
|
||||
$aLogInsert['value'] = self::IS_ACTIVE_YES;
|
||||
$aLogInsert['after_value'] = self::IS_ACTIVE_YES;
|
||||
$aLogInsert['remark_key'] = CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES;
|
||||
$aLogInsert['remark_desc'] = CustomerChangeInfoLogModel::REMARK[CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES];
|
||||
$res = $this->newQuery()->where('uid', $oUserExtend->uid)->update(['is_active' => self::IS_ACTIVE_YES]);
|
||||
if ($res) {
|
||||
//记录日志
|
||||
$oCustomerChangeInfoLogModel->addUserActiveStatusLog($oUserExtend->uid, $oUserExtend->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES);
|
||||
$this->activeUserStatusToQueueProducer($oUserExtend->uid); //投递到消息队列
|
||||
}
|
||||
}else{ //三日内不活跃
|
||||
if($oUser->is_active == self::IS_ACTIVE_NO) continue; //已经是不活跃用户
|
||||
} else { //三日内不活跃
|
||||
if ($oUserExtend->is_active == self::IS_ACTIVE_NO) continue; //已经是不活跃用户
|
||||
//变更用户状态
|
||||
$res = $this->newQuery()->where('uid',$oUser->uid)->update(['is_active'=>self::IS_ACTIVE_NO]);
|
||||
if($res) {
|
||||
$isChanged = true;
|
||||
$aLogInsert['value'] = self::IS_ACTIVE_NO;
|
||||
$aLogInsert['after_value'] = self::IS_ACTIVE_NO;
|
||||
$aLogInsert['remark_key'] = CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_NO;
|
||||
$aLogInsert['remark_desc'] = CustomerChangeInfoLogModel::REMARK[CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_NO];
|
||||
$res = $this->newQuery()->where('uid', $oUserExtend->uid)->update(['is_active' => self::IS_ACTIVE_NO]);
|
||||
if ($res) {
|
||||
//记录日志
|
||||
$oCustomerChangeInfoLogModel->addUserActiveStatusLog($oUserExtend->uid, $oUserExtend->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_NO);
|
||||
}
|
||||
}
|
||||
|
||||
if($isChanged){ //有变化插入日志
|
||||
$oCustomerChangeInfoLogModel->addLog($aLogInsert);
|
||||
}
|
||||
|
||||
}
|
||||
Db::commit();
|
||||
}catch (\Exception $e){
|
||||
Log::error('updateUserActiveStatus error:'.$e->getMessage());
|
||||
} catch (\Exception $e) {
|
||||
Log::error('updateUserActiveStatus error:' . $e->getMessage());
|
||||
Db::rollBack();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
//登录检测用户活跃状态
|
||||
function updateUserActiveStatus($uid): void
|
||||
{
|
||||
try{
|
||||
Db::beginTransaction();
|
||||
$oCustomerWsHistoryModel = new CustomerWsHistoryModel();
|
||||
$aActiveUserId = $oCustomerWsHistoryModel->findActiveUserId($uid, Carbon::yesterday()->toDateString()); //三日内活跃用户
|
||||
if(empty($aActiveUserId)) return;
|
||||
$oCustomerUserExtendModel = $this->newQuery()->where('uid', $uid)->first(['uid','is_active']);
|
||||
if(!$oCustomerUserExtendModel) return;
|
||||
$oCustomerChangeInfoLogModel = new CustomerChangeInfoLogModel();
|
||||
|
||||
$res = $this->newQuery()->where('uid', $uid)->update(['is_active' => self::IS_ACTIVE_YES]);
|
||||
if ($res) {
|
||||
//记录日志
|
||||
$oCustomerChangeInfoLogModel->addUserActiveStatusLog($uid, $oCustomerUserExtendModel->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES);
|
||||
$this->activeUserStatusToQueueProducer($uid); //投递到消息队列
|
||||
}
|
||||
Db::commit();
|
||||
}catch (\Exception $e){
|
||||
Log::error('updateUserActiveStatus error:' . $e->getMessage());
|
||||
Db::rollBack();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//投递到消息队列,处理活跃推送信箱更新问题
|
||||
function activeUserStatusToQueueProducer($uid): void
|
||||
{
|
||||
$params = QueueUserActiveStatusStruct::PARAMS;
|
||||
$params['uid'] = $uid;
|
||||
$params['queueCreatedAt'] = date('Y-m-d H:i:s');
|
||||
UserActiveStatusQueue::dispatch($params)->onQueue(QueueUserActiveStatusStruct::QUEUE_NAME);
|
||||
}
|
||||
|
||||
function activeUserStatusQueueConsumer($params): void
|
||||
{
|
||||
$uid = $params['uid'];
|
||||
$oPostPushBoxModel = new PostPushBoxModel();
|
||||
$oPostPushBoxModel->pullBigFanMasterPostConsumer($uid);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user