🚀 从零构建 AI 代码开发助手:架构设计与实战指南

前言

在软件开发领域,我们每天都在重复着相似的编码工作:编写样板代码、撰写文档、进行代码审查、修复常见 Bug。这些任务虽然简单,却消耗了大量宝贵时间。

“让我们教会 AI 为我们工作,而不是我们为 AI 工作。”

本文将带你从架构设计到代码实现,完整构建一个代码开发助手。这个助手能够:

  • 理解项目上下文,自主生成高质量代码
  • 智能进行代码审查,发现潜在问题
  • 自动生成测试用例和文档
  • 通过多 Agent 协作完成复杂任务

一、整体架构设计

1.1 系统架构概览

┌─────────────────────────────────────────────────────────────────┐
│                        用户交互层 (CLI / IDE插件)               │
├─────────────────────────────────────────────────────────────────┤
│                      任务调度层 (Task Orchestrator)             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │ Code Agent  │  │ Review Agent│  │ Test Agent  │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
├─────────────────────────────────────────────────────────────────┤
│                      工具能力层 (Tool Registry)                 │
│  ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐       │
│  │ 文件   │ │ Git    │ │ 执行   │ │ 搜索   │ │ LLM    │       │
│  │ 操作   │ │ 操作   │ │ 命令   │ │ 代码   │ │ 调用   │       │
│  └────────┘ └────────┘ └────────┘ └────────┘ └────────┘       │
├─────────────────────────────────────────────────────────────────┤
│                      上下文管理层 (Context Manager)             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │ 项目结构    │  │ 代码语义    │  │ 对话历史    │              │
│  │ 索引        │  │ 分析        │  │ 管理        │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
└─────────────────────────────────────────────────────────────────┘

1.2 核心设计理念

1. 工具优先 (Tool-First)

与纯对话式 AI 不同,我们的代码助手以工具为核心。每个 Agent 都被赋予特定的能力:

  • 文件读写、Git 操作、命令执行
  • 代码搜索、AST 解析、语法分析
  • 搜索引擎查询、文档获取

2. 状态管理 (State Management)

采用有限状态机管理 Agent 行为:

  • IDLEPLANNINGEXECUTINGREVIEWINGDONE / FAILED

3. 记忆系统 (Memory System)

三层记忆架构确保长期上下文能力:

  • 短期记忆:当前对话上下文
  • 中期记忆:项目级知识(项目结构、业务逻辑)
  • 长期记忆:跨项目的最佳实践和模式库

二、核心模块实现

2.1 Agent 基类设计

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
import asyncio

class AgentState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    REVIEWING = "reviewing"
    DONE = "done"
    FAILED = "failed"

@dataclass
class AgentResponse:
    """Agent 执行结果"""
    success: bool
    content: str
    state: AgentState
    artifacts: Dict[str, Any] = field(default_factory=dict)
    next_actions: List[str] = field(default_factory=list)
    error: Optional[str] = None

class BaseAgent(ABC):
    """Agent 基类 - 定义通用行为和接口"""
    
    def __init__(
        self,
        name: str,
        description: str,
        llm_client: "LLMClient",
        tool_registry: "ToolRegistry",
        max_iterations: int = 10
    ):
        self.name = name
        self.description = description
        self.llm = llm_client
        self.tools = tool_registry
        self.max_iterations = max_iterations
        self.state = AgentState.IDLE
        self._conversation_history: List[Dict] = []
    
    @abstractmethod
    async def plan(self, task: str, context: Dict) -> List[str]:
        """制定执行计划"""
        pass
    
    @abstractmethod
    async def execute(self, action: str, context: Dict) -> AgentResponse:
        """执行具体动作"""
        pass
    
    async def run(self, task: str, context: Dict) -> AgentResponse:
        """主运行循环"""
        self.state = AgentState.PLANNING
        plan = await self.plan(task, context)
        
        for action in plan[:self.max_iterations]:
            self.state = AgentState.EXECUTING
            response = await self.execute(action, context)
            
            if not response.success:
                self.state = AgentState.FAILED
                return response
            
            self._conversation_history.append({
                "action": action,
                "response": response
            })
            
            if response.state == AgentState.DONE:
                break
        
        self.state = AgentState.DONE
        return AgentResponse(
            success=True,
            content="任务完成",
            state=self.state
        )

2.2 工具注册中心 (Tool Registry)

import inspect
from typing import Callable, Any
from dataclasses import dataclass

@dataclass
class Tool:
    """工具定义"""
    name: str
    description: str
    parameters: Dict[str, Any]
    handler: Callable
    category: str

