AgentScope 2.0 框架分析

分析日期:2026-06-11
项目地址:https://github.com/agentscope-ai/agentscope
分析范围:src/agentscope 核心库 + src/agentscope/app 服务层 + examples/ 示例项目


在这里插入图片描述

目录


一、总体概览与定位

AgentScope 2.0 是一个生产级的 AI Agent(智能体)开发框架,核心定位是:

  • 地基(核心库 src/agentscope:负责"让一个 AI 学会思考、调用工具、记住对话"——这是单个 agent 的大脑。
  • 毛坯房(服务层 src/agentscope/app:负责"把 AI 助手变成一个能同时服务很多用户、能保存聊天记录、能多人在线的网络服务"。
  • 样板间(examples/:官方已经用这套地基盖好了一个几乎就是 ChatGPT 网页版的完整示例(后端 + React 前端)。

与 ChatGPT 的概念对应

ChatGPT 概念 AgentScope 对应
一个登录的用户 user_id(身份标识)
你创建的某个"自定义 GPT" Agent(一套助手配置:名字、人设、模型、工具)
左侧的一个个"对话" Session(一条对话线程)
对话里的一条条消息 Msg(消息对象)
打字时一个字一个字蹦出来 Event(流式事件,通过 SSE 推送)
设置里填的 API Key Credential(凭证)
模型选择 SessionConfig.chat_model_config
多进程部署 开箱即用(Redis lock + bus)

核心特性

  • Event System → 统一事件总线,驱动前端和人机交互
  • Permission System → 细粒度的工具/资源访问控制
  • Multi-tenancy & Multi-session Service → 生产级多租户多会话隔离
  • Workspace / Sandbox Support → 隔离的执行环境(本地/Docker/E2B)
  • Extensible Middleware System → 可组合的钩子,扩展 Agent 推理循环

二、整体目录结构

agentscope/
├── src/agentscope/          ← 框架本体
│   ├── 【核心库】单个 Agent 的大脑
│   │   agent/      模型推理-行动循环(ReAct)的核心
│   │   model/      对接各家大模型(通义/OpenAI/Claude…)
│   │   message/    统一的"消息"数据结构
│   │   event/      事件系统(流式输出的协议)
│   │   tool/       工具箱(让 AI 能执行命令、读写文件等)
│   │   formatter/  把统一消息翻译成各家模型要的格式
│   │   middleware/ 中间件(不改源码就能扩展行为)
│   │   permission/ 权限系统(工具调用要不要先问用户)
│   │   workspace/  沙箱(让 AI 在隔离环境里干活)
│   │   state/      运行状态(用于保存/恢复对话)
│   │   mcp/        MCP 协议客户端
│   │   skill/      技能加载
│   │   embedding/  文本向量化
│   │   credential/ 凭证管理
│   │
│   └── 【服务层】app/  把 Agent 变成多用户在线服务
│       _app.py         create_app() 工厂函数
│       _router/        HTTP 接口(7组路由)
│       _service/       业务逻辑层(ChatService 等)
│       storage/        持久化层(RedisStorage)
│       message_bus/    消息总线(RedisMessageBus)
│       _manager/       后台管家(定时/唤醒/取消/后台任务)
│       middleware/     服务层中间件
│       workspace_manager/ 工作区管理
│       deps.py         依赖注入
│       _lifespan.py    生命周期
│       _types.py       扩展点类型
│
├── examples/
│   ├── agent_service/   ← 后端服务示例(main.py)
│   └── web_ui/          ← React 前端 + 占位后端
│
├── tests/  docs/  scripts/
└── pyproject.toml  README.md

三、核心设计理念

理念 1:统一消息(Msg)+ 事件(Event)协议

框架里所有信息——用户的话、AI 的思考、工具调用、工具结果、图片音频——都装进统一的 Msg 对象,里面是一个个 ContentBlock。AI 在工作时不断发出细粒度小事件,前端订阅这些事件流实时渲染。

理念 2:ReAct 循环——让模型自己"边想边做"

核心在 agent/_agent.py_reply_impl。设计哲学:信任模型的推理和工具调用能力,少用死板的提示词约束

理念 3:洋葱式中间件——不改源码就能扩展

middleware/ 提供 6 个钩子点,遵循"开闭原则"。

理念 4:状态可序列化 = 对话能保存和恢复

AgentState 是 Pydantic 模型,可整体序列化存进数据库,下次恢复时塞回 Agent 继续对话。


四、核心运行时模块分析

4.1 Agent 模块

文件路径src/agentscope/agent/_agent.py

职责:编排推理-行动循环、上下文管理、权限检查、工具执行、事件发射。

核心类

类/函数 文件 说明
Agent agent/_agent.py 唯一 Agent 实现
ReActConfig agent/_config.py max_iters=20,控制循环上限
ContextConfig agent/_config.py 上下文压缩阈值、摘要模板
ModelConfig agent/_config.py 重试次数、fallback 模型
_ToolCallBatch agent/_utils.py 工具调用批次(sequential/concurrent)

关键方法签名

# agent/_agent.py
async def reply_stream(inputs: Msg | list[Msg] | UserConfirmResultEvent | ExternalExecutionResultEvent | None) -> AsyncGenerator[AgentEvent, None]
async def reply(inputs: ...) -> Msg  # 消费全部事件,返回最终 Msg
async def observe(msgs: Msg | list[Msg] | None) -> None
async def compress_context(context_config: ContextConfig | None = None) -> None

Agent 初始化参数

class Agent:
    def __init__(
        self,
        name: str,                          # Agent 标识名
        system_prompt: str,                 # 系统提示词
        model: ChatModelBase,               # 大模型实例
        toolkit: Toolkit | None = None,     # 工具箱
        middlewares: list[MiddlewareBase] | None = None,  # 中间件链
        state: AgentState | None = None,    # 可恢复的运行时状态
        offloader: Offloader | None = None, # 上下文 offloader
        model_config: ModelConfig = ModelConfig(),
        context_config: ContextConfig = ContextConfig(),
        react_config: ReActConfig = ReActConfig(),
    )

ReAct 循环实现_reply_impl,约 542–699 行):

while cur_iter < max_iters:
    action, data = _check_next_action()
    if action == "exit": yield data; return
    if action == "reasoning":
        await compress_context()
        async for evt in _reasoning(): ...
        if evt is Msg (无 tool_call): yield ReplyEndEvent; return
    for batch in _batch_tool_calls():
        执行 sequential 或 concurrent 工具
        若 RequireUserConfirmEvent / RequireExternalExecutionEvent: 暂停等待外部事件
    cur_iter += 1

_check_next_action 决策表(2267–2361 行):

可执行工具 等待中工具 下一步
任意 acting
exit(等待用户确认/外部执行)
reasoning

reply_stream 工作机制(191–214 行):

  • 调用内部 _reply(),它同时产出 AgentEvent 和最终 Msg
  • reply_stream 只 yield 非 Msg 的事件(过滤掉最终消息)
  • reply() 则遍历全部产出,捕获最后一个 Msg 作为返回值

推理阶段_reasoning_impl,755–880 行):

  1. yield ModelCallStartEvent
  2. kwargs = await _prepare_model_input() → system prompt + summary + context + tool schemas
  3. res = await _call_model(**kwargs) → 可能返回 ChatResponseAsyncGenerator[ChatResponse]
  4. 流式 chunk 经 _convert_chat_response_to_event() 转为 TextBlockDeltaEventToolCallDeltaEvent
  5. yield ModelCallEndEvent(含 token 用量)
  6. _save_to_context() 写入 assistant 消息
  7. 若无 ToolCallBlockyield AssistantMsg(本轮结束)

工具执行阶段_execute_tool_call,1293–1542 行):

  1. 校验工具可用性 + JSON Schema 校验输入
  2. PermissionEngine.check_permission() 决策:ASK / DENY / ALLOW
  3. ASK → RequireUserConfirmEvent,状态变为 ASKING
  4. ALLOW + 外部工具 → RequireExternalExecutionEvent,状态变为 SUBMITTED
  5. ALLOW + 本地工具 → _acting()toolkit.call_tool() → 流式 ToolChunk → 截断/offload → _save_to_context()

