【BabyAGI 全解析 · 下】工具集成 / 记忆优化 / 实战案例 / LangChain 生产级 Pipeline
【BabyAGI 全解析 · 下】工具集成 / 记忆优化 / 实战案例 / LangChain 生产级 Pipeline
导语:上篇我们拆解了 BabyAGI 的核心架构,手写了一个 80 行的 Mini BabyAGI。但那个版本只能"思考"不能"行动"——它没有手,不能搜索网页、不能执行代码、不能读写文件。这篇是 BabyAGI 系列的下篇,重点解决三个问题:怎么给 Agent 装上工具?怎么优化记忆系统?怎么从玩具变成生产级系统? 三个完整实战案例 + LangChain 集成方案,学完这篇你就能把 BabyAGI 真正用到项目里。
文章目录

一、工具集成:让 Agent 拥有"手脚"
1.1 为什么需要工具?
上篇的 Mini BabyAGI 有一个致命问题:Execution Agent 只能调用 LLM 生成文本,不能和外部世界交互。它像一个只有大脑没有手脚的人——能想但不能做。
现实中的 Agent 需要:
| 能力 | 工具 | 场景 |
|---|---|---|
| 获取信息 | 网页搜索、API 调用 | 调研、竞品分析 |
| 执行逻辑 | Python REPL、代码沙箱 | 数据计算、算法验证 |
| 持久化 | 文件读写、数据库 | 保存报告、存储结果 |
| 通信 | 邮件、Slack、Webhook | 通知、协作 |
| 浏览 | Playwright、Selenium | 网页抓取、自动化测试 |
1.2 工具集成架构