class ToolRegistry:
    """工具注册中心 - 管理所有可用工具"""
    
    def __init__(self):
        self._tools: Dict[str, Tool] = {}
        self._categories: Dict[str, List[str]] = {}
    
    def register(
        self,
        name: str,
        description: str,
        category: str = "general"
    ):
        """装饰器注册工具"""
        def decorator(func: Callable) -> Callable:
            sig = inspect.signature(func)
            params = {
                p.name: {
                    "type": str(p.annotation) if p.annotation != inspect.Parameter.empty else "any",
                    "default": p.default if p.default != inspect.Parameter.empty else None,
                    "required": p.default == inspect.Parameter.empty
                }
                for p in sig.parameters.values()
            }
            
            self._tools[name] = Tool(
                name=name,
                description=description,
                parameters=params,
                handler=func,
                category=category
            )
            
            if category not in self._categories:
                self._categories[category] = []
            self._categories[category].append(name)
            
            return func
        return decorator
    
    def get_tool(self, name: str) -> Optional[Tool]:
        return self._tools.get(name)
    
    def get_tools_by_category(self, category: str) -> List[Tool]:
        return [self._tools[name] for name in self._categories.get(category, [])]
    
    def list_all_tools(self) -> Dict[str, Tool]:
        return self._tools.copy()
    
    def generate_tools_prompt(self) -> str:
        """生成工具描述供 LLM 使用"""
        lines = ["## 可用工具\n"]
        for category, tool_names in self._categories.items():
            lines.append(f"\n### {category}\n")
            for name in tool_names:
                tool = self._tools[name]
                lines.append(f"- **{name}**: {tool.description}")
        return "\n".join(lines)

# 全局工具注册中心实例
global_tools = ToolRegistry()

# 使用示例
@global_tools.register(name="read_file", description="读取文件内容", category="file")
async def read_file(path: str, encoding: str = "utf-8") -> str:
    """读取文件内容"""
    with open(path, 'r', encoding=encoding) as f:
        return f.read()

@global_tools.register(name="write_file", description="写入文件内容", category="file")
async def write_file(path: str, content: str, encoding: str = "utf-8") -> str:
    """写入文件内容"""
    with open(path, 'w', encoding=encoding) as f:
        f.write(content)
    return f"成功写入文件: {path}"

@global_tools.register(name="search_code", description="搜索代码文件", category="search")
async def search_code(pattern: str, path: str = ".") -> List[str]:
    """在代码库中搜索匹配的内容"""
    import glob
    results = []
    for file in glob.glob(f"{path}/**/*.py", recursive=True):
        try:
            with open(file, 'r', encoding='utf-8') as f:
                if pattern in f.read():
                    results.append(file)
        except:
            continue
    return results

2.3 代码生成 Agent 实现

class CodeAgent(BaseAgent):
    """代码生成 Agent"""
    
    def __init__(self, llm_client, tool_registry, context_manager):
        super().__init__(
            name="code_agent",
            description="专业的代码生成助手",
            llm_client=llm_client,
            tool_registry=tool_registry
        )
        self.context_manager = context_manager
    
    async def plan(self, task: str, context: Dict) -> List[str]:
        """分析任务并制定执行计划"""
        
        # 获取项目上下文
        project_context = await self.context_manager.get_project_context()
        
        prompt = f"""
## 任务
生成满足以下需求的代码:
{task}

## 项目上下文
{project_context}

## 已有文件结构
{context.get('file_tree', '未知')}

## 历史记录
{context.get('recent_changes', '无')}

请分析需求,制定代码生成计划:
1. 需要创建/修改哪些文件
2. 依赖关系和执行顺序
3. 需要调用的工具

输出格式:
- 每个步骤一行
- 格式:序号. 具体行动
"""
        
        response = await self.llm.complete(prompt)
        plan = [line.strip() for line in response.split('\n') if line.strip()]
        return plan
    
    async def execute(self, action: str, context: Dict) -> AgentResponse:
        """执行具体的代码生成动作"""
        
        # 根据动作类型调用相应工具
        if "创建文件" in action or "生成代码" in action:
            # 解析文件名和内容
            file_match = re.search(r'[`"\']([^`"\']+\.\w+)[`"\']', action)
            if file_match:
                file_path = file_match.group(1)
                
                # 请求 LLM 生成代码
                code_prompt = f"""
任务:{action}
语言:{file_path.split('.')[-1]}
上下文:{context}
请生成完整的、可运行的代码。
"""
                code = await self.llm.complete(code_prompt, max_tokens=2000)
                
                # 写入文件
                await self._write_code_file(file_path, code)
                
                return AgentResponse(
                    success=True,
                    content=f"生成文件: {file_path}",
                    state=AgentState.EXECUTING
                )
        
        return AgentResponse(
            success=False,
            content="无法理解动作",
            state=AgentState.FAILED,
            error="解析失败"
        )
    
    async def _write_code_file(self, path: str, code: str) -> None:
        """写入代码文件"""
        # 确保目录存在
        import os
        os.makedirs(os.path.dirname(path), exist_ok=True)
        
        tool = self.tools.get_tool("write_file")
        await tool.handler(path, code)

