【BabyAGI 全解析 · 下】工具集成 / 记忆优化 / 实战案例 / LangChain 生产级 Pipeline

导语:上篇我们拆解了 BabyAGI 的核心架构,手写了一个 80 行的 Mini BabyAGI。但那个版本只能"思考"不能"行动"——它没有手,不能搜索网页、不能执行代码、不能读写文件。这篇是 BabyAGI 系列的下篇,重点解决三个问题:怎么给 Agent 装上工具?怎么优化记忆系统?怎么从玩具变成生产级系统? 三个完整实战案例 + LangChain 集成方案,学完这篇你就能把 BabyAGI 真正用到项目里。


在这里插入图片描述

一、工具集成:让 Agent 拥有"手脚"

1.1 为什么需要工具?

上篇的 Mini BabyAGI 有一个致命问题:Execution Agent 只能调用 LLM 生成文本,不能和外部世界交互。它像一个只有大脑没有手脚的人——能想但不能做。

现实中的 Agent 需要:

能力 工具 场景
获取信息 网页搜索、API 调用 调研、竞品分析
执行逻辑 Python REPL、代码沙箱 数据计算、算法验证
持久化 文件读写、数据库 保存报告、存储结果
通信 邮件、Slack、Webhook 通知、协作
浏览 Playwright、Selenium 网页抓取、自动化测试

1.2 工具集成架构

在这里插入图片描述

核心思路:Execution Agent 不再直接返回文本,而是先分析任务需要什么工具,调用工具获取结果,再基于结果生成最终输出。

这就是 ReAct(Reasoning + Acting)范式的核心:

ReAct Loop = Thought → Action → Observation → Thought → ⋯ \text{ReAct Loop} = \text{Thought} \rightarrow \text{Action} \rightarrow \text{Observation} \rightarrow \text{Thought} \rightarrow \cdots ReAct Loop=ThoughtActionObservationThought

  • Thought:LLM 思考下一步该做什么
  • Action:调用工具执行操作
  • Observation:获取工具返回的结果
  • 循环直到 LLM 认为任务完成

1.3 工具注册与调度系统

# tools.py
from typing import Callable, List, Dict, Any
import json

class Tool:
    """工具基类"""
    def __init__(self, name: str, description: str, func: Callable):
        self.name = name
        self.description = description
        self.func = func
    
    def run(self, *args, **kwargs) -> str:
        try:
            result = self.func(*args, **kwargs)
            return str(result)
        except Exception as e:
            return f"[工具执行错误] {type(e).__name__}: {str(e)}"

class ToolRegistry:
    """工具注册表 - 管理所有可用工具"""
    def __init__(self):
        self._tools: Dict[str, Tool] = {}
    
    def register(self, name: str, description: str, func: Callable):
        """注册一个工具"""
        self._tools[name] = Tool(name, description, func)
        print(f"  ✅ 工具已注册: {name}")
    
    def get(self, name: str) -> Tool:
        return self._tools.get(name)
    
    def get_descriptions(self) -> str:
        """生成工具描述文本,供 LLM 选择工具时使用"""
        desc = ""
        for name, tool in self._tools.items():
            desc += f"- {name}: {tool.description}\n"
        return desc
    
    def list_tools(self) -> List[str]:
        return list(self._tools.keys())


# ============ 注册具体工具 ============

def web_search(query: str) -> str:
    """网页搜索工具"""
    # 实际项目中替换为 Tavily / SerpAPI / Bing Search
    try:
        import requests
        # 示例:使用 DuckDuckGo 搜索(免费,无需 API Key)
        from duckduckgo_search import DDGS
        results = DDGS().text(query, max_results=5)
        output = ""
        for r in results:
            output += f"标题: {r['title']}\n摘要: {r['body']}\n链接: {r['href']}\n\n"
        return output if output else "未找到相关结果"
    except ImportError:
        return f"[模拟搜索] 关于 '{query}' 的搜索结果:这是一个模拟结果,请安装 duckduckgo-search"

def code_execute(code: str) -> str:
    """Python 代码执行工具(沙箱环境)"""
    import sys
    from io import StringIO
    
    # 安全限制:禁止危险操作
    forbidden = ['import os', 'import subprocess', 'import shutil', 
                 'rm -rf', '__import__', 'eval(', 'exec(']
    for f in forbidden:
        if f in code:
            return f"[安全拦截] 代码包含禁止操作: {f}"
    
    # 捕获输出
    old_stdout = sys.stdout
    sys.stdout = captured = StringIO()
    
    try:
        local_vars = {}
        exec(code, {"__builtins__": {}}, local_vars)
        output = captured.getvalue()
        if local_vars:
            output += f"\n变量: {list(local_vars.keys())}"
        return output if output else "代码执行成功(无输出)"
    except Exception as e:
        return f"[执行错误] {type(e).__name__}: {str(e)}"
    finally:
        sys.stdout = old_stdout