核心思路:Execution Agent 不再直接返回文本,而是先分析任务需要什么工具,调用工具获取结果,再基于结果生成最终输出。
这就是 ReAct(Reasoning + Acting)范式的核心:
ReAct Loop = Thought → Action → Observation → Thought → ⋯ \text{ReAct Loop} = \text{Thought} \rightarrow \text{Action} \rightarrow \text{Observation} \rightarrow \text{Thought} \rightarrow \cdots ReAct Loop=Thought→Action→Observation→Thought→⋯
- Thought:LLM 思考下一步该做什么
- Action:调用工具执行操作
- Observation:获取工具返回的结果
- 循环直到 LLM 认为任务完成
1.3 工具注册与调度系统
# tools.py
from typing import Callable, List, Dict, Any
import json
class Tool:
"""工具基类"""
def __init__(self, name: str, description: str, func: Callable):
self.name = name
self.description = description
self.func = func
def run(self, *args, **kwargs) -> str:
try:
result = self.func(*args, **kwargs)
return str(result)
except Exception as e:
return f"[工具执行错误] {type(e).__name__}: {str(e)}"
class ToolRegistry:
"""工具注册表 - 管理所有可用工具"""
def __init__(self):
self._tools: Dict[str, Tool] = {}
def register(self, name: str, description: str, func: Callable):
"""注册一个工具"""
self._tools[name] = Tool(name, description, func)
print(f" ✅ 工具已注册: {name}")
def get(self, name: str) -> Tool:
return self._tools.get(name)
def get_descriptions(self) -> str:
"""生成工具描述文本,供 LLM 选择工具时使用"""
desc = ""
for name, tool in self._tools.items():
desc += f"- {name}: {tool.description}\n"
return desc
def list_tools(self) -> List[str]:
return list(self._tools.keys())
# ============ 注册具体工具 ============
def web_search(query: str) -> str:
"""网页搜索工具"""
# 实际项目中替换为 Tavily / SerpAPI / Bing Search
try:
import requests
# 示例:使用 DuckDuckGo 搜索(免费,无需 API Key)
from duckduckgo_search import DDGS
results = DDGS().text(query, max_results=5)
output = ""
for r in results:
output += f"标题: {r['title']}\n摘要: {r['body']}\n链接: {r['href']}\n\n"
return output if output else "未找到相关结果"
except ImportError:
return f"[模拟搜索] 关于 '{query}' 的搜索结果:这是一个模拟结果,请安装 duckduckgo-search"
def code_execute(code: str) -> str:
"""Python 代码执行工具(沙箱环境)"""
import sys
from io import StringIO
# 安全限制:禁止危险操作
forbidden = ['import os', 'import subprocess', 'import shutil',
'rm -rf', '__import__', 'eval(', 'exec(']
for f in forbidden:
if f in code:
return f"[安全拦截] 代码包含禁止操作: {f}"
# 捕获输出
old_stdout = sys.stdout
sys.stdout = captured = StringIO()
try:
local_vars = {}
exec(code, {"__builtins__": {}}, local_vars)
output = captured.getvalue()
if local_vars:
output += f"\n变量: {list(local_vars.keys())}"
return output if output else "代码执行成功(无输出)"
except Exception as e:
return f"[执行错误] {type(e).__name__}: {str(e)}"
finally:
sys.stdout = old_stdout
def file_write(filename: str, content: str) -> str:
"""文件写入工具"""
# 安全限制:只允许写入指定目录
safe_dir = "./babyagi_output"
import os
os.makedirs(safe_dir, exist_ok=True)
filepath = os.path.join(safe_dir, os.path.basename(filename))
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
return f"文件已写入: {filepath} ({len(content)} 字符)"
def file_read(filename: str) -> str:
"""文件读取工具"""
import os
safe_dir = "./babyagi_output"
filepath = os.path.join(safe_dir, os.path.basename(filename))
if not os.path.exists(filepath):
return f"[错误] 文件不存在: {filepath}"
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
return content[:5000] # 限制读取长度
def http_request(url: str, method: str = "GET") -> str:
"""HTTP 请求工具"""
import requests
try:
resp = requests.request(method, url, timeout=10)
return f"状态码: {resp.status_code}\n内容: {resp.text[:2000]}"
except Exception as e:
return f"[请求错误] {str(e)}"
# 创建全局注册表
registry = ToolRegistry()
registry.register("web_search", "搜索网页获取信息,输入搜索关键词", web_search)
registry.register("code_execute", "执行 Python 代码并返回输出,输入代码字符串", code_execute)
registry.register("file_write", "将内容写入文件,输入文件名和内容", file_write)
registry.register("file_read", "读取文件内容,输入文件名", file_read)
registry.register("http_request", "发送 HTTP 请求,输入 URL 和方法", http_request)
1.4 ReAct 风格的 Execution Agent
# react_agent.py
import openai
REACT_SYSTEM_PROMPT = """你是一个拥有工具的智能代理。你可以通过思考和行动来完成任务。
可用工具:
{tools_description}
你必须严格按照以下格式回复:
Thought: [思考下一步该做什么]
Action: [工具名称]
Action Input: [工具输入参数]
当你获得足够的信息后,使用以下格式给出最终答案:
Thought: [我已经获得了足够的信息]
Final Answer: [最终答案]
注意:
- 每次只能调用一个工具
- 仔细分析任务需要什么信息,选择最合适的工具
- 如果工具返回错误,尝试换一种方式
- 不要编造信息,只基于工具返回的结果回答
"""
def react_execution(task: str, context: str, registry, max_steps: int = 5) -> str:
"""ReAct 风格的任务执行"""
tools_desc = registry.get_descriptions()
messages = [
{"role": "system", "content": REACT_SYSTEM_PROMPT.format(tools_description=tools_desc)},
{"role": "user", "content": f"任务: {task}\n\n已有上下文:\n{context}"}
]
for step in range(max_steps):
response = openai.ChatCompletion.create(
model="gpt-4",
messages=messages,
temperature=0,
max_tokens=1000
)
reply = response.choices[0].message.content
messages.append({"role": "assistant", "content": reply})
# 检查是否给出最终答案
if "Final Answer:" in reply:
final_answer = reply.split("Final Answer:")[-1].strip()
return final_answer
# 解析工具调用
try:
action_line = [l for l in reply.split('\n') if l.startswith('Action:')][0]
input_line = [l for l in reply.split('\n') if l.startswith('Action Input:')][0]
tool_name = action_line.replace('Action:', '').strip()
tool_input = input_line.replace('Action Input:', '').strip()
# 调用工具
tool = registry.get(tool_name)
if tool:
observation = tool.run(tool_input)
else:
observation = f"[错误] 工具 '{tool_name}' 不存在"
# 将观察结果加入对话
messages.append({
"role": "user",
"content": f"Observation: {observation}"
})
except (IndexError, ValueError) as e:
messages.append({
"role": "user",
"content": f"Observation: [格式错误] 请严格按照 Thought/Action/Action Input 格式回复"
})
return "[达到最大步数限制] 任务未完成"
1.5 集成到 BabyAGI 主循环
# 替换上篇的 execution_agent 函数
def execution_agent_with_tools(task: str, context: str, registry) -> str:
"""带工具的执行代理"""
return react_execution(task, context, registry)
# 主循环中替换调用
# result = execution_agent(task, context) # 旧版
# result = execution_agent_with_tools(task, context, registry) # 新版
二、记忆系统优化:从 Pinecone 到 Chroma 本地化
2.1 原版记忆系统的问题
原版 BabyAGI 使用 Pinecone 作为向量数据库,存在三个痛点:
| 问题 | 影响 |
|---|---|
| 云端依赖 | 必须联网,断网即瘫痪 |
| 费用累积 | Pinecone 按查询计费,长期运行成本高 |
| 数据隐私 | 向量数据存在第三方服务器,敏感场景不可用 |
2.2 Chroma 本地化方案

