---                                                                                                                     一、这个项目是干什么的?(大白话)
                                                                                                                          先理解问题                             s

  区块链上的智能合约每次被调用,都会产生"事件日志"(Event Log)。

  比如:
  - 有人用 Uniswap 换币 → 合约发出 Swap 事件
  - 有人转了一笔 USDT → ERC-20 合约发出 Transfer 事件
  - 有人买了一个 NFT → 合约发出 Transfer 事件

  问题是: 这些事件只存在链上,你的业务系统根本不知道发生了什么。

  你想要的效果:
  - 用户充值 USDT 到你的平台地址 → 你的系统立刻知道,自动给用户加余额
  - 有人买了你平台的 NFT → 你的系统立刻知道,自动更新订单状态
  - 合约被攻击,发出异常事件 → 你的系统立刻报警

  这个项目做的事:

  ▎ 像一个"快递员",24小时盯着区块链,一旦发现你关心的合约事件,立刻通过 Webhook(HTTP 回调) 或 WebSocket
  ▎ 推送给你的业务系统。

  完整数据流(大白话)

  区块链(每12秒出一个新块)
      │
      │  eth_getLogs(轮询拉取事件)
      ▼
  监听进程(常驻,协程并发)
      │
      │  解码事件数据
      ▼
  Redis 队列(缓冲,防止推送失败丢数据)
      │
      ├──→ Webhook 推送(HTTP POST 到你的业务系统)
      │         └── 失败自动重试(指数退避)
      │
      ├──→ WebSocket 广播(实时推给前端)
      │
      └──→ 数据库持久化(永久存档)

  谁需要它?

  ┌────────────────┬───────────────────────────────────┐
  │      场景      │               用法                │
  ├────────────────┼───────────────────────────────────┤
  │ 交易所充值监听 │ 监听 USDT Transfer 事件,自动到账 │
  ├────────────────┼───────────────────────────────────┤
  │ NFT 市场       │ 监听 Transfer/Sale 事件,更新订单 │
  ├────────────────┼───────────────────────────────────┤
  │ DeFi 风控      │ 监听大额 Swap,触发风控告警       │
  ├────────────────┼───────────────────────────────────┤
  │ 链游           │ 监听道具转移事件,更新游戏状态    │
  ├────────────────┼───────────────────────────────────┤
  │ 数据分析平台   │ 收集所有链上事件,存库分析        │
  └────────────────┴───────────────────────────────────┘

  一句话总结

  ▎ 它是区块链和你业务系统之间的"信使",把链上发生的事实时翻译成你能处理的 HTTP 请求或 WebSocket 消息。

  ---
  二、技术选型

  ┌────────────────┬──────────────────────────────┬────────────────────────────────────────────┐
  │      用途      │             选择             │                    原因                    │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 框架           │ hyperf/hyperf 3.x            │ 协程并发,常驻进程,适合长轮询             │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 链上事件拉取   │ web3p/web3.php + eth_getLogs │ 最稳定的 PHP Web3 库,getLogs 支持批量拉取 │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 事件解码       │ kornrunner/php-ethereum-util │ Keccak256 计算事件签名                     │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 消息队列       │ hyperf/async-queue(Redis)  │ 解耦监听和推送,防止推送失败丢事件         │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ Webhook 推送   │ hyperf/guzzle(Guzzle HTTP) │ 协程版 HTTP 客户端,并发推送               │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ WebSocket 服务 │ hyperf/websocket-server      │ 实时推送给前端                             │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 数据库         │ hyperf/database MySQL        │ 持久化事件和订阅配置                       │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 缓存           │ hyperf/cache Redis           │ 存储扫描进度、去重                         │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 日志           │ hyperf/logger                │ 结构化审计日志                             │
  ├────────────────┼──────────────────────────────┼────────────────────────────────────────────┤
  │ 测试           │ phpunit/phpunit              │ 单元测试                                   │
  └────────────────┴──────────────────────────────┴────────────────────────────────────────────┘

  ---
  三、核心概念大白话

  订阅(Subscription)
    └── 你告诉系统:"我要监听 0xUSDT合约 的 Transfer 事件"
    └── 系统记录下来,开始盯着这个合约

  事件签名(Event Signature / Topic0)
    └── 每种事件都有唯一的"指纹"
    └── Transfer(address,address,uint256) 的指纹是固定的 keccak256 哈希
    └── 系统用这个指纹过滤,只拿你关心的事件

  Webhook
    └── 就是一个 HTTP POST 请求
    └── 系统发现事件后,POST 到你指定的 URL
    └── 你的业务系统收到请求,处理业务逻辑

  指数退避重试(Exponential Backoff)
    └── 第1次失败 → 等1秒重试
    └── 第2次失败 → 等2秒重试
    └── 第3次失败 → 等4秒重试
    └── 最多重试5次,还失败就进死信队列,人工处理

  去重(Deduplication)
    └── 同一个事件(txHash + logIndex)只推送一次
    └── 防止重启后重复推送

  ---
  四、项目目录结构

  hyperf-contract-event-watcher/
  ├── .github/
  │   ├── workflows/
  │   │   ├── ci.yml
  │   │   └── release.yml
  │   ├── ISSUE_TEMPLATE/
  │   │   ├── bug_report.md
  │   │   └── feature_request.md
  │   └── PULL_REQUEST_TEMPLATE.md
  ├── app/
  │   ├── Constants/
  │   │   └── EventStatus.php
  │   ├── Contract/
  │   │   └── PusherInterface.php
  │   ├── Controller/
  │   │   ├── SubscriptionController.php   # 订阅管理 API
  │   │   └── WebSocketController.php      # WebSocket 服务
  │   ├── Exception/
  │   │   └── Handler/
  │   │       └── AppExceptionHandler.php
  │   ├── Job/
  │   │   ├── DeliverWebhookJob.php        # Webhook 推送队列任务
  │   │   └── ProcessEventJob.php          # 事件处理队列任务
  │   ├── Model/
  │   │   ├── Subscription.php             # 订阅配置
  │   │   ├── CapturedEvent.php            # 已捕获的事件
  │   │   └── WebhookDelivery.php          # Webhook 推送记录
  │   ├── Process/
  │   │   └── EventPollingProcess.php      # 常驻轮询进程
  │   ├── Service/
  │   │   ├── Web3Service.php              # 链上数据拉取
  │   │   ├── EventDecodeService.php       # 事件解码
  │   │   ├── EventCaptureService.php      # 事件捕获核心
  │   │   ├── WebhookPusherService.php     # Webhook 推送
  │   │   └── WebSocketPusherService.php   # WebSocket 广播
  │   └── Listener/
  │       └── QueueFailedListener.php      # 队列失败监听
  ├── config/
  │   └── autoload/
  │       ├── databases.php
  │       ├── redis.php
  │       ├── async_queue.php
  │       ├── server.php
  │       ├── logger.php
  │       └── watcher.php
  ├── database/
  │   └── migrations/
  │       ├── 2024_01_01_create_subscriptions_table.php
  │       ├── 2024_01_02_create_captured_events_table.php
  │       └── 2024_01_03_create_webhook_deliveries_table.php
  ├── test/
  │   ├── Cases/
  │   │   ├── EventDecodeServiceTest.php
  │   │   ├── EventCaptureServiceTest.php
  │   │   └── WebhookPusherServiceTest.php
  │   └── bootstrap.php
  ├── .env.example
  ├── .php-cs-fixer.php
  ├── composer.json
  ├── phpunit.xml
  ├── README.md
  ├── CHANGELOG.md
  ├── CONTRIBUTING.md
  └── LICENSE

  ---
  五、完整代码

  composer.json

  {
      "name": "your-org/hyperf-contract-event-watcher",
      "description": "A high-performance blockchain smart contract event listener and webhook/websocket push service
  built on Hyperf",
      "type": "project",
      "keywords": [
          "hyperf", "blockchain", "ethereum", "event-listener",
          "webhook", "websocket", "web3", "evm", "smart-contract"
      ],
      "license": "MIT",
      "require": {
          "php": ">=8.1",
          "ext-gmp": "*",
          "hyperf/async-queue": "^3.1",
          "hyperf/cache": "^3.1",
          "hyperf/command": "^3.1",
          "hyperf/config": "^3.1",
          "hyperf/database": "^3.1",
          "hyperf/db-connection": "^3.1",
          "hyperf/framework": "^3.1",
          "hyperf/guzzle": "^3.1",
          "hyperf/http-server": "^3.1",
          "hyperf/logger": "^3.1",
          "hyperf/process": "^3.1",
          "hyperf/redis": "^3.1",
          "hyperf/websocket-server": "^3.1",
          "kornrunner/php-ethereum-util": "^1.0",
          "web3p/web3.php": "^0.1.4"
      },
      "require-dev": {
          "friendsofphp/php-cs-fixer": "^3.0",
          "hyperf/testing": "^3.1",
          "mockery/mockery": "^1.4",
          "phpunit/phpunit": "^10.0"
      },
      "autoload": {
          "psr-4": { "App\\": "app/" }
      },
      "autoload-dev": {
          "psr-4": { "HyperfTest\\": "test/" }
      },
      "scripts": {
          "test":     "phpunit --colors=always",
          "cs-fix":   "php-cs-fixer fix",
          "cs-check": "php-cs-fixer fix --dry-run --diff"
      },
      "minimum-stability": "dev",
      "prefer-stable": true
  }

  ---
  .env.example

  APP_NAME=hyperf-contract-event-watcher
  APP_ENV=dev

  # 数据库
  DB_HOST=127.0.0.1
  DB_PORT=3306
  DB_DATABASE=event_watcher
  DB_USERNAME=root
  DB_PASSWORD=

  # Redis
  REDIS_HOST=127.0.0.1
  REDIS_PORT=6379
  REDIS_DB=0

  # 以太坊节点
  ETH_RPC_URL=https://mainnet.infura.io/v3/YOUR_KEY
  ETH_CHAIN_ID=1

  # 轮询配置
  WATCHER_POLL_INTERVAL=3          # 轮询间隔(秒)
  WATCHER_BLOCK_BATCH=50           # 每次扫描多少个块
  WATCHER_CONFIRM_BLOCKS=12        # 等待确认块数
  WATCHER_START_BLOCK=0            # 0=从最新块开始

  # Webhook 推送配置
  WEBHOOK_TIMEOUT=10               # 推送超时(秒)
  WEBHOOK_MAX_RETRIES=5            # 最大重试次数
  WEBHOOK_RETRY_BASE=2             # 指数退避基数(秒)
  WEBHOOK_SECRET=your_hmac_secret  # HMAC 签名密钥(接收方验证用)

  # API 鉴权
  WATCHER_API_TOKEN=your_api_token

  ---
  config/autoload/watcher.php

  <?php

  declare(strict_types=1);

  return [
      'eth_rpc_url'       => env('ETH_RPC_URL', 'http://127.0.0.1:8545'),
      'eth_chain_id'      => (int) env('ETH_CHAIN_ID', 1),
      'poll_interval'     => (int) env('WATCHER_POLL_INTERVAL', 3),
      'block_batch'       => (int) env('WATCHER_BLOCK_BATCH', 50),
      'confirm_blocks'    => (int) env('WATCHER_CONFIRM_BLOCKS', 12),
      'start_block'       => (int) env('WATCHER_START_BLOCK', 0),
      'webhook_timeout'   => (int) env('WEBHOOK_TIMEOUT', 10),
      'webhook_max_retry' => (int) env('WEBHOOK_MAX_RETRIES', 5),
      'webhook_retry_base'=> (int) env('WEBHOOK_RETRY_BASE', 2),
      'webhook_secret'    => env('WEBHOOK_SECRET', ''),
      'api_token'         => env('WATCHER_API_TOKEN', ''),
  ];

  ---
  config/autoload/async_queue.php

  <?php

  declare(strict_types=1);

  return [
      // 主队列:处理事件
      'event' => [
          'driver'       => Hyperf\AsyncQueue\Driver\RedisDriver::class,
          'redis'        => ['pool' => 'default'],
          'channel'      => 'watcher:queue:event',
          'timeout'      => 60,
          'retry_seconds'=> [1, 5, 10, 20],
          'handle_timeout' => 60,
          'processes'    => 4,
          'concurrent'   => ['limit' => 20],
      ],
      // Webhook 推送队列
      'webhook' => [
          'driver'       => Hyperf\AsyncQueue\Driver\RedisDriver::class,
          'redis'        => ['pool' => 'default'],
          'channel'      => 'watcher:queue:webhook',
          'timeout'      => 30,
          'retry_seconds'=> [2, 4, 8, 16, 32], // 指数退避
          'handle_timeout' => 30,
          'processes'    => 8,  // 更多 worker 并发推送
          'concurrent'   => ['limit' => 50],
      ],
  ];

  ---
  数据库迁移

  database/migrations/2024_01_01_create_subscriptions_table.php

  <?php

  declare(strict_types=1);

  use Hyperf\Database\Migrations\Migration;
  use Hyperf\Database\Schema\Blueprint;
  use Hyperf\Database\Schema\Schema;

  class CreateSubscriptionsTable extends Migration
  {
      public function up(): void
      {
          Schema::create('subscriptions', function (Blueprint $table) {
              $table->bigIncrements('id');

              // 订阅名称,方便识别
              $table->string('name', 100)->comment('订阅名称,如 usdt-transfer-monitor');

              // 监听的合约地址(小写),NULL 表示监听所有合约
              $table->char('contract_address', 42)->nullable()->comment('合约地址');

              // 监听的事件签名(topic0),NULL 表示监听所有事件
              // 例:Transfer(address,address,uint256) 的 keccak256
              $table->char('event_signature', 66)->nullable()->comment('事件签名 topic0');

              // 事件名称(可读,如 Transfer)
              $table->string('event_name', 100)->nullable()->comment('事件名称');

              // ABI 定义(JSON),用于解码事件参数
              $table->json('event_abi')->nullable()->comment('事件 ABI');

              // 推送方式:webhook / websocket / both
              $table->string('push_type', 20)->default('webhook')->comment('推送方式');

              // Webhook 目标 URL
              $table->string('webhook_url', 500)->nullable()->comment('Webhook 回调地址');

              // 额外过滤条件(JSON),如只关心金额大于 1000 的 Transfer
              $table->json('filters')->nullable()->comment('额外过滤条件');

              // 是否启用
              $table->boolean('is_active')->default(true);

              $table->timestamps();
              $table->index('contract_address');
              $table->index('event_signature');
              $table->index('is_active');
          });
      }

      public function down(): void
      {
          Schema::dropIfExists('subscriptions');
      }
  }

  database/migrations/2024_01_02_create_captured_events_table.php

  <?php

  declare(strict_types=1);

  use Hyperf\Database\Migrations\Migration;
  use Hyperf\Database\Schema\Blueprint;
  use Hyperf\Database\Schema\Schema;

  class CreateCapturedEventsTable extends Migration
  {
      public function up(): void
      {
          Schema::create('captured_events', function (Blueprint $table) {
              $table->bigIncrements('id');
              $table->unsignedBigInteger('subscription_id')->comment('触发的订阅 ID');
              $table->unsignedBigInteger('block_number')->comment('区块高度');
              $table->char('block_hash', 66)->comment('区块哈希');
              $table->char('transaction_hash', 66)->comment('交易哈希');
              $table->unsignedSmallInteger('log_index')->comment('日志索引');
              $table->char('contract_address', 42)->comment('合约地址');
              $table->char('event_signature', 66)->comment('事件签名');
              $table->string('event_name', 100)->nullable()->comment('事件名称');
              $table->json('topics')->comment('原始 topics');
              $table->text('data')->nullable()->comment('原始 data');
              $table->json('decoded_params')->nullable()->comment('解码后的参数');

              // 推送状态:pending / delivered / failed
              $table->string('push_status', 20)->default('pending')->comment('推送状态');
              $table->unsignedTinyInteger('push_attempts')->default(0)->comment('推送尝试次数');
              $table->timestamp('last_push_at')->nullable()->comment('最后推送时间');

              $table->timestamp('created_at')->useCurrent();

              // 唯一索引:同一事件不重复存储
              $table->unique(['transaction_hash', 'log_index'], 'uniq_tx_log');
              $table->index('subscription_id');
              $table->index('block_number');
              $table->index('push_status');
              $table->index('contract_address');
          });
      }

      public function down(): void
      {
          Schema::dropIfExists('captured_events');
      }
  }

  database/migrations/2024_01_03_create_webhook_deliveries_table.php

  <?php

  declare(strict_types=1);

  use Hyperf\Database\Migrations\Migration;
  use Hyperf\Database\Schema\Blueprint;
  use Hyperf\Database\Schema\Schema;

  class CreateWebhookDeliveriesTable extends Migration
  {
      public function up(): void
      {
          Schema::create('webhook_deliveries', function (Blueprint $table) {
              $table->bigIncrements('id');
              $table->unsignedBigInteger('captured_event_id')->comment('关联的事件 ID');
              $table->string('webhook_url', 500)->comment('推送目标 URL');
              $table->unsignedTinyInteger('attempt')->default(1)->comment('第几次尝试');
              $table->unsignedSmallInteger('http_status')->nullable()->comment('HTTP 响应状态码');
              $table->text('response_body')->nullable()->comment('响应内容(截断到 1000 字符)');
              $table->unsignedSmallInteger('duration_ms')->nullable()->comment('耗时(毫秒)');
              $table->boolean('success')->default(false)->comment('是否成功');
              $table->string('error_message', 500)->nullable()->comment('错误信息');
              $table->timestamp('created_at')->useCurrent();

              $table->index('captured_event_id');
              $table->index(['success', 'created_at']);
          });
      }

      public function down(): void
      {
          Schema::dropIfExists('webhook_deliveries');
      }
  }

  ---
  Model 层

  app/Model/Subscription.php

  <?php

  declare(strict_types=1);

  namespace App\Model;

  use Hyperf\DbConnection\Model\Model;

  class Subscription extends Model
  {
      protected ?string $table = 'subscriptions';

      protected array $fillable = [
          'name', 'contract_address', 'event_signature',
          'event_name', 'event_abi', 'push_type',
          'webhook_url', 'filters', 'is_active',
      ];

      protected array $casts = [
          'event_abi' => 'json',
          'filters'   => 'json',
          'is_active' => 'boolean',
      ];

      public function capturedEvents()
      {
          return $this->hasMany(CapturedEvent::class, 'subscription_id');
      }
  }

  app/Model/CapturedEvent.php

  <?php

  declare(strict_types=1);

  namespace App\Model;

  use Hyperf\DbConnection\Model\Model;

  class CapturedEvent extends Model
  {
      public bool $timestamps = false;

      protected ?string $table = 'captured_events';

      protected array $fillable = [
          'subscription_id', 'block_number', 'block_hash',
          'transaction_hash', 'log_index', 'contract_address',
          'event_signature', 'event_name', 'topics', 'data',
          'decoded_params', 'push_status', 'push_attempts', 'last_push_at',
      ];

      protected array $casts = [
          'topics'        => 'json',
          'decoded_params'=> 'json',
          'block_number'  => 'integer',
          'log_index'     => 'integer',
          'push_attempts' => 'integer',
      ];

      public function subscription()
      {
          return $this->belongsTo(Subscription::class);
      }

      public function deliveries()
      {
          return $this->hasMany(WebhookDelivery::class, 'captured_event_id');
      }
  }

  app/Model/WebhookDelivery.php

  <?php

  declare(strict_types=1);

  namespace App\Model;

  use Hyperf\DbConnection\Model\Model;

  class WebhookDelivery extends Model
  {
      public bool $timestamps = false;

      protected ?string $table = 'webhook_deliveries';

      protected array $fillable = [
          'captured_event_id', 'webhook_url', 'attempt',
          'http_status', 'response_body', 'duration_ms',
          'success', 'error_message',
      ];

      protected array $casts = [
          'success'     => 'boolean',
          'http_status' => 'integer',
          'attempt'     => 'integer',
          'duration_ms' => 'integer',
      ];
  }

  ---
  Service 层(核心)

  app/Service/Web3Service.php

  <?php

  declare(strict_types=1);

  namespace App\Service;

  use Hyperf\Contract\ConfigInterface;
  use Web3\Web3;

  /**
   * 链上数据拉取服务
   * 大白话:负责和以太坊节点"打电话",拉取事件日志
   */
  class Web3Service
  {
      private Web3 $web3;

      public function __construct(private ConfigInterface $config)
      {
          $this->web3 = new Web3($this->config->get('watcher.eth_rpc_url'));
      }

      /**
       * 获取最新区块高度
       */
      public function getLatestBlockNumber(): int
      {
          $result = null;
          $this->web3->eth->blockNumber(function ($err, $res) use (&$result) {
              if ($err) {
                  throw new \RuntimeException('获取最新块失败: ' . $err->getMessage());
              }
              $result = hexdec($res->toString());
          });
          return (int) $result;
      }

      /**
       * 批量拉取指定范围内的事件日志(eth_getLogs)
       *
       * 大白话:告诉节点"给我第 100 到 150 块里,
       * 这些合约地址发出的这些事件",节点一次性返回所有匹配的日志
       *
       * @param int      $fromBlock       起始块
       * @param int      $toBlock         结束块
       * @param array    $addresses       合约地址列表(空=不过滤)
       * @param array    $topics          事件签名列表(空=不过滤)
       */
      public function getLogs(
          int $fromBlock,
          int $toBlock,
          array $addresses = [],
          array $topics = []
      ): array {
          $filter = [
              'fromBlock' => '0x' . dechex($fromBlock),
              'toBlock'   => '0x' . dechex($toBlock),
          ];

          if (! empty($addresses)) {
              $filter['address'] = $addresses;
          }

          if (! empty($topics)) {
              // topics[0] 是事件签名,用 OR 逻辑匹配多个事件
              $filter['topics'] = [array_values($topics)];
          }

          $logs = [];
          $this->web3->eth->getLogs($filter, function ($err, $result) use (&$logs) {
              if ($err) {
                  throw new \RuntimeException('getLogs 失败: ' . $err->getMessage());
              }
              $logs = (array) $result;
          });

          return $logs;
      }
  }

  ---
  app/Service/EventDecodeService.php

  <?php

  declare(strict_types=1);

  namespace App\Service;

  use App\Model\Subscription;

  /**
   * 事件解码服务
   * 大白话:把链上的"密文日志"翻译成人能看懂的参数
   *
   * 链上日志长这样:
   *   topics[0] = 0xddf252ad...(Transfer 的指纹)
   *   topics[1] = 0x000...from地址
   *   topics[2] = 0x000...to地址
   *   data       = 0x000...金额(十六进制)
   *
   * 解码后:
   *   { "from": "0xabc...", "to": "0xdef...", "value": "1000000000000000000" }
   */
  class EventDecodeService
  {
      /**
       * 计算事件签名的 keccak256(topic0)
       * 例:Transfer(address,address,uint256) → 0xddf252ad...
       */
      public function computeEventSignature(string $eventSignatureStr): string
      {
          return '0x' . \kornrunner\Keccak::hash($eventSignatureStr, 256);
      }

      /**
       * 根据 ABI 解码事件参数
       */
      public function decode(array $log, ?array $abi): ?array
      {
          if (empty($abi)) {
              return null;
          }

          $decoded    = [];
          $topics     = array_map('strval', (array) ($log['topics'] ?? []));
          $topicIndex = 1; // topics[0] 是事件签名,从 [1] 开始

          $nonIndexedTypes = [];

          foreach ($abi as $param) {
              if ($param['indexed'] ?? false) {
                  $raw = $topics[$topicIndex++] ?? null;
                  $decoded[$param['name']] = $raw
                      ? $this->decodeParam($param['type'], ltrim($raw, '0x'))
                      : null;
              } else {
                  $nonIndexedTypes[] = $param;
              }
          }

          // 解码 data 字段(非 indexed 参数,按 32 字节对齐)
          $data = ltrim((string) ($log['data'] ?? ''), '0x');
          foreach ($nonIndexedTypes as $i => $param) {
              $chunk = substr($data, $i * 64, 64);
              if ($chunk) {
                  $decoded[$param['name']] = $this->decodeParam($param['type'], $chunk);
              }
          }

          return $decoded;
      }

      /**
       * 根据订阅配置过滤事件是否匹配
       * 大白话:检查这条日志是不是你关心的那种
       */
      public function matchesSubscription(array $log, Subscription $sub): bool
      {
          $topics = array_map('strval', (array) ($log['topics'] ?? []));

          // 检查合约地址
          if ($sub->contract_address) {
              $logAddr = strtolower((string) ($log['address'] ?? ''));
              if ($logAddr !== strtolower($sub->contract_address)) {
                  return false;
              }
          }

          // 检查事件签名(topic0)
          if ($sub->event_signature) {
              if (($topics[0] ?? '') !== $sub->event_signature) {
                  return false;
              }
          }

          // 检查自定义过滤条件
          if (! empty($sub->filters)) {
              return $this->applyFilters($log, $sub->filters);
         

      private─function applyFilters(array $log,─array $filters):─bool───────────────────────────────────────────────────
      {                                                                                                                           // 支持按 topic1/topic2 过滤(如只关心转给特定地址的事件)
          foreach ($filters as $key => $value) {                                                                                      if (str_starts_with($key, 'topic')) {
                  $index = (int) substr($key, 5);
                  $topics = array_map('strval', (array) ($log['topics'] ?? []));
                  $actual = strtolower($topics[$index] ?? '');
                  // topic 里地址是 32 字节,取后 40$expected = strtolower('0x' . str_pad(ltrim($value, '0x'), 64, '0', STR_PAD_LEFT));
                  if ($actual !== $expected) {
                      return false;
                  }
              }
          }
          return true;
      }

      private function decodeParam(string $type, string $hex): mixed
      {
          return match (true) {
              $type === 'address'             => '0x' . substr($hex, -40),
              str_starts_with($type, 'uint') => base_convert($hex, 16, 10),
              str_starts_with($type, 'int')  => base_convert($hex, 16, 10),
              $type === 'bool'               => $hex !== str_repeat('0', 64),
              default                        => '0x' . $hex,
          };
      }
  }

  ---
  app/Service/EventCaptureService.php

  <?php

  declare(strict_types=1);

  namespace App\Service;

  use App\Job\ProcessEventJob;
  use App\Model\CapturedEvent;
  use App\Model\Subscription;
  use Hyperf\AsyncQueue\Driver\DriverFactory;
  use Hyperf\Cache\Cache;
  use Hyperf\Contract\ConfigInterface;
  use Hyperf\Logger\LoggerFactory;
  use Psr\Log\LoggerInterface;

  /**
   * 事件捕获核心服务
   * 大白话:这是"侦探",负责扫描区块、比对订阅、发现目标事件
   */
  class EventCaptureService
  {
      private LoggerInterface $logger;
      private int $blockBatch;
      private int $confirmBlocks;

      public function __construct(
          private Web3Service $web3,
          private EventDecodeService $decoder,
          private DriverFactory $queue,
          private Cache $cache,
          private ConfigInterface $config,
          LoggerFactory $loggerFactory
      ) {
          $this->logger        = $loggerFactory->get('watcher');
          $this->blockBatch    = $this->config->get('watcher.block_batch', 50);
          $this->confirmBlocks = $this->config->get('watcher.confirm_blocks', 12);
      }

      /**
       * 主扫描入口:找出未扫描的块范围,批量拉取日志
       */
      public function scan(): void
      {
          $latestOnChain = $this->web3->getLatestBlockNumber();
          $safeBlock     = $latestOnChain - $this->confirmBlocks;
          $lastScanned   = (int) $this->cache->get('watcher:last_scanned_block', $this->getStartBlock() - 1);

          if ($lastScanned >= $safeBlock) {
              return; // 已是最新,无需扫描
          }

          // 加载所有启用的订阅
          $subscriptions = Subscription::where('is_active', true)->get();
          if ($subscriptions->isEmpty()) {
              return;
          }

          // 收集所有订阅关心的合约地址和事件签名
          $addresses  = $subscriptions->pluck('contract_address')->filter()->unique()->values()->toArray();
          $signatures = $subscriptions->pluck('event_signature')->filter()->unique()->values()->toArray();

          // 分批扫描,每批最多 blockBatch 个块
          $from = $lastScanned + 1;
          while ($from <= $safeBlock) {
              $to = min($from + $this->blockBatch - 1, $safeBlock);

              try {
                  $this->scanRange($from, $to, $addresses, $signatures, $subscriptions->all());
                  $this->cache->set('watcher:last_scanned_block', $to, 86400);
                  $this->logger->info('扫描完成', ['from' => $from, 'to' => $to]);
              } catch (\Throwable $e) {
                  $this->logger->error('扫描失败', [
                      'from'  => $from,
                      'to'    => $to,
                      'error' => $e->getMessage(),
                  ]);
                  break; // 失败停止,等下次轮询重试
              }

              $from = $to + 1;
          }
      }

      /**
       * 扫描指定块范围
       */
      private function scanRange(
          int $from,
          int $to,
          array $addresses,
          array $signatures,
          array $subscriptions
      ): void {
          $logs = $this->web3->getLogs($from, $to, $addresses, $signatures);

          if (empty($logs)) {
              return;
          }

          $this->logger->info('发现日志', ['from' => $from, 'to' => $to, 'count' => count($logs)]);

          foreach ($logs as $log) {
              $log = (array) $log;
              foreach ($subscriptions as $sub) {
                  if ($this->decoder->matchesSubscription($log, $sub)) {
                      $this->captureEvent($log, $sub);
                  }
              }
          }
      }

      /**
       * 捕获单条事件,存库并推入处理队列
       */
      private function captureEvent(array $log, Subscription $sub): void
      {
          $txHash   = (string) ($log['transactionHash'] ?? '');
          $logIndex = hexdec((string) ($log['logIndex'] ?? '0x0'));

          // 去重:同一事件只处理一次
          $dedupeKey = "watcher:dedup:{$txHash}:{$logIndex}:{$sub->id}";
          if ($this->cache->has($dedupeKey)) {
              return;
          }
          $this->cache->set($dedupeKey, 1, 3600);

          $topics  = array_map('strval', (array) ($log['topics'] ?? []));
          $decoded = $this->decoder->decode($log, $sub->event_abi);

          try {
              $event = CapturedEvent::create([
                  'subscription_id'  => $sub->id,
                  'block_number'     => hexdec((string) ($log['blockNumber'] ?? '0x0')),
                  'block_hash'       => (string) ($log['blockHash'] ?? ''),
                  'transaction_hash' => $txHash,
                  'log_index'        => $logIndex,
                  'contract_address' => strtolower((string) ($log['address'] ?? '')),
                  'event_signature'  => $topics[0] ?? '',
                  'event_name'       => $sub->event_name,
                  'topics'           => $topics,
                  'data'             => (string) ($log['data'] ?? ''),
                  'decoded_params'   => $decoded,
                  'push_status'      => 'pending',
              ]);

              // 推入处理队列(解耦捕获和推送)
              $this->queue->get('event')->push(new ProcessEventJob($event->id));

          } catch (\Illuminate\Database\QueryException $e) {
              // 唯一索引冲突 = 重复事件,忽略
              if (str_contains($e->getMessage(), 'Duplicate entry')) {
                  return;
              }
              throw $e;
          }
      }

      private function getStartBlock(): int
      {
          $configured = $this->config->get('watcher.start_block', 0);
          return $configured > 0 ? $configured : $this->web3->getLatestBlockNumber();
      }
  }

  ---
  app/Service/WebhookPusherService.php

  <?php

  declare(strict_types=1);

  namespace App\Service;

  use App\Model\CapturedEvent;
  use App\Model\WebhookDelivery;
  use GuzzleHttp\Client;
  use GuzzleHttp\Exception\RequestException;
  use Hyperf\Contract\ConfigInterface;
  use Hyperf\Guzzle\ClientFactory;
  use Hyperf\Logger\LoggerFactory;
  use Psr\Log\LoggerInterface;

  /**
   * Webhook 推送服务
   * 大白话:把捕获到的事件 POST 到业务系统的回调地址
   * 带 HMAC 签名,让接收方验证消息真实性
   * 失败自动重试,最终失败记录死信
   */
  class WebhookPusherService
  {
      private Client $http;
      private LoggerInterface $logger;
      private string $secret;
      private int $timeout;

      public function __construct(
          ClientFactory $clientFactory,
          private ConfigInterface $config,
          LoggerFactory $loggerFactory
      ) {
          $this->timeout = $this->config->get('watcher.webhook_timeout', 10);
          $this->secret  = $this->config->get('watcher.webhook_secret', '');
          $this->logger  = $loggerFactory->get('watcher');

          // 协程版 Guzzle 客户端
          $this->http = $clientFactory->create([
              'timeout'         => $this->timeout,
              'connect_timeout' => 5,
          ]);
      }

      /**
       * 推送事件到 Webhook URL
       */
      public function push(CapturedEvent $event, string $webhookUrl, int $attempt = 1): bool
      {
          $payload   = $this->buildPayload($event);
          $body      = json_encode($payload);
          $signature = $this->sign($body);
          $startTime = microtime(true);

          try {
              $response = $this->http->post($webhookUrl, [
                  'body'    => $body,
                  'headers' => [
                      'Content-Type'           => 'application/json',
                      'X-Watcher-Signature'    => $signature,
                      'X-Watcher-Event'        => $event->event_name ?? 'unknown',
                      'X-Watcher-Delivery-Id'  => (string) $event->id,
                      'X-Watcher-Timestamp'    => (string) time(),
                  ],
              ]);

              $durationMs = (int) ((microtime(true) - $startTime) * 1000);
              $statusCode = $response->getStatusCode();
              $success    = $statusCode >= 200 && $statusCode < 300;

              WebhookDelivery::create([
                  'captured_event_id' => $event->id,
                  'webhook_url'       => $webhookUrl,
                  'attempt'           => $attempt,
                  'http_status'       => $statusCode,
                  'response_body'     => substr((string) $response->getBody(), 0, 1000),
                  'duration_ms'       => $durationMs,
                  'success'           => $success,
              ]);

              if ($success) {
                  $event->push_status   = 'delivered';
                  $event->push_attempts = $attempt;
                  $event->last_push_at  = date('Y-m-d H:i:s');
                  $event->save();

                  $this->logger->info('Webhook 推送成功', [
                      'event_id'   => $event->id,
                      'url'        => $webhookUrl,
                      'status'     => $statusCode,
                      'attempt'    => $attempt,
                      'duration'   => $durationMs . 'ms',
                  ]);
              }

              return $success;

          } catch (RequestException $e) {
              $durationMs = (int) ((microtime(true) - $startTime) * 1000);
              $statusCode = $e->hasResponse() ? $e->getResponse()->getStatusCode() : null;

              WebhookDelivery::create([
                  'captured_event_id' => $event->id,
                  'webhook_url'       => $webhookUrl,
                  'attempt'           => $attempt,
                  'http_status'       => $statusCode,
                  'duration_ms'       => $durationMs,
                  'success'           => false,
                  'error_message'     => $e->getMessage(),
              ]);

              $event->push_status   = 'failed';
              $event->push_attempts = $attempt;
              $event->last_push_at  = date('Y-m-d H:i:s');
              $event->save();

              $this->logger->warning('Webhook 推送失败', [
                  'event_id' => $event->id,
                  'url'      => $webhookUrl,
                  'attempt'  => $attempt,
                  'error'    => $e->getMessage(),
              ]);

              return false;
          }
      }

      /**
       * 构建推送 payload
       * 大白话:把数据库里的事件记录整理成 JSON 格式发出去
       */
      private function buildPayload(CapturedEvent $event): array
      {
          return [
              'id'               => $event->id,
              'subscription_id'  => $event->subscription_id,
              'event_name'       => $event->event_name,
              'event_signature'  => $event->event_signature,
              'contract_address' => $event->contract_address,
              'block_number'     => $event->block_number,
              'block_hash'       => $event->block_hash,
              'transaction_hash' => $event->transaction_hash,
              'log_index'        => $event->log_index,
              'topics'           => $event->topics,
              'data'             => $event->data,
              'decoded_params'   => $event->decoded_params,
              'captured_at'      => $event->created_at,
          ];
      }

      /**
       * HMAC-SHA256 签名
       * 大白话:给消息盖个"防伪章",接收方用同一个密钥验证
       *
       * 接收方验证方式:
       * $expected = hash_hmac('sha256', $rawBody, $secret);
       * if (!hash_equals($expected, $receivedSignature)) { 拒绝 }
       */
      private function sign(string $body): string
      {
          if (! $this->secret) {
              return '';
          }
          return 'sha256=' . hash_hmac('sha256', $body, $this->secret);
      }
  }

  ---
  app/Service/WebSocketPusherService.php

  <?php

  declare(strict_types=1);

  namespace App\Service;

  use App\Model\CapturedEvent;
  use Hyperf\WebSocketServer\Sender;

  /**
   * WebSocket 广播服务
   * 大白话:把事件实时推给所有连接的前端客户端
   * 前端可以订阅特定合约或事件,实时看到链上动态
   */
  class WebSocketPusherService
  {
      // 存储 fd(连接ID)→ 订阅过滤条件 的映射
      // 大白话:记录每个前端连接关心哪些事件
      private static array $connections = [];

      public function __construct(private Sender $sender)
      {
      }

      /**
       * 注册 WebSocket 连接的订阅过滤
       */
      public static function subscribe(int $fd, array $filters): void
      {
          self::$connections[$fd] = $filters;
      }

      /**
       * 移除连接
       */
      public static function unsubscribe(int $fd): void
      {
          unset(self::$connections[$fd]);
      }

      /**
       * 广播事件给所有匹配的连接
       */
      public function broadcast(CapturedEvent $event): void
      {
          $payload = json_encode([
              'type'             => 'contract_event',
              'event_name'       => $event->event_name,
              'contract_address' => $event->contract_address,
              'transaction_hash' => $event->transaction_hash,
              'block_number'     => $event->block_number,
              'decoded_params'   => $event->decoded_params,
              'captured_at'      => $event->created_at,
          ]);

          foreach (self::$connections as $fd => $filters) {
              if ($this->matchesFilters($event, $filters)) {
                  try {
                      $this->sender->push($fd, $payload);
                  } catch (\Throwable) {
                      // 连接已断开,清理
                      self::unsubscribe($fd);
                  }
              }
          }
      }

      private function matchesFilters(CapturedEvent $event, array $filters): bool
      {
          if (! empty($filters['contract_address'])) {
              if (strtolower($event->contract_address) !== strtolower($filters['contract_address'])) {
                  return false;
              }
          }
          if (! empty($filters['event_name'])) {
              if ($event->event_name !== $filters['event_name']) {
                  return false;
              }
          }
          return true;
      }
  }

  ---
  Job 层

  app/Job/ProcessEventJob.php

  <?php

  declare(strict_types=1);

  namespace App\Job;

  use App\Model\CapturedEvent;
  use App\Model\Subscription;
  use App\Service\WebhookPusherService;
  use App\Service\WebSocketPusherService;
  use Hyperf\AsyncQueue\Job;

  /**
   * 事件处理队列任务
   * 大白话:从队列里取出"待处理事件",决定用 Webhook 还是 WebSocket 推送
   */
  class ProcessEventJob extends Job
  {
      public int $maxAttempts = 1; // 本任务不重试,推送失败由 DeliverWebhookJob 重试

      public function __construct(public readonly int $eventId)
      {
      }

      public function handle(): void
      {
          $event = CapturedEvent::find($this->eventId);
          if (! $event) {
              return;
          }

          $sub = Subscription::find($event->subscription_id);
          if (! $sub) {
              return;
          }

          $container = \Hyperf\Context\ApplicationContext::getContainer();

          // WebSocket 广播(实时推给前端)
          if (in_array($sub->push_type, ['websocket', 'both'])) {
              $container->get(WebSocketPusherService::class)->broadcast($event);
          }

          // Webhook 推送(推给业务系统)
          if (in_array($sub->push_type, ['webhook', 'both']) && $sub->webhook_url) {
              $container->get(\Hyperf\AsyncQueue\Driver\DriverFactory::class)
                  ->get('webhook')
                  ->push(new DeliverWebhookJob($event->id, $sub->webhook_url));
          }
      }
  }

  app/Job/DeliverWebhookJob.php

  <?php

  declare(strict_types=1);

  namespace App\Job;

  use App\Model\CapturedEvent;
  use App\Service\WebhookPusherService;
  use Hyperf\AsyncQueue\Job;

  /**
   * Webhook 推送队列任务
   * 大白话:专门负责推送 HTTP 请求,失败自动重试(指数退避)
   * retry_seconds 配置在 async_queue.php 里:[2, 4, 8, 16, 32]
   */
  class DeliverWebhookJob extends Job
  {
      public int $maxAttempts = 5;

      public function __construct(
          public readonly int $eventId,
          public readonly string $webhookUrl,
          public readonly int $attempt = 1
      ) {
      }

      public function handle(): void
      {
          $event = CapturedEvent::find($this->eventId);
          if (! $event || $event->push_status === 'delivered') {
              return; // 已成功推送,跳过
          }

          $pusher  = \Hyperf\Context\ApplicationContext::getContainer()
              ->get(WebhookPusherService::class);

          $success = $pusher->push($event, $this->webhookUrl, $this->attempt);

          if (! $success) {
              // 抛出异常触发队列重试机制
              throw new \RuntimeException(
                  "Webhook 推送失败,event_id={$this->eventId},attempt={$this->attempt}"
              );
          }
      }
  }

  ---
  常驻进程

  app/Process/EventPollingProcess.php

  <?php

  declare(strict_types=1);

  namespace App\Process;

  use App\Service\EventCaptureService;
  use Hyperf\Process\AbstractProcess;
  use Hyperf\Process\Annotation\Process;
  use Psr\Container\ContainerInterface;
  use Psr\Log\LoggerInterface;

  /**
   * 事件轮询常驻进程
   * 大白话:这是"守夜人",永远不停地盯着区块链
   * 每隔 N 秒扫一次,发现新事件就推入队列
   */
  #[Process(name: 'event-polling')]
  class EventPollingProcess extends AbstractProcess
  {
      public string $name           = 'event-polling';
      public int    $restartInterval = 3;

      public function __construct(
          ContainerInterface $container,
          private EventCaptureService $captureService,
          private LoggerInterface $logger
      ) {
          parent::__construct($container);
      }

      public function handle(): void
      {
          $interval = (int) \Hyperf\Support\env('WATCHER_POLL_INTERVAL', 3);

          $this->logger->info('事件轮询进程启动');

          while (true) {
              try {
                  $this->captureService->scan();
              } catch (\Throwable $e) {
                  $this->logger->error('轮询异常', [
                      'message' => $e->getMessage(),
                      'file'    => $e->getFile() . ':' . $e->getLine(),
                  ]);
              }

              \Swoole\Coroutine::sleep($interval);
          }
      }
  }

  ---
  Controller 层

  app/Controller/SubscriptionController.php

  <?php

  declare(strict_types=1);

  namespace App\Controller;

  use App\Model\Subscription;
  use App\Service\EventDecodeService;
  use Hyperf\HttpServer\Annotation\Controller;
  use Hyperf\HttpServer\Annotation\DeleteMapping;
  use Hyperf\HttpServer\Annotation\GetMapping;
  use Hyperf\HttpServer\Annotation\PostMapping;
  use Hyperf\HttpServer\Annotation\PutMapping;
  use Hyperf\HttpServer\Contract\RequestInterface;

  /**
   * 订阅管理 API
   * 大白话:让业务系统告诉我"我要监听哪个合约的哪个事件,推到哪个地址"
   */
  #[Controller(prefix: '/api/v1/subscriptions')]
  class SubscriptionController
  {
      public function __construct(private EventDecodeService $decoder)
      {
      }

      /** 列出所有订阅 */
      #[GetMapping(path: '')]
      public function index(): array
      {
          return [
              'code' => 0,
              'data' => Subscription::orderByDesc('id')->get(),
          ];
      }

      /**
       * 创建订阅
       *
       * POST /api/v1/subscriptions
       * {
       *   "name": "usdt-transfer-monitor",
       *   "contract_address": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
       *   "event_signature_str": "Transfer(address,address,uint256)",
       *   "event_name": "Transfer",
       *   "event_abi": [
       *     {"name":"from",  "type":"address","indexed":true},
       *     {"name":"to",    "type":"address","indexed":true},
       *     {"name":"value", "type":"uint256","indexed":false}
       *   ],
       *   "push_type": "webhook",
       *   "webhook_url": "https://your-app.com/webhook/usdt",
       *   "filters": {"topic2": "0xYourAddress"}
       * }
       */
      #[PostMapping(path: '')]
      public function store(RequestInterface $request): array
      {
          $sigStr    = $request->input('event_signature_str');
          $signature = $sigStr
              ? $this->decoder->computeEventSignature($sigStr)
              : $request->input('event_signature');

          $sub = Subscription::create([
              'name'             => $request->input('name'),
              'contract_address' => $request->input('contract_address')
                  ? strtolower($request->input('contract_address'))
                  : null,
              'event_signature'  => $signature,
              'event_name'       => $request->input('event_name'),
              'event_abi'        => $request->input('event_abi'),
              'push_type'        => $request->input('push_type', 'webhook'),
              'webhook_url'      => $request->input('webhook_url'),
              'filters'          => $request->input('filters'),
              'is_active'        => true,
          ]);

          return ['code' => 0, 'data' => $sub];
      }

      /** 查看单个订阅及最近事件 */
      #[GetMapping(path: '/{id}')]
      public function show(int $id): array
      {
          $sub = Subscription::with(['capturedEvents' => function ($q) {
              $q->orderByDesc('id')->limit(20);
          }])->findOrFail($id);

          return ['code' => 0, 'data' => $sub];
      }

      /** 启用/禁用订阅 */
      #[PutMapping(path: '/{id}/toggle')]
      public function toggle(int $id): array
      {
          $sub            = Subscription::findOrFail($id);
          $sub->is_active = ! $sub->is_active;
          $sub->save();

          return ['code' => 0, 'data' => ['is_active' => $sub->is_active]];
      }

      /** 删除订阅 */
      #[DeleteMapping(path: '/{id}')]
      public function destroy(int $id): array
      {
          Subscription::findOrFail($id)->delete();
          return ['code' => 0, 'message' => '已删除'];
      }
  }

  app/Controller/WebSocketController.php

  <?php

  declare(strict_types=1);

  namespace App\Controller;

  use App\Service\WebSocketPusherService;
  use Hyperf\WebSocketServer\Annotation\OnClose;
  use Hyperf\WebSocketServer\Annotation\OnMessage;
  use Hyperf\WebSocketServer\Annotation\OnOpen;
  use Hyperf\WebSocketServer\Context;
  use Swoole\Http\Request;
  use Swoole\WebSocket\Frame;
  use Swoole\WebSocket\Server;

  /**
   * WebSocket 服务控制器
   * 大白话:前端连上来后,发送订阅消息,服务端实时推送匹配的链上事件
   *
   * 前端使用示例:
   * const ws = new WebSocket('ws://localhost:9502');
   * ws.onopen = () => ws.send(JSON.stringify({
   *   action: 'subscribe',
   *   contract_address: '0xUSDT...',
   *   event_name: 'Transfer'
   * }));
   * ws.onmessage = (e) => console.log(JSON.parse(e.data));
   */
  class WebSocketController
  {
      #[OnOpen]
      public function onOpen(Server $server, Request $request): void
      {
          $server->push($request->fd, json_encode([
              'type'    => 'connected',
              'message' => '已连接到 hyperf-contract-event-watcher',
          ]));
      }

      #[OnMessage]
      public function onMessage(Server $server, Frame $frame): void
      {
          $data = json_decode($frame->data, true);
          if (! $data) {
              return;
          }

          $action = $data['action'] ?? '';

          if ($action === 'subscribe') {
              WebSocketPusherService::subscribe($frame->fd, [
                  'contract_address' => $data['contract_address'] ?? null,
                  'event_name'       => $data['event_name'] ?? null,
              ]);

              $server->push($frame->fd, json_encode([
                  'type'    => 'subscribed',
                  'filters' => $data,
              ]));
          }

          if ($action === 'unsubscribe') {
              WebSocketPusherService::unsubscribe($frame->fd);
              $server->push($frame->fd, json_encode(['type' => 'unsubscribed']));
          }

          if ($action === 'ping') {
              $server->push($frame->fd, json_encode(['type' => 'pong']));
          }
      }

      #[OnClose]
      public function onClose(Server $server, int $fd): void
      {
          WebSocketPusherService::unsubscribe($fd);
      }
  }

  ---
  测试

  test/Cases/EventDecodeServiceTest.php

  <?php

  declare(strict_types=1);

  namespace HyperfTest\Cases;

  use App\Service\EventDecodeService;
  use PHPUnit\Framework\TestCase;

  class EventDecodeServiceTest extends TestCase
  {
      private EventDecodeService $decoder;

      protected function setUp(): void
      {
          $this->decoder = new EventDecodeService();
      }

      public function testComputeTransferEventSignature(): void
      {
          $sig = $this->decoder->computeEventSignature('Transfer(address,address,uint256)');
          // 已知的 ERC-20 Transfer 事件签名
          $this->assertEquals(
              '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
              $sig
          );
      }

     
 public function testDecodeTransferEvent(): void
      {                                                                                                                           $abi = [
              ['name' => 'from',  'type' => 'address', 'indexed' => true],                                                            ['name' => 'to',    'type' => 'address', 'indexed' => true],
              ['name' => 'value', 'type' => 'uint256', 'indexed' => false],
          ];

          // 模拟真实的链上日志结构
          $log = [
              'topics' => [
                  '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
                  '0x000000000000000000000000ab5801a7d398351b8be11c439e05c5b3259aec9b', // from
                  '0x000000000000000000000000ca35b7d915458ef540ade6068dfe2f44e8fa733c', // to
              ],
              // value = 1000000000000000000 (1 ETH in wei)
              'data' => '0x0000000000000000000000000000000000000000000000000de0b6b3a7640000',
          ];

          $decoded = $this->decoder->decode($log, $abi);

          $this->assertNotNull($decoded);
          $this->assertEquals('0xab5801a7d398351b8be11c439e05c5b3259aec9b', $decoded['from']);
          $this->assertEquals('0xca35b7d915458ef540ade6068dfe2f44e8fa733c', $decoded['to']);
          $this->assertEquals('1000000000000000000', $decoded['value']);
      }

      public function testDecodeReturnsNullWithNoAbi(): void
      {
          $log = ['topics' => [], 'data' => '0x'];
          $this->assertNull($this->decoder->decode($log, null));
          $this->assertNull($this->decoder->decode($log, []));
      }

      public function testMatchesSubscriptionByAddress(): void
      {
          $sub                   = new \App\Model\Subscription();
          $sub->contract_address = '0xdac17f958d2ee523a2206206994597c13d831ec7';
          $sub->event_signature  = null;
          $sub->filters          = null;

          $matchingLog = ['address' => '0xdAC17F958D2ee523a2206206994597C13D831ec7', 'topics' => []];
          $this->assertTrue($this->decoder->matchesSubscription($matchingLog, $sub));

          $nonMatchingLog = ['address' => '0x0000000000000000000000000000000000000001', 'topics' => []];
          $this->assertFalse($this->decoder->matchesSubscription($nonMatchingLog, $sub));
      }

      public function testMatchesSubscriptionByEventSignature(): void
      {
          $sub                   = new \App\Model\Subscription();
          $sub->contract_address = null;
          $sub->event_signature  = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';
          $sub->filters          = null;

          $matchingLog = [
              'address' => '0xany',
              'topics'  => ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],
          ];
          $this->assertTrue($this->decoder->matchesSubscription($matchingLog, $sub));

          $nonMatchingLog = ['address' => '0xany', 'topics' => ['0xdeadbeef']];
          $this->assertFalse($this->decoder->matchesSubscription($nonMatchingLog, $sub));
      }
  }

  ---
  test/Cases/EventCaptureServiceTest.php

  <?php

  declare(strict_types=1);

  namespace HyperfTest\Cases;

  use App\Service\EventCaptureService;
  use App\Service\EventDecodeService;
  use App\Service\Web3Service;
  use Hyperf\AsyncQueue\Driver\DriverFactory;
  use Hyperf\Cache\Cache;
  use Hyperf\Contract\ConfigInterface;
  use Hyperf\Logger\LoggerFactory;
  use Mockery;
  use PHPUnit\Framework\TestCase;

  class EventCaptureServiceTest extends TestCase
  {
      protected function tearDown(): void
      {
          Mockery::close();
      }

      public function testScanSkipsWhenNoSubscriptions(): void
      {
          $web3 = Mockery::mock(Web3Service::class);
          $web3->shouldReceive('getLatestBlockNumber')->andReturn(1000);
          // 没有订阅时不应调用 getLogs
          $web3->shouldNotReceive('getLogs');

          $decoder = new EventDecodeService();

          $queue = Mockery::mock(DriverFactory::class);
          $queue->shouldNotReceive('get');

          $cache = Mockery::mock(Cache::class);
          $cache->shouldReceive('get')
              ->with('watcher:last_scanned_block', Mockery::any())
              ->andReturn(0);
          $cache->shouldReceive('set');

          $config = Mockery::mock(ConfigInterface::class);
          $config->shouldReceive('get')->with('watcher.block_batch', 50)->andReturn(50);
          $config->shouldReceive('get')->with('watcher.confirm_blocks', 12)->andReturn(12);
          $config->shouldReceive('get')->with('watcher.start_block', 0)->andReturn(0);

          $logger        = Mockery::mock(\Psr\Log\LoggerInterface::class);
          $logger->shouldReceive('info', 'error', 'warning');
          $loggerFactory = Mockery::mock(LoggerFactory::class);
          $loggerFactory->shouldReceive('get')->andReturn($logger);

          // 模拟没有订阅
          Mockery::mock('alias:App\Model\Subscription')
              ->shouldReceive('where->get')
              ->andReturn(new \Hyperf\Database\Model\Collection([]));

          $service = new EventCaptureService($web3, $decoder, $queue, $cache, $config, $loggerFactory);
          $service->scan();

          $this->assertTrue(true); // 不抛异常即通过
      }

      public function testScanSkipsWhenAlreadyUpToDate(): void
      {
          $web3 = Mockery::mock(Web3Service::class);
          // 最新块 1000,安全块 988,last_scanned 也是 988
          $web3->shouldReceive('getLatestBlockNumber')->andReturn(1000);
          $web3->shouldNotReceive('getLogs');

          $decoder       = new EventDecodeService();
          $queue         = Mockery::mock(DriverFactory::class);
          $loggerFactory = Mockery::mock(LoggerFactory::class);
          $loggerFactory->shouldReceive('get')->andReturn(
              Mockery::mock(\Psr\Log\LoggerInterface::class)->shouldIgnoreMissing()
          );

          $cache = Mockery::mock(Cache::class);
          $cache->shouldReceive('get')
              ->with('watcher:last_scanned_block', Mockery::any())
              ->andReturn(988); // 已是安全块

          $config = Mockery::mock(ConfigInterface::class);
          $config->shouldReceive('get')->with('watcher.block_batch', 50)->andReturn(50);
          $config->shouldReceive('get')->with('watcher.confirm_blocks', 12)->andReturn(12);
          $config->shouldReceive('get')->with('watcher.start_block', 0)->andReturn(0);

          Mockery::mock('alias:App\Model\Subscription')
              ->shouldReceive('where->get')
              ->andReturn(new \Hyperf\Database\Model\Collection([
                  tap(new \App\Model\Subscription(), function ($s) {
                      $s->id               = 1;
                      $s->contract_address = '0xabc';
                      $s->event_signature  = '0xdef';
                      $s->is_active        = true;
                  }),
              ]));

          $service = new EventCaptureService($web3, $decoder, $queue, $cache, $config, $loggerFactory);
          $service->scan();

          $this->assertTrue(true);
      }
  }

  ---
  test/Cases/WebhookPusherServiceTest.php

  <?php

  declare(strict_types=1);

  namespace HyperfTest\Cases;

  use App\Model\CapturedEvent;
  use App\Service\WebhookPusherService;
  use GuzzleHttp\Client;
  use GuzzleHttp\Handler\MockHandler;
  use GuzzleHttp\HandlerStack;
  use GuzzleHttp\Psr7\Response;
  use GuzzleHttp\Exception\ConnectException;
  use GuzzleHttp\Psr7\Request;
  use Hyperf\Contract\ConfigInterface;
  use Hyperf\Guzzle\ClientFactory;
  use Hyperf\Logger\LoggerFactory;
  use Mockery;
  use PHPUnit\Framework\TestCase;

  class WebhookPusherServiceTest extends TestCase
  {
      protected function tearDown(): void
      {
          Mockery::close();
      }

      private function makeEvent(): CapturedEvent
      {
          $event                    = new CapturedEvent();
          $event->id                = 1;
          $event->subscription_id   = 1;
          $event->event_name        = 'Transfer';
          $event->event_signature   = '0xddf252ad...';
          $event->contract_address  = '0xusdt...';
          $event->block_number      = 18000000;
          $event->block_hash        = '0xblockhash';
          $event->transaction_hash  = '0xtxhash';
          $event->log_index         = 0;
          $event->topics            = [];
          $event->data              = '0x';
          $event->decoded_params    = ['from' => '0xabc', 'to' => '0xdef', 'value' => '1000'];
          $event->created_at        = '2024-01-01 00:00:00';
          return $event;
      }

      private function makeService(array $responses): WebhookPusherService
      {
          $mock    = new MockHandler($responses);
          $handler = HandlerStack::create($mock);
          $client  = new Client(['handler' => $handler]);

          $clientFactory = Mockery::mock(ClientFactory::class);
          $clientFactory->shouldReceive('create')->andReturn($client);

          $config = Mockery::mock(ConfigInterface::class);
          $config->shouldReceive('get')->with('watcher.webhook_timeout', 10)->andReturn(10);
          $config->shouldReceive('get')->with('watcher.webhook_secret', '')->andReturn('test_secret');

          $logger        = Mockery::mock(\Psr\Log\LoggerInterface::class)->shouldIgnoreMissing();
          $loggerFactory = Mockery::mock(LoggerFactory::class);
          $loggerFactory->shouldReceive('get')->andReturn($logger);

          Mockery::mock('alias:App\Model\WebhookDelivery')->shouldReceive('create');

          return new WebhookPusherService($clientFactory, $config, $loggerFactory);
      }

      public function testPushSuccessOn200(): void
      {
          $service = $this->makeService([new Response(200, [], '{"ok":true}')]);
          $event   = $this->makeEvent();

          // 模拟 save()
          $event = Mockery::mock($event)->makePartial();
          $event->shouldReceive('save');

          $result = $service->push($event, 'https://example.com/webhook');
          $this->assertTrue($result);
      }

      public function testPushFailsOn500(): void
      {
          $service = $this->makeService([new Response(500, [], 'Internal Server Error')]);
          $event   = $this->makeEvent();

          $event = Mockery::mock($event)->makePartial();
          $event->shouldReceive('save');

          $result = $service->push($event, 'https://example.com/webhook');
          $this->assertFalse($result);
      }

      public function testPushFailsOnConnectionError(): void
      {
          $service = $this->makeService([
              new ConnectException('Connection refused', new Request('POST', 'test')),
          ]);
          $event = $this->makeEvent();

          $event = Mockery::mock($event)->makePartial();
          $event->shouldReceive('save');

          $result = $service->push($event, 'https://unreachable.example.com/webhook');
          $this->assertFalse($result);
      }

      public function testHmacSignatureIsIncluded(): void
      {
          $capturedHeaders = [];
          $mock    = new MockHandler([new Response(200)]);
          $handler = HandlerStack::create($mock);

          // 拦截请求,捕获 headers
          $handler->push(\GuzzleHttp\Middleware::tap(
              function (\Psr\Http\Message\RequestInterface $req) use (&$capturedHeaders) {
                  $capturedHeaders = $req->getHeaders();
              }
          ));

          $client        = new Client(['handler' => $handler]);
          $clientFactory = Mockery::mock(ClientFactory::class);
          $clientFactory->shouldReceive('create')->andReturn($client);

          $config = Mockery::mock(ConfigInterface::class);
          $config->shouldReceive('get')->with('watcher.webhook_timeout', 10)->andReturn(10);
          $config->shouldReceive('get')->with('watcher.webhook_secret', '')->andReturn('my_secret');

          $logger        = Mockery::mock(\Psr\Log\LoggerInterface::class)->shouldIgnoreMissing();
          $loggerFactory = Mockery::mock(LoggerFactory::class);
          $loggerFactory->shouldReceive('get')->andReturn($logger);

          Mockery::mock('alias:App\Model\WebhookDelivery')->shouldReceive('create');

          $service = new WebhookPusherService($clientFactory, $config, $loggerFactory);
          $event   = Mockery::mock($this->makeEvent())->makePartial();
          $event->shouldReceive('save');

          $service->push($event, 'https://example.com/webhook');

          $this->assertArrayHasKey('X-Watcher-Signature', $capturedHeaders);
          $this->assertStringStartsWith('sha256=', $capturedHeaders['X-Watcher-Signature'][0]);
      }
  }

  ---
  phpunit.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.0/phpunit.xsd"
           bootstrap="test/bootstrap.php"
           colors="true">
      <testsuites>
          <testsuite name="Unit">
              <directory>test/Cases</directory>
          </testsuite>
      </testsuites>
      <coverage>
          <include>
              <directory>app</directory>
          </include>
      </coverage>
  </phpunit>

  ---
  .github/workflows/ci.yml

  name: CI

  on:
    push:
      branches: [main, develop]
    pull_request:
      branches: [main]

  jobs:
    test:
      runs-on: ubuntu-latest

      services:
        mysql:
          image: mysql:8.0
          env:
            MYSQL_ROOT_PASSWORD: root
            MYSQL_DATABASE: event_watcher_test
          ports: ["3306:3306"]
          options: --health-cmd="mysqladmin ping" --health-interval=10s

        redis:
          image: redis:7-alpine
          ports: ["6379:6379"]
          options: --health-cmd="redis-cli ping" --health-interval=10s

      steps:
        - uses: actions/checkout@v4

        - name: Setup PHP
          uses: shivammathur/setup-php@v2
          with:
            php-version: '8.2'
            extensions: swoole, pdo_mysql, redis, gmp
            coverage: xdebug

        - name: Cache Composer
          uses: actions/cache@v3
          with:
            path: vendor
            key: ${{ runner.os }}-composer-${{ hashFiles('composer.lock') }}

        - name: Install dependencies
          run: composer install --no-interaction --prefer-dist

        - name: Setup env
          run: |
            cp .env.example .env
            sed -i 's/your_api_token/ci_test_token/' .env
            sed -i 's/your_hmac_secret/ci_test_secret/' .env

        - name: Run migrations
          run: php bin/hyperf.php migrate --env=testing

        - name: Run tests
          run: composer test -- --coverage-clover coverage.xml

        - name: Upload coverage
          uses: codecov/codecov-action@v3
          with:
            file: coverage.xml

        - name: Check code style
          run: composer cs-check

  ---
  .github/workflows/release.yml

  name: Release

  on:
    push:
      tags: ['v*.*.*']

  jobs:
    release:
      runs-on: ubuntu-latest
      steps:
        - uses: actions/checkout@v4
          with:
            fetch-depth: 0

        - name: Create Release
          uses: softprops/action-gh-release@v1
          with:
            generate_release_notes: true
          env:
            GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

  ---
  CONTRIBUTING.md

  # 贡献指南

  ## 开发流程

  ```bash
  git clone https://github.com/your-org/hyperf-contract-event-watcher.git
  cd hyperf-contract-event-watcher
  composer install
  cp .env.example .env
  php bin/hyperf.php migrate
  git checkout -b feat/your-feature
  composer test && composer cs-fix
  git commit -m "feat: your feature"
  git push origin feat/your-feature

  Commit 规范

  ┌───────┬──────────┐
  │ 前缀  │   用途   │
  ├───────┼──────────┤
  │ feat: │ 新功能   │
  ├───────┼──────────┤
  │ fix:  │ Bug 修复 │
  ├───────┼──────────┤
  │ perf: │ 性能优化 │
  ├───────┼──────────┤
  │ docs: │ 文档     │
  ├───────┼──────────┤
  │ test: │ 测试     │
  └───────┴──────────┘

  分支策略

  main      ← 稳定版本
  develop   ← 开发主线
  feat/*    ← 新功能
  fix/*     ← Bug 修复

  ---

  ### `CHANGELOG.md`

  ```markdown
  # Changelog

  ## [Unreleased]

  ## [1.0.0] - 2024-01-01

  ### Added
  - eth_getLogs 批量轮询,协程并发扫块
  - 多订阅配置(合约地址 + 事件签名 + 自定义过滤)
  - ERC-20 Transfer/Approval 事件自动解码
  - Webhook 推送 + HMAC-SHA256 签名验证
  - 指数退避重试(最多5次:2/4/8/16/32秒)
  - WebSocket 实时广播
  - 事件去重(txHash + logIndex 唯一索引)
  - 完整推送记录(每次 HTTP 请求都有日志)
  - 订阅管理 REST API
  - GitHub Actions CI/CD

  ---
  README.md

  # hyperf-contract-event-watcher

  > 基于 Hyperf 协程的智能合约事件监听与 Webhook/WebSocket 推送服务

  [![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fgithub.com%2Fyour-org%2Fhyperf-contract-event-watcher%2Factions%2Fworkflows%2Fci.yml%2Fbadge.svg&pos_id=img-wlw4u226-1777464805205)](https://github.c
  om/your-org/hyperf-contract-event-watcher/actions)
  [![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fimg.shields.io%2Fbadge%2Fphp-%253E%253D8.1-blue&pos_id=img-tCK1zEiM-1777464805210)](https://php.net)
  [![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=https%3A%2F%2Fimg.shields.io%2Fbadge%2FLicense-MIT-yellow.svg&pos_id=img-xEP05gUl-1777464805211)](LICENSE)

  ## 这是什么?

  24小时盯着区块链,一旦发现你关心的合约事件,
  立刻通过 Webhook(HTTP 回调)或 WebSocket 推送给你的业务系统。

  ## 特性

  - **协程并发**:批量 getLogs,速度快
  - **多订阅**:同时监听多个合约、多种事件
  - **自动解码**:根据 ABI 把链上密文翻译成可读参数
  - **可靠推送**:指数退避重试,最多5次,失败有记录
  - **HMAC 验签**:接收方可验证消息真实性
  - **WebSocket**:前端实时订阅链上事件
  - **去重保证**:同一事件绝不重复推送
  - **完整审计**:每次推送的 HTTP 状态、耗时、响应都有记录

  ## 快速开始

  ```bash
  git clone https://github.com/your-org/hyperf-contract-event-watcher.git
  cd hyperf-contract-event-watcher
  composer install
  cp .env.example .env   # 填入 ETH_RPC_URL 等配置
  php bin/hyperf.php migrate
  php bin/hyperf.php start

创建订阅

# 监听 USDT 合约的所有 Transfer 事件
curl -X POST http://localhost:9501/api/v1/subscriptions \
  -H "Content-Type: application/json" \
  -d '{
    "name": "usdt-transfer",
    "contract_address": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
    "event_signature_str": "Transfer(address,address,uint256)",
    "event_name": "Transfer",
    "event_abi": [
      {"name":"from",  "type":"address","indexed":true},
      {"name":"to",    "type":"address","indexed":true},
      {"name":"value", "type":"uint256","indexed":false}
    ],
    "push_type": "webhook",
    "webhook_url": "https://your-app.com/webhook/usdt-transfer"
  }'

接收 Webhook

你的业务系统会收到这样的 POST 请求:

{
  "id": 1,
  "event_name": "Transfer",
  "contract_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
  "transaction_hash": "0xabc123...",
  "block_number": 18000000,
  "decoded_params": {
    "from":  "0xsender...",
    "to":    "0xreceiver...",
    "value": "1000000000"
  }
}

验证签名(PHP 示例):

$secret    = 'your_hmac_secret';
$body      = file_get_contents('php://input');
$received  = $_SERVER['HTTP_X_WATCHER_SIGNATURE'];
$expected  = 'sha256=' . hash_hmac('sha256', $body, $secret);

if (!hash_equals($expected, $received)) {
    http_response_code(401);
    exit('签名验证失败');
}

$event = json_decode($body, true);
// 处理业务逻辑...

WebSocket 实时订阅

const ws = new WebSocket('ws://localhost:9502');

ws.onopen = () => {
  ws.send(JSON.stringify({
    action: 'subscribe',
    contract_address: '0xdAC17F958D2ee523a2206206994597C13D831ec7',
    event_name: 'Transfer'
  }));
};

ws.onmessage = (e) => {
  const event = JSON.parse(e.data);
  console.log('链上事件:', event);
};

架构图

以太坊节点
    │  eth_getLogs(每3秒轮询)
    ▼
EventPollingProcess(常驻进程)
    │
    ▼
EventCaptureService(比对订阅,去重)
    │
    ▼
Redis 异步队列
    │
    ├──→ DeliverWebhookJob(指数退避重试)
    │         └──→ 你的业务系统 HTTP 回调
    │
    └──→ WebSocketPusherService
              └──→ 前端浏览器实时推送

License

MIT


六、开源完整流程

第1步 本地开发完成
composer test && composer cs-fix

第2步 创建 GitHub 仓库
gh repo create your-org/hyperf-contract-event-watcher --public
git push -u origin main

第3步 配置仓库保护
Settings → Branch protection rules
→ main 分支:require PR + CI pass

第4步 发布到 Packagist
packagist.org → Submit → 填 GitHub 地址
GitHub → Settings → Webhooks → 添加 Packagist Hook

第5步 打第一个版本
git tag v1.0.0 && git push origin v1.0.0
→ GitHub Actions 自动创建 Release

第6步 持续维护
├── Issue 分类:bug / enhancement / question
├── 安全问题:私下联系 → 修复 → patch 版本
├── 新功能:develop 分支 → PR → main → 新 tag
└── 每次发版更新 CHANGELOG.md


七、核心价值总结

┌──────────────────────────────────┬─────────────────────────────────────┐
│ 没有它 │ 有了它 │
├──────────────────────────────────┼─────────────────────────────────────┤
│ 用户充值,系统不知道,要手动对账 │ Transfer 事件触发 Webhook,自动到账 │
├──────────────────────────────────┼─────────────────────────────────────┤
│ 合约被攻击,几小时后才发现 │ 异常事件实时推送,秒级告警 │
├──────────────────────────────────┼─────────────────────────────────────┤
│ 前端要轮询 API 查链上状态 │ WebSocket 实时推送,零延迟 │
├──────────────────────────────────┼─────────────────────────────────────┤
│ 推送失败数据丢失 │ 指数退避重试 + 完整推送记录 │
├──────────────────────────────────┼─────────────────────────────────────┤
│ 重启后重复处理事件 │ txHash+logIndex 唯一索引去重 │
└──────────────────────────────────┴─────────────────────────────────────┘

一句话:它是区块链和业务系统之间的"实时信使",把链上发生的每一件事可靠地传递给你的应用。


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