Webhook 能返回 202、任务能进库之后,又产生了新的问题:如果默认谁 POST 过来我都信,那和在内网裸奔一个调试接口差别不大;GitHub 重试一次 delivery,我这边会不会又建一条任务;审查跑得慢的时候,队列里别的任务会不会一直吃不上 CPU。这些问题不先收口,后面再加 OAuth、多仓库,只是在更脆的入口上叠功能。

        最早一版仓库里的习惯写法,是把 body 直接绑成 Pydantic 模型,接口很短,本地演示特别快。但签名校验必须用原始字节算 HMAC,一旦让框架先解析过 body,再想去对齐 X-Hub-Signature-256 就会别扭。所以现在的写法是先整块读入,再在同一串 raw_body 上做签名、解码、分支路由。

@router.post(

"/webhooks/github",

status_code=status.HTTP_202_ACCEPTED,

)

async def github_webhook(

request: Request,

db: Session = Depends(get_db_session),

) -> dict[str, object]:

settings = get_settings()

raw_body = await request.body()

signature = request.headers.get("X-Hub-Signature-256")

delivery_id = request.headers.get(_GITHUB_DELIVERY_HEADER)

event_name = request.headers.get(_GITHUB_EVENT_HEADER)

request_timestamp = _extract_request_timestamp(request)

_validate_github_signature(signature=signature, raw_body=raw_body, settings=settings)

_validate_delivery_headers(delivery_id=delivery_id, event_name=event_name, settings=settings)

_validate_replay_time_window(request_timestamp=request_timestamp, settings=settings)

normalized_event_name = (event_name or "pull_request").strip().lower()

payload_data = _decode_github_webhook_payload(raw_body=raw_body, content_type=request.headers.get("content-type"))

if normalized_event_name == "pull_request":

payload = _validate_github_pull_request_payload(payload_data)

actor = payload.pull_request.user.login

task = create_review_task_from_github_webhook(

db,

payload,

actor,

delivery_id=delivery_id,

event_name=normalized_event_name,

raw_body=raw_body,

)

return serialize_review_task_summary(task).model_dump(mode="json")

        签名函数单独拆出去,逻辑很朴素:根据配置判断「这次要不要验」;要验就算 sha256= 摘要,用 compare_digest 防时序旁路。我本地 mock 时经常关掉强制校验,但一想到切到 github 模式就会收紧,就把「何时必须验」写进条件里,而不是靠人肉记得改环境。

def _validate_github_signature(*, signature: str | None, raw_body: bytes, settings: Settings) -> None:

should_validate = (

settings.github_webhook_require_signature

or settings.git_provider_mode.strip().lower() == "github"

or bool(signature)

)

if not should_validate:

return

if not signature:

raise HTTPException(status_code=401, detail="Missing GitHub webhook signature.")

expected = "sha256=" + hmac.new(

settings.github_webhook_secret.encode("utf-8"),

raw_body,

hashlib.sha256,

).hexdigest()

if not hmac.compare_digest(signature, expected):

raise HTTPException(status_code=401, detail="GitHub webhook signature mismatch.")

      delivery 和 event 头单独校验,是因为幂等和排错都依赖它们:没有 X-GitHub-Delivery,很难在库里说认出每一条投递;没有 X-GitHub-Event,以后 issue_comment 和 pull_request 混在一个 URL 上时,只能靠猜。_validate_delivery_headers 里按 deployment_mode 或开关决定是否强制,这样本地松、部署紧,不必改代码。

       真正提供帮助的是服务层:create_review_task_from_github_webhook 现在吃 delivery_id 和 raw_body,先算指纹,再查 GitHubWebhookDelivery。命中已有 delivery 就只写 webhook.duplicate_skipped 审计并返回旧任务;窗口内相似指纹走 webhook.replay_window_skipped。写审计时我一开始嫌啰嗦,后来发现没有这几行,GitHub 后台 delivery 列表和本地数据库根本对不上号。

def create_review_task_from_github_webhook(

db: Session,

payload: GitHubPullRequestWebhookPayload,

actor: str,

*,

delivery_id: str | None = None,

event_name: str = "pull_request",

raw_body: bytes | None = None,

) -> ReviewTask:

slug = payload.repository.full_name

fingerprint = hashlib.sha256(raw_body or payload.model_dump_json().encode("utf-8")).hexdigest()

replay_window_seconds = max(int(get_settings().github_webhook_replay_window_seconds), 0)

if delivery_id:

existing_delivery = db.scalar(

select(GitHubWebhookDelivery).where(GitHubWebhookDelivery.delivery_id == delivery_id)

)

if existing_delivery is not None and existing_delivery.review_task_id is not None:

existing_task = _get_review_task_with_context(db, existing_delivery.review_task_id)

if existing_task is not None:

db.add(

AuditLog(

action="webhook.duplicate_skipped",

)

)

db.commit()

return existing_task

recent_replay = _find_recent_replayed_delivery(

db,

delivery_id=delivery_id,

)

if recent_replay is not None and recent_replay.review_task_id is not None:

db.commit()

return existing_task

       入口收紧之后,异步侧的问题会浮上来。本地长期 CELERY_TASK_ALWAYS_EAGER=true 很省事,但要模拟「HTTP 秒回、审查慢慢跑」,就必须 Redis + Worker。Worker 默认 prefetch 一大,我遇到过队列里有任务却半天轮不到的情况,所以在应用里把 worker_prefetch_multiplier 钉成 1,并给审查任务加了软/硬超时,避免一条任务把 worker 占死。

celery_app = Celery(

"codeguard",

broker=settings.celery_broker_url,

backend=settings.celery_result_backend,

include=["app.tasks.review_tasks"],

)

celery_app.conf.update(

task_always_eager=settings.celery_task_always_eager,

task_serializer="json",

result_serializer="json",

accept_content=["json"],

timezone="UTC",

worker_prefetch_multiplier=1,

task_soft_time_limit=max(0, int(settings.review_task_worker_soft_timeout_seconds)),

task_time_limit=max(0, int(settings.review_task_worker_hard_timeout_seconds)),

)

       部署上我不想把并发写死在镜像命令里,docker-compose 里用环境变量覆盖 --concurrency,再配合 --prefetch-multiplier=1 和 -O fair,改并发只动 .env 或 compose 即可。

worker:

build:

context: ./backend

command: celery -A app.tasks.celery_app.celery_app worker --loglevel=info --concurrency=${CELERY_WORKER_CONCURRENCY:-1} --prefetch-multiplier=1 -O fair

      对照手里较早的那份快照,Webhook 仍是一个 POST,但职责已经完全不同:那份是「模型一绑就进业务」,现在是「先字节、再信任、再路由、再幂等」。这不是谁好谁坏的评判,而是项目长到这里自然会发生的重构——先把链路跑通,再补信任和调度,否则一开始就会被密钥、签名和队列参数绑死,反而动不了。

       如果后面继续做 OAuth、做仓库连接状态,我会尽量让新能力挂在这条已经收紧的链路上,而不是另起一套入口。这样我自己读代码时,也能分清:哪一段是在回答这是不是 GitHub,哪一段是在回答同一消息算不算两遍,哪一段是在回答算完之后别把自己堵死。

       前面这些改动单看都像加校验、加表、改 compose,叠在一起的意义其实很实在:Webhook 从能接到变成接得对、接得稳,审查从跟着 HTTP 一起堵变成能排队、能超时、能调并发,后面无论是接 OAuth、挂多仓库,还是做评论发布成功率,都不必再回头拆掉一整个入口。中期这一段的价值,对我来说就是把最费返工的地基压实了,后面写业务接口时少了很多万一其实是伪造请求的心虚。对于代码审查助手来说,开发者打开 PR,希望尽快得到有据、可追踪、可确认的审查意见,而不是多一个容易误报、重复投递、后台卡死的玩具接口。把 Webhook 验真、幂等和 Worker 调度收紧之后,同一条 PR 事件只会稳定地变成一条可查询的审查任务,审查流水线才有机会在后台慢慢拉 diff、跑分析、生成草稿评论——助手才算站在能帮上忙这一侧,而不是给团队添噪音。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