Chroma 是一个轻量级本地向量数据库,核心优势:
- 零费用:完全本地运行,无 API 调用成本
- 低延迟:本地查询 < 5ms,比 Pinecone 快 20-60 倍
- 离线可用:不需要网络连接
- 数据自控:向量数据完全在本机
2.3 Chroma 记忆系统实现
# memory.py
import chromadb
from chromadb.utils import embedding_functions
class ChromaMemory:
"""基于 Chroma 的本地向量记忆系统"""
def __init__(self, collection_name: str = "babyagi_memory",
persist_dir: str = "./chroma_db"):
# 初始化 Chroma 客户端(本地持久化)
self.client = chromadb.PersistentClient(path=persist_dir)
# 使用本地 Embedding 模型(无需 API Key)
# 首次运行会自动下载模型(约 420MB)
self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L6-v2" # 轻量级,速度快
)
# 获取或创建集合
self.collection = self.client.get_or_create_collection(
name=collection_name,
embedding_function=self.embed_fn,
metadata={"hnsw:space": "cosine"} # 使用余弦相似度
)
print(f" ✅ Chroma 记忆系统初始化完成 (集合: {collection_name})")
print(f" 📊 已存储 {self.collection.count()} 条记录")
def add(self, task_id: str, task: str, result: str):
"""添加一条执行记录"""
doc = f"任务: {task}\n结果: {result}"
self.collection.upsert(
documents=[doc],
ids=[f"task_{task_id}"],
metadatas=[{"task_id": task_id, "task": task}]
)
def query(self, query_text: str, n_results: int = 5) -> str:
"""语义检索相关记忆"""
if self.collection.count() == 0:
return ""
results = self.collection.query(
query_texts=[query_text],
n_results=min(n_results, self.collection.count())
)
context = ""
for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
context += f"[任务{meta['task_id']}] {doc}\n---\n"
return context
def clear(self):
"""清空记忆"""
self.client.delete_collection(self.collection.name)
self.collection = self.client.get_or_create_collection(
name=self.collection.name,
embedding_function=self.embed_fn
)
2.4 本地 Embedding 方案对比
| 方案 | 模型大小 | 延迟 | 质量 | 是否需要网络 |
|---|---|---|---|---|
OpenAI text-embedding-3-small |
云端 | 100-300ms | ★★★★★ | 是 |
all-MiniLM-L6-v2 |
420MB | < 5ms | ★★★★ | 否(首次下载需网络) |
all-mpnet-base-v2 |
1.1GB | < 10ms | ★★★★★ | 否 |
bge-small-zh-v1.5 |
390MB | < 5ms | ★★★★(中文优) | 否 |
推荐:英文场景用 all-MiniLM-L6-v2,中文场景用 bge-small-zh-v1.5。
2.5 集成到 BabyAGI
# 替换上篇的 Pinecone 相关代码
memory = ChromaMemory()
# 在 execution_agent 中检索上下文
context = memory.query(task, n_results=5)
# 执行完成后存储结果
memory.add(task_id=str(task_id), task=task, result=result)
三、实战案例一:自动化竞品分析
3.1 场景描述
目标:自动分析三个 AI 编程助手(Cursor、Windsurf、Copilot)的竞品对比,生成结构化报告。
3.2 完整代码
# case_competitive_analysis.py
"""BabyAGI 实战:自动化竞品分析"""
import openai
from tools import registry # 上面的工具注册表
from memory import ChromaMemory
from datetime import datetime
class CompetitiveAnalysisAgent:
def __init__(self, objective: str):
self.objective = objective
self.memory = ChromaMemory("competitive_analysis")
self.task_list = []
self.completed = []
self.task_id_counter = 0
def add_task(self, task: str, priority: int = 0):
self.task_id_counter += 1
self.task_list.append({
"id": self.task_id_counter,
"task": task,
"priority": priority
})
self.task_list.sort(key=lambda x: x["priority"], reverse=True)
def execute_task(self, task: str) -> str:
"""使用 ReAct + 工具执行任务"""
context = self.memory.query(task, n_results=3)
return react_execution(task, context, registry, max_steps=6)
def create_subtasks(self, result: str):
"""基于执行结果创建后续任务"""
prompt = f"""基于以下执行结果,创建后续需要完成的子任务。
目标: {self.objective}
已完成任务: {self.completed[-1] if self.completed else '无'}
执行结果: {result[:1000]}
请列出最多3个必要的后续子任务,每个一行,格式: 优先级(0-9) | 任务描述
不要重复已完成的任务。"""
response = openai.ChatCompletion.create(
model="gpt-4", messages=[{"role": "user", "content": prompt}],
temperature=0, max_tokens=300
)
for line in response.choices[0].message.content.strip().split('\n'):
line = line.strip()
if '|' in line:
parts = line.split('|', 1)
try:
priority = int(parts[0].strip())
task = parts[1].strip()
if task not in [t["task"] for t in self.task_list] + self.completed:
self.add_task(task, priority)
except ValueError:
pass
def run(self, max_iterations: int = 10):
"""运行竞品分析"""
print(f"\n{'='*60}")
print(f"🎯 目标: {self.objective}")
print(f"{'='*60}\n")
# 初始任务
self.add_task("搜索 Cursor AI 编程助手的核心功能、定价和用户评价", 9)
self.add_task("搜索 Windsurf (Codeium) 编程助手的核心功能、定价和用户评价", 9)
self.add_task("搜索 GitHub Copilot 的核心功能、定价和用户评价", 9)
for i in range(max_iterations):
if not self.task_list:
print("\n✅ 所有任务已完成!")
break
task = self.task_list.pop(0)
print(f"\n--- 迭代 {i+1} ---")
print(f"📋 执行: {task['task']}")
result = self.execute_task(task['task'])
print(f"✅ 结果: {result[:200]}...")
# 存入记忆
self.memory.add(str(task['id']), task['task'], result)
self.completed.append(task['task'])
# 创建后续任务
self.create_subtasks(result)
# 生成最终报告
self.generate_report()
def generate_report(self):
"""生成竞品分析报告"""
print(f"\n{'='*60}")
print("📝 生成竞品分析报告...")
all_context = self.memory.query(self.objective, n_results=20)
prompt = f"""基于以下调研结果,撰写一份结构化的竞品分析报告。
目标: {self.objective}
调研数据:
{all_context}
报告要求:
1. 概述:三款产品的定位和核心差异
2. 功能对比表:核心功能逐项对比
3. 定价对比:各套餐价格和性价比
4. 用户评价总结:优缺点汇总
5. 结论和建议
请用 Markdown 格式输出。"""
response = openai.ChatCompletion.create(
model="gpt-4", messages=[{"role": "user", "content": prompt}],
temperature=0.3, max_tokens=3000
)
report = response.choices[0].message.content
# 保存报告
registry.get("file_write").run(
"competitive_analysis_report.md", report
)
print("📊 报告已保存到 babyagi_output/competitive_analysis_report.md")
print(f"{'='*60}")
# 运行
if __name__ == "__main__":
agent = CompetitiveAnalysisAgent(
objective="对比分析 Cursor、Windsurf、GitHub Copilot 三款 AI 编程助手,生成竞品报告"
)
agent.run(max_iterations=12)
3.3 运行效果
============================================================
🎯 目标: 对比分析 Cursor、Windsurf、GitHub Copilot 三款 AI 编程助手
============================================================
--- 迭代 1 ---
📋 执行: 搜索 Cursor AI 编程助手的核心功能、定价和用户评价
Thought: 我需要搜索 Cursor 的信息
Action: web_search
Action Input: Cursor AI code editor features pricing review 2025
Observation: [搜索结果...]
✅ 结果: Cursor 是一款 AI 优先的代码编辑器...
--- 迭代 2 ---
📋 执行: 搜索 Windsurf 编程助手的核心功能、定价和用户评价
...
--- 迭代 8 ---
📋 执行: 将对比结果整理为结构化表格
Thought: 我需要将数据整理成表格
Action: code_execute
Action Input: import json\n... 格式化代码 ...
Observation: 表格生成完成
✅ 结果: 对比表格已生成...
✅ 所有任务已完成!
📊 报告已保存到 babyagi_output/competitive_analysis_report.md
四、实战案例二:自动化代码审查
4.1 场景描述
目标:自动审查指定目录下的 Python 代码,检查代码质量、安全漏洞、性能问题,生成审查报告。
4.2 核心代码
# case_code_review.py
"""BabyAGI 实战:自动化代码审查"""
import os
import openai
from tools import registry
from memory import ChromaMemory
class CodeReviewAgent:
def __init__(self, target_dir: str):
self.target_dir = target_dir
self.memory = ChromaMemory("code_review")
self.issues = []
def scan_files(self) -> list:
"""扫描目标目录下的 Python 文件"""
py_files = []
for root, dirs, files in os.walk(self.target_dir):
# 跳过虚拟环境和缓存
dirs[:] = [d for d in dirs if d not in ['venv', '__pycache__', '.git', 'node_modules']]
for f in files:
if f.endswith('.py'):
py_files.append(os.path.join(root, f))
return py_files
def review_file(self, filepath: str) -> str:
"""审查单个文件"""
# 读取文件内容
code = registry.get("file_read").run(filepath)
prompt = f"""请审查以下 Python 代码,从三个维度分析:
1. **代码质量**:命名规范、函数设计、代码重复、可读性
2. **安全问题**:SQL注入、硬编码密钥、不安全的函数调用
3. **性能问题**:N+1查询、不必要的循环、内存泄漏风险
文件: {filepath}
```python
{code}
请按以下格式输出:
-
[严重/警告/建议] 问题描述 | 行号范围 | 修复建议"“”
response = openai.ChatCompletion.create( model="gpt-4", messages=[{"role": "user", "content": prompt}], temperature=0, max_tokens=2000 ) return response.choices[0].message.contentdef run(self):
“”“执行代码审查”“”
print(f"\n🔍 扫描目录: {self.target_dir}“)
files = self.scan_files()
print(f"📁 发现 {len(files)} 个 Python 文件\n”)all_results = {} for i, filepath in enumerate(files): print(f"[{i+1}/{len(files)}] 审查: {filepath}") result = self.review_file(filepath) all_results[filepath] = result # 存入记忆 self.memory.add(str(i), filepath, result) # 统计问题 for line in result.split('\n'): if line.strip().startswith('- ['): self.issues.append({ "file": filepath, "issue": line.strip() }) # 生成审查报告 self.generate_report(all_results)def generate_report(self, results: dict):
“”“生成审查报告”“”
# 按严重程度分类
severe = [i for i in self.issues if ‘[严重]’ in i[‘issue’]]
warnings = [i for i in self.issues if ‘[警告]’ in i[‘issue’]]
suggestions = [i for i in self.issues if ‘[建议]’ in i[‘issue’]]report = f"""# 代码审查报告
概览
- 审查文件数: {len(results)}
- 严重问题: {len(severe)}
- 警告: {len(warnings)}
- 建议: {len(suggestions)}
严重问题
“”"
for issue in severe:
report += f"- {issue[‘file’]}: {issue[‘issue’]}\n"
report += "\n## 警告\n"
for issue in warnings:
report += f"- **{issue['file']}**: {issue['issue']}\n"
report += "\n## 改进建议\n"
for issue in suggestions:
report += f"- **{issue['file']}**: {issue['issue']}\n"
registry.get("file_write").run("code_review_report.md", report)
print(f"\n📊 审查报告已保存到 babyagi_output/code_review_report.md")
print(f" 严重: {len(severe)} | 警告: {len(warnings)} | 建议: {len(suggestions)}")
运行
if name == “main”:
agent = CodeReviewAgent(target_dir=“./src”)
agent.run()
---
## 五、实战案例三:数据分析自动化
### 5.1 场景描述
目标:**自动分析 CSV 数据集,生成统计摘要、趋势分析、可视化图表。**
### 5.2 核心代码
```python
# case_data_analysis.py
"""BabyAGI 实战:数据分析自动化"""
import openai
from tools import registry
from memory import ChromaMemory
class DataAnalysisAgent:
def __init__(self, data_path: str, question: str):
self.data_path = data_path
self.question = question
self.memory = ChromaMemory("data_analysis")
self.findings = []
def explore_data(self) -> str:
"""探索性数据分析"""
code = f"""
import pandas as pd
df = pd.read_csv('{self.data_path}')
print("=== 数据概览 ===")
print(f"行数: {{len(df)}}, 列数: {{len(df.columns)}}")
print(f"\\n列名: {{list(df.columns)}}")
print(f"\\n数据类型:\\n{{df.dtypes}}")
print(f"\\n缺失值:\\n{{df.isnull().sum()}}")
print(f"\\n数值列统计:\\n{{df.describe()}}")
print(f"\\n前5行:\\n{{df.head()}}")
"""
return registry.get("code_execute").run(code)
def analyze(self, finding: str) -> str:
"""基于发现进行深入分析"""
context = self.memory.query(finding, n_results=3)
prompt = f"""基于以下数据探索结果,编写 Python 分析代码。
数据文件: {self.data_path}
分析目标: {finding}
已有发现: {context}
要求:
1. 使用 pandas 读取数据
2. 用 print() 输出分析结果
3. 如果需要可视化,保存图片到 ./babyagi_output/
4. 代码必须能独立运行"""
response = openai.ChatCompletion.create(
model="gpt-4", messages=[{"role": "user", "content": prompt}],
temperature=0, max_tokens=1500
)
code = response.choices[0].message.content
# 提取代码块
if '```python' in code:
code = code.split('```python')[1].split('```')[0]
elif '```' in code:
code = code.split('```')[1].split('```')[0]
return registry.get("code_execute").run(code)
def run(self, max_iterations: int = 8):
"""运行数据分析"""
print(f"\n📊 数据: {self.data_path}")
print(f"❓ 问题: {self.question}\n")
# Step 1: 探索数据
print("Step 1: 探索性数据分析...")
exploration = self.explore_data()
self.memory.add("0", "数据探索", exploration)
print(f" ✅ 数据概览完成\n")
# Step 2: 生成分析子任务
prompt = f"""基于以下数据概览,列出需要进行的分析步骤。
数据概览:
{exploration[:2000]}
分析目标: {self.question}
请列出最多5个分析步骤,每个一行。"""
response = openai.ChatCompletion.create(
model="gpt-4", messages=[{"role": "user", "content": prompt}],
temperature=0, max_tokens=500
)
tasks = [line.strip().lstrip('0123456789.-) ')
for line in response.choices[0].message.content.strip().split('\n')
if line.strip()]
# Step 3: 逐步执行分析
for i, task in enumerate(tasks[:max_iterations]):
print(f"Step {i+2}: {task}")
result = self.analyze(task)
self.memory.add(str(i+1), task, result)
self.findings.append(f"**{task}**:\n{result[:500]}")
print(f" ✅ 完成\n")
# Step 4: 生成最终报告
self.generate_report()
def generate_report(self):
"""生成分析报告"""
report = f"# 数据分析报告\n\n## 分析目标\n{self.question}\n\n"
report += "## 分析发现\n\n"
for finding in self.findings:
report += f"{finding}\n\n---\n\n"
registry.get("file_write").run("data_analysis_report.md", report)
print(f"📊 分析报告已保存到 babyagi_output/data_analysis_report.md")
# 运行
if __name__ == "__main__":
agent = DataAnalysisAgent(
data_path="./data/sales.csv",
question="分析销售趋势,找出增长最快的产品类别和季节性规律"
)
agent.run()
六、LangChain 集成:构建生产级 Agent Pipeline
6.1 为什么需要 LangChain?
手写的 BabyAGI 有三个生产级痛点:
| 痛点 | 手写方案 | LangChain 方案 |
|---|---|---|
| 错误恢复 | 工具调用失败就卡住 | 自动重试 + 降级策略 |
| 流程编排 | 硬编码循环 | 声明式 Chain / Graph |
| 可观测性 | print 大法 | LangSmith 追踪 + 日志 |
6.2 生产级 Pipeline 架构

