前言

在AI应用开发领域,智能体的API调用是每个开发者必须掌握的技能。很多同学在对接Coze智能体API时,被SSE流式响应搞得一头雾水:event类型怎么区分?delta数据怎么拼接?message_end什么时候触发?今天这篇文章,我将从实际项目经验出发,手把手教你掌握Coze智能体的SSE流式API调用。

一、Coze智能体API架构解析

在开始编码之前,我们先来理解Coze智能体API的整体架构。Coze平台提供了两种运行模式:

  1. chat模式:标准的问答交互模式,适合简单对话场景
  2. stream_run模式:流式运行模式,通过SSE(Server-Sent Events)实时推送响应内容

流式响应的核心优势在于:

  • 实时反馈:用户可以即时看到AI的思考过程和输出内容
  • 降低感知延迟:无需等待完整响应,提升用户体验
  • 支持长文本:大段内容可以边生成边展示,不会超时

1.1 API调用整体流程

┌─────────────┐     POST /v2/chat/run      ┌─────────────┐
│   Client    │ ─────────────────────────▶  │   Coze API  │
│  (Python)   │                             │   Server    │
└─────────────┘                             └─────────────┘
       │                                           │
       │  ◀──────── SSE Stream Response ──────────│
       │                                           │
       ▼                                           ▼
┌─────────────┐                             ┌─────────────┐
│ 解析event   │                             │  大模型推理  │
│ 拼接delta   │                             │  工作流执行  │
└─────────────┘                             └─────────────┘

二、环境准备与依赖安装

在开始之前,请确保已安装以下依赖:

pip install requests sseclient-py

2.1 核心配置参数

import requests
import json
from sseclient import SSEClient

class CozeAPIClient:
    """Coze智能体API客户端"""
    
    def __init__(self, api_key: str, bot_id: str):
        """
        初始化API客户端
        
        Args:
            api_key: Coze平台的个人访问令牌(PAT)
            bot_id: 智能体的ID
        """
        self.api_key = api_key
        self.bot_id = bot_id
        self.base_url = "https://api.coze.cn"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

三、stream_run接口详解

3.1 接口基本信息

POST https://api.coze.cn/v2/chat/run

这是流式调用智能体的核心接口,相比非流式的/v2/chat接口,它会通过SSE协议实时推送消息内容。

3.2 请求头配置

请求头中,最关键的是Authorization字段,需要使用Bearer Token认证方式:

headers = {
    "Authorization": f"Bearer {coze_api_key}",  # PAT认证
    "Content-Type": "application/json",
    "Accept": "text/event-stream"  # 声明接受SSE流
}

3.3 请求体参数

payload = {
    "bot_id": bot_id,                    # 必填:智能体ID
    "user_id": "user_001",                # 必填:用户标识
    "stream": True,                       # 必填:开启流式响应
    "auto_save_history": True,            # 可选:是否自动保存对话历史
    "additional_messages": [              # 可选:携带历史对话
        {
            "role": "user",
            "content": "你好",
            "content_type": "text"
        }
    ],
    "request_timeout": 120,               # 可选:请求超时时间(秒)
    "conversation_id": None,              # 可选:指定会话ID实现多轮对话
}

四、SSE流式响应深度解析

这是本文的核心部分。Coze的SSE响应包含多种event类型,理解它们是正确处理流式数据的关键。

4.1 Event类型一览

表格

Event类型 说明 处理方式
conversation.message.delta 消息内容增量 累加到完整内容
conversation.message.completed 单条消息完成 提取最终内容
conversation.chat.completed 整个对话完成 标记流结束
error 错误事件 记录并处理
ping 心跳事件 忽略或回复pong

4.2 响应数据解析代码

def stream_chat(messages: list, user_id: str = "default_user"):
    """
    流式调用Coze智能体
    
    Args:
        messages: 对话消息列表
        user_id: 用户ID
    
    Yields:
        dict: 解析后的消息片段
    """
    url = f"{BASE_URL}/v2/chat/run"
    
    payload = {
        "bot_id": BOT_ID,
        "user_id": user_id,
        "stream": True,
        "auto_save_history": True,
        "messages": messages,
        "request_timeout": 120
    }
    
    response = requests.post(
        url, 
        headers=HEADERS, 
        json=payload,
        stream=True,
        timeout=180  # 设置足够长的超时时间
    )
    
    if response.status_code != 200:
        raise Exception(f"API调用失败: {response.status_code} - {response.text}")
    
    full_content = ""  # 完整响应内容
    conversation_id = None
    
    # 使用SSEClient解析流
    client = SSEClient(response)
    
    for event in client.events():
        event_type = event.event
        event_data = event.data
        
        # 心跳事件 - 直接跳过
        if event_type == "ping":
            continue
        
        # 错误事件 - 记录并继续
        if event_type == "error":
            print(f"流式响应错误: {event_data}")
            continue
        
        # 跳过空数据
        if not event_data or event_data.strip() == "":
            continue
        
        try:
            data = json.loads(event_data)
        except json.JSONDecodeError:
            continue
        
        # 处理消息增量 - 核心数据类型
        if event_type == "conversation.message.delta":
            delta = data.get("data", {}).get("delta", "")
            full_content += delta
            yield {
                "type": "delta",
                "content": delta,
                "full_content": full_content
            }
        
        # 消息完成事件
        elif event_type == "conversation.message.completed":
            msg_type = data.get("data", {}).get("type", "")
            if msg_type == "answer":
                yield {
                    "type": "message_completed",
                    "content": data.get("data", {}).get("content", ""),
                    "role": data.get("data", {}).get("role", "")
                }
        
        # 对话完成 - 流结束标志
        elif event_type == "conversation.chat.completed":
            conv_info = data.get("data", {})
            conversation_id = conv_info.get("id")
            yield {
                "type": "chat_completed",
                "conversation_id": conversation_id,
                "content": full_content
            }
            break
    
    return conversation_id