def file_write(filename: str, content: str) -> str:
    """文件写入工具"""
    # 安全限制:只允许写入指定目录
    safe_dir = "./babyagi_output"
    import os
    os.makedirs(safe_dir, exist_ok=True)
    filepath = os.path.join(safe_dir, os.path.basename(filename))
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(content)
    return f"文件已写入: {filepath} ({len(content)} 字符)"

def file_read(filename: str) -> str:
    """文件读取工具"""
    import os
    safe_dir = "./babyagi_output"
    filepath = os.path.join(safe_dir, os.path.basename(filename))
    if not os.path.exists(filepath):
        return f"[错误] 文件不存在: {filepath}"
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()
    return content[:5000]  # 限制读取长度

def http_request(url: str, method: str = "GET") -> str:
    """HTTP 请求工具"""
    import requests
    try:
        resp = requests.request(method, url, timeout=10)
        return f"状态码: {resp.status_code}\n内容: {resp.text[:2000]}"
    except Exception as e:
        return f"[请求错误] {str(e)}"

# 创建全局注册表
registry = ToolRegistry()
registry.register("web_search", "搜索网页获取信息,输入搜索关键词", web_search)
registry.register("code_execute", "执行 Python 代码并返回输出,输入代码字符串", code_execute)
registry.register("file_write", "将内容写入文件,输入文件名和内容", file_write)
registry.register("file_read", "读取文件内容,输入文件名", file_read)
registry.register("http_request", "发送 HTTP 请求,输入 URL 和方法", http_request)

1.4 ReAct 风格的 Execution Agent

# react_agent.py
import openai

REACT_SYSTEM_PROMPT = """你是一个拥有工具的智能代理。你可以通过思考和行动来完成任务。

可用工具:
{tools_description}

你必须严格按照以下格式回复:

Thought: [思考下一步该做什么]
Action: [工具名称]
Action Input: [工具输入参数]

当你获得足够的信息后,使用以下格式给出最终答案:

Thought: [我已经获得了足够的信息]
Final Answer: [最终答案]

注意:
- 每次只能调用一个工具
- 仔细分析任务需要什么信息,选择最合适的工具
- 如果工具返回错误,尝试换一种方式
- 不要编造信息,只基于工具返回的结果回答
"""

def react_execution(task: str, context: str, registry, max_steps: int = 5) -> str:
    """ReAct 风格的任务执行"""
    tools_desc = registry.get_descriptions()
    messages = [
        {"role": "system", "content": REACT_SYSTEM_PROMPT.format(tools_description=tools_desc)},
        {"role": "user", "content": f"任务: {task}\n\n已有上下文:\n{context}"}
    ]
    
    for step in range(max_steps):
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            temperature=0,
            max_tokens=1000
        )
        
        reply = response.choices[0].message.content
        messages.append({"role": "assistant", "content": reply})
        
        # 检查是否给出最终答案
        if "Final Answer:" in reply:
            final_answer = reply.split("Final Answer:")[-1].strip()
            return final_answer
        
        # 解析工具调用
        try:
            action_line = [l for l in reply.split('\n') if l.startswith('Action:')][0]
            input_line = [l for l in reply.split('\n') if l.startswith('Action Input:')][0]
            
            tool_name = action_line.replace('Action:', '').strip()
            tool_input = input_line.replace('Action Input:', '').strip()
            
            # 调用工具
            tool = registry.get(tool_name)
            if tool:
                observation = tool.run(tool_input)
            else:
                observation = f"[错误] 工具 '{tool_name}' 不存在"
            
            # 将观察结果加入对话
            messages.append({
                "role": "user", 
                "content": f"Observation: {observation}"
            })
            
        except (IndexError, ValueError) as e:
            messages.append({
                "role": "user",
                "content": f"Observation: [格式错误] 请严格按照 Thought/Action/Action Input 格式回复"
            })
    
    return "[达到最大步数限制] 任务未完成"

1.5 集成到 BabyAGI 主循环

# 替换上篇的 execution_agent 函数
def execution_agent_with_tools(task: str, context: str, registry) -> str:
    """带工具的执行代理"""
    return react_execution(task, context, registry)

# 主循环中替换调用
# result = execution_agent(task, context)  # 旧版
# result = execution_agent_with_tools(task, context, registry)  # 新版

二、记忆系统优化:从 Pinecone 到 Chroma 本地化

2.1 原版记忆系统的问题

