前言

💡 痛点:代码审查耗时耗力?漏掉安全漏洞?风格不一致?新人代码难以把关?

🎯 解决方案:构建 AI 代码审查助手 — 自动检测 Bug、安全漏洞、性能问题、风格规范,让 Code Review 效率提升 10 倍。

AI 代码审查能做什么?

整体流程(输入 → AI 审查引擎 → 输出):

- 输入:Diff / PR、完整代码、规则配置
- AI 审查引擎:预处理 → 分析(Bug 检测、安全扫描、性能分析、风格检查)
- 输出:严重问题 🔴、安全警告 🟡、优化建议 🔵、风格提示 ⚪ → 审查报告

审查维度对比:

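| 维度 | 传统 Lint | 人工审查 | AI 审查 |
| --- | --- | --- | --- |
| Bug 检测 | ❌ 有限 | ✅ 强 | ✅ 强 |
| 安全漏洞 | ⚠️ 规则库 | ✅ 经验 | ✅ 深度分析 |
| 性能问题 | ❌ 无 | ⚠️ 依赖经验 | ✅ 自动分析 |
| 风格规范 | ✅ 强 | ⚠️ 主观 | ✅ 可配置 |
| 逻辑错误 | ❌ 无 | ✅ 强 | ✅ 较强 |
| 上下文理解 | ❌ 无 | ✅ 强 | ✅ 中等 |
| 速度 | ⚡ 毫秒 | 🐢 小时 | 🚀 秒级 |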

一、代码审查基础框架

1.1 审查结果数据模型

# ===== 代码审查数据模型 =====

from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional
from datetime import datetime

class Severity(Enum):
    """Severity level attached to a review finding."""
    CRITICAL = "critical"    # Must fix: security vulnerabilities, data-loss risk
    MAJOR = "major"          # Should fix: bugs, performance problems
    MINOR = "minor"          # Recommended fix: code style, readability
    INFO = "info"            # Informational: best-practice suggestions

class IssueCategory(Enum):
    """Category a review finding belongs to."""
    BUG = "bug"              # Bugs and logic errors
    SECURITY = "security"    # Security vulnerabilities
    PERFORMANCE = "performance"  # Performance problems
    STYLE = "style"          # Code style
    MAINTAINABILITY = "maintainability"  # Maintainability
    COMPLEXITY = "complexity"  # Complexity
    ERROR_HANDLING = "error_handling"  # Error handling
    TESTING = "testing"      # Test coverage

class SuggestionAction(Enum):
    """Action recommended to the author for a finding."""
    FIX = "fix"              # Needs to be fixed
    CONSIDER = "consider"    # Worth considering
    REFACTOR = "refactor"    # Refactoring recommended
    DOCUMENT = "document"    # Needs documentation
    TEST = "test"            # Needs tests

@dataclass
class CodeLocation:
    """A line (or line range) inside a single file.

    ``end_line`` uses ``0`` as its "not given" marker; in that case it is
    set to ``start_line`` right after construction.
    """
    file: str
    start_line: int
    end_line: int = 0
    start_column: int = 0
    end_column: int = 0

    def __post_init__(self):
        end_not_given = self.end_line == 0
        if end_not_given:
            self.end_line = self.start_line

    def __str__(self):
        # Render "file:line" for one line, "file:start-end" for a range.
        span = f"{self.start_line}"
        if self.start_line != self.end_line:
            span = f"{self.start_line}-{self.end_line}"
        return f"{self.file}:{span}"

@dataclass
class CodeIssue:
    """One finding produced by a review: what, where, and how to address it."""
    severity: Severity
    category: IssueCategory
    title: str
    description: str
    location: CodeLocation
    code_snippet: str = ""
    suggestion: str = ""
    fix_code: str = ""
    rule_id: str = ""
    confidence: float = 1.0
    action: SuggestionAction = SuggestionAction.FIX

    def to_dict(self) -> dict:
        """Return a plain dict view of the issue.

        Enum members are emitted as their string values and the location as
        its ``str()`` form. ``code_snippet`` is not part of the output.
        """
        return dict(
            severity=self.severity.value,
            category=self.category.value,
            title=self.title,
            description=self.description,
            location=str(self.location),
            suggestion=self.suggestion,
            fix_code=self.fix_code,
            rule_id=self.rule_id,
            confidence=self.confidence,
            action=self.action.value,
        )

@dataclass
class ReviewResult:
    """Outcome of reviewing one file: the issues found plus a quality score."""
    file_path: str
    issues: List[CodeIssue] = field(default_factory=list)
    summary: str = ""
    score: float = 0.0       # Code quality score, 0-100
    reviewed_at: str = field(default_factory=lambda: datetime.now().isoformat())

    @property
    def critical_count(self) -> int:
        """Number of issues with CRITICAL severity."""
        return sum(1 for i in self.issues if i.severity == Severity.CRITICAL)

    @property
    def major_count(self) -> int:
        """Number of issues with MAJOR severity."""
        return sum(1 for i in self.issues if i.severity == Severity.MAJOR)

    @property
    def has_blockers(self) -> bool:
        """True when at least one CRITICAL issue is present."""
        return self.critical_count > 0

    def add_issue(self, issue: CodeIssue):
        """Append ``issue`` to the result."""
        self.issues.append(issue)

    def to_report(self) -> str:
        """Render a human-readable text report.

        Issues are listed from most to least severe (critical, major, minor,
        info); issues of equal severity keep their insertion order.
        """
        # Sorting on ``severity.value`` would be alphabetical
        # ("critical" < "info" < "major" < "minor") and put INFO ahead of
        # MAJOR, so an explicit rank is used instead.
        severity_rank = {
            Severity.CRITICAL: 0,
            Severity.MAJOR: 1,
            Severity.MINOR: 2,
            Severity.INFO: 3,
        }
        icons = {
            Severity.CRITICAL: "🔴",
            Severity.MAJOR: "🟡",
            Severity.MINOR: "🔵",
            Severity.INFO: "⚪",
        }

        lines = [
            f"📋 代码审查报告: {self.file_path}",
            f"   评分: {self.score}/100",
            f"   问题: {len(self.issues)} 个",
            f"   严重: {self.critical_count} | 主要: {self.major_count}",
            f"   {'🚫 阻断合并' if self.has_blockers else '✅ 可以合并'}",
            ""
        ]

        for issue in sorted(self.issues, key=lambda x: severity_rank[x.severity]):
            lines.append(f"{icons[issue.severity]} [{issue.category.value}] {issue.title}")
            lines.append(f"   位置: {issue.location}")
            lines.append(f"   说明: {issue.description}")
            if issue.suggestion:
                lines.append(f"   建议: {issue.suggestion}")
            if issue.fix_code:
                lines.append(f"   修复: {issue.fix_code}")
            lines.append("")

        return "\n".join(lines)

# Usage example: build a result with three findings and print the report
result = ReviewResult(file_path="app.py", score=72)

sql_injection_issue = CodeIssue(
    severity=Severity.CRITICAL,
    category=IssueCategory.SECURITY,
    title="SQL 注入漏洞",
    description="直接拼接用户输入到 SQL 查询中",
    location=CodeLocation("app.py", 42),
    suggestion="使用参数化查询",
    fix_code="cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))",
    rule_id="SEC001"
)
unhandled_exception_issue = CodeIssue(
    severity=Severity.MAJOR,
    category=IssueCategory.BUG,
    title="未处理异常",
    description="数据库查询可能抛出异常但未捕获",
    location=CodeLocation("app.py", 45),
    suggestion="添加 try-except 块"
)
naming_issue = CodeIssue(
    severity=Severity.MINOR,
    category=IssueCategory.STYLE,
    title="变量命名不规范",
    description="变量 'tmp' 含义不清",
    location=CodeLocation("app.py", 30),
    suggestion="使用更具描述性的名称",
    action=SuggestionAction.CONSIDER
)

for example_issue in (sql_injection_issue, unhandled_exception_issue, naming_issue):
    result.add_issue(example_issue)

print(result.to_report())

1.2 代码解析器

# ===== 代码解析器 =====

import ast
import re
import os
from typing import List, Dict, Optional

class CodeParser:
    """Lightweight source parser used by the review rules.

    Python is parsed with ``ast``; JavaScript/TypeScript with a simple regex;
    every other language only gets basic line statistics. The class can also
    split a unified diff into per-hunk change lists.
    """

    SUPPORTED_EXTENSIONS = {
        '.py': 'python',
        '.js': 'javascript',
        '.ts': 'typescript',
        '.java': 'java',
        '.go': 'go',
        '.rs': 'rust',
        '.cpp': 'cpp',
        '.c': 'c',
        '.rb': 'ruby',
        '.php': 'php'
    }

    def __init__(self):
        # Languages without an entry here fall back to ``_parse_generic``.
        self.parsers = {
            'python': self._parse_python,
            'javascript': self._parse_js,
            'typescript': self._parse_js,
        }

    def detect_language(self, file_path: str) -> Optional[str]:
        """Return the language for ``file_path`` based on its extension.

        The lookup is case-insensitive (``App.PY`` is Python). Returns
        ``None`` for unknown or missing extensions.
        """
        _, ext = os.path.splitext(file_path)
        return self.SUPPORTED_EXTENSIONS.get(ext.lower())

    def parse(self, code: str, language: str) -> Dict:
        """Parse ``code`` with the parser registered for ``language``."""
        parser = self.parsers.get(language, self._parse_generic)
        return parser(code)

    @staticmethod
    def _expr_to_str(node: ast.AST) -> str:
        """Return source-like text for a decorator or base-class expression."""
        if isinstance(node, ast.Name):
            return node.id
        # ast.unparse exists on Python 3.9+; older versions keep the previous
        # str() fallback.
        if hasattr(ast, 'unparse'):
            return ast.unparse(node)
        return str(node)

    def _parse_python(self, code: str) -> Dict:
        """Parse Python code into functions, classes and imports.

        Returns a dict with ``language``, ``functions``, ``classes``,
        ``imports``, ``lines`` and the parsed ``tree``. On a syntax error the
        dict only holds ``error`` and ``language``.
        """
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return {"error": str(e), "language": "python"}

        # AsyncFunctionDef is not a subclass of FunctionDef, so both must be
        # listed or ``async def`` functions are silently skipped.
        function_types = (ast.FunctionDef, ast.AsyncFunctionDef)

        functions = []
        classes = []
        imports = []

        for node in ast.walk(tree):
            if isinstance(node, function_types):
                functions.append({
                    "name": node.name,
                    "line": node.lineno,
                    "end_line": getattr(node, 'end_lineno', node.lineno),
                    "args": [a.arg for a in node.args.args],
                    "decorators": [
                        self._expr_to_str(d) for d in node.decorator_list
                    ],
                    "is_async": isinstance(node, ast.AsyncFunctionDef),
                    "complexity": self._calculate_complexity(node)
                })
            elif isinstance(node, ast.ClassDef):
                classes.append({
                    "name": node.name,
                    "line": node.lineno,
                    "bases": [self._expr_to_str(b) for b in node.bases],
                    "methods": [
                        n.name for n in node.body
                        if isinstance(n, function_types)
                    ]
                })
            elif isinstance(node, (ast.Import, ast.ImportFrom)):
                # Plain ``import x`` has no ``module`` attribute.
                module = getattr(node, 'module', '') or ''
                names = [a.name for a in node.names]
                imports.append({"module": module, "names": names, "line": node.lineno})

        return {
            "language": "python",
            "functions": functions,
            "classes": classes,
            "imports": imports,
            "lines": len(code.splitlines()),
            "tree": tree
        }

    def _parse_js(self, code: str) -> Dict:
        """Parse JavaScript/TypeScript code (simple regex version).

        Recognises ``function name(...)`` declarations and
        ``const|let|var name = (...) =>`` arrow functions. Argument names are
        stripped of whitespace; an empty parameter list yields ``[]``.
        """
        matches = re.findall(
            r'(?:async\s+)?function\s+(\w+)\s*\(([^)]*)\)|'
            r'(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\(([^)]*)\)\s*=>',
            code
        )

        functions = []
        for decl_name, decl_args, arrow_name, arrow_args in matches:
            name = decl_name or arrow_name
            if not name:
                continue
            raw_args = decl_args or arrow_args
            functions.append({
                "name": name,
                "args": [a.strip() for a in raw_args.split(",") if a.strip()]
            })

        return {
            "language": "javascript",
            "functions": functions,
            "lines": len(code.splitlines())
        }

    def _parse_generic(self, code: str) -> Dict:
        """Return basic line statistics for languages without a parser."""
        source_lines = code.splitlines()
        return {
            "language": "unknown",
            "lines": len(source_lines),
            "chars": len(code),
            "blank_lines": sum(1 for line in source_lines if not line.strip()),
            "long_lines": sum(1 for line in source_lines if len(line) > 120)
        }

    def _calculate_complexity(self, node: ast.AST) -> int:
        """Estimate cyclomatic complexity of ``node``.

        Starts at 1 and adds one per ``if``/``while``/``for``/``async for``/
        ``except`` found anywhere below ``node``, plus one per extra operand
        of each boolean operator.
        """
        branch_types = (ast.If, ast.While, ast.For, ast.AsyncFor, ast.ExceptHandler)
        complexity = 1
        for child in ast.walk(node):
            if isinstance(child, branch_types):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                complexity += len(child.values) - 1
        return complexity

    def get_diff_chunks(self, diff: str) -> List[Dict]:
        """Split a git-style unified diff into change chunks.

        Returns one ``{"file", "lines"}`` dict per hunk that has changes.
        Each line entry has ``type`` ("added"/"removed"), ``content`` and
        ``line_number``: the line's position in the new file for added lines,
        ``0`` for removed lines. Only ``--- a/`` / ``+++ b/`` file headers are
        recognised.
        """
        chunks = []
        current_file = None
        current_lines = []
        # Position in the new file of the next added/context line. Starts at
        # 0 so an added line before any "@@" header cannot hit an unbound name.
        new_line_no = 0

        def flush():
            nonlocal current_lines
            if current_lines and current_file:
                chunks.append({
                    "file": current_file,
                    "lines": current_lines
                })
            current_lines = []

        for line in diff.splitlines():
            if line.startswith('--- a/'):
                # A file header closes the previous file's pending hunk.
                flush()
            elif line.startswith('+++ b/'):
                # Flush before switching files, otherwise the previous file's
                # last hunk would be attributed to the new file.
                flush()
                current_file = line[6:]
            elif line.startswith('@@'):
                flush()
                match = re.search(r'\+(\d+)', line)
                new_line_no = int(match.group(1)) if match else 0
            elif line.startswith('+'):
                if line.startswith('+++'):
                    continue  # other "+++" headers such as "+++ /dev/null"
                current_lines.append({
                    "type": "added",
                    "content": line[1:],
                    "line_number": new_line_no
                })
                new_line_no += 1
            elif line.startswith('-'):
                if line.startswith('---'):
                    continue  # other "---" headers such as "--- /dev/null"
                current_lines.append({
                    "type": "removed",
                    "content": line[1:],
                    "line_number": 0
                })
            elif not line.startswith('\\'):
                # Context line: present in the new file, so it advances the
                # counter. "\ No newline at end of file" markers do not.
                new_line_no += 1

        flush()
        return chunks

# Usage: parse a small sample module and print what the parser extracted
parser = CodeParser()

sample_code = '''
import os
import json

def process_data(data: dict) -> list:
    results = []
    for key, value in data.items():
        if value > 10:
            results.append({"key": key, "value": value})
        elif value > 5:
            results.append({"key": key, "value": value * 2})
    return results

class DataProcessor:
    def __init__(self, config):
        self.config = config
    
    def run(self):
        data = self.load_data()
        return process_data(data)
    
    def load_data(self):
        return {"a": 15, "b": 8, "c": 3}
'''

parsed = parser.parse(sample_code, 'python')

function_names = [f['name'] for f in parsed['functions']]
class_names = [c['name'] for c in parsed['classes']]
import_modules = [i['module'] for i in parsed['imports']]

print(f"语言: {parsed['language']}")
print(f"行数: {parsed['lines']}")
print(f"函数: {function_names}")
print(f"类: {class_names}")
print(f"导入: {import_modules}")

二、规则引擎

2.1 基于规则的审查器

# ===== 规则引擎 =====

from abc import ABC, abstractmethod
import re
import ast

class ReviewRule(ABC):
    """Abstract base class for a single review rule.

    Subclasses override the class attributes below and implement ``check``.
    """
    
    rule_id: str = ""
    category: IssueCategory = IssueCategory.BUG
    severity: Severity = Severity.MAJOR
    description: str = ""
    
    @abstractmethod
    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Inspect the code and return the list of issues found."""
        pass

class SQLInjectionRule(ReviewRule):
    """Detect SQL built by string formatting inside ``execute(...)`` calls."""

    rule_id = "SEC001"
    category = IssueCategory.SECURITY
    severity = Severity.CRITICAL
    description = "检测 SQL 注入漏洞"

    SQL_PATTERNS = [
        r'execute\s*\(\s*f["\'].*SELECT.*{',
        r'execute\s*\(\s*["\'].*SELECT.*%[sd]',
        r'execute\s*\(\s*["\'].*SELECT.*\+',
        r'cursor\.execute\s*\(\s*f["\']',
    ]

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Scan ``code`` line by line and report suspicious ``execute`` calls.

        A line is reported at most once, even when it matches several of the
        patterns (an f-string ``cursor.execute`` matches two of them).
        """
        issues = []
        for line_no, line in enumerate(code.splitlines(), 1):
            if not any(
                re.search(pattern, line, re.IGNORECASE)
                for pattern in self.SQL_PATTERNS
            ):
                continue
            snippet = line.strip()
            issues.append(CodeIssue(
                severity=self.severity,
                category=self.category,
                title="SQL 注入风险",
                description=f"检测到可能的 SQL 注入: {snippet}",
                location=CodeLocation(file_path, line_no),
                code_snippet=snippet,
                suggestion="使用参数化查询替代字符串拼接",
                fix_code=self._generate_fix(line),
                rule_id=self.rule_id,
                confidence=0.85
            ))
        return issues

    def _generate_fix(self, line: str) -> str:
        """Return the fix hint shown for a flagged line.

        Only dropping the ``f`` prefix would leave a literal ``{var}`` in the
        SQL text, which is a broken query rather than a fix, so the same
        parameterized-query hint is returned for every flagged line.
        """
        return "# 使用参数化查询: cursor.execute(sql, (param1, param2))"

class HardcodedSecretRule(ReviewRule):
    """Flag assignments that look like credentials written into the source."""

    rule_id = "SEC002"
    category = IssueCategory.SECURITY
    severity = Severity.CRITICAL
    description = "检测硬编码的密钥和密码"

    SECRET_PATTERNS = [
        (r'password\s*=\s*["\'][^"\']{4,}["\']', "硬编码密码"),
        (r'api_key\s*=\s*["\'][^"\']{8,}["\']', "硬编码 API Key"),
        (r'secret\s*=\s*["\'][^"\']{8,}["\']', "硬编码 Secret"),
        (r'token\s*=\s*["\'][^"\']{16,}["\']', "硬编码 Token"),
        (r'AWS_SECRET_ACCESS_KEY\s*=\s*["\']', "AWS 密钥"),
        (r'PRIVATE_KEY\s*=\s*["\']', "私钥"),
    ]

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per (line, matching pattern) pair.

        Lines whose first non-blank characters are ``#`` or ``//`` are
        skipped; matching is case-insensitive.
        """
        findings = []
        for line_no, raw_line in enumerate(code.splitlines(), 1):
            if raw_line.strip().startswith(('#', '//')):
                continue

            matched_labels = [
                label
                for regex, label in self.SECRET_PATTERNS
                if re.search(regex, raw_line, re.IGNORECASE)
            ]
            for label in matched_labels:
                findings.append(CodeIssue(
                    severity=self.severity,
                    category=self.category,
                    title=label,
                    description=f"发现硬编码凭据: {label}",
                    location=CodeLocation(file_path, line_no),
                    suggestion="使用环境变量或密钥管理服务",
                    fix_code="os.environ.get('SECRET_KEY')",
                    rule_id=self.rule_id,
                    confidence=0.9
                ))
        return findings

class EmptyExceptRule(ReviewRule):
    """Detect bare ``except:`` clauses and ``except Exception: pass``."""

    rule_id = "ERR001"
    category = IssueCategory.ERROR_HANDLING
    severity = Severity.MAJOR
    description = "检测空 except 或过于宽泛的异常捕获"

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return issues for overly broad or empty exception handlers.

        Uses the AST in ``parsed["tree"]`` when available; otherwise falls
        back to a regex that only finds bare ``except:`` lines.
        """
        issues = []
        tree = parsed.get("tree") if parsed else None

        if not tree:
            # Regex fallback (no AST, e.g. non-Python file or syntax error).
            for line_no, line in enumerate(code.splitlines(), 1):
                if re.match(r'\s*except\s*:', line):
                    issues.append(CodeIssue(
                        severity=self.severity,
                        category=self.category,
                        title="裸 except 捕获",
                        description="使用裸 except 会捕获所有异常",
                        location=CodeLocation(file_path, line_no),
                        suggestion="指定具体异常类型",
                        rule_id=self.rule_id
                    ))
            return issues

        for node in ast.walk(tree):
            if not isinstance(node, ast.ExceptHandler):
                continue
            if node.type is None:
                issues.append(CodeIssue(
                    severity=self.severity,
                    category=self.category,
                    title="裸 except 捕获",
                    description="使用裸 except 会捕获所有异常包括 KeyboardInterrupt",
                    location=CodeLocation(file_path, node.lineno),
                    suggestion="指定具体异常类型,如 except ValueError",
                    fix_code="except Exception as e:",
                    rule_id=self.rule_id
                ))
            elif isinstance(node.type, ast.Name) and node.type.id == "Exception":
                # Test the node type directly. A substring test on the
                # unparsed text also matched bodies such as ``bypass()`` or
                # ``password = x`` and did nothing when ast.unparse was
                # unavailable.
                only_pass = len(node.body) == 1 and isinstance(node.body[0], ast.Pass)
                if only_pass:
                    issues.append(CodeIssue(
                        severity=Severity.MINOR,
                        category=self.category,
                        title="空异常处理",
                        description="捕获异常后只执行 pass,可能隐藏错误",
                        location=CodeLocation(file_path, node.lineno),
                        suggestion="至少记录日志",
                        fix_code="except Exception as e:\n    logger.error(f'Error: {e}')",
                        rule_id=self.rule_id
                    ))

        return issues

class HighComplexityRule(ReviewRule):
    """Report functions whose cyclomatic complexity exceeds ``MAX_COMPLEXITY``."""

    rule_id = "COM001"
    category = IssueCategory.COMPLEXITY
    severity = Severity.MAJOR
    description = "检测高圈复杂度的函数"

    MAX_COMPLEXITY = 10

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per parsed function above the threshold.

        Reads the ``functions`` list from ``parsed``; a function entry with
        no ``complexity`` key counts as complexity 1.
        """
        function_infos = parsed.get("functions") if parsed else None
        if not function_infos:
            return []

        findings = []
        for info in function_infos:
            measured = info.get("complexity", 1)
            if measured <= self.MAX_COMPLEXITY:
                continue
            findings.append(CodeIssue(
                severity=self.severity,
                category=self.category,
                title=f"函数复杂度过高 ({measured})",
                description=f"函数 '{info['name']}' 圈复杂度为 {measured},超过阈值 {self.MAX_COMPLEXITY}",
                location=CodeLocation(file_path, info["line"]),
                suggestion="拆分为多个小函数,每个函数只做一件事",
                rule_id=self.rule_id,
                confidence=1.0,
                action=SuggestionAction.REFACTOR
            ))
        return findings

class LongFunctionRule(ReviewRule):
    """Report functions that span more than ``MAX_LINES`` lines."""

    rule_id = "COM002"
    category = IssueCategory.MAINTAINABILITY
    severity = Severity.MINOR
    description = "检测过长的函数"

    MAX_LINES = 50

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per parsed function longer than the threshold.

        Length is ``end_line - line + 1``; an entry without ``end_line`` is
        treated as a single line.
        """
        function_infos = parsed.get("functions") if parsed else None
        if not function_infos:
            return []

        findings = []
        for info in function_infos:
            first_line = info["line"]
            last_line = info.get("end_line", first_line)
            length = (last_line - first_line) + 1
            if length <= self.MAX_LINES:
                continue
            findings.append(CodeIssue(
                severity=self.severity,
                category=self.category,
                title=f"函数过长 ({length} 行)",
                description=f"函数 '{info['name']}' 有 {length} 行,超过阈值 {self.MAX_LINES}",
                location=CodeLocation(file_path, first_line, last_line),
                suggestion="将函数拆分为更小的、职责单一的函数",
                rule_id=self.rule_id,
                action=SuggestionAction.REFACTOR
            ))
        return findings

class MagicNumberRule(ReviewRule):
    """Report numeric literals that are not in ``ALLOWED_NUMBERS``.

    Literals that are the value of an UPPER_CASE assignment (for example
    ``MAX_RETRIES = 3``) are named-constant definitions and are not reported.
    """

    rule_id = "STYLE001"
    category = IssueCategory.STYLE
    severity = Severity.MINOR
    description = "检测代码中的魔法数字"

    ALLOWED_NUMBERS = {0, 1, -1, 2, 10, 100, 1000, 0.0, 0.5, 1.0}

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return an INFO issue for each magic number in ``parsed["tree"]``.

        Returns an empty list when no AST is available.
        """
        issues = []
        tree = parsed.get("tree") if parsed else None
        if not tree:
            return issues

        named_constant_ids = self._named_constant_value_ids(tree)

        for node in ast.walk(tree):
            if not isinstance(node, ast.Constant):
                continue
            value = node.value
            # bool is a subclass of int; True/False are not magic numbers.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            if value in self.ALLOWED_NUMBERS or id(node) in named_constant_ids:
                continue
            issues.append(CodeIssue(
                severity=Severity.INFO,
                category=self.category,
                title=f"魔法数字: {value}",
                description=f"数字 {value} 含义不明确,建议提取为命名常量",
                location=CodeLocation(file_path, node.lineno),
                suggestion="提取为命名常量,如 MAX_RETRIES = 3",
                rule_id=self.rule_id,
                action=SuggestionAction.CONSIDER
            ))

        return issues

    @staticmethod
    def _named_constant_value_ids(tree: ast.AST) -> set:
        """Collect ``id()`` of literals assigned directly to UPPER_CASE names."""
        value_ids = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Assign):
                targets = node.targets
            elif isinstance(node, ast.AnnAssign):
                targets = [node.target]
            else:
                continue
            value = node.value
            if value is None:  # annotation without a value
                continue
            if not all(isinstance(t, ast.Name) and t.id.isupper() for t in targets):
                continue
            # A negative literal such as ``LIMIT = -5`` is UnaryOp(USub, 5).
            if isinstance(value, ast.UnaryOp):
                value = value.operand
            if isinstance(value, ast.Constant):
                value_ids.add(id(value))
        return value_ids

# 规则注册
class RuleRegistry:
    """Holds the available review rules, keyed by their ``rule_id``."""

    def __init__(self):
        self.rules: Dict[str, ReviewRule] = {}

    def register(self, rule: ReviewRule):
        """Add ``rule``; a rule with the same ``rule_id`` is replaced."""
        self.rules[rule.rule_id] = rule

    def register_defaults(self):
        """Register one instance of each built-in rule."""
        builtin_rule_types = (
            SQLInjectionRule,
            HardcodedSecretRule,
            EmptyExceptRule,
            HighComplexityRule,
            LongFunctionRule,
            MagicNumberRule,
        )
        for rule_type in builtin_rule_types:
            self.register(rule_type())

    def get_rules(self, categories: List[IssueCategory] = None) -> List[ReviewRule]:
        """Return all rules, or only those whose category is in ``categories``."""
        selected = list(self.rules.values())
        if categories:
            selected = [r for r in selected if r.category in categories]
        return selected

    def list_rules(self) -> str:
        """Return a printable listing with one line per registered rule."""
        listing = ["📋 已注册规则:", ""]
        listing.extend(
            f"  {rule.rule_id} [{rule.severity.value}] "
            f"{rule.category.value}: {rule.description}"
            for rule in self.rules.values()
        )
        return "\n".join(listing)

# Usage: register the built-in rules and print the listing
registry = RuleRegistry()
registry.register_defaults()
rule_listing = registry.list_rules()
print(rule_listing)

2.2 规则引擎执行器

# ===== 规则引擎执行器 =====

class RuleEngine:
    """Runs registered review rules over a file or a diff."""

    # Rank used to compare an issue's severity against ``min_severity``.
    _SEVERITY_ORDER = {
        Severity.INFO: 0, Severity.MINOR: 1,
        Severity.MAJOR: 2, Severity.CRITICAL: 3
    }

    def __init__(self, registry: RuleRegistry = None):
        self.registry = registry or RuleRegistry()
        # An empty registry would review nothing, so load the built-in rules.
        if not self.registry.rules:
            self.registry.register_defaults()
        self.parser = CodeParser()

    def review_file(
        self,
        code: str,
        file_path: str,
        rules: List[str] = None,
        categories: List[IssueCategory] = None,
        min_severity: Severity = Severity.INFO
    ) -> ReviewResult:
        """Review a single file.

        Args:
            code: Full text to review.
            file_path: Path used for language detection and issue locations.
            rules: Rule ids to run; unknown ids are ignored. Takes precedence
                over ``categories``.
            categories: Run only rules of these categories.
            min_severity: Drop issues below this severity.

        Returns:
            A ReviewResult with the retained issues and a score computed
            from them.
        """
        language = self.parser.detect_language(file_path)
        parsed = self.parser.parse(code, language) if language else None

        # Decide which rules to run.
        if rules:
            active_rules = [self.registry.rules[rid] for rid in rules if rid in self.registry.rules]
        elif categories:
            active_rules = self.registry.get_rules(categories)
        else:
            active_rules = self.registry.get_rules()

        min_level = self._SEVERITY_ORDER[min_severity]

        all_issues = []
        for rule in active_rules:
            # Best effort: one failing rule must not abort the whole review.
            try:
                found = rule.check(code, file_path, parsed)
                all_issues.extend(
                    i for i in found
                    if self._SEVERITY_ORDER[i.severity] >= min_level
                )
            except Exception as e:
                print(f"规则 {rule.rule_id} 执行出错: {e}")

        score = self._calculate_score(all_issues, len(code.splitlines()))

        return ReviewResult(
            file_path=file_path,
            issues=all_issues,
            score=score
        )

    def review_diff(
        self,
        diff: str,
        full_code: Dict[str, str] = None
    ) -> List[ReviewResult]:
        """Review the files touched by a unified diff.

        Args:
            diff: Unified diff text.
            full_code: Optional mapping of file path to full file content.
                When a file's content is given, the whole file is reviewed
                and only issues on added lines are kept. Otherwise only the
                added lines are reviewed.

        Returns:
            One ReviewResult per changed file, in diff order.
        """
        chunks = self.parser.get_diff_chunks(diff)
        results = []

        # Group the changed lines by file.
        file_changes: Dict[str, List[Dict]] = {}
        for chunk in chunks:
            file_changes.setdefault(chunk["file"], []).extend(chunk["lines"])

        for file_path, changes in file_changes.items():
            added = [c for c in changes if c["type"] == "added"]
            code = full_code.get(file_path, "") if full_code else ""

            if code:
                result = self.review_file(code, file_path)
                added_lines = {c["line_number"] for c in added}
                if added_lines:
                    # Keep only issues located on lines this diff added.
                    result.issues = [
                        i for i in result.issues
                        if i.location.start_line in added_lines
                    ]
                    # Recompute so the score matches the reported issues.
                    result.score = self._calculate_score(
                        result.issues, len(code.splitlines())
                    )
            else:
                # Only the added lines are available. Rules number them
                # 1..n within the snippet, so map those positions back to
                # the file line numbers recorded in the diff.
                code = "\n".join(c["content"] for c in added)
                snippet_to_file = {
                    idx: c["line_number"] for idx, c in enumerate(added, 1)
                }
                result = self.review_file(code, file_path)
                for issue in result.issues:
                    loc = issue.location
                    loc.start_line = snippet_to_file.get(loc.start_line, loc.start_line)
                    loc.end_line = snippet_to_file.get(loc.end_line, loc.end_line)

            results.append(result)

        return results

    def _calculate_score(self, issues: List[CodeIssue], total_lines: int) -> float:
        """Return a 0-100 quality score, rounded to one decimal.

        Each issue adds a severity-based penalty; the total is scaled by a
        budget of 0.5 points per line (minimum budget 1). Empty input
        scores 100.
        """
        if total_lines == 0:
            return 100.0

        penalties = {
            Severity.CRITICAL: 15,
            Severity.MAJOR: 5,
            Severity.MINOR: 2,
            Severity.INFO: 0.5
        }

        total_penalty = sum(penalties[i.severity] for i in issues)
        max_penalty = total_lines * 0.5  # penalty budget: 0.5 points per line

        score = max(0, 100 - (total_penalty / max(max_penalty, 1)) * 100)
        return round(score, 1)

# Usage: run the default rules over a deliberately flawed sample
engine = RuleEngine()

test_code = '''
import os

password = "my_secret_password_123"
api_key = "sk-1234567890abcdef"

def get_user(user_id):
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)
    return cursor.fetchone()

def process(data):
    try:
        result = complex_logic(data)
    except:
        pass
    return result

def complex_logic(data):
    if data > 100:
        if data > 200:
            if data > 300:
                if data > 400:
                    if data > 500:
                        return "very high"
                    return "high"
                return "medium high"
            return "medium"
        return "medium low"
    return "low"
'''

result = engine.review_file(test_code, "app.py")
report_text = result.to_report()
print(report_text)

三、LLM 驱动的深度审查

3.1 LLM 审查器

# ===== LLM 驱动的深度审查 =====

from openai import OpenAI

class LLMReviewer:
    """Deep code review backed by an LLM chat-completions endpoint."""

    def __init__(self, model: str = "gpt-4o", api_key: str = None):
        self.model = model
        self.client = OpenAI(api_key=api_key)

    def review(
        self,
        code: str,
        language: str = "python",
        context: str = "",
        focus_areas: List[str] = None
    ) -> ReviewResult:
        """Review a piece of code with the LLM.

        Args:
            code: Source text to review.
            language: Language name used in the prompt and the code fence.
            context: Optional extra context appended to the user message.
            focus_areas: Optional list of aspects to emphasise.

        Returns:
            A ReviewResult whose issues use the placeholder path
            "llm_review.py".
        """
        focus_prompt = ""
        if focus_areas:
            focus_prompt = f"\n重点关注以下方面: {', '.join(focus_areas)}"

        # response_format below is JSON-object mode, so the prompt asks for
        # an object wrapping the issue array rather than a bare array.
        system_prompt = f"""你是一个资深代码审查专家。请审查以下 {language} 代码。

审查维度:
1. **Bug 和逻辑错误** - 竞态条件、空指针、边界条件
2. **安全漏洞** - 注入、XSS、认证问题、数据泄露
3. **性能问题** - 不必要的计算、内存泄漏、N+1 查询
4. **可维护性** - 代码复杂度、命名、注释
5. **错误处理** - 异常处理、资源清理、超时
6. **最佳实践** - 设计模式、SOLID 原则

输出格式(JSON 对象,所有问题放在 "issues" 数组中):
```json
{{"issues": [{{
  "severity": "critical|major|minor|info",
  "category": "bug|security|performance|style|maintainability|error_handling",
  "title": "简短标题",
  "line": 行号,
  "description": "详细描述",
  "suggestion": "修改建议",
  "fix_code": "修复代码(可选)"
}}]}}
```

注意:
- 只报告真实问题,不要过度警告
- 给出可操作的修复建议
- 修复代码要完整可运行{focus_prompt}"""

        user_message = f"请审查以下代码:\n\n```{language}\n{code}\n```"
        if context:
            user_message += f"\n\n上下文信息:\n{context}"

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            response_format={"type": "json_object"},
            temperature=0.1
        )

        return self._parse_llm_response(response.choices[0].message.content, "llm_review.py")

    def review_diff(
        self,
        diff: str,
        language: str = "python",
        pr_description: str = ""
    ) -> ReviewResult:
        """Review a PR diff with the LLM.

        Args:
            diff: Unified diff text.
            language: Accepted for signature parity with ``review``; not
                used in the prompt.
            pr_description: Optional PR description appended to the message.

        Returns:
            A ReviewResult whose issues use the placeholder path
            "diff_review".
        """
        system_prompt = """你是一个代码审查专家,专门审查 PR/Patch 的变更。

重点检查:
1. 变更是否引入新的 Bug
2. 是否有安全风险
3. 是否有性能回归
4. 变更是否符合 PR 描述
5. 是否缺少测试

输出 JSON 对象,格式:
{"issues": [{"severity", "category", "title", "line", "description", "suggestion", "fix_code"}]}"""

        user_message = f"请审查以下 diff:\n\n```diff\n{diff}\n```"
        if pr_description:
            user_message += f"\n\nPR 描述: {pr_description}"

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            response_format={"type": "json_object"},
            temperature=0.1
        )

        return self._parse_llm_response(response.choices[0].message.content, "diff_review")

    def _parse_llm_response(self, response_text: str, file_path: str) -> ReviewResult:
        """Convert the LLM's JSON reply into a ReviewResult.

        Accepts ``{"issues": [...]}``, a bare JSON array, or a single issue
        object. If the text is not valid JSON, the first ``[...]`` span is
        tried; if that also fails, no issues are reported. Unknown
        severities map to INFO, unknown categories to BUG, and a
        non-integer ``line`` to 0.
        """
        import json
        import re

        text = response_text or ""
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            # Try to pull a JSON array out of surrounding prose.
            json_match = re.search(r'\[.*\]', text, re.DOTALL)
            try:
                data = json.loads(json_match.group()) if json_match else []
            except json.JSONDecodeError:
                data = []

        issues_list = data.get("issues", data) if isinstance(data, dict) else data
        if not isinstance(issues_list, list):
            issues_list = [issues_list]

        severity_map = {s.value: s for s in Severity}
        category_map = {c.value: c for c in IssueCategory}

        issues = []
        for item in issues_list:
            if not isinstance(item, dict):
                continue

            # The model may return null or non-string values; coerce before
            # calling .lower().
            sev = severity_map.get(
                str(item.get("severity") or "info").lower(),
                Severity.INFO
            )
            cat = category_map.get(
                str(item.get("category") or "bug").lower(),
                IssueCategory.BUG
            )
            try:
                line = int(item.get("line", 0))
            except (TypeError, ValueError):
                line = 0

            issues.append(CodeIssue(
                severity=sev,
                category=cat,
                title=item.get("title") or "未命名问题",
                description=item.get("description") or "",
                location=CodeLocation(file_path, line),
                suggestion=item.get("suggestion") or "",
                fix_code=item.get("fix_code") or "",
                confidence=0.8
            ))

        score = max(0, 100 - sum(
            {Severity.CRITICAL: 15, Severity.MAJOR: 5,
             Severity.MINOR: 2, Severity.INFO: 0.5}[i.severity]
            for i in issues
        ))

        return ReviewResult(
            file_path=file_path,
            issues=issues,
            score=round(score, 1)
        )

# Usage
if __name__ == '__main__':
    reviewer = LLMReviewer()

    code = '''
def transfer(from_id, to_id, amount):
    from_account = db.get(from_id)
    to_account = db.get(to_id)
    from_account.balance -= amount
    to_account.balance += amount
    db.save(from_account)
    db.save(to_account)
'''

    # result = reviewer.review(code, focus_areas=["security", "bug"])
    # print(result.to_report())

3.2 混合审查器(规则 + LLM)

# ===== 混合审查器 =====

class HybridReviewer:
    """Hybrid code reviewer combining the rule engine with an LLM pass.

    Rule-engine findings are collected first. LLM findings are appended
    afterwards, skipping any whose title was already reported for the file.
    """

    def __init__(self, model: str = "gpt-4o", api_key: Optional[str] = None):
        """Build the rule engine, the LLM reviewer and the code parser.

        Args:
            model: Model name forwarded to ``LLMReviewer``.
            api_key: API key forwarded to ``LLMReviewer`` unchanged.
        """
        self.rule_engine = RuleEngine()
        self.llm_reviewer = LLMReviewer(model=model, api_key=api_key)
        self.parser = CodeParser()

    @staticmethod
    def _append_unseen(target: List, candidates) -> int:
        """Append issues from ``candidates`` whose title is not yet in ``target``.

        The issue title is the only de-duplication key, so two candidates
        sharing a title collapse into the first one. ``target`` is mutated
        in place.

        Returns:
            The number of issues that were appended.
        """
        seen_titles = {issue.title for issue in target}
        added = 0
        for issue in candidates:
            if issue.title in seen_titles:
                continue
            seen_titles.add(issue.title)
            target.append(issue)
            added += 1
        return added

    def review(
        self,
        code: str,
        file_path: str,
        use_llm: bool = True,
        use_rules: bool = True,
        llm_focus: List[str] = None
    ) -> ReviewResult:
        """Review a single file with the rule engine and/or the LLM.

        Args:
            code: Source text of the file.
            file_path: Path used for language detection and reporting.
            use_llm: Run the LLM pass.
            use_rules: Run the rule-engine pass.
            llm_focus: Optional focus areas forwarded to the LLM reviewer.

        Returns:
            A ``ReviewResult`` holding the merged issues and the score.
        """
        all_issues = []

        # 1. Rule engine: fast and deterministic.
        if use_rules:
            rule_result = self.rule_engine.review_file(code, file_path)
            all_issues.extend(rule_result.issues)
            print(f"  📏 规则引擎: 发现 {len(rule_result.issues)} 个问题")

        # 2. LLM: slower, context aware. Defaults to "python" when the
        #    language cannot be detected from the path.
        if use_llm:
            language = self.parser.detect_language(file_path) or "python"
            llm_result = self.llm_reviewer.review(
                code, language=language, focus_areas=llm_focus
            )
            # Same de-duplication as review_pr(): drop titles already present.
            added = self._append_unseen(all_issues, llm_result.issues)
            print(f"  🤖 LLM 审查: 发现 {added} 个新问题(去重后)")

        # Never hand the scorer a zero line count (empty input); review_pr()
        # applies the same floor of 1.
        total_lines = max(len(code.splitlines()), 1)
        score = self.rule_engine._calculate_score(all_issues, total_lines)

        return ReviewResult(
            file_path=file_path,
            issues=all_issues,
            score=score
        )

    def review_pr(
        self,
        diff: str,
        full_files: Dict[str, str] = None,
        pr_description: str = ""
    ) -> Dict[str, ReviewResult]:
        """Review a whole pull request.

        Args:
            diff: Unified diff of the pull request, given to the rule engine.
            full_files: Optional mapping of file path to file content; each
                entry is additionally reviewed by the LLM.
            pr_description: Accepted for interface compatibility; it is not
                used by the current implementation.

        Returns:
            Mapping of file path to its ``ReviewResult``.
        """
        results = {}

        # Rule engine works on the diff.
        for rule_result in self.rule_engine.review_diff(diff, full_files):
            if rule_result.file_path not in results:
                results[rule_result.file_path] = ReviewResult(
                    file_path=rule_result.file_path
                )
            results[rule_result.file_path].issues.extend(rule_result.issues)

        # LLM reviews every file whose content was supplied.
        if full_files:
            for file_path, code in full_files.items():
                language = self.parser.detect_language(file_path) or "python"
                llm_result = self.llm_reviewer.review(code, language=language)

                if file_path not in results:
                    results[file_path] = ReviewResult(file_path=file_path)

                self._append_unseen(results[file_path].issues, llm_result.issues)

        # Score each file; files without known content count as one line.
        for file_path, result in results.items():
            code = (full_files or {}).get(file_path, "")
            result.score = self.rule_engine._calculate_score(
                result.issues, max(len(code.splitlines()), 1)
            )

        return results

    def generate_summary(self, results: Dict[str, ReviewResult]) -> str:
        """Render a plain-text summary of a pull-request review.

        Lists the totals, then up to five issues per file. The merge advice
        depends only on whether any critical issue exists.
        """
        total_issues = sum(len(r.issues) for r in results.values())
        critical = sum(r.critical_count for r in results.values())
        major = sum(r.major_count for r in results.values())
        # max(..., 1) keeps an empty result set from dividing by zero.
        avg_score = sum(r.score for r in results.values()) / max(len(results), 1)

        lines = [
            "🔍 PR 审查摘要",
            "=" * 40,
            f"文件数: {len(results)}",
            f"总问题: {total_issues}",
            f"🔴 严重: {critical} | 🟡 主要: {major}",
            f"平均评分: {avg_score:.1f}/100",
            f"{'🚫 建议不合并' if critical > 0 else '✅ 可以合并'}",
            ""
        ]

        for file_path, result in results.items():
            if not result.issues:
                continue
            lines.append(f"📄 {file_path} (评分: {result.score})")
            for issue in result.issues[:5]:
                if issue.severity == Severity.CRITICAL:
                    icon = "🔴"
                elif issue.severity == Severity.MAJOR:
                    icon = "🟡"
                else:
                    icon = "🔵"
                lines.append(f"  {icon} {issue.title}")
            if len(result.issues) > 5:
                lines.append(f"  ... 还有 {len(result.issues) - 5} 个问题")
            lines.append("")

        return "\n".join(lines)

# Usage
if __name__ == '__main__':
    # Simulated pull request: the unified diff ...
    diff = """diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -10,6 +10,12 @@
+def get_user(user_id):
+    query = f"SELECT * FROM users WHERE id = {user_id}"
+    cursor.execute(query)
+    return cursor.fetchone()
+
+password = "admin123"
"""

    # ... and the post-change content of every touched file.
    full_files = {}
    full_files["app.py"] = '''
password = "admin123"

def get_user(user_id):
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)
    return cursor.fetchone()

def process(data):
    try:
        result = transform(data)
    except:
        pass
    return result
'''

    reviewer = HybridReviewer()

    # Run only the rule engine first and print one report per file.
    for r in reviewer.rule_engine.review_diff(diff, full_files):
        print(r.to_report())

四、Git 集成

4.1 Git Hook 审查

# ===== Git Hook 审查 =====

import subprocess
import os
import sys

class GitHookReviewer:
    """Reviews staged files from a Git pre-commit hook using the rule engine."""

    def __init__(self, repo_path: str = "."):
        """Remember the repository directory and create the rule engine.

        Args:
            repo_path: Directory in which the ``git`` commands are run.
        """
        self.repo_path = repo_path
        self.reviewer = RuleEngine()

    def pre_commit_review(self) -> bool:
        """Review every staged file and decide whether the commit may proceed.

        Files whose language cannot be detected, or whose staged content is
        empty, are skipped.

        Returns:
            ``True`` when no reviewed file has blocking issues, else ``False``.
        """
        staged_files = self._get_staged_files()

        if not staged_files:
            return True

        print(f"🔍 审查 {len(staged_files)} 个暂存文件...")

        has_blockers = False
        for file_path in staged_files:
            language = self.reviewer.parser.detect_language(file_path)
            if not language:
                continue

            # Review what is staged, not what is in the working tree.
            content = self._get_staged_content(file_path)
            if not content:
                continue

            result = self.reviewer.review_file(content, file_path)

            if result.has_blockers:
                has_blockers = True
                print(f"\n❌ {file_path}: {result.critical_count} 个严重问题")
                for issue in result.issues:
                    if issue.severity == Severity.CRITICAL:
                        print(f"   🔴 {issue.title} @ {issue.location}")
            elif result.issues:
                print(f"⚠️ {file_path}: {len(result.issues)} 个问题(评分 {result.score})")
            else:
                print(f"✅ {file_path}: 无问题")

        if has_blockers:
            print("\n🚫 提交被阻止:存在严重问题")
            print("   使用 git commit --no-verify 可跳过检查")
            return False

        return True

    def _get_staged_files(self) -> List[str]:
        """Return the paths of added, copied or modified staged files.

        ``-z`` makes git emit NUL-terminated, unquoted path names; without it
        paths containing non-ASCII or special characters come back quoted and
        escaped and can no longer be passed to ``git show``.
        """
        result = subprocess.run(
            ["git", "diff", "--cached", "--name-only", "--diff-filter=ACM", "-z"],
            cwd=self.repo_path,
            capture_output=True, text=True,
            encoding="utf-8", errors="replace"
        )
        return [name for name in result.stdout.split("\0") if name]

    def _get_staged_content(self, file_path: str) -> str:
        """Return the staged (index) content of ``file_path``.

        The output is decoded as UTF-8 with undecodable bytes replaced, so a
        non-UTF-8 locale or a stray binary byte cannot raise during decoding.
        An empty string is returned when ``git show`` fails.
        """
        result = subprocess.run(
            ["git", "show", f":{file_path}"],
            cwd=self.repo_path,
            capture_output=True, text=True,
            encoding="utf-8", errors="replace"
        )
        if result.returncode != 0:
            return ""
        return result.stdout

# Installer for the pre-commit hook
def install_pre_commit_hook(repo_path: str = "."):
    """Write an executable ``pre-commit`` hook into ``<repo_path>/.git/hooks``.

    The hook imports ``GitHookReviewer`` from ``code_review`` and exits with
    status 1 when the review reports blockers. An existing hook file is
    overwritten.

    Args:
        repo_path: Root directory of the Git repository.

    Raises:
        FileNotFoundError: If ``<repo_path>/.git`` is not a directory.
    """
    git_dir = os.path.join(repo_path, ".git")
    hook_dir = os.path.join(git_dir, "hooks")
    hook_path = os.path.join(hook_dir, "pre-commit")

    if not os.path.isdir(git_dir):
        # Do not fabricate a .git directory outside a repository.
        raise FileNotFoundError(f"not a git repository: {git_dir}")

    hook_script = '''#!/usr/bin/env python3
import sys
sys.path.insert(0, '.')

from code_review import GitHookReviewer

reviewer = GitHookReviewer()
if not reviewer.pre_commit_review():
    sys.exit(1)
'''

    # The hooks directory can be missing (e.g. it was deleted); create it.
    os.makedirs(hook_dir, exist_ok=True)

    # Force LF line endings: a CRLF shebang line breaks the interpreter lookup.
    with open(hook_path, 'w', encoding='utf-8', newline='\n') as f:
        f.write(hook_script)

    os.chmod(hook_path, 0o755)
    print(f"✅ pre-commit 钩子已安装: {hook_path}")

# Usage: pass --install to write the hook; otherwise run the review directly
# and exit with status 1 when it reports blockers.
if __name__ == '__main__':
    reviewer = GitHookReviewer()

    install_requested = "--install" in sys.argv
    if install_requested:
        install_pre_commit_hook()
    elif not reviewer.pre_commit_review():
        sys.exit(1)

4.2 GitHub PR 审查

# ===== GitHub PR 审查 =====

import aiohttp
import asyncio

class GitHubPRReviewer:
    """Pull-request review bot that talks to the GitHub REST API."""

    def __init__(
        self,
        token: str,
        repo: str,
        reviewer: HybridReviewer = None
    ):
        """Store credentials and the reviewer used for the analysis.

        Args:
            token: GitHub token sent as a Bearer credential.
            repo: Repository in ``owner/repo`` form.
            reviewer: Reviewer to use; a default ``HybridReviewer`` is created
                when omitted.
        """
        self.token = token
        self.repo = repo  # owner/repo
        self.reviewer = reviewer or HybridReviewer()
        self.api = "https://api.github.com"
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github.v3+json"
        }

    async def review_pr(self, pr_number: int) -> Dict:
        """Review a pull request and publish the outcome on GitHub.

        Posts a summary comment, inline comments for critical/major issues
        that carry a valid line number, and a final review whose event is
        ``REQUEST_CHANGES`` when blockers exist and ``APPROVE`` otherwise.

        Returns:
            A dict with the PR number, reviewed file count, total issue
            count, blocker flag and the summary text.
        """
        pr_info = await self._get_pr_info(pr_number)
        # GitHub sends "body": null for an empty description.
        pr_description = pr_info.get("body") or ""
        # Fetched once and reused for every comment and the final review.
        head_sha = (pr_info.get("head") or {}).get("sha", "")

        diff = await self._get_pr_diff(pr_number)

        # The files endpoint only carries patch hunks. Fetch the real file
        # content at the PR head so line numbers refer to the file; fall back
        # to the patch when the content is unavailable (e.g. removed files).
        files = await self._get_pr_files(pr_number)
        full_files = {}
        for f in files:
            patch = f.get("patch")
            if not patch:
                continue
            content = None
            if head_sha and f.get("status") != "removed":
                content = await self._get_file_content(f["filename"], head_sha)
            full_files[f["filename"]] = content if content is not None else patch

        # NOTE(review): this call is synchronous and blocks the event loop
        # while the reviewer runs.
        results = self.reviewer.review_pr(
            diff, full_files, pr_description
        )

        summary = self.reviewer.generate_summary(results)

        await self._post_review_comment(pr_number, summary)

        # Inline comments only for critical/major issues with a real line;
        # a line number below 1 is not a valid position.
        for file_path, result in results.items():
            for issue in result.issues:
                if issue.severity not in (Severity.CRITICAL, Severity.MAJOR):
                    continue
                if issue.location.start_line < 1:
                    continue
                await self._post_inline_comment(
                    pr_number, file_path,
                    issue.location.start_line,
                    self._format_issue_comment(issue),
                    commit_id=head_sha
                )

        has_blockers = any(r.has_blockers for r in results.values())
        await self._submit_review(
            pr_number,
            "REQUEST_CHANGES" if has_blockers else "APPROVE",
            summary,
            commit_id=head_sha
        )

        return {
            "pr": pr_number,
            "files_reviewed": len(results),
            "total_issues": sum(len(r.issues) for r in results.values()),
            "has_blockers": has_blockers,
            "summary": summary
        }

    async def _get_pr_info(self, pr_number: int) -> Dict:
        """Fetch the pull-request object as parsed JSON."""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}",
                headers=self.headers
            ) as resp:
                return await resp.json()

    async def _get_pr_diff(self, pr_number: int) -> str:
        """Fetch the pull request as a unified diff (diff media type)."""
        headers = {**self.headers, "Accept": "application/vnd.github.v3.diff"}
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}",
                headers=headers
            ) as resp:
                return await resp.text()

    async def _get_pr_files(self, pr_number: int) -> List[Dict]:
        """Fetch the changed-file list of the pull request.

        NOTE(review): only the first page of results is requested; large
        pull requests may need pagination.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}/files",
                headers=self.headers
            ) as resp:
                return await resp.json()

    async def _get_file_content(self, path: str, ref: str) -> Optional[str]:
        """Fetch the raw text of ``path`` at commit ``ref``.

        Returns:
            The file text, or ``None`` when the request does not return 200.
        """
        from urllib.parse import quote

        headers = {**self.headers, "Accept": "application/vnd.github.v3.raw"}
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.api}/repos/{self.repo}/contents/{quote(path)}",
                headers=headers,
                params={"ref": ref}
            ) as resp:
                if resp.status != 200:
                    return None
                return await resp.text(errors="replace")

    async def _post_review_comment(self, pr_number: int, body: str):
        """Post ``body`` as a regular comment on the pull request."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.api}/repos/{self.repo}/issues/{pr_number}/comments",
                headers=self.headers,
                json={"body": body}
            ) as resp:
                return await resp.json()

    async def _post_inline_comment(
        self, pr_number: int, path: str, line: int, body: str,
        commit_id: Optional[str] = None
    ):
        """Post an inline review comment on ``path`` at ``line`` (RIGHT side).

        Args:
            commit_id: Head commit SHA. Looked up from the pull request when
                omitted, which costs one extra API call per comment.
        """
        if commit_id is None:
            pr_info = await self._get_pr_info(pr_number)
            commit_id = pr_info.get("head", {}).get("sha", "")

        async with aiohttp.ClientSession() as session:
            # "async with" releases the response/connection deterministically.
            async with session.post(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}/comments",
                headers=self.headers,
                json={
                    "body": body,
                    "commit_id": commit_id,
                    "path": path,
                    "line": line,
                    "side": "RIGHT"
                }
            ) as resp:
                return await resp.json()

    async def _submit_review(
        self, pr_number: int, event: str, body: str,
        commit_id: Optional[str] = None
    ):
        """Submit the final review with the given ``event`` and ``body``.

        Args:
            commit_id: Head commit SHA. Looked up from the pull request when
                omitted.
        """
        if commit_id is None:
            pr_info = await self._get_pr_info(pr_number)
            commit_id = pr_info.get("head", {}).get("sha", "")

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}/reviews",
                headers=self.headers,
                json={
                    "commit_id": commit_id,
                    "body": body,
                    "event": event
                }
            ) as resp:
                return await resp.json()

    def _format_issue_comment(self, issue: CodeIssue) -> str:
        """Format one issue as Markdown for an inline comment.

        Adds the suggestion and a ``suggestion`` code fence only when the
        issue provides them.
        """
        icon = "🔴" if issue.severity == Severity.CRITICAL else "🟡"
        lines = [
            f"{icon} **{issue.title}**",
            f"",
            f"{issue.description}",
        ]
        if issue.suggestion:
            lines.append(f"\n💡 {issue.suggestion}")
        if issue.fix_code:
            lines.append(f"\n```suggestion\n{issue.fix_code}\n```")
        return "\n".join(lines)

# Webhook handler
class GitHubWebhookHandler:
    """Dispatches GitHub webhook payloads to a pull-request reviewer."""

    def __init__(self, reviewer: GitHubPRReviewer):
        self.reviewer = reviewer

    async def handle_event(self, event: Dict) -> Optional[Dict]:
        """Run a review for newly opened or updated pull requests.

        Returns:
            The reviewer's result for "opened"/"synchronize" pull-request
            events, ``None`` for every other payload.
        """
        action = event.get("action")
        pull_request = event.get("pull_request")

        # Guard clause: anything that is not a fresh or updated PR is ignored.
        if not pull_request or action not in ("opened", "synchronize"):
            return None

        pr_number = pull_request["number"]
        print(f"🔍 审查 PR #{pr_number} (action: {action})")

        return await self.reviewer.review_pr(pr_number)

五、自定义审查规则

5.1 项目级规则配置

# ===== 项目级规则配置 =====

import yaml
import json

@dataclass
class ReviewConfig:
    """Review configuration: enabled rules, thresholds and ignore patterns."""
    # Rule switches
    enabled_rules: List[str] = field(default_factory=lambda: [
        "SEC001", "SEC002", "ERR001", "COM001", "COM002", "STYLE001"
    ])

    # Minimum severity to report; strings such as "minor" are coerced to
    # Severity in __post_init__.
    min_severity: Severity = Severity.MINOR

    # Thresholds
    max_complexity: int = 10
    max_function_lines: int = 50
    max_file_lines: int = 500
    max_line_length: int = 120

    # Score threshold below which a review is treated as failed
    fail_score: float = 60.0

    # Paths / glob patterns excluded from review
    ignore_paths: List[str] = field(default_factory=lambda: [
        "vendor/", "node_modules/", "__pycache__/",
        "*.min.js", "*.generated.*"
    ])

    # Per-language rule lists
    language_rules: Dict[str, List[str]] = field(default_factory=dict)

    # User-defined regex rules
    custom_patterns: List[Dict] = field(default_factory=list)

    def __post_init__(self):
        # YAML/JSON sources carry the severity as a plain string; without the
        # coercion, code reading ``min_severity.value`` fails.
        if not isinstance(self.min_severity, Severity):
            self.min_severity = Severity(str(self.min_severity).lower())

    @classmethod
    def from_yaml(cls, path: str) -> 'ReviewConfig':
        """Load a configuration from a YAML file.

        The file is decoded as UTF-8 first; if that fails, the platform's
        preferred encoding is used, which matches the previous behaviour.
        An empty file yields the default configuration.
        """
        import locale

        with open(path, 'rb') as f:
            raw = f.read()
        try:
            text = raw.decode('utf-8-sig')
        except UnicodeDecodeError:
            text = raw.decode(locale.getpreferredencoding(False))
        return cls.from_dict(yaml.safe_load(text) or {})

    @classmethod
    def from_dict(cls, data: Dict) -> 'ReviewConfig':
        """Build a configuration from a mapping.

        Unknown keys are ignored. Keys whose value is ``None`` (e.g. a YAML
        key with nothing after the colon) are ignored as well, so the field
        keeps its default instead of becoming ``None``.
        """
        data = data or {}
        known = {
            key: value
            for key, value in data.items()
            if key in cls.__dataclass_fields__ and value is not None
        }
        return cls(**known)

# Example YAML configuration.
# Regex patterns use single-quoted YAML scalars: in a double-quoted scalar a
# sequence such as \. is an invalid escape and the document fails to load.
CONFIG_YAML = """
# .code-review.yml
enabled_rules:
  - SEC001    # SQL 注入
  - SEC002    # 硬编码密钥
  - ERR001    # 空异常捕获
  - COM001    # 高复杂度
  - COM002    # 过长函数
  - STYLE001  # 魔法数字

min_severity: minor

max_complexity: 8
max_function_lines: 40
max_file_lines: 400
fail_score: 65.0

ignore_paths:
  - vendor/
  - node_modules/
  - "**/*.min.js"
  - "**/*.generated.*"

language_rules:
  python:
    - SEC001
    - ERR001
  javascript:
    - SEC002

custom_patterns:
  - id: CUSTOM001
    name: "禁止 console.log"
    pattern: 'console\\.log\\('
    severity: minor
    category: style
    suggestion: "使用 logger 替代 console.log"
  - id: CUSTOM002
    name: "禁止 TODO 遗留"
    pattern: "TODO|FIXME|HACK"
    severity: info
    category: maintainability
    suggestion: "创建 Issue 跟踪待办事项"
"""

class CustomPatternRule(ReviewRule):
    """Review rule defined by a user-supplied regular expression."""

    def __init__(self, config: Dict):
        """Build the rule from one ``custom_patterns`` entry.

        Args:
            config: Mapping with the required keys ``id`` and ``pattern`` and
                the optional keys ``category`` (default "style"),
                ``severity`` (default "minor"), ``name`` and ``suggestion``.

        Raises:
            KeyError: If ``id`` or ``pattern`` is missing.
            ValueError: If the category/severity is unknown or the pattern is
                not a valid regular expression.
        """
        self.rule_id = config["id"]
        self.category = IssueCategory(config.get("category", "style"))
        self.severity = Severity(config.get("severity", "minor"))
        self.description = config.get("name", "")
        self.pattern = config["pattern"]
        self.suggestion_text = config.get("suggestion", "")
        # Compile once, so a broken pattern is reported when the configuration
        # is loaded (with the rule id) rather than in the middle of a review.
        try:
            self._regex = re.compile(self.pattern, re.IGNORECASE)
        except re.error as exc:
            raise ValueError(
                f"invalid pattern for rule {self.rule_id}: {exc}"
            ) from exc

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per line of ``code`` that matches the pattern.

        Matching is case-insensitive and line based; ``parsed`` is unused.
        """
        return [
            CodeIssue(
                severity=self.severity,
                category=self.category,
                title=self.description,
                description=f"匹配规则 {self.rule_id}: {line.strip()}",
                location=CodeLocation(file_path, line_no),
                suggestion=self.suggestion_text,
                rule_id=self.rule_id
            )
            for line_no, line in enumerate(code.splitlines(), 1)
            if self._regex.search(line)
        ]

class ConfigurableReviewer:
    """Reviewer whose rules, thresholds and ignore list come from a ReviewConfig."""

    def __init__(self, config: ReviewConfig = None):
        """Create the rule registry and register the configured custom rules.

        Args:
            config: Configuration to use; defaults to ``ReviewConfig()``.
        """
        self.config = config or ReviewConfig()
        self.registry = RuleRegistry()
        self.registry.register_defaults()

        # Register user-defined regex rules.
        for custom in self.config.custom_patterns:
            self.registry.register(CustomPatternRule(custom))

    def should_review(self, file_path: str) -> bool:
        """Return ``False`` when ``file_path`` matches an ignore pattern.

        Matching rules:
        * A pattern without glob characters (``vendor/``, ``__pycache__/``)
          matches when it appears as whole path segment(s) anywhere in the
          path, so ``vendor/`` ignores ``vendor/x.py`` and ``src/vendor/x.py``
          but not ``vendor_utils.py``.
        * A glob pattern is matched against the full path and the file name.
          A leading ``**/`` also matches files at the repository root, and a
          trailing ``/`` matches everything below the directory.
        """
        import fnmatch

        normalized = file_path.replace("\\", "/")
        while normalized.startswith("./"):
            normalized = normalized[2:]
        basename = normalized.rsplit("/", 1)[-1]

        for pattern in self.config.ignore_paths:
            stem = pattern.rstrip("/")
            if not stem:
                # An empty pattern must not ignore every file.
                continue

            if not any(ch in stem for ch in "*?["):
                # Literal directory or path: compare on segment boundaries.
                if f"/{stem}/" in f"/{normalized}/":
                    return False
                continue

            globs = [stem + "/*"] if pattern.endswith("/") else [pattern]
            # fnmatch has no special "**" handling; "**/x" would otherwise
            # require at least one directory in front of the file.
            globs += [g[3:] for g in globs if g.startswith("**/")]
            for glob in globs:
                if fnmatch.fnmatch(normalized, glob) or fnmatch.fnmatch(basename, glob):
                    return False
        return True

    def review(self, code: str, file_path: str) -> ReviewResult:
        """Review ``code`` with the rules selected by the configuration.

        Ignored paths get a result with score 100 and no issues. When the
        detected language has an entry in ``language_rules``, that list
        replaces ``enabled_rules`` for the file.
        """
        if not self.should_review(file_path):
            return ReviewResult(file_path=file_path, score=100, summary="已跳过(忽略路径)")

        language = CodeParser().detect_language(file_path)

        # Pick the rule list for this file.
        # NOTE(review): custom pattern ids are registered above but are not
        # added to this list — confirm whether the engine runs them.
        rules = self.config.enabled_rules
        if language and language in self.config.language_rules:
            rules = self.config.language_rules[language]

        engine = RuleEngine(self.registry)
        return engine.review_file(
            code, file_path,
            rules=rules,
            min_severity=self.config.min_severity
        )

# Usage
if __name__ == '__main__':
    import tempfile

    # Write the sample configuration to a temporary .yml file (kept on disk),
    # then load it back through the YAML loader.
    tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False)
    with tmp as f:
        f.write(CONFIG_YAML)
    config_path = tmp.name

    config = ReviewConfig.from_yaml(config_path)
    reviewer = ConfigurableReviewer(config)

    print(f"启用规则: {reviewer.config.enabled_rules}")
    print(f"忽略路径: {reviewer.config.ignore_paths}")
    print(f"自定义规则: {len(reviewer.config.custom_patterns)}")

5.2 团队规则共享

# ===== 团队规则共享 =====

class TeamRuleManager:
    """Exports, imports and merges review configurations shared by a team."""

    def __init__(self, config_repo: str = None):
        self.config_repo = config_repo
        self.rules_dir = ".code-review-rules"

    def export_rules(self, config: ReviewConfig, output_path: str = ".code-review.yml"):
        """Write ``config`` to ``output_path`` as YAML.

        Every configuration field is exported so that an export/import round
        trip does not silently fall back to defaults.
        """
        # min_severity may still be a plain string when the config was built
        # from raw YAML data; accept both forms.
        min_severity = getattr(config.min_severity, "value", config.min_severity)
        data = {
            "version": "1.0",
            "enabled_rules": config.enabled_rules,
            "min_severity": min_severity,
            "max_complexity": config.max_complexity,
            "max_function_lines": config.max_function_lines,
            "max_file_lines": config.max_file_lines,
            "max_line_length": config.max_line_length,
            "fail_score": config.fail_score,
            "ignore_paths": config.ignore_paths,
            "language_rules": config.language_rules,
            "custom_patterns": config.custom_patterns
        }

        # NOTE(review): the file is written with the platform default
        # encoding; with allow_unicode=True a non-UTF-8 locale may fail on
        # non-ASCII rule names.
        with open(output_path, 'w') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)

        print(f"✅ 规则已导出: {output_path}")

    def import_rules(self, source: str) -> ReviewConfig:
        """Load a configuration from a local path or an http(s) URL.

        Raises:
            ValueError: If the document is not a YAML mapping.
        """
        # Test for the full scheme: a bare "http" prefix would also catch
        # local files such as "http_rules.yml".
        if source.startswith(("http://", "https://")):
            import urllib.request
            with urllib.request.urlopen(source, timeout=30) as resp:
                content = resp.read().decode()
        else:
            with open(source, 'r') as f:
                content = f.read()

        # An empty document parses to None; treat it as "all defaults".
        data = yaml.safe_load(content) or {}
        if not isinstance(data, dict):
            raise ValueError(f"rule configuration must be a mapping: {source}")
        return ReviewConfig.from_dict(data)

    def merge_configs(self, base: ReviewConfig, override: ReviewConfig) -> ReviewConfig:
        """Merge two configurations; ``override`` wins on conflicts.

        Rule lists and ignore paths are unioned in first-seen order, scalar
        settings come from ``override`` (falling back to ``base`` when the
        override value is falsy), language rules and custom patterns are
        merged by key with ``override`` entries replacing ``base`` entries.
        """
        merged = ReviewConfig()

        # dict.fromkeys de-duplicates while keeping a deterministic order,
        # so repeated exports do not reshuffle the lists.
        merged.enabled_rules = list(
            dict.fromkeys(list(base.enabled_rules) + list(override.enabled_rules))
        )

        # Scalar settings: override first.
        merged.min_severity = override.min_severity or base.min_severity
        merged.max_complexity = override.max_complexity or base.max_complexity
        merged.max_function_lines = override.max_function_lines or base.max_function_lines
        merged.max_file_lines = override.max_file_lines or base.max_file_lines
        merged.max_line_length = override.max_line_length or base.max_line_length
        merged.fail_score = override.fail_score or base.fail_score

        merged.ignore_paths = list(
            dict.fromkeys(list(base.ignore_paths) + list(override.ignore_paths))
        )

        merged.language_rules = {**base.language_rules, **override.language_rules}

        # Custom patterns are keyed by their id.
        base_custom = {c["id"]: c for c in base.custom_patterns}
        override_custom = {c["id"]: c for c in override.custom_patterns}
        merged.custom_patterns = list({**base_custom, **override_custom}.values())

        return merged

# Usage
# NOTE(review): these statements have no __main__ guard, so the export below
# runs whenever the module is imported.
default_config = ReviewConfig()
manager = TeamRuleManager()

# Export the team's standard configuration
manager.export_rules(config=default_config, output_path="team-code-review.yml")

六、审查报告生成

6.1 多格式报告

# ===== 审查报告生成器 =====

class ReportGenerator:
    """Renders review results as Markdown, SARIF or JSON."""

    # Severity value -> display order (most severe first).
    _SEVERITY_RANK = {"critical": 0, "major": 1, "minor": 2, "info": 3}
    # Severity value -> icon used in the Markdown table.
    _SEVERITY_ICON = {"critical": "🔴", "major": "🟡", "minor": "🔵", "info": "⚪"}
    # Severity value -> SARIF result level.
    _SARIF_LEVEL = {"critical": "error", "major": "warning", "minor": "note", "info": "none"}

    @staticmethod
    def _cell(text) -> str:
        """Make ``text`` safe for a Markdown table cell.

        Pipes are escaped and line breaks collapsed, because either would
        break the table row.
        """
        flattened = " ".join(str(text or "").splitlines())
        return flattened.replace("|", "\\|")

    @staticmethod
    def to_markdown(results: Dict[str, ReviewResult]) -> str:
        """Generate a Markdown report.

        Contains an overview table followed by one table per file that has
        issues, ordered from critical down to info.
        """
        total_issues = sum(len(r.issues) for r in results.values())
        critical = sum(r.critical_count for r in results.values())
        major = sum(r.major_count for r in results.values())
        avg_score = sum(r.score for r in results.values()) / max(len(results), 1)

        lines = [
            "# 📋 代码审查报告\n",
            f"**审查时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            f"**文件数量**: {len(results)}",
            f"**问题总数**: {total_issues}",
            f"**平均评分**: {avg_score:.1f}/100\n",
            "## 概览\n",
            "| 状态 | 数量 |",
            "|------|------|",
            f"| 🔴 严重 | {critical} |",
            f"| 🟡 主要 | {major} |",
            f"| 🔵 次要 | {total_issues - critical - major} |",
            ""
        ]

        if critical > 0:
            lines.append("> ⚠️ **存在严重问题,建议修复后再合并**\n")

        rank = ReportGenerator._SEVERITY_RANK
        for file_path, result in results.items():
            if not result.issues:
                continue

            lines.append(f"## 📄 {file_path}\n")
            lines.append(f"**评分**: {result.score}/100 | **问题**: {len(result.issues)} 个\n")
            lines.append("| 等级 | 类别 | 问题 | 建议 |")
            lines.append("|------|------|------|------|")

            # Sort by explicit rank: sorting the severity strings would put
            # "info" between "critical" and "major".
            ordered = sorted(
                result.issues,
                key=lambda issue: rank.get(issue.severity.value, len(rank))
            )
            for issue in ordered:
                icon = ReportGenerator._SEVERITY_ICON[issue.severity.value]
                title = ReportGenerator._cell(issue.title)
                suggestion = ReportGenerator._cell((issue.suggestion or "")[:50])
                lines.append(
                    f"| {icon} | {issue.category.value} | "
                    f"{title} | {suggestion} |"
                )
            lines.append("")

        return "\n".join(lines)

    @staticmethod
    def to_sarif(results: Dict[str, ReviewResult]) -> Dict:
        """Generate a SARIF 2.1.0 document (the format GitHub code scanning reads).

        Issues without a rule id get a category-based fallback id so they are
        not all merged into one unnamed rule, and line numbers are clamped to
        1 because SARIF regions are 1-based.
        """
        sarif = {
            "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/main/sarif-2.1/schema/sarif-schema-2.1.0.json",
            "version": "2.1.0",
            "runs": [{
                "tool": {
                    "driver": {
                        "name": "AI Code Reviewer",
                        "version": "1.0.0",
                        "rules": []
                    }
                },
                "results": []
            }]
        }
        rules = sarif["runs"][0]["tool"]["driver"]["rules"]
        sarif_results = sarif["runs"][0]["results"]

        rule_index = {}
        for file_path, result in results.items():
            for issue in result.issues:
                rule_id = getattr(issue, "rule_id", "") or f"AI-{issue.category.value}"
                if rule_id not in rule_index:
                    rule_index[rule_id] = len(rules)
                    rules.append({
                        "id": rule_id,
                        "shortDescription": {"text": issue.title},
                        "properties": {
                            "tags": [issue.category.value]
                        }
                    })

                start_line = max(int(issue.location.start_line or 1), 1)
                sarif_results.append({
                    "ruleId": rule_id,
                    "ruleIndex": rule_index[rule_id],
                    "level": ReportGenerator._SARIF_LEVEL[issue.severity.value],
                    "message": {"text": issue.description},
                    "locations": [{
                        "physicalLocation": {
                            "artifactLocation": {"uri": file_path},
                            "region": {"startLine": start_line}
                        }
                    }]
                })

        return sarif

    @staticmethod
    def to_json(results: Dict[str, ReviewResult]) -> str:
        """Generate a JSON report keyed by file path."""
        data = {}
        for file_path, result in results.items():
            reviewed_at = result.reviewed_at
            # A datetime-like value is not JSON serializable; emit ISO text.
            if hasattr(reviewed_at, "isoformat"):
                reviewed_at = reviewed_at.isoformat()
            data[file_path] = {
                "score": result.score,
                "issues": [i.to_dict() for i in result.issues],
                "reviewed_at": reviewed_at
            }
        return json.dumps(data, ensure_ascii=False, indent=2)

# Usage
if __name__ == '__main__':
    generator = ReportGenerator()

    # One simulated finding for a single file.
    sql_injection = CodeIssue(
        severity=Severity.CRITICAL,
        category=IssueCategory.SECURITY,
        title="SQL 注入",
        description="直接拼接用户输入",
        location=CodeLocation("app.py", 42),
        suggestion="参数化查询"
    )
    app_result = ReviewResult(
        file_path="app.py",
        score=72,
        issues=[sql_injection]
    )
    results = {"app.py": app_result}

    # Markdown report
    markdown_report = generator.to_markdown(results)
    print(markdown_report)

    # SARIF report: show how many rules were collected
    sarif = generator.to_sarif(results)
    print(f"\nSARIF 规则数: {len(sarif['runs'][0]['tool']['driver']['rules'])}")

6.2 趋势追踪

# ===== 审查趋势追踪 =====

import json
from datetime import datetime, timedelta

class ReviewTracker:
    """Persists review results to a JSON file and reports trends over time."""

    def __init__(self, storage_path: str = ".review-history.json"):
        """Load the existing history from ``storage_path`` if the file exists."""
        self.storage_path = storage_path
        self.history = self._load()

    def _load(self) -> Dict:
        """Load the history file.

        Returns:
            The stored history, or ``{"reviews": []}`` when the file does not
            exist. A stored object without a "reviews" key gets an empty list.

        Raises:
            ValueError: If the file does not contain a JSON object.
        """
        try:
            with open(self.storage_path, 'r') as f:
                history = json.load(f)
        except FileNotFoundError:
            return {"reviews": []}

        if not isinstance(history, dict):
            raise ValueError(
                f"review history must be a JSON object: {self.storage_path}"
            )
        # Every other method indexes history["reviews"] directly.
        history.setdefault("reviews", [])
        return history

    def _save(self):
        """Write the whole history back to ``storage_path``."""
        with open(self.storage_path, 'w') as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)

    def record(self, results: Dict[str, ReviewResult], commit_hash: str = ""):
        """Append one review run to the history and save it.

        Args:
            results: Mapping of file path to its review result.
            commit_hash: Optional commit identifier stored with the entry.
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "commit": commit_hash,
            "files_reviewed": len(results),
            "total_issues": sum(len(r.issues) for r in results.values()),
            "critical_issues": sum(r.critical_count for r in results.values()),
            "avg_score": sum(r.score for r in results.values()) / max(len(results), 1),
            "files": {
                fp: {"score": r.score, "issues": len(r.issues)}
                for fp, r in results.items()
            }
        }

        self.history["reviews"].append(entry)
        self._save()

    def get_trend(self, days: int = 30) -> Dict:
        """Summarize the reviews recorded during the last ``days`` days.

        Returns:
            ``{"trend": "no_data", "days": days}`` when nothing was recorded
            in the window; otherwise score and issue statistics. The score
            trend compares the newest with the oldest entry in the window and
            is "improving", "declining" or "stable".
        """
        # ISO-8601 timestamps produced by record() sort chronologically as
        # strings, so a plain string comparison is enough.
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()

        recent = [
            r for r in self.history["reviews"]
            if r["timestamp"] >= cutoff
        ]

        if not recent:
            return {"trend": "no_data", "days": days}

        scores = [r["avg_score"] for r in recent]
        issues = [r["total_issues"] for r in recent]
        criticals = [r["critical_issues"] for r in recent]

        # Equal scores (including a single review) are not a decline.
        first, last = scores[0], scores[-1]
        if last > first:
            direction = "improving"
        elif last < first:
            direction = "declining"
        else:
            direction = "stable"

        return {
            "days": days,
            "reviews": len(recent),
            "score": {
                "current": last,
                "avg": sum(scores) / len(scores),
                "trend": direction
            },
            "issues": {
                "total": sum(issues),
                "avg_per_review": sum(issues) / len(issues),
                "critical_total": sum(criticals)
            }
        }

    def get_top_issues(self, limit: int = 10) -> List[Dict]:
        """Return the files with the most recorded issues.

        The history stores issue counts per file only, so the ranking is by
        file (summed over all reviews), highest count first.
        """
        issue_counts: Dict[str, int] = {}

        for review in self.history["reviews"]:
            for file_path, file_data in review.get("files", {}).items():
                issue_counts[file_path] = (
                    issue_counts.get(file_path, 0) + file_data.get("issues", 0)
                )

        ranked = sorted(issue_counts.items(), key=lambda item: item[1], reverse=True)
        return [
            {"file": f, "issue_count": c}
            for f, c in ranked[:limit]
        ]

# Usage: constructing the tracker loads ".review-history.json" if it exists.
tracker = ReviewTracker()

七、生产案例

7.1 CI/CD 集成

# ===== CI/CD 集成 =====

class CIReviewer:
    """Review changed files as a CI/CD pipeline step.

    Combines the project review configuration, the configurable reviewer,
    the history tracker and the report generator into a single ``run`` call
    whose boolean result can be mapped to the pipeline exit code.
    """

    def __init__(self, config_path: str = ".code-review.yml"):
        """Load the review configuration and build the collaborators.

        Args:
            config_path: Path of the YAML review configuration file.
        """
        self.config = ReviewConfig.from_yaml(config_path)
        self.reviewer = ConfigurableReviewer(self.config)
        self.tracker = ReviewTracker()
        self.reporter = ReportGenerator()

    def run(self) -> bool:
        """Review every changed file, write the report and decide pass/fail.

        Side effects: prints progress to stdout, writes ``review-report.md``
        into the current directory and records the results in the tracker.

        Returns:
            ``True`` when no reviewed file has blockers and the average score
            reaches ``config.fail_score``. A run in which no file was
            reviewed also passes, because there is nothing to score.
        """
        print("=" * 50)
        print("🔍 AI 代码审查 - CI 模式")
        print("=" * 50)

        # Collect the files to look at
        changed_files = self._get_changed_files()
        print(f"\n📁 变更文件: {len(changed_files)} 个")

        results = {}
        for file_path in changed_files:
            if not self.reviewer.should_review(file_path):
                continue

            # Deleted files, directories (e.g. submodule paths), unreadable
            # and non-UTF-8 files are skipped instead of aborting the run.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    code = f.read()
            except (UnicodeDecodeError, OSError):
                continue

            result = self.reviewer.review(code, file_path)
            results[file_path] = result

            status = "✅" if not result.has_blockers else "❌"
            print(f"  {status} {file_path}: 评分 {result.score}, 问题 {len(result.issues)} 个")

        # Write the report. The encoding is explicit because the report can
        # contain emoji / CJK text that a non-UTF-8 locale cannot encode.
        report = self.reporter.to_markdown(results)
        with open("review-report.md", 'w', encoding='utf-8') as f:
            f.write(report)
        print("\n📊 报告: review-report.md")

        # Record history
        self.tracker.record(results)

        # Decide whether the gate passes
        has_blockers = any(r.has_blockers for r in results.values())
        avg_score = sum(r.score for r in results.values()) / max(len(results), 1)

        # With no reviewed file the 0.0 average is only a placeholder and
        # must not be compared against the threshold.
        score_ok = not results or avg_score >= self.config.fail_score
        passed = not has_blockers and score_ok

        print(f"\n{'✅ 审查通过' if passed else '🚫 审查未通过'}")
        print(f"  平均评分: {avg_score:.1f} (阈值: {self.config.fail_score})")
        print(f"  严重问题: {sum(r.critical_count for r in results.values())}")

        return passed

    def _get_changed_files(self) -> List[str]:
        """Return the paths to review.

        Uses ``git diff --name-only HEAD~1``; when git is unavailable, fails
        or reports no changed file, falls back to walking the current
        directory and keeping every file the reviewer accepts.
        """
        # Preferred source: git diff against the previous commit
        try:
            result = subprocess.run(
                ["git", "diff", "--name-only", "HEAD~1"],
                capture_output=True, text=True
            )
            if result.returncode == 0 and result.stdout.strip():
                return result.stdout.strip().splitlines()
        except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
            # Best effort: a missing git binary or undecodable output simply
            # triggers the directory scan below.
            pass

        # Fallback: scan all files, pruning directories that never hold
        # reviewable sources.
        skipped_dirs = {".git", "node_modules", "__pycache__", "venv", ".venv"}
        files = []
        for root, dirs, filenames in os.walk("."):
            # In-place slice assignment so os.walk does not descend further.
            dirs[:] = [d for d in dirs if d not in skipped_dirs]
            for f in filenames:
                filepath = os.path.join(root, f)
                if self.reviewer.should_review(filepath):
                    files.append(filepath)

        return files

# GitHub Actions workflow template: on pull requests it installs the
# dependencies, runs `python -m code_review --ci` and always uploads
# review-report.md as an artifact.
# NOTE(review): fetch-depth is 2, presumably so that HEAD~1 exists for the
# `git diff --name-only HEAD~1` issued by CIReviewer — confirm.
GITHUB_ACTIONS_YML = """
name: AI Code Review

on:
  pull_request:
    types: [opened, synchronize]

jobs:
  review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 2
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install openai pyyaml
      
      - name: Run AI Code Review
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: python -m code_review --ci
      
      - name: Upload Report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: review-report
          path: review-report.md
"""

# GitLab CI job template: runs `python -m code_review --ci` in the test
# stage and always keeps review-report.md as a job artifact.
GITLAB_CI_YML = """
ai-code-review:
  stage: test
  image: python:3.11
  before_script:
    - pip install openai pyyaml
  script:
    - python -m code_review --ci
  artifacts:
    paths:
      - review-report.md
    when: always
  variables:
    OPENAI_API_KEY: $OPENAI_API_KEY
"""

# Entry point: run the CI review and turn its verdict into the exit code
# (0 when the review passed, 1 otherwise).
if __name__ == '__main__':
    sys.exit(0 if CIReviewer().run() else 1)

7.2 VS Code 扩展

# ===== VS Code 扩展后端 =====

class VSCodeReviewServer:
    """Backend for a VS Code code-review extension.

    Turns review results into diagnostic dicts, exposes an LLM-backed deep
    review and serves quick fixes from the results cached by
    ``review_on_save``.
    """

    def __init__(self):
        """Create the rule-based and LLM reviewers and an empty result cache."""
        self.reviewer = ConfigurableReviewer()
        self.llm_reviewer = LLMReviewer()
        self._debounce_timer = None
        # Latest rule-based result per file path; read by get_quick_fix.
        self._cache: Dict[str, ReviewResult] = {}

    def review_on_save(self, file_path: str, code: str) -> Dict:
        """Run the rule-based review and convert its issues to diagnostics.

        The result is cached under ``file_path`` so that ``get_quick_fix``
        can later refer to an issue by index.

        Args:
            file_path: Path of the saved document.
            code: Full text of the document.

        Returns:
            Dict with ``diagnostics`` (one dict per issue, using 0-based line
            numbers), ``score`` and ``issueCount``.
        """
        result = self.reviewer.review(code, file_path)
        self._cache[file_path] = result

        diagnostics = []
        for issue in result.issues:
            # Issue locations are converted from 1-based to 0-based lines.
            first_line = issue.location.start_line - 1
            last_line = issue.location.end_line - 1

            related = []
            if issue.suggestion:
                related.append({
                    "message": f"建议: {issue.suggestion}",
                    "location": {
                        "uri": file_path,
                        "range": {
                            "startLine": first_line,
                            "startColumn": 0,
                            "endLine": first_line,
                            "endColumn": 999
                        }
                    }
                })

            diagnostics.append({
                "severity": self._to_vscode_severity(issue.severity),
                "message": f"[{issue.rule_id}] {issue.title}: {issue.description}",
                "range": {
                    "startLine": first_line,
                    "startColumn": issue.location.start_column,
                    "endLine": last_line,
                    # A missing end column (0) falls back to a large value.
                    "endColumn": issue.location.end_column or 999
                },
                "source": "AI Code Review",
                "code": issue.rule_id,
                "relatedInformation": related
            })

        return {
            "diagnostics": diagnostics,
            "score": result.score,
            "issueCount": len(result.issues)
        }

    async def deep_review(self, file_path: str, code: str) -> Dict:
        """Run the LLM review for one file.

        Args:
            file_path: Used to detect the language; falls back to "python".
            code: Full text of the document.

        Returns:
            Dict with serialised ``issues``, the ``score`` and a textual
            ``summary`` report.
        """
        # NOTE(review): the reviewer call below is not awaited; if it blocks
        # on network I/O it blocks the event loop for that time — confirm.
        language = CodeParser().detect_language(file_path) or "python"
        result = self.llm_reviewer.review(code, language=language)

        return {
            "issues": [i.to_dict() for i in result.issues],
            "score": result.score,
            "summary": result.to_report()
        }

    def get_quick_fix(self, file_path: str, issue_index: int) -> Optional[Dict]:
        """Return the quick-fix edit for a cached issue.

        Args:
            file_path: Path previously passed to ``review_on_save``.
            issue_index: 0-based index into the cached issue list.

        Returns:
            A dict with a ``title`` and an ``edit`` (0-based line range plus
            replacement text), or ``None`` when the file has no cached
            result, the index is out of range, or the issue has no fix code.
        """
        result = self._cache.get(file_path)
        if result is None:
            return None
        # Negative indices are rejected too: they would otherwise wrap
        # around and return the fix of an unrelated issue.
        if not 0 <= issue_index < len(result.issues):
            return None

        issue = result.issues[issue_index]
        if not issue.fix_code:
            return None

        return {
            "title": f"修复: {issue.title}",
            "edit": {
                "range": {
                    "startLine": issue.location.start_line - 1,
                    "endLine": issue.location.end_line - 1
                },
                "newText": issue.fix_code
            }
        }

    @staticmethod
    def _to_vscode_severity(severity: "Severity") -> int:
        """Map a review severity to the numeric diagnostic severity.

        Raises:
            KeyError: If ``severity`` is not one of the four mapped levels.
        """
        mapping = {
            Severity.CRITICAL: 0,  # Error
            Severity.MAJOR: 1,     # Warning
            Severity.MINOR: 2,     # Information
            Severity.INFO: 3       # Hint
        }
        return mapping[severity]

# Example VS Code extension.js source, kept as a string: it reviews a
# document on save, publishes the returned diagnostics, shows the score in
# the status bar and registers a deep-review command.
# NOTE(review): the snippet references reviewFile, deepReview, statusBarItem
# and reviewPanel without defining them — they must be supplied elsewhere.
VSCODE_EXTENSION_JS = """
const vscode = require('vscode');

function activate(context) {
    const diagnosticCollection = vscode.languages.createDiagnosticCollection('ai-code-review');
    
    // 保存时审查
    vscode.workspace.onDidSaveTextDocument(async (doc) => {
        const result = await reviewFile(doc.uri.fsPath, doc.getText());
        
        const diagnostics = result.diagnostics.map(d => {
            const range = new vscode.Range(
                d.range.startLine, d.range.startColumn,
                d.range.endLine, d.range.endColumn
            );
            const diag = new vscode.Diagnostic(
                range, d.message, d.severity
            );
            diag.source = d.source;
            diag.code = d.code;
            return diag;
        });
        
        diagnosticCollection.set(doc.uri, diagnostics);
        
        // 状态栏显示评分
        statusBarItem.text = `$(shield) ${result.score}/100`;
        statusBarItem.show();
    });
    
    // 深度审查命令
    const deepReviewCmd = vscode.commands.registerCommand(
        'aiCodeReview.deepReview',
        async () => {
            const editor = vscode.window.activeTextEditor;
            if (!editor) return;
            
            vscode.window.withProgress(
                { location: vscode.ProgressLocation.Notification, title: 'AI 深度审查中...' },
                async () => {
                    const result = await deepReview(
                        editor.document.uri.fsPath,
                        editor.document.getText()
                    );
                    
                    // 在侧边面板显示结果
                    reviewPanel.update(result);
                }
            );
        }
    );
    
    context.subscriptions.push(deepReviewCmd);
}
"""

7.3 全栈审查平台

# ===== 全栈审查平台 =====

from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class CodeReviewPlatform:
    """Project-wide code review service.

    Bundles the rule-based reviewer, the LLM reviewer, the history tracker
    and the report generator behind a single object.
    """

    def __init__(self):
        """Create the reviewers, the tracker and the report generator."""
        self.rule_reviewer = ConfigurableReviewer()
        self.llm_reviewer = LLMReviewer()
        self.tracker = ReviewTracker()
        self.reporter = ReportGenerator()

    def review_project(self, project_path: str) -> Dict:
        """Run the rule-based review over every reviewable file of a project.

        Only files that have at least one issue are kept in the detailed
        results (history record, ``results`` and the markdown report), but
        ``files_reviewed`` and ``avg_score`` cover every file that was
        actually reviewed, so a clean project is not reported as
        "0 files, score 0".

        Args:
            project_path: Root directory of the project to walk.

        Returns:
            Summary dict with the project path, the number of reviewed
            files, the total issue count, the rounded average score,
            per-category and per-severity issue counts, per-file results and
            the markdown report.
        """
        ignored_dirs = {
            ".git", "node_modules", "__pycache__", "venv",
            ".venv", "dist", "build", ".idea", ".vscode"
        }

        results = {}
        reviewed_count = 0
        score_total = 0

        for root, dirs, files in os.walk(project_path):
            # Prune ignored directories in place so os.walk skips them.
            dirs[:] = [d for d in dirs if d not in ignored_dirs]

            for filename in files:
                file_path = os.path.join(root, filename)
                rel_path = os.path.relpath(file_path, project_path)

                if not self.rule_reviewer.should_review(rel_path):
                    continue

                # Unreadable or non-UTF-8 files are skipped.
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        code = f.read()
                except (UnicodeDecodeError, FileNotFoundError):
                    continue

                result = self.rule_reviewer.review(code, rel_path)
                reviewed_count += 1
                score_total += result.score
                if result.issues:
                    results[rel_path] = result

        # Record history
        self.tracker.record(results)

        # Totals
        total_issues = sum(len(r.issues) for r in results.values())
        avg_score = score_total / max(reviewed_count, 1)

        # Issue counts per category and per severity
        category_stats = {}
        severity_stats = {}
        for result in results.values():
            for issue in result.issues:
                category_stats[issue.category.value] = category_stats.get(issue.category.value, 0) + 1
                severity_stats[issue.severity.value] = severity_stats.get(issue.severity.value, 0) + 1

        return {
            "project_path": project_path,
            "files_reviewed": reviewed_count,
            "total_issues": total_issues,
            "avg_score": round(avg_score, 1),
            "category_stats": category_stats,
            "severity_stats": severity_stats,
            "results": {fp: {"score": r.score, "issues": len(r.issues)} for fp, r in results.items()},
            "report_markdown": self.reporter.to_markdown(results)
        }

# API service
class ReviewAPIHandler(BaseHTTPRequestHandler):
    """HTTP JSON API exposing file, diff and project reviews.

    Endpoints (all POST, JSON object body, JSON response):
        /api/review/file     review one file's source text
        /api/review/diff     review a diff
        /api/review/project  review a project directory

    Unknown paths answer 404, malformed bodies 400 and failures inside a
    review 500, so a client always receives a response.
    """

    # Class attribute: one platform instance shared by all handler instances.
    platform = CodeReviewPlatform()

    def do_POST(self):
        """Dispatch a POST request to the handler registered for its path."""
        routes = {
            "/api/review/file": self._handle_file_review,
            "/api/review/diff": self._handle_diff_review,
            "/api/review/project": self._handle_project_review,
        }
        handler = routes.get(self.path)
        if handler is None:
            self._json_response({"error": "not found"}, status=404)
            return

        try:
            handler()
        except Exception as exc:
            # Top-level request boundary: log the failure and answer 500
            # instead of dropping the connection without a response.
            self.log_error("review request failed: %r", exc)
            self._json_response({"error": "internal error"}, status=500)

    def _read_json_body(self) -> Optional[dict]:
        """Read and decode the request body as a JSON object.

        Returns:
            The decoded dict, or ``None`` after a 400 response has been sent
            because the Content-Length header or the body was invalid.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:
                # A negative size would make read() wait for end of stream.
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(content_length))
        except ValueError:
            # Covers a non-numeric header, invalid JSON and invalid UTF-8.
            self._json_response({"error": "request body must be valid JSON"}, status=400)
            return None

        if not isinstance(body, dict):
            self._json_response({"error": "request body must be a JSON object"}, status=400)
            return None
        return body

    def _handle_file_review(self):
        """Review the source text in ``code``, with the LLM when ``use_llm`` is set."""
        body = self._read_json_body()
        if body is None:
            return

        code = body.get("code", "")
        file_path = body.get("file_path", "unknown")
        use_llm = body.get("use_llm", False)

        if use_llm:
            language = CodeParser().detect_language(file_path) or "python"
            result = self.platform.llm_reviewer.review(code, language=language)
        else:
            result = self.platform.rule_reviewer.review(code, file_path)

        self._json_response({
            "score": result.score,
            "issues": [i.to_dict() for i in result.issues],
            "report": result.to_report()
        })

    def _handle_diff_review(self):
        """Review the diff in ``diff`` and return one summary per file."""
        body = self._read_json_body()
        if body is None:
            return

        diff = body.get("diff", "")

        engine = RuleEngine()
        results = engine.review_diff(diff)

        self._json_response({
            "files_reviewed": len(results),
            "results": [
                {"file": r.file_path, "score": r.score, "issues": len(r.issues)}
                for r in results
            ]
        })

    def _handle_project_review(self):
        """Review the directory named by ``project_path`` (default ".")."""
        body = self._read_json_body()
        if body is None:
            return

        # SECURITY NOTE(review): project_path comes straight from the request
        # and is walked on the server's file system without restriction;
        # restrict it to an allowed root before exposing this service.
        project_path = body.get("project_path", ".")
        summary = self.platform.review_project(project_path)

        self._json_response(summary)

    def _json_response(self, data: dict, status: int = 200):
        """Send ``data`` as a UTF-8 encoded JSON response.

        Args:
            data: JSON-serialisable payload.
            status: HTTP status code.
        """
        # Serialise first so that a serialisation error cannot leave a
        # half-written response behind.
        payload = json.dumps(data, ensure_ascii=False).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

# Start the service
# NOTE(review): the server binds to all interfaces ('0.0.0.0') although the
# banner advertises localhost.
if __name__ == '__main__':
    server = HTTPServer(('0.0.0.0', 8765), ReviewAPIHandler)
    for banner_line in (
        "🔍 AI 代码审查平台启动: http://localhost:8765",
        "  POST /api/review/file   - 审查文件",
        "  POST /api/review/diff   - 审查 Diff",
        "  POST /api/review/project - 审查项目",
    ):
        print(banner_line)
    server.serve_forever()

八、总结

8.1 AI 代码审查架构全景

输出

审查引擎

输入源

IDE 保存

Git Hook

PR/MR

CI/CD

规则引擎

结果合并

LLM 审查

去重

评分

审查报告

诊断信息

PR 评论

趋势追踪

8.2 审查维度总览

维度 规则引擎 LLM 示例
Bug 检测 ⚠️ 基础 ✅ 深度 竞态条件、空指针
安全漏洞 ✅ 强 ✅ 强 SQL 注入、硬编码密钥
性能问题 ❌ 有限 ✅ 强 N+1、内存泄漏
代码风格 ✅ 强 ⚠️ 一般 魔法数字、命名
复杂度 ✅ 精确 ⚠️ 一般 圈复杂度、函数长度
错误处理 ✅ 规则 ✅ 上下文 空 except、资源泄漏
最佳实践 ❌ 有限 ✅ 强 设计模式、SOLID

8.3 最佳实践

实践 说明
分层审查 规则引擎快速扫描 + LLM 深度审查
增量审查 只审查变更部分,减少开销
可配置 项目级规则配置,团队共享
去重 规则引擎和 LLM 结果合并去重
快速修复 提供可操作的修复代码
趋势追踪 长期跟踪代码质量变化
CI 集成 自动化审查,阻断低质量代码

8.4 方案对比

方案 速度 深度 成本 准确率
规则引擎 ⚡ 毫秒 — 免费 高(低误报)
LLM 审查 🚀 秒级 — 按调用 中(可能误报)
混合方案 🚀 秒级 最深 — 最高
人工审查 🐢 小时 最深 人力 最高

本文涵盖 AI 代码审查的完整技术栈:从规则引擎到 LLM 深度审查,从 Git Hook 到 CI/CD 集成,从 IDE 扩展到全栈平台。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