上下文压缩_compress_context_impl,300–491 行):

  • 当 token 超过 trigger_ratio * context_size 时触发
  • model.generate_structured_output() 生成结构化摘要
  • 旧消息 offload 到 workspace,保留最近 reserve_ratio 比例的消息

并发策略_batch_tool_calls,1102–1139 行):

  • is_concurrency_safe=True 的工具可并发执行
  • 非并发安全工具(如 Bash、Write)串行执行
  • 并发执行用 asyncio.gather + Queue 收集事件

4.2 Model 模块

文件路径src/agentscope/model/

职责:统一 LLM 调用接口,屏蔽 DashScope / OpenAI / Anthropic / Gemini / Ollama 等差异。

核心类

文件 说明
ChatModelBase model/_base.py 抽象基类
ChatResponse model/_model_response.py 模型响应(含 is_last 流式标志)
ChatUsage model/_model_usage.py token 用量
StructuredResponse model/_model_response.py 结构化输出(用于压缩摘要)
ModelCard model/_model_card.py 从 YAML 加载模型卡片

具体实现model/__init__.py 导出):

  • AnthropicChatModel
  • DashScopeChatModel
  • DeepSeekChatModel
  • GeminiChatModel
  • OllamaChatModel
  • OpenAIChatModel
  • OpenAIResponseModel
  • XAIChatModel
  • MoonshotChatModel

统一接口ChatModelBase):

async def __call__(messages: list[Msg], tools: list[dict] | None, tool_choice: ToolChoice | None, **kwargs) -> ChatResponse | AsyncGenerator[ChatResponse, None]
async def count_tokens(messages: list[Msg], tools: list[dict] | None) -> int  # 默认 bytes/4 估算
async def generate_structured_output(messages, structured_model) -> StructuredResponse
@abstractmethod
async def _call_api(model_name, messages, tools, tool_choice, **kwargs) -> ...

统一化设计

  1. 所有模型接收统一的 list[Msg],不直接对接厂商格式
  2. 每个模型持有**FormatterBase 实例**,在 _call_apiawait self.formatter.format(messages) 转换
  3. 流式响应解析为**ChatResponse chunks**(contentTextBlock | ToolCallBlock | ThinkingBlock | DataBlock 序列)
  4. 内置重试(max_retries + _get_retryable_exceptions())+ Agent 层 fallback 模型

以 OpenAI 为例model/_openai_chat/_model.py):

Msg[] → OpenAIChatFormatter.format() → OpenAI API messages[]
→ stream/non-stream → _parse_stream_response() → ChatResponse chunks

4.3 Message 模块

文件路径src/agentscope/message/

职责:Agent 间信息存储与传输的统一数据模型。

核心类message/_base.py, message/_block.py):

说明
Msg 消息主体:name, content: list[ContentBlock], role, id, usage
UserMsg() / AssistantMsg() / SystemMsg() 工厂函数
TextBlock 文本
ThinkingBlock 推理/思考内容
ToolCallBlock 工具调用(含 state 状态机)
ToolResultBlock 工具结果
DataBlock 多模态二进制(Base64Source / URLSource
HintBlock 循环中给 LLM 的提示(转为 user 消息)

ToolCallState 状态机message/_block.py,104–155 行):

pending → asking (需用户确认) → allowed → finished
pending → allowed → finished (本地执行)
allowed → submitted (外部工具) → finished (收到 ExternalExecutionResultEvent)

Msg.append_event()message/_base.py,211–460 行):

  • 将流式 AgentEvent 增量重建为完整 Msg.content
  • 用于 ChatService 持久化:边收事件边拼装 reply 消息
  • 支持 text/thinking/tool_call/tool_result/data 等全部事件类型

角色约束

  • user:仅 text + data
  • system:仅 text
  • assistant:所有 block 类型

4.4 Event 模块

文件路径src/agentscope/event/_event.py

职责:定义 Agent 执行过程中产出的所有事件类型,供流式传输和前端渲染。

EventType 枚举(20–60 行):

类别 事件类型
回复生命周期 REPLY_START, REPLY_END, EXCEED_MAX_ITERS
模型调用 MODEL_CALL_START, MODEL_CALL_END
文本流 TEXT_BLOCK_START/DELTA/END
思考流 THINKING_BLOCK_START/DELTA/END
多模态流 DATA_BLOCK_START/DELTA/END
工具调用流 TOOL_CALL_START/DELTA/END
工具结果流 TOOL_RESULT_START, TOOL_RESULT_TEXT_DELTA, TOOL_RESULT_DATA_DELTA, TOOL_RESULT_END
人机交互 REQUIRE_USER_CONFIRM, REQUIRE_EXTERNAL_EXECUTION, USER_CONFIRM_RESULT, EXTERNAL_EXECUTION_RESULT
扩展 HINT_BLOCK, CUSTOM

事件总线如何驱动前端

Agent.reply_stream()
    → ChatService.run()  (app/_service/_chat.py)
        → message_bus.session_publish_event(session_id, event.model_dump())
            → Redis Stream (replay log) + Pub/Sub (live fan-out)
    → 前端 GET /sessions/{sid}/stream (SSE)
        → session_read_events() 回放历史
        → session_subscribe_events() 实时订阅
        → data: {JSON event}\n\n

前端通过 reply_msg.append_event(event) 可将事件流重建为完整消息用于持久化。


4.5 Middleware 模块

文件路径src/agentscope/middleware/_base.py

职责:在不修改 Agent 源码的前提下扩展推理循环。

核心类MiddlewareBase

6 个钩子点

钩子 模式 拦截点
on_reply 洋葱 整个 reply 流程
on_reasoning 洋葱 推理/模型调用阶段
on_acting 洋葱 toolkit.call_tool 执行
on_model_call 洋葱 原始 LLM API 调用
on_compress_context 洋葱 上下文压缩
on_system_prompt 管道(顺序变换) system prompt 字符串

洋葱链实现(以 _reply 为例,506–540 行):

async def execute_chain(index=0):
    if index >= len(middlewares):
        async for item in self._reply_impl(inputs): yield item
    else:
        async def next_handler(**kwargs):
            async for item in execute_chain(index+1, **kwargs): yield item
        async for item in mw.on_reply(agent, input_kwargs, next_handler): yield item

内置中间件TracingMiddlewaremiddleware/_tracing/

应用层中间件app/middleware/,服务部署时使用):

  • InboxMiddleware:团队消息注入
  • StateChangeMiddleware:状态变更发 CustomEvent
  • ToolOffloadMiddleware:长任务后台执行

设计要点on_acting 只包裹工具 I/O,不含权限检查和 context 写入,便于安全地后台 offload。