原版 BabyAGI 使用 Pinecone 作为向量数据库,存在三个痛点:

问题 影响
云端依赖 必须联网,断网即瘫痪
费用累积 Pinecone 按查询计费,长期运行成本高
数据隐私 向量数据存在第三方服务器,敏感场景不可用

2.2 Chroma 本地化方案

在这里插入图片描述

Chroma 是一个轻量级本地向量数据库,核心优势:

  • 零费用:完全本地运行,无 API 调用成本
  • 低延迟:本地查询 < 5ms,比 Pinecone 快 20-60 倍
  • 离线可用:不需要网络连接
  • 数据自控:向量数据完全在本机

2.3 Chroma 记忆系统实现

# memory.py
import chromadb
from chromadb.utils import embedding_functions

class ChromaMemory:
    """基于 Chroma 的本地向量记忆系统"""
    
    def __init__(self, collection_name: str = "babyagi_memory", 
                 persist_dir: str = "./chroma_db"):
        # 初始化 Chroma 客户端(本地持久化)
        self.client = chromadb.PersistentClient(path=persist_dir)
        
        # 使用本地 Embedding 模型(无需 API Key)
        # 首次运行会自动下载模型(约 420MB)
        self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name="all-MiniLM-L6-v2"  # 轻量级,速度快
        )
        
        # 获取或创建集合
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embed_fn,
            metadata={"hnsw:space": "cosine"}  # 使用余弦相似度
        )
        
        print(f"  ✅ Chroma 记忆系统初始化完成 (集合: {collection_name})")
        print(f"  📊 已存储 {self.collection.count()} 条记录")
    
    def add(self, task_id: str, task: str, result: str):
        """添加一条执行记录"""
        doc = f"任务: {task}\n结果: {result}"
        self.collection.upsert(
            documents=[doc],
            ids=[f"task_{task_id}"],
            metadatas=[{"task_id": task_id, "task": task}]
        )
    
    def query(self, query_text: str, n_results: int = 5) -> str:
        """语义检索相关记忆"""
        if self.collection.count() == 0:
            return ""
        
        results = self.collection.query(
            query_texts=[query_text],
            n_results=min(n_results, self.collection.count())
        )
        
        context = ""
        for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
            context += f"[任务{meta['task_id']}] {doc}\n---\n"
        
        return context
    
    def clear(self):
        """清空记忆"""
        self.client.delete_collection(self.collection.name)
        self.collection = self.client.get_or_create_collection(
            name=self.collection.name,
            embedding_function=self.embed_fn
        )

2.4 本地 Embedding 方案对比

方案 模型大小 延迟 质量 是否需要网络
OpenAI text-embedding-3-small 云端 100-300ms ★★★★★
all-MiniLM-L6-v2 420MB < 5ms ★★★★ 否(首次下载需网络)
all-mpnet-base-v2 1.1GB < 10ms ★★★★★
bge-small-zh-v1.5 390MB < 5ms ★★★★(中文优)

推荐:英文场景用 all-MiniLM-L6-v2,中文场景用 bge-small-zh-v1.5

2.5 集成到 BabyAGI

# 替换上篇的 Pinecone 相关代码
memory = ChromaMemory()

# 在 execution_agent 中检索上下文
context = memory.query(task, n_results=5)

# 执行完成后存储结果
memory.add(task_id=str(task_id), task=task, result=result)

三、实战案例一:自动化竞品分析

3.1 场景描述

目标:自动分析三个 AI 编程助手(Cursor、Windsurf、Copilot)的竞品对比,生成结构化报告。

3.2 完整代码

# case_competitive_analysis.py
"""BabyAGI 实战:自动化竞品分析"""

import openai
from tools import registry  # 上面的工具注册表
from memory import ChromaMemory
from datetime import datetime

