AgentScope 2.0 框架分析
AgentScope 2.0 框架分析
分析日期:2026-06-11
项目地址:https://github.com/agentscope-ai/agentscope
分析范围:src/agentscope核心库 +src/agentscope/app服务层 +examples/示例项目

目录
- 一、总体概览与定位
- 二、整体目录结构
- 三、核心设计理念
- 四、核心运行时模块分析
- 五、生产级服务模块 app 分析
- 六、示例项目分析
- 七、一次 agent.reply 的完整数据流
- 八、模块协作关系图
- 九、基于此框架做 ChatGPT 式产品的开发指南
- 十、关键文件索引
一、总体概览与定位
AgentScope 2.0 是一个生产级的 AI Agent(智能体)开发框架,核心定位是:
- 地基(核心库
src/agentscope):负责"让一个 AI 学会思考、调用工具、记住对话"——这是单个 agent 的大脑。 - 毛坯房(服务层
src/agentscope/app):负责"把 AI 助手变成一个能同时服务很多用户、能保存聊天记录、能多人在线的网络服务"。 - 样板间(
examples/):官方已经用这套地基盖好了一个几乎就是 ChatGPT 网页版的完整示例(后端 + React 前端)。
与 ChatGPT 的概念对应
| ChatGPT 概念 | AgentScope 对应 |
|---|---|
| 一个登录的用户 | user_id(身份标识) |
| 你创建的某个"自定义 GPT" | Agent(一套助手配置:名字、人设、模型、工具) |
| 左侧的一个个"对话" | Session(一条对话线程) |
| 对话里的一条条消息 | Msg(消息对象) |
| 打字时一个字一个字蹦出来 | Event(流式事件,通过 SSE 推送) |
| 设置里填的 API Key | Credential(凭证) |
| 模型选择 | SessionConfig.chat_model_config |
| 多进程部署 | 开箱即用(Redis lock + bus) |
核心特性
- Event System → 统一事件总线,驱动前端和人机交互
- Permission System → 细粒度的工具/资源访问控制
- Multi-tenancy & Multi-session Service → 生产级多租户多会话隔离
- Workspace / Sandbox Support → 隔离的执行环境(本地/Docker/E2B)
- Extensible Middleware System → 可组合的钩子,扩展 Agent 推理循环
二、整体目录结构
agentscope/
├── src/agentscope/ ← 框架本体
│ ├── 【核心库】单个 Agent 的大脑
│ │ agent/ 模型推理-行动循环(ReAct)的核心
│ │ model/ 对接各家大模型(通义/OpenAI/Claude…)
│ │ message/ 统一的"消息"数据结构
│ │ event/ 事件系统(流式输出的协议)
│ │ tool/ 工具箱(让 AI 能执行命令、读写文件等)
│ │ formatter/ 把统一消息翻译成各家模型要的格式
│ │ middleware/ 中间件(不改源码就能扩展行为)
│ │ permission/ 权限系统(工具调用要不要先问用户)
│ │ workspace/ 沙箱(让 AI 在隔离环境里干活)
│ │ state/ 运行状态(用于保存/恢复对话)
│ │ mcp/ MCP 协议客户端
│ │ skill/ 技能加载
│ │ embedding/ 文本向量化
│ │ credential/ 凭证管理
│ │
│ └── 【服务层】app/ 把 Agent 变成多用户在线服务
│ _app.py create_app() 工厂函数
│ _router/ HTTP 接口(7组路由)
│ _service/ 业务逻辑层(ChatService 等)
│ storage/ 持久化层(RedisStorage)
│ message_bus/ 消息总线(RedisMessageBus)
│ _manager/ 后台管家(定时/唤醒/取消/后台任务)
│ middleware/ 服务层中间件
│ workspace_manager/ 工作区管理
│ deps.py 依赖注入
│ _lifespan.py 生命周期
│ _types.py 扩展点类型
│
├── examples/
│ ├── agent_service/ ← 后端服务示例(main.py)
│ └── web_ui/ ← React 前端 + 占位后端
│
├── tests/ docs/ scripts/
└── pyproject.toml README.md
三、核心设计理念
理念 1:统一消息(Msg)+ 事件(Event)协议
框架里所有信息——用户的话、AI 的思考、工具调用、工具结果、图片音频——都装进统一的 Msg 对象,里面是一个个 ContentBlock。AI 在工作时不断发出细粒度小事件,前端订阅这些事件流实时渲染。
理念 2:ReAct 循环——让模型自己"边想边做"
核心在 agent/_agent.py 的 _reply_impl。设计哲学:信任模型的推理和工具调用能力,少用死板的提示词约束。
理念 3:洋葱式中间件——不改源码就能扩展
middleware/ 提供 6 个钩子点,遵循"开闭原则"。
理念 4:状态可序列化 = 对话能保存和恢复
AgentState 是 Pydantic 模型,可整体序列化存进数据库,下次恢复时塞回 Agent 继续对话。
四、核心运行时模块分析
4.1 Agent 模块
文件路径:src/agentscope/agent/_agent.py
职责:编排推理-行动循环、上下文管理、权限检查、工具执行、事件发射。
核心类:
| 类/函数 | 文件 | 说明 |
|---|---|---|
Agent |
agent/_agent.py |
唯一 Agent 实现 |
ReActConfig |
agent/_config.py |
max_iters=20,控制循环上限 |
ContextConfig |
agent/_config.py |
上下文压缩阈值、摘要模板 |
ModelConfig |
agent/_config.py |
重试次数、fallback 模型 |
_ToolCallBatch |
agent/_utils.py |
工具调用批次(sequential/concurrent) |
关键方法签名:
# agent/_agent.py
async def reply_stream(inputs: Msg | list[Msg] | UserConfirmResultEvent | ExternalExecutionResultEvent | None) -> AsyncGenerator[AgentEvent, None]
async def reply(inputs: ...) -> Msg # 消费全部事件,返回最终 Msg
async def observe(msgs: Msg | list[Msg] | None) -> None
async def compress_context(context_config: ContextConfig | None = None) -> None
Agent 初始化参数:
class Agent:
def __init__(
self,
name: str, # Agent 标识名
system_prompt: str, # 系统提示词
model: ChatModelBase, # 大模型实例
toolkit: Toolkit | None = None, # 工具箱
middlewares: list[MiddlewareBase] | None = None, # 中间件链
state: AgentState | None = None, # 可恢复的运行时状态
offloader: Offloader | None = None, # 上下文 offloader
model_config: ModelConfig = ModelConfig(),
context_config: ContextConfig = ContextConfig(),
react_config: ReActConfig = ReActConfig(),
)
ReAct 循环实现(_reply_impl,约 542–699 行):
while cur_iter < max_iters:
action, data = _check_next_action()
if action == "exit": yield data; return
if action == "reasoning":
await compress_context()
async for evt in _reasoning(): ...
if evt is Msg (无 tool_call): yield ReplyEndEvent; return
for batch in _batch_tool_calls():
执行 sequential 或 concurrent 工具
若 RequireUserConfirmEvent / RequireExternalExecutionEvent: 暂停等待外部事件
cur_iter += 1
_check_next_action 决策表(2267–2361 行):
| 可执行工具 | 等待中工具 | 下一步 |
|---|---|---|
| 有 | 任意 | acting |
| 无 | 有 | exit(等待用户确认/外部执行) |
| 无 | 无 | reasoning |
reply_stream 工作机制(191–214 行):
- 调用内部
_reply(),它同时产出AgentEvent和最终Msg reply_stream只 yield 非 Msg 的事件(过滤掉最终消息)reply()则遍历全部产出,捕获最后一个Msg作为返回值
推理阶段(_reasoning_impl,755–880 行):
yield ModelCallStartEventkwargs = await _prepare_model_input()→ system prompt + summary + context + tool schemasres = await _call_model(**kwargs)→ 可能返回ChatResponse或AsyncGenerator[ChatResponse]- 流式 chunk 经
_convert_chat_response_to_event()转为TextBlockDeltaEvent、ToolCallDeltaEvent等 yield ModelCallEndEvent(含 token 用量)_save_to_context()写入 assistant 消息- 若无
ToolCallBlock→yield AssistantMsg(本轮结束)
工具执行阶段(_execute_tool_call,1293–1542 行):
- 校验工具可用性 + JSON Schema 校验输入
PermissionEngine.check_permission()决策:ASK / DENY / ALLOW- ASK →
RequireUserConfirmEvent,状态变为ASKING - ALLOW + 外部工具 →
RequireExternalExecutionEvent,状态变为SUBMITTED - ALLOW + 本地工具 →
_acting()→toolkit.call_tool()→ 流式ToolChunk→ 截断/offload →_save_to_context()
上下文压缩(_compress_context_impl,300–491 行):
- 当 token 超过
trigger_ratio * context_size时触发 - 用
model.generate_structured_output()生成结构化摘要 - 旧消息 offload 到 workspace,保留最近
reserve_ratio比例的消息
并发策略(_batch_tool_calls,1102–1139 行):
is_concurrency_safe=True的工具可并发执行- 非并发安全工具(如 Bash、Write)串行执行
- 并发执行用
asyncio.gather+ Queue 收集事件
4.2 Model 模块
文件路径:src/agentscope/model/
职责:统一 LLM 调用接口,屏蔽 DashScope / OpenAI / Anthropic / Gemini / Ollama 等差异。
核心类:
| 类 | 文件 | 说明 |
|---|---|---|
ChatModelBase |
model/_base.py |
抽象基类 |
ChatResponse |
model/_model_response.py |
模型响应(含 is_last 流式标志) |
ChatUsage |
model/_model_usage.py |
token 用量 |
StructuredResponse |
model/_model_response.py |
结构化输出(用于压缩摘要) |
ModelCard |
model/_model_card.py |
从 YAML 加载模型卡片 |
具体实现(model/__init__.py 导出):
AnthropicChatModelDashScopeChatModelDeepSeekChatModelGeminiChatModelOllamaChatModelOpenAIChatModelOpenAIResponseModelXAIChatModelMoonshotChatModel
统一接口(ChatModelBase):
async def __call__(messages: list[Msg], tools: list[dict] | None, tool_choice: ToolChoice | None, **kwargs) -> ChatResponse | AsyncGenerator[ChatResponse, None]
async def count_tokens(messages: list[Msg], tools: list[dict] | None) -> int # 默认 bytes/4 估算
async def generate_structured_output(messages, structured_model) -> StructuredResponse
@abstractmethod
async def _call_api(model_name, messages, tools, tool_choice, **kwargs) -> ...
统一化设计:
- 所有模型接收统一的
list[Msg],不直接对接厂商格式 - 每个模型持有**
FormatterBase实例**,在_call_api中await self.formatter.format(messages)转换 - 流式响应解析为**
ChatResponsechunks**(content为TextBlock | ToolCallBlock | ThinkingBlock | DataBlock序列) - 内置重试(
max_retries+_get_retryable_exceptions())+ Agent 层 fallback 模型
以 OpenAI 为例(model/_openai_chat/_model.py):
Msg[] → OpenAIChatFormatter.format() → OpenAI API messages[]
→ stream/non-stream → _parse_stream_response() → ChatResponse chunks
4.3 Message 模块
文件路径:src/agentscope/message/
职责:Agent 间信息存储与传输的统一数据模型。
核心类(message/_base.py, message/_block.py):
| 类 | 说明 |
|---|---|
Msg |
消息主体:name, content: list[ContentBlock], role, id, usage |
UserMsg() / AssistantMsg() / SystemMsg() |
工厂函数 |
TextBlock |
文本 |
ThinkingBlock |
推理/思考内容 |
ToolCallBlock |
工具调用(含 state 状态机) |
ToolResultBlock |
工具结果 |
DataBlock |
多模态二进制(Base64Source / URLSource) |
HintBlock |
循环中给 LLM 的提示(转为 user 消息) |
ToolCallState 状态机(message/_block.py,104–155 行):
pending → asking (需用户确认) → allowed → finished
pending → allowed → finished (本地执行)
allowed → submitted (外部工具) → finished (收到 ExternalExecutionResultEvent)
Msg.append_event()(message/_base.py,211–460 行):
- 将流式
AgentEvent增量重建为完整Msg.content - 用于
ChatService持久化:边收事件边拼装 reply 消息 - 支持 text/thinking/tool_call/tool_result/data 等全部事件类型
角色约束:
user:仅text+datasystem:仅textassistant:所有 block 类型
4.4 Event 模块
文件路径:src/agentscope/event/_event.py
职责:定义 Agent 执行过程中产出的所有事件类型,供流式传输和前端渲染。
EventType 枚举(20–60 行):
| 类别 | 事件类型 |
|---|---|
| 回复生命周期 | REPLY_START, REPLY_END, EXCEED_MAX_ITERS |
| 模型调用 | MODEL_CALL_START, MODEL_CALL_END |
| 文本流 | TEXT_BLOCK_START/DELTA/END |
| 思考流 | THINKING_BLOCK_START/DELTA/END |
| 多模态流 | DATA_BLOCK_START/DELTA/END |
| 工具调用流 | TOOL_CALL_START/DELTA/END |
| 工具结果流 | TOOL_RESULT_START, TOOL_RESULT_TEXT_DELTA, TOOL_RESULT_DATA_DELTA, TOOL_RESULT_END |
| 人机交互 | REQUIRE_USER_CONFIRM, REQUIRE_EXTERNAL_EXECUTION, USER_CONFIRM_RESULT, EXTERNAL_EXECUTION_RESULT |
| 扩展 | HINT_BLOCK, CUSTOM |
事件总线如何驱动前端:
Agent.reply_stream()
→ ChatService.run() (app/_service/_chat.py)
→ message_bus.session_publish_event(session_id, event.model_dump())
→ Redis Stream (replay log) + Pub/Sub (live fan-out)
→ 前端 GET /sessions/{sid}/stream (SSE)
→ session_read_events() 回放历史
→ session_subscribe_events() 实时订阅
→ data: {JSON event}\n\n
前端通过 reply_msg.append_event(event) 可将事件流重建为完整消息用于持久化。
4.5 Middleware 模块
文件路径:src/agentscope/middleware/_base.py
职责:在不修改 Agent 源码的前提下扩展推理循环。
核心类:MiddlewareBase
6 个钩子点:
| 钩子 | 模式 | 拦截点 |
|---|---|---|
on_reply |
洋葱 | 整个 reply 流程 |
on_reasoning |
洋葱 | 推理/模型调用阶段 |
on_acting |
洋葱 | 纯 toolkit.call_tool 执行 |
on_model_call |
洋葱 | 原始 LLM API 调用 |
on_compress_context |
洋葱 | 上下文压缩 |
on_system_prompt |
管道(顺序变换) | system prompt 字符串 |
洋葱链实现(以 _reply 为例,506–540 行):
async def execute_chain(index=0):
if index >= len(middlewares):
async for item in self._reply_impl(inputs): yield item
else:
async def next_handler(**kwargs):
async for item in execute_chain(index+1, **kwargs): yield item
async for item in mw.on_reply(agent, input_kwargs, next_handler): yield item
内置中间件:TracingMiddleware(middleware/_tracing/)
应用层中间件(app/middleware/,服务部署时使用):
InboxMiddleware:团队消息注入StateChangeMiddleware:状态变更发CustomEventToolOffloadMiddleware:长任务后台执行
设计要点:on_acting 只包裹工具 I/O,不含权限检查和 context 写入,便于安全地后台 offload。
4.6 Tool 模块
文件路径:src/agentscope/tool/
职责:工具注册、分组管理、Schema 生成、统一流式执行。
核心类:
| 类 | 文件 | 说明 |
|---|---|---|
Toolkit |
tool/_toolkit.py |
工具注册中心 |
ToolBase |
tool/_base.py |
工具抽象协议 |
ToolGroup |
tool/_tool_group.py |
工具分组(可动态激活/停用) |
ToolChunk / ToolResponse |
tool/_response.py |
流式/完整工具结果 |
ToolChoice |
tool/_types.py |
模型工具选择策略 |
FunctionTool / MCPTool |
tool/_adapters.py |
函数/MCP 适配器 |
关键方法:
# Toolkit
async def get_tool_schemas(groups: list[str] | None) -> list[dict]
async def call_tool(tool_call: ToolCallBlock, state: AgentState) -> AsyncGenerator[ToolChunk | ToolResponse, None]
async def check_tool_available(tool_name, activated_groups) -> ToolBase
async def get_skill_instructions(activated_groups) -> str | None
工具调用流程:
ToolCallBlock → call_tool()
→ 检查工具组是否激活
→ JSON 解析 input + state 注入 (_agent_state)
→ 执行 tool.__call__() → yield ToolChunk(s) → 最终 ToolResponse
内置工具(tool/_builtin/):
| 工具 | 特性 |
|---|---|
Read |
只读、并发安全、is_state_injected=True(文件缓存) |
Write |
写文件 |
Edit |
编辑文件 |
Bash |
执行 shell,非并发安全,有命令解析器 |
Grep |
内容搜索 |
Glob |
文件搜索 |
ResetTools |
元工具:激活/停用工具组 |
SkillViewer |
元工具:查看 skill 指令 |
还有任务管理工具:TaskCreate/Update/Get/List(tool/_task/)
4.7 Formatter 模块
文件路径:src/agentscope/formatter/
职责:将统一的 list[Msg] 转换为各 LLM 厂商 API 所需的消息格式。
核心类:FormatterBase(formatter/_formatter_base.py)
@abstractmethod
async def format(self, *args, **kwargs) -> list[dict[str, Any]]
每个 Provider 一对 Formatter:
| Formatter | 对应模型 |
|---|---|
OpenAIChatFormatter |
OpenAI Chat Completions |
OpenAIResponseFormatter |
OpenAI Responses API |
DashScopeChatFormatter |
通义千问 |
AnthropicChatFormatter |
Claude |
GeminiChatFormatter |
Gemini |
DeepSeekChatFormatter |
DeepSeek |
MoonshotChatFormatter |
Moonshot |
XAIChatFormatter |
xAI |
还有 *MultiAgentFormatter 变体,处理多 Agent 对话历史(工具序列分组、消息折叠)。
关键能力:
_group_messages():将消息分为tool_sequence和agent_message组convert_tool_result_to_string():不支持多模态 tool result 的 API,将DataBlock提升为 user 消息或转文字描述input_types:控制哪些media_type的DataBlock可传给 API
Formatter 在模型初始化时注入,在 _call_api 第一步调用。
4.8 State 模块
文件路径:src/agentscope/state/_state.py
职责:Agent 可持久化的全部运行时状态。
核心类:
class AgentState(BaseModel):
session_id: str
summary: str | list[TextBlock | DataBlock] # 压缩后的历史摘要
context: list[Msg] # 未压缩的对话上下文
reply_id: str # 当前 reply 的 ID
cur_iter: int # ReAct 循环迭代计数
permission_context: PermissionContext
tool_context: ToolContext # 工具组激活状态 + Read 文件缓存
tasks_context: TaskContext # 任务列表
序列化:AgentState 是 Pydantic BaseModel,由 ChatService.run() 结束时 storage.update_session_state() 持久化。
ToolContext 管理:
activated_groups:当前激活的工具组read_file_cache:Read 工具的 LRU 文件缓存
Task(state/_task.py):任务看板条目,含 pending/in_progress/completed 状态和依赖关系。
4.9 Memory(无独立模块)
代码库中没有 src/agentscope/memory 目录。记忆机制内建于 Agent 状态:
| 机制 | 实现位置 | 说明 |
|---|---|---|
| 短期记忆 | AgentState.context |
完整对话历史(含 tool_call/result) |
| 长期记忆 | AgentState.summary |
超 token 阈值时 LLM 生成的结构化摘要 |
| 外部记忆 | Offloader / WorkspaceBase |
压缩/offload 的内容存到文件系统 |
| 观察 | Agent.observe() |
外部消息写入 context,不触发 reply |
五、生产级服务模块 app 分析
5.1 总体服务架构
AgentScope 的 app 层是一个 FastAPI 生产服务,核心设计是双后端分离:
| 组件 | 职责 | 默认实现 |
|---|---|---|
| Storage | 持久化记录(Agent、Session、消息、凭证、定时任务、Team) | RedisStorage(JSON + Redis Set/List) |
| MessageBus | 实时协调(分布式锁、SSE 事件、Inbox、Wake-up、Cancel) | RedisMessageBus(Stream + Pub/Sub + SET NX 锁) |
| WorkspaceManager | 每会话沙箱(工具/MCP/Skill 执行环境) | Local / Docker / E2B |
入口工厂:create_app()(src/agentscope/app/_app.py)
app = create_app(
storage=RedisStorage(),
message_bus=RedisMessageBus(),
workspace_manager=LocalWorkspaceManager(basedir="..."),
)
核心对话流程:
前端 POST /chat/ 触发对话
→ ChatRunRegistry.spawn(ChatService.run(...)) # 后台任务
→ 前端 GET /sessions/{sid}/stream # 长连接 SSE 收事件
ChatService.run:
1. storage.get_session → 加载 AgentState
2. 组装 Agent + Toolkit + Middleware
3. message_bus.session_run(session_id) # 分布式锁,全局单会话单 run
4. agent.reply_stream → 事件 publish 到 bus
5. storage.upsert_message + update_session_state # 持久化
5.2 FastAPI 路由层
共 7 个 Router:
5.2.1 /agent — Agent 配置 CRUD
文件:src/agentscope/app/_router/_agent.py
| 方法 | 路径 | 作用 |
|---|---|---|
| GET | /agent/schema |
返回前端表单用的 JSON Schema(identity/context/react) |
| GET | /agent/ |
列出当前用户所有 Agent(source=user,不含 team worker) |
| POST | /agent/ |
创建 Agent(name、system_prompt、context_config、react_config) |
| PATCH | /agent/{agent_id} |
部分更新 Agent |
| DELETE | /agent/{agent_id} |
删除 Agent,级联删所有 Session + Schedule + 取消运行中任务 |
5.2.2 /sessions — 会话生命周期 + 消息 + SSE 流
文件:src/agentscope/app/_router/_session.py
| 方法 | 路径 | 作用 |
|---|---|---|
| GET | /sessions/?agent_id= |
列出某 Agent 下所有 Session,附带 is_running、Team 详情 |
| POST | /sessions/ |
创建 Session(绑定 workspace、model config) |
| PATCH | /sessions/{session_id}?agent_id= |
更新 Session(模型、权限模式等) |
| DELETE | /sessions/{session_id}?agent_id= |
删除 Session(跨进程 cancel + 清 bus) |
| GET | /sessions/{session_id}/messages?agent_id= |
分页读持久化消息历史 |
| GET | /sessions/{session_id}/stream?agent_id= |
SSE 长连接:replay 当前 run 事件 + 实时订阅 |
SSE 是 ChatGPT 式体验的关键:POST /chat 只返回 {status: "started"},所有流式输出走 /stream。
5.2.3 /chat — 触发对话(Fire-and-Forget)
文件:src/agentscope/app/_router/_chat.py
| 方法 | 路径 | 作用 |
|---|---|---|
| POST | /chat/ |
触发一次 chat run,立即返回;409 若同 session 本进程已有 run |
input 支持四种形态:
Msg/list[Msg]:新用户消息UserConfirmResultEvent/ExternalExecutionResultEvent:人机确认/外部执行恢复None:从当前状态继续(wake-up 场景)
5.2.4 /credential — API 凭证管理
文件:src/agentscope/app/_router/_credential.py
| 方法 | 路径 | 作用 |
|---|---|---|
| GET | /credential/schemas |
所有凭证类型的 JSON Schema(动态表单) |
| GET | /credential/ |
列出用户凭证 |
| POST | /credential/ |
创建凭证 |
| PATCH | /credential/{credential_id} |
更新凭证 |
| DELETE | /credential/{credential_id} |
删除凭证 |
5.2.5 /model — 模型列表
文件:src/agentscope/app/_router/_model.py
| 方法 | 路径 | 作用 |
|---|---|---|
| GET | /model/?provider= |
按 provider 类型列出可用模型 |
5.2.6 /schedule — 定时任务 CRUD
文件:src/agentscope/app/_router/_schedule.py
| 方法 | 路径 | 作用 |
|---|---|---|
| GET | /schedule/ |
列出用户所有 cron 任务 |
| POST | /schedule/ |
创建并注册 APScheduler job |
| PATCH | /schedule/{schedule_id} |
更新(cron/enable 等),自动 reschedule |
| DELETE | /schedule/{schedule_id} |
删除 + 级联删执行 Session |
| GET | /schedule/{schedule_id}/sessions |
查看该 schedule 触发的所有 Session |
5.2.7 /workspace — MCP + Skill 管理
文件:src/agentscope/app/_router/_workspace.py
| 方法 | 路径 | 作用 |
|---|---|---|
| GET | /workspace/mcp?agent_id=&session_id= |
列出 MCP 客户端 + 健康状态 + tools |
| POST | /workspace/mcp?... |
添加 MCP |
| DELETE | /workspace/mcp/{mcp_name}?... |
移除 MCP |
| GET | /workspace/skill?... |
列出 Skill |
| POST | /workspace/skill?... |
添加 Skill |
| DELETE | /workspace/skill/{skill_name}?... |
移除 Skill |
5.3 业务服务层
5.3.1 ChatService — 核心对话引擎
文件:src/agentscope/app/_service/_chat.py
唯一对话执行入口,HTTP /chat 和 WakeupDispatcher 都调用 ChatService.run()。
执行步骤:
- 加载:
get_agent+get_session+get_workspace - 组装 Toolkit:
get_toolkit()(workspace 工具 + 规划 + 定时 + Team + MCP/Skill) - 组装 Middleware:
InboxMiddleware→StateChangeMiddleware→ToolOffloadMiddleware+ 用户自定义 - 组装 Model:从 Session 的
chat_model_config+ Credential 构建 - 创建 Agent:
state=session_record.state(从存储恢复状态) - 分布式锁内运行:
- Case A:新消息 →
upsert_message→reply_stream→ 事件 publish - Case B:续接工具调用 → 从 storage 取 reply Msg → 追加 event
- Case A:新消息 →
- 持久化:
upsert_message(reply_msg)+update_session_state(agent.state)
关键设计:update_session_state 必须在 session_run 锁内完成,防止多进程竞态。
5.3.2 SessionService — 跨资源生命周期
文件:src/agentscope/app/_service/_session.py
唯一同时操作 Storage + MessageBus 的组件。
级联删除链:
delete_session → cancel(bus) → storage.delete_session → bus.session_purge
delete_agent → 对每个 session 调 delete_session → storage.delete_agent
delete_schedule → 对每个 spawned session 调 delete_session
delete_team → 对每个 worker agent 调 delete_agent
5.3.3 get_toolkit() — 工具集组装
文件:src/agentscope/app/_service/_toolkit.py
按顺序挂载:Workspace 工具 → TaskCreate/List/Get/Update → TaskStop → Schedule* → Team* → 用户 extra tools → MCP/Skill。
5.3.4 get_model() — 模型实例化
文件:src/agentscope/app/_service/_model.py
从 CredentialRecord + ChatModelConfig 构建 ChatModelBase。
5.4 持久化层 Storage
技术选型
- 不是传统 ORM/SQL,而是抽象接口
StorageBase+ Pydantic 模型 + JSON 序列化 - 当前唯一完整实现:
RedisStorage(redis.asyncio) - 设计上预留了换 SQL 的可能(注释明确 storage 与 bus 可分离)
数据模型(storage/_model/)
| 模型 | 文件 | 核心字段 |
|---|---|---|
_RecordBase |
_base.py |
id, created_at, updated_at |
AgentRecord |
_agent.py |
user_id, source(user/team), data(AgentData) |
SessionRecord |
_session.py |
user_id, agent_id, config, state: AgentState, team_id, source |
SessionConfig |
_session.py |
workspace_id, name, chat_model_config, fallback_chat_model_config |
CredentialRecord |
_credential.py |
user_id, data(dict) |
ScheduleRecord |
_schedule.py |
user_id, agent_id, data(ScheduleData) |
TeamRecord |
_team.py |
user_id, session_id(leader), data(TeamData) |
UserRecord |
_user.py |
仅占位,当前未使用 |
Redis Key 设计(多用户隔离的核心)
文件:src/agentscope/app/storage/_redis_storage.py
所有 key 以 agentscope:user:{user_id}: 为前缀:
agentscope:user:{user_id}:agent:{agent_id} # Agent 记录
agentscope:user:{user_id}:session:{session_id} # Session 记录(含 AgentState JSON)
agentscope:user:{user_id}:session:{session_id}:messages # 消息 List
agentscope:user:{user_id}:agent:{agent_id}:sessions # Session 索引 Set
agentscope:user:{user_id}:credential:{credential_id}
agentscope:user:{user_id}:schedule:{schedule_id}
agentscope:user:{user_id}:team:{team_id}
agentscope:schedules # 全局 schedule 索引(启动恢复用)
可选 key_ttl 滑动过期。
会话状态序列化与恢复
保存:SessionRecord.model_dump_json() 整体写入 Redis String;热路径用 update_session_state() 只更新 state 字段。
恢复:ChatService._run_impl 加载 session_record.state → 传给 Agent(state=agent_state) → Agent 从该 state 继续推理。
消息双轨存储:
AgentState.context:Agent 运行时 LLM 上下文(含 tool call 状态、HintBlock 等)storage.list_messages:面向 UI 的持久化消息列表(Msg对象,Redis List)
每次 run 结束:upsert_message(reply_msg) + update_session_state(agent.state) 同步更新。
5.5 Manager 层
ChatRunRegistry — 本进程任务注册表
文件:src/agentscope/app/_manager/_chat_run_registry.py
- 维护
session_id → asyncio.Task映射 - 同 session 本进程只能有一个 run(与 bus 分布式锁配合)
- 应用 shutdown 时 cancel 所有 in-flight run
WakeupDispatcher — 跨会话/跨进程唤醒
文件:src/agentscope/app/_manager/_wakeup_dispatcher.py
- 每进程一个 asyncio 任务,订阅
agentscope:wakeup_signal - 收到信号 →
dequeue_wakeups()→ 对 idle session 调用ChatService.run(input_msg=None) - 触发源:TeamSay、Schedule 触发、后台工具完成、AgentCreate 等
CancelDispatcher — 跨进程取消
文件:src/agentscope/app/_manager/_cancel_dispatcher.py
- 订阅全局 cancel 频道
agentscope:session:cancel - 收到
session_id→ 本进程 cancel 对应 chat run + BG tasks
BackgroundTaskManager — 长耗时工具后台化
文件:src/agentscope/app/_manager/_background_task_manager.py
- 注册后台 asyncio 任务,提供
TaskStop工具 - 完成时通过 inbox + wakeup 投递结果(跨进程安全)
SchedulerManager — Cron 定时任务
文件:src/agentscope/app/_manager/_scheduler/_scheduler_manager.py
- APScheduler 管理 cron job
- 触发时不直接调 ChatService,而是 inbox_push(HintBlock) + enqueue_wakeup
- 启动时从
storage.list_all_schedules()恢复所有 enabled schedule - stateful 模式复用固定 session_id
{schedule_id}_stateful
5.6 消息总线 MessageBus
抽象与 Redis 实现
- 基类:
src/agentscope/app/message_bus/_base.py - 实现:
src/agentscope/app/message_bus/_redis_message_bus.py
五种模式:
| 模式 | Redis 原语 | 用途 |
|---|---|---|
| A Drain Queue | Stream + XDEL | Inbox、Wake-up 队列 |
| C Replay Log | Stream (只读) | Session 事件 replay |
| D Broadcast | Pub/Sub | 实时 SSE、Cancel、Wake-up 信号 |
| E Distributed Lock | SET NX + heartbeat | 每 session 单 run |
多会话/多进程如何工作
单 Session 串行化:
session_run(session_id) → acquire_lock("agentscope:session:lock:{sid}")
任意进程、任意时刻,同一 session 只有一个 chat run。
SSE 多订阅者:
session_publish_event → log_append(持久) + publish(实时)
session_subscribe_events → Pub/Sub 订阅
session_read_events → 新连接先 replay 当前 run 缓冲
跨 Session 通信(Team):
TeamSay → inbox_push(target_session) + enqueue_wakeup
WakeupDispatcher(任意进程) → ChatService.run(input=None)
InboxMiddleware → inbox_drain → 注入 HintBlock 到 context
跨进程 Cancel:
SessionService.cancel_session_run → session_publish_cancel
CancelDispatcher(各进程) → 本地 cancel task
5.7 服务层中间件
Agent 中间件(挂载到 Agent 推理链)
| 中间件 | 文件 | 作用 |
|---|---|---|
InboxMiddleware |
app/middleware/_inbox_middleware.py |
每步 reasoning 前 drain inbox,注入 HintBlock |
StateChangeMiddleware |
app/middleware/_state_change_middleware.py |
tool 执行后检测 state/team 变化,push CustomEvent 到 SSE |
ToolOffloadMiddleware |
app/middleware/_tool_offload_middleware.py |
工具超时 → 后台执行 → 完成时 inbox+wakeup |
协议中间件(ASGI 层)
| 中间件 | 文件 | 作用 |
|---|---|---|
ProtocolMiddlewareBase |
app/middleware/_protocol/_base.py |
拦截 StreamingResponse,将 AgentEvent 转为目标协议 |
AGUIProtocolMiddleware |
app/middleware/_protocol/_agui.py |
转为 AG-UI 协议(RunStarted/Finished、TextMessage、ToolCall 等) |
通过 create_app(..., extra_middlewares=[...]) 挂载,不是默认启用。
5.8 生命周期与依赖注入
_lifespan.py — 启动/关闭顺序
文件:src/agentscope/app/_lifespan.py
AsyncExitStack 管理(逆序关闭):
1. storage.__aenter__
2. message_bus.__aenter__
3. workspace_manager.__aenter__
4. BackgroundTaskManager
5. ChatRunRegistry
6. SchedulerManager(启动 APScheduler + 恢复 schedules)
7. ChatService / SessionService(无 lifecycle,直接构造)
8. WakeupDispatcher
9. CancelDispatcher
deps.py — FastAPI Depends
文件:src/agentscope/app/deps.py
| 依赖函数 | 来源 |
|---|---|
get_current_user_id |
X-User-ID Header(临时方案,注释说将来换 JWT) |
get_storage |
request.app.state.storage |
get_message_bus |
request.app.state.message_bus |
get_chat_service |
request.app.state.chat_service |
get_session_service |
request.app.state.session_service |
get_chat_run_registry |
request.app.state.chat_run_registry |
get_scheduler_manager |
request.app.state.scheduler_manager |
get_background_task_manager |
request.app.state.background_task_manager |
get_workspace_manager |
request.app.state.workspace_manager |
get_extra_agent_middlewares/tools |
用户工厂函数 |
_types.py — 扩展点
AgentMiddlewareFactory:(user_id, agent_id, session_id) → list[MiddlewareBase]AgentToolFactory:(user_id, agent_id, session_id) → list[ToolBase]SubAgentTemplate: Team 子 Agent 蓝图
5.9 多租户/多会话隔离机制
「多租户」= user_id,不是 tenant_id
框架没有显式 tenant 概念,隔离单元是 user_id:
- 所有 Storage API 第一个参数都是
user_id - Redis key 全部带
user_id前缀 - 路由层通过
Depends(get_current_user_id)注入,目前来自X-User-IDHTTP Header UserRecord模型存在但未接入(占位)
要做真正的多租户 SaaS,你需要:
- 把
get_current_user_id换成 JWT/OAuth 解析 - 可选:在
extra_agent_middlewares里注入租户级隔离 - 可选:WorkspaceManager 用
user_id分目录
多会话隔离
数据模型层级:
User (user_id)
└── Agent (agent_id) # 一个「助手」配置
└── Session (session_id) # 一次对话线程
├── SessionConfig # workspace、model、name
├── AgentState # 运行时状态(context、tasks、permissions)
├── Messages[] # UI 消息历史
└── Workspace # 独立沙箱(tools/MCP/skills)
隔离手段:
- Storage:
(user_id, agent_id, session_id)三元组校验所有权 - MessageBus:lock/events/inbox 均按
session_id隔离(lock key 不含 user_id,但 session_id 全局唯一 UUID) - 运行时:
session_run分布式锁保证同 session 全局单 run - SSE:每个 session 独立 stream 端点
状态持久化与恢复完整流程
创建会话:
POST /sessions/ { agent_id, workspace_id, chat_model_config }
→ storage.upsert_session(user_id, agent_id, config, state=AgentState())
→ 返回 session_id
发送消息:
POST /chat/ { agent_id, session_id, input: Msg }
→ ChatService.run:
session_record = storage.get_session(...) # 加载 AgentState
agent = Agent(state=session_record.state) # 恢复
async with bus.session_run(session_id): # 加锁
storage.upsert_message(user_msg) # 存用户消息
async for event in agent.reply_stream(): # 推理
bus.session_publish_event(...) # SSE 推送
storage.upsert_message(reply_msg) # 存助手回复
storage.update_session_state(agent.state) # 存完整 AgentState
刷新页面恢复:
GET /sessions/?agent_id= → 会话列表 + state + is_running
GET /sessions/{sid}/messages → 历史消息(ChatGPT 消息列表)
GET /sessions/{sid}/stream → 重连 SSE(replay + live)
POST /chat/ { input: null } → 若有 pending inbox/tool,继续推理
AgentState.context vs messages 的区别:
messages:面向用户的完整 Msg(含 tool call 结果渲染)AgentState.context:Agent 内部 LLM 上下文(可能含 summary 压缩、HintBlock、tool 中间态)- 两者在每次 run 结束时同步更新,但语义不同——做 ChatGPT 克隆时 UI 主要读
messages
六、示例项目分析
6.1 agent_service 后端服务
文件:examples/agent_service/main.py
启动入口做了什么
| 步骤 | 内容 |
|---|---|
| 1. 配置 MCP 工具 | 默认挂载 browser-use(Playwright MCP);若环境变量 AMAP_API_KEY 存在,追加高德地图 MCP |
2. 调用 create_app(...) |
注入存储、消息总线、工作区管理器、子 Agent 模板、CORS 中间件 |
3. uvicorn.run(...) |
监听 0.0.0.0:8000,reload=True 热重载 |
app = create_app(
storage=RedisStorage(host="localhost", port=6379),
message_bus=RedisMessageBus(host="localhost", port=6379),
workspace_manager=LocalWorkspaceManager(
basedir=.../workspaces,
default_mcps=default_mcps,
),
custom_subagent_templates=[SubAgentTemplate(type="explorer", ...)],
extra_middlewares=[Middleware(CORSMiddleware, allow_origins=["*"], ...)],
)
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
create_app 如何构建 FastAPI
文件:src/agentscope/app/_app.py
核心职责:
- 创建
FastAPI实例,绑定lifespan生命周期管理 - 把
storage、message_bus、workspace_manager等挂到app.state - 自动注册 7 组内置路由
- 添加用户自定义中间件(示例里加了 CORS)
存储 / 消息总线 / 工作区 / 工具配置
| 组件 | 示例配置 | 作用 |
|---|---|---|
| RedisStorage | localhost:6379 |
持久化 Agent、Session、Message、Credential、Team 等,实现多租户隔离 |
| RedisMessageBus | localhost:6379 |
会话级分布式锁、事件 replay 日志、SSE 实时推送、跨进程 inbox |
| LocalWorkspaceManager | workspaces/ 目录 |
每个 session 独立工作目录 + 默认 MCP |
| MCP 工具 | Playwright + 可选高德 | 通过 workspace 注入 agent toolkit |
| SubAgentTemplate | explorer 只读探索型子 Agent |
团队模式下 AgentCreate 可选用 |
Model 不在 main.py 里硬编码,而是每个 Session 的 chat_model_config 里配置(含 credential_id + model),运行时由 ChatService 从 storage 解析凭证并实例化模型。
6.2 web_ui 前端
技术栈
| 类别 | 技术 |
|---|---|
| 框架 | React 19 + TypeScript |
| 构建 | Vite 8 |
| 路由 | react-router-dom 7 |
| 样式 | Tailwind CSS 4 + shadcn/ui(Radix UI) |
| 消息协议 | @agentscope-ai/agentscope npm 包(Msg / AgentEvent / appendEvent) |
| Markdown | react-markdown + remark-gfm |
| 国际化 | i18next |
| 动画/引导 | framer-motion、onborda |
| 通知 | sonner |
首次连接配置
文件:examples/web_ui/frontend/src/pages/setup/index.tsx
用户填写:
- Server URL →
localStorage.server_url(如http://localhost:8000) - Username →
localStorage.username(作为X-User-ID)
App.tsx 检测无 server_url 时强制显示 Setup 页。
API 客户端层
client.ts — 统一 HTTP 封装
文件:examples/web_ui/frontend/src/api/client.ts
getBaseUrl()/getUserId()从 localStorage 读取- 每个请求自动带
X-User-IDheader - 提供
get/post/patch/delete+stream()(返回原始Response,供 SSE 解析) - 错误统一 toast + 抛出
ApiError
主要 API 模块:
| 文件 | 端点 | 用途 |
|---|---|---|
agent.ts |
/agent/ |
Agent CRUD |
session.ts |
/sessions/ |
会话 CRUD、历史消息、SSE 流 |
chat.ts |
POST /chat/ |
触发对话(不返回流) |
credential.ts |
/credential/ |
API Key 等凭证 |
workspace.ts |
/workspace/ |
MCP / Skill |
model.ts |
/model/ |
模型列表 |
types.ts |
— | 与后端 Pydantic schema 对齐的 TS 类型 |
核心 Hooks
useSessions(agentId) — 会话列表
文件:examples/web_ui/frontend/src/hooks/useSessions.ts
GET /sessions/?agent_id=xxx拉取SessionView[]- 每个
SessionView包含:session记录 +is_running+ 可选team详情 - 提供
create/update/remove/refetch
useMessages(agentId, sessionId) — 消息 + 流式(核心 Hook)
文件:examples/web_ui/frontend/src/hooks/useMessages.ts
双通道设计:
| 通道 | API | 用途 |
|---|---|---|
| 历史 | GET /sessions/{sid}/messages |
加载已持久化的完整 Msg[] |
| 实时 | GET /sessions/{sid}/stream (SSE) |
长连接接收 AgentEvent 增量 |
生命周期(useEffect):
- 清空本地状态
- 拉历史消息 → 写入
msgsRef - 打开 SSE 长连接,
for await消费事件 - 组件卸载时
AbortController.abort()
事件处理:
REPLY_START→ 新建AssistantMsg,streaming=trueDATA_BLOCK_DELTA等 →appendEvent()增量拼接到当前 replyREPLY_END→streaming=falseCUSTOM(team_updated/state_updated)→ 回调通知 UI 刷新
发送消息 send(content):
- 乐观更新:本地 append
UserMsg POST /chat/触发后端(fire-and-forget)- 回复事件从已打开的 SSE 流入,不是从 POST 响应来
页面结构
App.tsx
├── SetupPage(无 server_url 时)
└── RouterProvider
├── /setup
└── AppLayout(左侧图标导航)
├── /chat/:agentId?/:sessionId?/:memberId? → ChatPage
├── /schedule → 定时任务
└── /credential → 凭证管理
ChatPage(pages/chat/index.tsx)职责拆分:
| 组件 | 职责 |
|---|---|
| ChatPage 外层 | URL 驱动选中 agent/session;左侧 sidebar 显示 agent 下拉 + 会话列表 |
| TeamSidebar | 团队模式下切换 member |
| ChatViewport | 右侧主面板:模型选择、权限模式、消息区、工作区 drawer |
URL 即状态源:/chat/{agentId}/{sessionId}/{memberId?},支持浏览器前进后退、刷新保持状态。
6.3 完整链路:用户发消息到流式渲染
时序图
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 A:进入对话页 │
│ ① useMessages → GET /sessions/{sid}/messages → 获取历史消息 │
│ ② useMessages → GET /sessions/{sid}/stream → 打开 SSE 长连接 │
│ (先 replay 当前 run 缓冲事件,再订阅 live 事件) │
└──────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 B:用户输入并发送 │
│ ① TextInput → onSend(contentBlocks) │
│ ② 乐观更新:本地 append UserMsg │
│ ③ POST /chat/ {agent_id, session_id, input: UserMsg} │
│ → 后端 ChatRunRegistry.spawn(ChatService.run) │
│ → 立即返回 {status: "started"}(不等 agent 跑完) │
└──────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 C:后端 Agent 执行 │
│ ① 加载 agent/session/workspace 记录 │
│ ② get_toolkit() 组装工具 + get_model() 实例化 LLM │
│ ③ 组装 Agent(...) + 挂载 middleware(Inbox/StateChange/ToolOffload) │
│ ④ 在 message_bus.session_run(session_id) 分布式锁内: │
│ - 持久化用户消息 │
│ - async for event in agent.reply_stream(inputs=...): │
│ 每个 event → message_bus.session_publish_event(session_id) │
│ - 存完整 assistant 消息 + 更新 agent state │
└──────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 D:前端流式渲染 │
│ ① SSE 收到 data: {...} → JSON.parse 为 AgentEvent │
│ ② processEvent(event): │
│ - REPLY_START → 新建 assistant 消息气泡 │
│ - TEXT_BLOCK_DELTA → appendEvent 增量更新 content(打字效果) │
│ - TOOL_CALL → MessageBubble 显示工具调用 UI │
│ - REPLY_END → streaming=false │
│ ③ requestAnimationFrame 批量 setMsgs(避免每 token 重渲染) │
│ ④ MessageBubble 用 react-markdown 渲染文本,专用 renderer 渲染工具 │
└──────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 E:Human-in-the-loop(工具确认,可选) │
│ 用户点确认/拒绝 → onUserConfirm() │
│ → 再次 POST /chat/,input 为 UserConfirmResultEvent │
│ → 后端 Case B 续跑 agent;事件仍走同一 SSE 连接 │
└──────────────────────────────────────────────────────────────────────┘
关键设计要点
-
「POST 触发 + SSE 订阅」双通道:POST 只负责"点火",SSE 长连接负责"看火"。好处是同一 session 可接收多种来源(用户消息、定时任务、团队成员、后台唤醒)的事件。
-
SSE 连接跨多次 POST /chat/ 保持打开,不需要每次发消息重连。
-
实时 vs 历史的边界:
| 时机 | 数据来源 |
|---|---|
| 首次进入 / 切换 session | HTTP 拉历史 |
| 正在对话 / agent 运行中 | SSE 增量事件 |
| 对话结束后 | 后端已 upsert 完整 Msg;下次进入从历史 API 读到 |
- 官方 npm 包
@agentscope-ai/agentscope:前端与后端共用同一套 Msg / AgentEvent 协议,appendEvent()在前端复现后端的增量拼接逻辑,保证渲染与持久化一致。
七、一次 agent.reply 的完整数据流
以**用户发送一条新消息、模型回复文本(无工具调用)**为例:
┌─────────────────────────────────────────────────────────────────┐
│ 1. 入口 │
│ agent.reply(UserMsg("user", "你好")) │
│ → _reply() [middleware 链] → _reply_impl() │
└──────────────────────────┬──────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. 输入处理 │
│ _check_incoming_event(None) → False(非续接) │
│ _handle_incoming_messages(msgs) → state.context.append(msg) │
│ state.reply_id = uuid4(); state.cur_iter = 0 │
│ yield ReplyStartEvent │
└──────────────────────────┬──────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. ReAct 循环 (iter=0) │
│ _check_next_action() → ("reasoning", None) │
│ compress_context() → token 未超阈值,跳过 │
└──────────────────────────┬──────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ 4. 推理 _reasoning_impl() │
│ yield ModelCallStartEvent │
│ _prepare_model_input(): │
│ messages = [SystemMsg, (summary?), ...context] │
│ tools = toolkit.get_tool_schemas(activated_groups) │
│ _call_model(messages, tools) │
│ → model.__call__() [+ model_call middleware] │
│ → formatter.format(messages) → 厂商 API 格式 │
│ → _call_api() → 流式 ChatResponse chunks │
│ 每个 chunk → _convert_chat_response_to_event(): │
│ TextBlockDeltaEvent / ThinkingBlockDeltaEvent / ... │
│ yield ModelCallEndEvent(input_tokens, output_tokens) │
│ _save_to_context(blocks, usage) → context 追加 AssistantMsg │
│ 无 ToolCallBlock → yield AssistantMsg (最终回复) │
└──────────────────────────┬──────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ 5. 结束 │
│ yield ReplyEndEvent │
│ reply() 捕获最后的 AssistantMsg 并返回 │
│ reply_stream() 则只 yield 上述所有 AgentEvent(不含 Msg) │
└─────────────────────────────────────────────────────────────────┘
若模型返回 tool_call,循环继续:
_reasoning → 产出 ToolCallBlock(不 yield 最终 Msg)
→ _batch_tool_calls() 分批
→ _execute_tool_call():
校验 → 权限检查 → _acting() → toolkit.call_tool()
→ ToolResultStart/TextDelta/End 事件
→ _save_to_context(ToolResultBlock)
→ cur_iter += 1
→ _check_next_action() → "reasoning"(无更多待执行工具)
→ 再次 _reasoning(模型看到工具结果,继续推理)
若需用户确认:
权限 ASK → RequireUserConfirmEvent → reply 暂停
用户确认 → agent.reply(UserConfirmResultEvent)
→ _handle_incoming_event() → 更新 tool_call state
→ 继续 acting / reasoning
八、模块协作关系图
┌──────────────────────────────────────┐
│ Agent 层 │
│ ┌────────────────────────────────┐ │
│ │ Agent (ReAct 循环) │ │
│ │ ├── 持有 AgentState │ │
│ │ ├── 持有 Toolkit │ │
│ │ ├── 持有 ChatModelBase │ │
│ │ └── 产出 AgentEvent │ │
│ └────────────────────────────────┘ │
│ ▲ │ │ │
│ │ │ │ │
│ Middleware Model Tool │
│ (拦截) (推理) (执行) │
└──────────────────────────────────────┘
│
┌───────────┼───────────┐
▼ ▼ ▼
┌─────────────┐ ┌──────────┐ ┌─────────┐
│ FormatterBase│ │ToolChunk │ │ToolBase │
│ → 厂商 API │ │ToolResp │ │ (内置) │
└─────────────┘ └──────────┘ └─────────┘
│
▼
┌─────────────────────────────────────────┐
│ 服务层 (app) │
│ ┌─────────────────────────────────────┐│
│ │ ChatService.run() ││
│ │ → publish events → MessageBus ││
│ │ → upsert Msg → Storage ││
│ │ → update state → Storage ││
│ └─────────────────────────────────────┘│
│ │ │ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ │
│ │MessageBus│ │ Storage │ │
│ │(Redis) │ │ (Redis) │ │
│ │锁/SSE/ │ │ Agent/Sess/ │ │
│ │Inbox │ │ Msg/Cred │ │
│ └──────────┘ └──────────────┘ │
└─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 前端 (React) │
│ GET /sessions/{id}/stream → SSE │
│ appendEvent() → 增量渲染 │
│ POST /chat/ → 触发 │
└─────────────────────────────────────────┘
九、基于此框架做 ChatGPT 式产品的开发指南
阶段 0:先把现成的跑起来(半天)
# 1. 启动 Redis(持久化 + 消息总线都靠它)
docker run --rm -p 6379:6379 redis:7
# 2. 启动后端服务(默认 8000 端口)
cd examples/agent_service
python main.py
# 3. 启动前端(新开一个终端)
cd examples/web_ui
pnpm install
pnpm dev
浏览器打开前端,Setup 页填后端地址 http://localhost:8000 和用户名,即可创建助手、开对话、看流式回复、刷新后历史还在。
阶段 1:必做的核心改造
| 要做的事 | 为什么 | 怎么做 |
|---|---|---|
| ① 替换用户认证 | 现在只靠 X-User-ID 标识用户,不安全 |
改 src/agentscope/app/deps.py 的 get_current_user_id,接入 JWT/OAuth |
| ② 配置大模型 + 凭证 | 让 AI 能回答 | 通过 /credential 接口存入 API Key,在创建会话时指定模型 |
| ③ 前端品牌化改造 | 变成你自己的产品 | 基于 examples/web_ui 改 UI/logo/配色,或复用 hooks 逻辑重做界面 |
阶段 2:生产持久化(按需)
- 默认 Redis 已能持久化对话(开启 RDB/AOF 即可),中小规模直接用。
- 如果需要长期归档/复杂查询,自己实现
StorageBase子类对接 PostgreSQL/MySQL。 - 推荐架构:消息总线继续用 Redis(实时推送/锁离不开它),长期数据落 PostgreSQL。
阶段 3:可选增强
- 限流/配额/审计/内容审核:写 Agent 中间件挂到推理链上
- 给 AI 加专属工具/知识库 (RAG):通过
extra_agent_tools给不同用户挂不同工具 - 历史消息分页:
/sessions/{id}/messages已支持offset/limit - AGUI 协议对接:挂载
AGUIProtocolMiddleware对接特定前端 SDK - 团队 Agent 模式:利用框架内置的 TeamCreate/TeamSay 工具
阶段 4:部署运维
- Redis 是必备依赖
- 框架原生支持多进程部署(Redis 分布式锁 + Pub/Sub)
- 用 Nginx/网关处理 SSE 长连接(注意关闭
text/event-stream缓冲) - 多 worker 用
uvicorn --workers N或 Gunicorn
推荐前端交互模式
- 登录后所有请求带
X-User-ID(或 JWT Token) - 侧边栏:
GET /agent/→ 选 Agent →GET /sessions/?agent_id= - 打开对话:
GET /messages+ 建立/streamSSE - 发消息:
POST /chat/,从 SSE 收事件渲染 - 新建对话:
POST /sessions/
扩展点速查
extra_agent_middlewares(user_id, agent_id, session_id):审计、配额、RAGextra_agent_tools(...):按用户挂载不同工具extra_middlewares:挂载 AGUI 协议中间件StorageBase子类:换 PostgreSQL 做长期持久化custom_subagent_templates:自定义团队子 Agent 类型
十、关键文件索引
核心库
| 模块 | 路径 |
|---|---|
| Agent 核心 | src/agentscope/agent/_agent.py |
| Agent 配置 | src/agentscope/agent/_config.py |
| 模型基类 | src/agentscope/model/_base.py |
| 模型响应 | src/agentscope/model/_model_response.py |
| OpenAI 模型 | src/agentscope/model/_openai_chat/_model.py |
| DashScope 模型 | src/agentscope/model/_dashscope/__init__.py |
| Anthropic 模型 | src/agentscope/model/_anthropic/_model.py |
| 消息定义 | src/agentscope/message/_base.py |
| 内容块定义 | src/agentscope/message/_block.py |
| 事件定义 | src/agentscope/event/_event.py |
| 中间件基类 | src/agentscope/middleware/_base.py |
| Tracing 中间件 | src/agentscope/middleware/_tracing/ |
| 工具箱 | src/agentscope/tool/_toolkit.py |
| 工具基类 | src/agentscope/tool/_base.py |
| 内置工具 | src/agentscope/tool/_builtin/ |
| Formatter 基类 | src/agentscope/formatter/_formatter_base.py |
| 状态定义 | src/agentscope/state/_state.py |
| 权限系统 | src/agentscope/permission/__init__.py |
| 工作区基类 | src/agentscope/workspace/_base.py |
| MCP 客户端 | src/agentscope/mcp/_mcp_client.py |
| 凭证定义 | src/agentscope/credential/__init__.py |
服务层
| 模块 | 路径 |
|---|---|
| App 工厂 | src/agentscope/app/_app.py |
| 生命周期 | src/agentscope/app/_lifespan.py |
| 依赖注入 | src/agentscope/app/deps.py |
| 扩展点类型 | src/agentscope/app/_types.py |
| Chat 服务(核心) | src/agentscope/app/_service/_chat.py |
| Session 服务 | src/agentscope/app/_service/_session.py |
| 工具集组装 | src/agentscope/app/_service/_toolkit.py |
| 模型实例化 | src/agentscope/app/_service/_model.py |
| 存储抽象 | src/agentscope/app/storage/_base.py |
| Redis 存储 | src/agentscope/app/storage/_redis_storage.py |
| Session 数据模型 | src/agentscope/app/storage/_model/_session.py |
| Agent 数据模型 | src/agentscope/app/storage/_model/_agent.py |
| 消息总线抽象 | src/agentscope/app/message_bus/_base.py |
| Redis 消息总线 | src/agentscope/app/message_bus/_redis_message_bus.py |
| Chat Run 注册表 | src/agentscope/app/_manager/_chat_run_registry.py |
| 唤醒调度器 | src/agentscope/app/_manager/_wakeup_dispatcher.py |
| 取消调度器 | src/agentscope/app/_manager/_cancel_dispatcher.py |
| 后台任务管理 | src/agentscope/app/_manager/_background_task_manager.py |
| 定时任务管理 | src/agentscope/app/_manager/_scheduler/_scheduler_manager.py |
| SSE 路由 | src/agentscope/app/_router/_session.py |
| Chat 路由 | src/agentscope/app/_router/_chat.py |
| Inbox 中间件 | src/agentscope/app/middleware/_inbox_middleware.py |
| 状态变更中间件 | src/agentscope/app/middleware/_state_change_middleware.py |
| 工具卸载中间件 | src/agentscope/app/middleware/_tool_offload_middleware.py |
| AGUI 协议 | src/agentscope/app/middleware/_protocol/_agui.py |
示例项目
| 模块 | 路径 |
|---|---|
| 后端启动入口 | examples/agent_service/main.py |
| 后端说明 | examples/agent_service/README.md |
| 前端 API 客户端 | examples/web_ui/frontend/src/api/client.ts |
| 前端会话 API | examples/web_ui/frontend/src/api/session.ts |
| 前端 Chat API | examples/web_ui/frontend/src/api/chat.ts |
| 前端类型定义 | examples/web_ui/frontend/src/api/types.ts |
| 前端消息核心 Hook | examples/web_ui/frontend/src/hooks/useMessages.ts |
| 前端会话 Hook | examples/web_ui/frontend/src/hooks/useSessions.ts |
| 前端 Chat 页面 | examples/web_ui/frontend/src/pages/chat/index.tsx |
| 前端 ChatViewport | examples/web_ui/frontend/src/pages/chat/ChatViewport.tsx |
| 前端 Setup 页 | examples/web_ui/frontend/src/pages/setup/index.tsx |
| 前端 package.json | examples/web_ui/frontend/package.json |
| 前端路由 | examples/web_ui/frontend/src/App.tsx |
附录:设计亮点总结
- 事件即协议:
AgentEvent是 agent 与前端之间的唯一通信契约,细粒度到 block 级别 delta - Msg 与 Event 双向映射:
append_event()可从事件流重建消息,保证持久化与实时流一致 - 工具状态机:
ToolCallState贯穿权限、确认、外部执行全生命周期 - 分层中间件:
on_acting只管 I/O,on_reply管全流程,职责清晰 - Formatter 解耦:新增模型只需实现
_call_api+ 对应 Formatter,Agent 层零改动 - 无独立 Memory 模块:通过 context + summary 压缩 + offloader 实现可扩展的记忆管理
- POST 触发 + SSE 订阅双通道:适配多来源事件(用户/定时/团队/后台唤醒)
- 分布式锁保证单 Session 串行:跨进程安全
- 状态可完整序列化恢复:对话中断后可从任意状态接续
- 多租户按 user_id 隔离:Redis key prefix + Storage API 第一参数
本报告基于 AgentScope v2.0.1(2026-06-05 发布)源代码分析生成。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)