'推文',
        self::TYPE_REPOST => '转发',
        self::TYPE_COMMENT => '评论',
    ];

    // "Like" state of a push-box entry and its display labels.
    const IS_LIKE_DEFAULT = 1;
    const IS_LIKE_YES = 2;
    const IS_LIKE_NO = 3;
    const IS_LIKE = [
        self::IS_LIKE_DEFAULT => '默认',
        self::IS_LIKE_YES => '喜欢',
        self::IS_LIKE_NO => '不喜欢',
    ];

    // Repost state of a push-box entry and its display labels.
    const IS_REPOST_DEFAULT = 1;
    const IS_REPOST_YES = 2;
    const IS_REPOST = [
        self::IS_REPOST_DEFAULT => '默认',
        self::IS_REPOST_YES => '已转发',
    ];

    // Bookmark state of a push-box entry and its display labels.
    const IS_BOOKMARK_DEFAULT = 1;
    const IS_BOOKMARK_YES = 2;
    const IS_BOOKMARK = [
        self::IS_BOOKMARK_DEFAULT => '默认',
        self::IS_BOOKMARK_YES => '已收藏',
    ];

    // Read state of a push-box entry and its display labels.
    const IS_READ_NO = 1;
    const IS_READ_YES = 2;
    const IS_READ = [
        self::IS_READ_NO => '默认',
        // Was '已收藏' ("bookmarked"), copy-pasted from IS_BOOKMARK; the read flag means "read".
        self::IS_READ_YES => '已读',
    ];

    /**
     * Attribute accessor/mutator that decodes the stored value with
     * Tools::JonsDecode on read and encodes arrays with Tools::JonsEncode on write.
     * NOTE(review): presumably bound to the `post_params` column by naming convention - confirm.
     */
    protected function postParams(): Attribute
    {
        return Attribute::make(
            get: fn(string $value = '') => Tools::JonsDecode($value),
            set: fn(array $value = []) => Tools::JonsEncode($value),
        );
    }

    /**
     * Invoked after a submit event; runs the push to followers' boxes from the queue consumer.
     *
     * Resolves the post the event refers to, collects event-specific parameters
     * (original post for a repost, comment details for a comment), decides between
     * pushing to all fans or only active fans, and then pushes in batches.
     *
     * @param QueueAddPostBean $bean carries the event id and type
     * @return false|null false when the bean is invalid or a referenced record is missing;
     *                    nothing (null) once the batches have been dispatched
     * @throws ModelException when the post or the author's extend record cannot be found
     */
    public function addPostQueueConsumer(QueueAddPostBean $bean)
    {
        $id = $bean->getId();
        $type = $bean->getType();
        if (empty($id) || empty($type)) {
            return false;
        }
        if (!in_array($type, [self::TYPE_POST, self::TYPE_REPOST, self::TYPE_COMMENT])) {
            return false;
        }

        $oPostModel = new PostModel();
        $oPost = null;
        $aPostParams = [];

        // Work out which post the event points at and gather its extra parameters.
        if ($type == self::TYPE_POST) {
            $postId = $id;
        } elseif ($type == self::TYPE_REPOST) {
            $postId = $id;
            $oRePostModel = $oPostModel->findItem($id);
            if (!$oRePostModel) {
                return false;
            }
            $aPostParams[PostParamsStruct::REPOST_ORG_USER_ID] = $oRePostModel->uid;
            $aPostParams[PostParamsStruct::REPOST_ORG_POST_ID] = $oRePostModel->mid;
        } elseif ($type == self::TYPE_COMMENT) {
            $oPostCommentModel = new PostCommentModel();
            $oPostCommentModel = $oPostCommentModel->findItem($id);
            if (!$oPostCommentModel) {
                return false;
            }
            // For a comment the pushed post is the one being commented on.
            $postId = $oPostCommentModel->pid;
            $aPostParams[PostParamsStruct::COMMENT_ID] = $oPostCommentModel->id;
            $aPostParams[PostParamsStruct::COMMENT_USER_ID] = $oPostCommentModel->uid;
            $aPostParams[PostParamsStruct::COMMENT_POST_ID] = $oPostCommentModel->pid;
            $aPostParams[PostParamsStruct::COMMENT_CONTEXT] = $oPostCommentModel->content;
            $aPostParams[PostParamsStruct::COMMENT_CONTEXT_CREATE_TIME] = $oPostCommentModel->created_at;
        } else {
            // Defensive: the in_array() guard above already rejects unknown types.
            throw new ModelException('type error');
        }

        if ($postId) {
            $oPost = $oPostModel->findItem($postId);
        }
        if (!$oPost) {
            throw new ModelException('post not found');
        }

        $iConfigFansPushLimit = intval(env('CONFIG_FANS_PUSH_LIMIT', 2000));

        // Look up the author's fan count.
        $CustomerUserExtendModel = new CustomerUserExtendModel();
        $CustomerUserExtend = $CustomerUserExtendModel->findItem($oPost->uid);
        if (!$CustomerUserExtend) {
            throw new ModelException('user extend not found');
        }

        // Authors with fewer fans than the limit use write fan-out (one row in every fan's box);
        // otherwise only active fans receive a row.
        $bSendMode = $CustomerUserExtend->fans_num < $iConfigFansPushLimit;
        // Total number of recipients, used to drive the batching below.
        $iTotalCount = $this->countSendFans($bSendMode, $oPost->uid);

        $aPost = $oPost->toArray();
        if (empty($aPost)) {
            return false;
        }
        // Carry the event type and its parameters so sendPostToBox() can persist them;
        // previously they were computed here and then silently dropped.
        $aPost['type'] = $type;
        $aPost['post_params'] = $aPostParams;

        // Push in batches.
        $oCollectOffsetLimit = new CollectOffsetLimit();
        $oCollectOffsetLimit->setITotalCount($iTotalCount)->runWhile(function ($offset, $limit) use ($aPost, $bSendMode) {
            $oFollowList = $this->getFansListWithPage($bSendMode, $aPost['uid'], $offset, $limit);
            // empty() alone is always false for a Collection object, so also check the count.
            if (empty($oFollowList) || count($oFollowList) === 0) {
                return;
            }
            $this->sendPostToBox($aPost, $oFollowList);
        });
    }

    /**
     * Fetch one page of recipients for a push.
     *
     * @param bool $bAllFans true: page through all fans; false: page through active fans only
     * @param mixed $uid    author whose fans are listed
     * @param int $offset
     * @param int $limit
     */
    public function getFansListWithPage($bAllFans, $uid, $offset = 0, $limit = 2000): \Illuminate\Database\Eloquent\Collection|array|\Illuminate\Support\Collection
    {
        $oFollowModel = new FollowModel();

        if ($bAllFans) {
            // Write fan-out: every fan gets a row.
            return $oFollowModel->getFansList($uid, ['uid'], $offset, $limit);
        }

        // Read fan-out: only active fans get a row. The active set can be large,
        // which is why it is fetched page by page.
        return $oFollowModel->getActiveFansUidList($uid, ['a.uid'], $offset, $limit);
    }

    /**
     * Count the recipients of a push.
     *
     * @param bool $bAllFans true: count all fans; false: count active fans only
     * @param mixed $uid    author whose fans are counted
     */
    public function countSendFans($bAllFans, $uid): int
    {
        $oFollowModel = new FollowModel();

        if ($bAllFans) {
            $iCount = $oFollowModel->getFansCount($uid);
        } else {
            // Read fan-out: only active fans are counted.
            $iCount = $oFollowModel->getActiveFansUidListCount($uid);
        }

        return $iCount;
    }

    /**
     * Insert one push-box row per follower for the given post.
     *
     * @param array $aPost          post data; must contain id, uuid and created_at.
     *                              `type` and a non-empty array `post_params` are stored when present.
     * @param iterable $oFollowList followers; each element exposes a `uid` property
     */
    public function sendPostToBox($aPost, $oFollowList): void
    {
        if (empty($oFollowList)) {
            return;
        }

        foreach ($oFollowList as $oFollow) {
            $aItem = [];
            if (isset($aPost['type'])) {
                $aItem['type'] = $aPost['type'];
            }
            $aItem['uid'] = $oFollow->uid;
            $aItem['pid'] = $aPost['id'];
            $aItem['puuid'] = $aPost['uuid'];
            // The post_params mutator on this model only accepts arrays, so pass arrays only.
            if (!empty($aPost['post_params']) && is_array($aPost['post_params'])) {
                $aItem['post_params'] = $aPost['post_params'];
            }
            $aItem['post_created_at'] = $aPost['created_at'];
            $this->addItemWithCreateTime($aItem);
        }
    }

    /**
     * Fetch a user's unread push-box entries, newest first.
     *
     * @param mixed $uid     owner of the box
     * @param int $last_id   id of the last entry of the previous page; currently NOT applied
     *                       (the cursor filter is disabled), kept for interface compatibility
     * @param int $limit     maximum number of rows
     * @param array $cols    columns to select
     * @return \Illuminate\Database\Eloquent\Collection|array
     */
    public function getPushBoxList($uid, $last_id = 0, $limit = 20, $cols = ['*']): \Illuminate\Database\Eloquent\Collection|array
    {
        // Active users read their unread entries directly. Entries for inactive users are
        // filled in when the user's status is updated, so nothing extra is needed here.
        return $this->newQuery()
            ->where('uid', $uid)
            ->where('is_read', self::IS_READ_NO)
            ->orderBy('created_at', 'desc')
            ->limit($limit)
            ->get($cols);
    }

    /**
     * For an inactive user: pull the posts of followed accounts whose fan count is above the
     * configured limit into the user's own box. Meant to be called when the user's status
     * is updated.
     *
     * @param mixed $uid user whose box is filled
     * @return null
     */
    public function pullBigFanMasterPostConsumer($uid)
    {
        // Accounts at or above this fan count are the ones whose posts are pulled here.
        $iConfigFansPushLimit = intval(env('CONFIG_FANS_PUSH_LIMIT', 2000));

        $oFollowModel = new FollowModel();
        $oFollowList = $oFollowModel->getFollowListWithFansLimit($uid, $iConfigFansPushLimit, ['a.follow_uid']);
        if ($oFollowList->isEmpty()) {
            return null;
        }

        $aFollowUid = array_column($oFollowList->toArray(), 'follow_uid');

        $oPostModel = new PostModel();
        // Push to the box in batches.
        // TODO: also pull these accounts' comments into the user's box.
        $iTotalCount = $oPostModel->CountPostListByUids($aFollowUid);
        $oCollectOffsetLimit = new CollectOffsetLimit();
        $oCollectOffsetLimit->setITotalCount($iTotalCount)->runWhile(function ($offset, $limit) use ($oPostModel, $aFollowUid, $uid) {
            $oPostList = $oPostModel->getPostListByUids($aFollowUid, null, ['*'], $offset, $limit);
            if (empty($oPostList)) {
                return;
            }
            // empty() on the result object is never true, so test the converted rows instead.
            $aPostList = $oPostList->toArray();
            if (empty($aPostList)) {
                return;
            }
            $this->sendPostToBoxByUid($aPostList, $uid);
        });

        return null;
    }

    /**
     * Insert one push-box row for the given user per post.
     *
     * @param array $oPostList posts, each either an object or an associative array exposing
     *                         type, id, uuid, post_params and created_at
     * @param mixed $uid       owner of the box
     */
    public function sendPostToBoxByUid(array $oPostList, $uid): void
    {
        foreach ($oPostList as $oPost) {
            // Callers pass toArray() output (plain arrays); normalise so property access
            // works for both arrays and model objects.
            $oRow = is_array($oPost) ? (object) $oPost : $oPost;

            $aItem = [];
            $aItem['type'] = $oRow->type ?? null;
            $aItem['uid'] = $uid;
            $aItem['pid'] = $oRow->id ?? null;
            $aItem['puuid'] = $oRow->uuid ?? null;
            $aItem['post_params'] = $oRow->post_params ?? null;
            $aItem['post_created_at'] = $oRow->created_at ?? null;
            $this->addItemWithCreateTime($aItem);
        }
    }
}