'活跃', self::IS_ACTIVE_NO => '不活跃', ]; //增加用户扩展信息 function addExtend($aItem): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Builder|array|null { if (empty($aItem['uid'])) throw new ModelException('uid error'); if ($this->findItem($aItem['uid'], ['uid'])) return null; //已存在 $sDateTime = date('Y-m-d H:i:s'); $aItem['updated_at'] = $sDateTime; return $this->addItem($aItem); } //增加当前粉丝总数 function incrFansNum($uid): int { $oExtend = $this->findItem($uid, 'uid'); if (!$oExtend) throw new ModelException('user extend not found'); return $this->newQuery()->where('uid', $uid)->increment('fans_num'); } //减去当前追随者总数 function decrFansNum($uid): int { $oExtend = $this->findItem($uid, 'uid'); if (!$oExtend) throw new ModelException('user extend not found'); return $this->newQuery()->where('uid', $uid)->decrement('fans_num'); } //增加当前订阅总数 function incrFollowNum($uid): int { $oExtend = $this->findItem($uid, 'uid'); if (!$oExtend) throw new ModelException('user extend not found'); return $this->newQuery()->where('uid', $uid)->increment('follow_num'); } //减去当前订阅总数 function decrFollowNum($uid): int { $oExtend = $this->findItem($uid, 'uid'); if (!$oExtend) throw new ModelException('user extend not found'); return $this->newQuery()->where('uid', $uid)->decrement('follow_num'); } //获取用户活跃信息 function getUserActiveListLimit($nowUid, $limit = 500): \Illuminate\Database\Eloquent\Collection|array { return $this->newQuery() ->where('id', '>=', $nowUid) ->limit($limit) ->orderBy('id') ->get(['uid', 'is_active']); } //检测所有用户活跃状态 function updateAllUserActiveStatus($date = null): void { if (empty($date)) $date = Carbon::yesterday()->toDateString(); $oCustomerWsHistoryModel = new CustomerWsHistoryModel(); $aActiveUserIdList = $oCustomerWsHistoryModel->getActiveUserIdList($date); //三日内活跃用户 if (empty($aActiveUserIdList)) return; $oCustomerChangeInfoLogModel = new CustomerChangeInfoLogModel(); $nowUid = 0; while (true) { try { Db::beginTransaction(); $aUserExtendList = $this->getUserActiveListLimit($nowUid, 500); if (empty($aUserExtendList)) break; $nowUid = max($aUserExtendList->pluck('uid')->toArray()); foreach ($aUserExtendList as $oUserExtend) { if (in_array($oUserExtend->uid, $aActiveUserIdList)) { //在活跃列表中 if ($oUserExtend->is_active == self::IS_ACTIVE_YES) continue; //已经是活跃用户 //变更用户状态 $res = $this->newQuery()->where('uid', $oUserExtend->uid)->update(['is_active' => self::IS_ACTIVE_YES]); if ($res) { //记录日志 $oCustomerChangeInfoLogModel->addUserActiveStatusLog($oUserExtend->uid, $oUserExtend->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES); $this->activeUserStatusToQueueProducer($oUserExtend->uid); //投递到消息队列 } } else { //三日内不活跃 if ($oUserExtend->is_active == self::IS_ACTIVE_NO) continue; //已经是不活跃用户 //变更用户状态 $res = $this->newQuery()->where('uid', $oUserExtend->uid)->update(['is_active' => self::IS_ACTIVE_NO]); if ($res) { //记录日志 $oCustomerChangeInfoLogModel->addUserActiveStatusLog($oUserExtend->uid, $oUserExtend->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_NO); } } } Db::commit(); } catch (\Exception $e) { Log::error('updateUserActiveStatus error:' . $e->getMessage()); Db::rollBack(); } } } //登录检测用户活跃状态 function updateUserActiveStatus($uid): void { try{ Db::beginTransaction(); $oCustomerWsHistoryModel = new CustomerWsHistoryModel(); $aActiveUserId = $oCustomerWsHistoryModel->findActiveUserId($uid, Carbon::yesterday()->toDateString()); //三日内活跃用户 if(empty($aActiveUserId)) return; $oCustomerUserExtendModel = $this->newQuery()->where('uid', $uid)->first(['uid','is_active']); if(!$oCustomerUserExtendModel) return; $oCustomerChangeInfoLogModel = new CustomerChangeInfoLogModel(); $res = $this->newQuery()->where('uid', $uid)->update(['is_active' => self::IS_ACTIVE_YES]); if ($res) { //记录日志 $oCustomerChangeInfoLogModel->addUserActiveStatusLog($uid, $oCustomerUserExtendModel->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES); $this->activeUserStatusToQueueProducer($uid); //投递到消息队列 } Db::commit(); }catch (\Exception $e){ Log::error('updateUserActiveStatus error:' . $e->getMessage()); Db::rollBack(); } } //投递到消息队列,处理活跃推送信箱更新问题 function activeUserStatusToQueueProducer($uid): void { $params = QueueUserActiveStatusStruct::PARAMS; $params['uid'] = $uid; $params['queueCreatedAt'] = date('Y-m-d H:i:s'); UserActiveStatusQueue::dispatch($params)->onQueue(QueueUserActiveStatusStruct::QUEUE_NAME); } function activeUserStatusQueueConsumer($params): void { $uid = $params['uid']; $oPostPushBoxModel = new PostPushBoxModel(); $oPostPushBoxModel->pullBigFanMasterPostConsumer($uid); } }