五、常见踩坑与解决方案

在实际项目中,我遇到了以下几个典型问题,这里分享给大家:

5.1 超时处理

SSE连接容易被防火墙或代理服务器中断。建议采用以下策略:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session_with_retry():
    """创建带有重试机制的Session"""
    session = requests.Session()
    
    # 配置重试策略:总共重试3次
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,  # 重试间隔:1s, 2s, 4s
        status_forcelist=[500, 502, 503, 504]
    )
    
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    
    return session

5.2 流中断处理

当流意外中断时,我们需要识别并处理:

def handle_stream_interrupt(response_stream):
    """处理流中断的优雅降级"""
    full_content = ""
    
    try:
        for chunk in response_stream.iter_content(chunk_size=1024):
            if chunk:
                full_content += chunk.decode('utf-8')
    except requests.exceptions.ChunkedEncodingError:
        # 流中断,返回已获取的内容
        print("流式响应意外中断,返回已获取内容")
        return full_content, None
    
    return full_content, None

5.3 answer为空时的降级方案

有时模型返回的delta为空,可能是内容正在生成中:

def process_stream_with_fallback(messages):
    """
    带降级处理的流式调用
    """
    try:
        # 尝试流式调用
        result_gen = stream_chat(messages)
        full_content = ""
        
        for result in result_gen:
            if result["type"] == "delta":
                full_content = result["full_content"]
            elif result["type"] == "chat_completed":
                return full_content, result.get("conversation_id")
        
        # 如果流式没有返回完整内容,尝试非流式API
        if not full_content:
            return non_stream_chat(messages)
        
        return full_content, None
        
    except Exception as e:
        print(f"流式调用异常: {e},降级到非流式调用")
        return non_stream_chat(messages)

5.4 PAT认证配置

Coze API使用Personal Access Token(个人访问令牌)进行认证,配置方式如下:

# 方式1:直接在代码中配置(不推荐用于生产环境)
COZE_API_KEY = "your_pat_here"

# 方式2:从环境变量读取(推荐)
import os
COZE_API_KEY = os.getenv("COZE_API_KEY")

# 方式3:从配置文件读取
import yaml
with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)
    COZE_API_KEY = config["coze"]["api_key"]

获取PAT的步骤

  1. 登录 Coze平台
  2. 进入「个人设置」→「API Keys」
  3. 点击「创建」,复制生成的Token
  4. 注意:Token只显示一次,请妥善保管

六、完整实战项目

下面是一个可运行的完整示例,整合了所有知识点:

#!/usr/bin/env python3
"""
Coze智能体SSE流式调用完整示例
"""

import os
import json
import requests
from sseclient import SSEClient
from typing import Generator, Dict, Any, Optional

# ============ 配置区 ============
COZE_API_KEY = os.getenv("COZE_API_KEY", "your_api_key_here")
BOT_ID = os.getenv("BOT_ID", "your_bot_id_here")
BASE_URL = "https://api.coze.cn"
# ================================