class CompetitiveAnalysisAgent:
    def __init__(self, objective: str):
        self.objective = objective
        self.memory = ChromaMemory("competitive_analysis")
        self.task_list = []
        self.completed = []
        self.task_id_counter = 0
    
    def add_task(self, task: str, priority: int = 0):
        self.task_id_counter += 1
        self.task_list.append({
            "id": self.task_id_counter,
            "task": task,
            "priority": priority
        })
        self.task_list.sort(key=lambda x: x["priority"], reverse=True)
    
    def execute_task(self, task: str) -> str:
        """使用 ReAct + 工具执行任务"""
        context = self.memory.query(task, n_results=3)
        return react_execution(task, context, registry, max_steps=6)
    
    def create_subtasks(self, result: str):
        """基于执行结果创建后续任务"""
        prompt = f"""基于以下执行结果,创建后续需要完成的子任务。

目标: {self.objective}
已完成任务: {self.completed[-1] if self.completed else '无'}
执行结果: {result[:1000]}

请列出最多3个必要的后续子任务,每个一行,格式: 优先级(0-9) | 任务描述
不要重复已完成的任务。"""
        
        response = openai.ChatCompletion.create(
            model="gpt-4", messages=[{"role": "user", "content": prompt}],
            temperature=0, max_tokens=300
        )
        
        for line in response.choices[0].message.content.strip().split('\n'):
            line = line.strip()
            if '|' in line:
                parts = line.split('|', 1)
                try:
                    priority = int(parts[0].strip())
                    task = parts[1].strip()
                    if task not in [t["task"] for t in self.task_list] + self.completed:
                        self.add_task(task, priority)
                except ValueError:
                    pass
    
    def run(self, max_iterations: int = 10):
        """运行竞品分析"""
        print(f"\n{'='*60}")
        print(f"🎯 目标: {self.objective}")
        print(f"{'='*60}\n")
        
        # 初始任务
        self.add_task("搜索 Cursor AI 编程助手的核心功能、定价和用户评价", 9)
        self.add_task("搜索 Windsurf (Codeium) 编程助手的核心功能、定价和用户评价", 9)
        self.add_task("搜索 GitHub Copilot 的核心功能、定价和用户评价", 9)
        
        for i in range(max_iterations):
            if not self.task_list:
                print("\n✅ 所有任务已完成!")
                break
            
            task = self.task_list.pop(0)
            print(f"\n--- 迭代 {i+1} ---")
            print(f"📋 执行: {task['task']}")
            
            result = self.execute_task(task['task'])
            print(f"✅ 结果: {result[:200]}...")
            
            # 存入记忆
            self.memory.add(str(task['id']), task['task'], result)
            self.completed.append(task['task'])
            
            # 创建后续任务
            self.create_subtasks(result)
        
        # 生成最终报告
        self.generate_report()
    
    def generate_report(self):
        """生成竞品分析报告"""
        print(f"\n{'='*60}")
        print("📝 生成竞品分析报告...")
        
        all_context = self.memory.query(self.objective, n_results=20)
        
        prompt = f"""基于以下调研结果,撰写一份结构化的竞品分析报告。

目标: {self.objective}

调研数据:
{all_context}

报告要求:
1. 概述:三款产品的定位和核心差异
2. 功能对比表:核心功能逐项对比
3. 定价对比:各套餐价格和性价比
4. 用户评价总结:优缺点汇总
5. 结论和建议

请用 Markdown 格式输出。"""
        
        response = openai.ChatCompletion.create(
            model="gpt-4", messages=[{"role": "user", "content": prompt}],
            temperature=0.3, max_tokens=3000
        )
        
        report = response.choices[0].message.content
        
        # 保存报告
        registry.get("file_write").run(
            "competitive_analysis_report.md", report
        )
        
        print("📊 报告已保存到 babyagi_output/competitive_analysis_report.md")
        print(f"{'='*60}")


# 运行
if __name__ == "__main__":
    agent = CompetitiveAnalysisAgent(
        objective="对比分析 Cursor、Windsurf、GitHub Copilot 三款 AI 编程助手,生成竞品报告"
    )
    agent.run(max_iterations=12)

3.3 运行效果

============================================================
🎯 目标: 对比分析 Cursor、Windsurf、GitHub Copilot 三款 AI 编程助手
============================================================

--- 迭代 1 ---
📋 执行: 搜索 Cursor AI 编程助手的核心功能、定价和用户评价
Thought: 我需要搜索 Cursor 的信息
Action: web_search
Action Input: Cursor AI code editor features pricing review 2025
Observation: [搜索结果...]
✅ 结果: Cursor 是一款 AI 优先的代码编辑器...

--- 迭代 2 ---
📋 执行: 搜索 Windsurf 编程助手的核心功能、定价和用户评价
...

--- 迭代 8 ---
📋 执行: 将对比结果整理为结构化表格
Thought: 我需要将数据整理成表格
Action: code_execute
Action Input: import json\n... 格式化代码 ...
Observation: 表格生成完成
✅ 结果: 对比表格已生成...

✅ 所有任务已完成!
📊 报告已保存到 babyagi_output/competitive_analysis_report.md

四、实战案例二:自动化代码审查

4.1 场景描述

目标:自动审查指定目录下的 Python 代码,检查代码质量、安全漏洞、性能问题,生成审查报告。

4.2 核心代码

# case_code_review.py
"""BabyAGI 实战:自动化代码审查"""

import os
import openai
from tools import registry
from memory import ChromaMemory

