diff --git a/app/Bean/Model/Wallet/Platform/WalletPlatformBean.php b/app/Bean/Model/Wallet/Platform/WalletPlatformBean.php new file mode 100644 index 0000000..4ca8d1c --- /dev/null +++ b/app/Bean/Model/Wallet/Platform/WalletPlatformBean.php @@ -0,0 +1,90 @@ +id; + } + + public function setId($id): void + { + $this->id = $id; + } + + public function getName() + { + return $this->name; + } + + public function setName($name): void + { + $this->name = $name; + } + + public function getCode() + { + return $this->code; + } + + public function setCode($code): void + { + $this->code = $code; + } + + public function getAppid() + { + return $this->appid; + } + + public function setAppid($appid): void + { + $this->appid = $appid; + } + + public function getSecret() + { + return $this->secret; + } + + public function setSecret($secret): void + { + $this->secret = $secret; + } + + public function getNotifyIp() + { + return $this->notify_ip; + } + + public function setNotifyIp($notify_ip): void + { + $this->notify_ip = $notify_ip; + } + + public function getCreatedAt() + { + return $this->created_at; + } + + public function setCreatedAt($created_at): void + { + $this->created_at = $created_at; + } + + + +} diff --git a/app/Bean/Queue/QueueBaseBean.php b/app/Bean/Queue/QueueBaseBean.php index 70a6575..be20bd2 100644 --- a/app/Bean/Queue/QueueBaseBean.php +++ b/app/Bean/Queue/QueueBaseBean.php @@ -5,7 +5,7 @@ use App\Bean\Model\Base\BaseBean; class QueueBaseBean extends BaseBean { - const TRY_LIMIT = 3; + protected $try_limt = 3; protected $try_times = 0; public function IncrTryTimes($num = 1): void @@ -15,11 +15,21 @@ class QueueBaseBean extends BaseBean public function checkTryTimes(): bool { - if($this->try_times >= self::TRY_LIMIT) { + if($this->try_times >= $this->try_limt) { return false; } return true; } + public function getTryTimes(): int + { + return $this->try_times; + } + + function getDelaySeconds($seconds = 30): int + { + return $seconds * $this->try_times; + } + } diff --git a/app/Bean/Service/HttpServiceConfigBean.php b/app/Bean/Service/HttpServiceConfigBean.php new file mode 100644 index 0000000..a9b4121 --- /dev/null +++ b/app/Bean/Service/HttpServiceConfigBean.php @@ -0,0 +1,23 @@ +is_return_json; + } + + public function setIsReturnJson(bool $is_return_json): void + { + $this->is_return_json = $is_return_json; + } + + + + +} diff --git a/app/Bean/Service/HttpServiceReturnLogBean.php b/app/Bean/Service/HttpServiceReturnLogBean.php new file mode 100644 index 0000000..6db6ce0 --- /dev/null +++ b/app/Bean/Service/HttpServiceReturnLogBean.php @@ -0,0 +1,66 @@ +response_status_code; + } + + public function setResponseStatusCode($response_status_code): void + { + $this->response_status_code = $response_status_code; + } + + public function getRequestUrl() + { + return $this->request_url; + } + + public function setRequestUrl($request_url): void + { + $this->request_url = $request_url; + } + + public function getRequestData() + { + return $this->request_data; + } + + public function setRequestData($request_data): void + { + $this->request_data = $request_data; + } + + public function getRequestHeader() + { + return $this->request_header; + } + + public function setRequestHeader($request_header): void + { + $this->request_header = $request_header; + } + + public function getResponseBody() + { + return $this->response_body; + } + + public function setResponseBody($response_body): void + { + $this->response_body = $response_body; + } + + + +} diff --git a/app/Cache/Base/BaseCache.php b/app/Cache/Base/BaseCache.php index 791a5b7..818bb38 100644 --- a/app/Cache/Base/BaseCache.php +++ b/app/Cache/Base/BaseCache.php @@ -39,6 +39,10 @@ abstract class BaseCache { return $this->getCacheData($key); } + function del($key): bool + { + return $this->delCacheData($key); + } function getCacheData($primary_key = null): array|string|null { diff --git a/app/Cache/Table/Wallet/TableWalletPlatformCache.php b/app/Cache/Table/Wallet/TableWalletPlatformCache.php new file mode 100644 index 0000000..3517243 --- /dev/null +++ b/app/Cache/Table/Wallet/TableWalletPlatformCache.php @@ -0,0 +1,26 @@ +info('每2秒检查最新链上区块'); + $this->info('开始...'); + $oUsdtTrx20Service = new UsdtTrx20Service(); + $oUsdtTrx20Service->checkNowBlock(); + $this->info('结束...'); + } +} diff --git a/app/Console/Kernel.php b/app/Console/Kernel.php index f304417..5d11858 100644 --- a/app/Console/Kernel.php +++ b/app/Console/Kernel.php @@ -15,6 +15,7 @@ class Kernel extends ConsoleKernel // $schedule->command('inspire')->hourly(); $schedule->command('app:daily-check-user-active-status-cmd')->dailyAt('00:30')->onOneServer(); $schedule->command('app:hour-customer-wallet-trs-delay-pay-cmd')->hourly()->onOneServer(); + $schedule->command('app:seconds-wallet-block-check-cmd')->everyTwoSeconds()->onOneServer(); } /** diff --git a/app/Exceptions/PlatformNotifyException.php b/app/Exceptions/PlatformNotifyException.php new file mode 100644 index 0000000..7e81394 --- /dev/null +++ b/app/Exceptions/PlatformNotifyException.php @@ -0,0 +1,8 @@ +toArrayNotNull())->onQueue(QueueWalletBlockTransactionBean::QUEUE_NAME); + $queue = self::dispatch($bean->toArrayNotNull())->onQueue(QueueWalletBlockTransactionBean::QUEUE_NAME); + if(!empty($delaySec)){ + $queue->delay(now()->addSeconds($delaySec)); + } + return $queue; } } diff --git a/app/Jobs/Wallet/WalletNotifyToPlatformQueue.php b/app/Jobs/Wallet/WalletNotifyToPlatformQueue.php index d142934..a30acd9 100644 --- a/app/Jobs/Wallet/WalletNotifyToPlatformQueue.php +++ b/app/Jobs/Wallet/WalletNotifyToPlatformQueue.php @@ -3,6 +3,7 @@ namespace App\Jobs\Wallet; use App\Bean\Queue\Wallet\QueueNotifyToPlatformBean; +use App\Service\Wallet\PlatformNotifyService; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; @@ -24,13 +25,18 @@ class WalletNotifyToPlatformQueue implements ShouldQueue /** * Execute the job. */ - public function handle(): void + public function handle(array $params): void { - // + $oPlatformNotifyService = new PlatformNotifyService(); + $oPlatformNotifyService->notifyToPlatform(new QueueNotifyToPlatformBean($params)); } - public static function putToQueue(QueueNotifyToPlatformBean $bean): \Illuminate\Foundation\Bus\PendingDispatch + public static function putToQueue(QueueNotifyToPlatformBean $bean, $delaySec = null): \Illuminate\Foundation\Bus\PendingDispatch { - return self::dispatch($bean->toArrayNotNull())->onQueue(QueueNotifyToPlatformBean::QUEUE_NAME); + $queue = self::dispatch($bean->toArrayNotNull())->onQueue(QueueNotifyToPlatformBean::QUEUE_NAME); + if(!empty($delaySec)){ + $queue->delay(now()->addSeconds($delaySec)); + } + return $queue; } } diff --git a/app/Jobs/Wallet/WalletPlatformTransactionQueue.php b/app/Jobs/Wallet/WalletPlatformTransactionQueue.php index 599cd06..ec3f666 100644 --- a/app/Jobs/Wallet/WalletPlatformTransactionQueue.php +++ b/app/Jobs/Wallet/WalletPlatformTransactionQueue.php @@ -47,8 +47,12 @@ class WalletPlatformTransactionQueue implements ShouldQueue } } - public static function putToQueue(QueueWalletPlatformTransactionBean $bean): \Illuminate\Foundation\Bus\PendingDispatch + public static function putToQueue(QueueWalletPlatformTransactionBean $bean, $delaySec = null): \Illuminate\Foundation\Bus\PendingDispatch { - return self::dispatch($bean->toArrayNotNull())->onQueue(QueueWalletPlatformTransactionBean::QUEUE_NAME); + $queue = self::dispatch($bean->toArrayNotNull())->onQueue(QueueWalletPlatformTransactionBean::QUEUE_NAME); + if(!empty($delaySec)){ + $queue->delay(now()->addSeconds($delaySec)); + } + return $queue; } } diff --git a/app/Jobs/Wallet/WalletPlatformWithdrawTransferQueue.php b/app/Jobs/Wallet/WalletPlatformWithdrawTransferQueue.php index beff737..6bc9814 100644 --- a/app/Jobs/Wallet/WalletPlatformWithdrawTransferQueue.php +++ b/app/Jobs/Wallet/WalletPlatformWithdrawTransferQueue.php @@ -37,7 +37,7 @@ class WalletPlatformWithdrawTransferQueue implements ShouldQueue public static function putToQueue(QueueWalletPlatformWithdrawTransferBean $bean, $delaySec = null): \Illuminate\Foundation\Bus\PendingDispatch { - $queue = self::dispatch($bean->toArrayNotNull())->onQueue(QueueWalletPlatformWithdrawTransferBean::QUEUE_NAME); + $queue = self::dispatch($bean->toArrayNotNull())->onQueue(QueueWalletPlatformWithdrawTransferBean::QUEUE_NAME); if(!empty($delaySec)){ $queue->delay(now()->addSeconds($delaySec)); } diff --git a/app/Models/Base/BaseModel.php b/app/Models/Base/BaseModel.php index 9825f4a..56ce7ef 100644 --- a/app/Models/Base/BaseModel.php +++ b/app/Models/Base/BaseModel.php @@ -8,6 +8,13 @@ class BaseModel extends Model { public $timestamps = false; + const LOCK_TYPE_FOR_UPDATE = 1; + const LOCK_TYPE_FOR_SHARE = 2; + const LOCK_TYPE = [ + self::LOCK_TYPE_FOR_UPDATE => '悲观锁', + self::LOCK_TYPE_FOR_SHARE => '乐观锁', + ]; + function checkColInFill($aItem) { foreach ($aItem as $key => $value) { @@ -64,6 +71,17 @@ class BaseModel extends Model return $this->newQuery()->where($aWhere)->get($col); } + function setLockByPrimaryKey($id,$lock_type = self::LOCK_TYPE_FOR_UPDATE): Model|\Illuminate\Database\Eloquent\Builder|null + { + $model = $this->newQuery()->where($this->primaryKey, $id); + if($lock_type == self::LOCK_TYPE_FOR_SHARE){ + $model = $model->sharedLock(); + }elseif ($lock_type == self::LOCK_TYPE_FOR_UPDATE){ + $model = $model->lockForUpdate(); + } + return $model->first(); + } + } diff --git a/app/Models/Wallet/Platform/WalletPlatformModel.php b/app/Models/Wallet/Platform/WalletPlatformModel.php index 3d242f1..6b28c50 100644 --- a/app/Models/Wallet/Platform/WalletPlatformModel.php +++ b/app/Models/Wallet/Platform/WalletPlatformModel.php @@ -1,6 +1,7 @@ primaryKey])) { + $this->delTableCache($aItem[$this->primaryKey]); + } + return $res; + } + function delTableCache($key): bool + { + $oTableWalletPlatformCache = new TableWalletPlatformCache(); + return $oTableWalletPlatformCache->del($key); } } diff --git a/app/Service/HttpService.php b/app/Service/HttpService.php new file mode 100644 index 0000000..8ae4af3 --- /dev/null +++ b/app/Service/HttpService.php @@ -0,0 +1,86 @@ +client = new Client(); + } + + public function getClient(): Client + { + return $this->client; + } + + public function get(string $endpoint, array $data = [], HttpServiceConfigBean $configBean = null) + { + if ($configBean === null) $configBean = new HttpServiceConfigBean(); + $oHttpServiceReturnLogBean = $this->request(self::REQ_METHOD_GET, $endpoint, $data, $configBean); + if ($configBean->isIsReturnJson() === true) { + return json_decode($oHttpServiceReturnLogBean->getResponseBody(), false); + } + return $oHttpServiceReturnLogBean->getResponseBody(); + } + + public function post(string $endpoint, array $data = [], HttpServiceConfigBean $configBean = null) + { + if ($configBean === null) $configBean = new HttpServiceConfigBean(); + $oHttpServiceReturnLogBean = $this->request(self::REQ_METHOD_POST, $endpoint, $data, $configBean); + if ($configBean->isIsReturnJson() === true) { + return json_decode($oHttpServiceReturnLogBean->getResponseBody(), false); + } + return $oHttpServiceReturnLogBean->getResponseBody(); + } + + /** + * @throws GuzzleException + */ + function request($req_method , string $endpoint, array $data = [], HttpServiceConfigBean $configBean = null): HttpServiceReturnLogBean|bool + { + try { + if(!in_array($req_method, [self::REQ_METHOD_GET, self::REQ_METHOD_POST])) { + throw new \Exception('request method not support'); + } + if ($configBean === null) $configBean = new HttpServiceConfigBean(); + $req_data = []; + if($req_method === self::REQ_METHOD_GET) { + $req_data = ['query' => $data]; + } + if($req_method === self::REQ_METHOD_POST) { + $req_data = ['json' => $data]; + } + $resp = $this->getClient()->request($req_method,$endpoint, $req_data); + $status_code = $resp->getStatusCode(); + $body = $resp->getBody()->getContents(); + $resp->getBody()->close(); + + //记录日志 + $oHttpServiceReturnLogBean = new HttpServiceReturnLogBean(); + $oHttpServiceReturnLogBean->setRequestData($data); + $oHttpServiceReturnLogBean->setRequestUrl($endpoint); + $oHttpServiceReturnLogBean->setResponseStatusCode($status_code); + $oHttpServiceReturnLogBean->setResponseBody($body); + Logs::InfoLog(__CLASS__ . '-' . __FUNCTION__, $oHttpServiceReturnLogBean->toArrayNotNull()); + + return $oHttpServiceReturnLogBean; + }catch (\Exception $e) { + Logs::ErrLog(__CLASS__ . '-' . __FUNCTION__, $e, func_get_args()); + return false; + } + } + + +} diff --git a/app/Service/Wallet/Coin/Tron/UsdtTrx20Service.php b/app/Service/Wallet/Coin/Tron/UsdtTrx20Service.php index 9fc2630..0d201d5 100644 --- a/app/Service/Wallet/Coin/Tron/UsdtTrx20Service.php +++ b/app/Service/Wallet/Coin/Tron/UsdtTrx20Service.php @@ -113,7 +113,7 @@ class UsdtTrx20Service $oWalletTronBlockModel->updateItem($updateItem); } catch (\Exception $e) { - Logs::ErrLog(__FUNCTION__, $e, $oQueueWalletBlockBean->toArray()); + Logs::ErrLog(__FUNCTION__, $e, $oQueueWalletBlockBean->toArrayNotNull()); $oQueueWalletBlockBean->IncrTryTimes(); if (!$oQueueWalletBlockBean->checkTryTimes()) { //超出重试次数 $updateItem = [ @@ -137,7 +137,7 @@ class UsdtTrx20Service try { $this->tronBlockTransaction($oEventBean); } catch (\Exception $e) { - Logs::ErrLog(__FUNCTION__, $e, $oEventBean->toArray()); + Logs::ErrLog(__FUNCTION__, $e, $oEventBean->toArrayNotNull()); } } } @@ -158,7 +158,7 @@ class UsdtTrx20Service $is_to = $oTronWalletAddressSetCache->checkKey($to_addr); if (!$is_from && !$is_to) return; //投递到钱包地址账变处理队列 - $oQueueEventBean = new QueueEventBean($oEventBean->toArray()); + $oQueueEventBean = new QueueEventBean($oEventBean->toArrayNotNull()); WalletAddressTransactionQueue::putToQueue($oQueueEventBean); return; } diff --git a/app/Service/Wallet/PlatformNotifyService.php b/app/Service/Wallet/PlatformNotifyService.php new file mode 100644 index 0000000..d264e3b --- /dev/null +++ b/app/Service/Wallet/PlatformNotifyService.php @@ -0,0 +1,48 @@ +getPlatformId())) return; + + $oTableWalletPlatformCache = new TableWalletPlatformCache(); + $aWalletPlatform= $oTableWalletPlatformCache->get($bean->getPlatformId()); + if(empty($aWalletPlatform)) return; + + $oWalletPlatformBean = new WalletPlatformBean($aWalletPlatform); + if(empty($oWalletPlatformBean->getNotifyIp())) return; + //通知到平台 + $oHttpService = new \App\Service\HttpService(); + $HttpServiceConfigBean = new HttpServiceConfigBean(); + $HttpServiceConfigBean->setIsReturnJson(false); //返回不格式化json + $res = $oHttpService->post($oWalletPlatformBean->getNotifyIp(),$bean->toArrayNotNull(),$HttpServiceConfigBean); + if($res !== self::RETURN_SUCCESS){ //通知失败 + throw new PlatformNotifyException('notify to platform failed'); + } + + Logs::SuccLog(__FUNCTION__, $bean->toArrayNotNull()); + }catch (\Exception $e) { + Logs::ErrLog(__FUNCTION__, $e, $bean->toArrayNotNull()); + $bean->IncrTryTimes(); + if (!$bean->checkTryTimes()) { //超出重试次数 + return; + } + WalletNotifyToPlatformQueue::putToQueue($bean, $bean->getTryTimes()); + } + + } + + +} diff --git a/app/Tools/Logs.php b/app/Tools/Logs.php index 9e27fd3..98d628c 100644 --- a/app/Tools/Logs.php +++ b/app/Tools/Logs.php @@ -16,6 +16,11 @@ class Logs Log::error(self::getTitle($title,'success:'), $params); } + static function InfoLog($title, $params = []): void + { + Log::error(self::getTitle($title,'info:'), $params); + } + static function getTitle($title,$prefix=''): string { return $title.' '.$prefix;