4.6 Tool 模块

文件路径src/agentscope/tool/

职责:工具注册、分组管理、Schema 生成、统一流式执行。

核心类

文件 说明
Toolkit tool/_toolkit.py 工具注册中心
ToolBase tool/_base.py 工具抽象协议
ToolGroup tool/_tool_group.py 工具分组(可动态激活/停用)
ToolChunk / ToolResponse tool/_response.py 流式/完整工具结果
ToolChoice tool/_types.py 模型工具选择策略
FunctionTool / MCPTool tool/_adapters.py 函数/MCP 适配器

关键方法

# Toolkit
async def get_tool_schemas(groups: list[str] | None) -> list[dict]
async def call_tool(tool_call: ToolCallBlock, state: AgentState) -> AsyncGenerator[ToolChunk | ToolResponse, None]
async def check_tool_available(tool_name, activated_groups) -> ToolBase
async def get_skill_instructions(activated_groups) -> str | None

工具调用流程

ToolCallBlock → call_tool()
    → 检查工具组是否激活
    → JSON 解析 input + state 注入 (_agent_state)
    → 执行 tool.__call__() → yield ToolChunk(s) → 最终 ToolResponse

内置工具tool/_builtin/):

工具 特性
Read 只读、并发安全、is_state_injected=True(文件缓存)
Write 写文件
Edit 编辑文件
Bash 执行 shell,非并发安全,有命令解析器
Grep 内容搜索
Glob 文件搜索
ResetTools 元工具:激活/停用工具组
SkillViewer 元工具:查看 skill 指令

还有任务管理工具:TaskCreate/Update/Get/Listtool/_task/


4.7 Formatter 模块

文件路径src/agentscope/formatter/

职责:将统一的 list[Msg] 转换为各 LLM 厂商 API 所需的消息格式。

核心类FormatterBaseformatter/_formatter_base.py

@abstractmethod
async def format(self, *args, **kwargs) -> list[dict[str, Any]]

每个 Provider 一对 Formatter

Formatter 对应模型
OpenAIChatFormatter OpenAI Chat Completions
OpenAIResponseFormatter OpenAI Responses API
DashScopeChatFormatter 通义千问
AnthropicChatFormatter Claude
GeminiChatFormatter Gemini
DeepSeekChatFormatter DeepSeek
MoonshotChatFormatter Moonshot
XAIChatFormatter xAI

还有 *MultiAgentFormatter 变体,处理多 Agent 对话历史(工具序列分组、消息折叠)。

关键能力

  1. _group_messages():将消息分为 tool_sequenceagent_message
  2. convert_tool_result_to_string():不支持多模态 tool result 的 API,将 DataBlock 提升为 user 消息或转文字描述
  3. input_types:控制哪些 media_typeDataBlock 可传给 API

Formatter 在模型初始化时注入,在 _call_api 第一步调用。


4.8 State 模块

文件路径src/agentscope/state/_state.py

职责:Agent 可持久化的全部运行时状态。

核心类

class AgentState(BaseModel):
    session_id: str
    summary: str | list[TextBlock | DataBlock]  # 压缩后的历史摘要
    context: list[Msg]                          # 未压缩的对话上下文
    reply_id: str                               # 当前 reply 的 ID
    cur_iter: int                               # ReAct 循环迭代计数
    permission_context: PermissionContext
    tool_context: ToolContext                   # 工具组激活状态 + Read 文件缓存
    tasks_context: TaskContext                  # 任务列表

序列化AgentState 是 Pydantic BaseModel,由 ChatService.run() 结束时 storage.update_session_state() 持久化。

ToolContext 管理:

  • activated_groups:当前激活的工具组
  • read_file_cache:Read 工具的 LRU 文件缓存

Taskstate/_task.py):任务看板条目,含 pending/in_progress/completed 状态和依赖关系。


4.9 Memory(无独立模块)

代码库中没有 src/agentscope/memory 目录。记忆机制内建于 Agent 状态:

机制 实现位置 说明
短期记忆 AgentState.context 完整对话历史(含 tool_call/result)
长期记忆 AgentState.summary 超 token 阈值时 LLM 生成的结构化摘要
外部记忆 Offloader / WorkspaceBase 压缩/offload 的内容存到文件系统
观察 Agent.observe() 外部消息写入 context,不触发 reply

五、生产级服务模块 app 分析

5.1 总体服务架构

AgentScope 的 app 层是一个 FastAPI 生产服务,核心设计是双后端分离

组件 职责 默认实现
Storage 持久化记录(Agent、Session、消息、凭证、定时任务、Team) RedisStorage(JSON + Redis Set/List)
MessageBus 实时协调(分布式锁、SSE 事件、Inbox、Wake-up、Cancel) RedisMessageBus(Stream + Pub/Sub + SET NX 锁)
WorkspaceManager 每会话沙箱(工具/MCP/Skill 执行环境) Local / Docker / E2B

入口工厂:create_app()src/agentscope/app/_app.py

app = create_app(
    storage=RedisStorage(),
    message_bus=RedisMessageBus(),
    workspace_manager=LocalWorkspaceManager(basedir="..."),
)

核心对话流程

前端 POST /chat/ 触发对话
    → ChatRunRegistry.spawn(ChatService.run(...))  # 后台任务
    → 前端 GET /sessions/{sid}/stream  # 长连接 SSE 收事件

ChatService.run:
    1. storage.get_session → 加载 AgentState
    2. 组装 Agent + Toolkit + Middleware
    3. message_bus.session_run(session_id)  # 分布式锁,全局单会话单 run
    4. agent.reply_stream → 事件 publish 到 bus
    5. storage.upsert_message + update_session_state  # 持久化

5.2 FastAPI 路由层

7 个 Router

5.2.1 /agent — Agent 配置 CRUD

文件:src/agentscope/app/_router/_agent.py

方法 路径 作用
GET /agent/schema 返回前端表单用的 JSON Schema(identity/context/react)
GET /agent/ 列出当前用户所有 Agent(source=user,不含 team worker)
POST /agent/ 创建 Agent(name、system_prompt、context_config、react_config)
PATCH /agent/{agent_id} 部分更新 Agent
DELETE /agent/{agent_id} 删除 Agent,级联删所有 Session + Schedule + 取消运行中任务
5.2.2 /sessions — 会话生命周期 + 消息 + SSE 流

文件:src/agentscope/app/_router/_session.py

方法 路径 作用
GET /sessions/?agent_id= 列出某 Agent 下所有 Session,附带 is_running、Team 详情
POST /sessions/ 创建 Session(绑定 workspace、model config)
PATCH /sessions/{session_id}?agent_id= 更新 Session(模型、权限模式等)
DELETE /sessions/{session_id}?agent_id= 删除 Session(跨进程 cancel + 清 bus)
GET /sessions/{session_id}/messages?agent_id= 分页读持久化消息历史
GET /sessions/{session_id}/stream?agent_id= SSE 长连接:replay 当前 run 事件 + 实时订阅

SSE 是 ChatGPT 式体验的关键POST /chat 只返回 {status: "started"},所有流式输出走 /stream

5.2.3 /chat — 触发对话(Fire-and-Forget)

文件:src/agentscope/app/_router/_chat.py

方法 路径 作用
POST /chat/ 触发一次 chat run,立即返回;409 若同 session 本进程已有 run