2.4 代码审查 Agent 实现

class ReviewAgent(BaseAgent):
    """代码审查 Agent - 检测代码质量问题"""
    
    def __init__(self, llm_client, tool_registry):
        super().__init__(
            name="review_agent",
            description="专业的代码审查助手",
            llm_client=llm_client,
            tool_registry=tool_registry
        )
        self.review_criteria = {
            "code_quality": ["可读性", "复杂性", "命名规范"],
            "best_practices": ["设计模式", "SOLID原则", "DRY原则"],
            "security": ["注入风险", "敏感信息", "权限控制"],
            "performance": ["算法复杂度", "资源使用", "缓存策略"],
            "testing": ["覆盖率", "边界条件", "Mock使用"]
        }
    
    async def plan(self, task: str, context: Dict) -> List[str]:
        """制定代码审查计划"""
        prompt = f"""
## 审查任务
{task}

## 审查文件
{context.get('files_to_review', [])}

## 审查维度
{json.dumps(self.review_criteria, ensure_ascii=False)}

制定审查计划,按以下格式输出:
1. 语法检查
2. 语义分析
3. 安全审查
4. 性能分析
5. 最佳实践检查
"""
        response = await self.llm.complete(prompt)
        return [line.strip() for line in response.split('\n') if line.strip()]
    
    async def execute(self, action: str, context: Dict) -> AgentResponse:
        """执行代码审查"""
        file_path = context.get("current_file")
        
        if not file_path:
            return AgentResponse(
                success=False,
                content="未指定文件",
                state=AgentState.FAILED,
                error="缺少文件路径"
            )
        
        # 读取文件内容
        read_tool = self.tools.get_tool("read_file")
        content = await read_tool.handler(file_path)
        
        # 进行审查
        review_prompt = f"""
## 审查类型
{action}

## 代码内容
```python
{content}

请进行专业审查,按以下格式输出:

发现的问题

  1. [严重/警告/提示] 问题描述
    • 位置:xxx
    • 建议:xxx

总体评分

  • 可读性:X/10
  • 安全性:X/10
  • 性能:X/10
  • 可维护性:X/10

改进建议


“”"

    review_result = await self.llm.complete(review_prompt, max_tokens=1500)
    
    return AgentResponse(
        success=True,
        content=review_result,
        state=AgentState.EXECUTING,
        artifacts={"review_report": review_result}
    )

---

## 三、多 Agent 协作系统

### 3.1 任务编排器 (Task Orchestrator)

```python
from typing import List, Dict, Any
from dataclasses import dataclass
import asyncio

@dataclass
class Task:
    """任务定义"""
    id: str
    type: str  # code | review | test | refactor
    description: str
    dependencies: List[str] = field(default_factory=list)
    status: str = "pending"
    result: Any = None

class TaskOrchestrator:
    """任务编排器 - 管理多 Agent 协作"""
    
    def __init__(self, agents: Dict[str, BaseAgent]):
        self.agents = agents
        self.tasks: Dict[str, Task] = {}
        self._execution_graph: Dict[str, List[str]] = {}
    
    def add_task(self, task: Task) -> None:
        """添加任务"""
        self.tasks[task.id] = task
        if task.id not in self._execution_graph:
            self._execution_graph[task.id] = task.dependencies
    
    def _get_execution_order(self) -> List[str]:
        """基于依赖关系计算执行顺序(拓扑排序)"""
        visited = set()
        order = []
        
        def dfs(node: str):
            if node in visited:
                return
            visited.add(node)
            for dep in self._execution_graph.get(node, []):
                dfs(dep)
            order.append(node)
        
        for task_id in self.tasks:
            dfs(task_id)
        
        return list(reversed(order))
    
    async def execute_workflow(
        self,
        initial_task: str,
        context: Dict
    ) -> Dict[str, Any]:
        """执行完整工作流"""
        
        # 1. 拓扑排序确定执行顺序
        execution_order = self._get_execution_order()
        
        results = {}
        
        # 2. 按顺序执行任务
        for task_id in execution_order:
            task = self.tasks[task_id]
            
            # 检查依赖是否满足
            deps_satisfied = all(
                self.tasks[dep].status == "completed"
                for dep in task.dependencies
            )
            
            if not deps_satisfied:
                task.status = "skipped"
                continue
            
            # 选择合适的 Agent
            agent = self._select_agent(task.type)
            
            # 合并上下文
            task_context = {
                **context,
                "dependencies_results": {
                    dep: self.tasks[dep].result
                    for dep in task.dependencies
                }
            }
            
            # 执行任务
            try:
                response = await agent.run(task.description, task_context)
                task.result = response
                task.status = "completed" if response.success else "failed"
                results[task_id] = response
            except Exception as e:
                task.status = "failed"
                task.result = {"error": str(e)}
                results[task_id] = {"error": str(e)}
        
        return results
    
    def _select_agent(self, task_type: str) -> BaseAgent:
        """根据任务类型选择合适的 Agent"""
        agent_map = {
            "code": self.agents.get("code_agent"),
            "review": self.agents.get("review_agent"),
            "test": self.agents.get("test_agent"),
            "refactor": self.agents.get("refactor_agent"),
        }
        return agent_map.get(task_type, self.agents.get("code_agent"))

3.2 协作示例:实现一个 API 功能

async def implement_api_feature(orchestrator: TaskOrchestrator):
    """协作实现一个完整的 API 功能"""
    
    # 创建任务
    tasks = [
        Task(
            id="write_model",
            type="code",
            description="创建 User 模型,包含 id, name, email, created_at 字段"
        ),
        Task(
            id="write_repository",
            type="code",
            description="创建 UserRepository,实现 CRUD 操作",
            dependencies=["write_model"]
        ),
        Task(
            id="write_service",
            type="code",
            description="创建 UserService,实现业务逻辑",
            dependencies=["write_repository"]
        ),
        Task(
            id="write_api",
            type="code",
            description="创建 FastAPI 路由,实现 GET/POST/PUT/DELETE",
            dependencies=["write_service"]
        ),
        Task(
            id="review_code",
            type="review",
            description="审查所有生成的代码",
            dependencies=["write_api"]
        ),
        Task(
            id="write_tests",
            type="test",
            description="为 UserService 编写单元测试",
            dependencies=["write_service"]
        ),
    ]
    
    # 添加任务到编排器
    for task in tasks:
        orchestrator.add_task(task)
    
    # 执行工作流
    context = {
        "project_path": "./src",
        "framework": "fastapi",
        "database": "postgresql"
    }
    
    results = await orchestrator.execute_workflow("write_api", context)
    
    # 输出结果
    for task_id, result in results.items():
        print(f"Task {task_id}: {result}")

四、上下文管理优化

4.1 项目结构索引

import os
from pathlib import Path
from typing import Dict, List, Optional
import json

class ProjectContextManager:
    """项目上下文管理器"""
    
    def __init__(self, project_root: str):
        self.project_root = Path(project_root)
        self._file_index: Dict[str, FileInfo] = {}
        self._structure_tree: Dict = {}
        self._code_chunks: List[CodeChunk] = []
    
    def build_index(self) -> None:
        """构建项目索引"""
        self._scan_directory(self.project_root)
        self._build_structure_tree()
        self._generate_code_chunks()
    
    def _scan_directory(self, directory: Path) -> None:
        """扫描目录构建文件索引"""
        ignore_patterns = {
            '__pycache__', '.git', 'node_modules', 
            '.venv', 'dist', 'build', '.idea'
        }
        
        for item in directory.iterdir():
            if item.name in ignore_patterns:
                continue
            
            if item.is_file() and self._is_code_file(item):
                self._index_file(item)
            elif item.is_dir():
                self._scan_directory(item)
    
    def _index_file(self, file_path: Path) -> None:
        """索引单个文件"""
        file_info = FileInfo(
            path=str(file_path.relative_to(self.project_root)),
            name=file_path.name,
            extension=file_path.suffix,
            size=file_path.stat().st_size,
            modified=file_path.stat().st_mtime
        )
        self._file_index[str(file_path)] = file_info
    
    def _build_structure_tree(self) -> None:
        """构建目录结构树"""
        tree = {"name": self.project_root.name, "type": "folder", "children": []}
        
        for file_path in self._file_index.values():
            parts = file_info.path.split(os.sep)
            current = tree
            for i, part in enumerate(parts[:-1]):
                # 导航或创建路径
                child = next((c for c in current["children"] if c["name"] == part), None)
                if not child:
                    child = {"name": part, "type": "folder", "children": []}
                    current["children"].append(child)
                current = child
            
            current["children"].append({
                "name": parts[-1],
                "type": "file",
                "path": file_info.path
            })
        
        self._structure_tree = tree
    
    def get_relevant_context(self, query: str) -> str:
        """根据查询获取相关上下文"""
        # 简单实现:返回目录结构
        return json.dumps(self._structure_tree, indent=2)

五、实战效果展示

5.1 使用示例

# 初始化系统
async def main():
    # 1. 初始化 LLM 客户端
    llm = OpenAIClient(api_key="your-api-key", model="gpt-4")
    
    # 2. 初始化工具注册中心
    tools = ToolRegistry()
    register_default_tools(tools)  # 注册所有内置工具
    
    # 3. 初始化上下文管理器
    context_mgr = ProjectContextManager("./my_project")
    context_mgr.build_index()
    
    # 4. 初始化各个 Agent
    agents = {
        "code_agent": CodeAgent(llm, tools, context_mgr),
        "review_agent": ReviewAgent(llm, tools),
        "test_agent": TestAgent(llm, tools),
    }
    
    # 5. 创建任务编排器
    orchestrator = TaskOrchestrator(agents)
    
    # 6. 执行任务
    result = await orchestrator.execute_workflow(
        initial_task="implement_user_api",
        context={
            "project_path": "./my_project",
            "description": "实现用户管理 API"
        }
    )
    
    print(json.dumps(result, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    asyncio.run(main())

5.2 输出效果

Task write_model: ✅ 成功
  - 创建文件: src/models/user.py
  - 包含: id, name, email, created_at, updated_at

Task write_repository: ✅ 成功
  - 创建文件: src/repositories/user_repository.py
  - 实现: create, get_by_id, update, delete, list

Task write_service: ✅ 成功
  - 创建文件: src/services/user_service.py
  - 实现: 业务逻辑层封装

Task write_api: ✅ 成功
  - 创建文件: src/api/routes/user.py
  - 实现: CRUD 路由 + 参数验证

Task review_code: ✅ 通过
  - 代码质量: 8.5/10
  - 安全检查: 通过
  - 建议: 3 处可优化

Task write_tests: ✅ 成功
  - 创建文件: tests/test_user_service.py
  - 覆盖率: 85%

六、进阶优化建议

6.1 当前限制与改进方向

方向 当前状态 改进建议
上下文窗口 简单截断 引入向量数据库,支持语义检索
代码理解 依赖 LLM 集成 Tree-sitter,实现 AST 分析
执行安全 无沙箱 引入 Docker 沙箱,限制危险操作
持久化 无状态 集成向量数据库,保存执行历史

6.2 长期记忆实现

class LongTermMemory:
    """长期记忆系统 - 跨项目知识复用"""
    
    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.patterns: Dict[str, List[str]] = defaultdict(list)
    
    async def store_pattern(
        self,
        pattern_type: str,
        code: str,
        description: str
    ) -> None:
        """存储代码模式"""
        vector = await self.embeddings.embed(code)
        await self.vector_store.insert({
            "type": pattern_type,
            "code": code,
            "description": description,
            "vector": vector
        })
    
    async def retrieve_similar(
        self,
        code: str,
        top_k: int = 5
    ) -> List[Dict]:
        """检索相似模式"""
        query_vector = await self.embeddings.embed(code)
        results = await self.vector_store.search(query_vector, top_k)
        return results

七、总结

本文完整实现了一个代码开发助手的核心系统:

  1. 模块化 Agent 架构:每个 Agent 职责单一,易于扩展
  2. 强大的工具系统:文件、Git、搜索等能力无缝集成
  3. 多 Agent 协作:通过任务编排器实现复杂流程自动化
  4. 智能上下文管理:三层记忆系统确保长期有效性

下一步建议

  • 接入实际项目运行,观察效果
  • 根据业务需求扩展工具集
  • 引入反馈机制,持续优化生成质量

📢 写在最后

如果你觉得这篇文章对你有帮助,欢迎:

  • 👍 点赞 + 收藏
  • 💬 评论区分享你的想法
  • ➕ 关注我,获取更多 AI + 软件开发实战内容

有任何问题或建议,欢迎在评论区交流!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