usdtTrc20Sdk = new UsdtTrc20();
    }

    /**
     * Returns the TRC20 SDK handler exposed by the UsdtTrc20 wrapper.
     *
     * @return \Sdk\Wallet\Trc20Sdk
     */
    public function getHandle(): \Sdk\Wallet\Trc20Sdk
    {
        return $this->usdtTrc20Sdk->getHandler();
    }

    /**
     * Scheduled job: compares the chain's current block number with the locally
     * recorded one and enqueues every block that has not been recorded yet.
     *
     * The local block number is read from the Redis-backed TronKeyCache first and,
     * when absent there, from WalletTronBlockModel (the cache is then re-seeded).
     * When no local block number exists at all, only the current chain block is
     * enqueued.
     */
    public function checkNowBlock(): void
    {
        // Read once; the original re-read this getter on every loop iteration.
        $remoteBlockNumber = $this->getHandle()->getNowBlockNumber()->getBlockNumber();

        $oTronKeyCache = new TronKeyCache();
        $oWalletTronBlockModel = new WalletTronBlockModel();

        // Look up the locally recorded block: cache first, database as fallback.
        $sLocalNowBlock = $oTronKeyCache->getNowBlock();
        if (empty($sLocalNowBlock)) {
            $sLocalNowBlock = $oWalletTronBlockModel->getLastBlockNumber();
            if (!empty($sLocalNowBlock)) {
                $oTronKeyCache->setNowBlock($sLocalNowBlock);
            }
        }

        // Nothing recorded anywhere: start from the current chain block.
        if (empty($sLocalNowBlock)) {
            $this->enqueueBlock($oWalletTronBlockModel, $remoteBlockNumber);
            return;
        }

        // Local state is already at (or ahead of) the chain head: nothing to do.
        if ($sLocalNowBlock >= $remoteBlockNumber) {
            return;
        }

        // Enqueue every block in the gap, one message per block.
        for ($i = $sLocalNowBlock + 1; $i <= $remoteBlockNumber; $i++) {
            $this->enqueueBlock($oWalletTronBlockModel, $i);
        }
    }

    /**
     * Records one block number and pushes it onto the block queue.
     *
     * Does nothing when insertByBlockNum() returns a falsy result.
     *
     * @param WalletTronBlockModel $oWalletTronBlockModel model used to insert the block row
     * @param int|string           $blockNumber           block number to record and enqueue
     */
    private function enqueueBlock(WalletTronBlockModel $oWalletTronBlockModel, $blockNumber): void
    {
        $res = $oWalletTronBlockModel->insertByBlockNum($blockNumber);
        if (!$res) {
            return;
        }

        $oQueueWalletBlockBean = new QueueWalletBlockBean();
        $oQueueWalletBlockBean->setId($res->id);
        $oQueueWalletBlockBean->setBlockNumber($blockNumber);
        WalletBlockQueue::putToQueue($oQueueWalletBlockBean);
    }

    /**
     * Queue consumer for a single block: pages through the block's TRC20 transfer
     * events and forwards each page to the block-transaction queue.
     *
     * The block row moves WAIT -> PROCESS -> FINISH. On failure the message is
     * re-queued with a delay of 30 until checkTryTimes() reports that the retries
     * are used up, at which point the row is marked FAIL.
     *
     * @param QueueWalletBlockBean $oQueueWalletBlockBean queue message carrying the row id and block number
     *
     * @throws QueueException when the block row cannot be found
     */
    public function tronBlockConsumer(QueueWalletBlockBean $oQueueWalletBlockBean): void
    {
        $oWalletTronBlockModel = new WalletTronBlockModel();
        $oResModel = $oWalletTronBlockModel->findItem($oQueueWalletBlockBean->getId());
        if (!$oResModel) {
            throw new QueueException('区块不存在');
        }
        // Only rows still waiting are processed; anything else is skipped.
        if ($oResModel->status != WalletTronBlockModel::STATUS_WAIT) {
            return;
        }

        $oResModel->status = WalletTronBlockModel::STATUS_PROCESS;
        $oResModel->save();

        $blockNumber = $oQueueWalletBlockBean->getBlockNumber();
        $fingerprint = null;

        try {
            // Page through all transfer events of the block; the fingerprint from
            // each response is passed to the next request.
            while (true) {
                $oResponseBean = $this->getHandle()->getTrc20BlockEventsTransferByBlockNumber($blockNumber, $fingerprint);
                if ($oResponseBean === false) {
                    throw new QueueException('获取区块信息失败');
                }
                if (empty($oResponseBean)) {
                    break;
                }
                if ($oResponseBean->getSuccess() === false) {
                    break;
                }

                $fingerprint = $oResponseBean->getMeta()->getFingerprint();
                if (empty($oResponseBean->getData())) {
                    break;
                }

                // Forward this page of events to the block-transaction queue.
                $oQueueWalletBlockTransactionBean = new QueueWalletBlockTransactionBean();
                $oQueueWalletBlockTransactionBean->setData($oResponseBean->getData());
                WalletBlockTransactionQueue::putToQueue($oQueueWalletBlockTransactionBean);

                // No fingerprint means there is no further page.
                if (empty($fingerprint)) {
                    break;
                }
            }

            $oWalletTronBlockModel->updateItem([
                'id' => $oResModel->id,
                'status' => WalletTronBlockModel::STATUS_FINISH,
            ]);
        } catch (\Exception $e) {
            Logs::ErrLog(__FUNCTION__, $e, $oQueueWalletBlockBean->toArrayNotNull());
            $oQueueWalletBlockBean->IncrTryTimes();

            if (!$oQueueWalletBlockBean->checkTryTimes()) {
                // Retry limit exceeded: mark the block as failed and give up.
                $oWalletTronBlockModel->updateItem([
                    'id' => $oResModel->id,
                    'status' => WalletTronBlockModel::STATUS_FAIL,
                ]);
                return;
            }

            // Put the row back to WAIT before re-queueing. Without this the row
            // stays in PROCESS and the redelivered message is discarded by the
            // status guard at the top of this method, so the retry never runs.
            // NOTE(review): pages already forwarded before the failure will be
            // forwarded again on retry; confirm downstream handling tolerates this.
            $oWalletTronBlockModel->updateItem([
                'id' => $oResModel->id,
                'status' => WalletTronBlockModel::STATUS_WAIT,
            ]);
            WalletBlockQueue::putToQueue($oQueueWalletBlockBean, 30);
        }
    }

    /**
     * Queue consumer for one page of block events: handles each event
     * independently so that one failing event does not stop the rest.
     *
     * @param QueueWalletBlockTransactionBean $oQueueWalletBlockTransactionBean queue message whose data is a list of raw events
     */
    public function tronBlockTransactionConsumer(QueueWalletBlockTransactionBean $oQueueWalletBlockTransactionBean): void
    {
        $aBlockEvents = $oQueueWalletBlockTransactionBean->getData();
        foreach ($aBlockEvents as $item) {
            $oEventBean = new EventBean($item);
            try {
                $this->tronBlockTransaction($oEventBean);
            } catch (\Exception $e) {
                // Log and continue with the next event.
                Logs::ErrLog(__FUNCTION__, $e, $oEventBean->toArrayNotNull());
            }
        }
    }

    /**
     * Handles a single contract event: events of the USDT-TRC20 contract that
     * involve at least one address in the platform address set are forwarded to
     * the wallet-address transaction queue; all others are ignored.
     *
     * @param EventBean $oEventBean the contract event to inspect
     */
    public function tronBlockTransaction(EventBean $oEventBean): void
    {
        // Only the USDT-TRC20 contract is of interest.
        if ($oEventBean->getContractAddress() != UsdtTrc20::CONTRACT_ADDRESS) {
            return;
        }

        $from_addr = $oEventBean->getResult()->getFrom();
        $to_addr = $oEventBean->getResult()->getTo();

        // Check whether either side of the transfer is a platform address.
        $oTronWalletAddressSetCache = new TronWalletAddressSetCache();
        $is_from = $oTronWalletAddressSetCache->checkKey($from_addr);
        $is_to = $oTronWalletAddressSetCache->checkKey($to_addr);
        if (!$is_from && !$is_to) {
            return;
        }

        // Forward to the wallet-address transaction queue.
        $oQueueEventBean = new QueueEventBean($oEventBean->toArrayNotNull());
        WalletAddressTransactionQueue::putToQueue($oQueueEventBean);
    }

    /**
     * Queue consumer for a transfer touching a platform wallet address: stores the
     * transfer as a recharge (receiving address is ours) and/or a withdrawal
     * (sending address is ours) and forwards each stored record to the platform
     * transaction queue.
     *
     * On failure the message is re-queued with a delay of 30 while checkTryTimes()
     * still allows another attempt.
     *
     * @param QueueEventBean $bean queue message describing the transfer event
     */
    public function walletAddressTransactionConsumer(QueueEventBean $bean): void
    {
        try {
            $from_addr = $bean->getResult()->getFrom();
            $to_addr = $bean->getResult()->getTo();

            // Look up both addresses among the USDT-TRC20 wallet addresses.
            $oWalletAddressModel = new WalletAddressModel();
            $resFromModel = $oWalletAddressModel->findByAddressHex($from_addr, WalletCurrencyModel::CODE_USDT_TRC20);
            $resToModel = $oWalletAddressModel->findByAddressHex($to_addr, WalletCurrencyModel::CODE_USDT_TRC20);
            if (!$resFromModel && !$resToModel) {
                return;
            }

            // Persist the transfer before notifying anything downstream.
            $oWalletAddressTransactionModel = new WalletAddressTransactionModel();
            $oWalletAddressTransactionBean = new WalletAddressTransactionBean();
            $oWalletAddressTransactionBean->setCurrencyCode(WalletCurrencyModel::CODE_USDT_TRC20);
            $oWalletAddressTransactionBean->setBlockTransactionId($bean->getTransactionId());
            $oWalletAddressTransactionBean->setBlockNumber($bean->getBlockNumber());
            $oWalletAddressTransactionBean->setBlockEventName($bean->getEventName());
            $oWalletAddressTransactionBean->setBlockFromAddress($from_addr);
            $oWalletAddressTransactionBean->setBlockToAddress($to_addr);
            $oWalletAddressTransactionBean->setBlockValue($bean->getResult()->getValue());
            $oWalletAddressTransactionBean->setIsNotify(WalletAddressTransactionModel::IS_NOTIFY_WAIT);

            if ($resToModel) {
                // Recharge: the receiving address belongs to the platform.
                $oWalletAddressTransactionBean->setType(WalletAddressTransactionModel::TYPE_RECHARGE);
                $oWalletAddressTransactionBean->setWalletAddressId($resToModel->id);
                $resRecharge = $oWalletAddressTransactionModel->typeRecharge($oWalletAddressTransactionBean);
                if (!$resRecharge) {
                    throw new QueueException('WalletAddressTransactionModel addTransaction recharge error');
                }
                // Forward to the platform user balance-change queue.
                $this->addToQueue($oWalletAddressTransactionBean);
            }

            if ($resFromModel) {
                // Withdrawal: the sending address belongs to the platform.
                $oWalletAddressTransactionBean->setType(WalletAddressTransactionModel::TYPE_WITHDRAW);
                $oWalletAddressTransactionBean->setWalletAddressId($resFromModel->id);
                $resWithdraw = $oWalletAddressTransactionModel->typeWithdraw($oWalletAddressTransactionBean);
                if (!$resWithdraw) {
                    throw new QueueException('WalletAddressTransactionModel addTransaction withdraw error');
                }
                // Forward to the platform user balance-change queue.
                $this->addToQueue($oWalletAddressTransactionBean);
            }
        } catch (\Exception $e) {
            Logs::ErrLog(__FUNCTION__, $e, $bean->toArrayNotNull());
            $bean->IncrTryTimes();
            // Stop once the retry limit is exceeded. The original guard was
            // inverted relative to tronBlockConsumer (it returned while retries
            // were still allowed and re-queued once they were used up).
            // NOTE(review): assumes QueueEventBean::checkTryTimes() has the same
            // meaning as QueueWalletBlockBean::checkTryTimes() - confirm.
            if (!$bean->checkTryTimes()) {
                return;
            }
            WalletAddressTransactionQueue::putToQueue($bean, 30);
        }
    }

    /**
     * Builds a platform-transaction queue message from a stored wallet-address
     * transaction and pushes it onto the platform transaction queue.
     *
     * @param WalletAddressTransactionBean $bean the stored wallet-address transaction
     */
    public function addToQueue(WalletAddressTransactionBean $bean): void
    {
        $oQueueWalletPlatformTransactionBean = new QueueWalletPlatformTransactionBean();
        $oQueueWalletPlatformTransactionBean->setWalletId($bean->getWalletAddressId());
        $oQueueWalletPlatformTransactionBean->setWalletTransactionId($bean->getId());
        $oQueueWalletPlatformTransactionBean->setBlockTransactionId($bean->getBlockTransactionId());
        $oQueueWalletPlatformTransactionBean->setAmount($bean->getAmount());
        $oQueueWalletPlatformTransactionBean->setType($bean->getType());
        WalletPlatformTransactionQueue::putToQueue($oQueueWalletPlatformTransactionBean);
    }
}