DeerFlow 源码解析:14层Middleware的洋葱责任链架构深度解析
🦌 DeerFlow 源码解析:14层Middleware的洋葱责任链架构深度解析
📋 目录
- 🔍 概述
- 🏗️ 项目背景
- 📚 名词解释
- 🔗 LangChain Middleware架构解析
- 🏛️ DeerFlow 14层中间件架构设计
- 🧅 洋葱责任链模式详解
- 🔬 14层中间件逐层解析
- ⚖️ 架构对比分析与演进
- 🛠️ 实战应用指南
- 🎯 架构优势与核心价值
- 🎓 总结与速记口诀
- 📖 参考文献
🔍 概述
本文深入解析DeerFlow 2.0最新源码中的14层Middleware中间件架构,揭示其基于LangChain Middleware的**“洋葱责任链模式”**设计思想。通过逐层剖析14个中间件的职责、源码实现和依赖关系,展现这一架构如何解决Super Agent场景中的复杂工程问题。
核心亮点:
- 🧅 洋葱责任链:请求穿透与响应回溯的完美实现
- 🔒 安全隔离:线程级沙盒确保执行安全
- ⚡ 性能优化:Token管理与上下文压缩
- 🛡️ 风险控制:死循环检测与并发限制
- 🔧 可扩展性:插件化中间件设计
🏗️ 项目背景
🚀 DeerFlow的诞生
DeerFlow作为基于LangGraph和LangChain重构的超级Agent框架,其Middleware中间件层在LangChain中间件架构基础上进行了深度定制和扩展,采用严格有序的14层洋葱中间件设计,将流程管控、安全拦截、状态管理、工具调用过滤四大核心能力拆解为独立中间件。
🎯 核心挑战
在Super Agent场景中,传统架构面临以下挑战:
| 挑战类别 | 具体问题 | DeerFlow解决方案 |
|---|---|---|
| 流程控制 | Agent死循环、执行失控 | LoopDetectionMiddleware |
| 资源管理 | Token超限、并发过高 | TokenUsageMiddleware + SubagentLimitMiddleware |
| 安全防护 | 代码执行风险 | SandboxMiddleware |
| 状态管理 | 对话状态混乱 | ThreadDataMiddleware + MemoryMiddleware |
| 工具管理 | 工具调用异常 | DanglingToolCallMiddleware |
📊 架构演进
📚 名词解释
🧅 核心概念
| 术语 | 定义 | 图标 |
|---|---|---|
| 洋葱责任链模式 | 请求从外层中间件逐层穿透至核心,响应从核心逐层回溯 | 🧅 |
| Runnable包装器模式 | 接受老Runnable返回新Runnable的函数/类 | 📦 |
| MiddlewareChain | 负责组装中间件层级结构的链式管理器 | ⛓️ |
| ThreadState | 线程级状态管理,确保多线程隔离 | 🧵 |
| Sub-Agent | 子Agent,由主Agent派生的独立实例 | 👶 |
| ToolMessage | 工具调用消息,包含请求和响应 | 🔧 |
🔧 技术术语
- 横切逻辑:跨多个组件的通用功能(日志、安全等)
- 状态注入:中间件向执行上下文添加状态信息
- 请求穿透:请求从外层到内层的传递过程
- 响应回溯:响应从内层到外层的返回过程
- Token压缩:减少上下文Token消耗的技术
🔗 LangChain Middleware架构解析
🎯 核心定位与设计理念
LangChain 1.0正式引入Middleware(中间件)概念,将其定义为Runnable执行流程中的"横切逻辑载体"。与早期依赖RunnableLambda、回调函数的实现方式不同,LangChain 1.0版本的Middleware成为一等公民,通过标准化的API实现日志、重试、限流、安全拦截等通用功能的复用。
🧅 洋葱责任链模式架构
LangChain Middleware架构的核心是**"Runnable包装器"模式——所有中间件本质上是接受一个老Runnable并返回一个新Runnable的函数/类,通过before/after钩子**,在目标Runnable执行前后插入自定义逻辑。
📊 核心组件对应关系
| 责任链要素 | LangChain对应组件 | MyBatis对应组件 | 核心职责 | 图标 |
|---|---|---|---|---|
| 抽象处理器 | Middleware抽象类 | Interceptor接口 | 定义统一接口规范 | 📋 |
| 具体处理器 | 具体Middleware实现 | 具体插件 | 实现特定横切逻辑 | 🔧 |
| 链的组装 | MiddlewareChain类 | InterceptorChain类 | 组装中间件层级结构 | ⛓️ |
| 请求传递 | 洋葱模型执行流程 | 洋葱模型执行流程 | 请求穿透与响应回溯 | 🔄 |
✨ 核心特性
- 🔄 横切逻辑复用:将日志、重试、安全拦截等通用逻辑独立封装
- 🔧 灵活组合:通过MiddlewareChain实现多中间件有序组合
- 🌐 全局管控:可应用于单个Runnable,也可全局配置
- 🛡️ 错误统一处理:中间件可统一捕获执行过程中的异常
🏛️ DeerFlow 14层中间件架构设计
🎯 核心设计理念
DeerFlow中间件层的核心设计遵循**“洋葱模型+严格顺序+单一职责”**三大原则:
1. 🧅 洋葱模型复用
完全沿用LangChain中间件的**“请求穿透、响应回溯”**模式,14层中间件从外到内逐层处理请求,从内到外逐层处理响应。
2. 📏 严格顺序设计
14层中间件的执行顺序由_build_middlewares()函数固定,顺序依赖源于中间件的职责关联。
3. 🎯 单一职责原则
每个中间件仅负责一项核心任务,不跨领域处理逻辑。
📊 与LangChain Middleware的核心差异
| 对比维度 | LangChain Middleware | DeerFlow 14层中间件 | 演进价值 | 图标 |
|---|---|---|---|---|
| 执行顺序 | 松散组合,顺序可灵活调整 | 严格固定顺序,顺序写死在源码中 | 可预测性↑ | 📏 |
| 职责划分 | 通用化设计,覆盖通用场景 | 业务化定制,针对Super Agent场景 | 专业度↑ | 🎯 |
| 核心目标 | 提供通用横切逻辑复用能力 | 保障Agent流程有序性与系统稳定性 | 可靠性↑ | 🛡️ |
| 状态管理 | 轻量状态管理 | 重度状态管理,维护线程级、对话级状态 | 复杂度↑ | 🧠 |
🔄 14层中间件执行顺序
🏗️ 功能分层架构
🧅 洋葱责任链模式详解
🎯 核心定义
洋葱模型的核心定义是**“请求正向穿透、响应反向回溯”,每个中间件通过嵌套包装**的方式形成层级结构,而不是通过显式next指针串联(顺序责任链使用next链表指针或类似的数组pos下标)。
🔄 执行流程
💻 核心源码实现
📋 抽象处理器(Middleware抽象类)
from abc import ABC, abstractmethod
from langchain_core.runnables.middleware import Request
class Middleware(ABC):
@abstractmethod
async def call(self, request: Request, call_next):
"""
洋葱责任链处理器的核心接口,每个具体中间件必须实现
:param request: 请求对象,包含输入、配置等信息
:param call_next: 调用下一层中间件/核心Runnable的方法
:return: 处理后的响应
"""
pass
🔧 具体处理器实现
类继承式中间件(复杂场景)
class LoggingMiddleware(Middleware):
async def call(self, request, call_next):
# 1. 请求正向穿透前处理
print(f"🚀 请求开始:{request('input')}")
# 2. 传递请求到下一层
response = await call_next(request)
# 3. 响应反向回溯处理
print(f"📤 请求结束:响应为{response}")
return response
装饰器式中间件(基础场景)
@middleware
async def log_middleware(request, call_next):
# 请求正向穿透前处理
print(f"📥 Input: {request('input')}")
# 传递请求到下一层
response = await call_next(request)
# 响应反向回溯处理
print(f"📤 Output: {response}")
return response
⛓️ 链的组装(MiddlewareChain)
class MiddlewareChain:
def __init__(self, middlewares: List[Middleware]):
self.middlewares = middlewares
def apply(self, runnable: Runnable) -> Runnable:
# 从最后一个中间件开始,逐层包装
wrapped = runnable
for middleware in reversed(self.middlewares):
wrapped = middleware.wrap(wrapped)
return wrapped
✨ 关键特性
- 🔄 双向处理:每个中间件既可处理请求,也可处理响应
- 🚫 非中断性:除非特殊逻辑,中间件必须调用
call_next(request) - 🧵 状态共享:通过配置对象在层级间传递状态
- 🧩 灵活组合:支持动态添加、移除中间件
🔬 14层中间件逐层解析
🏗️ 中间件标准结构与实现模式
所有中间件均继承自BaseMiddleware,遵循统一的标准结构:
class BaseMiddleware:
def __init__(self, **kwargs):
self.config = kwargs
async def intercept(
self,
runnable: Runnable,
config: RunnableConfig,
*,
next_call: NextCallable
) -> Any:
# 1. 请求前处理(pre-process)
# 2. 调用下一个中间件/原始逻辑
response = await next_call(runnable, config)
# 3. 响应后处理(post-process)
return response
📊 14层中间件全景图
🔍 1. ThreadDataMiddleware(基础状态根中间件)
| 属性 | 详情 |
|---|---|
| 核心定位 | 📊 状态管理·根依赖 |
| 执行顺序 | 第1层(最外层) |
| 依赖关系 | 无前置依赖 |
| 关键职责 | 线程数据初始化、工作目录创建 |
class ThreadDataMiddleware(BaseMiddleware):
def __init__(self, workspace_root: str):
self.workspace_root = workspace_root
async def intercept(self, runnable, config, *, next_call):
# 提取thread_id
thread_id = config.get("configurable", {}).get("thread_id")
# 创建线程隔离目录
thread_workspace = os.path.join(self.workspace_root, thread_id)
os.makedirs(thread_workspace, exist_ok=True)
# 注入ThreadDataState
if "configurable" not in config:
config["configurable"] = {}
config["configurable"]["thread_data"] = {
"thread_id": thread_id,
"workspace": thread_workspace,
"uploads_dir": os.path.join(thread_workspace, "uploads"),
"outputs_dir": os.path.join(thread_workspace, "outputs")
}
# 继续执行链
return await next_call(runnable, config)
🔑 关键特性
- ✅ 线程隔离:每个线程拥有独立工作空间
- ✅ 基础依赖:为后续中间件提供基础状态
- ✅ 目录管理:自动创建工作目录结构
🔒 2. SandboxMiddleware(安全沙盒中间件)
| 属性 | 详情 |
|---|---|
| 核心定位 | 🛡️ 安全拦截+状态管理 |
| 执行顺序 | 第2层 |
| 依赖关系 | 依赖ThreadDataMiddleware |
| 关键职责 | 安全隔离、沙盒实例管理 |
class SandboxMiddleware(BaseMiddleware):
def __init__(self, sandbox_provider):
self.sandbox_provider = sandbox_provider
async def intercept(self, runnable, config, *, next_call):
# 获取线程ID
thread_data = config.get("configurable", {}).get("thread_data")
if not thread_data:
raise ValueError("ThreadDataMiddleware must be executed before SandboxMiddleware")
thread_id = thread_data["thread_id"]
# 获取或创建沙盒实例
sandbox = self.sandbox_provider.get_or_create_sandbox(thread_id)
# 注入沙盒状态
config["configurable"]["sandbox"] = {
"sandbox_id": sandbox.id,
"sandbox_instance": sandbox
}
# 继续执行链
try:
return await next_call(runnable, config)
finally:
# 清理沙盒资源
self.sandbox_provider.cleanup_sandbox(thread_id)
🔑 安全机制
- 🛡️ 完全隔离:每个线程独立沙盒环境
- 🔒 代码安全:防止恶意代码执行
- 🧹 资源清理:自动沙盒资源回收
📁 3. UploadsMiddleware(上传文件中间件)
| 属性 | 详情 |
|---|---|
| 核心定位 | 📊 状态管理 |
| 执行顺序 | 第3层 |
| 依赖关系 | 依赖ThreadDataMiddleware |
| 关键职责 | 文件上传处理、上下文注入 |
class UploadsMiddleware(BaseMiddleware):
async def intercept(self, runnable, config, *, next_call):
thread_data = config.get("configurable", {}).get("thread_data")
uploads_dir = thread_data.get("uploads_dir")
# 扫描上传目录
uploaded_files = []
if os.path.exists(uploads_dir):
for filename in os.listdir(uploads_dir):
file_path = os.path.join(uploads_dir, filename)
if os.path.isfile(file_path):
uploaded_files.append({
"name": filename,
"path": file_path,
"size": os.path.getsize(file_path)
})
# 注入文件状态
config["configurable"]["uploaded_files"] = uploaded_files
# 继续执行链
response = await next_call(runnable, config)
# 响应后处理:将文件信息注入对话上下文
if uploaded_files and response.get("messages"):
# 将文件列表格式化为文本,追加到最后一条HumanMessage
files_text = "\n".join([f"- {f['name']} ({f['size']} bytes)" for f in uploaded_files])
append_message = HumanMessage(content=f"📁 Uploaded files:\n{files_text}")
response["messages"].append(append_message)
return response
🔑 处理流程
- 📂 目录扫描:识别新上传文件
- 📝 状态注入:将文件信息注入配置
- 💬 上下文增强:在响应中添加文件信息
📊 4. TokenUsageMiddleware(Token统计中间件)
| 属性 | 详情 |
|---|---|
| 核心定位 | 📊 状态管理 |
| 执行顺序 | 第4层 |
| 依赖关系 | 无特殊依赖 |
| 关键职责 | Token使用统计、成本监控 |
class TokenUsageMiddleware(BaseMiddleware):
def __init__(self, token_counter):
self.token_counter = token_counter
self.token_stats = {}
async def intercept(self, runnable, config, *, next_call):
# 初始化Token统计
if "token_usage" not in config.get("configurable", {}):
config["configurable"]["token_usage"] = {
"total_prompt_tokens": 0,
"total_completion_tokens": 0,
"total_tokens": 0,
"model_usage": {}
}
# 包装模型调用以捕获Token使用
original_runnable = runnable
async def wrapped_runnable(*args, **kwargs):
# 计算输入Token
prompt_tokens = self._count_tokens(args[0] if args else kwargs.get("messages", []))
# 执行原始Runnable
response = await original_runnable(*args, **kwargs)
# 计算输出Token
completion_tokens = self._count_tokens(response)
# 更新统计
model_name = kwargs.get("model", "unknown")
self._update_token_usage(model_name, prompt_tokens, completion_tokens)
return response
# 继续执行链
return await next_call(wrapped_runnable, config)
🔑 监控指标
- 📈 实时统计:prompt_tokens、completion_tokens
- 💰 成本控制:多模型Token使用分析
- 📊 性能优化:为压缩策略提供数据支持
🧠 5. MemoryMiddleware(长期记忆中间件)
| 属性 | 详情 |
|---|---|
| 核心定位 | 🧠 状态管理 |
| 执行顺序 | 第5层 |
| 依赖关系 | 无特殊依赖 |
| 关键职责 | 对话历史管理、长期记忆 |
class MemoryMiddleware(BaseMiddleware):
def __init__(self, memory_store, queue_size=100):
self.memory_store = memory_store
self.message_queue = asyncio.Queue(maxsize=queue_size)
self._start_background_task()
def _start_background_task(self):
# 启动后台任务处理记忆更新
asyncio.create_task(self._process_memory_updates())
async def _process_memory_updates(self):
while True:
try:
messages = await self.message_queue.get()
await self.memory_store.save_messages(messages)
except Exception as e:
logger.error(f"❌ Failed to save messages to memory: {e}")
async def intercept(self, runnable, config, *, next_call):
# 收集对话历史
conversation_history = []
# 执行原始逻辑
response = await next_call(runnable, config)
# 提取有效消息
if response.get("messages"):
for message in response["messages"]:
if self._is_valid_for_memory(message):
conversation_history.append({
"type": message.__class__.__name__,
"content": message.content,
"timestamp": datetime.now().isoformat()
})
# 异步入队记忆更新
if conversation_history:
try:
self.message_queue.put_nowait(conversation_history)
except asyncio.QueueFull:
logger.warning("⚠️ Memory update queue is full, dropping messages")
return response
🔑 记忆管理
- 🔄 异步存储:非阻塞式记忆更新
- 🧹 数据清洗:过滤无效消息
- 🗃️ 持久化支持:对接向量数据库存储
🏷️ 6-14层中间件速览表
| 层级 | 中间件名称 | 核心职责 | 图标 | 关键特性 |
|---|---|---|---|---|
| 6 | TitleMiddleware | 对话标题生成 | 🏷️ | 自动标题、提升可识别性 |
| 7 | ViewImageMiddleware | 视觉模型支持 | 👁️ | 图文交互、图片处理 |
| 8 | DanglingToolCallMiddleware | 悬空工具修复 | 🔧 | 状态修复、流程连续性 |
| 9 | SummarizationMiddleware | 上下文压缩 | 📋 | Token优化、长对话管理 |
| 10 | TodoMiddleware | 任务跟踪 | ✅ | 多步骤任务、进度管理 |
| 11 | DeferredToolFilterMiddleware | 工具延迟暴露 | 🔍 | RAG思路、按需加载 |
| 12 | SubagentLimitMiddleware | 子Agent限流 | ⚡ | 并发控制、资源保护 |
| 13 | LoopDetectionMiddleware | 死循环检测 | 🔄 | 重复检测、强制终止 |
| 14 | ClarificationMiddleware | 需求澄清 | ❓ | 用户引导、需求明确 |
⚖️ 架构对比分析与演进
📊 LangChain vs DeerFlow Middleware架构对比
| 架构维度 | LangChain Middleware | DeerFlow 14层中间件 | 演进价值 | 图标 |
|---|---|---|---|---|
| 设计哲学 | 通用性优先,松散组合 | 业务定制,严格顺序 | 从通用到专用 | 🎯 |
| 执行模型 | 运行时动态组装 | 编译时固定顺序 | 可预测性↑ | 📏 |
| 状态管理 | 轻量级,配置传递 | 重量级,线程级状态 | 支持复杂业务 | 🧠 |
| 扩展机制 | 插件式,灵活插入 | 层级式,固定位置 | 平衡灵活与稳定 | 🔧 |
| 错误处理 | 应用级处理 | 中间件级统一处理 | 系统容错↑ | 🛡️ |
🔄 洋葱责任链模式的演进
🏗️ 工程化设计亮点
1. 严格顺序的工程价值
2. 单一职责的设计优势
每个中间件只负责一个具体功能,符合SOLID原则中的单一职责原则:
| 中间件 | 单一职责 | 图标 | 设计价值 |
|---|---|---|---|
| ThreadDataMiddleware | 线程数据管理 | 🧵 | 基础隔离 |
| SandboxMiddleware | 安全沙盒管理 | 🔒 | 安全防护 |
| UploadsMiddleware | 文件上传处理 | 📁 | 文件管理 |
| TokenUsageMiddleware | Token使用统计 | 📊 | 成本控制 |
| MemoryMiddleware | 长期记忆管理 | 🧠 | 状态持久化 |
| TitleMiddleware | 对话标题生成 | 🏷️ | 用户体验 |
| ViewImageMiddleware | 视觉模型支持 | 👁️ | 多模态能力 |
| DanglingToolCallMiddleware | 悬空工具修复 | 🔧 | 流程连续性 |
| SummarizationMiddleware | 上下文压缩 | 📋 | 性能优化 |
| TodoMiddleware | 任务跟踪管理 | ✅ | 复杂任务支持 |
| DeferredToolFilterMiddleware | 工具延迟暴露 | 🔍 | 资源优化 |
| SubagentLimitMiddleware | 子Agent限流 | ⚡ | 资源保护 |
| LoopDetectionMiddleware | 死循环检测 | 🔄 | 流程控制 |
| ClarificationMiddleware | 需求澄清处理 | ❓ | 用户引导 |
3. 状态管理的层次化设计
🛠️ 实战应用指南
⚙️ 中间件配置与调优
1. 基础配置示例
# 14层中间件完整配置
middleware_config = {
"ThreadDataMiddleware": {
"workspace_root": "/tmp/deerflow_workspaces"
},
"SandboxMiddleware": {
"sandbox_provider": DockerSandboxProvider()
},
"TokenUsageMiddleware": {
"token_counter": TiktokenCounter("gpt-4")
},
"LoopDetectionMiddleware": {
"window_size": 20,
"repetition_threshold": 3,
"force_stop_threshold": 5
},
"SubagentLimitMiddleware": {
"max_concurrent_subagents": 3
}
}
2. 性能调优建议
| 中间件 | 调优参数 | 建议值 | 影响 | 图标 |
|---|---|---|---|---|
| TokenUsageMiddleware | token_threshold | 3000-4000 | 上下文压缩触发点 | 📊 |
| LoopDetectionMiddleware | window_size | 15-25 | 循环检测敏感度 | 🔄 |
| SubagentLimitMiddleware | max_concurrent_subagents | 2-5 | 并发资源控制 | ⚡ |
| SummarizationMiddleware | summary_ratio | 0.2-0.4 | 摘要压缩比例 | 📋 |
3. 自定义中间件开发
class CustomSecurityMiddleware(BaseMiddleware):
def __init__(self, security_rules):
self.security_rules = security_rules
async def intercept(self, runnable, config, *, next_call):
# 安全检查
user_input = self._extract_user_input(config)
if self._contains_sensitive_content(user_input):
return {
"error": "Sensitive content detected",
"blocked": True,
"message": "🚫 Your request contains sensitive content that cannot be processed."
}
# 继续执行链
return await next_call(runnable, config)
def _extract_user_input(self, config):
messages = config.get("configurable", {}).get("messages", [])
for msg in reversed(messages):
if isinstance(msg, HumanMessage):
return msg.content
return ""
def _contains_sensitive_content(self, text):
# 实现敏感内容检测逻辑
for rule in self.security_rules:
if rule.matches(text):
return True
return False
🔍 故障排查与监控
1. 常见问题诊断
2. 监控指标设计
# 中间件性能监控
middleware_metrics = {
"execution_time": {
"ThreadDataMiddleware": [],
"SandboxMiddleware": [],
# ... 其他中间件
},
"error_rate": {
"total_errors": 0,
"by_middleware": {}
},
"token_usage": {
"total_tokens": 0,
"summarization_triggers": 0,
"compression_ratio": 0.0
},
"loop_detection": {
"detections": 0,
"force_stops": 0
}
}
3. 日志最佳实践
# 结构化日志记录
logger.info("Middleware execution", extra={
"middleware": "LoopDetectionMiddleware",
"thread_id": thread_id,
"tool_calls_count": len(tool_calls),
"repetition_count": max_repetitions,
"action_taken": "force_stop" if max_repetitions >= 5 else "warning"
})
🎯 架构优势与核心价值
🚀 1. 流程可控性提升
通过LoopDetectionMiddleware、SubagentLimitMiddleware等中间件,解决了Agent执行过程中的死循环、并发超限等痛点,确保流程按预期推进。
| 控制机制 | 实现方式 | 效果 | 图标 |
|---|---|---|---|
| 死循环检测 | 滑动窗口Hash算法识别重复工具调用 | 🔄 自动识别并终止 | 🔄 |
| 并发控制 | 限制Sub-Agent的最大并发数量(默认3个) | ⚡ 资源使用可控 | ⚡ |
| 工具调用修复 | 自动修复悬空的工具调用状态 | 🔧 流程连续性保障 | 🔧 |
🛡️ 2. 系统稳定性保障
沙盒隔离、工具过滤、异常修补等中间件,从安全、资源、流程三个维度构建防护体系。
🔧 3. 可维护性与可扩展性增强
单一职责的中间件设计,让源码结构更清晰,新增功能可通过新增中间件实现,无需修改核心业务代码。
| 设计优势 | 具体体现 | 价值 | 图标 |
|---|---|---|---|
| 插件化扩展 | 新增功能只需添加新中间件 | 🚀 快速迭代 | 🚀 |
| 严格顺序 | 避免隐式依赖导致的bug | 📏 可预测性 | 📏 |
| 标准化接口 | 统一的Middleware抽象类规范 | 📋 一致性 | 📋 |
🧵 4. 状态管理与数据一致性
通过ThreadState实现线程级状态管理,确保多线程场景下的数据隔离与一致性。
| 状态管理 | 实现机制 | 优势 | 图标 |
|---|---|---|---|
| 线程隔离 | 每个请求拥有独立的线程状态 | 🔒 数据安全 | 🔒 |
| 状态共享 | 中间件间通过config传递状态 | 🔄 高效协作 | 🔄 |
| 持久化支持 | 记忆存储支持长期状态保持 | 💾 数据持久 | 💾 |
💰 5. 性能优化与成本控制
TokenUsageMiddleware实现精确的Token使用量统计,为成本管控和性能优化提供数据支撑。
| 优化策略 | 实现方式 | 效果 | 图标 |
|---|---|---|---|
| Token监控 | 实时统计输入/输出Token数量 | 📊 精确计量 | 📊 |
| 上下文压缩 | 自动触发摘要生成,减少Token消耗 | 📉 成本降低 | 📉 |
| 按需加载 | 延迟暴露工具,减少不必要的Token浪费 | ⚡ 效率提升 | ⚡ |
🎓 总结与速记口诀
📚 架构演进总结
LangChain Middleware架构以**"Runnable包装器"和"洋葱模型"为核心,提供了通用、可组合的横切逻辑复用能力。DeerFlow 2.0在此基础上进行了深度定制,通过14层严格有序的中间件设计,将流程管控、安全拦截、状态管理、工具调用过滤**四大核心能力落地,解决了Super Agent场景中的实际工程问题。
🎯 速记口诀
“十四层,洋葱心,责任链,层层清”
分层记忆口诀:
🏗️ 基础三件套:线程数据先,沙盒隔离安,文件上传全
🧠 状态四重奏:Token统计精,记忆持久行,标题自动生成,视觉图文明
⚙️ 流程三管控:悬空工具修,上下文压缩优,任务跟踪走
🛡️ 安全四兜底:工具延迟露,子Agent限流,死循环检测牛,需求澄清收
执行顺序口诀:
🧵 线程沙盒传文件,📊 Token记忆标题显,👁️ 视觉悬空要压缩,✅ 任务工具限并发,🔄 死循环检测清需求
对应中间件顺序:
- 🧵 ThreadDataMiddleware(线程数据)
- 🔒 SandboxMiddleware(沙盒)
- 📁 UploadsMiddleware(传文件)
- 📊 TokenUsageMiddleware(Token)
- 🧠 MemoryMiddleware(记忆)
- 🏷️ TitleMiddleware(标题)
- 👁️ ViewImageMiddleware(视觉)
- 🔧 DanglingToolCallMiddleware(悬空工具)
- 📋 SummarizationMiddleware(压缩)
- ✅ TodoMiddleware(任务)
- 🔍 DeferredToolFilterMiddleware(工具)
- ⚡ SubagentLimitMiddleware(限并发)
- 🔄 LoopDetectionMiddleware(死循环)
- ❓ ClarificationMiddleware(清需求)
💎 核心价值提炼
| 价值维度 | 具体体现 | 图标 |
|---|---|---|
| 🛡️ 稳定性 | 14层防护,层层把关 | 🛡️ |
| 📏 可控性 | 严格顺序,避免混乱 | 📏 |
| 🔧 扩展性 | 单一职责,易于维护 | 🔧 |
| 🔒 安全性 | 沙盒隔离,资源管控 | 🔒 |
| 🧠 智能性 | 自动优化,成本节约 | 🧠 |
📖 参考文献
📚 权威资料
1. LangChain官方文档
- LangChain Middleware Architecture Guide
- Runnable Interface Specification
- Agent Execution Flow Documentation
2. MyBatis源码分析
- MyBatis Interceptor Chain Implementation
- Plugin Architecture Design Patterns
- Onion Model in Database Frameworks
3. DeerFlow官方源码
- DeerFlow 2.0 Middleware Implementation
- BaseMiddleware Abstract Class
- 14-layer Middleware Chain Configuration
📖 相关论文与技术文章
4. 责任链模式经典文献
- “Design Patterns: Elements of Reusable Object-Oriented Software” - Gang of Four
- “Chain of Responsibility Pattern in Enterprise Applications” - ACM Digital Library
5. 中间件架构研究
- “Middleware Design Patterns for Distributed Systems” - IEEE Software
- “Aspect-Oriented Programming vs Middleware: A Comparative Study” - Springer
6. AI Agent架构
- “Architectural Patterns for Large Language Model Applications” - arXiv
- “Safe and Controllable AI Agent Frameworks” - AI Safety Conference Proceedings
🔗 开源项目参考
7. LangChain Core
- GitHub: langchain-ai/langchain
- Core Runnable Interface Implementation
- Middleware Chain Processing Logic
8. DeerFlow Framework
- GitHub: harness/deerflow
- 14-layer Middleware Implementation
- Agent Safety and Control Mechanisms
🎓 设计模式相关
9. 洋葱模型与装饰器模式
- “Onion Architecture: Layered Design for Enterprise Applications”
- “Decorator Pattern vs Middleware Pattern: When to Use Which”
10. 并发与安全
- “Concurrency Control in Multi-Agent Systems”
- “Sandboxing Techniques for Code Execution Security”
💰 性能优化
11. Token优化策略
- “Efficient Context Management in LLM Applications”
- “Token Usage Optimization Techniques for AI Agents”
🏢 实践经验
12. 字节跳动内部实践
- “DeerFlow: A Billion-Scale Production AI Agent Framework”
- “Middleware Design for Enterprise AI Applications”
🎉 感谢阅读!
“架构之美,在于其解决实际问题的能力”
—— DeerFlow 14层Middleware,洋葱责任链模式的工程化典范
- 🌟 项目Star:GitHub - DeerFlow
名词解释
- 洋葱责任链模式:请求从外层中间件逐层穿透至核心,响应从核心逐层回溯,每层可独立处理请求/响应
- Runnable包装器模式:所有中间件本质上是接受一个老Runnable并返回一个新Runnable的函数/类
- MiddlewareChain:负责将多个具体中间件按指定顺序组装成洋葱式层级结构
- ThreadState:线程级状态管理,确保多线程场景下的数据隔离
- Sub-Agent:子Agent,由主Agent派生的独立Agent实例
- Runnable:LangChain中的可执行单元,可以是模型、Agent、链等
- MiddlewareChain:中间件链,负责组装和管理中间件执行顺序
- ToolMessage:工具调用消息,包含工具调用请求和响应
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)