【Dv3Admin】腾讯云智能体流式调用与计费设计
在实际项目中接入流式 AI 能力,往往会同时遇到接口复杂、状态难以追踪、计费边界模糊等问题。尤其是在需要长期维护和不断扩展业务的场景下,如果缺少清晰的结构设计,很容易让系统逐步失控。一个可复用、可审计、可扩展的后端架构,往往比单次调用是否成功更重要。
本文围绕一个真实的腾讯工作流接入案例展开,从数据模型设计、三层代码结构划分到流式调用与计费闭环的实现,完整梳理一套适合自学者理解和复用的工程实践思路,重点放在“为什么这样设计”而不仅是“如何调用接口”。
文章目录
model 数据结构
Users 用来保存用户的“总额度/已用额度”,支撑调用前的余额校验与剩余额度计算.
# users/models.py(重点字段简化版)
import uuid
from django.db import models
from django.contrib.auth.models import AbstractUser
class Users(AbstractUser):
# 稳定的跨表标识(PointsLedger 用它关联用户)
uuid = models.UUIDField(
default=uuid.uuid4, editable=False, unique=True,
verbose_name="系统UUID编号", help_text="系统UUID编号"
)
# 腾讯工作流额度:调用前用来判断剩余点数
workflow_tencent_use = models.IntegerField(
default=0, verbose_name="已用点数", help_text="腾讯工作流已用点数"
)
workflow_tencent_all = models.IntegerField(
default=0, verbose_name="总计点数", help_text="腾讯工作流总点数"
)
PointsLedger 用来记录每次调用的消费流水与审计信息,保证扣费可追溯、问题可排查,并能按业务维度统计消耗。
# billing/models.py(重点字段简化版)
from django.db import models
class PointsLedger(models.Model):
# 关联用户(对齐 Users.uuid)
user_uuid = models.UUIDField(
db_index=True, verbose_name="用户UUID",
help_text="对应 system.Users.uuid"
)
# 扣费核心字段(可复盘一次扣费过程)
kind = models.CharField(max_length=32) # workflow/tts/ocr/gpt_token...
units = models.IntegerField() # 本次计费单位
factor = models.IntegerField() # 倍率
cost = models.IntegerField() # 本次消耗
before = models.IntegerField() # 扣费前余额
after = models.IntegerField() # 扣费后余额
# 审计字段(可追溯问答内容)
prompt = models.TextField(blank=True, default="", verbose_name="提问内容")
answer = models.TextField(blank=True, default="", verbose_name="模型输出")
# 业务定位字段(可统计、可排查)
workflow_name = models.CharField(max_length=255, blank=True, default="")
module = models.CharField(max_length=255, blank=True, default="")
biz_type = models.CharField(max_length=64, blank=True, default="")
biz_id = models.CharField(max_length=128, blank=True, default="")
# 状态字段(异常与中断可记录)
status = models.CharField(max_length=16, blank=True, default="ok") # ok/error/aborted
error_message = models.TextField(blank=True, default="")
把上面两个模型放到对应 app 的 models.py 后,执行迁移即可生成表结构。
# 迁移创建表结构
python manage.py makemigrations
python manage.py migrate
key / utils / views 三层结构
围绕腾讯工作流的流式调用构建了一整套稳定、可复用的后端能力,将配置管理、SSE 通信、流式解析、计费结算和业务接入统一整合在同一条调用链中。系统内部能够完成上游 SSE 连接、事件读取与规范化处理,将不稳定的上游事件流转换为前端可直接消费的标准 SSE 输出,并在过程中自动处理增量计算、异常兜底和连接保活。同时每次调用都会与点数消耗和审计记录强绑定,在执行前完成额度校验,在执行结束后自动扣费并落账,保证调用行为可追溯、可结算。对外层而言,不同业务场景只需按统一方式组织输入并发起调用,即可复用同一套流式输出和计费逻辑,从而在扩展业务的同时保持底层行为一致、维护成本可控。
| 文件名 | 定位角色 | 功能概述 | 在系统中的作用 |
|---|---|---|---|
key.py |
配置与常量层 | 统一定义腾讯工作流 API 地址、不同智能体的 bot_key、计费倍率和默认单位 |
为整个工作流体系提供稳定、集中、可复用的配置来源 |
utils.py |
核心工具与引擎层 | 封装腾讯 SSE 请求、流式数据解析、reply/thought 拆分、token 统计、点数扣费、审计记录与异常兜底 | 承担全部技术复杂度,是工作流调用和计费逻辑的核心 |
views.py |
业务接口层 | 定义具体业务 API,整理请求参数和业务上下文,调用统一的工作流客户端 | 将不同业务需求接入同一套腾讯工作流能力 |
key.py 密钥设置
key.py 是整个模块中最基础、也最“安静”的文件,它本身不参与任何业务处理,也不发起请求,只负责集中存放腾讯工作流相关的配置数据。通过将 API 地址、不同智能体的 Key 以及计费相关常量统一定义在这里,其它模块只需按需引用,避免了配置分散和硬编码问题,使得后期替换 Key 或调整倍率的成本非常低。
配置引用方式
from dvadmin.workflow.tencent.key import ApiUrl, KeySystemHelp, WORKFLOW_FACTOR, DEFAULT_UNITS
# ApiUrl:腾讯 SSE API 地址
# KeySystemHelp:系统帮助智能体 key
# WORKFLOW_FACTOR / DEFAULT_UNITS:计费相关默认参数
utils.py 工具封装与流式计费引擎
utils.py 是系统的技术核心,几乎所有与“腾讯工作流如何运行”有关的逻辑都集中在这里。它负责向腾讯侧发起 SSE 流式请求,并对返回的事件流进行增量解析,将复杂、多样、不稳定的上游数据,转换为前端可以稳定消费的标准 SSE 输出。同时这个文件还承担了点数校验、扣费计算、账本写入以及调试日志控制等横切逻辑,对上隐藏了复杂度,对下保证了统一行为,是整个体系中不可替代的一层。
get_billing_units|读取计费单位
它从 request.data 里优先读 billing_units,其次读 tokens 或 max_tokens,读不到就回落到默认值,保证每次调用都有一个可用的计费基准。
units = get_billing_units(request)
units2 = get_billing_units(request, default=8000)
get_workflow_remain|获取剩余点数
它从用户表里拿“总额”和“已用”,算出剩余额度,用于调用前预检或页面展示。
remain = get_workflow_remain(user_id=123)
if remain <= 0:
print("余额不足")
preview_cost|预估消耗
按 units * factor 做一次预估,用在调用前预检,避免连上游后才发现不够扣。
est = preview_cost(units=5000, factor=3)
print(est) # 15000
sse_error|生成 SSE error 事件
返回一个 SSE 事件片段(bytes),用于在流里可见地告诉前端“发生了错误”,而不是静默断开。
chunk = sse_error("bad request")
# 你可以把 chunk yield 给 StreamingHttpResponse
sse_missing_query_response|缺参返回 SSE
当必要参数缺失时,直接返回 SSE 响应,让前端保持同一套 SSE 解析逻辑。
return sse_missing_query_response("missing query")
sse_send_json|标准 SSE 帧
将 dict 压成 JSON 并包装成 data:{json}\n\n 的 bytes,这是下游统一协议的基础。
chunk = sse_send_json({"type": "reply", "delta": "hi"})
build_sse_response|构建流式响应
把一个“可迭代产出 bytes 的对象”包装成 SSE 响应,并设置禁缓存与禁缓冲头,保证前端实时收到。
def gen():
yield sse_send_json({"type": "reply", "delta": "hello"})
yield sse_send_json({"type": "done"})
resp = build_sse_response(gen())
return resp
sse_reply_text_response|文本型 SSE
在余额不足、短路返回、简单提示场景,不需要连上游,只要统一吐一段 reply 再 done。
return sse_reply_text_response("点数不足", req_id="abc123")
build_headers|统一请求头
用来统一生成 Content-Type,可选 Bearer 鉴权,并在 Bearer 时声明 Accept: text/event-stream。
h1 = build_headers()
h2 = build_headers(api_key="TOKEN", bearer=True)
post_stream_json|发起上游请求
把 payload 序列化为 JSON,stream=True 发起 POST,返回 requests.Response,后续由流解析函数去读 iter_lines()。
resp = post_stream_json(
api_url="https://example.com/sse",
payload={"content": "hi", "stream": "enable"},
timeout=100,
)
for line in resp.iter_lines():
pass
_read_upstream_preview|上游错误预览
当上游返回 4xx/5xx 时,读一小段 body 作为 debug 信息,避免只看到状态码看不到原因。
preview = _read_upstream_preview(resp, req_id="abc123")
print(preview[:200])
maybe_fix_mojibake|简易乱码修复
当文本里出现典型“乱码提示字符”且又不是中文时,尝试 latin1→utf8 纠正,尽量让用户看到可读文本。
text = maybe_fix_mojibake("ä¸Â文")
print(text)
calc_incremental|计算增量片段
上游可能每次给你“全文”,这个函数把“本次全文”减去“上次全文”,拿到新增部分。
delta = calc_incremental(prev="hello", curr="hello world")
print(delta) # " world"
_extract_token_stat_total|统一 token 统计口径
它会按多个字段路径兜底提取 token 总数,优先 statistic_infos[].total_tokens,不行再试 input+output,再兜底其它字段,尽量保证计费有依据。
total = _extract_token_stat_total({"statistic_infos": [{"total_tokens": 1234}]})
print(total) # 1234
open_upstream_log|上游日志落盘
只在 debug 开启时尝试创建目录与文件,把上游原始行写下来。失败也不影响主流程。
with open_upstream_log(req_id="abc123", enabled=True) as (f, path):
if f:
f.write(b"raw line\n")
_clean_prefix|清理不可见前缀字符
去掉 BOM、零宽字符等,避免这些字符污染解析与展示。
s = _clean_prefix("\ufeff\u200bhello")
_strip_internal_markers|去掉内部标记
去掉内部约定的 |SYSTEM_NOT_FINISHED_OUTPUT| 等标记,确保前端看到的是干净文本。
s = _strip_internal_markers("|SYSTEM_NOT_FINISHED_OUTPUT|hello")
print(s) # "hello"
_parse_reply_json|解析 Content/Thought 包装
当 reply 文本本身是 JSON(包含 Content/Thought)时,拆出内容与思考;否则按普通文本返回。
content, thought = _parse_reply_json('{"Content":"A","Thought":"B"}')
_extract_from_reply_event|从 reply 事件提取内容
兼容多种上游字段结构,过滤非模型生成内容,必要时处理 workflow 节点里的 Reply,并做乱码与标记清理。
c, t = _extract_from_reply_event({"type": "reply", "payload": {"content": "hi"}})
_extract_from_thought_event|从 thought 事件提取思考
从 procedures/debugging 等路径拿 thought 内容并清理。
t = _extract_from_thought_event({"type": "thought", "payload": {"debugging": {"content": "thinking"}})
_normalize_upstream_line|标准化上游行
把 data: 行、裸 JSON 行统一成“可 json.loads 的 bytes”,过滤掉空行、DONE、控制行等。
body = _normalize_upstream_line(b"data:{\"type\":\"reply\"}")
extract_workflow_name_from_event|提取工作流名称
在任意事件里尽早提取 workflow_name,用于计费审计记录,避免跑完了不知道跑了哪个工作流。
name = extract_workflow_name_from_event({"payload": {"workflow_name": "MyFlow"}})
print(name)
_pointsledger_has_field|检测账本字段是否存在
用于兼容迁移过程:字段没迁移完也不报错,存在才写入。
ok = _pointsledger_has_field("workflow_name")
print(ok)
sse_patch_stream|上游 SSE 转下游 SSE
读取上游流式数据,解析事件类型,执行增量拼接与节流控制,并持续向下游输出统一格式的 SSE 数据。
for chunk in sse_patch_stream(
resp, req_id="abc123", usage_box={}
):
pass
sse_capture_text|捕获输出用于审计
它包裹一个下游输出迭代器,一边原样 yield,一边收集 reply/thought 内容,结束时调用回调,把内容、思考、状态与错误信息交给你做落账。
def on_finish(content, thought, status, err):
print(status, len(content), len(thought), err)
wrapped = sse_capture_text(base_iter=gen(), on_finish=on_finish, req_id="abc123")
for chunk in wrapped:
pass
upload_analysis_txt|上传分析文本并返回路径
把大输入变成文件引用,避免请求体过大,也更利于上游工作流按变量读取。
path = upload_analysis_txt("很长很长的数据……")
print(path)
TencentSSEClient:对业务层暴露的唯一正确入口
到这一步,工具函数你都能单独用,但真正工程化的做法是让业务层只用一个入口,避免调用姿势千奇百怪。TencentSSEClient 就是这个入口,它把“请求组装、上游调用、流式转发、结束扣费落账”串成一个生命周期。
client = TencentSSEClient(api_url="https://example.com/sse", timeout=100)
return client.request_for_bot_billed(
request,
bot_key="BOT_KEY",
content="用户输入",
billing_units=get_billing_units(request),
module="某模块",
biz_type="某biz_type",
extra={"workflow_status": "enable"},
)
通过这种方式,业务层不需要理解 SSE、不需要处理 token、不需要关心计费细节,只负责组织业务输入即可。
views.py 业务接口层
views.py 则处在最外层,直接面对前端或调用方。它不关心 SSE 的细节,也不处理 token 或计费算法,而是专注于“这是一个什么业务”。每个接口函数对应一个明确的业务场景,通过整理请求参数、构造 prompt、准备上下文数据,最终统一调用 utils.py 中的工作流客户端。这样设计的结果是,新增或修改业务时,只需要改动 views 层,而底层能力可以完全复用。
chat_system_help|系统使用帮助
return tencent_client.request_for_bot_billed(
request,
bot_key="KeySystemHelp",
content="query + inputs 组合后的输入",
billing_units=get_billing_units(request),
module="系统帮助/主题",
biz_type="tencent.chat_system_help",
extra={"workflow_status": "disable"},
)
chat_score_analyst|成绩分析(先上传数据文件,再传路径变量)
file_path = upload_analysis_txt("成绩数据文本")
return tencent_client.request_for_bot_billed(
request,
bot_key="KeyScoreAnalyst",
content="用户指令 inputs",
billing_units=get_billing_units(request),
module="成绩分析/页面",
biz_type="tencent.chat_score_analyst",
extra={"workflow_status": "enable", "custom_variables": {"data": file_path}},
)
chat_lesson_plan_design|教案设计(从数据库拿上下文,按 target 生成模块)
return tencent_client.request_for_bot_billed(
request,
bot_key="KeySubjectHelp",
content="query(用户要求)",
billing_units=get_billing_units(request),
module="教案设计/教学目标",
biz_type="tencent.chat_lesson_plan_design",
extra={"workflow_status": "enable", "custom_variables": {"subject": "语文"}},
)
chat_graphic_teaching_plan|图文教案(固定指令 + 用户补充要求)
return tencent_client.request_for_bot_billed(
request,
bot_key="KeyGraphicTeachingPlan",
content="固定指令 + 用户 query",
billing_units=get_billing_units(request),
module="图文教案",
biz_type="tencent.chat_graphic_teaching_plan",
extra={"workflow_status": "enable", "custom_variables": {"textbook_select": "某教材"}},
)
总结
通过对 Users 与 PointsLedger 两个核心模型的设计,可以将用户额度状态与消费流水彻底解耦,使调用前校验、调用中处理、调用后审计各自职责清晰。配合 key、utils、views 三层结构,将配置、引擎能力与业务接口拆分,能够有效降低系统复杂度,让流式 SSE 调用在高并发场景下依然保持稳定与可维护。
随着 AI 服务形态不断演进,流式输出、按量计费和可审计调用将成为常态。基于当前结构,可以进一步扩展更精细的计费策略、更丰富的业务模型,以及异步任务与监控体系。在保持核心调用链稳定的前提下,这套架构具备持续演进为通用 AI 能力中台的空间,也为自学者理解真实工程中的系统设计提供了可落地的参考。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)