class CodeReviewAgent:
    def __init__(self, target_dir: str):
        self.target_dir = target_dir
        self.memory = ChromaMemory("code_review")
        self.issues = []
    
    def scan_files(self) -> list:
        """扫描目标目录下的 Python 文件"""
        py_files = []
        for root, dirs, files in os.walk(self.target_dir):
            # 跳过虚拟环境和缓存
            dirs[:] = [d for d in dirs if d not in ['venv', '__pycache__', '.git', 'node_modules']]
            for f in files:
                if f.endswith('.py'):
                    py_files.append(os.path.join(root, f))
        return py_files
    
    def review_file(self, filepath: str) -> str:
        """审查单个文件"""
        # 读取文件内容
        code = registry.get("file_read").run(filepath)
        
        prompt = f"""请审查以下 Python 代码,从三个维度分析:

1. **代码质量**:命名规范、函数设计、代码重复、可读性
2. **安全问题**:SQL注入、硬编码密钥、不安全的函数调用
3. **性能问题**:N+1查询、不必要的循环、内存泄漏风险

文件: {filepath}

```python
{code}

请按以下格式输出:

  • [严重/警告/建议] 问题描述 | 行号范围 | 修复建议"“”

      response = openai.ChatCompletion.create(
          model="gpt-4", messages=[{"role": "user", "content": prompt}],
          temperature=0, max_tokens=2000
      )
      
      return response.choices[0].message.content
    

    def run(self):
    “”“执行代码审查”“”
    print(f"\n🔍 扫描目录: {self.target_dir}“)
    files = self.scan_files()
    print(f"📁 发现 {len(files)} 个 Python 文件\n”)

      all_results = {}
      
      for i, filepath in enumerate(files):
          print(f"[{i+1}/{len(files)}] 审查: {filepath}")
          result = self.review_file(filepath)
          all_results[filepath] = result
          
          # 存入记忆
          self.memory.add(str(i), filepath, result)
          
          # 统计问题
          for line in result.split('\n'):
              if line.strip().startswith('- ['):
                  self.issues.append({
                      "file": filepath,
                      "issue": line.strip()
                  })
      
      # 生成审查报告
      self.generate_report(all_results)
    

    def generate_report(self, results: dict):
    “”“生成审查报告”“”
    # 按严重程度分类
    severe = [i for i in self.issues if ‘[严重]’ in i[‘issue’]]
    warnings = [i for i in self.issues if ‘[警告]’ in i[‘issue’]]
    suggestions = [i for i in self.issues if ‘[建议]’ in i[‘issue’]]

      report = f"""# 代码审查报告
    

概览

  • 审查文件数: {len(results)}
  • 严重问题: {len(severe)}
  • 警告: {len(warnings)}
  • 建议: {len(suggestions)}

严重问题

“”"
for issue in severe:
report += f"- {issue[‘file’]}: {issue[‘issue’]}\n"

    report += "\n## 警告\n"
    for issue in warnings:
        report += f"- **{issue['file']}**: {issue['issue']}\n"
    
    report += "\n## 改进建议\n"
    for issue in suggestions:
        report += f"- **{issue['file']}**: {issue['issue']}\n"
    
    registry.get("file_write").run("code_review_report.md", report)
    print(f"\n📊 审查报告已保存到 babyagi_output/code_review_report.md")
    print(f"   严重: {len(severe)} | 警告: {len(warnings)} | 建议: {len(suggestions)}")

运行

if name == “main”:
agent = CodeReviewAgent(target_dir=“./src”)
agent.run()


---

## 五、实战案例三:数据分析自动化

### 5.1 场景描述

目标:**自动分析 CSV 数据集,生成统计摘要、趋势分析、可视化图表。**

### 5.2 核心代码