input 支持四种形态:

  • Msg / list[Msg]:新用户消息
  • UserConfirmResultEvent / ExternalExecutionResultEvent:人机确认/外部执行恢复
  • None:从当前状态继续(wake-up 场景)
5.2.4 /credential — API 凭证管理

文件:src/agentscope/app/_router/_credential.py

方法 路径 作用
GET /credential/schemas 所有凭证类型的 JSON Schema(动态表单)
GET /credential/ 列出用户凭证
POST /credential/ 创建凭证
PATCH /credential/{credential_id} 更新凭证
DELETE /credential/{credential_id} 删除凭证
5.2.5 /model — 模型列表

文件:src/agentscope/app/_router/_model.py

方法 路径 作用
GET /model/?provider= 按 provider 类型列出可用模型
5.2.6 /schedule — 定时任务 CRUD

文件:src/agentscope/app/_router/_schedule.py

方法 路径 作用
GET /schedule/ 列出用户所有 cron 任务
POST /schedule/ 创建并注册 APScheduler job
PATCH /schedule/{schedule_id} 更新(cron/enable 等),自动 reschedule
DELETE /schedule/{schedule_id} 删除 + 级联删执行 Session
GET /schedule/{schedule_id}/sessions 查看该 schedule 触发的所有 Session
5.2.7 /workspace — MCP + Skill 管理

文件:src/agentscope/app/_router/_workspace.py

方法 路径 作用
GET /workspace/mcp?agent_id=&session_id= 列出 MCP 客户端 + 健康状态 + tools
POST /workspace/mcp?... 添加 MCP
DELETE /workspace/mcp/{mcp_name}?... 移除 MCP
GET /workspace/skill?... 列出 Skill
POST /workspace/skill?... 添加 Skill
DELETE /workspace/skill/{skill_name}?... 移除 Skill

5.3 业务服务层

5.3.1 ChatService — 核心对话引擎

文件:src/agentscope/app/_service/_chat.py

唯一对话执行入口,HTTP /chatWakeupDispatcher 都调用 ChatService.run()

执行步骤:

  1. 加载get_agent + get_session + get_workspace
  2. 组装 Toolkitget_toolkit()(workspace 工具 + 规划 + 定时 + Team + MCP/Skill)
  3. 组装 MiddlewareInboxMiddlewareStateChangeMiddlewareToolOffloadMiddleware + 用户自定义
  4. 组装 Model:从 Session 的 chat_model_config + Credential 构建
  5. 创建 Agentstate=session_record.state从存储恢复状态
  6. 分布式锁内运行
    • Case A:新消息 → upsert_messagereply_stream → 事件 publish
    • Case B:续接工具调用 → 从 storage 取 reply Msg → 追加 event
  7. 持久化upsert_message(reply_msg) + update_session_state(agent.state)

关键设计update_session_state 必须在 session_run 锁内完成,防止多进程竞态。

5.3.2 SessionService — 跨资源生命周期

文件:src/agentscope/app/_service/_session.py

唯一同时操作 Storage + MessageBus 的组件

级联删除链:

delete_session → cancel(bus) → storage.delete_session → bus.session_purge
delete_agent   → 对每个 session 调 delete_session → storage.delete_agent
delete_schedule → 对每个 spawned session 调 delete_session
delete_team    → 对每个 worker agent 调 delete_agent
5.3.3 get_toolkit() — 工具集组装

文件:src/agentscope/app/_service/_toolkit.py

按顺序挂载:Workspace 工具 → TaskCreate/List/Get/Update → TaskStop → Schedule* → Team* → 用户 extra tools → MCP/Skill。

5.3.4 get_model() — 模型实例化

文件:src/agentscope/app/_service/_model.py

CredentialRecord + ChatModelConfig 构建 ChatModelBase


5.4 持久化层 Storage

技术选型
  • 不是传统 ORM/SQL,而是抽象接口 StorageBase + Pydantic 模型 + JSON 序列化
  • 当前唯一完整实现:RedisStorageredis.asyncio
  • 设计上预留了换 SQL 的可能(注释明确 storage 与 bus 可分离)
