From 66d88ea2b31305022abdb2f7990de53d3fe98ade Mon Sep 17 00:00:00 2001 From: cano Date: Sun, 3 Mar 2024 09:39:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E6=8B=89=E5=87=BD=E6=95=B0=E4=B8=8Equ?= =?UTF-8?q?eue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Commands/DailyCheckUserActiveStatus.php | 2 +- app/Http/Controllers/Post/PostController.php | 48 ++++++ app/Jobs/AddPostQueue.php | 35 +++++ app/Jobs/UserActiveStatusQueue.php | 33 ++++ app/Models/Base/BaseModel.php | 8 + app/Models/Comment/PostCommentModel.php | 23 +++ .../Customer/CustomerChangeInfoLogModel.php | 16 ++ .../Customer/CustomerUserExtendModel.php | 143 +++++++++++------- app/Models/Post/PostModel.php | 99 ++++++++---- app/Models/Post/PostPushBoxModel.php | 114 +++++++++++--- app/Models/Post/Structs/PostParamsStruct.php | 16 ++ .../WebSocket/CustomerWsHistoryModel.php | 13 ++ app/Structs/QueueAddPostStruct.php | 12 ++ app/Structs/QueueUserActiveStatusStruct.php | 12 ++ app/Tools/Tools.php | 10 ++ 15 files changed, 476 insertions(+), 108 deletions(-) create mode 100644 app/Jobs/AddPostQueue.php create mode 100644 app/Jobs/UserActiveStatusQueue.php create mode 100644 app/Models/Comment/PostCommentModel.php create mode 100644 app/Models/Post/Structs/PostParamsStruct.php create mode 100644 app/Structs/QueueAddPostStruct.php create mode 100644 app/Structs/QueueUserActiveStatusStruct.php diff --git a/app/Console/Commands/DailyCheckUserActiveStatus.php b/app/Console/Commands/DailyCheckUserActiveStatus.php index 497552a..9f2331a 100644 --- a/app/Console/Commands/DailyCheckUserActiveStatus.php +++ b/app/Console/Commands/DailyCheckUserActiveStatus.php @@ -29,7 +29,7 @@ class DailyCheckUserActiveStatus extends Command $this->info('每日更新用户活跃状态'); $this->info('开始...'); $oCustomerUserExtendModelExtend = new CustomerUserExtendModel(); - $oCustomerUserExtendModelExtend->updateUserActiveStatus(); + $oCustomerUserExtendModelExtend->updateAllUserActiveStatus(); $this->info('结束...'); } } diff --git a/app/Http/Controllers/Post/PostController.php b/app/Http/Controllers/Post/PostController.php index 73f1dbd..42a71e7 100644 --- a/app/Http/Controllers/Post/PostController.php +++ b/app/Http/Controllers/Post/PostController.php @@ -4,8 +4,56 @@ namespace App\Http\Controllers\Post; use App\Http\Controllers\Base\BaseController; +use App\Models\Post\PostModel; +use App\Models\Post\PostPushBoxModel; +use App\Service\AuthService; class PostController extends BaseController { + //$uid, $content = null, $media = null,$mid = null,$sBachSn = null + public array $validateMethodParams = [ + 'addPost' => [ + 'type' => 'required|numeric|max:2', + 'content' => 'required|numeric|max:255', + 'media' => 'json', + ], + 'pullPostList' => [ + 'lastId' => 'numeric|max:10', + 'limit' => 'numeric|max:5', + ], + ]; + public function addPost(): \Illuminate\Http\JsonResponse + { + $aParams = request()->only([ + 'type', + 'content', + 'media', + ]); + if (!in_array($aParams['type'], [PostModel::TYPE_POST, PostModel::TYPE_REPOST])) return $this->error('type params error'); + + $oAuthService = new AuthService(); + $aUser = $oAuthService->getCurrentUser(); + $oFollowModel = new PostModel(); + $aParams['uid'] = $aUser['id']; + + $oFollowModel->addBatchPost([$aParams]); + + return $this->success(); + } + + function pullPostList() + { + $aParams = request()->only([ + 'lastId', + 'limit', + ]); + $oAuthService = new AuthService(); + $aUser = $oAuthService->getCurrentUser(); + $oPostModel = new PostPushBoxModel(); + $aPostList = $oPostModel->getPushBoxList($aUser['id'],$aParams['lastId']??0,$aParams['limit']??15); + return $this->success($aPostList); + + } + } diff --git a/app/Jobs/AddPostQueue.php b/app/Jobs/AddPostQueue.php new file mode 100644 index 0000000..a202883 --- /dev/null +++ b/app/Jobs/AddPostQueue.php @@ -0,0 +1,35 @@ +addPostQueueConsumer($params); + } +} diff --git a/app/Jobs/UserActiveStatusQueue.php b/app/Jobs/UserActiveStatusQueue.php new file mode 100644 index 0000000..7136e12 --- /dev/null +++ b/app/Jobs/UserActiveStatusQueue.php @@ -0,0 +1,33 @@ +activeUserStatusQueueConsumer($params); + } +} diff --git a/app/Models/Base/BaseModel.php b/app/Models/Base/BaseModel.php index 0ca17d3..b6dc395 100644 --- a/app/Models/Base/BaseModel.php +++ b/app/Models/Base/BaseModel.php @@ -27,6 +27,14 @@ class BaseModel extends Model return $this->newQuery()->create($aItem); } + function addItemWithCreateTime($aItem,$col = 'created_at'): Model|\Illuminate\Database\Eloquent\Builder|bool + { + $aItem = $this->checkColInFill($aItem); + if (empty($aItem)) return false; + $aItem[$col] = date('Y-m-d H:i:s'); + return $this->newQuery()->create($aItem); + } + function delItem($id) { return $this->newQuery()->where($this->primaryKey, $id)->delete(); diff --git a/app/Models/Comment/PostCommentModel.php b/app/Models/Comment/PostCommentModel.php new file mode 100644 index 0000000..e934138 --- /dev/null +++ b/app/Models/Comment/PostCommentModel.php @@ -0,0 +1,23 @@ +addItem($aItem); } + function addUserActiveStatusLog($uid,$beforeValue,$value,$remark_key): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Builder|array|null + { + $aLogInsert = [ + 'type' => CustomerChangeInfoLogModel::TYPE_CHANG_USER_ACTIVE_STATUS, + 'uid' => $uid, + 'column' => CustomerUserExtendModel::COL_IS_ACTIVE, + 'before_value' => $beforeValue, + 'pid' => CustomerChangeInfoLogModel::PID_SYSTEM, + 'value' => $value, + 'after_value' => $value, + 'remark_key' => $remark_key, + 'remark_desc' => CustomerChangeInfoLogModel::REMARK[$remark_key], + ]; + return $this->addLog($aLogInsert); + } + } diff --git a/app/Models/Customer/CustomerUserExtendModel.php b/app/Models/Customer/CustomerUserExtendModel.php index 3f7f89b..efe02d3 100644 --- a/app/Models/Customer/CustomerUserExtendModel.php +++ b/app/Models/Customer/CustomerUserExtendModel.php @@ -2,9 +2,13 @@ namespace App\Models\Customer; +use App\Const\Queue; use App\Exceptions\ModelException; +use App\Jobs\UserActiveStatusQueue; use App\Models\Base\CustomerBaseModel; +use App\Models\Post\PostPushBoxModel; use App\Models\WebSocket\CustomerWsHistoryModel; +use App\Structs\QueueUserActiveStatusStruct; use Carbon\Carbon; use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Log; @@ -36,8 +40,8 @@ class CustomerUserExtendModel extends CustomerBaseModel //增加用户扩展信息 function addExtend($aItem): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Builder|array|null { - if(empty($aItem['uid'])) throw new ModelException('uid error'); - if($this->findItem($aItem['uid'],['uid'])) return null; //已存在 + if (empty($aItem['uid'])) throw new ModelException('uid error'); + if ($this->findItem($aItem['uid'], ['uid'])) return null; //已存在 $sDateTime = date('Y-m-d H:i:s'); $aItem['updated_at'] = $sDateTime; return $this->addItem($aItem); @@ -46,33 +50,33 @@ class CustomerUserExtendModel extends CustomerBaseModel //增加当前粉丝总数 function incrFansNum($uid): int { - $oExtend = $this->findItem($uid,'uid'); - if(!$oExtend) throw new ModelException('user extend not found'); - return $this->newQuery()->where('uid',$uid)->increment('fans_num'); + $oExtend = $this->findItem($uid, 'uid'); + if (!$oExtend) throw new ModelException('user extend not found'); + return $this->newQuery()->where('uid', $uid)->increment('fans_num'); } //减去当前追随者总数 function decrFansNum($uid): int { - $oExtend = $this->findItem($uid,'uid'); - if(!$oExtend) throw new ModelException('user extend not found'); - return $this->newQuery()->where('uid',$uid)->decrement('fans_num'); + $oExtend = $this->findItem($uid, 'uid'); + if (!$oExtend) throw new ModelException('user extend not found'); + return $this->newQuery()->where('uid', $uid)->decrement('fans_num'); } //增加当前订阅总数 function incrFollowNum($uid): int { - $oExtend = $this->findItem($uid,'uid'); - if(!$oExtend) throw new ModelException('user extend not found'); - return $this->newQuery()->where('uid',$uid)->increment('follow_num'); + $oExtend = $this->findItem($uid, 'uid'); + if (!$oExtend) throw new ModelException('user extend not found'); + return $this->newQuery()->where('uid', $uid)->increment('follow_num'); } //减去当前订阅总数 function decrFollowNum($uid): int { - $oExtend = $this->findItem($uid,'uid'); - if(!$oExtend) throw new ModelException('user extend not found'); - return $this->newQuery()->where('uid',$uid)->decrement('follow_num'); + $oExtend = $this->findItem($uid, 'uid'); + if (!$oExtend) throw new ModelException('user extend not found'); + return $this->newQuery()->where('uid', $uid)->decrement('follow_num'); } //获取用户活跃信息 @@ -86,69 +90,92 @@ class CustomerUserExtendModel extends CustomerBaseModel } //检测所有用户活跃状态 - function updateUserActiveStatus($date = null): void + function updateAllUserActiveStatus($date = null): void { - if(empty($date)) $date = Carbon::yesterday()->toDateString(); + if (empty($date)) $date = Carbon::yesterday()->toDateString(); $oCustomerWsHistoryModel = new CustomerWsHistoryModel(); $aActiveUserIdList = $oCustomerWsHistoryModel->getActiveUserIdList($date); //三日内活跃用户 - if(empty($aActiveUserIdList)) return; + if (empty($aActiveUserIdList)) return; $oCustomerChangeInfoLogModel = new CustomerChangeInfoLogModel(); $nowUid = 0; - while (true){ - try{ + while (true) { + try { Db::beginTransaction(); - $aUserList = $this->getUserActiveListLimit($nowUid, 500); - if(empty($aUserList)) break; - $nowUid = max($aUserList->pluck('uid')->toArray()); + $aUserExtendList = $this->getUserActiveListLimit($nowUid, 500); + if (empty($aUserExtendList)) break; + $nowUid = max($aUserExtendList->pluck('uid')->toArray()); - foreach ($aUserList as $oUser){ - $isChanged = false; - $aLogInsert = [ - 'type' => CustomerChangeInfoLogModel::TYPE_CHANG_USER_ACTIVE_STATUS, - 'uid' => $oUser->uid, - 'column' => self::COL_IS_ACTIVE, - 'before_value' => $oUser->is_active, - 'pid' => CustomerChangeInfoLogModel::PID_SYSTEM, - ]; - - if(in_array($oUser->uid, $aActiveUserIdList)){ //在活跃列表中 - if($oUser->is_active == self::IS_ACTIVE_YES) continue; //已经是活跃用户 + foreach ($aUserExtendList as $oUserExtend) { + if (in_array($oUserExtend->uid, $aActiveUserIdList)) { //在活跃列表中 + if ($oUserExtend->is_active == self::IS_ACTIVE_YES) continue; //已经是活跃用户 //变更用户状态 - $res = $this->newQuery()->where('uid',$oUser->uid)->update(['is_active'=>self::IS_ACTIVE_YES]); - if($res) { - $isChanged = true; - $aLogInsert['value'] = self::IS_ACTIVE_YES; - $aLogInsert['after_value'] = self::IS_ACTIVE_YES; - $aLogInsert['remark_key'] = CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES; - $aLogInsert['remark_desc'] = CustomerChangeInfoLogModel::REMARK[CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES]; + $res = $this->newQuery()->where('uid', $oUserExtend->uid)->update(['is_active' => self::IS_ACTIVE_YES]); + if ($res) { + //记录日志 + $oCustomerChangeInfoLogModel->addUserActiveStatusLog($oUserExtend->uid, $oUserExtend->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES); + $this->activeUserStatusToQueueProducer($oUserExtend->uid); //投递到消息队列 } - }else{ //三日内不活跃 - if($oUser->is_active == self::IS_ACTIVE_NO) continue; //已经是不活跃用户 + } else { //三日内不活跃 + if ($oUserExtend->is_active == self::IS_ACTIVE_NO) continue; //已经是不活跃用户 //变更用户状态 - $res = $this->newQuery()->where('uid',$oUser->uid)->update(['is_active'=>self::IS_ACTIVE_NO]); - if($res) { - $isChanged = true; - $aLogInsert['value'] = self::IS_ACTIVE_NO; - $aLogInsert['after_value'] = self::IS_ACTIVE_NO; - $aLogInsert['remark_key'] = CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_NO; - $aLogInsert['remark_desc'] = CustomerChangeInfoLogModel::REMARK[CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_NO]; + $res = $this->newQuery()->where('uid', $oUserExtend->uid)->update(['is_active' => self::IS_ACTIVE_NO]); + if ($res) { + //记录日志 + $oCustomerChangeInfoLogModel->addUserActiveStatusLog($oUserExtend->uid, $oUserExtend->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_NO); } } - - if($isChanged){ //有变化插入日志 - $oCustomerChangeInfoLogModel->addLog($aLogInsert); - } - } Db::commit(); - }catch (\Exception $e){ - Log::error('updateUserActiveStatus error:'.$e->getMessage()); + } catch (\Exception $e) { + Log::error('updateUserActiveStatus error:' . $e->getMessage()); Db::rollBack(); } - } } + //登录检测用户活跃状态 + function updateUserActiveStatus($uid): void + { + try{ + Db::beginTransaction(); + $oCustomerWsHistoryModel = new CustomerWsHistoryModel(); + $aActiveUserId = $oCustomerWsHistoryModel->findActiveUserId($uid, Carbon::yesterday()->toDateString()); //三日内活跃用户 + if(empty($aActiveUserId)) return; + $oCustomerUserExtendModel = $this->newQuery()->where('uid', $uid)->first(['uid','is_active']); + if(!$oCustomerUserExtendModel) return; + $oCustomerChangeInfoLogModel = new CustomerChangeInfoLogModel(); + + $res = $this->newQuery()->where('uid', $uid)->update(['is_active' => self::IS_ACTIVE_YES]); + if ($res) { + //记录日志 + $oCustomerChangeInfoLogModel->addUserActiveStatusLog($uid, $oCustomerUserExtendModel->is_active, self::IS_ACTIVE_YES, CustomerChangeInfoLogModel::REMARK_DAILY_CHECK_USER_ACTIVE_STATUS_YES); + $this->activeUserStatusToQueueProducer($uid); //投递到消息队列 + } + Db::commit(); + }catch (\Exception $e){ + Log::error('updateUserActiveStatus error:' . $e->getMessage()); + Db::rollBack(); + } + + } + + //投递到消息队列,处理活跃推送信箱更新问题 + function activeUserStatusToQueueProducer($uid): void + { + $params = QueueUserActiveStatusStruct::PARAMS; + $params['uid'] = $uid; + $params['queueCreatedAt'] = date('Y-m-d H:i:s'); + UserActiveStatusQueue::dispatch($params)->onQueue(QueueUserActiveStatusStruct::QUEUE_NAME); + } + + function activeUserStatusQueueConsumer($params): void + { + $uid = $params['uid']; + $oPostPushBoxModel = new PostPushBoxModel(); + $oPostPushBoxModel->pullBigFanMasterPostConsumer($uid); + } + + } diff --git a/app/Models/Post/PostModel.php b/app/Models/Post/PostModel.php index db8f41d..3fade4e 100644 --- a/app/Models/Post/PostModel.php +++ b/app/Models/Post/PostModel.php @@ -1,9 +1,11 @@ '推文', + self::TYPE_REPOST => '转发', + ]; + + protected function media(): Attribute + { + return Attribute::make( + get: fn(string $value = '') => Tools::JonsDecode($value), + set: fn(array $value = []) => Tools::JonsEncode($value), + ); + } + /** * @throws ModelException */ - function addPost($uid, $content = null, $media = null): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Builder|array|null + function addPost($uid, $type = self::TYPE_POST, $content = null, $media = null, $mid = null, $sBachSn = null): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Builder|array|null { - if(!$content && !$media) throw new ModelException('addPost params error'); + if (!in_array($type, [self::TYPE_POST, self::TYPE_REPOST])) throw new ModelException('type params error'); + $uuid = Tools::genUuid(); + $bsn = 'BSN_' . ($sBachSn ?? Tools::genUuid()); + if (!$content && !$media) throw new ModelException('addPost params error'); + $aItem['type'] = $type; $aItem['uid'] = $uid; - $aItem['uuid'] = Tools::genUuid(); + $aItem['uuid'] = $uuid; + $aItem['post_batch_sn'] = $bsn; + $aItem['mid'] = $mid; $aItem['media'] = $media; $aItem['content'] = $content; $sDateTime = date('Y-m-d H:i:s'); $aItem['created_at'] = $sDateTime; $res = $this->addItem($aItem); - if($res){ - + if ($res) { + $this->pushToQueue(self::TYPE_POST, $res->id); } return $res; } + function addBatchPost(array $aPostList): void + { + $mid = null; + $bsn = 'BSN_' . ($sBachSn ?? Tools::genUuid()); + foreach ($aPostList as $aPostItem) { + $oPost = $this->addPost($aPostItem['uid'], $aPostItem['type'], $aPostItem['content'], $aPostItem['media'], $mid, $bsn); + if ($oPost) { + $mid = $oPost->id; + } + } + } + + //发送到消息队列处理新增post + function pushToQueue($type, $id): void + { + PostPushBoxModel::addPostQueueProducer(['type' => $type, 'id' => $id]); + } + /** * @throws ModelException */ @@ -50,9 +94,9 @@ class PostModel extends BaseModel { $oPost = $this->findItem($id); $res = $this->delItem($id); - if($res){ + if ($res) { $oPostHistoryModel = new PostHistoryModel(); - $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_DEL,$oPost->toArray()); + $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_DEL, $oPost->toArray()); } return $res; } @@ -62,18 +106,18 @@ class PostModel extends BaseModel */ function delPostByUuid($uuid) { - $oPost = $this->findItemByWhere(['uuid'=>$uuid]); + $oPost = $this->findItemByWhere(['uuid' => $uuid]); $res = $this->newQuery()->where('uuid', $uuid)->delete(); - if($res){ + if ($res) { $oPostHistoryModel = new PostHistoryModel(); - $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_DEL,$oPost->toArray()); + $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_DEL, $oPost->toArray()); } return $res; } function getPostListByUid($uid): \Illuminate\Database\Eloquent\Collection|array { - return $this->getItemsByWhere(['uid'=>$uid]); + return $this->getItemsByWhere(['uid' => $uid]); } function getPostById($id): \Illuminate\Database\Eloquent\Model|\Illuminate\Database\Eloquent\Collection|\Illuminate\Database\Eloquent\Builder|array|null @@ -86,13 +130,13 @@ class PostModel extends BaseModel */ function updatePostById($aItem): bool|int { - if(!isset($aItem['id'])) throw new ModelException('updatePostById params error'); - if(empty($aItem['id'])) throw new ModelException('updatePostById params error'); + if (!isset($aItem['id'])) throw new ModelException('updatePostById params error'); + if (empty($aItem['id'])) throw new ModelException('updatePostById params error'); $oPost = $this->findItem($aItem['id']); $res = $this->updateItem($aItem); - if($res){ + if ($res) { $oPostHistoryModel = new PostHistoryModel(); - $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_EDIT,$oPost->toArray()); + $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_EDIT, $oPost->toArray()); } return $res; } @@ -102,31 +146,32 @@ class PostModel extends BaseModel */ function updatePostByUuid($aItem): bool|int { - if(!isset($aItem['uuid'])) throw new ModelException('updatePostByUuid params error'); - if(empty($aItem['uuid'])) throw new ModelException('updatePostByUuid params error'); - $oPost = $this->findItemByWhere(['uuid'=>$aItem['uuid']]); - $res = $this->updateItem($aItem,'uuid'); - if($res){ + if (!isset($aItem['uuid'])) throw new ModelException('updatePostByUuid params error'); + if (empty($aItem['uuid'])) throw new ModelException('updatePostByUuid params error'); + $oPost = $this->findItemByWhere(['uuid' => $aItem['uuid']]); + $res = $this->updateItem($aItem, 'uuid'); + if ($res) { $oPostHistoryModel = new PostHistoryModel(); - $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_EDIT,$oPost->toArray()); + $oPostHistoryModel->addPostHistory(PostHistoryModel::METHOD_EDIT, $oPost->toArray()); } return $res; } function getPostListByMid($mid): \Illuminate\Database\Eloquent\Collection|array { - return $this->getItemsByWhere(['mid'=>$mid]); + return $this->getItemsByWhere(['mid' => $mid]); } - function getPostListByUidMid($uid,$mid): \Illuminate\Database\Eloquent\Collection|array + function getPostListByUidMid($uid, $mid): \Illuminate\Database\Eloquent\Collection|array { - return $this->getItemsByWhere(['uid'=>$uid,'mid'=>$mid]); + return $this->getItemsByWhere(['uid' => $uid, 'mid' => $mid]); } - function getPostListByUids($uids,$sDateLimit = null): \Illuminate\Database\Eloquent\Collection|array + + function getPostListByUids($uids, $sDateLimit = null): \Illuminate\Database\Eloquent\Collection|array { - if($sDateLimit == null) $sDateLimit = date('Y-m-d H:i:s',strtotime('-3 day')); - return $this->newQuery()->where('created_at',$sDateLimit)->whereIn('uid',$uids)->get(); + if ($sDateLimit == null) $sDateLimit = date('Y-m-d H:i:s', strtotime('-3 day')); + return $this->newQuery()->where('created_at', $sDateLimit)->whereIn('uid', $uids)->get(); } diff --git a/app/Models/Post/PostPushBoxModel.php b/app/Models/Post/PostPushBoxModel.php index 064b180..eb33ea1 100644 --- a/app/Models/Post/PostPushBoxModel.php +++ b/app/Models/Post/PostPushBoxModel.php @@ -3,10 +3,16 @@ namespace App\Models\Post; use App\Exceptions\ModelException; +use App\Jobs\AddPostQueue; use App\Models\Base\BaseModel; +use App\Models\Comment\PostCommentModel; use App\Models\Customer\CustomerUserExtendModel; use App\Models\Follow\FollowModel; +use App\Models\Post\Structs\PostParamsStruct; +use App\Structs\QueueAddPostStruct; use App\Tools\CollectOffsetLimit; +use App\Tools\Tools; +use Illuminate\Database\Eloquent\Casts\Attribute; class PostPushBoxModel extends BaseModel { @@ -15,18 +21,29 @@ class PostPushBoxModel extends BaseModel protected $primaryKey = 'id'; protected $fillable = [ 'id', + 'type', 'uid', 'pid', 'puuid', + 'post_params', 'is_like', 'is_repost', 'is_bookmark', 'is_read', + 'post_created_at', 'created_at', - 'created_box_at', 'deleted_at', ]; + const TYPE_POST = 1; + const TYPE_REPOST = 2; + const TYPE_COMMENT = 3; + const TYPE = [ + self::TYPE_POST => '推文', + self::TYPE_REPOST => '转发', + self::TYPE_COMMENT => '评论', + ]; + const IS_LIKE_DEFAULT = 1; const IS_LIKE_YES = 2; const IS_LIKE_NO = 3; @@ -50,24 +67,69 @@ class PostPushBoxModel extends BaseModel self::IS_BOOKMARK_YES => '已收藏', ]; - const IS_READ_DEFAULT = 1; + const IS_READ_NO = 1; const IS_READ_YES = 2; const IS_READ = [ - self::IS_READ_DEFAULT => '默认', + self::IS_READ_NO => '默认', self::IS_READ_YES => '已收藏', ]; + protected function postParams(): Attribute + { + return Attribute::make( + get: fn (string $value = '') => Tools::JonsDecode($value), + set: fn (array $value = []) => Tools::JonsEncode($value), + ); + } + + public static function addPostQueueProducer(array $params): void + { + AddPostQueue::dispatch($params)->onQueue(QueueAddPostStruct::QUEUE_NAME); + } /** - * 提交后调用事件 + * 提交后调用事件,在消费队列跑推送 * @throws ModelException */ - function newPostPushTask($id = null, $uuid = null) + function addPostQueueConsumer(array $params) { + if(empty($params)) return false; + if(isset($params['id'])) return false; + if(isset($params['type'])) return false; + $id = $params['id']; + $type = $params['type']; + + if(!in_array($type,[self::TYPE_POST,self::TYPE_REPOST,self::TYPE_COMMENT])) return false; + if(empty($id)) return false; + $oPostModel = new PostModel(); $oPost = null; - if ($id) $oPost = $oPostModel->findItem($id); - if ($uuid) $oPost = $oPostModel->findItemByWhere(['uuid' => $uuid]); + $aPostParams = []; + + //判断推送类型 + if ($type == self::TYPE_POST) { + $postId = $id; + } elseif ($type == self::TYPE_REPOST) { + $postId = $id; + $oRePostModel = $oPostModel->findItem($id); + if(!$oRePostModel) return false; + $aPostParams[PostParamsStruct::REPOST_ORG_USER_ID] = $oRePostModel->uid; + $aPostParams[PostParamsStruct::REPOST_ORG_POST_ID] = $oRePostModel->mid; + } elseif ($type == self::TYPE_COMMENT) { + $oPostCommentModel = new PostCommentModel(); + $oPostCommentModel = $oPostCommentModel->findItem($id); + if(!$oPostCommentModel) return false; + $postId = $oPostCommentModel->pid; + $aPostParams[PostParamsStruct::COMMENT_ID] = $oPostCommentModel->id; + $aPostParams[PostParamsStruct::COMMENT_USER_ID] = $oPostCommentModel->uid; + $aPostParams[PostParamsStruct::COMMENT_POST_ID] = $oPostCommentModel->pid; + $aPostParams[PostParamsStruct::COMMENT_CONTEXT] = $oPostCommentModel->content; + $aPostParams[PostParamsStruct::COMMENT_CONTEXT_CREATE_TIME] = $oPostCommentModel->created_at; + } else { + throw new ModelException('type error'); + } + + if ($postId) $oPost = $oPostModel->findItem($postId); if (!$oPost) throw new ModelException('post not found'); $iConfigFansPushLimit = intval(env('CONFIG_FANS_PUSH_LIMIT', 2000)); @@ -80,11 +142,14 @@ class PostPushBoxModel extends BaseModel $bSendMode = $CustomerUserExtend->fans_num < $iConfigFansPushLimit; $iTotalCount = $this->countSendFans($bSendMode, $oPost->uid); //计算发送总数 + $aPost = $oPost->toArray(); + if(empty($aPost)) return false; + $aPost['post_params'] = $aPostParams; //分批发送 $oCollectOffsetLimit = new CollectOffsetLimit(); - $oCollectOffsetLimit->setITotalCount($iTotalCount)->runWhile(function ($offset, $limit) use ($oPost, $bSendMode) { - $oFollowList = $this->getFansListWithPage($bSendMode, $oPost->uid, $offset, $limit); - $this->sendPostToBox($oPost, $oFollowList); + $oCollectOffsetLimit->setITotalCount($iTotalCount)->runWhile(function ($offset, $limit) use ($aPost, $bSendMode) { + $oFollowList = $this->getFansListWithPage($bSendMode, $aPost['uid'], $offset, $limit); + $this->sendPostToBox($aPost, $oFollowList); }); } @@ -116,17 +181,15 @@ class PostPushBoxModel extends BaseModel } //发送到推送信箱 - function sendPostToBox($oPost, $oFollowList): void + function sendPostToBox($aPost, $oFollowList): void { if (!$oFollowList) return; - $date = date('Y-m-d H:i:s'); foreach ($oFollowList as $oFollow) { $aItem['uid'] = $oFollow->uid; - $aItem['pid'] = $oPost->id; - $aItem['puuid'] = $oPost->uuid; - $aItem['created_at'] = $oPost->created_at; - $aItem['created_box_at'] = $date; - $this->addItem($aItem); + $aItem['pid'] = $aPost['id']; + $aItem['puuid'] = $aPost['uuid']; + $aItem['post_created_at'] = $aPost['created_at']; + $this->addItemWithCreateTime($aItem); } } @@ -138,20 +201,27 @@ class PostPushBoxModel extends BaseModel * @param $limit * @return \Illuminate\Database\Eloquent\Collection|array */ - function getPushBoxList($uid, $last_id = 0, $limit = 20): \Illuminate\Database\Eloquent\Collection|array + function getPushBoxList($uid, $last_id = 0, $limit = 20,$cols = ['*']): \Illuminate\Database\Eloquent\Collection|array { //活跃用户直接拉取未读消息 - return $this->newQuery()->where('uid', $uid)->where('id', '>', $last_id)->where('is_read', self::IS_READ_DEFAULT)->orderBy('created_at', 'desc')->limit($limit)->get(); + return $this->newQuery() + ->where('uid', $uid) +// ->where('id', '>', $last_id) + ->where('is_read', self::IS_READ_NO) + ->orderBy('created_at', 'desc') + ->limit($limit) + ->get($cols); //非活跃用户拉取大v消息,在用户状态更新时已经调用过,此处不用在做处理 } //非活跃拉取已跟随大v最新文章。 //放在用户状态更新时调用 - function pullBigFanMasterPost($uid) + function pullBigFanMasterPostConsumer($uid) { + //@@获取文章和转发文章和评论文章 //获取大v定义粉丝数,获取大于该粉丝数的大v文章 $iConfigFansPushLimit = intval(env('CONFIG_FANS_PUSH_LIMIT', 2000)); - //@此处需针对大量数据进行分批获取 + //@@此处需针对大量数据进行分批获取 $oFollowModel = new FollowModel(); $oFollowList = $oFollowModel->getFollowListWithFansLimit($uid,$iConfigFansPushLimit,['a.follow_uid']); if ($oFollowList->isEmpty()) return null; @@ -161,7 +231,7 @@ class PostPushBoxModel extends BaseModel $oPostList = $oPostModel->getPostListByUids($aFollowUid); //将推文发送到信箱 - $this->sendPostToBoxByUid($oPostList,$uid); + self::sendPostToBoxByUid($oPostList,$uid); } diff --git a/app/Models/Post/Structs/PostParamsStruct.php b/app/Models/Post/Structs/PostParamsStruct.php new file mode 100644 index 0000000..9c89703 --- /dev/null +++ b/app/Models/Post/Structs/PostParamsStruct.php @@ -0,0 +1,16 @@ +toArray(); } + //获取判断用户是否活跃 + //@需要加入缓存-缓存时间为3天 + function findActiveUserId($uid, $date, $days = 3): array + { + return $this->newQuery() + ->where('uid', $uid) + ->where('event', self::EVENT_ON_CONNECT) + ->where('status', self::STATUS_SUCCESS) + ->whereBetween(DB::raw("DATE_FORMAT(created_at,'%Y-%m-%d')"), [Carbon::parse($date)->subDays($days - 1)->toDateString(), Carbon::parse($date)->toDateString()]) + ->first(['uid']) + ->toArray(); + } + } diff --git a/app/Structs/QueueAddPostStruct.php b/app/Structs/QueueAddPostStruct.php new file mode 100644 index 0000000..6fa9aa2 --- /dev/null +++ b/app/Structs/QueueAddPostStruct.php @@ -0,0 +1,12 @@ +'', + 'id'=>'', + ]; + +} diff --git a/app/Structs/QueueUserActiveStatusStruct.php b/app/Structs/QueueUserActiveStatusStruct.php new file mode 100644 index 0000000..00da62d --- /dev/null +++ b/app/Structs/QueueUserActiveStatusStruct.php @@ -0,0 +1,12 @@ +'', + 'queueCreatedAt'=>'', + ]; + +} diff --git a/app/Tools/Tools.php b/app/Tools/Tools.php index 356fac8..08bf888 100644 --- a/app/Tools/Tools.php +++ b/app/Tools/Tools.php @@ -51,6 +51,16 @@ class Tools return Str::orderedUuid(); } + static function JonsEncode(array $aData): string + { + return json_encode($aData, JSON_UNESCAPED_UNICODE); + } + + static function JonsDecode(string $aData): string + { + return json_decode($aData, JSON_UNESCAPED_UNICODE); + } + }