```python
# case_data_analysis.py
"""BabyAGI 实战:数据分析自动化"""

import openai
from tools import registry
from memory import ChromaMemory

class DataAnalysisAgent:
    def __init__(self, data_path: str, question: str):
        self.data_path = data_path
        self.question = question
        self.memory = ChromaMemory("data_analysis")
        self.findings = []
    
    def explore_data(self) -> str:
        """探索性数据分析"""
        code = f"""
import pandas as pd
df = pd.read_csv('{self.data_path}')
print("=== 数据概览 ===")
print(f"行数: {{len(df)}}, 列数: {{len(df.columns)}}")
print(f"\\n列名: {{list(df.columns)}}")
print(f"\\n数据类型:\\n{{df.dtypes}}")
print(f"\\n缺失值:\\n{{df.isnull().sum()}}")
print(f"\\n数值列统计:\\n{{df.describe()}}")
print(f"\\n前5行:\\n{{df.head()}}")
"""
        return registry.get("code_execute").run(code)
    
    def analyze(self, finding: str) -> str:
        """基于发现进行深入分析"""
        context = self.memory.query(finding, n_results=3)
        
        prompt = f"""基于以下数据探索结果,编写 Python 分析代码。

数据文件: {self.data_path}
分析目标: {finding}
已有发现: {context}

要求:
1. 使用 pandas 读取数据
2. 用 print() 输出分析结果
3. 如果需要可视化,保存图片到 ./babyagi_output/
4. 代码必须能独立运行"""
        
        response = openai.ChatCompletion.create(
            model="gpt-4", messages=[{"role": "user", "content": prompt}],
            temperature=0, max_tokens=1500
        )
        
        code = response.choices[0].message.content
        # 提取代码块
        if '```python' in code:
            code = code.split('```python')[1].split('```')[0]
        elif '```' in code:
            code = code.split('```')[1].split('```')[0]
        
        return registry.get("code_execute").run(code)
    
    def run(self, max_iterations: int = 8):
        """运行数据分析"""
        print(f"\n📊 数据: {self.data_path}")
        print(f"❓ 问题: {self.question}\n")
        
        # Step 1: 探索数据
        print("Step 1: 探索性数据分析...")
        exploration = self.explore_data()
        self.memory.add("0", "数据探索", exploration)
        print(f"  ✅ 数据概览完成\n")
        
        # Step 2: 生成分析子任务
        prompt = f"""基于以下数据概览,列出需要进行的分析步骤。

数据概览:
{exploration[:2000]}

分析目标: {self.question}

请列出最多5个分析步骤,每个一行。"""
        
        response = openai.ChatCompletion.create(
            model="gpt-4", messages=[{"role": "user", "content": prompt}],
            temperature=0, max_tokens=500
        )
        
        tasks = [line.strip().lstrip('0123456789.-) ') 
                 for line in response.choices[0].message.content.strip().split('\n')
                 if line.strip()]
        
        # Step 3: 逐步执行分析
        for i, task in enumerate(tasks[:max_iterations]):
            print(f"Step {i+2}: {task}")
            result = self.analyze(task)
            self.memory.add(str(i+1), task, result)
            self.findings.append(f"**{task}**:\n{result[:500]}")
            print(f"  ✅ 完成\n")
        
        # Step 4: 生成最终报告
        self.generate_report()
    
    def generate_report(self):
        """生成分析报告"""
        report = f"# 数据分析报告\n\n## 分析目标\n{self.question}\n\n"
        report += "## 分析发现\n\n"
        for finding in self.findings:
            report += f"{finding}\n\n---\n\n"
        
        registry.get("file_write").run("data_analysis_report.md", report)
        print(f"📊 分析报告已保存到 babyagi_output/data_analysis_report.md")


# 运行
if __name__ == "__main__":
    agent = DataAnalysisAgent(
        data_path="./data/sales.csv",
        question="分析销售趋势,找出增长最快的产品类别和季节性规律"
    )
    agent.run()

六、LangChain 集成:构建生产级 Agent Pipeline

6.1 为什么需要 LangChain?

手写的 BabyAGI 有三个生产级痛点:

痛点 手写方案 LangChain 方案
错误恢复 工具调用失败就卡住 自动重试 + 降级策略
流程编排 硬编码循环 声明式 Chain / Graph
可观测性 print 大法 LangSmith 追踪 + 日志

6.2 生产级 Pipeline 架构

在这里插入图片描述

6.3 LangChain + BabyAGI 实现

# langchain_pipeline.py
"""生产级 Agent Pipeline:LangChain + BabyAGI"""

from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool
from langchain import hub
from langchain_community.utilities import SerpAPIWrapper
from langchain_community.tools import PythonREPLTool
from langchain_chroma import Chroma as LangChainChroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
import os

# ============ 1. 初始化 LLM ============
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    max_retries=3,  # 自动重试
    request_timeout=60
)

# ============ 2. 定义工具 ============
# 搜索工具
search = SerpAPIWrapper()
search_tool = Tool(
    name="web_search",
    description="搜索网页获取最新信息",
    func=search.run
)

# 代码执行工具
python_tool = Tool(
    name="python_repl",
    description="执行 Python 代码,用于数据分析和计算",
    func=PythonREPLTool().run
)

# 文件写入工具
def write_file(input_str: str) -> str:
    """格式: 'filename|||content'"""
    parts = input_str.split("|||", 1)
    if len(parts) != 2:
        return "格式错误,请使用: filename|||content"
    filename, content = parts
    os.makedirs("./babyagi_output", exist_ok=True)
    filepath = os.path.join("./babyagi_output", os.path.basename(filename))
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(content)
    return f"文件已保存: {filepath}"