class CozeStreamClient:
    def __init__(self, api_key: str, bot_id: str):
        self.api_key = api_key
        self.bot_id = bot_id
        self.base_url = "https://api.coze.cn"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def chat(self, user_message: str, user_id: str = "default") -> Dict[str, Any]:
        """
        流式对话主函数
        
        Returns:
            dict: {
                "success": bool,
                "content": str,  # 完整响应
                "conversation_id": str,
                "error": str  # 如果失败
            }
        """
        messages = [{
            "role": "user",
            "content": user_message,
            "content_type": "text"
        }]
        
        try:
            full_content = ""
            conversation_id = None
            
            for event in self._stream_request(messages, user_id):
                if event["type"] == "delta":
                    full_content = event["full_content"]
                    # 实时打印(模拟打字效果)
                    print(event["content"], end="", flush=True)
                elif event["type"] == "chat_completed":
                    conversation_id = event.get("conversation_id")
            
            print()  # 换行
            return {
                "success": True,
                "content": full_content,
                "conversation_id": conversation_id
            }
            
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "content": ""
            }
    
    def _stream_request(self, messages: list, user_id: str) -> Generator:
        """发送流式请求"""
        url = f"{self.base_url}/v2/chat/run"
        
        payload = {
            "bot_id": self.bot_id,
            "user_id": user_id,
            "stream": True,
            "auto_save_history": True,
            "messages": messages,
            "request_timeout": 120
        }
        
        response = requests.post(
            url,
            headers=self.headers,
            json=payload,
            stream=True,
            timeout=180
        )
        
        if response.status_code != 200:
            raise Exception(f"请求失败: {response.status_code} - {response.text}")
        
        full_content = ""
        client = SSEClient(response)
        
        for event in client.events():
            if event.event == "ping":
                continue
            
            if not event.data:
                continue
            
            try:
                data = json.loads(event.data)
            except json.JSONDecodeError:
                continue
            
            if event.event == "conversation.message.delta":
                delta = data.get("data", {}).get("delta", "")
                full_content += delta
                yield {"type": "delta", "content": delta, "full_content": full_content}
            
            elif event.event == "conversation.chat.completed":
                yield {
                    "type": "chat_completed",
                    "conversation_id": data.get("data", {}).get("id")
                }
                break


def main():
    client = CozeStreamClient(COZE_API_KEY, BOT_ID)
    
    print("=" * 50)
    print("Coze智能体流式对话测试")
    print("=" * 50)
    print("\n请输入您的问题(输入q退出):\n")
    
    while True:
        user_input = input("👤 您: ").strip()
        
        if user_input.lower() in ['q', 'quit', 'exit']:
            print("感谢使用!")
            break
        
        if not user_input:
            continue
        
        print("🤖 智能体: ", end="", flush=True)
        result = client.chat(user_input)
        
        if not result["success"]:
            print(f"\n❌ 错误: {result['error']}")
        
        print()


if __name__ == "__main__":
    main()

运行效果

==================================================
Coze智能体流式对话测试
==================================================

请输入您的问题(输入q退出):

👤 您: 介绍一下Python的异步编程

🤖 智能体: Python的异步编程是一种高效处理并发任务的方式...

👤 您: 

七、进阶技巧

7.1 多轮对话实现

通过保存conversation_id,可以实现多轮上下文对话:

class CozeMultiTurnClient(CozeStreamClient):
    """支持多轮对话的客户端"""
    
    def __init__(self, api_key: str, bot_id: str):
        super().__init__(api_key, bot_id)
        self.conversation_id = None
        self.messages = []
    
    def chat_with_context(self, user_message: str):
        """携带上下文的对话"""
        # 添加用户消息
        self.messages.append({
            "role": "user",
            "content": user_message,
            "content_type": "text"
        })
        
        payload = {
            "bot_id": self.bot_id,
            "user_id": "user_001",
            "stream": True,
            "auto_save_history": True,
            "messages": self.messages,
            "conversation_id": self.conversation_id
        }
        
        # 处理响应...
        # 更新conversation_id...

7.2 消息内容类型

Coze支持多种消息内容类型:

# 文本消息
text_message = {
    "role": "user",
    "content": "Hello",
    "content_type": "text"
}

# 图片消息
image_message = {
    "role": "user",
    "content": json.dumps([{
        "type": "image",
        "file_id": "file_xxx"
    }]),
    "content_type": "text"
}

# 卡片消息
card_message = {
    "role": "user", 
    "content": "这是卡片内容",
    "content_type": "text"
}

八、总结

本文详细讲解了Coze智能体SSE流式API调用的完整流程,包括:

  1. API架构:理解chat和stream_run两种模式的区别
  2. 认证配置:PAT Token的正确配置方式
  3. SSE响应解析:各种event类型的处理逻辑
  4. 实战代码:可运行的完整Python示例
  5. 踩坑指南:超时处理、流中断、answer为空的解决方案

核心要点

  • SSE流式响应的delta事件是数据核心,需要累加拼接
  • conversation.chat.completed是流结束的标志
  • 务必设置合理的超时时间,建议120秒以上
  • 建议实现降级方案,提升系统健壮性

有智能体定制开发需求?

如果你在AI智能体开发中遇到复杂问题,或者需要定制化的智能体解决方案,欢迎私信咨询!专注于Coze智能体开发、API对接、自动化系统搭建,支持企业级应用落地。

原创不易,码字艰辛。如果这篇文章对你有帮助,欢迎点赞、评论、收藏!有问题也欢迎在评论区留言,我会尽力解答。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