6.3 LangChain + BabyAGI 实现
# langchain_pipeline.py
"""生产级 Agent Pipeline:LangChain + BabyAGI"""
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool
from langchain import hub
from langchain_community.utilities import SerpAPIWrapper
from langchain_community.tools import PythonREPLTool
from langchain_chroma import Chroma as LangChainChroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
import os
# ============ 1. 初始化 LLM ============
llm = ChatOpenAI(
model="gpt-4",
temperature=0,
max_retries=3, # 自动重试
request_timeout=60
)
# ============ 2. 定义工具 ============
# 搜索工具
search = SerpAPIWrapper()
search_tool = Tool(
name="web_search",
description="搜索网页获取最新信息",
func=search.run
)
# 代码执行工具
python_tool = Tool(
name="python_repl",
description="执行 Python 代码,用于数据分析和计算",
func=PythonREPLTool().run
)
# 文件写入工具
def write_file(input_str: str) -> str:
"""格式: 'filename|||content'"""
parts = input_str.split("|||", 1)
if len(parts) != 2:
return "格式错误,请使用: filename|||content"
filename, content = parts
os.makedirs("./babyagi_output", exist_ok=True)
filepath = os.path.join("./babyagi_output", os.path.basename(filename))
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
return f"文件已保存: {filepath}"
file_tool = Tool(
name="file_write",
description="将内容写入文件,输入格式: filename|||content",
func=write_file
)
tools = [search_tool, python_tool, file_tool]
# ============ 3. 创建 ReAct Agent ============
prompt = hub.pull("hwchase17/react")
prompt = prompt.partial(
tools="\n".join([f"{t.name}: {t.description}" for t in tools]),
tool_names=", ".join([t.name for t in tools])
)
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=10, # 最大迭代次数
handle_parsing_errors=True, # 自动处理解析错误
max_execution_time=300 # 5分钟超时
)
# ============ 4. BabyAGI 编排层 ============
class ProductionBabyAGI:
"""生产级 BabyAGI:LangChain Agent + 任务编排"""
def __init__(self, objective: str):
self.objective = objective
self.task_queue = []
self.results = []
self.completed_tasks = set()
def add_task(self, task: str, priority: int = 5):
if task not in self.completed_tasks:
self.task_queue.append({"task": task, "priority": priority})
self.task_queue.sort(key=lambda x: x["priority"], reverse=True)
def execute_task(self, task: str) -> str:
"""使用 LangChain AgentExecutor 执行任务"""
try:
result = agent_executor.invoke({
"input": f"目标: {self.objective}\n当前任务: {task}"
})
return result.get("output", "执行完成但无输出")
except Exception as e:
return f"[执行失败] {str(e)}"
def generate_subtasks(self, task: str, result: str):
"""基于执行结果生成后续任务"""
prompt = PromptTemplate.from_template(
"""基于以下信息,生成后续子任务。
目标: {objective}
已完成: {task}
结果摘要: {result}
列出最多3个必要的后续子任务,格式: 优先级(0-9) | 任务描述"""
)
chain = prompt | llm | StrOutputParser()
response = chain.invoke({
"objective": self.objective,
"task": task,
"result": result[:1000]
})
for line in response.strip().split('\n'):
if '|' in line:
parts = line.split('|', 1)
try:
priority = int(parts[0].strip())
subtask = parts[1].strip()
self.add_task(subtask, priority)
except ValueError:
pass
def run(self, max_iterations: int = 10):
"""运行主循环"""
print(f"\n🎯 目标: {self.objective}\n")
# 初始任务分解
self.add_task(f"分析目标 '{self.objective}',列出需要完成的主要步骤", 9)
for i in range(max_iterations):
if not self.task_queue:
print("✅ 所有任务已完成!")
break
task_item = self.task_queue.pop(0)
task = task_item["task"]
print(f"\n--- 迭代 {i+1} ---")
print(f"📋 任务: {task}")
result = self.execute_task(task)
self.results.append({"task": task, "result": result})
self.completed_tasks.add(task)
print(f"✅ 完成: {result[:150]}...")
self.generate_subtasks(task, result)
return self.results
# ============ 5. 运行 ============
if __name__ == "__main__":
babyagi = ProductionBabyAGI(
objective="调研 2026 年最流行的 Python Web 框架,对比性能和生态,生成技术选型报告"
)
results = babyagi.run(max_iterations=8)
6.4 LangChain 集成的关键优势
| 能力 | 手写 BabyAGI | LangChain BabyAGI |
|---|---|---|
| 工具调用 | 手动解析 Action/Input | AgentExecutor 自动处理 |
| 错误恢复 | 无 | handle_parsing_errors=True |
| 超时控制 | 无 | max_execution_time |
| 重试机制 | 无 | max_retries=3 |
| 可观测性 | LangSmith 追踪 | |
| 流程编排 | while 循环 | LCEL Chain / Graph |
七、生产部署最佳实践
7.1 成本控制
| 策略 | 效果 | 实现方式 |
|---|---|---|
| 限制迭代次数 | 防止无限循环 | max_iterations=10 |
| 使用 GPT-4o-mini | 成本降 90% | 简单任务用 mini,复杂任务用 GPT-4 |
| 缓存 LLM 响应 | 重复任务零成本 | langchain.cache.InMemoryCache |
| 本地 Embedding | 省去 Embedding API 费用 | sentence-transformers |
| Token 预算 | 单任务 token 上限 | max_tokens + 结果截断 |
7.2 安全防护
# 安全配置
SECURITY_CONFIG = {
# 代码执行沙箱
"code_sandbox": {
"forbidden_imports": ["os", "subprocess", "shutil", "sys"],
"max_execution_time": 30, # 秒
"max_memory_mb": 512,
},
# 文件系统
"file_system": {
"allowed_dirs": ["./babyagi_output"],
"max_file_size_mb": 10,
},
# 网络请求
"network": {
"allowed_domains": ["*"], # 生产环境应限制
"request_timeout": 15,
"max_requests_per_task": 10,
},
# LLM 调用
"llm": {
"max_tokens_per_call": 2000,
"max_calls_per_task": 5,
"temperature": 0, # 降低随机性
}
}
7.3 监控与日志
# 监控配置
import logging
import time
class BabyAGIMonitor:
"""BabyAGI 运行监控"""
def __init__(self):
self.metrics = {
"total_tasks": 0,
"completed_tasks": 0,
"failed_tasks": 0,
"total_llm_calls": 0,
"total_tokens": 0,
"total_cost": 0.0,
"start_time": time.time(),
}
logging.basicConfig(
filename='babyagi.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def record_task(self, success: bool, tokens: int = 0, cost: float = 0.0):
self.metrics["total_tasks"] += 1
if success:
self.metrics["completed_tasks"] += 1
else:
self.metrics["failed_tasks"] += 1
self.metrics["total_llm_calls"] += 1
self.metrics["total_tokens"] += tokens
self.metrics["total_cost"] += cost
def summary(self) -> str:
elapsed = time.time() - self.metrics["start_time"]
return f"""运行摘要:
- 任务完成率: {self.metrics['completed_tasks']}/{self.metrics['total_tasks']}
- LLM 调用次数: {self.metrics['total_llm_calls']}
- Token 消耗: {self.metrics['total_tokens']:,}
- 预估费用: ${self.metrics['total_cost']:.2f}
- 运行时长: {elapsed:.1f}s"""
八、面试高频问题
Q1:BabyAGI 的工具调用和 Function Calling 有什么区别?
BabyAGI 的工具调用基于 ReAct 范式(Thought → Action → Observation 循环),LLM 通过文本推理决定调用什么工具。Function Calling 是 OpenAI 原生支持的能力,LLM 直接输出结构化的工具调用 JSON,不需要文本解析。Function Calling 更可靠、更快,但 ReAct 更灵活、不依赖特定模型。
Q2:Chroma 和 Pinecone 怎么选?
开发阶段用 Chroma(零成本、低延迟、离线可用),生产阶段看需求:数据量 < 100 万且单机部署用 Chroma;数据量 > 100 万或需要分布式用 Pinecone/Milvus。核心指标是数据规模和是否需要云端托管。
Q3:怎么防止 BabyAGI 任务无限膨胀?
四道防线:(1) max_iterations 硬限制迭代次数;(2) Prompt 加"只创建必要的子任务,最多 3 个";(3) 去重检查——新任务与已有任务语义相似度 > 0.9 则跳过;(4) 任务预算——设定最大任务总数,超出后只执行不创建。
Q4:ReAct 和 Plan-and-Execute 有什么区别?
ReAct 是逐步推理+行动,每一步都重新思考下一步做什么,灵活但可能走弯路。Plan-and-Execute 是先规划再执行,一次性生成完整计划然后逐步执行,效率高但不够灵活。BabyAGI 原版更接近 Plan-and-Execute(先创建任务列表再执行),工具增强版更接近 ReAct。
Q5:生产环境怎么保证 Agent 的输出质量?
三层保障:(1) 质量门控——Review Agent 对输出评分,低于阈值则重做;(2) 人工审核——关键决策点插入人类确认(Human-in-the-loop);(3) 版本追踪——LangSmith 记录每步的输入输出,出问题可回溯。
九、总结
BabyAGI 系列上下两篇,覆盖了从原理到生产的完整链路:
| 模块 | 上篇 | 下篇 |
|---|---|---|
| 架构原理 | 三大代理 + 核心循环 | — |
| 源码拆解 | 逐行解析原版代码 | — |
| Mini BabyAGI | 80 行手写实现 | — |
| 框架对比 | vs AutoGPT/CrewAI/LangGraph | — |
| 工具集成 | — | ReAct + ToolRegistry + 5 种工具 |
| 记忆优化 | — | Chroma 本地化 + Embedding 对比 |
| 实战案例 | — | 竞品分析 / 代码审查 / 数据分析 |
| LangChain | — | AgentExecutor + 生产级 Pipeline |
| 部署 | — | 成本控制 + 安全防护 + 监控日志 |
一句话总结 BabyAGI 的进化路径:
上篇的 BabyAGI 是一个"会思考的大脑"——能拆任务、排优先级、循环执行。下篇给它装上了"手脚"(工具)、“记忆”(Chroma)、“纪律”(LangChain 编排),从一个有趣的实验变成了一个能干活的系统。
如果觉得这个系列对你有帮助,欢迎点赞 + 收藏 + 关注,你的支持是我持续创作的动力!有问题欢迎在评论区交流~
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)