数据模型(storage/_model/
模型 文件 核心字段
_RecordBase _base.py id, created_at, updated_at
AgentRecord _agent.py user_id, source(user/team), data(AgentData)
SessionRecord _session.py user_id, agent_id, config, state: AgentState, team_id, source
SessionConfig _session.py workspace_id, name, chat_model_config, fallback_chat_model_config
CredentialRecord _credential.py user_id, data(dict)
ScheduleRecord _schedule.py user_id, agent_id, data(ScheduleData)
TeamRecord _team.py user_id, session_id(leader), data(TeamData)
UserRecord _user.py 仅占位,当前未使用
Redis Key 设计(多用户隔离的核心)

文件:src/agentscope/app/storage/_redis_storage.py

所有 key 以 agentscope:user:{user_id}: 为前缀:

agentscope:user:{user_id}:agent:{agent_id}           # Agent 记录
agentscope:user:{user_id}:session:{session_id}       # Session 记录(含 AgentState JSON)
agentscope:user:{user_id}:session:{session_id}:messages  # 消息 List
agentscope:user:{user_id}:agent:{agent_id}:sessions  # Session 索引 Set
agentscope:user:{user_id}:credential:{credential_id}
agentscope:user:{user_id}:schedule:{schedule_id}
agentscope:user:{user_id}:team:{team_id}
agentscope:schedules                                 # 全局 schedule 索引(启动恢复用)

可选 key_ttl 滑动过期。

会话状态序列化与恢复

保存SessionRecord.model_dump_json() 整体写入 Redis String;热路径用 update_session_state() 只更新 state 字段。

恢复ChatService._run_impl 加载 session_record.state → 传给 Agent(state=agent_state) → Agent 从该 state 继续推理。

消息双轨存储

  • AgentState.context:Agent 运行时 LLM 上下文(含 tool call 状态、HintBlock 等)
  • storage.list_messages:面向 UI 的持久化消息列表(Msg 对象,Redis List)

每次 run 结束:upsert_message(reply_msg) + update_session_state(agent.state) 同步更新。


5.5 Manager 层

ChatRunRegistry — 本进程任务注册表

文件:src/agentscope/app/_manager/_chat_run_registry.py

  • 维护 session_id → asyncio.Task 映射
  • 同 session 本进程只能有一个 run(与 bus 分布式锁配合)
  • 应用 shutdown 时 cancel 所有 in-flight run
WakeupDispatcher — 跨会话/跨进程唤醒

文件:src/agentscope/app/_manager/_wakeup_dispatcher.py

  • 每进程一个 asyncio 任务,订阅 agentscope:wakeup_signal
  • 收到信号 → dequeue_wakeups() → 对 idle session 调用 ChatService.run(input_msg=None)
  • 触发源:TeamSay、Schedule 触发、后台工具完成、AgentCreate 等
CancelDispatcher — 跨进程取消

文件:src/agentscope/app/_manager/_cancel_dispatcher.py

  • 订阅全局 cancel 频道 agentscope:session:cancel
  • 收到 session_id → 本进程 cancel 对应 chat run + BG tasks
BackgroundTaskManager — 长耗时工具后台化

文件:src/agentscope/app/_manager/_background_task_manager.py

  • 注册后台 asyncio 任务,提供 TaskStop 工具
  • 完成时通过 inbox + wakeup 投递结果(跨进程安全)
SchedulerManager — Cron 定时任务

文件:src/agentscope/app/_manager/_scheduler/_scheduler_manager.py

  • APScheduler 管理 cron job
  • 触发时不直接调 ChatService,而是 inbox_push(HintBlock) + enqueue_wakeup
  • 启动时从 storage.list_all_schedules() 恢复所有 enabled schedule
  • stateful 模式复用固定 session_id {schedule_id}_stateful

5.6 消息总线 MessageBus

抽象与 Redis 实现
  • 基类:src/agentscope/app/message_bus/_base.py
  • 实现:src/agentscope/app/message_bus/_redis_message_bus.py

五种模式:

模式 Redis 原语 用途
A Drain Queue Stream + XDEL Inbox、Wake-up 队列
C Replay Log Stream (只读) Session 事件 replay
D Broadcast Pub/Sub 实时 SSE、Cancel、Wake-up 信号
E Distributed Lock SET NX + heartbeat 每 session 单 run
多会话/多进程如何工作

单 Session 串行化

session_run(session_id) → acquire_lock("agentscope:session:lock:{sid}")

任意进程、任意时刻,同一 session 只有一个 chat run。

SSE 多订阅者

session_publish_event → log_append(持久) + publish(实时)
session_subscribe_events → Pub/Sub 订阅
session_read_events → 新连接先 replay 当前 run 缓冲

跨 Session 通信(Team)

TeamSay → inbox_push(target_session) + enqueue_wakeup
WakeupDispatcher(任意进程) → ChatService.run(input=None)
InboxMiddleware → inbox_drain → 注入 HintBlock 到 context

跨进程 Cancel

SessionService.cancel_session_run → session_publish_cancel
CancelDispatcher(各进程) → 本地 cancel task

5.7 服务层中间件

Agent 中间件(挂载到 Agent 推理链)
中间件 文件 作用
InboxMiddleware app/middleware/_inbox_middleware.py 每步 reasoning 前 drain inbox,注入 HintBlock
StateChangeMiddleware app/middleware/_state_change_middleware.py tool 执行后检测 state/team 变化,push CustomEvent 到 SSE
ToolOffloadMiddleware app/middleware/_tool_offload_middleware.py 工具超时 → 后台执行 → 完成时 inbox+wakeup
协议中间件(ASGI 层)
中间件 文件 作用
ProtocolMiddlewareBase app/middleware/_protocol/_base.py 拦截 StreamingResponse,将 AgentEvent 转为目标协议
AGUIProtocolMiddleware app/middleware/_protocol/_agui.py 转为 AG-UI 协议(RunStarted/Finished、TextMessage、ToolCall 等)

通过 create_app(..., extra_middlewares=[...]) 挂载,不是默认启用


5.8 生命周期与依赖注入

_lifespan.py — 启动/关闭顺序

文件:src/agentscope/app/_lifespan.py

AsyncExitStack 管理(逆序关闭):
  1. storage.__aenter__
  2. message_bus.__aenter__
  3. workspace_manager.__aenter__
  4. BackgroundTaskManager
  5. ChatRunRegistry
  6. SchedulerManager(启动 APScheduler + 恢复 schedules)
  7. ChatService / SessionService(无 lifecycle,直接构造)
  8. WakeupDispatcher
  9. CancelDispatcher
deps.py — FastAPI Depends

文件:src/agentscope/app/deps.py

依赖函数 来源
get_current_user_id X-User-ID Header(临时方案,注释说将来换 JWT)
get_storage request.app.state.storage
get_message_bus request.app.state.message_bus
get_chat_service request.app.state.chat_service
get_session_service request.app.state.session_service
get_chat_run_registry request.app.state.chat_run_registry
get_scheduler_manager request.app.state.scheduler_manager
get_background_task_manager request.app.state.background_task_manager
get_workspace_manager request.app.state.workspace_manager
get_extra_agent_middlewares/tools 用户工厂函数
_types.py — 扩展点
  • AgentMiddlewareFactory: (user_id, agent_id, session_id) → list[MiddlewareBase]
  • AgentToolFactory: (user_id, agent_id, session_id) → list[ToolBase]
  • SubAgentTemplate: Team 子 Agent 蓝图

5.9 多租户/多会话隔离机制

「多租户」= user_id,不是 tenant_id

框架没有显式 tenant 概念,隔离单元是 user_id

  • 所有 Storage API 第一个参数都是 user_id
  • Redis key 全部带 user_id 前缀
  • 路由层通过 Depends(get_current_user_id) 注入,目前来自 X-User-ID HTTP Header
  • UserRecord 模型存在但未接入(占位)

要做真正的多租户 SaaS,你需要:

  1. get_current_user_id 换成 JWT/OAuth 解析
  2. 可选:在 extra_agent_middlewares 里注入租户级隔离
  3. 可选:WorkspaceManager 用 user_id 分目录
多会话隔离

数据模型层级:

User (user_id)
  └── Agent (agent_id)          # 一个「助手」配置
        └── Session (session_id)  # 一次对话线程
              ├── SessionConfig   # workspace、model、name
              ├── AgentState      # 运行时状态(context、tasks、permissions)
              ├── Messages[]      # UI 消息历史
              └── Workspace       # 独立沙箱(tools/MCP/skills)

隔离手段:

  • Storage(user_id, agent_id, session_id) 三元组校验所有权
  • MessageBus:lock/events/inbox 均按 session_id 隔离(lock key 不含 user_id,但 session_id 全局唯一 UUID)
  • 运行时session_run 分布式锁保证同 session 全局单 run
  • SSE:每个 session 独立 stream 端点
状态持久化与恢复完整流程

创建会话

POST /sessions/ { agent_id, workspace_id, chat_model_config }
  → storage.upsert_session(user_id, agent_id, config, state=AgentState())
  → 返回 session_id

发送消息

POST /chat/ { agent_id, session_id, input: Msg }
  → ChatService.run:
      session_record = storage.get_session(...)     # 加载 AgentState
      agent = Agent(state=session_record.state)     # 恢复
      async with bus.session_run(session_id):       # 加锁
          storage.upsert_message(user_msg)          # 存用户消息
          async for event in agent.reply_stream():  # 推理
              bus.session_publish_event(...)       # SSE 推送
          storage.upsert_message(reply_msg)         # 存助手回复
          storage.update_session_state(agent.state) # 存完整 AgentState

刷新页面恢复

GET /sessions/?agent_id=          → 会话列表 + state + is_running
GET /sessions/{sid}/messages      → 历史消息(ChatGPT 消息列表)
GET /sessions/{sid}/stream        → 重连 SSE(replay + live)
POST /chat/ { input: null }        → 若有 pending inbox/tool,继续推理

AgentState.context vs messages 的区别

  • messages:面向用户的完整 Msg(含 tool call 结果渲染)
  • AgentState.context:Agent 内部 LLM 上下文(可能含 summary 压缩、HintBlock、tool 中间态)
  • 两者在每次 run 结束时同步更新,但语义不同——做 ChatGPT 克隆时 UI 主要读 messages

六、示例项目分析

6.1 agent_service 后端服务

文件examples/agent_service/main.py

启动入口做了什么
步骤 内容
1. 配置 MCP 工具 默认挂载 browser-use(Playwright MCP);若环境变量 AMAP_API_KEY 存在,追加高德地图 MCP
2. 调用 create_app(...) 注入存储、消息总线、工作区管理器、子 Agent 模板、CORS 中间件
3. uvicorn.run(...) 监听 0.0.0.0:8000reload=True 热重载
app = create_app(
    storage=RedisStorage(host="localhost", port=6379),
    message_bus=RedisMessageBus(host="localhost", port=6379),
    workspace_manager=LocalWorkspaceManager(
        basedir=.../workspaces,
        default_mcps=default_mcps,
    ),
    custom_subagent_templates=[SubAgentTemplate(type="explorer", ...)],
    extra_middlewares=[Middleware(CORSMiddleware, allow_origins=["*"], ...)],
)

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
create_app 如何构建 FastAPI

文件:src/agentscope/app/_app.py

核心职责:

  1. 创建 FastAPI 实例,绑定 lifespan 生命周期管理
  2. storagemessage_busworkspace_manager 等挂到 app.state
  3. 自动注册 7 组内置路由
  4. 添加用户自定义中间件(示例里加了 CORS)
存储 / 消息总线 / 工作区 / 工具配置
组件 示例配置 作用
RedisStorage localhost:6379 持久化 Agent、Session、Message、Credential、Team 等,实现多租户隔离
RedisMessageBus localhost:6379 会话级分布式锁、事件 replay 日志、SSE 实时推送、跨进程 inbox
LocalWorkspaceManager workspaces/ 目录 每个 session 独立工作目录 + 默认 MCP
MCP 工具 Playwright + 可选高德 通过 workspace 注入 agent toolkit
SubAgentTemplate explorer 只读探索型子 Agent 团队模式下 AgentCreate 可选用

Model 不在 main.py 里硬编码,而是每个 Session 的 chat_model_config 里配置(含 credential_id + model),运行时由 ChatService 从 storage 解析凭证并实例化模型。


6.2 web_ui 前端

技术栈
类别 技术
框架 React 19 + TypeScript
构建 Vite 8
路由 react-router-dom 7
样式 Tailwind CSS 4 + shadcn/ui(Radix UI)
消息协议 @agentscope-ai/agentscope npm 包(Msg / AgentEvent / appendEvent)
Markdown react-markdown + remark-gfm
国际化 i18next
动画/引导 framer-motion、onborda
通知 sonner
首次连接配置

文件:examples/web_ui/frontend/src/pages/setup/index.tsx

用户填写:

  • Server URLlocalStorage.server_url(如 http://localhost:8000
  • UsernamelocalStorage.username(作为 X-User-ID

App.tsx 检测无 server_url 时强制显示 Setup 页。

API 客户端层

client.ts — 统一 HTTP 封装

文件:examples/web_ui/frontend/src/api/client.ts

  • getBaseUrl() / getUserId() 从 localStorage 读取
  • 每个请求自动带 X-User-ID header
  • 提供 get/post/patch/delete + stream()(返回原始 Response,供 SSE 解析)
  • 错误统一 toast + 抛出 ApiError

主要 API 模块

文件 端点 用途
agent.ts /agent/ Agent CRUD
session.ts /sessions/ 会话 CRUD、历史消息、SSE 流
chat.ts POST /chat/ 触发对话(不返回流)
credential.ts /credential/ API Key 等凭证
workspace.ts /workspace/ MCP / Skill
model.ts /model/ 模型列表
types.ts 与后端 Pydantic schema 对齐的 TS 类型
核心 Hooks
useSessions(agentId) — 会话列表

文件:examples/web_ui/frontend/src/hooks/useSessions.ts

  • GET /sessions/?agent_id=xxx 拉取 SessionView[]
  • 每个 SessionView 包含:session 记录 + is_running + 可选 team 详情
  • 提供 create/update/remove/refetch
useMessages(agentId, sessionId) — 消息 + 流式(核心 Hook)

文件:examples/web_ui/frontend/src/hooks/useMessages.ts

双通道设计

通道 API 用途
历史 GET /sessions/{sid}/messages 加载已持久化的完整 Msg[]
实时 GET /sessions/{sid}/stream (SSE) 长连接接收 AgentEvent 增量

生命周期(useEffect):

  1. 清空本地状态
  2. 拉历史消息 → 写入 msgsRef
  3. 打开 SSE 长连接,for await 消费事件
  4. 组件卸载时 AbortController.abort()

事件处理

  • REPLY_START → 新建 AssistantMsgstreaming=true
  • DATA_BLOCK_DELTA 等 → appendEvent() 增量拼接到当前 reply
  • REPLY_ENDstreaming=false
  • CUSTOM(team_updated/state_updated) → 回调通知 UI 刷新

发送消息 send(content)

  1. 乐观更新:本地 append UserMsg
  2. POST /chat/ 触发后端(fire-and-forget)
  3. 回复事件从已打开的 SSE 流入,不是从 POST 响应来
页面结构
App.tsx
├── SetupPage(无 server_url 时)
└── RouterProvider
    ├── /setup
    └── AppLayout(左侧图标导航)
        ├── /chat/:agentId?/:sessionId?/:memberId?  → ChatPage
        ├── /schedule  → 定时任务
        └── /credential → 凭证管理

ChatPagepages/chat/index.tsx)职责拆分:

组件 职责
ChatPage 外层 URL 驱动选中 agent/session;左侧 sidebar 显示 agent 下拉 + 会话列表
TeamSidebar 团队模式下切换 member
ChatViewport 右侧主面板:模型选择、权限模式、消息区、工作区 drawer

URL 即状态源:/chat/{agentId}/{sessionId}/{memberId?},支持浏览器前进后退、刷新保持状态。


6.3 完整链路:用户发消息到流式渲染

时序图
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 A:进入对话页                                                     │
│ ① useMessages → GET /sessions/{sid}/messages → 获取历史消息           │
│ ② useMessages → GET /sessions/{sid}/stream → 打开 SSE 长连接         │
│    (先 replay 当前 run 缓冲事件,再订阅 live 事件)                     │
└──────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 B:用户输入并发送                                                  │
│ ① TextInput → onSend(contentBlocks)                                  │
│ ② 乐观更新:本地 append UserMsg                                       │
│ ③ POST /chat/ {agent_id, session_id, input: UserMsg}                 │
│    → 后端 ChatRunRegistry.spawn(ChatService.run)                      │
│    → 立即返回 {status: "started"}(不等 agent 跑完)                   │
└──────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 C:后端 Agent 执行                                                │
│ ① 加载 agent/session/workspace 记录                                   │
│ ② get_toolkit() 组装工具 + get_model() 实例化 LLM                     │
│ ③ 组装 Agent(...) + 挂载 middleware(Inbox/StateChange/ToolOffload)   │
│ ④ 在 message_bus.session_run(session_id) 分布式锁内:                  │
│    - 持久化用户消息                                                     │
│    - async for event in agent.reply_stream(inputs=...):               │
│        每个 event → message_bus.session_publish_event(session_id)      │
│    - 存完整 assistant 消息 + 更新 agent state                          │
└──────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 D:前端流式渲染                                                    │
│ ① SSE 收到 data: {...} → JSON.parse 为 AgentEvent                    │
│ ② processEvent(event):                                                │
│    - REPLY_START → 新建 assistant 消息气泡                              │
│    - TEXT_BLOCK_DELTA → appendEvent 增量更新 content(打字效果)         │
│    - TOOL_CALL → MessageBubble 显示工具调用 UI                         │
│    - REPLY_END → streaming=false                                      │
│ ③ requestAnimationFrame 批量 setMsgs(避免每 token 重渲染)             │
│ ④ MessageBubble 用 react-markdown 渲染文本,专用 renderer 渲染工具      │
└──────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌──────────────────────────────────────────────────────────────────────┐
│ 阶段 E:Human-in-the-loop(工具确认,可选)                              │
│ 用户点确认/拒绝 → onUserConfirm()                                      │
│ → 再次 POST /chat/,input 为 UserConfirmResultEvent                   │
│ → 后端 Case B 续跑 agent;事件仍走同一 SSE 连接                         │
└──────────────────────────────────────────────────────────────────────┘
关键设计要点
  1. 「POST 触发 + SSE 订阅」双通道:POST 只负责"点火",SSE 长连接负责"看火"。好处是同一 session 可接收多种来源(用户消息、定时任务、团队成员、后台唤醒)的事件。

  2. SSE 连接跨多次 POST /chat/ 保持打开,不需要每次发消息重连。

  3. 实时 vs 历史的边界

时机 数据来源
首次进入 / 切换 session HTTP 拉历史
正在对话 / agent 运行中 SSE 增量事件
对话结束后 后端已 upsert 完整 Msg;下次进入从历史 API 读到
  1. 官方 npm 包 @agentscope-ai/agentscope:前端与后端共用同一套 Msg / AgentEvent 协议,appendEvent() 在前端复现后端的增量拼接逻辑,保证渲染与持久化一致。

七、一次 agent.reply 的完整数据流

以**用户发送一条新消息、模型回复文本(无工具调用)**为例:

┌─────────────────────────────────────────────────────────────────┐
│ 1. 入口                                                          │
│    agent.reply(UserMsg("user", "你好"))                          │
│    → _reply() [middleware 链] → _reply_impl()                    │
└──────────────────────────┬──────────────────────────────────────┘
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. 输入处理                                                      │
│    _check_incoming_event(None) → False(非续接)                  │
│    _handle_incoming_messages(msgs) → state.context.append(msg)   │
│    state.reply_id = uuid4(); state.cur_iter = 0                  │
│    yield ReplyStartEvent                                         │
└──────────────────────────┬──────────────────────────────────────┘
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. ReAct 循环 (iter=0)                                           │
│    _check_next_action() → ("reasoning", None)                   │
│    compress_context() → token 未超阈值,跳过                      │
└──────────────────────────┬──────────────────────────────────────┘
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│ 4. 推理 _reasoning_impl()                                        │
│    yield ModelCallStartEvent                                     │
│    _prepare_model_input():                                       │
│      messages = [SystemMsg, (summary?), ...context]              │
│      tools = toolkit.get_tool_schemas(activated_groups)          │
│    _call_model(messages, tools)                                  │
│      → model.__call__() [+ model_call middleware]                │
│      → formatter.format(messages) → 厂商 API 格式               │
│      → _call_api() → 流式 ChatResponse chunks                   │
│    每个 chunk → _convert_chat_response_to_event():               │
│      TextBlockDeltaEvent / ThinkingBlockDeltaEvent / ...         │
│    yield ModelCallEndEvent(input_tokens, output_tokens)          │
│    _save_to_context(blocks, usage) → context 追加 AssistantMsg   │
│    无 ToolCallBlock → yield AssistantMsg (最终回复)               │
└──────────────────────────┬──────────────────────────────────────┘
                           ▼
┌─────────────────────────────────────────────────────────────────┐
│ 5. 结束                                                          │
│    yield ReplyEndEvent                                           │
│    reply() 捕获最后的 AssistantMsg 并返回                         │
│    reply_stream() 则只 yield 上述所有 AgentEvent(不含 Msg)      │
└─────────────────────────────────────────────────────────────────┘

若模型返回 tool_call,循环继续

_reasoning → 产出 ToolCallBlock(不 yield 最终 Msg)
    → _batch_tool_calls() 分批
    → _execute_tool_call():
        校验 → 权限检查 → _acting() → toolkit.call_tool()
        → ToolResultStart/TextDelta/End 事件
        → _save_to_context(ToolResultBlock)
    → cur_iter += 1
    → _check_next_action() → "reasoning"(无更多待执行工具)
    → 再次 _reasoning(模型看到工具结果,继续推理)

若需用户确认

权限 ASK → RequireUserConfirmEvent → reply 暂停
用户确认 → agent.reply(UserConfirmResultEvent)
    → _handle_incoming_event() → 更新 tool_call state
    → 继续 acting / reasoning

八、模块协作关系图

                          ┌──────────────────────────────────────┐
                          │           Agent 层                    │
                          │  ┌────────────────────────────────┐  │
                          │  │  Agent (ReAct 循环)             │  │
                          │  │  ├── 持有 AgentState            │  │
                          │  │  ├── 持有 Toolkit               │  │
                          │  │  ├── 持有 ChatModelBase         │  │
                          │  │  └── 产出 AgentEvent            │  │
                          │  └────────────────────────────────┘  │
                          │       ▲        │          │          │
                          │       │        │          │          │
                          │  Middleware   Model       Tool       │
                          │  (拦截)      (推理)     (执行)       │
                          └──────────────────────────────────────┘
                                           │
                               ┌───────────┼───────────┐
                               ▼           ▼           ▼
                    ┌─────────────┐  ┌──────────┐  ┌─────────┐
                    │ FormatterBase│  │ToolChunk │  │ToolBase │
                    │ → 厂商 API  │  │ToolResp  │  │  (内置) │
                    └─────────────┘  └──────────┘  └─────────┘
                               │
                               ▼
                    ┌─────────────────────────────────────────┐
                    │          服务层 (app)                     │
                    │  ┌─────────────────────────────────────┐│
                    │  │ ChatService.run()                    ││
                    │  │  → publish events → MessageBus      ││
                    │  │  → upsert Msg    → Storage          ││
                    │  │  → update state  → Storage          ││
                    │  └─────────────────────────────────────┘│
                    │       │                    │             │
                    │       ▼                    ▼             │
                    │  ┌──────────┐      ┌──────────────┐     │
                    │  │MessageBus│      │   Storage    │     │
                    │  │(Redis)   │      │  (Redis)     │     │
                    │  │锁/SSE/   │      │  Agent/Sess/ │     │
                    │  │Inbox     │      │  Msg/Cred    │     │
                    │  └──────────┘      └──────────────┘     │
                    └─────────────────────────────────────────┘
                               │
                               ▼
                    ┌─────────────────────────────────────────┐
                    │          前端 (React)                     │
                    │  GET /sessions/{id}/stream → SSE        │
                    │  appendEvent() → 增量渲染               │
                    │  POST /chat/ → 触发                      │
                    └─────────────────────────────────────────┘

九、基于此框架做 ChatGPT 式产品的开发指南

阶段 0:先把现成的跑起来(半天)

# 1. 启动 Redis(持久化 + 消息总线都靠它)
docker run --rm -p 6379:6379 redis:7

# 2. 启动后端服务(默认 8000 端口)
cd examples/agent_service
python main.py

# 3. 启动前端(新开一个终端)
cd examples/web_ui
pnpm install
pnpm dev

浏览器打开前端,Setup 页填后端地址 http://localhost:8000 和用户名,即可创建助手、开对话、看流式回复、刷新后历史还在。

阶段 1:必做的核心改造

要做的事 为什么 怎么做
① 替换用户认证 现在只靠 X-User-ID 标识用户,不安全 src/agentscope/app/deps.pyget_current_user_id,接入 JWT/OAuth
② 配置大模型 + 凭证 让 AI 能回答 通过 /credential 接口存入 API Key,在创建会话时指定模型
③ 前端品牌化改造 变成你自己的产品 基于 examples/web_ui 改 UI/logo/配色,或复用 hooks 逻辑重做界面

阶段 2:生产持久化(按需)

  • 默认 Redis 已能持久化对话(开启 RDB/AOF 即可),中小规模直接用。
  • 如果需要长期归档/复杂查询,自己实现 StorageBase 子类对接 PostgreSQL/MySQL
  • 推荐架构:消息总线继续用 Redis(实时推送/锁离不开它),长期数据落 PostgreSQL

阶段 3:可选增强

  • 限流/配额/审计/内容审核:写 Agent 中间件挂到推理链上
  • 给 AI 加专属工具/知识库 (RAG):通过 extra_agent_tools 给不同用户挂不同工具
  • 历史消息分页/sessions/{id}/messages 已支持 offset/limit
  • AGUI 协议对接:挂载 AGUIProtocolMiddleware 对接特定前端 SDK
  • 团队 Agent 模式:利用框架内置的 TeamCreate/TeamSay 工具

阶段 4:部署运维

  • Redis 是必备依赖
  • 框架原生支持多进程部署(Redis 分布式锁 + Pub/Sub)
  • 用 Nginx/网关处理 SSE 长连接(注意关闭 text/event-stream 缓冲)
  • 多 worker 用 uvicorn --workers N 或 Gunicorn

推荐前端交互模式

  1. 登录后所有请求带 X-User-ID(或 JWT Token)
  2. 侧边栏:GET /agent/ → 选 Agent → GET /sessions/?agent_id=
  3. 打开对话:GET /messages + 建立 /stream SSE
  4. 发消息:POST /chat/,从 SSE 收事件渲染
  5. 新建对话:POST /sessions/

扩展点速查

  • extra_agent_middlewares(user_id, agent_id, session_id):审计、配额、RAG
  • extra_agent_tools(...):按用户挂载不同工具
  • extra_middlewares:挂载 AGUI 协议中间件
  • StorageBase 子类:换 PostgreSQL 做长期持久化
  • custom_subagent_templates:自定义团队子 Agent 类型

十、关键文件索引

核心库

模块 路径
Agent 核心 src/agentscope/agent/_agent.py
Agent 配置 src/agentscope/agent/_config.py
模型基类 src/agentscope/model/_base.py
模型响应 src/agentscope/model/_model_response.py
OpenAI 模型 src/agentscope/model/_openai_chat/_model.py
DashScope 模型 src/agentscope/model/_dashscope/__init__.py
Anthropic 模型 src/agentscope/model/_anthropic/_model.py
消息定义 src/agentscope/message/_base.py
内容块定义 src/agentscope/message/_block.py
事件定义 src/agentscope/event/_event.py
中间件基类 src/agentscope/middleware/_base.py
Tracing 中间件 src/agentscope/middleware/_tracing/
工具箱 src/agentscope/tool/_toolkit.py
工具基类 src/agentscope/tool/_base.py
内置工具 src/agentscope/tool/_builtin/
Formatter 基类 src/agentscope/formatter/_formatter_base.py
状态定义 src/agentscope/state/_state.py
权限系统 src/agentscope/permission/__init__.py
工作区基类 src/agentscope/workspace/_base.py
MCP 客户端 src/agentscope/mcp/_mcp_client.py
凭证定义 src/agentscope/credential/__init__.py

服务层

模块 路径
App 工厂 src/agentscope/app/_app.py
生命周期 src/agentscope/app/_lifespan.py
依赖注入 src/agentscope/app/deps.py
扩展点类型 src/agentscope/app/_types.py
Chat 服务(核心) src/agentscope/app/_service/_chat.py
Session 服务 src/agentscope/app/_service/_session.py
工具集组装 src/agentscope/app/_service/_toolkit.py
模型实例化 src/agentscope/app/_service/_model.py
存储抽象 src/agentscope/app/storage/_base.py
Redis 存储 src/agentscope/app/storage/_redis_storage.py
Session 数据模型 src/agentscope/app/storage/_model/_session.py
Agent 数据模型 src/agentscope/app/storage/_model/_agent.py
消息总线抽象 src/agentscope/app/message_bus/_base.py
Redis 消息总线 src/agentscope/app/message_bus/_redis_message_bus.py
Chat Run 注册表 src/agentscope/app/_manager/_chat_run_registry.py
唤醒调度器 src/agentscope/app/_manager/_wakeup_dispatcher.py
取消调度器 src/agentscope/app/_manager/_cancel_dispatcher.py
后台任务管理 src/agentscope/app/_manager/_background_task_manager.py
定时任务管理 src/agentscope/app/_manager/_scheduler/_scheduler_manager.py
SSE 路由 src/agentscope/app/_router/_session.py
Chat 路由 src/agentscope/app/_router/_chat.py
Inbox 中间件 src/agentscope/app/middleware/_inbox_middleware.py
状态变更中间件 src/agentscope/app/middleware/_state_change_middleware.py
工具卸载中间件 src/agentscope/app/middleware/_tool_offload_middleware.py
AGUI 协议 src/agentscope/app/middleware/_protocol/_agui.py

示例项目

模块 路径
后端启动入口 examples/agent_service/main.py
后端说明 examples/agent_service/README.md
前端 API 客户端 examples/web_ui/frontend/src/api/client.ts
前端会话 API examples/web_ui/frontend/src/api/session.ts
前端 Chat API examples/web_ui/frontend/src/api/chat.ts
前端类型定义 examples/web_ui/frontend/src/api/types.ts
前端消息核心 Hook examples/web_ui/frontend/src/hooks/useMessages.ts
前端会话 Hook examples/web_ui/frontend/src/hooks/useSessions.ts
前端 Chat 页面 examples/web_ui/frontend/src/pages/chat/index.tsx
前端 ChatViewport examples/web_ui/frontend/src/pages/chat/ChatViewport.tsx
前端 Setup 页 examples/web_ui/frontend/src/pages/setup/index.tsx
前端 package.json examples/web_ui/frontend/package.json
前端路由 examples/web_ui/frontend/src/App.tsx

附录:设计亮点总结

  1. 事件即协议AgentEvent 是 agent 与前端之间的唯一通信契约,细粒度到 block 级别 delta
  2. Msg 与 Event 双向映射append_event() 可从事件流重建消息,保证持久化与实时流一致
  3. 工具状态机ToolCallState 贯穿权限、确认、外部执行全生命周期
  4. 分层中间件on_acting 只管 I/O,on_reply 管全流程,职责清晰
  5. Formatter 解耦:新增模型只需实现 _call_api + 对应 Formatter,Agent 层零改动
  6. 无独立 Memory 模块:通过 context + summary 压缩 + offloader 实现可扩展的记忆管理
  7. POST 触发 + SSE 订阅双通道:适配多来源事件(用户/定时/团队/后台唤醒)
  8. 分布式锁保证单 Session 串行:跨进程安全
  9. 状态可完整序列化恢复:对话中断后可从任意状态接续
  10. 多租户按 user_id 隔离:Redis key prefix + Storage API 第一参数

本报告基于 AgentScope v2.0.1(2026-06-05 发布)源代码分析生成。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