file_tool = Tool(
    name="file_write",
    description="将内容写入文件,输入格式: filename|||content",
    func=write_file
)

tools = [search_tool, python_tool, file_tool]

# ============ 3. 创建 ReAct Agent ============
prompt = hub.pull("hwchase17/react")
prompt = prompt.partial(
    tools="\n".join([f"{t.name}: {t.description}" for t in tools]),
    tool_names=", ".join([t.name for t in tools])
)

agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=10,      # 最大迭代次数
    handle_parsing_errors=True,  # 自动处理解析错误
    max_execution_time=300  # 5分钟超时
)

# ============ 4. BabyAGI 编排层 ============
class ProductionBabyAGI:
    """生产级 BabyAGI:LangChain Agent + 任务编排"""
    
    def __init__(self, objective: str):
        self.objective = objective
        self.task_queue = []
        self.results = []
        self.completed_tasks = set()
    
    def add_task(self, task: str, priority: int = 5):
        if task not in self.completed_tasks:
            self.task_queue.append({"task": task, "priority": priority})
            self.task_queue.sort(key=lambda x: x["priority"], reverse=True)
    
    def execute_task(self, task: str) -> str:
        """使用 LangChain AgentExecutor 执行任务"""
        try:
            result = agent_executor.invoke({
                "input": f"目标: {self.objective}\n当前任务: {task}"
            })
            return result.get("output", "执行完成但无输出")
        except Exception as e:
            return f"[执行失败] {str(e)}"
    
    def generate_subtasks(self, task: str, result: str):
        """基于执行结果生成后续任务"""
        prompt = PromptTemplate.from_template(
            """基于以下信息,生成后续子任务。

目标: {objective}
已完成: {task}
结果摘要: {result}

列出最多3个必要的后续子任务,格式: 优先级(0-9) | 任务描述"""
        )
        
        chain = prompt | llm | StrOutputParser()
        response = chain.invoke({
            "objective": self.objective,
            "task": task,
            "result": result[:1000]
        })
        
        for line in response.strip().split('\n'):
            if '|' in line:
                parts = line.split('|', 1)
                try:
                    priority = int(parts[0].strip())
                    subtask = parts[1].strip()
                    self.add_task(subtask, priority)
                except ValueError:
                    pass
    
    def run(self, max_iterations: int = 10):
        """运行主循环"""
        print(f"\n🎯 目标: {self.objective}\n")
        
        # 初始任务分解
        self.add_task(f"分析目标 '{self.objective}',列出需要完成的主要步骤", 9)
        
        for i in range(max_iterations):
            if not self.task_queue:
                print("✅ 所有任务已完成!")
                break
            
            task_item = self.task_queue.pop(0)
            task = task_item["task"]
            
            print(f"\n--- 迭代 {i+1} ---")
            print(f"📋 任务: {task}")
            
            result = self.execute_task(task)
            self.results.append({"task": task, "result": result})
            self.completed_tasks.add(task)
            
            print(f"✅ 完成: {result[:150]}...")
            
            self.generate_subtasks(task, result)
        
        return self.results


# ============ 5. 运行 ============
if __name__ == "__main__":
    babyagi = ProductionBabyAGI(
        objective="调研 2026 年最流行的 Python Web 框架,对比性能和生态,生成技术选型报告"
    )
    results = babyagi.run(max_iterations=8)

6.4 LangChain 集成的关键优势

能力 手写 BabyAGI LangChain BabyAGI
工具调用 手动解析 Action/Input AgentExecutor 自动处理
错误恢复 handle_parsing_errors=True
超时控制 max_execution_time
重试机制 max_retries=3
可观测性 print LangSmith 追踪
流程编排 while 循环 LCEL Chain / Graph

七、生产部署最佳实践

7.1 成本控制

策略 效果 实现方式
限制迭代次数 防止无限循环 max_iterations=10
使用 GPT-4o-mini 成本降 90% 简单任务用 mini,复杂任务用 GPT-4
缓存 LLM 响应 重复任务零成本 langchain.cache.InMemoryCache
本地 Embedding 省去 Embedding API 费用 sentence-transformers
Token 预算 单任务 token 上限 max_tokens + 结果截断

7.2 安全防护

