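Hyperf blockchain infrastructure for event listening and message push
--- 1. What does this project do? (in plain language)
First, the problem
Every time a smart contract on a blockchain is called, it can emit an "event log".
For example:
- Someone swaps tokens on Uniswap → the contract emits a Swap event
- Someone transfers USDT → the ERC-20 contract emits a Transfer event
- Someone buys an NFT → the contract emits a Transfer event
The problem: these events exist only on-chain, so your business system has no idea anything happened.
What you want:
- A user deposits USDT to your platform address → your system knows right away and credits the user's balance
- Someone buys an NFT on your platform → your system knows right away and updates the order status
- A contract is attacked and emits abnormal events → your system raises an alert right away
What this project does:
▎ Like a courier, it watches the blockchain around the clock. As soon as it sees a contract event you care about,
▎ it pushes the event to your business system through a Webhook (HTTP callback) or WebSocket.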
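End-to-end data flow (in plain language)
Blockchain (a new block roughly every 12 seconds)
    │
    │  eth_getLogs (poll for event logs)
    ▼
Listener process (long-running, coroutine concurrency)
    │
    │  decode event data
    ▼
Redis queue (buffer, so a failed push does not lose data)
    │
    ├──→ Webhook push (HTTP POST to your business system)
    │        └── automatic retry on failure (exponential backoff)
    │
    ├──→ WebSocket broadcast (real-time push to the front end)
    │
    └──→ Database persistence (permanent archive)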
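Who needs it?
┌─────────────────────────────┬───────────────────────────────────────────────────────┐
│ Scenario                    │ Usage                                                 │
├─────────────────────────────┼───────────────────────────────────────────────────────┤
│ Exchange deposit monitoring │ Listen for USDT Transfer events, auto-credit deposits │
├─────────────────────────────┼───────────────────────────────────────────────────────┤
│ NFT marketplace             │ Listen for Transfer/Sale events, update orders        │
├─────────────────────────────┼───────────────────────────────────────────────────────┤
│ DeFi risk control           │ Listen for large Swaps, trigger risk alerts           │
├─────────────────────────────┼───────────────────────────────────────────────────────┤
│ Blockchain games            │ Listen for item transfers, update game state          │
├─────────────────────────────┼───────────────────────────────────────────────────────┤
│ Data analytics platform     │ Collect all on-chain events, store for analysis       │
└─────────────────────────────┴───────────────────────────────────────────────────────┘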
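One-sentence summary
▎ It is the messenger between the blockchain and your business system: it translates what happens on-chain, in near real time, into HTTP requests or WebSocket messages you can handle.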
---
2. Technology choices
┌─────────────────────────┬──────────────────────────────┬──────────────────────────────────────────────────────┐
│ Purpose                 │ Choice                       │ Reason                                               │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Framework               │ hyperf/hyperf 3.x            │ Coroutines and resident processes, suited to polling │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ On-chain event fetching │ web3p/web3.php + eth_getLogs │ Mature PHP Web3 library; getLogs fetches in batches  │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Event decoding          │ kornrunner/keccak            │ Keccak-256 for computing event signatures            │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Message queue           │ hyperf/async-queue (Redis)   │ Decouples capture from push; failures lose no events │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Webhook push            │ hyperf/guzzle (Guzzle HTTP)  │ Coroutine-aware HTTP client for concurrent pushes    │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ WebSocket service       │ hyperf/websocket-server      │ Real-time push to the front end                      │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Database                │ hyperf/database (MySQL)      │ Persists events and subscription config              │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Cache                   │ hyperf/cache (Redis)         │ Stores scan progress and dedup keys                  │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Logging                 │ hyperf/logger                │ Structured audit logs                                │
├─────────────────────────┼──────────────────────────────┼──────────────────────────────────────────────────────┤
│ Testing                 │ phpunit/phpunit              │ Unit tests                                           │
└─────────────────────────┴──────────────────────────────┴──────────────────────────────────────────────────────┘
---
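3. Core concepts in plain language
Subscription
  └── You tell the system: "I want to watch the Transfer event of the USDT contract"
  └── The system records this and starts watching that contract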
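Event signature (topic0)
  └── Every kind of event has a unique "fingerprint"
  └── The fingerprint of Transfer(address,address,uint256) is a fixed keccak256 hash
  └── The system filters on this fingerprint and keeps only the events you care about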
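As a rough sketch of where the signature text comes from, the snippet below builds the canonical string `Transfer(address,address,uint256)` from an ABI-style parameter list. The function name `canonicalEventSignature` is hypothetical and not part of this project. Hashing the resulting string with Keccak-256 (not shown here, since it needs a third-party library) would give topic0.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build the canonical signature text for an event.
 * Only the parameter types matter; names and the "indexed" flag are ignored.
 */
function canonicalEventSignature(string $eventName, array $inputs): string
{
    $types = array_map(static fn (array $input): string => $input['type'], $inputs);

    return $eventName . '(' . implode(',', $types) . ')';
}

$abi = [
    ['name' => 'from', 'type' => 'address', 'indexed' => true],
    ['name' => 'to', 'type' => 'address', 'indexed' => true],
    ['name' => 'value', 'type' => 'uint256', 'indexed' => false],
];

echo canonicalEventSignature('Transfer', $abi), PHP_EOL;
```

Note that the string contains no spaces and no parameter names; any deviation changes the hash and therefore the topic0 that is matched.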
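Webhook
  └── It is simply an HTTP POST request
  └── When the system finds an event, it POSTs it to the URL you specified
  └── Your business system receives the request and runs its own logic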
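Exponential backoff retry
  └── 1st failure → wait 2 seconds, then retry
  └── 2nd failure → wait 4 seconds, then retry
  └── 3rd failure → wait 8 seconds, then retry
  └── At most 5 retries (2, 4, 8, 16, 32 seconds); if it still fails, the event goes to the dead-letter queue for manual handling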
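The doubling schedule can be sketched in a few lines. `backoffSchedule` is a hypothetical helper used only for illustration; the project itself lists the delays directly in `retry_seconds`. With a base of 2 seconds (the value of `WEBHOOK_RETRY_BASE` in `.env.example`) and five retries, it produces the same list as the webhook queue configuration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay in seconds before each retry.
 * Retry n (1-based) waits $base * 2^(n-1) seconds.
 */
function backoffSchedule(int $base, int $maxRetries): array
{
    $delays = [];
    for ($retry = 1; $retry <= $maxRetries; ++$retry) {
        $delays[] = $base * (2 ** ($retry - 1));
    }

    return $delays;
}

print_r(backoffSchedule(2, 5));
```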
Deduplication
  └── The same event (txHash + logIndex) is pushed only once
  └── This prevents duplicate pushes after a restart
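A minimal in-memory sketch of the dedup idea follows. The class `SeenEvents` is hypothetical; the project itself uses a cache key plus a unique database index, which survive restarts, whereas this array does not. It only illustrates how txHash and logIndex combine into one key.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory deduplicator keyed by txHash + logIndex.
 * Illustration only: state is lost when the process exits.
 */
final class SeenEvents
{
    /** @var array<string, true> */
    private array $seen = [];

    /** Returns true the first time an event is offered, false on repeats. */
    public function markIfNew(string $txHash, int $logIndex): bool
    {
        // Hashes are compared case-insensitively, so normalise before building the key.
        $key = strtolower($txHash) . ':' . $logIndex;
        if (isset($this->seen[$key])) {
            return false;
        }
        $this->seen[$key] = true;

        return true;
    }
}

$seen = new SeenEvents();
var_dump($seen->markIfNew('0xABC', 0)); // first sighting
var_dump($seen->markIfNew('0xabc', 0)); // same event, different casing
var_dump($seen->markIfNew('0xabc', 1)); // another log in the same transaction
```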
---
4. Project directory structure
hyperf-contract-event-watcher/
├── .github/
│   ├── workflows/
│   │   ├── ci.yml
│   │   └── release.yml
│   ├── ISSUE_TEMPLATE/
│   │   ├── bug_report.md
│   │   └── feature_request.md
│   └── PULL_REQUEST_TEMPLATE.md
├── app/
│   ├── Constants/
│   │   └── EventStatus.php
│   ├── Contract/
│   │   └── PusherInterface.php
│   ├── Controller/
│   │   ├── SubscriptionController.php      # Subscription management API
│   │   └── WebSocketController.php         # WebSocket service
│   ├── Exception/
│   │   └── Handler/
│   │       └── AppExceptionHandler.php
│   ├── Job/
│   │   ├── DeliverWebhookJob.php           # Webhook delivery queue job
│   │   └── ProcessEventJob.php             # Event processing queue job
│   ├── Model/
│   │   ├── Subscription.php                # Subscription configuration
│   │   ├── CapturedEvent.php               # Captured events
│   │   └── WebhookDelivery.php             # Webhook delivery records
│   ├── Process/
│   │   └── EventPollingProcess.php         # Long-running polling process
│   ├── Service/
│   │   ├── Web3Service.php                 # On-chain data fetching
│   │   ├── EventDecodeService.php          # Event decoding
│   │   ├── EventCaptureService.php         # Event capture core
│   │   ├── WebhookPusherService.php        # Webhook push
│   │   └── WebSocketPusherService.php      # WebSocket broadcast
│   └── Listener/
│       └── QueueFailedListener.php         # Queue failure listener
├── config/
│   └── autoload/
│       ├── databases.php
│       ├── redis.php
│       ├── async_queue.php
│       ├── server.php
│       ├── logger.php
│       └── watcher.php
├── database/
│   └── migrations/
│       ├── 2024_01_01_create_subscriptions_table.php
│       ├── 2024_01_02_create_captured_events_table.php
│       └── 2024_01_03_create_webhook_deliveries_table.php
├── test/
│   ├── Cases/
│   │   ├── EventDecodeServiceTest.php
│   │   ├── EventCaptureServiceTest.php
│   │   └── WebhookPusherServiceTest.php
│   └── bootstrap.php
├── .env.example
├── .php-cs-fixer.php
├── composer.json
├── phpunit.xml
├── README.md
├── CHANGELOG.md
├── CONTRIBUTING.md
└── LICENSE
---
5. Complete code
composer.json
{
"name": "your-org/hyperf-contract-event-watcher",
"description": "A high-performance blockchain smart contract event listener and webhook/websocket push service
built on Hyperf",
"type": "project",
"keywords": [
"hyperf", "blockchain", "ethereum", "event-listener",
"webhook", "websocket", "web3", "evm", "smart-contract"
],
"license": "MIT",
"require": {
"php": ">=8.1",
"ext-gmp": "*",
"hyperf/async-queue": "^3.1",
"hyperf/cache": "^3.1",
"hyperf/command": "^3.1",
"hyperf/config": "^3.1",
"hyperf/database": "^3.1",
"hyperf/db-connection": "^3.1",
"hyperf/framework": "^3.1",
"hyperf/guzzle": "^3.1",
"hyperf/http-server": "^3.1",
"hyperf/logger": "^3.1",
"hyperf/process": "^3.1",
"hyperf/redis": "^3.1",
"hyperf/websocket-server": "^3.1",
"kornrunner/php-ethereum-util": "^1.0",
"web3p/web3.php": "^0.1.4"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.0",
"hyperf/testing": "^3.1",
"mockery/mockery": "^1.4",
"phpunit/phpunit": "^10.0"
},
"autoload": {
"psr-4": { "App\\": "app/" }
},
"autoload-dev": {
"psr-4": { "HyperfTest\\": "test/" }
},
"scripts": {
"test": "phpunit --colors=always",
"cs-fix": "php-cs-fixer fix",
"cs-check": "php-cs-fixer fix --dry-run --diff"
},
"minimum-stability": "dev",
"prefer-stable": true
}
---
.env.example
APP_NAME=hyperf-contract-event-watcher
APP_ENV=dev
# Database
DB_HOST=127.0.0.1
DB_PORT=3306
DB_DATABASE=event_watcher
DB_USERNAME=root
DB_PASSWORD=
# Redis
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_DB=0
# Ethereum node
ETH_RPC_URL=https://mainnet.infura.io/v3/YOUR_KEY
ETH_CHAIN_ID=1
# Polling
WATCHER_POLL_INTERVAL=3 # Polling interval (seconds)
WATCHER_BLOCK_BATCH=50 # Blocks scanned per batch
WATCHER_CONFIRM_BLOCKS=12 # Confirmations to wait for
WATCHER_START_BLOCK=0 # 0 = start from the latest block
# Webhook push
WEBHOOK_TIMEOUT=10 # Push timeout (seconds)
WEBHOOK_MAX_RETRIES=5 # Maximum number of retries
WEBHOOK_RETRY_BASE=2 # Exponential backoff base (seconds)
WEBHOOK_SECRET=your_hmac_secret # HMAC signing secret (the receiver uses it to verify)
# API authentication
WATCHER_API_TOKEN=your_api_token
---
config/autoload/watcher.php
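<?php
declare(strict_types=1);
// Hyperf 3.1 moved the helper functions into namespaces, so env() must be imported.
use function Hyperf\Support\env;
return [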
'eth_rpc_url' => env('ETH_RPC_URL', 'http://127.0.0.1:8545'),
'eth_chain_id' => (int) env('ETH_CHAIN_ID', 1),
'poll_interval' => (int) env('WATCHER_POLL_INTERVAL', 3),
'block_batch' => (int) env('WATCHER_BLOCK_BATCH', 50),
'confirm_blocks' => (int) env('WATCHER_CONFIRM_BLOCKS', 12),
'start_block' => (int) env('WATCHER_START_BLOCK', 0),
'webhook_timeout' => (int) env('WEBHOOK_TIMEOUT', 10),
'webhook_max_retry' => (int) env('WEBHOOK_MAX_RETRIES', 5),
'webhook_retry_base'=> (int) env('WEBHOOK_RETRY_BASE', 2),
'webhook_secret' => env('WEBHOOK_SECRET', ''),
'api_token' => env('WATCHER_API_TOKEN', ''),
];
---
config/autoload/async_queue.php
<?php
declare(strict_types=1);
return [
    // Main queue: event processing
    'event' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => 'watcher:queue:event',
        'timeout' => 60,
        'retry_seconds' => [1, 5, 10, 20],
        'handle_timeout' => 60,
        'processes' => 4,
        'concurrent' => ['limit' => 20],
    ],
    // Webhook delivery queue
    'webhook' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'redis' => ['pool' => 'default'],
        'channel' => 'watcher:queue:webhook',
        'timeout' => 30,
        'retry_seconds' => [2, 4, 8, 16, 32], // exponential backoff
        'handle_timeout' => 30,
        'processes' => 8, // more workers for concurrent delivery
        'concurrent' => ['limit' => 50],
    ],
];
---
Database migrations
database/migrations/2024_01_01_create_subscriptions_table.php
<?php
declare(strict_types=1);
use Hyperf\Database\Migrations\Migration;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Schema\Schema;
class CreateSubscriptionsTable extends Migration
{
    public function up(): void
    {
        Schema::create('subscriptions', function (Blueprint $table) {
            $table->bigIncrements('id');
            // Subscription name, for easy identification
            $table->string('name', 100)->comment('Subscription name, e.g. usdt-transfer-monitor');
            // Contract address to watch (lowercase); NULL means all contracts
            $table->char('contract_address', 42)->nullable()->comment('Contract address');
            // Event signature to watch (topic0); NULL means all events
            // Example: the keccak256 of Transfer(address,address,uint256)
            $table->char('event_signature', 66)->nullable()->comment('Event signature (topic0)');
            // Human-readable event name, e.g. Transfer
            $table->string('event_name', 100)->nullable()->comment('Event name');
            // ABI definition (JSON), used to decode event parameters
            $table->json('event_abi')->nullable()->comment('Event ABI');
            // Push type: webhook / websocket / both
            $table->string('push_type', 20)->default('webhook')->comment('Push type');
            // Webhook target URL
            $table->string('webhook_url', 500)->nullable()->comment('Webhook callback URL');
            // Extra filter conditions (JSON), e.g. only Transfers to a given address
            $table->json('filters')->nullable()->comment('Extra filter conditions');
            // Whether the subscription is enabled
            $table->boolean('is_active')->default(true);
            $table->timestamps();
            $table->index('contract_address');
            $table->index('event_signature');
            $table->index('is_active');
        });
    }
    public function down(): void
    {
        Schema::dropIfExists('subscriptions');
    }
}
database/migrations/2024_01_02_create_captured_events_table.php
<?php
declare(strict_types=1);
use Hyperf\Database\Migrations\Migration;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Schema\Schema;
class CreateCapturedEventsTable extends Migration
{
    public function up(): void
    {
        Schema::create('captured_events', function (Blueprint $table) {
            $table->bigIncrements('id');
            $table->unsignedBigInteger('subscription_id')->comment('ID of the subscription that matched');
            $table->unsignedBigInteger('block_number')->comment('Block height');
            $table->char('block_hash', 66)->comment('Block hash');
            $table->char('transaction_hash', 66)->comment('Transaction hash');
            $table->unsignedSmallInteger('log_index')->comment('Log index');
            $table->char('contract_address', 42)->comment('Contract address');
            $table->char('event_signature', 66)->comment('Event signature');
            $table->string('event_name', 100)->nullable()->comment('Event name');
            $table->json('topics')->comment('Raw topics');
            $table->text('data')->nullable()->comment('Raw data');
            $table->json('decoded_params')->nullable()->comment('Decoded parameters');
            // Push status: pending / delivered / failed
            $table->string('push_status', 20)->default('pending')->comment('Push status');
            $table->unsignedTinyInteger('push_attempts')->default(0)->comment('Number of push attempts');
            $table->timestamp('last_push_at')->nullable()->comment('Time of the last push');
            $table->timestamp('created_at')->useCurrent();
            // Unique index: the same event is stored once per subscription.
            // subscription_id is part of the key because several subscriptions may match the same log.
            $table->unique(['subscription_id', 'transaction_hash', 'log_index'], 'uniq_sub_tx_log');
            $table->index('block_number');
            $table->index('push_status');
            $table->index('contract_address');
        });
    }
    public function down(): void
    {
        Schema::dropIfExists('captured_events');
    }
}
database/migrations/2024_01_03_create_webhook_deliveries_table.php
<?php
declare(strict_types=1);
use Hyperf\Database\Migrations\Migration;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Schema\Schema;
class CreateWebhookDeliveriesTable extends Migration
{
    public function up(): void
    {
        Schema::create('webhook_deliveries', function (Blueprint $table) {
            $table->bigIncrements('id');
            $table->unsignedBigInteger('captured_event_id')->comment('Related event ID');
            $table->string('webhook_url', 500)->comment('Target URL');
            $table->unsignedTinyInteger('attempt')->default(1)->comment('Attempt number');
            $table->unsignedSmallInteger('http_status')->nullable()->comment('HTTP response status code');
            $table->text('response_body')->nullable()->comment('Response body (truncated to 1000 characters)');
            $table->unsignedSmallInteger('duration_ms')->nullable()->comment('Duration (milliseconds)');
            $table->boolean('success')->default(false)->comment('Whether the delivery succeeded');
            $table->string('error_message', 500)->nullable()->comment('Error message');
            $table->timestamp('created_at')->useCurrent();
            $table->index('captured_event_id');
            $table->index(['success', 'created_at']);
        });
    }
    public function down(): void
    {
        Schema::dropIfExists('webhook_deliveries');
    }
}
---
Model layer
app/Model/Subscription.php
<?php
declare(strict_types=1);
namespace App\Model;
use Hyperf\DbConnection\Model\Model;
class Subscription extends Model
{
protected ?string $table = 'subscriptions';
protected array $fillable = [
'name', 'contract_address', 'event_signature',
'event_name', 'event_abi', 'push_type',
'webhook_url', 'filters', 'is_active',
];
protected array $casts = [
'event_abi' => 'json',
'filters' => 'json',
'is_active' => 'boolean',
];
public function capturedEvents()
{
return $this->hasMany(CapturedEvent::class, 'subscription_id');
}
}
app/Model/CapturedEvent.php
<?php
declare(strict_types=1);
namespace App\Model;
use Hyperf\DbConnection\Model\Model;
class CapturedEvent extends Model
{
public bool $timestamps = false;
protected ?string $table = 'captured_events';
protected array $fillable = [
'subscription_id', 'block_number', 'block_hash',
'transaction_hash', 'log_index', 'contract_address',
'event_signature', 'event_name', 'topics', 'data',
'decoded_params', 'push_status', 'push_attempts', 'last_push_at',
];
protected array $casts = [
'topics' => 'json',
'decoded_params'=> 'json',
'block_number' => 'integer',
'log_index' => 'integer',
'push_attempts' => 'integer',
];
public function subscription()
{
return $this->belongsTo(Subscription::class);
}
public function deliveries()
{
return $this->hasMany(WebhookDelivery::class, 'captured_event_id');
}
}
app/Model/WebhookDelivery.php
<?php
declare(strict_types=1);
namespace App\Model;
use Hyperf\DbConnection\Model\Model;
class WebhookDelivery extends Model
{
public bool $timestamps = false;
protected ?string $table = 'webhook_deliveries';
protected array $fillable = [
'captured_event_id', 'webhook_url', 'attempt',
'http_status', 'response_body', 'duration_ms',
'success', 'error_message',
];
protected array $casts = [
'success' => 'boolean',
'http_status' => 'integer',
'attempt' => 'integer',
'duration_ms' => 'integer',
];
}
---
Service layer (core)
app/Service/Web3Service.php
<?php
declare(strict_types=1);
namespace App\Service;
use Hyperf\Contract\ConfigInterface;
use Web3\Web3;
/**
 * On-chain data fetching service.
 * In plain language: this class "phones" the Ethereum node and pulls event logs.
 */
class Web3Service
{
    private Web3 $web3;
    public function __construct(private ConfigInterface $config)
    {
        $this->web3 = new Web3($this->config->get('watcher.eth_rpc_url'));
    }
    /**
     * Get the latest block height.
     */
    public function getLatestBlockNumber(): int
    {
        $result = null;
        $this->web3->eth->blockNumber(function ($err, $res) use (&$result) {
            if ($err) {
                throw new \RuntimeException('Failed to fetch the latest block: ' . $err->getMessage());
            }
            // web3.php returns the block number as a big-number object whose
            // toString() is already decimal, so it must not go through hexdec().
            $result = (int) $res->toString();
        });
        return (int) $result;
    }
    /**
     * Fetch event logs for a block range in one call (eth_getLogs).
     *
     * In plain language: ask the node "give me the events emitted by these
     * contracts in blocks 100 to 150" and get all matching logs back at once.
     *
     * @param int $fromBlock first block
     * @param int $toBlock last block
     * @param array $addresses contract addresses (empty = no filter)
     * @param array $topics event signatures (empty = no filter)
     */
    public function getLogs(
        int $fromBlock,
        int $toBlock,
        array $addresses = [],
        array $topics = []
    ): array {
        $filter = [
            'fromBlock' => '0x' . dechex($fromBlock),
            'toBlock' => '0x' . dechex($toBlock),
        ];
        if (! empty($addresses)) {
            $filter['address'] = array_values($addresses);
        }
        if (! empty($topics)) {
            // topics[0] is the event signature; a nested array matches any of them (OR)
            $filter['topics'] = [array_values($topics)];
        }
        $logs = [];
        $this->web3->eth->getLogs($filter, function ($err, $result) use (&$logs) {
            if ($err) {
                throw new \RuntimeException('getLogs failed: ' . $err->getMessage());
            }
            $logs = (array) $result;
        });
        return $logs;
    }
}
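The filter that `getLogs` sends can be looked at in isolation. The sketch below uses a hypothetical standalone function, `buildLogFilter`, that mirrors the filter-building step without needing a node or the web3.php library. The USDT address and the Transfer topic0 are used purely as sample values.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build the filter object for an eth_getLogs call.
 * Block numbers are hex quantities; a nested array in topics[0] means
 * "match any of these event signatures".
 */
function buildLogFilter(int $fromBlock, int $toBlock, array $addresses = [], array $signatures = []): array
{
    $filter = [
        'fromBlock' => '0x' . dechex($fromBlock),
        'toBlock' => '0x' . dechex($toBlock),
    ];
    if ($addresses !== []) {
        $filter['address'] = array_values($addresses);
    }
    if ($signatures !== []) {
        $filter['topics'] = [array_values($signatures)];
    }

    return $filter;
}

$filter = buildLogFilter(
    100,
    150,
    ['0xdac17f958d2ee523a2206206994597c13d831ec7'],
    ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef']
);
echo json_encode($filter, JSON_PRETTY_PRINT), PHP_EOL;
```

Leaving `address` or `topics` out of the filter entirely, rather than sending an empty list, is what makes "empty = no filter" work.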
---
app/Service/EventDecodeService.php
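<?php
declare(strict_types=1);
namespace App\Service;
use App\Model\Subscription;
use kornrunner\Keccak;
/**
 * Event decoding service.
 * In plain language: translates the raw on-chain log into readable parameters.
 *
 * A log on-chain looks like this:
 * topics[0] = 0xddf252ad... (the Transfer fingerprint)
 * topics[1] = 0x000...from address
 * topics[2] = 0x000...to address
 * data = 0x000...amount (hexadecimal)
 *
 * After decoding:
 * { "from": "0xabc...", "to": "0xdef...", "value": "1000000000000000000" }
 */
class EventDecodeService
{
    /**
     * Compute the keccak256 of an event signature (topic0).
     * Example: Transfer(address,address,uint256) → 0xddf252ad...
     */
    public function computeEventSignature(string $eventSignatureStr): string
    {
        return '0x' . Keccak::hash($eventSignatureStr, 256);
    }
    /**
     * Decode event parameters according to the ABI.
     */
    public function decode(array $log, ?array $abi): ?array
    {
        if (empty($abi)) {
            return null;
        }
        $decoded = [];
        $topics = array_map('strval', (array) ($log['topics'] ?? []));
        $topicIndex = 1; // topics[0] is the event signature, indexed params start at [1]
        $nonIndexed = [];
        foreach ($abi as $param) {
            if ($param['indexed'] ?? false) {
                $raw = $topics[$topicIndex++] ?? null;
                $decoded[$param['name']] = $raw !== null
                    ? $this->decodeParam($param['type'], $this->stripHexPrefix($raw))
                    : null;
            } else {
                $nonIndexed[] = $param;
            }
        }
        // Decode the data field: each non-indexed static parameter occupies one
        // 32-byte word (64 hex characters). Dynamic types (string, bytes, arrays)
        // are not decoded here; their word is returned as raw hex.
        $data = $this->stripHexPrefix((string) ($log['data'] ?? ''));
        foreach ($nonIndexed as $i => $param) {
            $chunk = substr($data, $i * 64, 64);
            if (strlen($chunk) === 64) {
                $decoded[$param['name']] = $this->decodeParam($param['type'], $chunk);
            }
        }
        return $decoded;
    }
    /**
     * Check whether a log matches a subscription.
     * In plain language: is this log one of the kind you care about?
     */
    public function matchesSubscription(array $log, Subscription $sub): bool
    {
        $topics = array_map('strval', (array) ($log['topics'] ?? []));
        // Check the contract address
        if ($sub->contract_address) {
            $logAddr = strtolower((string) ($log['address'] ?? ''));
            if ($logAddr !== strtolower($sub->contract_address)) {
                return false;
            }
        }
        // Check the event signature (topic0)
        if ($sub->event_signature) {
            if (strtolower($topics[0] ?? '') !== strtolower($sub->event_signature)) {
                return false;
            }
        }
        // Check custom filter conditions
        if (! empty($sub->filters)) {
            return $this->applyFilters($log, $sub->filters);
        }
        return true;
    }
    private function applyFilters(array $log, array $filters): bool
    {
        // Supports filtering on topic1/topic2 (e.g. only transfers to a given address)
        $topics = array_map('strval', (array) ($log['topics'] ?? []));
        foreach ($filters as $key => $value) {
            if (str_starts_with((string) $key, 'topic')) {
                $index = (int) substr((string) $key, 5);
                $actual = strtolower($topics[$index] ?? '');
                // An address in a topic is left-padded with zeros to 32 bytes (64 hex characters)
                $expected = strtolower('0x' . str_pad($this->stripHexPrefix((string) $value), 64, '0', STR_PAD_LEFT));
                if ($actual !== $expected) {
                    return false;
                }
            }
        }
        return true;
    }
    /**
     * Remove a leading "0x" only. ltrim($hex, '0x') must not be used here:
     * it strips every leading "0" and "x" character and corrupts zero-padded words.
     */
    private function stripHexPrefix(string $hex): string
    {
        return str_starts_with($hex, '0x') ? substr($hex, 2) : $hex;
    }
    private function decodeParam(string $type, string $hex): mixed
    {
        return match (true) {
            str_ends_with($type, ']') => '0x' . $hex, // arrays are dynamic, keep the raw word
            $type === 'address' => '0x' . substr($hex, -40),
            // GMP keeps full 256-bit precision; base_convert() silently loses it
            str_starts_with($type, 'uint') => gmp_strval(gmp_init($hex, 16), 10),
            str_starts_with($type, 'int') => $this->decodeSignedInt($hex),
            $type === 'bool' => $hex !== str_repeat('0', 64),
            default => '0x' . $hex,
        };
    }
    private function decodeSignedInt(string $hex): string
    {
        $value = gmp_init($hex, 16);
        // Two's complement: a set top bit in the 256-bit word means a negative value
        if (strlen($hex) === 64 && hexdec($hex[0]) >= 8) {
            $value = gmp_sub($value, gmp_pow(2, 256));
        }
        return gmp_strval($value, 10);
    }
}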
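To make the decoding steps concrete, here is a dependency-free sketch that decodes one ERC-20 Transfer log by hand. All function names (`stripHexPrefix`, `hexToDecimalString`, `decodeTransfer`) are hypothetical and standalone; they are not the project's service methods. The sample addresses and amount are made up. The "to" address deliberately starts with zeros, which is exactly the case where stripping the prefix with `ltrim($hex, '0x')` would go wrong.

```php
<?php
declare(strict_types=1);

/** Remove a leading "0x" only (ltrim($hex, '0x') would also eat leading zeros). */
function stripHexPrefix(string $hex): string
{
    return str_starts_with($hex, '0x') ? substr($hex, 2) : $hex;
}

/** Convert an arbitrarily long hex string to a decimal string without extensions. */
function hexToDecimalString(string $hex): string
{
    $digits = [0]; // decimal digits, least significant first
    foreach (str_split($hex) as $char) {
        $carry = (int) hexdec($char);
        // Multiply the running value by 16 and add the new hex digit.
        foreach ($digits as $i => $digit) {
            $value = $digit * 16 + $carry;
            $digits[$i] = $value % 10;
            $carry = intdiv($value, 10);
        }
        while ($carry > 0) {
            $digits[] = $carry % 10;
            $carry = intdiv($carry, 10);
        }
    }

    return implode('', array_reverse($digits));
}

/** Decode Transfer(address indexed from, address indexed to, uint256 value). */
function decodeTransfer(array $log): array
{
    return [
        'from' => '0x' . substr(stripHexPrefix($log['topics'][1]), -40),
        'to' => '0x' . substr(stripHexPrefix($log['topics'][2]), -40),
        'value' => hexToDecimalString(substr(stripHexPrefix($log['data']), 0, 64)),
    ];
}

$pad = static fn (string $hex): string => '0x' . str_pad($hex, 64, '0', STR_PAD_LEFT);
$log = [
    'topics' => [
        '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
        $pad('ab5801a7d398351b8be11c439e05c5b3259aec9b'),
        $pad('00000000219ab540356cbb839cbe05303d7705fa'),
    ],
    'data' => $pad('f4240'), // 1,000,000 base units
];

print_r(decodeTransfer($log));
```

The value is returned as a string on purpose: a uint256 does not fit into a PHP integer, so the receiver should treat amounts as decimal strings.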
---
app/Service/EventCaptureService.php
<?php
declare(strict_types=1);
namespace App\Service;
use App\Job\ProcessEventJob;
use App\Model\CapturedEvent;
use App\Model\Subscription;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\Cache\Cache;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Database\Exception\QueryException;
use Hyperf\Logger\LoggerFactory;
use Psr\Log\LoggerInterface;
/**
 * Event capture core service.
 * In plain language: this is the "detective" that scans blocks, compares
 * them against subscriptions, and finds the events of interest.
 */
class EventCaptureService
{
    private const CURSOR_KEY = 'watcher:last_scanned_block';
    private LoggerInterface $logger;
    private int $blockBatch;
    private int $confirmBlocks;
    public function __construct(
        private Web3Service $web3,
        private EventDecodeService $decoder,
        private DriverFactory $queue,
        private Cache $cache,
        private ConfigInterface $config,
        LoggerFactory $loggerFactory
    ) {
        $this->logger = $loggerFactory->get('watcher');
        $this->blockBatch = (int) $this->config->get('watcher.block_batch', 50);
        $this->confirmBlocks = (int) $this->config->get('watcher.confirm_blocks', 12);
    }
    /**
     * Main scan entry point: find the unscanned block range and fetch logs in batches.
     */
    public function scan(): void
    {
        $latestOnChain = $this->web3->getLatestBlockNumber();
        $safeBlock = $latestOnChain - $this->confirmBlocks;
        $lastScanned = $this->cache->get(self::CURSOR_KEY);
        if ($lastScanned === null) {
            // First run: persist a starting cursor. Without this, a cursor derived
            // from the latest block on every poll would always be ahead of
            // $safeBlock and nothing would ever be scanned.
            $lastScanned = $this->initialCursor($safeBlock);
            $this->cache->set(self::CURSOR_KEY, $lastScanned);
        }
        $lastScanned = (int) $lastScanned;
        if ($lastScanned >= $safeBlock) {
            return; // already up to date
        }
        // Load all active subscriptions
        $subscriptions = Subscription::where('is_active', true)->get();
        if ($subscriptions->isEmpty()) {
            return;
        }
        // Collect the contract addresses and event signatures the subscriptions care about.
        // If any subscription leaves a field NULL ("match all"), that node-side filter must be dropped.
        $anyAddress = $subscriptions->contains(fn (Subscription $s) => empty($s->contract_address));
        $anySignature = $subscriptions->contains(fn (Subscription $s) => empty($s->event_signature));
        $addresses = $anyAddress ? [] : $subscriptions->pluck('contract_address')->unique()->values()->toArray();
        $signatures = $anySignature ? [] : $subscriptions->pluck('event_signature')->unique()->values()->toArray();
        // Scan in batches of at most blockBatch blocks
        $from = $lastScanned + 1;
        while ($from <= $safeBlock) {
            $to = min($from + $this->blockBatch - 1, $safeBlock);
            try {
                $this->scanRange($from, $to, $addresses, $signatures, $subscriptions->all());
                // No TTL: the scan cursor must not expire
                $this->cache->set(self::CURSOR_KEY, $to);
                $this->logger->info('Scan finished', ['from' => $from, 'to' => $to]);
            } catch (\Throwable $e) {
                $this->logger->error('Scan failed', [
                    'from' => $from,
                    'to' => $to,
                    'error' => $e->getMessage(),
                ]);
                break; // stop on failure and retry on the next poll
            }
            $from = $to + 1;
        }
    }
    /**
     * Scan one block range.
     */
    private function scanRange(
        int $from,
        int $to,
        array $addresses,
        array $signatures,
        array $subscriptions
    ): void {
        $logs = $this->web3->getLogs($from, $to, $addresses, $signatures);
        if (empty($logs)) {
            return;
        }
        $this->logger->info('Logs found', ['from' => $from, 'to' => $to, 'count' => count($logs)]);
        foreach ($logs as $log) {
            $log = (array) $log;
            foreach ($subscriptions as $sub) {
                if ($this->decoder->matchesSubscription($log, $sub)) {
                    $this->captureEvent($log, $sub);
                }
            }
        }
    }
    /**
     * Capture a single event: store it and push it onto the processing queue.
     */
    private function captureEvent(array $log, Subscription $sub): void
    {
        $txHash = (string) ($log['transactionHash'] ?? '');
        $logIndex = hexdec((string) ($log['logIndex'] ?? '0x0'));
        // Dedup: the same event is handled once per subscription
        $dedupeKey = "watcher:dedup:{$txHash}:{$logIndex}:{$sub->id}";
        if ($this->cache->has($dedupeKey)) {
            return;
        }
        $topics = array_map('strval', (array) ($log['topics'] ?? []));
        $decoded = $this->decoder->decode($log, $sub->event_abi);
        try {
            $event = CapturedEvent::create([
                'subscription_id' => $sub->id,
                'block_number' => hexdec((string) ($log['blockNumber'] ?? '0x0')),
                'block_hash' => (string) ($log['blockHash'] ?? ''),
                'transaction_hash' => $txHash,
                'log_index' => $logIndex,
                'contract_address' => strtolower((string) ($log['address'] ?? '')),
                'event_signature' => $topics[0] ?? '',
                'event_name' => $sub->event_name,
                'topics' => $topics,
                'data' => (string) ($log['data'] ?? ''),
                'decoded_params' => $decoded,
                'push_status' => 'pending',
            ]);
            // Push onto the processing queue (decouples capture from delivery)
            $this->queue->get('event')->push(new ProcessEventJob($event->id));
            // Mark as seen only after the event is stored and queued, so a
            // failed insert can be retried on the next poll
            $this->cache->set($dedupeKey, 1, 3600);
        } catch (QueryException $e) {
            // Unique index conflict = duplicate event, ignore
            if (str_contains($e->getMessage(), 'Duplicate entry')) {
                return;
            }
            throw $e;
        }
    }
    /**
     * Cursor for the very first run: the block before start_block, or the
     * current safe block when start_block is 0 ("start from the latest block").
     */
    private function initialCursor(int $safeBlock): int
    {
        $configured = (int) $this->config->get('watcher.start_block', 0);
        return $configured > 0 ? $configured - 1 : $safeBlock;
    }
}
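The batching arithmetic in `scan()` is easy to check on its own. The sketch below uses a hypothetical function, `blockRanges`, that returns the inclusive `[from, to]` pairs that would be scanned for a given cursor, safe block, and batch size. It is an illustration of the loop, not code from the project.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: split the unscanned blocks ($lastScanned, $safeBlock]
 * into inclusive [from, to] ranges of at most $batchSize blocks.
 */
function blockRanges(int $lastScanned, int $safeBlock, int $batchSize): array
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('batchSize must be at least 1');
    }
    $ranges = [];
    $from = $lastScanned + 1;
    while ($from <= $safeBlock) {
        $to = min($from + $batchSize - 1, $safeBlock);
        $ranges[] = [$from, $to];
        $from = $to + 1;
    }

    return $ranges;
}

// Cursor at block 100, chain head at 232 with 12 confirmations → safe block 220.
print_r(blockRanges(100, 232 - 12, 50));
```

The last range is cut at the safe block, so blocks that still lack enough confirmations are left for a later poll.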
---
app/Service/WebhookPusherService.php
<?php
declare(strict_types=1);
namespace App\Service;
use App\Model\CapturedEvent;
use App\Model\WebhookDelivery;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Exception\TransferException;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Guzzle\ClientFactory;
use Hyperf\Logger\LoggerFactory;
use Psr\Log\LoggerInterface;
/**
 * Webhook push service.
 * In plain language: POSTs captured events to the business system's callback URL,
 * signs them with HMAC so the receiver can verify authenticity,
 * and records every attempt; retries are driven by the queue.
 */
class WebhookPusherService
{
    private Client $http;
    private LoggerInterface $logger;
    private string $secret;
    private int $timeout;
    public function __construct(
        ClientFactory $clientFactory,
        private ConfigInterface $config,
        LoggerFactory $loggerFactory
    ) {
        $this->timeout = (int) $this->config->get('watcher.webhook_timeout', 10);
        $this->secret = (string) $this->config->get('watcher.webhook_secret', '');
        $this->logger = $loggerFactory->get('watcher');
        // Coroutine-aware Guzzle client
        $this->http = $clientFactory->create([
            'timeout' => $this->timeout,
            'connect_timeout' => 5,
        ]);
    }
    /**
     * Push an event to a Webhook URL.
     */
    public function push(CapturedEvent $event, string $webhookUrl, int $attempt = 1): bool
    {
        $payload = $this->buildPayload($event);
        $body = json_encode($payload);
        $signature = $this->sign($body);
        $startTime = microtime(true);
        try {
            $response = $this->http->post($webhookUrl, [
                'body' => $body,
                'headers' => [
                    'Content-Type' => 'application/json',
                    'X-Watcher-Signature' => $signature,
                    'X-Watcher-Event' => $event->event_name ?? 'unknown',
                    'X-Watcher-Delivery-Id' => (string) $event->id,
                    'X-Watcher-Timestamp' => (string) time(),
                ],
            ]);
            $durationMs = (int) ((microtime(true) - $startTime) * 1000);
            $statusCode = $response->getStatusCode();
            $success = $statusCode >= 200 && $statusCode < 300;
            WebhookDelivery::create([
                'captured_event_id' => $event->id,
                'webhook_url' => $webhookUrl,
                'attempt' => $attempt,
                'http_status' => $statusCode,
                'response_body' => substr((string) $response->getBody(), 0, 1000),
                'duration_ms' => $durationMs,
                'success' => $success,
            ]);
            if ($success) {
                $event->push_status = 'delivered';
                $event->push_attempts = $attempt;
                $event->last_push_at = date('Y-m-d H:i:s');
                $event->save();
                $this->logger->info('Webhook delivered', [
                    'event_id' => $event->id,
                    'url' => $webhookUrl,
                    'status' => $statusCode,
                    'attempt' => $attempt,
                    'duration' => $durationMs . 'ms',
                ]);
            }
            return $success;
        } catch (TransferException $e) {
            // TransferException also covers ConnectException (DNS failure, refused
            // connection, timeout), which is not a RequestException in Guzzle 7.
            $durationMs = (int) ((microtime(true) - $startTime) * 1000);
            $statusCode = $e instanceof RequestException && $e->hasResponse()
                ? $e->getResponse()->getStatusCode()
                : null;
            WebhookDelivery::create([
                'captured_event_id' => $event->id,
                'webhook_url' => $webhookUrl,
                'attempt' => $attempt,
                'http_status' => $statusCode,
                'duration_ms' => $durationMs,
                'success' => false,
                'error_message' => mb_substr($e->getMessage(), 0, 500), // column is 500 characters
            ]);
            $event->push_status = 'failed';
            $event->push_attempts = $attempt;
            $event->last_push_at = date('Y-m-d H:i:s');
            $event->save();
            $this->logger->warning('Webhook delivery failed', [
                'event_id' => $event->id,
                'url' => $webhookUrl,
                'attempt' => $attempt,
                'error' => $e->getMessage(),
            ]);
            return false;
        }
    }
    /**
     * Build the push payload.
     * In plain language: turn the stored event record into the JSON that is sent out.
     */
    private function buildPayload(CapturedEvent $event): array
    {
        return [
            'id' => $event->id,
            'subscription_id' => $event->subscription_id,
            'event_name' => $event->event_name,
            'event_signature' => $event->event_signature,
            'contract_address' => $event->contract_address,
            'block_number' => $event->block_number,
            'block_hash' => $event->block_hash,
            'transaction_hash' => $event->transaction_hash,
            'log_index' => $event->log_index,
            'topics' => $event->topics,
            'data' => $event->data,
            'decoded_params' => $event->decoded_params,
            'captured_at' => $event->created_at,
        ];
    }
    /**
     * HMAC-SHA256 signature.
     * In plain language: stamp the message with an anti-forgery seal that the
     * receiver checks with the same secret.
     *
     * How the receiver verifies (note the "sha256=" prefix in the header):
     * $expected = 'sha256=' . hash_hmac('sha256', $rawBody, $secret);
     * if (!hash_equals($expected, $receivedSignature)) { reject }
     */
    private function sign(string $body): string
    {
        if (! $this->secret) {
            return '';
        }
        return 'sha256=' . hash_hmac('sha256', $body, $this->secret);
    }
}
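On the receiving side, verification could look roughly like the sketch below. The function name `verifyWatcherSignature` is hypothetical; the only things taken from the service above are the `X-Watcher-Signature` header format (`sha256=` followed by the hex HMAC) and the fact that the raw request body is what gets signed.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical receiver-side check for the X-Watcher-Signature header.
 * $rawBody must be the exact bytes received, not a re-encoded JSON string.
 */
function verifyWatcherSignature(string $rawBody, string $signatureHeader, string $secret): bool
{
    if ($secret === '' || ! str_starts_with($signatureHeader, 'sha256=')) {
        return false;
    }
    $expected = 'sha256=' . hash_hmac('sha256', $rawBody, $secret);

    // hash_equals() compares in constant time to avoid timing leaks.
    return hash_equals($expected, $signatureHeader);
}

// Simulate what the sender does, then verify it as the receiver would.
$secret = 'your_hmac_secret';
$body = '{"id":1,"event_name":"Transfer"}';
$header = 'sha256=' . hash_hmac('sha256', $body, $secret);

var_dump(verifyWatcherSignature($body, $header, $secret));
var_dump(verifyWatcherSignature($body . ' ', $header, $secret));
```

In a plain PHP endpoint the raw body would typically come from `file_get_contents('php://input')`; decoding and re-encoding the JSON before hashing can change the bytes and make a valid signature fail.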
---
app/Service/WebSocketPusherService.php
<?php
declare(strict_types=1);
namespace App\Service;
use App\Model\CapturedEvent;
use Hyperf\WebSocketServer\Sender;
/**
 * WebSocket broadcast service.
 * In plain language: pushes events in real time to every connected front-end client.
 * A client can subscribe to a specific contract or event and watch on-chain activity live.
 */
class WebSocketPusherService
{
    // Map of fd (connection ID) → subscription filters.
    // In plain language: records which events each front-end connection cares about.
    // Note: a static array lives in one process only. Connections registered in a
    // WebSocket worker are not visible from an async-queue consumer process, so a
    // multi-process deployment needs shared storage (Redis, for example) for this map.
    private static array $connections = [];
    public function __construct(private Sender $sender)
    {
    }
    /**
     * Register the subscription filters of a WebSocket connection.
     */
    public static function subscribe(int $fd, array $filters): void
    {
        self::$connections[$fd] = $filters;
    }
    /**
     * Remove a connection.
     */
    public static function unsubscribe(int $fd): void
    {
        unset(self::$connections[$fd]);
    }
    /**
     * Broadcast an event to every matching connection.
     */
    public function broadcast(CapturedEvent $event): void
    {
        $payload = json_encode([
            'type' => 'contract_event',
            'event_name' => $event->event_name,
            'contract_address' => $event->contract_address,
            'transaction_hash' => $event->transaction_hash,
            'block_number' => $event->block_number,
            'decoded_params' => $event->decoded_params,
            'captured_at' => $event->created_at,
        ]);
        foreach (self::$connections as $fd => $filters) {
            if ($this->matchesFilters($event, $filters)) {
                try {
                    $this->sender->push($fd, $payload);
                } catch (\Throwable) {
                    // The connection is gone, clean it up
                    self::unsubscribe($fd);
                }
            }
        }
    }
    private function matchesFilters(CapturedEvent $event, array $filters): bool
    {
        if (! empty($filters['contract_address'])) {
            if (strtolower($event->contract_address) !== strtolower($filters['contract_address'])) {
                return false;
            }
        }
        if (! empty($filters['event_name'])) {
            if ($event->event_name !== $filters['event_name']) {
                return false;
            }
        }
        return true;
    }
}
---
Job layer
app/Job/ProcessEventJob.php
<?php
declare(strict_types=1);
namespace App\Job;
use App\Model\CapturedEvent;
use App\Model\Subscription;
use App\Service\WebSocketPusherService;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Job;
use Hyperf\Context\ApplicationContext;
/**
 * Event processing queue job.
 * In plain language: takes a pending event off the queue and decides whether
 * to push it through Webhook, WebSocket, or both.
 */
class ProcessEventJob extends Job
{
    public int $maxAttempts = 1; // no retry here; failed deliveries are retried by DeliverWebhookJob
    public function __construct(public readonly int $eventId)
    {
    }
    public function handle(): void
    {
        $event = CapturedEvent::find($this->eventId);
        if (! $event) {
            return;
        }
        $sub = Subscription::find($event->subscription_id);
        if (! $sub) {
            return;
        }
        $container = ApplicationContext::getContainer();
        // WebSocket broadcast (real-time push to the front end)
        if (in_array($sub->push_type, ['websocket', 'both'], true)) {
            $container->get(WebSocketPusherService::class)->broadcast($event);
        }
        // Webhook push (to the business system)
        if (in_array($sub->push_type, ['webhook', 'both'], true) && $sub->webhook_url) {
            $container->get(DriverFactory::class)
                ->get('webhook')
                ->push(new DeliverWebhookJob($event->id, $sub->webhook_url));
        }
    }
}
app/Job/DeliverWebhookJob.php
<?php
declare(strict_types=1);
namespace App\Job;
use App\Model\CapturedEvent;
use App\Service\WebhookPusherService;
use Hyperf\AsyncQueue\Job;
use Hyperf\Context\ApplicationContext;
/**
 * Webhook delivery queue job.
 * In plain language: sends the HTTP request and is retried automatically on
 * failure (exponential backoff).
 * retry_seconds is configured in async_queue.php: [2, 4, 8, 16, 32]
 */
class DeliverWebhookJob extends Job
{
    public int $maxAttempts = 5;
    public function __construct(
        public readonly int $eventId,
        public readonly string $webhookUrl
    ) {
    }
    public function handle(): void
    {
        $event = CapturedEvent::find($this->eventId);
        if (! $event || $event->push_status === 'delivered') {
            return; // event is gone or already delivered, skip
        }
        // The queue re-runs the same serialized job on retry, so the attempt
        // number is derived from the stored counter rather than a job property.
        $attempt = (int) $event->push_attempts + 1;
        $pusher = ApplicationContext::getContainer()->get(WebhookPusherService::class);
        $success = $pusher->push($event, $this->webhookUrl, $attempt);
        if (! $success) {
            // Throwing triggers the queue's retry mechanism
            throw new \RuntimeException(
                "Webhook delivery failed, event_id={$this->eventId}, attempt={$attempt}"
            );
        }
    }
}
---
Long-running process
app/Process/EventPollingProcess.php
<?php
declare(strict_types=1);
namespace App\Process;
use App\Service\EventCaptureService;
use Hyperf\Logger\LoggerFactory;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\Annotation\Process;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
/**
 * Event polling process.
 * In plain language: this is the "night watch" that never stops looking at the chain.
 * It scans every N seconds and pushes newly found events onto the queue.
 */
#[Process(name: 'event-polling')]
class EventPollingProcess extends AbstractProcess
{
    public string $name = 'event-polling';
    public int $restartInterval = 3;
    private LoggerInterface $logger;
    public function __construct(
        ContainerInterface $container,
        private EventCaptureService $captureService,
        LoggerFactory $loggerFactory
    ) {
        parent::__construct($container);
        // Same logger channel as the services; LoggerInterface itself is not
        // bound in the container unless the application registers it.
        $this->logger = $loggerFactory->get('watcher');
    }
    public function handle(): void
    {
        // Never sleep for 0 seconds, which would turn the loop into a busy loop
        $interval = max(1, (int) \Hyperf\Support\env('WATCHER_POLL_INTERVAL', 3));
        $this->logger->info('Event polling process started');
        while (true) {
            try {
                $this->captureService->scan();
            } catch (\Throwable $e) {
                $this->logger->error('Polling error', [
                    'message' => $e->getMessage(),
                    'file' => $e->getFile() . ':' . $e->getLine(),
                ]);
            }
            \Swoole\Coroutine::sleep($interval);
        }
    }
}
---
Controller layer
app/Controller/SubscriptionController.php
<?php
declare(strict_types=1);
namespace App\Controller;
use App\Model\Subscription;
use App\Service\EventDecodeService;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\DeleteMapping;
use Hyperf\HttpServer\Annotation\GetMapping;
use Hyperf\HttpServer\Annotation\PostMapping;
use Hyperf\HttpServer\Annotation\PutMapping;
use Hyperf\HttpServer\Contract\RequestInterface;
/**
 * Subscription management API.
 * In plain language: lets the business system say "watch this event of this contract and push it to this URL".
 */
#[Controller(prefix: '/api/v1/subscriptions')]
class SubscriptionController
{
public function __construct(private EventDecodeService $decoder)
{
}
/** List all subscriptions */
#[GetMapping(path: '')]
public function index(): array
{
return [
'code' => 0,
'data' => Subscription::orderByDesc('id')->get(),
];
}
/**
* Create a subscription
*
* POST /api/v1/subscriptions
* {
* "name": "usdt-transfer-monitor",
* "contract_address": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
* "event_signature_str": "Transfer(address,address,uint256)",
* "event_name": "Transfer",
* "event_abi": [
* {"name":"from", "type":"address","indexed":true},
* {"name":"to", "type":"address","indexed":true},
* {"name":"value", "type":"uint256","indexed":false}
* ],
* "push_type": "webhook",
* "webhook_url": "https://your-app.com/webhook/usdt",
* "filters": {"topic2": "0xYourAddress"}
* }
*/
#[PostMapping(path: '')]
public function store(RequestInterface $request): array
{
$sigStr = $request->input('event_signature_str');
$signature = $sigStr
? $this->decoder->computeEventSignature($sigStr)
: $request->input('event_signature');
$sub = Subscription::create([
'name' => $request->input('name'),
'contract_address' => $request->input('contract_address')
? strtolower($request->input('contract_address'))
: null,
'event_signature' => $signature,
'event_name' => $request->input('event_name'),
'event_abi' => $request->input('event_abi'),
'push_type' => $request->input('push_type', 'webhook'),
'webhook_url' => $request->input('webhook_url'),
'filters' => $request->input('filters'),
'is_active' => true,
]);
return ['code' => 0, 'data' => $sub];
}
/** Show a single subscription with its most recent events */
#[GetMapping(path: '/{id}')]
public function show(int $id): array
{
$sub = Subscription::with(['capturedEvents' => function ($q) {
$q->orderByDesc('id')->limit(20);
}])->findOrFail($id);
return ['code' => 0, 'data' => $sub];
}
/** Enable/disable a subscription */
#[PutMapping(path: '/{id}/toggle')]
public function toggle(int $id): array
{
$sub = Subscription::findOrFail($id);
$sub->is_active = ! $sub->is_active;
$sub->save();
return ['code' => 0, 'data' => ['is_active' => $sub->is_active]];
}
/** Delete a subscription */
#[DeleteMapping(path: '/{id}')]
public function destroy(int $id): array
{
Subscription::findOrFail($id)->delete();
return ['code' => 0, 'message' => 'Deleted'];
}
}
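The `store()` action above writes the request fields straight into the model. A minimal sketch of the checks that could run first is shown below; `validateSubscriptionInput()` is a hypothetical helper and not part of the project, and the rules (a 20-byte hex address, an http(s) webhook URL when `push_type` is `webhook`) are assumptions based on the example payload.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns a list of validation errors (empty means valid).
 * contract_address stays optional because a subscription may match on the
 * event signature alone.
 */
function validateSubscriptionInput(array $input): array
{
    $errors = [];

    $address = $input['contract_address'] ?? null;
    if ($address !== null
        && (! is_string($address) || preg_match('/^0x[0-9a-fA-F]{40}$/', $address) !== 1)) {
        $errors[] = 'contract_address must be a 0x-prefixed 20-byte hex string';
    }

    $pushType = $input['push_type'] ?? 'webhook';
    if ($pushType === 'webhook') {
        $url = $input['webhook_url'] ?? '';
        $url = is_string($url) ? $url : '';
        $scheme = parse_url($url, PHP_URL_SCHEME);
        if (filter_var($url, FILTER_VALIDATE_URL) === false
            || ! in_array($scheme, ['http', 'https'], true)) {
            $errors[] = 'webhook_url must be a valid http(s) URL';
        }
    }

    return $errors;
}

var_dump(validateSubscriptionInput([
    'contract_address' => '0xdAC17F958D2ee523a2206206994597C13D831ec7',
    'webhook_url' => 'https://your-app.com/webhook/usdt',
]));
```

In the controller, a non-empty error list would typically be turned into a 4xx response before `Subscription::create()` is reached.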
app/Controller/WebSocketController.php
<?php
declare(strict_types=1);
namespace App\Controller;
use App\Service\WebSocketPusherService;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
/**
* WebSocket server controller
* In plain terms: once the front end connects and sends a subscribe message, the server pushes matching on-chain events in real time.
* Hyperf's WebSocket server dispatches to the Hyperf\Contract\On*Interface callbacks,
* so the controller implements those interfaces (parameters stay untyped to match their signatures).
*
* Front-end usage example:
* const ws = new WebSocket('ws://localhost:9502');
* ws.onopen = () => ws.send(JSON.stringify({
* action: 'subscribe',
* contract_address: '0xUSDT...',
* event_name: 'Transfer'
* }));
* ws.onmessage = (e) => console.log(JSON.parse(e.data));
*/
class WebSocketController implements OnCloseInterface, OnMessageInterface, OnOpenInterface
{
/**
* @param Server $server
* @param Request $request
*/
public function onOpen($server, $request): void
{
$server->push($request->fd, json_encode([
'type' => 'connected',
'message' => 'Connected to hyperf-contract-event-watcher',
]));
}
/**
* @param Server $server
* @param Frame $frame
*/
public function onMessage($server, $frame): void
{
$data = json_decode($frame->data, true);
if (! is_array($data)) {
return; // Ignore frames that are not JSON objects
}
$action = $data['action'] ?? '';
if ($action === 'subscribe') {
WebSocketPusherService::subscribe($frame->fd, [
'contract_address' => $data['contract_address'] ?? null,
'event_name' => $data['event_name'] ?? null,
]);
$server->push($frame->fd, json_encode([
'type' => 'subscribed',
'filters' => $data,
]));
}
if ($action === 'unsubscribe') {
WebSocketPusherService::unsubscribe($frame->fd);
$server->push($frame->fd, json_encode(['type' => 'unsubscribed']));
}
if ($action === 'ping') {
$server->push($frame->fd, json_encode(['type' => 'pong']));
}
}
/** @param Server $server */
public function onClose($server, int $fd, int $reactorId): void
{
// Drop the connection's filters so closed fds are never pushed to
WebSocketPusherService::unsubscribe($fd);
}
}
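The controller only stores each connection's filters; deciding which connections receive a given event is left to `WebSocketPusherService`, which is not shown in this part. A minimal sketch of what that matching step might look like, assuming a `null` filter field means "match anything"; `matchesWsFilter()` is a hypothetical function name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical matcher: a null filter field matches any value.
 * Addresses are compared case-insensitively because checksummed and
 * lower-cased forms refer to the same contract.
 */
function matchesWsFilter(array $filter, array $event): bool
{
    $address = $filter['contract_address'] ?? null;
    if ($address !== null
        && strcasecmp((string) $address, (string) ($event['contract_address'] ?? '')) !== 0) {
        return false;
    }

    $name = $filter['event_name'] ?? null;
    if ($name !== null && $name !== ($event['event_name'] ?? null)) {
        return false;
    }

    return true;
}

$event = [
    'contract_address' => '0xdac17f958d2ee523a2206206994597c13d831ec7',
    'event_name' => 'Transfer',
];
$filter = [
    'contract_address' => '0xdAC17F958D2ee523a2206206994597C13D831ec7',
    'event_name' => 'Transfer',
];
var_dump(matchesWsFilter($filter, $event));
```

The case-insensitive comparison matters here because the REST API lower-cases stored addresses while front ends often send the checksummed form.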
---
Tests
test/Cases/EventDecodeServiceTest.php
<?php
declare(strict_types=1);
namespace HyperfTest\Cases;
use App\Service\EventDecodeService;
use PHPUnit\Framework\TestCase;
class EventDecodeServiceTest extends TestCase
{
private EventDecodeService $decoder;
protected function setUp(): void
{
$this->decoder = new EventDecodeService();
}
public function testComputeTransferEventSignature(): void
{
$sig = $this->decoder->computeEventSignature('Transfer(address,address,uint256)');
// Well-known ERC-20 Transfer event signature (topic0)
$this->assertEquals(
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
$sig
);
}
public function testDecodeTransferEvent(): void
{
$abi = [
['name' => 'from', 'type' => 'address', 'indexed' => true],
['name' => 'to', 'type' => 'address', 'indexed' => true],
['name' => 'value', 'type' => 'uint256', 'indexed' => false],
];
// Mimic the structure of a real on-chain log
$log = [
'topics' => [
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
'0x000000000000000000000000ab5801a7d398351b8be11c439e05c5b3259aec9b', // from
'0x000000000000000000000000ca35b7d915458ef540ade6068dfe2f44e8fa733c', // to
],
// value = 1000000000000000000 (1 ETH in wei)
'data' => '0x0000000000000000000000000000000000000000000000000de0b6b3a7640000',
];
$decoded = $this->decoder->decode($log, $abi);
$this->assertNotNull($decoded);
$this->assertEquals('0xab5801a7d398351b8be11c439e05c5b3259aec9b', $decoded['from']);
$this->assertEquals('0xca35b7d915458ef540ade6068dfe2f44e8fa733c', $decoded['to']);
$this->assertEquals('1000000000000000000', $decoded['value']);
}
public function testDecodeReturnsNullWithNoAbi(): void
{
$log = ['topics' => [], 'data' => '0x'];
$this->assertNull($this->decoder->decode($log, null));
$this->assertNull($this->decoder->decode($log, []));
}
public function testMatchesSubscriptionByAddress(): void
{
$sub = new \App\Model\Subscription();
$sub->contract_address = '0xdac17f958d2ee523a2206206994597c13d831ec7';
$sub->event_signature = null;
$sub->filters = null;
$matchingLog = ['address' => '0xdAC17F958D2ee523a2206206994597C13D831ec7', 'topics' => []];
$this->assertTrue($this->decoder->matchesSubscription($matchingLog, $sub));
$nonMatchingLog = ['address' => '0x0000000000000000000000000000000000000001', 'topics' => []];
$this->assertFalse($this->decoder->matchesSubscription($nonMatchingLog, $sub));
}
public function testMatchesSubscriptionByEventSignature(): void
{
$sub = new \App\Model\Subscription();
$sub->contract_address = null;
$sub->event_signature = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';
$sub->filters = null;
$matchingLog = [
'address' => '0xany',
'topics' => ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],
];
$this->assertTrue($this->decoder->matchesSubscription($matchingLog, $sub));
$nonMatchingLog = ['address' => '0xany', 'topics' => ['0xdeadbeef']];
$this->assertFalse($this->decoder->matchesSubscription($nonMatchingLog, $sub));
}
}
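The decode test above relies on two facts about event logs: an indexed `address` sits in the last 20 bytes of its 32-byte topic, and a non-indexed `uint256` is a 32-byte big-endian integer in `data`. The sketch below illustrates both by hand for the same log; it is not the project's `EventDecodeService`, and `topicToAddress()` and `hexToDecimalString()` are hypothetical helpers. The hex conversion is done with plain string arithmetic so the example needs no GMP or BCMath extension.

```php
<?php
declare(strict_types=1);

/** Convert an arbitrary-length hex string to a decimal string. */
function hexToDecimalString(string $hex): string
{
    $hex = strtolower(str_starts_with($hex, '0x') ? substr($hex, 2) : $hex);
    $digits = [0]; // decimal digits, least significant first
    foreach (str_split($hex) as $char) {
        $carry = (int) hexdec($char);
        // Multiply the running number by 16 and add the new hex digit.
        foreach ($digits as $i => $digit) {
            $value = $digit * 16 + $carry;
            $digits[$i] = $value % 10;
            $carry = intdiv($value, 10);
        }
        while ($carry > 0) {
            $digits[] = $carry % 10;
            $carry = intdiv($carry, 10);
        }
    }
    return implode('', array_reverse($digits));
}

/** An indexed address occupies the last 40 hex characters of its topic. */
function topicToAddress(string $topic): string
{
    return '0x' . strtolower(substr($topic, -40));
}

$log = [
    'topics' => [
        '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
        '0x000000000000000000000000ab5801a7d398351b8be11c439e05c5b3259aec9b',
        '0x000000000000000000000000ca35b7d915458ef540ade6068dfe2f44e8fa733c',
    ],
    'data' => '0x0000000000000000000000000000000000000000000000000de0b6b3a7640000',
];

$decoded = [
    'from' => topicToAddress($log['topics'][1]),
    'to' => topicToAddress($log['topics'][2]),
    'value' => hexToDecimalString($log['data']),
];
print_r($decoded);
```

Returning `value` as a string rather than an integer avoids overflow, since a `uint256` can far exceed PHP's 64-bit integer range.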
---
test/Cases/EventCaptureServiceTest.php
<?php
declare(strict_types=1);
namespace HyperfTest\Cases;
use App\Service\EventCaptureService;
use App\Service\EventDecodeService;
use App\Service\Web3Service;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\Cache\Cache;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Logger\LoggerFactory;
use Mockery;
use PHPUnit\Framework\TestCase;
class EventCaptureServiceTest extends TestCase
{
protected function tearDown(): void
{
Mockery::close();
}
public function testScanSkipsWhenNoSubscriptions(): void
{
$web3 = Mockery::mock(Web3Service::class);
$web3->shouldReceive('getLatestBlockNumber')->andReturn(1000);
// getLogs must not be called when there are no subscriptions
$web3->shouldNotReceive('getLogs');
$decoder = new EventDecodeService();
$queue = Mockery::mock(DriverFactory::class);
$queue->shouldNotReceive('get');
$cache = Mockery::mock(Cache::class);
$cache->shouldReceive('get')
->with('watcher:last_scanned_block', Mockery::any())
->andReturn(0);
$cache->shouldReceive('set');
$config = Mockery::mock(ConfigInterface::class);
$config->shouldReceive('get')->with('watcher.block_batch', 50)->andReturn(50);
$config->shouldReceive('get')->with('watcher.confirm_blocks', 12)->andReturn(12);
$config->shouldReceive('get')->with('watcher.start_block', 0)->andReturn(0);
$logger = Mockery::mock(\Psr\Log\LoggerInterface::class);
$logger->shouldReceive('info', 'error', 'warning');
$loggerFactory = Mockery::mock(LoggerFactory::class);
$loggerFactory->shouldReceive('get')->andReturn($logger);
// Simulate having no subscriptions
Mockery::mock('alias:App\Model\Subscription')
->shouldReceive('where->get')
->andReturn(new \Hyperf\Database\Model\Collection([]));
$service = new EventCaptureService($web3, $decoder, $queue, $cache, $config, $loggerFactory);
$service->scan();
$this->assertTrue(true); // Passes as long as no exception is thrown
}
public function testScanSkipsWhenAlreadyUpToDate(): void
{
$web3 = Mockery::mock(Web3Service::class);
// Latest block 1000, safe block 988 (1000 minus 12 confirmations), last_scanned is also 988
$web3->shouldReceive('getLatestBlockNumber')->andReturn(1000);
$web3->shouldNotReceive('getLogs');
$decoder = new EventDecodeService();
$queue = Mockery::mock(DriverFactory::class);
$loggerFactory = Mockery::mock(LoggerFactory::class);
$loggerFactory->shouldReceive('get')->andReturn(
Mockery::mock(\Psr\Log\LoggerInterface::class)->shouldIgnoreMissing()
);
$cache = Mockery::mock(Cache::class);
$cache->shouldReceive('get')
->with('watcher:last_scanned_block', Mockery::any())
->andReturn(988); // Already at the safe block
$config = Mockery::mock(ConfigInterface::class);
$config->shouldReceive('get')->with('watcher.block_batch', 50)->andReturn(50);
$config->shouldReceive('get')->with('watcher.confirm_blocks', 12)->andReturn(12);
$config->shouldReceive('get')->with('watcher.start_block', 0)->andReturn(0);
Mockery::mock('alias:App\Model\Subscription')
->shouldReceive('where->get')
->andReturn(new \Hyperf\Database\Model\Collection([
tap(new \App\Model\Subscription(), function ($s) {
$s->id = 1;
$s->contract_address = '0xabc';
$s->event_signature = '0xdef';
$s->is_active = true;
}),
]));
$service = new EventCaptureService($web3, $decoder, $queue, $cache, $config, $loggerFactory);
$service->scan();
$this->assertTrue(true);
}
}
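Both tests above depend on the same arithmetic: with the latest block at 1000 and `confirm_blocks` at 12, the safe block is 988, and nothing is scanned once `last_scanned` has reached it. A minimal sketch of that window calculation follows, using the defaults the tests mock (`block_batch` 50, `confirm_blocks` 12). `nextScanRange()` is a hypothetical helper, and whether the real `EventCaptureService` uses inclusive ranges is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: compute the next inclusive [from, to] block range,
 * or null when the scanner has already reached the safe block.
 */
function nextScanRange(int $latestBlock, int $lastScanned, int $confirmBlocks = 12, int $batch = 50): ?array
{
    // Blocks newer than this may still be reorganised, so they are skipped.
    $safeBlock = $latestBlock - $confirmBlocks;
    if ($lastScanned >= $safeBlock) {
        return null;
    }

    $from = $lastScanned + 1;
    // Never scan more than $batch blocks per call, and never past the safe block.
    $to = min($from + $batch - 1, $safeBlock);

    return [$from, $to];
}

var_dump(nextScanRange(1000, 988)); // already caught up
var_dump(nextScanRange(1000, 900)); // a full batch
var_dump(nextScanRange(1000, 980)); // a partial batch capped at the safe block
```

Waiting for confirmations trades latency for safety: an event is delivered only once its block is 12 blocks deep, which at roughly 12 seconds per block is a delay of a couple of minutes.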
---
test/Cases/WebhookPusherServiceTest.php
<?php
declare(strict_types=1);
namespace HyperfTest\Cases;
use App\Model\CapturedEvent;
use App\Service\WebhookPusherService;
use GuzzleHttp\Client;
use GuzzleHttp\Handler\MockHandler;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Psr7\Response;
use GuzzleHttp\Exception\ConnectException;
use GuzzleHttp\Psr7\Request;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Guzzle\ClientFactory;
use Hyperf\Logger\LoggerFactory;
use Mockery;
use PHPUnit\Framework\TestCase;
class WebhookPusherServiceTest extends TestCase
{
protected function tearDown(): void
{
Mockery::close();
}
private function makeEvent(): CapturedEvent
{
$event = new CapturedEvent();
$event->id = 1;
$event->subscription_id = 1;
$event->event_name = 'Transfer';
$event->event_signature = '0xddf252ad...';
$event->contract_address = '0xusdt...';
$event->block_number = 18000000;
$event->block_hash = '0xblockhash';
$event->transaction_hash = '0xtxhash';
$event->log_index = 0;
$event->topics = [];
$event->data = '0x';
$event->decoded_params = ['from' => '0xabc', 'to' => '0xdef', 'value' => '1000'];
$event->created_at = '2024-01-01 00:00:00';
return $event;
}
private function makeService(array $responses): WebhookPusherService
{
$mock = new MockHandler($responses);
$handler = HandlerStack::create($mock);
$client = new Client(['handler' => $handler]);
$clientFactory = Mockery::mock(ClientFactory::class);
$clientFactory->shouldReceive('create')->andReturn($client);
$config = Mockery::mock(ConfigInterface::class);
$config->shouldReceive('get')->with('watcher.webhook_timeout', 10)->andReturn(10);
$config->shouldReceive('get')->with('watcher.webhook_secret', '')->andReturn('test_secret');
$logger = Mockery::mock(\Psr\Log\LoggerInterface::class)->shouldIgnoreMissing();
$loggerFactory = Mockery::mock(LoggerFactory::class);
$loggerFactory->shouldReceive('get')->andReturn($logger);
Mockery::mock('alias:App\Model\WebhookDelivery')->shouldReceive('create');
return new WebhookPusherService($clientFactory, $config, $loggerFactory);
}
public function testPushSuccessOn200(): void
{
$service = $this->makeService([new Response(200, [], '{"ok":true}')]);
$event = $this->makeEvent();
// Stub out save()
$event = Mockery::mock($event)->makePartial();
$event->shouldReceive('save');
$result = $service->push($event, 'https://example.com/webhook');
$this->assertTrue($result);
}
public function testPushFailsOn500(): void
{
$service = $this->makeService([new Response(500, [], 'Internal Server Error')]);
$event = $this->makeEvent();
$event = Mockery::mock($event)->makePartial();
$event->shouldReceive('save');
$result = $service->push($event, 'https://example.com/webhook');
$this->assertFalse($result);
}
public function testPushFailsOnConnectionError(): void
{
$service = $this->makeService([
new ConnectException('Connection refused', new Request('POST', 'test')),
]);
$event = $this->makeEvent();
$event = Mockery::mock($event)->makePartial();
$event->shouldReceive('save');
$result = $service->push($event, 'https://unreachable.example.com/webhook');
$this->assertFalse($result);
}
public function testHmacSignatureIsIncluded(): void
{
$capturedHeaders = [];
$mock = new MockHandler([new Response(200)]);
$handler = HandlerStack::create($mock);
// Intercept the request and capture its headers
$handler->push(\GuzzleHttp\Middleware::tap(
function (\Psr\Http\Message\RequestInterface $req) use (&$capturedHeaders) {
$capturedHeaders = $req->getHeaders();
}
));
$client = new Client(['handler' => $handler]);
$clientFactory = Mockery::mock(ClientFactory::class);
$clientFactory->shouldReceive('create')->andReturn($client);
$config = Mockery::mock(ConfigInterface::class);
$config->shouldReceive('get')->with('watcher.webhook_timeout', 10)->andReturn(10);
$config->shouldReceive('get')->with('watcher.webhook_secret', '')->andReturn('my_secret');
$logger = Mockery::mock(\Psr\Log\LoggerInterface::class)->shouldIgnoreMissing();
$loggerFactory = Mockery::mock(LoggerFactory::class);
$loggerFactory->shouldReceive('get')->andReturn($logger);
Mockery::mock('alias:App\Model\WebhookDelivery')->shouldReceive('create');
$service = new WebhookPusherService($clientFactory, $config, $loggerFactory);
$event = Mockery::mock($this->makeEvent())->makePartial();
$event->shouldReceive('save');
$service->push($event, 'https://example.com/webhook');
$this->assertArrayHasKey('X-Watcher-Signature', $capturedHeaders);
$this->assertStringStartsWith('sha256=', $capturedHeaders['X-Watcher-Signature'][0]);
}
}
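The last test only asserts that an `X-Watcher-Signature` header starting with `sha256=` is present. A minimal sketch of how that value could be produced and checked is shown below, assuming the signature is an HMAC-SHA256 over the raw request body (which is what the receiver-side example in the README computes). `signPayload()` and `verifySignature()` are hypothetical names.

```php
<?php
declare(strict_types=1);

/** Build the header value: "sha256=" followed by the hex HMAC of the raw body. */
function signPayload(string $body, string $secret): string
{
    return 'sha256=' . hash_hmac('sha256', $body, $secret);
}

/** Constant-time comparison, so timing does not leak how much of the signature matched. */
function verifySignature(string $body, string $header, string $secret): bool
{
    return hash_equals(signPayload($body, $secret), $header);
}

$body = json_encode(['id' => 1, 'event_name' => 'Transfer'], JSON_THROW_ON_ERROR);
$header = signPayload($body, 'test_secret');

var_dump(verifySignature($body, $header, 'test_secret'));        // untouched body
var_dump(verifySignature($body . ' ', $header, 'test_secret'));  // tampered body
```

Signing the exact bytes that are sent matters: if the receiver re-encodes the JSON before hashing, key order or whitespace differences will make a valid signature fail.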
---
phpunit.xml
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.0/phpunit.xsd"
bootstrap="test/bootstrap.php"
colors="true">
<testsuites>
<testsuite name="Unit">
<directory>test/Cases</directory>
</testsuite>
</testsuites>
<coverage>
<include>
<directory>app</directory>
</include>
</coverage>
</phpunit>
---
.github/workflows/ci.yml
name: CI
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      mysql:
        image: mysql:8.0
        env:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_DATABASE: event_watcher_test
        ports: ["3306:3306"]
        options: --health-cmd="mysqladmin ping" --health-interval=10s
      redis:
        image: redis:7-alpine
        ports: ["6379:6379"]
        options: --health-cmd="redis-cli ping" --health-interval=10s
    steps:
      - uses: actions/checkout@v4
      - name: Setup PHP
        uses: shivammathur/setup-php@v2
        with:
          php-version: '8.2'
          extensions: swoole, pdo_mysql, redis, gmp
          coverage: xdebug
      - name: Cache Composer
        uses: actions/cache@v3
        with:
          path: vendor
          key: ${{ runner.os }}-composer-${{ hashFiles('composer.lock') }}
      - name: Install dependencies
        run: composer install --no-interaction --prefer-dist
      - name: Setup env
        run: |
          cp .env.example .env
          sed -i 's/your_api_token/ci_test_token/' .env
          sed -i 's/your_hmac_secret/ci_test_secret/' .env
      - name: Run migrations
        run: php bin/hyperf.php migrate --env=testing
      - name: Run tests
        run: composer test -- --coverage-clover coverage.xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: coverage.xml
      - name: Check code style
        run: composer cs-check
---
.github/workflows/release.yml
name: Release
on:
  push:
    tags: ['v*.*.*']
# Creating a release requires write access to repository contents
permissions:
  contents: write
jobs:
  release:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Create Release
        uses: softprops/action-gh-release@v1
        with:
          generate_release_notes: true
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
---
CONTRIBUTING.md
# Contributing Guide
## Development workflow
```bash
git clone https://github.com/your-org/hyperf-contract-event-watcher.git
cd hyperf-contract-event-watcher
composer install
cp .env.example .env
php bin/hyperf.php migrate
git checkout -b feat/your-feature
composer test && composer cs-fix
git commit -m "feat: your feature"
git push origin feat/your-feature
```
Commit conventions
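┌────────┬──────────────────────────┐
│ Prefix │ Purpose                  │
├────────┼──────────────────────────┤
│ feat:  │ New feature              │
├────────┼──────────────────────────┤
│ fix:   │ Bug fix                  │
├────────┼──────────────────────────┤
│ perf:  │ Performance optimization │
├────────┼──────────────────────────┤
│ docs:  │ Documentation            │
├────────┼──────────────────────────┤
│ test:  │ Tests                    │
└────────┴──────────────────────────┘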
Branching strategy
main ← stable releases
develop ← main development line
feat/* ← new features
fix/* ← bug fixes
---
CHANGELOG.md
# Changelog
## [Unreleased]
## [1.0.0] - 2024-01-01
### Added
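- Batched eth_getLogs polling with coroutine-concurrent block scanning
- Multiple subscription configs (contract address + event signature + custom filters)
- Automatic decoding of ERC-20 Transfer/Approval events
- Webhook delivery with HMAC-SHA256 signature verification
- Exponential-backoff retries (up to 5 attempts: 2/4/8/16/32 seconds)
- Real-time WebSocket broadcast
- Event deduplication (unique index on txHash + logIndex)
- Full delivery records (every HTTP request is logged)
- REST API for subscription management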
- GitHub Actions CI/CD
---
README.md
# hyperf-contract-event-watcher
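> A smart-contract event watcher built on Hyperf coroutines, with Webhook/WebSocket delivery
[CI](https://github.com/your-org/hyperf-contract-event-watcher/actions)
[PHP](https://php.net)
[License](LICENSE)
## What is this?
It watches the blockchain around the clock and, as soon as it spots a contract event you care about,
pushes it to your business system via Webhook (HTTP callback) or WebSocket.
## Features
- **Coroutine concurrency**: getLogs calls are batched and run concurrently
- **Multiple subscriptions**: watch several contracts and event types at once
- **Automatic decoding**: uses the ABI to turn ABI-encoded log data into readable parameters
- **Reliable delivery**: exponential-backoff retries, up to 5 attempts, with failures recorded
- **HMAC signatures**: receivers can verify that a message is authentic
- **WebSocket**: front ends can subscribe to on-chain events in real time
- **Deduplication**: a unique index on txHash + logIndex keeps the same event from being stored twice
- **Full audit trail**: HTTP status, latency, and response are recorded for every delivery
## Quick start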
```bash
git clone https://github.com/your-org/hyperf-contract-event-watcher.git
cd hyperf-contract-event-watcher
composer install
cp .env.example .env # fill in ETH_RPC_URL and the other settings
php bin/hyperf.php migrate
php bin/hyperf.php start
```
Create a subscription
# Listen for all Transfer events on the USDT contract
curl -X POST http://localhost:9501/api/v1/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "usdt-transfer",
"contract_address": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
"event_signature_str": "Transfer(address,address,uint256)",
"event_name": "Transfer",
"event_abi": [
{"name":"from", "type":"address","indexed":true},
{"name":"to", "type":"address","indexed":true},
{"name":"value", "type":"uint256","indexed":false}
],
"push_type": "webhook",
"webhook_url": "https://your-app.com/webhook/usdt-transfer"
}'
Receiving Webhooks
Your business system will receive a POST request like this:
{
"id": 1,
"event_name": "Transfer",
"contract_address": "0xdac17f958d2ee523a2206206994597c13d831ec7",
"transaction_hash": "0xabc123...",
"block_number": 18000000,
"decoded_params": {
"from": "0xsender...",
"to": "0xreceiver...",
"value": "1000000000"
}
}
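The `value` field arrives as a raw integer string in the token's smallest unit. USDT on Ethereum uses 6 decimals, so `1000000000` in the payload above is 1000 USDT. A minimal sketch of turning the raw value into a display amount with plain string handling (so large values never overflow an integer); `formatTokenAmount()` is a hypothetical helper, and the number of decimals has to come from the token contract, not from the payload.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: insert the decimal point $decimals places from the
 * right of a raw integer string and trim redundant zeros.
 */
function formatTokenAmount(string $raw, int $decimals): string
{
    if ($raw === '' || ! ctype_digit($raw) || $decimals < 0) {
        throw new InvalidArgumentException('raw must be a non-negative integer string');
    }
    if ($decimals === 0) {
        $whole = ltrim($raw, '0');
        return $whole === '' ? '0' : $whole;
    }

    // Left-pad so there is always at least one digit before the decimal point.
    $padded = str_pad($raw, $decimals + 1, '0', STR_PAD_LEFT);
    $whole = ltrim(substr($padded, 0, -$decimals), '0');
    $fraction = rtrim(substr($padded, -$decimals), '0');

    $whole = $whole === '' ? '0' : $whole;
    return $fraction === '' ? $whole : $whole . '.' . $fraction;
}

echo formatTokenAmount('1000000000', 6), "\n"; // value from the payload above
echo formatTokenAmount('1500000', 6), "\n";
```

For balance bookkeeping it is usually safer to store the raw string and format only for display.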
Verify the signature (PHP example):
$secret = 'your_hmac_secret';
$body = file_get_contents('php://input');
// Default to an empty string so a missing header fails verification instead of raising a warning
$received = $_SERVER['HTTP_X_WATCHER_SIGNATURE'] ?? '';
$expected = 'sha256=' . hash_hmac('sha256', $body, $secret);
if (!hash_equals($expected, $received)) {
http_response_code(401);
exit('Signature verification failed');
}
$event = json_decode($body, true);
// Handle business logic...
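Because failed deliveries are retried, a receiver can see the same event more than once, for example when it processed the request but its 200 response never reached the watcher. A minimal sketch of idempotent handling keyed on the payload's `id` follows. `ProcessedEventStore` and `handleWebhook()` are hypothetical, and the in-memory array stands in for whatever durable store (a database unique key, Redis `SETNX`, and so on) a real receiver would use.

```php
<?php
declare(strict_types=1);

/** In-memory stand-in for a durable "already processed" store. */
final class ProcessedEventStore
{
    /** @var array<int, true> */
    private array $seen = [];

    /** Returns true the first time an id is recorded, false on repeats. */
    public function markProcessed(int $eventId): bool
    {
        if (isset($this->seen[$eventId])) {
            return false;
        }
        $this->seen[$eventId] = true;
        return true;
    }
}

/** Handle one verified webhook payload exactly once per event id. */
function handleWebhook(array $event, ProcessedEventStore $store): string
{
    if (! $store->markProcessed((int) $event['id'])) {
        return 'duplicate'; // still answer 2xx so the sender stops retrying
    }
    // Business logic (credit a balance, update an order, ...) would go here.
    return 'processed';
}

$store = new ProcessedEventStore();
$payload = ['id' => 1, 'event_name' => 'Transfer'];
echo handleWebhook($payload, $store), "\n";
echo handleWebhook($payload, $store), "\n";
```

Answering a duplicate with a success status is deliberate: an error response would only trigger yet another retry of an event that has already been handled.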
Real-time WebSocket subscription
const ws = new WebSocket('ws://localhost:9502');
ws.onopen = () => {
ws.send(JSON.stringify({
action: 'subscribe',
contract_address: '0xdAC17F958D2ee523a2206206994597C13D831ec7',
event_name: 'Transfer'
}));
};
ws.onmessage = (e) => {
const event = JSON.parse(e.data);
console.log('On-chain event:', event);
};
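Architecture
Ethereum node
│ eth_getLogs (polled every 3 seconds)
▼
EventPollingProcess (long-running process)
│
▼
EventCaptureService (matches subscriptions, deduplicates)
│
▼
Redis async queue
│
├──→ DeliverWebhookJob (exponential-backoff retries)
│ └──→ HTTP callback to your business system
│
└──→ WebSocketPusherService
└──→ real-time push to browser front ends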
License
MIT
6. Full open-source release workflow
Step 1: Finish local development
composer test && composer cs-fix
Step 2: Create the GitHub repository
gh repo create your-org/hyperf-contract-event-watcher --public --source=. --remote=origin
git push -u origin main
Step 3: Configure branch protection
Settings → Branch protection rules
→ main branch: require PR + CI pass
Step 4: Publish to Packagist
packagist.org → Submit → enter the GitHub URL
GitHub → Settings → Webhooks → add the Packagist hook
Step 5: Tag the first release
git tag v1.0.0 && git push origin v1.0.0
→ GitHub Actions creates the Release automatically
Step 6: Ongoing maintenance
├── Issue labels: bug / enhancement / question
├── Security issues: report privately → fix → patch release
├── New features: develop branch → PR → main → new tag
└── Update CHANGELOG.md with every release
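7. Summary of core value
┌──────────────────────────────────────────────┬─────────────────────────────────────────────────┐
│ Without it                                   │ With it                                         │
├──────────────────────────────────────────────┼─────────────────────────────────────────────────┤
│ Deposits go unnoticed; manual reconciliation │ Transfer event triggers a Webhook; auto-credit  │
├──────────────────────────────────────────────┼─────────────────────────────────────────────────┤
│ Contract attack discovered hours later       │ Abnormal events pushed promptly for fast alerts │
├──────────────────────────────────────────────┼─────────────────────────────────────────────────┤
│ Front end polls an API for on-chain state    │ WebSocket push in near real time                │
├──────────────────────────────────────────────┼─────────────────────────────────────────────────┤
│ Failed deliveries lose data                  │ Exponential-backoff retries + full delivery log │
├──────────────────────────────────────────────┼─────────────────────────────────────────────────┤
│ Events reprocessed after a restart           │ Unique txHash+logIndex index deduplicates       │
└──────────────────────────────────────────────┴─────────────────────────────────────────────────┘
In one sentence: it is the "real-time messenger" between the blockchain and your business system, reliably relaying confirmed on-chain events to your application.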