Coze智能体SSE流式API调用实战,手把手教你对接扣子智能体
前言
在AI应用开发领域,智能体的API调用是每个开发者必须掌握的技能。很多同学在对接Coze智能体API时,被SSE流式响应搞得一头雾水:event类型怎么区分?delta数据怎么拼接?message_end什么时候触发?今天这篇文章,我将从实际项目经验出发,手把手教你掌握Coze智能体的SSE流式API调用。
一、Coze智能体API架构解析
在开始编码之前,我们先来理解Coze智能体API的整体架构。Coze平台提供了两种运行模式:
- chat模式:标准的问答交互模式,适合简单对话场景
- stream_run模式:流式运行模式,通过SSE(Server-Sent Events)实时推送响应内容
流式响应的核心优势在于:
- 实时反馈:用户可以即时看到AI的思考过程和输出内容
- 降低感知延迟:无需等待完整响应,提升用户体验
- 支持长文本:大段内容可以边生成边展示,不会超时
1.1 API调用整体流程
┌─────────────┐ POST /v2/chat/run ┌─────────────┐
│ Client │ ─────────────────────────▶ │ Coze API │
│ (Python) │ │ Server │
└─────────────┘ └─────────────┘
│ │
│ ◀──────── SSE Stream Response ──────────│
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ 解析event │ │ 大模型推理 │
│ 拼接delta │ │ 工作流执行 │
└─────────────┘ └─────────────┘
二、环境准备与依赖安装
在开始之前,请确保已安装以下依赖:
pip install requests sseclient-py
2.1 核心配置参数
import requests
import json
from sseclient import SSEClient
class CozeAPIClient:
"""Coze智能体API客户端"""
def __init__(self, api_key: str, bot_id: str):
"""
初始化API客户端
Args:
api_key: Coze平台的个人访问令牌(PAT)
bot_id: 智能体的ID
"""
self.api_key = api_key
self.bot_id = bot_id
self.base_url = "https://api.coze.cn"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
三、stream_run接口详解
3.1 接口基本信息
POST https://api.coze.cn/v2/chat/run
这是流式调用智能体的核心接口,相比非流式的/v2/chat接口,它会通过SSE协议实时推送消息内容。
3.2 请求头配置
请求头中,最关键的是Authorization字段,需要使用Bearer Token认证方式:
headers = {
"Authorization": f"Bearer {coze_api_key}", # PAT认证
"Content-Type": "application/json",
"Accept": "text/event-stream" # 声明接受SSE流
}
3.3 请求体参数
payload = {
"bot_id": bot_id, # 必填:智能体ID
"user_id": "user_001", # 必填:用户标识
"stream": True, # 必填:开启流式响应
"auto_save_history": True, # 可选:是否自动保存对话历史
"additional_messages": [ # 可选:携带历史对话
{
"role": "user",
"content": "你好",
"content_type": "text"
}
],
"request_timeout": 120, # 可选:请求超时时间(秒)
"conversation_id": None, # 可选:指定会话ID实现多轮对话
}
四、SSE流式响应深度解析
这是本文的核心部分。Coze的SSE响应包含多种event类型,理解它们是正确处理流式数据的关键。
4.1 Event类型一览
表格
| Event类型 | 说明 | 处理方式 |
|---|---|---|
conversation.message.delta |
消息内容增量 | 累加到完整内容 |
conversation.message.completed |
单条消息完成 | 提取最终内容 |
conversation.chat.completed |
整个对话完成 | 标记流结束 |
error |
错误事件 | 记录并处理 |
ping |
心跳事件 | 忽略或回复pong |
4.2 响应数据解析代码
def stream_chat(messages: list, user_id: str = "default_user"):
"""
流式调用Coze智能体
Args:
messages: 对话消息列表
user_id: 用户ID
Yields:
dict: 解析后的消息片段
"""
url = f"{BASE_URL}/v2/chat/run"
payload = {
"bot_id": BOT_ID,
"user_id": user_id,
"stream": True,
"auto_save_history": True,
"messages": messages,
"request_timeout": 120
}
response = requests.post(
url,
headers=HEADERS,
json=payload,
stream=True,
timeout=180 # 设置足够长的超时时间
)
if response.status_code != 200:
raise Exception(f"API调用失败: {response.status_code} - {response.text}")
full_content = "" # 完整响应内容
conversation_id = None
# 使用SSEClient解析流
client = SSEClient(response)
for event in client.events():
event_type = event.event
event_data = event.data
# 心跳事件 - 直接跳过
if event_type == "ping":
continue
# 错误事件 - 记录并继续
if event_type == "error":
print(f"流式响应错误: {event_data}")
continue
# 跳过空数据
if not event_data or event_data.strip() == "":
continue
try:
data = json.loads(event_data)
except json.JSONDecodeError:
continue
# 处理消息增量 - 核心数据类型
if event_type == "conversation.message.delta":
delta = data.get("data", {}).get("delta", "")
full_content += delta
yield {
"type": "delta",
"content": delta,
"full_content": full_content
}
# 消息完成事件
elif event_type == "conversation.message.completed":
msg_type = data.get("data", {}).get("type", "")
if msg_type == "answer":
yield {
"type": "message_completed",
"content": data.get("data", {}).get("content", ""),
"role": data.get("data", {}).get("role", "")
}
# 对话完成 - 流结束标志
elif event_type == "conversation.chat.completed":
conv_info = data.get("data", {})
conversation_id = conv_info.get("id")
yield {
"type": "chat_completed",
"conversation_id": conversation_id,
"content": full_content
}
break
return conversation_id
五、常见踩坑与解决方案
在实际项目中,我遇到了以下几个典型问题,这里分享给大家:
5.1 超时处理
SSE连接容易被防火墙或代理服务器中断。建议采用以下策略:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session_with_retry():
"""创建带有重试机制的Session"""
session = requests.Session()
# 配置重试策略:总共重试3次
retry_strategy = Retry(
total=3,
backoff_factor=1, # 重试间隔:1s, 2s, 4s
status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
5.2 流中断处理
当流意外中断时,我们需要识别并处理:
def handle_stream_interrupt(response_stream):
"""处理流中断的优雅降级"""
full_content = ""
try:
for chunk in response_stream.iter_content(chunk_size=1024):
if chunk:
full_content += chunk.decode('utf-8')
except requests.exceptions.ChunkedEncodingError:
# 流中断,返回已获取的内容
print("流式响应意外中断,返回已获取内容")
return full_content, None
return full_content, None
5.3 answer为空时的降级方案
有时模型返回的delta为空,可能是内容正在生成中:
def process_stream_with_fallback(messages):
"""
带降级处理的流式调用
"""
try:
# 尝试流式调用
result_gen = stream_chat(messages)
full_content = ""
for result in result_gen:
if result["type"] == "delta":
full_content = result["full_content"]
elif result["type"] == "chat_completed":
return full_content, result.get("conversation_id")
# 如果流式没有返回完整内容,尝试非流式API
if not full_content:
return non_stream_chat(messages)
return full_content, None
except Exception as e:
print(f"流式调用异常: {e},降级到非流式调用")
return non_stream_chat(messages)
5.4 PAT认证配置
Coze API使用Personal Access Token(个人访问令牌)进行认证,配置方式如下:
# 方式1:直接在代码中配置(不推荐用于生产环境)
COZE_API_KEY = "your_pat_here"
# 方式2:从环境变量读取(推荐)
import os
COZE_API_KEY = os.getenv("COZE_API_KEY")
# 方式3:从配置文件读取
import yaml
with open("config.yaml", "r") as f:
config = yaml.safe_load(f)
COZE_API_KEY = config["coze"]["api_key"]
获取PAT的步骤:
- 登录 Coze平台
- 进入「个人设置」→「API Keys」
- 点击「创建」,复制生成的Token
- 注意:Token只显示一次,请妥善保管
六、完整实战项目
下面是一个可运行的完整示例,整合了所有知识点:
#!/usr/bin/env python3
"""
Coze智能体SSE流式调用完整示例
"""
import os
import json
import requests
from sseclient import SSEClient
from typing import Generator, Dict, Any, Optional
# ============ 配置区 ============
COZE_API_KEY = os.getenv("COZE_API_KEY", "your_api_key_here")
BOT_ID = os.getenv("BOT_ID", "your_bot_id_here")
BASE_URL = "https://api.coze.cn"
# ================================
class CozeStreamClient:
def __init__(self, api_key: str, bot_id: str):
self.api_key = api_key
self.bot_id = bot_id
self.base_url = "https://api.coze.cn"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def chat(self, user_message: str, user_id: str = "default") -> Dict[str, Any]:
"""
流式对话主函数
Returns:
dict: {
"success": bool,
"content": str, # 完整响应
"conversation_id": str,
"error": str # 如果失败
}
"""
messages = [{
"role": "user",
"content": user_message,
"content_type": "text"
}]
try:
full_content = ""
conversation_id = None
for event in self._stream_request(messages, user_id):
if event["type"] == "delta":
full_content = event["full_content"]
# 实时打印(模拟打字效果)
print(event["content"], end="", flush=True)
elif event["type"] == "chat_completed":
conversation_id = event.get("conversation_id")
print() # 换行
return {
"success": True,
"content": full_content,
"conversation_id": conversation_id
}
except Exception as e:
return {
"success": False,
"error": str(e),
"content": ""
}
def _stream_request(self, messages: list, user_id: str) -> Generator:
"""发送流式请求"""
url = f"{self.base_url}/v2/chat/run"
payload = {
"bot_id": self.bot_id,
"user_id": user_id,
"stream": True,
"auto_save_history": True,
"messages": messages,
"request_timeout": 120
}
response = requests.post(
url,
headers=self.headers,
json=payload,
stream=True,
timeout=180
)
if response.status_code != 200:
raise Exception(f"请求失败: {response.status_code} - {response.text}")
full_content = ""
client = SSEClient(response)
for event in client.events():
if event.event == "ping":
continue
if not event.data:
continue
try:
data = json.loads(event.data)
except json.JSONDecodeError:
continue
if event.event == "conversation.message.delta":
delta = data.get("data", {}).get("delta", "")
full_content += delta
yield {"type": "delta", "content": delta, "full_content": full_content}
elif event.event == "conversation.chat.completed":
yield {
"type": "chat_completed",
"conversation_id": data.get("data", {}).get("id")
}
break
def main():
client = CozeStreamClient(COZE_API_KEY, BOT_ID)
print("=" * 50)
print("Coze智能体流式对话测试")
print("=" * 50)
print("\n请输入您的问题(输入q退出):\n")
while True:
user_input = input("👤 您: ").strip()
if user_input.lower() in ['q', 'quit', 'exit']:
print("感谢使用!")
break
if not user_input:
continue
print("🤖 智能体: ", end="", flush=True)
result = client.chat(user_input)
if not result["success"]:
print(f"\n❌ 错误: {result['error']}")
print()
if __name__ == "__main__":
main()
运行效果
==================================================
Coze智能体流式对话测试
==================================================
请输入您的问题(输入q退出):
👤 您: 介绍一下Python的异步编程
🤖 智能体: Python的异步编程是一种高效处理并发任务的方式...
👤 您:
七、进阶技巧
7.1 多轮对话实现
通过保存conversation_id,可以实现多轮上下文对话:
class CozeMultiTurnClient(CozeStreamClient):
"""支持多轮对话的客户端"""
def __init__(self, api_key: str, bot_id: str):
super().__init__(api_key, bot_id)
self.conversation_id = None
self.messages = []
def chat_with_context(self, user_message: str):
"""携带上下文的对话"""
# 添加用户消息
self.messages.append({
"role": "user",
"content": user_message,
"content_type": "text"
})
payload = {
"bot_id": self.bot_id,
"user_id": "user_001",
"stream": True,
"auto_save_history": True,
"messages": self.messages,
"conversation_id": self.conversation_id
}
# 处理响应...
# 更新conversation_id...
7.2 消息内容类型
Coze支持多种消息内容类型:
# 文本消息
text_message = {
"role": "user",
"content": "Hello",
"content_type": "text"
}
# 图片消息
image_message = {
"role": "user",
"content": json.dumps([{
"type": "image",
"file_id": "file_xxx"
}]),
"content_type": "text"
}
# 卡片消息
card_message = {
"role": "user",
"content": "这是卡片内容",
"content_type": "text"
}
八、总结
本文详细讲解了Coze智能体SSE流式API调用的完整流程,包括:
- API架构:理解chat和stream_run两种模式的区别
- 认证配置:PAT Token的正确配置方式
- SSE响应解析:各种event类型的处理逻辑
- 实战代码:可运行的完整Python示例
- 踩坑指南:超时处理、流中断、answer为空的解决方案
核心要点:
- SSE流式响应的
delta事件是数据核心,需要累加拼接 conversation.chat.completed是流结束的标志- 务必设置合理的超时时间,建议120秒以上
- 建议实现降级方案,提升系统健壮性
有智能体定制开发需求?
如果你在AI智能体开发中遇到复杂问题,或者需要定制化的智能体解决方案,欢迎私信咨询!专注于Coze智能体开发、API对接、自动化系统搭建,支持企业级应用落地。
原创不易,码字艰辛。如果这篇文章对你有帮助,欢迎点赞、评论、收藏!有问题也欢迎在评论区留言,我会尽力解答。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)