# 安全配置
SECURITY_CONFIG = {
    # 代码执行沙箱
    "code_sandbox": {
        "forbidden_imports": ["os", "subprocess", "shutil", "sys"],
        "max_execution_time": 30,  # 秒
        "max_memory_mb": 512,
    },
    # 文件系统
    "file_system": {
        "allowed_dirs": ["./babyagi_output"],
        "max_file_size_mb": 10,
    },
    # 网络请求
    "network": {
        "allowed_domains": ["*"],  # 生产环境应限制
        "request_timeout": 15,
        "max_requests_per_task": 10,
    },
    # LLM 调用
    "llm": {
        "max_tokens_per_call": 2000,
        "max_calls_per_task": 5,
        "temperature": 0,  # 降低随机性
    }
}

7.3 监控与日志

# 监控配置
import logging
import time

class BabyAGIMonitor:
    """BabyAGI 运行监控"""
    
    def __init__(self):
        self.metrics = {
            "total_tasks": 0,
            "completed_tasks": 0,
            "failed_tasks": 0,
            "total_llm_calls": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "start_time": time.time(),
        }
        logging.basicConfig(
            filename='babyagi.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
    
    def record_task(self, success: bool, tokens: int = 0, cost: float = 0.0):
        self.metrics["total_tasks"] += 1
        if success:
            self.metrics["completed_tasks"] += 1
        else:
            self.metrics["failed_tasks"] += 1
        self.metrics["total_llm_calls"] += 1
        self.metrics["total_tokens"] += tokens
        self.metrics["total_cost"] += cost
    
    def summary(self) -> str:
        elapsed = time.time() - self.metrics["start_time"]
        return f"""运行摘要:
- 任务完成率: {self.metrics['completed_tasks']}/{self.metrics['total_tasks']}
- LLM 调用次数: {self.metrics['total_llm_calls']}
- Token 消耗: {self.metrics['total_tokens']:,}
- 预估费用: ${self.metrics['total_cost']:.2f}
- 运行时长: {elapsed:.1f}s"""

八、面试高频问题

Q1:BabyAGI 的工具调用和 Function Calling 有什么区别?

BabyAGI 的工具调用基于 ReAct 范式(Thought → Action → Observation 循环),LLM 通过文本推理决定调用什么工具。Function Calling 是 OpenAI 原生支持的能力,LLM 直接输出结构化的工具调用 JSON,不需要文本解析。Function Calling 更可靠、更快,但 ReAct 更灵活、不依赖特定模型。

Q2:Chroma 和 Pinecone 怎么选?

开发阶段用 Chroma(零成本、低延迟、离线可用),生产阶段看需求:数据量 < 100 万且单机部署用 Chroma;数据量 > 100 万或需要分布式用 Pinecone/Milvus。核心指标是数据规模是否需要云端托管

Q3:怎么防止 BabyAGI 任务无限膨胀?

四道防线:(1) max_iterations 硬限制迭代次数;(2) Prompt 加"只创建必要的子任务,最多 3 个";(3) 去重检查——新任务与已有任务语义相似度 > 0.9 则跳过;(4) 任务预算——设定最大任务总数,超出后只执行不创建。

Q4:ReAct 和 Plan-and-Execute 有什么区别?

ReAct 是逐步推理+行动,每一步都重新思考下一步做什么,灵活但可能走弯路。Plan-and-Execute 是先规划再执行,一次性生成完整计划然后逐步执行,效率高但不够灵活。BabyAGI 原版更接近 Plan-and-Execute(先创建任务列表再执行),工具增强版更接近 ReAct。

Q5:生产环境怎么保证 Agent 的输出质量?

三层保障:(1) 质量门控——Review Agent 对输出评分,低于阈值则重做;(2) 人工审核——关键决策点插入人类确认(Human-in-the-loop);(3) 版本追踪——LangSmith 记录每步的输入输出,出问题可回溯。


九、总结

BabyAGI 系列上下两篇,覆盖了从原理到生产的完整链路:

模块 上篇 下篇
架构原理 三大代理 + 核心循环
源码拆解 逐行解析原版代码
Mini BabyAGI 80 行手写实现
框架对比 vs AutoGPT/CrewAI/LangGraph
工具集成 ReAct + ToolRegistry + 5 种工具
记忆优化 Chroma 本地化 + Embedding 对比
实战案例 竞品分析 / 代码审查 / 数据分析
LangChain AgentExecutor + 生产级 Pipeline
部署 成本控制 + 安全防护 + 监控日志

一句话总结 BabyAGI 的进化路径

上篇的 BabyAGI 是一个"会思考的大脑"——能拆任务、排优先级、循环执行。下篇给它装上了"手脚"(工具)、“记忆”(Chroma)、“纪律”(LangChain 编排),从一个有趣的实验变成了一个能干活的系统。

如果觉得这个系列对你有帮助,欢迎点赞 + 收藏 + 关注,你的支持是我持续创作的动力!有问题欢迎在评论区交流~

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