AI 代码审查助手实战
前言
💡 痛点:代码审查耗时耗力?漏掉安全漏洞?风格不一致?新人代码难以把关?
🎯 解决方案:构建 AI 代码审查助手 — 自动检测 Bug、安全漏洞、性能问题、风格规范,让 Code Review 效率提升 10 倍。
AI 代码审查能做什么?
审查维度对比:
| 维度 | 传统 Lint | 人工审查 | AI 审查 |
|---|---|---|---|
| Bug 检测 | ❌ 有限 | ✅ 强 | ✅ 强 |
| 安全漏洞 | ⚠️ 规则库 | ✅ 经验 | ✅ 深度分析 |
| 性能问题 | ❌ 无 | ⚠️ 依赖经验 | ✅ 自动分析 |
| 风格规范 | ✅ 强 | ⚠️ 主观 | ✅ 可配置 |
| 逻辑错误 | ❌ 无 | ✅ 强 | ✅ 较强 |
| 上下文理解 | ❌ 无 | ✅ 强 | ✅ 中等 |
| 速度 | ⚡ 毫秒 | 🐢 小时 | 🚀 秒级 |
一、代码审查基础框架
1.1 审查结果数据模型
# ===== 代码审查数据模型 =====
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional
from datetime import datetime
class Severity(Enum):
    """Severity level attached to a review finding."""

    # Must fix: security vulnerabilities, risk of data loss.
    CRITICAL = "critical"
    # Should fix: bugs, performance problems.
    MAJOR = "major"
    # Worth fixing: code style, readability.
    MINOR = "minor"
    # Informational: best-practice hints.
    INFO = "info"
class IssueCategory(Enum):
    """Kind of problem a review finding describes."""

    # Bugs and logic errors.
    BUG = "bug"
    # Security vulnerabilities.
    SECURITY = "security"
    # Performance problems.
    PERFORMANCE = "performance"
    # Code style.
    STYLE = "style"
    # Maintainability.
    MAINTAINABILITY = "maintainability"
    # Complexity.
    COMPLEXITY = "complexity"
    # Error handling.
    ERROR_HANDLING = "error_handling"
    # Test coverage.
    TESTING = "testing"
class SuggestionAction(Enum):
    """Follow-up action a finding recommends."""

    # The code needs to be fixed.
    FIX = "fix"
    # Worth considering, not mandatory.
    CONSIDER = "consider"
    # A refactoring is suggested.
    REFACTOR = "refactor"
    # Documentation is needed.
    DOCUMENT = "document"
    # Tests are needed.
    TEST = "test"
@dataclass
class CodeLocation:
    """Position of a finding inside a source file.

    An ``end_line`` of 0 means "not given"; it is replaced by
    ``start_line`` right after construction, so a location always spans at
    least one line.
    """

    file: str
    start_line: int
    end_line: int = 0
    start_column: int = 0
    end_column: int = 0

    def __post_init__(self):
        """Collapse an unspecified end line onto the start line."""
        if self.end_line != 0:
            return
        self.end_line = self.start_line

    def __str__(self):
        """Render as ``file:line`` or ``file:start-end``."""
        span = f"{self.start_line}"
        if self.start_line != self.end_line:
            span = f"{self.start_line}-{self.end_line}"
        return f"{self.file}:{span}"
@dataclass
class CodeIssue:
    """A single finding produced by a review rule or by the LLM reviewer."""

    severity: Severity
    category: IssueCategory
    title: str
    description: str
    location: CodeLocation
    code_snippet: str = ""
    suggestion: str = ""
    fix_code: str = ""
    rule_id: str = ""
    confidence: float = 1.0
    action: SuggestionAction = SuggestionAction.FIX

    def to_dict(self) -> dict:
        """Return a plain-dict view of the issue.

        Enum fields are reduced to their string values and the location to
        its ``str()`` form. ``code_snippet`` is not part of the output.
        """
        return dict(
            severity=self.severity.value,
            category=self.category.value,
            title=self.title,
            description=self.description,
            location=str(self.location),
            suggestion=self.suggestion,
            fix_code=self.fix_code,
            rule_id=self.rule_id,
            confidence=self.confidence,
            action=self.action.value,
        )
@dataclass
class ReviewResult:
    """Outcome of reviewing one file: the findings plus an overall score."""

    file_path: str
    issues: List[CodeIssue] = field(default_factory=list)
    summary: str = ""
    score: float = 0.0  # code quality score, 0-100
    reviewed_at: str = field(default_factory=lambda: datetime.now().isoformat())

    @property
    def critical_count(self) -> int:
        """Number of CRITICAL findings."""
        return sum(1 for i in self.issues if i.severity == Severity.CRITICAL)

    @property
    def major_count(self) -> int:
        """Number of MAJOR findings."""
        return sum(1 for i in self.issues if i.severity == Severity.MAJOR)

    @property
    def has_blockers(self) -> bool:
        """True when at least one CRITICAL finding is present."""
        return self.critical_count > 0

    def add_issue(self, issue: CodeIssue):
        """Append a finding to the result."""
        self.issues.append(issue)

    def to_report(self) -> str:
        """Build a plain-text report, most severe findings first.

        Returns:
            A newline-joined report: a header (score, counts, merge verdict)
            followed by one paragraph per finding.
        """
        # Explicit ranking: sorting on the enum *value* would be alphabetical
        # ("critical" < "info" < "major" < "minor") and put INFO above MAJOR.
        rank = {
            Severity.CRITICAL: 0,
            Severity.MAJOR: 1,
            Severity.MINOR: 2,
            Severity.INFO: 3,
        }
        icons = {
            Severity.CRITICAL: "🔴",
            Severity.MAJOR: "🟡",
            Severity.MINOR: "🔵",
            Severity.INFO: "⚪",
        }

        lines = [
            f"📋 代码审查报告: {self.file_path}",
            f" 评分: {self.score}/100",
            f" 问题: {len(self.issues)} 个",
            f" 严重: {self.critical_count} | 主要: {self.major_count}",
            f" {'🚫 阻断合并' if self.has_blockers else '✅ 可以合并'}",
            "",
        ]
        # sorted() is stable, so findings of equal severity keep insertion order.
        for issue in sorted(self.issues, key=lambda x: rank[x.severity]):
            icon = icons[issue.severity]
            lines.append(f"{icon} [{issue.category.value}] {issue.title}")
            lines.append(f" 位置: {issue.location}")
            lines.append(f" 说明: {issue.description}")
            if issue.suggestion:
                lines.append(f" 建议: {issue.suggestion}")
            if issue.fix_code:
                lines.append(f" 修复: {issue.fix_code}")
            lines.append("")
        return "\n".join(lines)
# 使用示例
# Demo: build a result for app.py with one finding of each of three
# severities, then print the text report.
result = ReviewResult(score=72, file_path="app.py")

result.add_issue(
    CodeIssue(
        title="SQL 注入漏洞",
        description="直接拼接用户输入到 SQL 查询中",
        severity=Severity.CRITICAL,
        category=IssueCategory.SECURITY,
        location=CodeLocation(file="app.py", start_line=42),
        suggestion="使用参数化查询",
        fix_code="cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))",
        rule_id="SEC001",
    )
)

result.add_issue(
    CodeIssue(
        title="未处理异常",
        description="数据库查询可能抛出异常但未捕获",
        severity=Severity.MAJOR,
        category=IssueCategory.BUG,
        location=CodeLocation(file="app.py", start_line=45),
        suggestion="添加 try-except 块",
    )
)

result.add_issue(
    CodeIssue(
        title="变量命名不规范",
        description="变量 'tmp' 含义不清",
        severity=Severity.MINOR,
        category=IssueCategory.STYLE,
        location=CodeLocation(file="app.py", start_line=30),
        suggestion="使用更具描述性的名称",
        action=SuggestionAction.CONSIDER,
    )
)

print(result.to_report())
1.2 代码解析器
# ===== 代码解析器 =====
import ast
import re
import os
from typing import List, Dict, Optional
class CodeParser:
    """Lightweight source and unified-diff parser.

    Python is parsed with ``ast``; JavaScript/TypeScript with a regular
    expression; every other language falls back to simple line statistics.
    """

    SUPPORTED_EXTENSIONS = {
        '.py': 'python',
        '.js': 'javascript',
        '.ts': 'typescript',
        '.java': 'java',
        '.go': 'go',
        '.rs': 'rust',
        '.cpp': 'cpp',
        '.c': 'c',
        '.rb': 'ruby',
        '.php': 'php'
    }

    # "@@ -old_start[,old_count] +new_start[,new_count] @@"; an omitted
    # count means 1 in the unified diff format.
    _HUNK_HEADER = re.compile(r'^@@ -\d+(?:,(\d+))? \+(\d+)(?:,(\d+))? @@')

    def __init__(self):
        self.parsers = {
            'python': self._parse_python,
            'javascript': self._parse_js,
            'typescript': self._parse_js,
        }

    def detect_language(self, file_path: str) -> Optional[str]:
        """Return the language for ``file_path`` based on its extension.

        The lookup is case-insensitive (``App.PY`` is Python). Returns
        ``None`` for unsupported or missing extensions.
        """
        _, ext = os.path.splitext(file_path)
        return self.SUPPORTED_EXTENSIONS.get(ext.lower())

    def parse(self, code: str, language: str) -> Dict:
        """Parse ``code`` with the parser registered for ``language``.

        Languages without a dedicated parser get the generic statistics.
        """
        parser = self.parsers.get(language, self._parse_generic)
        return parser(code)

    @staticmethod
    def _expr_text(node: ast.AST) -> str:
        """Return a readable name for a decorator or base-class expression."""
        if isinstance(node, ast.Name):
            return node.id
        # str(node) would only give "<ast.Call object at 0x...>"; prefer the
        # source text, falling back to a structural dump on old Pythons.
        if hasattr(ast, 'unparse'):
            return ast.unparse(node)
        return ast.dump(node)

    def _parse_python(self, code: str) -> Dict:
        """Parse Python source into functions, classes and imports.

        Returns:
            On success a dict with ``language``, ``functions``, ``classes``,
            ``imports``, ``lines`` and the raw ``tree``. On a syntax error
            only ``{"error": ..., "language": "python"}``.
        """
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return {"error": str(e), "language": "python"}

        # ``async def`` is a separate node type; matching only FunctionDef
        # would drop every coroutine and leave ``is_async`` always False.
        function_types = (ast.FunctionDef, ast.AsyncFunctionDef)

        functions = []
        classes = []
        imports = []
        for node in ast.walk(tree):
            if isinstance(node, function_types):
                functions.append({
                    "name": node.name,
                    "line": node.lineno,
                    "end_line": getattr(node, 'end_lineno', node.lineno),
                    "args": [a.arg for a in node.args.args],
                    "decorators": [
                        self._expr_text(d) for d in node.decorator_list
                    ],
                    "is_async": isinstance(node, ast.AsyncFunctionDef),
                    "complexity": self._calculate_complexity(node)
                })
            elif isinstance(node, ast.ClassDef):
                classes.append({
                    "name": node.name,
                    "line": node.lineno,
                    "bases": [self._expr_text(b) for b in node.bases],
                    "methods": [
                        n.name for n in node.body
                        if isinstance(n, function_types)
                    ]
                })
            elif isinstance(node, (ast.Import, ast.ImportFrom)):
                # Plain ``import x`` has no ``module`` attribute; use ''.
                module = getattr(node, 'module', '') or ''
                names = [a.name for a in node.names]
                imports.append({"module": module, "names": names, "line": node.lineno})

        return {
            "language": "python",
            "functions": functions,
            "classes": classes,
            "imports": imports,
            "lines": len(code.splitlines()),
            "tree": tree
        }

    def _parse_js(self, code: str) -> Dict:
        """Extract JavaScript/TypeScript functions with a simple regex.

        Recognises ``function name(...)`` declarations and
        ``const|let|var name = (...) =>`` arrow functions.
        """
        functions = re.findall(
            r'(?:async\s+)?function\s+(\w+)\s*\(([^)]*)\)|'
            r'(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\(([^)]*)\)\s*=>',
            code
        )
        return {
            "language": "javascript",
            "functions": [
                {
                    "name": f[0] or f[2],
                    # Strip whitespace and drop empties so "()" yields []
                    # rather than [''].
                    "args": [
                        arg.strip()
                        for arg in (f[1] or f[3]).split(",")
                        if arg.strip()
                    ],
                }
                for f in functions if f[0] or f[2]
            ],
            "lines": len(code.splitlines())
        }

    def _parse_generic(self, code: str) -> Dict:
        """Return line statistics for languages without a dedicated parser."""
        source_lines = code.splitlines()
        return {
            "language": "unknown",
            "lines": len(source_lines),
            "chars": len(code),
            "blank_lines": sum(1 for line in source_lines if not line.strip()),
            "long_lines": sum(1 for line in source_lines if len(line) > 120)
        }

    def _calculate_complexity(self, node: ast.AST) -> int:
        """Estimate cyclomatic complexity of ``node``.

        Starts at 1 and adds one per ``if``/``while``/``for``/``except``
        and one per extra operand of a boolean expression. Everything
        reachable via ``ast.walk`` is counted, nested definitions included.
        """
        complexity = 1
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                complexity += len(child.values) - 1
        return complexity

    def get_diff_chunks(self, diff: str) -> List[Dict]:
        """Split a unified diff into per-hunk change lists.

        Returns:
            One ``{"file": path, "lines": [...]}`` dict per hunk containing
            changes. Each line is ``{"type": "added"|"removed", "content":
            str, "line_number": int}``; added lines carry their line number
            in the new file, removed lines carry 0.
        """
        chunks: List[Dict] = []
        current_file: Optional[str] = None
        current_lines: List[Dict] = []
        new_line_no = 0   # new-file line number of the next added/context line
        old_left = 0      # old-side lines the hunk header still promises
        new_left = 0      # new-side lines the hunk header still promises
        in_hunk = False

        def flush() -> None:
            """Emit the pending hunk under the file it was collected for."""
            nonlocal current_lines
            if current_lines and current_file:
                chunks.append({"file": current_file, "lines": current_lines})
            current_lines = []

        for line in diff.splitlines():
            # While the header's counts are unmet, a "--- x" / "+++ x" line
            # is hunk content (e.g. a removed "-- x"), not a file header.
            hunk_body_pending = old_left > 0 or new_left > 0

            if line.startswith('diff --git '):
                # Flush before the file changes so the previous file's last
                # hunk is not attributed to the next file.
                flush()
                current_file = None
                in_hunk = False
                old_left = new_left = 0
            elif not hunk_body_pending and line.startswith(('--- ', '+++ ')):
                flush()
                in_hunk = False
                # "--- a/x" gives a provisional name (covers deleted files
                # whose new side is /dev/null); "+++ b/x" overrides it.
                if line.startswith(('--- a/', '+++ b/')):
                    current_file = line[6:]
            elif line.startswith('@@'):
                flush()
                in_hunk = True
                header = self._HUNK_HEADER.match(line)
                if header:
                    old_count, new_start, new_count = header.groups()
                    new_line_no = int(new_start)
                    old_left = 1 if old_count is None else int(old_count)
                    new_left = 1 if new_count is None else int(new_count)
                else:
                    # Tolerate hand-written headers: take the first "+N".
                    loose = re.search(r'\+(\d+)', line)
                    new_line_no = int(loose.group(1)) if loose else 0
                    old_left = new_left = 0
            elif not in_hunk:
                # Metadata such as "index ..." or "new file mode ...".
                continue
            elif line.startswith('+'):
                current_lines.append({
                    "type": "added",
                    "content": line[1:],
                    "line_number": new_line_no
                })
                new_line_no += 1
                new_left -= 1
            elif line.startswith('-'):
                current_lines.append({
                    "type": "removed",
                    "content": line[1:],
                    "line_number": 0
                })
                old_left -= 1
            elif line.startswith('\\'):
                # "\ No newline at end of file" occupies no line.
                continue
            else:
                # Context line: present on both sides, so it advances the
                # new-file counter without being recorded.
                new_line_no += 1
                old_left -= 1
                new_left -= 1

        flush()
        return chunks
# 使用
parser = CodeParser()

# The sample must keep real Python indentation: without it ast.parse()
# fails, parse() returns only an "error" entry, and the prints below raise
# KeyError.
sample_code = '''
import os
import json

def process_data(data: dict) -> list:
    results = []
    for key, value in data.items():
        if value > 10:
            results.append({"key": key, "value": value})
        elif value > 5:
            results.append({"key": key, "value": value * 2})
    return results

class DataProcessor:
    def __init__(self, config):
        self.config = config

    def run(self):
        data = self.load_data()
        return process_data(data)

    def load_data(self):
        return {"a": 15, "b": 8, "c": 3}
'''

parsed = parser.parse(sample_code, 'python')
print(f"语言: {parsed['language']}")
print(f"行数: {parsed['lines']}")
print(f"函数: {[f['name'] for f in parsed['functions']]}")
print(f"类: {[c['name'] for c in parsed['classes']]}")
print(f"导入: {[i['module'] for i in parsed['imports']]}")
二、规则引擎
2.1 基于规则的审查器
# ===== 规则引擎 =====
from abc import ABC, abstractmethod
import re
import ast
class ReviewRule(ABC):
    """Abstract base class every review rule derives from.

    Subclasses override the class attributes below and implement
    :meth:`check`.
    """

    rule_id: str = ""
    category: IssueCategory = IssueCategory.BUG
    severity: Severity = Severity.MAJOR
    description: str = ""

    @abstractmethod
    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Inspect ``code`` and return the issues found.

        Args:
            code: Full text being reviewed.
            file_path: Path used when building issue locations.
            parsed: Optional parser output for ``code``; may be ``None``.
        """
class SQLInjectionRule(ReviewRule):
    """Flag lines that build an SQL statement inside an ``execute()`` call."""

    rule_id = "SEC001"
    category = IssueCategory.SECURITY
    severity = Severity.CRITICAL
    description = "检测 SQL 注入漏洞"

    SQL_PATTERNS = [
        r'execute\s*\(\s*f["\'].*SELECT.*{',
        r'execute\s*\(\s*["\'].*SELECT.*%[sd]',
        r'execute\s*\(\s*["\'].*SELECT.*\+',
        r'cursor\.execute\s*\(\s*f["\']',
    ]

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per line that matches any SQL pattern.

        The patterns overlap (``cursor.execute(f"SELECT ... {x}")`` matches
        two of them), so a line is reported once however many patterns hit.
        """
        issues = []
        for line_no, line in enumerate(code.splitlines(), 1):
            hit = any(
                re.search(pattern, line, re.IGNORECASE)
                for pattern in self.SQL_PATTERNS
            )
            if not hit:
                continue
            snippet = line.strip()
            issues.append(CodeIssue(
                severity=self.severity,
                category=self.category,
                title="SQL 注入风险",
                description=f"检测到可能的 SQL 注入: {snippet}",
                location=CodeLocation(file_path, line_no),
                code_snippet=snippet,
                suggestion="使用参数化查询替代字符串拼接",
                fix_code=self._generate_fix(line),
                rule_id=self.rule_id,
                confidence=0.85
            ))
        return issues

    def _generate_fix(self, line: str) -> str:
        """Return a fix hint for the offending line.

        Dropping the ``f`` prefix is not a fix: it leaves a query with a
        literal ``{name}`` in it and no bound parameters. A correct rewrite
        needs to know the parameters, so only the generic hint is returned.
        """
        return "# 使用参数化查询: cursor.execute(sql, (param1, param2))"
class HardcodedSecretRule(ReviewRule):
    """Detect credentials written directly into the source."""

    rule_id = "SEC002"
    category = IssueCategory.SECURITY
    severity = Severity.CRITICAL
    description = "检测硬编码的密钥和密码"

    # (regex, label) pairs; the label becomes the issue title.
    SECRET_PATTERNS = [
        (r'password\s*=\s*["\'][^"\']{4,}["\']', "硬编码密码"),
        (r'api_key\s*=\s*["\'][^"\']{8,}["\']', "硬编码 API Key"),
        (r'secret\s*=\s*["\'][^"\']{8,}["\']', "硬编码 Secret"),
        (r'token\s*=\s*["\'][^"\']{16,}["\']', "硬编码 Token"),
        (r'AWS_SECRET_ACCESS_KEY\s*=\s*["\']', "AWS 密钥"),
        (r'PRIVATE_KEY\s*=\s*["\']', "私钥"),
    ]

    def _make_issue(self, label: str, file_path: str, line_no: int) -> CodeIssue:
        """Build the issue reported for one pattern hit."""
        return CodeIssue(
            severity=self.severity,
            category=self.category,
            title=label,
            description=f"发现硬编码凭据: {label}",
            location=CodeLocation(file_path, line_no),
            suggestion="使用环境变量或密钥管理服务",
            fix_code="os.environ.get('SECRET_KEY')",
            rule_id=self.rule_id,
            confidence=0.9
        )

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Scan every non-comment line against each secret pattern.

        Lines whose first non-blank characters are ``#`` or ``//`` are
        skipped. Matching is case-insensitive; each matching pattern adds
        its own issue.
        """
        found = []
        for line_no, text in enumerate(code.splitlines(), 1):
            head = text.strip()
            is_comment = head.startswith('#') or head.startswith('//')
            if is_comment:
                continue
            found.extend(
                self._make_issue(label, file_path, line_no)
                for regex, label in self.SECRET_PATTERNS
                if re.search(regex, text, re.IGNORECASE)
            )
        return found
class EmptyExceptRule(ReviewRule):
    """Detect bare ``except:`` clauses and ``except Exception: pass``."""

    rule_id = "ERR001"
    category = IssueCategory.ERROR_HANDLING
    severity = Severity.MAJOR
    description = "检测空 except 或过于宽泛的异常捕获"

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return issues for overly broad or silently swallowed exceptions.

        With a parsed Python tree the check is AST based; without one (no
        parser output, or a parse error) it falls back to a regex that only
        finds bare ``except:`` lines.
        """
        issues = []
        tree = parsed.get("tree") if parsed else None

        if tree:
            for node in ast.walk(tree):
                if not isinstance(node, ast.ExceptHandler):
                    continue
                if node.type is None:
                    issues.append(CodeIssue(
                        severity=self.severity,
                        category=self.category,
                        title="裸 except 捕获",
                        description="使用裸 except 会捕获所有异常包括 KeyboardInterrupt",
                        location=CodeLocation(file_path, node.lineno),
                        suggestion="指定具体异常类型,如 except ValueError",
                        fix_code="except Exception as e:",
                        rule_id=self.rule_id
                    ))
                elif isinstance(node.type, ast.Name) and node.type.id == "Exception":
                    # Inspect the node type directly: a substring test on the
                    # unparsed body would also match "password = None" or
                    # "bypass()", and would need ast.unparse (3.9+).
                    only_pass = (
                        len(node.body) == 1
                        and isinstance(node.body[0], ast.Pass)
                    )
                    if only_pass:
                        issues.append(CodeIssue(
                            severity=Severity.MINOR,
                            category=self.category,
                            title="空异常处理",
                            description="捕获异常后只执行 pass,可能隐藏错误",
                            location=CodeLocation(file_path, node.lineno),
                            suggestion="至少记录日志",
                            fix_code="except Exception as e:\n logger.error(f'Error: {e}')",
                            rule_id=self.rule_id
                        ))
        else:
            # Regex fallback when no syntax tree is available.
            for line_no, line in enumerate(code.splitlines(), 1):
                if re.match(r'\s*except\s*:', line):
                    issues.append(CodeIssue(
                        severity=self.severity,
                        category=self.category,
                        title="裸 except 捕获",
                        description="使用裸 except 会捕获所有异常",
                        location=CodeLocation(file_path, line_no),
                        suggestion="指定具体异常类型",
                        rule_id=self.rule_id
                    ))
        return issues
class HighComplexityRule(ReviewRule):
    """Report functions whose cyclomatic complexity exceeds a threshold."""

    rule_id = "COM001"
    category = IssueCategory.COMPLEXITY
    severity = Severity.MAJOR
    description = "检测高圈复杂度的函数"

    MAX_COMPLEXITY = 10

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per function above ``MAX_COMPLEXITY``.

        Uses the ``complexity`` value of each entry in
        ``parsed["functions"]`` (treated as 1 when absent). Without parser
        output nothing is reported.
        """
        if not parsed or not parsed.get("functions"):
            return []

        limit = self.MAX_COMPLEXITY
        scored = ((fn, fn.get("complexity", 1)) for fn in parsed["functions"])
        return [
            CodeIssue(
                severity=self.severity,
                category=self.category,
                title=f"函数复杂度过高 ({score})",
                description=f"函数 '{fn['name']}' 圈复杂度为 {score},超过阈值 {limit}",
                location=CodeLocation(file_path, fn["line"]),
                suggestion="拆分为多个小函数,每个函数只做一件事",
                rule_id=self.rule_id,
                confidence=1.0,
                action=SuggestionAction.REFACTOR
            )
            for fn, score in scored
            if score > limit
        ]
class LongFunctionRule(ReviewRule):
    """Report functions that span more lines than a threshold."""

    rule_id = "COM002"
    category = IssueCategory.MAINTAINABILITY
    severity = Severity.MINOR
    description = "检测过长的函数"

    MAX_LINES = 50

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per function longer than ``MAX_LINES``.

        Length is ``end_line - line + 1`` from ``parsed["functions"]``; a
        missing ``end_line`` counts as a one-line function.
        """
        found = []
        if not parsed or not parsed.get("functions"):
            return found

        for fn in parsed["functions"]:
            first = fn["line"]
            last = fn.get("end_line", first)
            length = (last - first) + 1
            if length <= self.MAX_LINES:
                continue
            found.append(CodeIssue(
                severity=self.severity,
                category=self.category,
                title=f"函数过长 ({length} 行)",
                description=f"函数 '{fn['name']}' 有 {length} 行,超过阈值 {self.MAX_LINES}",
                location=CodeLocation(file_path, first, last),
                suggestion="将函数拆分为更小的、职责单一的函数",
                rule_id=self.rule_id,
                action=SuggestionAction.REFACTOR
            ))
        return found
class MagicNumberRule(ReviewRule):
    """Report unexplained numeric literals ("magic numbers")."""

    rule_id = "STYLE001"
    category = IssueCategory.STYLE
    severity = Severity.MINOR
    description = "检测代码中的魔法数字"

    ALLOWED_NUMBERS = {0, 1, -1, 2, 10, 100, 1000, 0.0, 0.5, 1.0}

    @staticmethod
    def _named_constant_literals(tree: ast.AST) -> set:
        """Return ids of literal nodes that define an UPPER_CASE constant.

        Covers ``NAME = 3``, ``NAME: int = 3`` and the negated form
        ``NAME = -3`` where every assignment target is an upper-case name.
        """
        exempt = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Assign):
                targets = node.targets
            elif isinstance(node, ast.AnnAssign) and node.value is not None:
                targets = [node.target]
            else:
                continue
            if not all(isinstance(t, ast.Name) and t.id.isupper() for t in targets):
                continue
            value = node.value
            if isinstance(value, ast.UnaryOp):
                # "-3" parses as UnaryOp(USub, Constant(3)).
                value = value.operand
            if isinstance(value, ast.Constant):
                exempt.add(id(value))
        return exempt

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return an INFO issue for each numeric literal not on the allow list.

        Literals that are the value of a named constant definition are
        skipped: otherwise the rule would flag the very fix it recommends
        (``MAX_RETRIES = 3``). Requires a parsed Python tree; without one
        nothing is reported.
        """
        issues = []
        tree = parsed.get("tree") if parsed else None
        if not tree:
            return issues

        exempt = self._named_constant_literals(tree)
        for node in ast.walk(tree):
            if not isinstance(node, ast.Constant):
                continue
            value = node.value
            # bool is a subclass of int; True/False are not numbers here.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            if value in self.ALLOWED_NUMBERS or id(node) in exempt:
                continue
            issues.append(CodeIssue(
                severity=Severity.INFO,
                category=self.category,
                title=f"魔法数字: {value}",
                description=f"数字 {value} 含义不明确,建议提取为命名常量",
                location=CodeLocation(file_path, node.lineno),
                suggestion="提取为命名常量,如 MAX_RETRIES = 3",
                rule_id=self.rule_id,
                action=SuggestionAction.CONSIDER
            ))
        return issues
# 规则注册
class RuleRegistry:
    """Holds the review rules, keyed by their ``rule_id``."""

    def __init__(self):
        self.rules: Dict[str, ReviewRule] = {}

    def register(self, rule: ReviewRule):
        """Add ``rule``, replacing any rule that has the same id."""
        self.rules[rule.rule_id] = rule

    def register_defaults(self):
        """Register one instance of every built-in rule."""
        builtin_rule_types = (
            SQLInjectionRule,
            HardcodedSecretRule,
            EmptyExceptRule,
            HighComplexityRule,
            LongFunctionRule,
            MagicNumberRule,
        )
        for rule_type in builtin_rule_types:
            self.register(rule_type())

    def get_rules(self, categories: List[IssueCategory] = None) -> List[ReviewRule]:
        """Return all rules, or only those whose category is in ``categories``."""
        registered = list(self.rules.values())
        if not categories:
            return registered
        return [rule for rule in registered if rule.category in categories]

    def list_rules(self) -> str:
        """Return a printable listing with one line per registered rule."""
        header = ["📋 已注册规则:", ""]
        rows = [
            f" {rule.rule_id} [{rule.severity.value}] "
            f"{rule.category.value}: {rule.description}"
            for rule in self.rules.values()
        ]
        return "\n".join(header + rows)
# 使用
# Demo: register the built-in rules and print the rule listing.
registry = RuleRegistry()
registry.register_defaults()

print(
    registry.list_rules()
)
2.2 规则引擎执行器
# ===== 规则引擎执行器 =====
class RuleEngine:
    """Runs the registered rules against files or diffs and scores the result."""

    # Numeric ranking used by the ``min_severity`` filter.
    _SEVERITY_ORDER = {
        Severity.INFO: 0, Severity.MINOR: 1,
        Severity.MAJOR: 2, Severity.CRITICAL: 3
    }

    # Points deducted per finding when scoring.
    _PENALTIES = {
        Severity.CRITICAL: 15,
        Severity.MAJOR: 5,
        Severity.MINOR: 2,
        Severity.INFO: 0.5
    }

    def __init__(self, registry: RuleRegistry = None):
        """Use ``registry`` or a fresh one; an empty registry gets the defaults."""
        self.registry = registry or RuleRegistry()
        if not self.registry.rules:
            self.registry.register_defaults()
        self.parser = CodeParser()

    def review_file(
        self,
        code: str,
        file_path: str,
        rules: List[str] = None,
        categories: List[IssueCategory] = None,
        min_severity: Severity = Severity.INFO
    ) -> ReviewResult:
        """Review a single file.

        Args:
            code: Source text to review.
            file_path: Path used for language detection and issue locations.
            rules: Optional rule ids to run; unknown ids are ignored. Takes
                precedence over ``categories``.
            categories: Optional categories to restrict the rules to.
            min_severity: Findings below this severity are dropped.

        Returns:
            A ``ReviewResult`` with the findings and the computed score.
        """
        language = self.parser.detect_language(file_path)
        parsed = self.parser.parse(code, language) if language else None

        # Decide which rules to run.
        if rules:
            active_rules = [self.registry.rules[rid] for rid in rules if rid in self.registry.rules]
        elif categories:
            active_rules = self.registry.get_rules(categories)
        else:
            active_rules = self.registry.get_rules()

        min_level = self._SEVERITY_ORDER[min_severity]
        all_issues = []
        for rule in active_rules:
            try:
                issues = rule.check(code, file_path, parsed)
            except Exception as e:
                # One faulty rule must not abort the whole review.
                print(f"规则 {rule.rule_id} 执行出错: {e}")
                continue
            all_issues.extend(
                i for i in issues
                if self._SEVERITY_ORDER[i.severity] >= min_level
            )

        score = self._calculate_score(all_issues, len(code.splitlines()))
        return ReviewResult(
            file_path=file_path,
            issues=all_issues,
            score=score
        )

    def review_diff(
        self,
        diff: str,
        full_code: Dict[str, str] = None
    ) -> List[ReviewResult]:
        """Review the files touched by a unified diff.

        Args:
            diff: Unified diff text.
            full_code: Optional mapping of file path to complete new file
                content. Files present here are reviewed in full and the
                findings restricted to the added lines; other files are
                reviewed from their added lines only.

        Returns:
            One ``ReviewResult`` per changed file, with locations in new-file
            line numbers and a score matching the reported findings.
        """
        chunks = self.parser.get_diff_chunks(diff)

        # Group the hunks by file.
        file_changes: Dict[str, List[Dict]] = {}
        for chunk in chunks:
            file_changes.setdefault(chunk["file"], []).extend(chunk["lines"])

        results = []
        for file_path, changes in file_changes.items():
            added = [c for c in changes if c["type"] == "added"]
            code = full_code.get(file_path, "") if full_code else ""
            has_full_file = bool(code)
            if not has_full_file:
                code = "\n".join(c["content"] for c in added)

            result = self.review_file(code, file_path)

            if has_full_file:
                # Keep only findings on lines this diff added. A removal-only
                # change has no added lines and keeps every finding.
                added_lines = {c["line_number"] for c in added}
                if added_lines:
                    result.issues = [
                        i for i in result.issues
                        if i.location.start_line in added_lines
                    ]
            else:
                # The reviewed text was only the added lines, so a finding's
                # line is an index into ``added``. Map it to the new-file
                # line number; filtering on the raw index would drop it.
                for issue in result.issues:
                    index = issue.location.start_line - 1
                    if 0 <= index < len(added):
                        span = issue.location.end_line - issue.location.start_line
                        real_line = added[index]["line_number"]
                        issue.location.start_line = real_line
                        issue.location.end_line = real_line + span

            # Score the findings that are actually reported.
            result.score = self._calculate_score(
                result.issues, len(code.splitlines())
            )
            results.append(result)
        return results

    def _calculate_score(self, issues: List[CodeIssue], total_lines: int) -> float:
        """Compute a 0-100 quality score.

        The summed penalties are scaled against a budget of 0.5 points per
        line (at least 1), so the same findings weigh less in larger files.
        Empty input scores 100.
        """
        if total_lines == 0:
            return 100.0
        total_penalty = sum(self._PENALTIES[i.severity] for i in issues)
        max_penalty = total_lines * 0.5  # at most 0.5 points per line
        score = max(0, 100 - (total_penalty / max(max_penalty, 1)) * 100)
        return round(score, 1)
# 使用
engine = RuleEngine()

# The sample must keep real Python indentation so the AST-based rules can
# parse it.
test_code = '''
import os

password = "my_secret_password_123"
api_key = "sk-1234567890abcdef"

def get_user(user_id):
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)
    return cursor.fetchone()

def process(data):
    try:
        result = complex_logic(data)
    except:
        pass
    return result

def complex_logic(data):
    if data > 100:
        if data > 200:
            if data > 300:
                if data > 400:
                    if data > 500:
                        return "very high"
                    return "high"
                return "medium high"
            return "medium"
        return "medium low"
    return "low"
'''

result = engine.review_file(test_code, "app.py")
print(result.to_report())
三、LLM 驱动的深度审查
3.1 LLM 审查器
# ===== LLM 驱动的深度审查 =====
from openai import OpenAI
class LLMReviewer:
    """Deep code review backed by a chat-completion LLM."""

    def __init__(self, model: str = "gpt-4o", api_key: str = None):
        """Create the client; ``api_key`` is passed to ``OpenAI`` unchanged."""
        self.model = model
        self.client = OpenAI(api_key=api_key)

    def review(
        self,
        code: str,
        language: str = "python",
        context: str = "",
        focus_areas: List[str] = None
    ) -> ReviewResult:
        """Review a piece of source code.

        Args:
            code: Source text to review.
            language: Language name used in the prompt and the code fence.
            context: Optional extra information appended to the user message.
            focus_areas: Optional aspects the model should concentrate on.

        Returns:
            A ``ReviewResult`` whose issue locations use the placeholder
            path ``"llm_review.py"``.
        """
        focus_prompt = ""
        if focus_areas:
            focus_prompt = f"\n重点关注以下方面: {', '.join(focus_areas)}"

        # response_format=json_object makes the model emit a JSON *object*,
        # so the prompt asks for an object with an "issues" array rather
        # than a bare array.
        system_prompt = f"""你是一个资深代码审查专家。请审查以下 {language} 代码。
审查维度:
1. **Bug 和逻辑错误** - 竞态条件、空指针、边界条件
2. **安全漏洞** - 注入、XSS、认证问题、数据泄露
3. **性能问题** - 不必要的计算、内存泄漏、N+1 查询
4. **可维护性** - 代码复杂度、命名、注释
5. **错误处理** - 异常处理、资源清理、超时
6. **最佳实践** - 设计模式、SOLID 原则
输出格式(JSON 对象,所有问题放在 "issues" 数组中):
{{"issues": [{{
"severity": "critical|major|minor|info",
"category": "bug|security|performance|style|maintainability|error_handling",
"title": "简短标题",
"line": 行号,
"description": "详细描述",
"suggestion": "修改建议",
"fix_code": "修复代码(可选)"
}}]}}
注意:
- 只报告真实问题,不要过度警告
- 给出可操作的修复建议
- 修复代码要完整可运行{focus_prompt}"""

        user_message = f"请审查以下代码:\n\n```{language}\n{code}\n```"
        if context:
            user_message += f"\n\n上下文信息:\n{context}"

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            response_format={"type": "json_object"},
            temperature=0.1
        )
        return self._parse_llm_response(
            response.choices[0].message.content, "llm_review.py"
        )

    def review_diff(
        self,
        diff: str,
        language: str = "python",
        pr_description: str = ""
    ) -> ReviewResult:
        """Review a PR diff.

        Args:
            diff: Unified diff text.
            language: Accepted for interface symmetry; currently unused.
            pr_description: Optional PR description appended to the message.

        Returns:
            A ``ReviewResult`` whose issue locations use the placeholder
            path ``"diff_review"``.
        """
        system_prompt = """你是一个代码审查专家,专门审查 PR/Patch 的变更。
重点检查:
- 变更是否引入新的 Bug
- 是否有安全风险
- 是否有性能回归
- 变更是否符合 PR 描述
- 是否缺少测试
输出 JSON 对象,格式:
{"issues": [{"severity", "category", "title", "line", "description", "suggestion", "fix_code"}]}"""

        user_message = f"请审查以下 diff:\n\n```diff\n{diff}\n```"
        if pr_description:
            user_message += f"\n\nPR 描述: {pr_description}"

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message}
            ],
            response_format={"type": "json_object"},
            temperature=0.1
        )
        return self._parse_llm_response(
            response.choices[0].message.content, "diff_review"
        )

    @staticmethod
    def _extract_issue_items(data) -> list:
        """Pull the list of issue dicts out of a decoded JSON payload."""
        if isinstance(data, list):
            return data
        if not isinstance(data, dict):
            return []
        if isinstance(data.get("issues"), list):
            return data["issues"]
        # The model may have used another key for the array.
        for value in data.values():
            if isinstance(value, list) and all(isinstance(v, dict) for v in value):
                return value
        # A lone issue object returned at top level.
        if "title" in data or "description" in data:
            return [data]
        return []

    def _parse_llm_response(self, response_text: str, file_path: str) -> ReviewResult:
        """Convert the model's JSON answer into a ``ReviewResult``.

        Malformed output never raises: undecodable text gives an empty
        result, and unknown or missing fields fall back to defaults
        (severity INFO, category BUG, line 0).
        """
        import json
        import re

        text = response_text or ""
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            # Try to recover a JSON array embedded in surrounding prose.
            data = None
            json_match = re.search(r'\[.*\]', text, re.DOTALL)
            if json_match:
                try:
                    data = json.loads(json_match.group())
                except json.JSONDecodeError:
                    data = None

        severity_map = {s.value: s for s in Severity}
        category_map = {c.value: c for c in IssueCategory}

        issues = []
        for item in self._extract_issue_items(data):
            if not isinstance(item, dict):
                continue
            # "or" also covers explicit nulls, which .get() defaults miss.
            sev = severity_map.get(
                str(item.get("severity") or "info").lower(),
                Severity.INFO
            )
            cat = category_map.get(
                str(item.get("category") or "bug").lower(),
                IssueCategory.BUG
            )
            try:
                line = int(item.get("line") or 0)
            except (TypeError, ValueError):
                line = 0
            issues.append(CodeIssue(
                severity=sev,
                category=cat,
                title=item.get("title") or "未命名问题",
                description=item.get("description") or "",
                location=CodeLocation(file_path, max(line, 0)),
                suggestion=item.get("suggestion") or "",
                fix_code=item.get("fix_code") or "",
                confidence=0.8
            ))

        penalties = {
            Severity.CRITICAL: 15, Severity.MAJOR: 5,
            Severity.MINOR: 2, Severity.INFO: 0.5
        }
        score = max(0, 100 - sum(penalties[i.severity] for i in issues))
        return ReviewResult(
            file_path=file_path,
            issues=issues,
            score=round(score, 1)
        )
# Usage example
if __name__ == '__main__':
    reviewer = LLMReviewer()
    code = '''
def transfer(from_id, to_id, amount):
    from_account = db.get(from_id)
    to_account = db.get(to_id)
    from_account.balance -= amount
    to_account.balance += amount
    db.save(from_account)
    db.save(to_account)
'''
    # result = reviewer.review(code, focus_areas=["security", "bug"])
    # print(result.to_report())
3.2 混合审查器(规则 + LLM)
# ===== 混合审查器 =====
class HybridReviewer:
    """Combined reviewer: deterministic rule engine plus LLM review."""

    # Same severity icons as ReviewResult.to_report().
    _SUMMARY_ICONS = {
        Severity.CRITICAL: "🔴",
        Severity.MAJOR: "🟡",
        Severity.MINOR: "🔵",
        Severity.INFO: "⚪",
    }

    def __init__(self, model: str = "gpt-4o", api_key: str = None):
        self.rule_engine = RuleEngine()
        self.llm_reviewer = LLMReviewer(model=model, api_key=api_key)
        self.parser = CodeParser()

    @staticmethod
    def _new_llm_issues(
        existing: List[CodeIssue],
        candidates: List[CodeIssue],
        file_path: str
    ) -> List[CodeIssue]:
        """Return LLM findings not yet reported, re-homed to ``file_path``.

        Findings are de-duplicated by title against ``existing`` and among
        themselves. The LLM reviewer labels its locations with a placeholder
        path, so the real path is written back here.
        """
        seen_titles = {issue.title for issue in existing}
        fresh = []
        for issue in candidates:
            if issue.title in seen_titles:
                continue
            seen_titles.add(issue.title)
            issue.location.file = file_path
            fresh.append(issue)
        return fresh

    def review(
        self,
        code: str,
        file_path: str,
        use_llm: bool = True,
        use_rules: bool = True,
        llm_focus: List[str] = None
    ) -> ReviewResult:
        """Review one file with the rule engine and/or the LLM.

        Args:
            code: Source text to review.
            file_path: Path of the file; used for language detection and
                written into every reported location.
            use_llm: Run the LLM review.
            use_rules: Run the rule engine.
            llm_focus: Optional focus areas forwarded to the LLM.

        Returns:
            A ``ReviewResult`` with the merged findings and a score computed
            by the rule engine's formula.
        """
        all_issues = []

        # 1. Rule engine (fast, deterministic).
        if use_rules:
            rule_result = self.rule_engine.review_file(code, file_path)
            all_issues.extend(rule_result.issues)
            print(f" 📏 规则引擎: 发现 {len(rule_result.issues)} 个问题")

        # 2. LLM review (slower, context aware).
        if use_llm:
            language = self.parser.detect_language(file_path) or "python"
            llm_result = self.llm_reviewer.review(
                code, language=language, focus_areas=llm_focus
            )
            new_issues = self._new_llm_issues(
                all_issues, llm_result.issues, file_path
            )
            all_issues.extend(new_issues)
            print(f" 🤖 LLM 审查: 发现 {len(new_issues)} 个新问题(去重后)")

        total_lines = len(code.splitlines())
        score = self.rule_engine._calculate_score(all_issues, total_lines)
        return ReviewResult(
            file_path=file_path,
            issues=all_issues,
            score=score
        )

    def review_pr(
        self,
        diff: str,
        full_files: Dict[str, str] = None,
        pr_description: str = ""
    ) -> Dict[str, ReviewResult]:
        """Review a whole pull request.

        Args:
            diff: Unified diff of the PR.
            full_files: Optional mapping of path to full file content; only
                these files get an LLM review.
            pr_description: Optional PR description, passed to the LLM as
                context.

        Returns:
            Mapping of file path to its ``ReviewResult``.
        """
        results: Dict[str, ReviewResult] = {}

        # Rule engine over the diff.
        for rule_result in self.rule_engine.review_diff(diff, full_files):
            target = results.setdefault(
                rule_result.file_path,
                ReviewResult(file_path=rule_result.file_path)
            )
            target.issues.extend(rule_result.issues)

        # LLM over every file whose full content is available.
        if full_files:
            context = f"PR 描述: {pr_description}" if pr_description else ""
            for file_path, code in full_files.items():
                language = self.parser.detect_language(file_path) or "python"
                llm_result = self.llm_reviewer.review(
                    code, language=language, context=context
                )
                target = results.setdefault(
                    file_path, ReviewResult(file_path=file_path)
                )
                target.issues.extend(
                    self._new_llm_issues(target.issues, llm_result.issues, file_path)
                )

        # Score every file from its final list of findings.
        for file_path, result in results.items():
            code = (full_files or {}).get(file_path, "")
            result.score = self.rule_engine._calculate_score(
                result.issues, len(code.splitlines()) if code else 1
            )
        return results

    def generate_summary(self, results: Dict[str, ReviewResult]) -> str:
        """Build a text summary of a PR review.

        Lists overall counts, the average score, a merge verdict, and up to
        five findings per file.
        """
        total_issues = sum(len(r.issues) for r in results.values())
        critical = sum(r.critical_count for r in results.values())
        major = sum(r.major_count for r in results.values())
        avg_score = sum(r.score for r in results.values()) / max(len(results), 1)

        lines = [
            "🔍 PR 审查摘要",
            "=" * 40,
            f"文件数: {len(results)}",
            f"总问题: {total_issues}",
            f"🔴 严重: {critical} | 🟡 主要: {major}",
            f"平均评分: {avg_score:.1f}/100",
            f"{'🚫 建议不合并' if critical > 0 else '✅ 可以合并'}",
            ""
        ]
        for file_path, result in results.items():
            if not result.issues:
                continue
            lines.append(f"📄 {file_path} (评分: {result.score})")
            for issue in result.issues[:5]:
                icon = self._SUMMARY_ICONS[issue.severity]
                lines.append(f" {icon} {issue.title}")
            if len(result.issues) > 5:
                lines.append(f" ... 还有 {len(result.issues) - 5} 个问题")
            lines.append("")
        return "\n".join(lines)
# 使用
if __name__ == '__main__':
    reviewer = HybridReviewer()

    # Simulated PR. The hunk's line numbers must match full_files below:
    # review_diff keeps only findings on the lines the diff added.
    diff = """diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -1,0 +2,5 @@
+password = "admin123"
+def get_user(user_id):
+    query = f"SELECT * FROM users WHERE id = {user_id}"
+    cursor.execute(query)
+    return cursor.fetchone()
"""
    full_files = {
        "app.py": '''
password = "admin123"
def get_user(user_id):
    query = f"SELECT * FROM users WHERE id = {user_id}"
    cursor.execute(query)
    return cursor.fetchone()
def process(data):
    try:
        result = transform(data)
    except:
        pass
    return result
'''
    }

    # Rule engine first.
    rule_results = reviewer.rule_engine.review_diff(diff, full_files)
    for r in rule_results:
        print(r.to_report())
四、Git 集成
4.1 Git Hook 审查
# ===== Git Hook 审查 =====
import subprocess
import os
import sys
class GitHookReviewer:
    """Runs the rule engine over staged files from a Git pre-commit hook."""

    def __init__(self, repo_path: str = "."):
        self.repo_path = repo_path
        self.reviewer = RuleEngine()

    def pre_commit_review(self) -> bool:
        """Review every staged file in a supported language.

        Returns:
            ``False`` when any file has a CRITICAL finding (the commit
            should be blocked), otherwise ``True``.
        """
        staged_files = self._get_staged_files()
        if not staged_files:
            return True

        print(f"🔍 审查 {len(staged_files)} 个暂存文件...")
        has_blockers = False
        for file_path in staged_files:
            language = self.reviewer.parser.detect_language(file_path)
            if not language:
                continue
            # Review the staged content, not the working-tree file.
            content = self._get_staged_content(file_path)
            if not content:
                continue

            result = self.reviewer.review_file(content, file_path)
            if result.has_blockers:
                has_blockers = True
                print(f"\n❌ {file_path}: {result.critical_count} 个严重问题")
                for issue in result.issues:
                    if issue.severity == Severity.CRITICAL:
                        print(f" 🔴 {issue.title} @ {issue.location}")
            elif result.issues:
                print(f"⚠️ {file_path}: {len(result.issues)} 个问题(评分 {result.score})")
            else:
                print(f"✅ {file_path}: 无问题")

        if has_blockers:
            print("\n🚫 提交被阻止:存在严重问题")
            print(" 使用 git commit --no-verify 可跳过检查")
            return False
        return True

    def _get_staged_files(self) -> List[str]:
        """Return the paths of staged added/copied/modified files.

        ``-z`` makes git print NUL-separated raw paths. Without it,
        non-ASCII names are C-quoted ("\\346\\226\\207.py") and would fail
        extension detection.
        """
        result = subprocess.run(
            ["git", "diff", "--cached", "--name-only", "--diff-filter=ACM", "-z"],
            cwd=self.repo_path,
            capture_output=True, text=True,
            encoding="utf-8", errors="replace"
        )
        if result.returncode != 0:
            # Surface git's own error instead of silently reviewing nothing.
            print(result.stderr.strip(), file=sys.stderr)
            return []
        return [name for name in result.stdout.split("\0") if name]

    def _get_staged_content(self, file_path: str) -> str:
        """Return the staged (index) content of ``file_path``.

        Decoded as UTF-8 with replacement characters so an odd byte cannot
        abort the hook. Returns an empty string when git fails.
        """
        result = subprocess.run(
            ["git", "show", f":{file_path}"],
            cwd=self.repo_path,
            capture_output=True, text=True,
            encoding="utf-8", errors="replace"
        )
        if result.returncode != 0:
            print(result.stderr.strip(), file=sys.stderr)
            return ""
        return result.stdout
# pre-commit 钩子安装脚本
def install_pre_commit_hook(repo_path: str = "."):
    """Install the review script as the repository's pre-commit hook.

    Args:
        repo_path: Root of the Git working tree.

    Raises:
        FileNotFoundError: If ``repo_path`` has no ``.git`` directory.

    The ``hooks`` directory is created when missing. An existing
    ``pre-commit`` file is overwritten.
    """
    git_dir = os.path.join(repo_path, ".git")
    if not os.path.isdir(git_dir):
        raise FileNotFoundError(f"not a git repository: {git_dir}")

    hook_dir = os.path.join(git_dir, "hooks")
    os.makedirs(hook_dir, exist_ok=True)
    hook_path = os.path.join(hook_dir, "pre-commit")

    # The script is written verbatim, so it must be valid Python as it
    # stands, indentation included.
    hook_script = '''#!/usr/bin/env python3
import sys
sys.path.insert(0, '.')
from code_review import GitHookReviewer
reviewer = GitHookReviewer()
if not reviewer.pre_commit_review():
    sys.exit(1)
'''
    with open(hook_path, 'w', encoding='utf-8') as f:
        f.write(hook_script)
    # Git only runs hooks that are executable.
    os.chmod(hook_path, 0o755)
    print(f"✅ pre-commit 钩子已安装: {hook_path}")
# 使用
if __name__ == '__main__':
    # "--install" writes the hook; otherwise run the review and exit with
    # status 1 when it reports blockers.
    reviewer = GitHookReviewer()
    wants_install = "--install" in sys.argv
    if wants_install:
        install_pre_commit_hook()
    elif not reviewer.pre_commit_review():
        sys.exit(1)
4.2 GitHub PR 审查
# ===== GitHub PR 审查 =====
import aiohttp
import asyncio
class GitHubPRReviewer:
    """GitHub pull-request review bot built on ``HybridReviewer``."""

    def __init__(
        self,
        token: str,
        repo: str,
        reviewer: HybridReviewer = None
    ):
        """Store credentials and the reviewer.

        Args:
            token: GitHub token sent as a Bearer credential.
            repo: Repository in ``owner/repo`` form.
            reviewer: Reviewer to use; a default ``HybridReviewer`` is
                created when omitted.
        """
        self.token = token
        self.repo = repo  # owner/repo
        self.reviewer = reviewer or HybridReviewer()
        self.api = "https://api.github.com"
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github.v3+json"
        }

    async def review_pr(self, pr_number: int) -> Dict:
        """Review a PR and publish the outcome on GitHub.

        Posts a summary comment, inline comments for CRITICAL/MAJOR
        findings that have a line number, and a review with
        ``REQUEST_CHANGES`` (any CRITICAL finding) or ``APPROVE``.

        Returns:
            Dict with ``pr``, ``files_reviewed``, ``total_issues``,
            ``has_blockers`` and ``summary``.
        """
        pr_info = await self._get_pr_info(pr_number)
        # GitHub sends "body": null for an empty description.
        pr_description = pr_info.get("body") or ""
        # Fetched once and reused for every comment and the final review.
        head_sha = (pr_info.get("head") or {}).get("sha", "")

        diff = await self._get_pr_diff(pr_number)

        # Fetch the real content of each changed file. The "patch" field is
        # diff text, not source: reviewing it breaks the AST-based rules
        # and yields patch-relative line numbers.
        files = await self._get_pr_files(pr_number)
        full_files = {}
        for f in files:
            if not f.get("patch") or f.get("status") == "removed":
                continue
            content = await self._get_file_content(f["filename"], head_sha)
            if content is not None:
                full_files[f["filename"]] = content

        # The reviewer is synchronous (it may call the LLM); run it in a
        # worker thread so the event loop is not blocked.
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            None,
            lambda: self.reviewer.review_pr(diff, full_files, pr_description)
        )

        summary = self.reviewer.generate_summary(results)
        await self._post_review_comment(pr_number, summary)

        for file_path, result in results.items():
            for issue in result.issues:
                line = issue.location.start_line
                # Findings without a usable line cannot be anchored inline.
                if not isinstance(line, int) or line <= 0:
                    continue
                if issue.severity in (Severity.CRITICAL, Severity.MAJOR):
                    await self._post_inline_comment(
                        pr_number, file_path, line,
                        self._format_issue_comment(issue),
                        commit_id=head_sha
                    )

        has_blockers = any(r.has_blockers for r in results.values())
        await self._submit_review(
            pr_number,
            "REQUEST_CHANGES" if has_blockers else "APPROVE",
            summary,
            commit_id=head_sha
        )
        return {
            "pr": pr_number,
            "files_reviewed": len(results),
            "total_issues": sum(len(r.issues) for r in results.values()),
            "has_blockers": has_blockers,
            "summary": summary
        }

    async def _get_pr_info(self, pr_number: int) -> Dict:
        """Fetch the pull-request object; raises on an HTTP error status."""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}",
                headers=self.headers
            ) as resp:
                resp.raise_for_status()
                return await resp.json()

    async def _get_pr_diff(self, pr_number: int) -> str:
        """Fetch the PR as unified diff text; raises on an HTTP error status."""
        headers = {**self.headers, "Accept": "application/vnd.github.v3.diff"}
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}",
                headers=headers
            ) as resp:
                resp.raise_for_status()
                return await resp.text()

    async def _get_pr_files(self, pr_number: int) -> List[Dict]:
        """Fetch every changed-file entry of the PR, following pagination."""
        per_page = 100
        files: List[Dict] = []
        page = 1
        async with aiohttp.ClientSession() as session:
            while True:
                async with session.get(
                    f"{self.api}/repos/{self.repo}/pulls/{pr_number}/files",
                    headers=self.headers,
                    params={"per_page": per_page, "page": page}
                ) as resp:
                    resp.raise_for_status()
                    batch = await resp.json()
                if not isinstance(batch, list) or not batch:
                    break
                files.extend(batch)
                if len(batch) < per_page:
                    break
                page += 1
        return files

    async def _get_file_content(self, path: str, ref: str) -> Optional[str]:
        """Fetch the raw text of ``path`` at ``ref``.

        Returns ``None`` when the file cannot be fetched or is not text.
        NOTE(review): assumes the PR head commit is readable through this
        repository's contents API (also for fork PRs) - confirm.
        """
        from urllib.parse import quote

        headers = {**self.headers, "Accept": "application/vnd.github.raw+json"}
        params = {"ref": ref} if ref else None
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.api}/repos/{self.repo}/contents/{quote(path)}",
                headers=headers,
                params=params
            ) as resp:
                if resp.status != 200:
                    return None
                try:
                    return await resp.text()
                except UnicodeDecodeError:
                    return None

    async def _post_review_comment(self, pr_number: int, body: str):
        """Post ``body`` as a conversation comment on the PR."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.api}/repos/{self.repo}/issues/{pr_number}/comments",
                headers=self.headers,
                json={"body": body}
            ) as resp:
                return await resp.json()

    async def _post_inline_comment(
        self, pr_number: int, path: str, line: int, body: str,
        commit_id: str = None
    ):
        """Post an inline comment on the new side of ``path`` at ``line``.

        ``commit_id`` is the head commit to anchor to; when omitted it is
        looked up from the PR. HTTP failures are not raised.
        """
        if not commit_id:
            pr_info = await self._get_pr_info(pr_number)
            commit_id = (pr_info.get("head") or {}).get("sha", "")
        async with aiohttp.ClientSession() as session:
            # "async with" releases the response back to the session.
            async with session.post(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}/comments",
                headers=self.headers,
                json={
                    "body": body,
                    "commit_id": commit_id,
                    "path": path,
                    "line": line,
                    "side": "RIGHT"
                }
            ):
                pass

    async def _submit_review(
        self, pr_number: int, event: str, body: str,
        commit_id: str = None
    ):
        """Submit a review with the given ``event`` and ``body``.

        ``commit_id`` is looked up from the PR when omitted. HTTP failures
        are not raised.
        """
        if not commit_id:
            pr_info = await self._get_pr_info(pr_number)
            commit_id = (pr_info.get("head") or {}).get("sha", "")
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.api}/repos/{self.repo}/pulls/{pr_number}/reviews",
                headers=self.headers,
                json={
                    "commit_id": commit_id,
                    "body": body,
                    "event": event
                }
            ):
                pass

    def _format_issue_comment(self, issue: CodeIssue) -> str:
        """Render a finding as a Markdown comment body.

        Includes the title and description, plus the suggestion and a
        ``suggestion`` code block when present.
        """
        icon = "🔴" if issue.severity == Severity.CRITICAL else "🟡"
        lines = [
            f"{icon} **{issue.title}**",
            f"",
            f"{issue.description}",
        ]
        if issue.suggestion:
            lines.append(f"\n💡 {issue.suggestion}")
        if issue.fix_code:
            lines.append(f"\n```suggestion\n{issue.fix_code}\n```")
        return "\n".join(lines)
# Webhook 处理器
class GitHubWebhookHandler:
    """Routes GitHub webhook payloads to the PR reviewer."""

    def __init__(self, reviewer: GitHubPRReviewer):
        self.reviewer = reviewer

    async def handle_event(self, event: Dict) -> Optional[Dict]:
        """Review the PR named in ``event`` when the event calls for it.

        Only pull-request payloads whose action is ``opened`` or
        ``synchronize`` trigger a review; every other payload yields ``None``.

        Returns:
            The reviewer's result dict, or ``None`` when the event is ignored.
        """
        action = event.get("action")
        pull_request = event.get("pull_request")
        if not pull_request or action not in ("opened", "synchronize"):
            return None

        pr_number = pull_request["number"]
        print(f"🔍 审查 PR #{pr_number} (action: {action})")
        return await self.reviewer.review_pr(pr_number)
五、自定义审查规则
5.1 项目级规则配置
# ===== 项目级规则配置 =====
import yaml
import json
@dataclass
class ReviewConfig:
    """Review configuration.

    Groups rule switches, thresholds and ignore patterns, and can be built
    from a YAML file or a plain dict.
    """
    # Rule switches
    enabled_rules: List[str] = field(default_factory=lambda: [
        "SEC001", "SEC002", "ERR001", "COM001", "COM002", "STYLE001"
    ])
    # Severity filter
    min_severity: Severity = Severity.MINOR
    # Thresholds
    max_complexity: int = 10
    max_function_lines: int = 50
    max_file_lines: int = 500
    max_line_length: int = 120
    # Scoring
    fail_score: float = 60.0
    # Ignored paths
    ignore_paths: List[str] = field(default_factory=lambda: [
        "vendor/", "node_modules/", "__pycache__/",
        "*.min.js", "*.generated.*"
    ])
    # Language-specific rules
    language_rules: Dict[str, List[str]] = field(default_factory=dict)
    # Custom rules
    custom_patterns: List[Dict] = field(default_factory=list)

    def __post_init__(self):
        # YAML and JSON deliver the severity as a plain string such as
        # "minor"; convert it so ``min_severity`` is always a Severity member.
        # An unknown name raises ValueError from the enum lookup.
        if not isinstance(self.min_severity, Severity):
            self.min_severity = Severity(self.min_severity)

    @classmethod
    def from_yaml(cls, path: str) -> 'ReviewConfig':
        """Load a configuration from the YAML file at ``path``.

        An empty file yields the default configuration.
        """
        with open(path, 'r') as f:
            data = yaml.safe_load(f)
        return cls.from_dict(data)

    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> 'ReviewConfig':
        """Build a configuration from a mapping.

        Keys that are not fields of this class are ignored, and keys whose
        value is ``None`` (e.g. a YAML key with nothing after the colon) fall
        back to the field default. ``None`` or an empty mapping yields the
        default configuration.
        """
        known = cls.__dataclass_fields__
        return cls(**{
            k: v for k, v in (data or {}).items()
            if k in known and v is not None
        })
# Sample YAML configuration.
# The regex for CUSTOM001 sits in a single-quoted YAML scalar: inside double
# quotes YAML would try to interpret "\." and "\(" as escape sequences and
# reject the document.
CONFIG_YAML = """
# .code-review.yml
enabled_rules:
  - SEC001 # SQL 注入
  - SEC002 # 硬编码密钥
  - ERR001 # 空异常捕获
  - COM001 # 高复杂度
  - COM002 # 过长函数
  - STYLE001 # 魔法数字
min_severity: minor
max_complexity: 8
max_function_lines: 40
max_file_lines: 400
fail_score: 65.0
ignore_paths:
  - vendor/
  - node_modules/
  - "**/*.min.js"
  - "**/*.generated.*"
language_rules:
  python:
    - SEC001
    - ERR001
  javascript:
    - SEC002
custom_patterns:
  - id: CUSTOM001
    name: "禁止 console.log"
    pattern: 'console\\.log\\('
    severity: minor
    category: style
    suggestion: "使用 logger 替代 console.log"
  - id: CUSTOM002
    name: "禁止 TODO 遗留"
    pattern: "TODO|FIXME|HACK"
    severity: info
    category: maintainability
    suggestion: "创建 Issue 跟踪待办事项"
"""
class CustomPatternRule(ReviewRule):
    """Review rule backed by a user-supplied regular expression."""

    def __init__(self, config: Dict):
        """Read the rule definition from ``config``.

        ``id`` and ``pattern`` are required keys; ``category``, ``severity``,
        ``name`` and ``suggestion`` are optional.
        """
        optional = config.get
        self.rule_id = config["id"]
        self.category = IssueCategory(optional("category", "style"))
        self.severity = Severity(optional("severity", "minor"))
        self.description = optional("name", "")
        self.pattern = config["pattern"]
        self.suggestion_text = optional("suggestion", "")

    def check(self, code: str, file_path: str, parsed: Dict = None) -> List[CodeIssue]:
        """Return one issue per line of ``code`` that matches the pattern.

        Matching is case-insensitive and line numbers start at 1.
        ``parsed`` is accepted but not used by this rule.
        """
        return [
            CodeIssue(
                severity=self.severity,
                category=self.category,
                title=self.description,
                description=f"匹配规则 {self.rule_id}: {text.strip()}",
                location=CodeLocation(file_path, line_no),
                suggestion=self.suggestion_text,
                rule_id=self.rule_id
            )
            for line_no, text in enumerate(code.splitlines(), 1)
            if re.search(self.pattern, text, re.IGNORECASE)
        ]
class ConfigurableReviewer:
    """Reviewer whose rules, thresholds and ignore list come from a ReviewConfig."""

    def __init__(self, config: Optional["ReviewConfig"] = None):
        """Create the rule registry and register built-in and custom rules.

        Args:
            config: Review configuration; defaults are used when omitted.
        """
        self.config = config or ReviewConfig()
        self.registry = RuleRegistry()
        self.registry.register_defaults()
        # Register the custom pattern rules
        for custom in self.config.custom_patterns:
            self.registry.register(CustomPatternRule(custom))

    def should_review(self, file_path: str) -> bool:
        """Return False when ``file_path`` matches one of the ignore patterns.

        Patterns are interpreted as follows:

        * glob patterns are matched against the whole path with ``fnmatch``;
          a leading ``**/`` also matches files at the top level;
        * patterns without glob characters (``vendor/``, ``docs``) name a
          directory or path and match only whole path components, at any
          depth, so ``vendor/`` ignores ``src/vendor/a.py`` but not
          ``vendor_utils.py``.
        """
        import fnmatch

        # Bring "./vendor/a.py" and "vendor\\a.py" to the same shape as
        # "vendor/a.py" before comparing.
        normalized = file_path.replace("\\", "/")
        while normalized.startswith("./"):
            normalized = normalized[2:]
        # Surrounding slashes let a component test be a plain substring check.
        padded = f"/{normalized}/"

        for pattern in self.config.ignore_paths:
            if fnmatch.fnmatch(normalized, pattern):
                return False
            if pattern.startswith("**/") and fnmatch.fnmatch(normalized, pattern[3:]):
                return False
            literal = pattern.strip("/")
            is_glob = any(ch in literal for ch in "*?[")
            if literal and not is_glob and f"/{literal}/" in padded:
                return False
        return True

    def review(self, code: str, file_path: str) -> "ReviewResult":
        """Review ``code`` with the rules configured for its language.

        Ignored paths yield a result with score 100 and no rule run.
        """
        if not self.should_review(file_path):
            return ReviewResult(file_path=file_path, score=100, summary="已跳过(忽略路径)")

        language = CodeParser().detect_language(file_path)
        # Pick the rule set. NOTE(review): a language-specific list replaces
        # ``enabled_rules`` entirely rather than extending it — confirm that
        # this is the intended precedence.
        rules = self.config.enabled_rules
        if language and language in self.config.language_rules:
            rules = self.config.language_rules[language]

        engine = RuleEngine(self.registry)
        return engine.review_file(
            code, file_path,
            rules=rules,
            min_severity=self.config.min_severity
        )
# 使用
if __name__ == '__main__':
    # Load the sample configuration from a YAML file
    import os
    import tempfile

    with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
        f.write(CONFIG_YAML)
        config_path = f.name
    try:
        config = ReviewConfig.from_yaml(config_path)
    finally:
        # delete=False keeps the file on disk after the with-block, so it has
        # to be removed explicitly.
        os.unlink(config_path)

    reviewer = ConfigurableReviewer(config)
    print(f"启用规则: {reviewer.config.enabled_rules}")
    print(f"忽略路径: {reviewer.config.ignore_paths}")
    print(f"自定义规则: {len(reviewer.config.custom_patterns)}")
5.2 团队规则共享
# ===== 团队规则共享 =====
class TeamRuleManager:
    """Export, import and merge review configurations shared across a team."""

    def __init__(self, config_repo: Optional[str] = None):
        self.config_repo = config_repo
        self.rules_dir = ".code-review-rules"

    def export_rules(self, config: ReviewConfig, output_path: str = ".code-review.yml"):
        """Write ``config`` to ``output_path`` as YAML.

        Every configuration field is written, so a file produced here
        round-trips through ``import_rules`` without losing settings.
        """
        data = {
            "version": "1.0",
            "enabled_rules": config.enabled_rules,
            # Tolerate a severity that is still a plain string.
            "min_severity": getattr(config.min_severity, "value", config.min_severity),
            "max_complexity": config.max_complexity,
            "max_function_lines": config.max_function_lines,
            "max_file_lines": config.max_file_lines,
            "max_line_length": config.max_line_length,
            "fail_score": config.fail_score,
            "ignore_paths": config.ignore_paths,
            "language_rules": config.language_rules,
            "custom_patterns": config.custom_patterns
        }
        with open(output_path, 'w') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
        print(f"✅ 规则已导出: {output_path}")

    def import_rules(self, source: str) -> ReviewConfig:
        """Load a configuration from a local file or an http(s) URL.

        Args:
            source: File path, or a URL starting with ``http://`` or
                ``https://``.

        Returns:
            The parsed configuration; an empty document yields the defaults.
        """
        if source.startswith(("http://", "https://")):
            # Remote source
            import urllib.request
            # Without a timeout an unresponsive host would block indefinitely.
            with urllib.request.urlopen(source, timeout=10) as resp:
                content = resp.read().decode()
        else:
            with open(source, 'r') as f:
                content = f.read()
        # safe_load returns None for an empty document.
        data = yaml.safe_load(content) or {}
        return ReviewConfig.from_dict(data)

    def merge_configs(self, base: ReviewConfig, override: ReviewConfig) -> ReviewConfig:
        """Combine two configurations, with ``override`` taking precedence.

        List-like settings (rules, ignore paths) are unioned in first-seen
        order, custom patterns and language rules are merged by key, and
        scalar settings come from ``override``. For numeric thresholds a zero
        in ``override`` is treated as "not set" and falls back to ``base``.
        """
        merged = ReviewConfig()

        # dict.fromkeys de-duplicates while keeping a stable order, so
        # repeated merges produce identical output.
        merged.enabled_rules = list(dict.fromkeys(base.enabled_rules + override.enabled_rules))
        merged.ignore_paths = list(dict.fromkeys(base.ignore_paths + override.ignore_paths))

        # Thresholds: override wins
        merged.max_complexity = override.max_complexity or base.max_complexity
        merged.max_function_lines = override.max_function_lines or base.max_function_lines
        merged.max_file_lines = override.max_file_lines or base.max_file_lines
        merged.max_line_length = override.max_line_length or base.max_line_length
        merged.fail_score = override.fail_score or base.fail_score
        merged.min_severity = override.min_severity

        merged.language_rules = {**base.language_rules, **override.language_rules}

        # Custom rules are keyed by id; an override rule replaces the base rule.
        base_custom = {c["id"]: c for c in base.custom_patterns}
        override_custom = {c["id"]: c for c in override.custom_patterns}
        merged.custom_patterns = list({**base_custom, **override_custom}.values())
        return merged
# Usage: write the built-in defaults out as the team's baseline configuration.
default_config = ReviewConfig()
manager = TeamRuleManager()
manager.export_rules(default_config, output_path="team-code-review.yml")
六、审查报告生成
6.1 多格式报告
# ===== 审查报告生成器 =====
class ReportGenerator:
    """Renders review results as Markdown, SARIF or JSON."""

    @staticmethod
    def to_markdown(results: Dict[str, "ReviewResult"]) -> str:
        """Build a Markdown report.

        Args:
            results: Review result per file path.

        Returns:
            The report text: a header with totals, an overview table and one
            section per file that has issues, ordered from most to least
            severe.
        """
        # Explicit ranking: sorting the raw strings would be alphabetical and
        # place "info" ahead of "major".
        severity_rank = {"critical": 0, "major": 1, "minor": 2, "info": 3}
        severity_icon = {"critical": "🔴", "major": "🟡", "minor": "🔵", "info": "⚪"}

        def cell(text: str) -> str:
            # A raw "|" or newline inside a cell would break the table row.
            return text.replace("|", "\\|").replace("\n", " ")

        total_issues = sum(len(r.issues) for r in results.values())
        critical = sum(r.critical_count for r in results.values())
        major = sum(r.major_count for r in results.values())
        avg_score = sum(r.score for r in results.values()) / max(len(results), 1)

        lines = [
            "# 📋 代码审查报告\n",
            f"**审查时间**: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            f"**文件数量**: {len(results)}",
            f"**问题总数**: {total_issues}",
            f"**平均评分**: {avg_score:.1f}/100\n",
            "## 概览\n",
            "| 状态 | 数量 |",
            "|------|------|",
            f"| 🔴 严重 | {critical} |",
            f"| 🟡 主要 | {major} |",
            # Everything that is neither critical nor major lands in this row.
            f"| 🔵 次要 | {total_issues - critical - major} |",
            ""
        ]
        if critical > 0:
            lines.append("> ⚠️ **存在严重问题,建议修复后再合并**\n")

        for file_path, result in results.items():
            if not result.issues:
                continue
            lines.append(f"## 📄 {file_path}\n")
            lines.append(f"**评分**: {result.score}/100 | **问题**: {len(result.issues)} 个\n")
            lines.append("| 等级 | 类别 | 问题 | 建议 |")
            lines.append("|------|------|------|------|")
            ordered = sorted(
                result.issues,
                key=lambda item: severity_rank[item.severity.value]
            )
            for issue in ordered:
                lines.append(
                    f"| {severity_icon[issue.severity.value]} | {issue.category.value} | "
                    f"{cell(issue.title)} | {cell(issue.suggestion[:50])} |"
                )
            lines.append("")
        return "\n".join(lines)

    @staticmethod
    def to_sarif(results: Dict[str, "ReviewResult"]) -> Dict:
        """Build a SARIF 2.1.0 document (the scan-result format GitHub accepts).

        Each distinct ``rule_id`` becomes one entry in the driver's rule list,
        described by the first issue seen for it; every issue becomes one
        result pointing at its file and start line.
        """
        driver_rules = []
        sarif_results = []
        sarif = {
            "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/main/sarif-2.1/schema/sarif-schema-2.1.0.json",
            "version": "2.1.0",
            "runs": [{
                "tool": {
                    "driver": {
                        "name": "AI Code Reviewer",
                        "version": "1.0.0",
                        "rules": driver_rules
                    }
                },
                "results": sarif_results
            }]
        }
        level_for = {
            Severity.CRITICAL: "error",
            Severity.MAJOR: "warning",
            Severity.MINOR: "note",
            Severity.INFO: "none"
        }
        rule_index = {}
        for file_path, result in results.items():
            for issue in result.issues:
                if issue.rule_id not in rule_index:
                    rule_index[issue.rule_id] = len(driver_rules)
                    driver_rules.append({
                        "id": issue.rule_id,
                        "shortDescription": {"text": issue.title},
                        "properties": {
                            "tags": [issue.category.value]
                        }
                    })
                sarif_results.append({
                    "ruleId": issue.rule_id,
                    "ruleIndex": rule_index[issue.rule_id],
                    "level": level_for[issue.severity],
                    "message": {"text": issue.description},
                    "locations": [{
                        "physicalLocation": {
                            "artifactLocation": {"uri": file_path},
                            "region": {"startLine": issue.location.start_line}
                        }
                    }]
                })
        return sarif

    @staticmethod
    def to_json(results: Dict[str, "ReviewResult"]) -> str:
        """Build a JSON report keyed by file path.

        A ``reviewed_at`` value that is a date/datetime object is emitted in
        ISO 8601 form; any other value is passed through unchanged.
        """
        data = {}
        for file_path, result in results.items():
            reviewed_at = result.reviewed_at
            if hasattr(reviewed_at, "isoformat"):
                # json cannot serialise datetime objects directly.
                reviewed_at = reviewed_at.isoformat()
            data[file_path] = {
                "score": result.score,
                "issues": [i.to_dict() for i in result.issues],
                "reviewed_at": reviewed_at
            }
        return json.dumps(data, ensure_ascii=False, indent=2)
# 使用
if __name__ == '__main__':
    generator = ReportGenerator()

    # Simulated review result: one critical security finding in app.py
    sql_injection = CodeIssue(
        severity=Severity.CRITICAL,
        category=IssueCategory.SECURITY,
        title="SQL 注入",
        description="直接拼接用户输入",
        location=CodeLocation("app.py", 42),
        suggestion="参数化查询"
    )
    results = {
        "app.py": ReviewResult(
            file_path="app.py",
            score=72,
            issues=[sql_injection]
        )
    }

    # Markdown
    markdown_report = generator.to_markdown(results)
    print(markdown_report)

    # SARIF
    sarif = generator.to_sarif(results)
    sarif_rules = sarif['runs'][0]['tool']['driver']['rules']
    print(f"\nSARIF 规则数: {len(sarif_rules)}")
6.2 趋势追踪
# ===== 审查趋势追踪 =====
import json
from datetime import datetime, timedelta
class ReviewTracker:
    """Persists review outcomes to a JSON file and derives trends from them."""

    def __init__(self, storage_path: str = ".review-history.json"):
        """Load any existing history from ``storage_path``."""
        self.storage_path = storage_path
        self.history = self._load()

    def _load(self) -> Dict:
        """Read the history file.

        Returns:
            The stored history, or ``{"reviews": []}`` when the file is
            missing, unreadable as JSON, or not shaped like a history.
        """
        try:
            with open(self.storage_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return {"reviews": []}
        except ValueError as exc:
            # Covers malformed JSON (including an empty file) and undecodable
            # bytes; starting fresh is better than failing every later run.
            print(f"⚠️ 历史记录无法解析,已重置: {self.storage_path} ({exc})")
            return {"reviews": []}
        if not isinstance(data, dict):
            return {"reviews": []}
        if not isinstance(data.get("reviews"), list):
            data["reviews"] = []
        return data

    def _save(self):
        """Write the history back to ``storage_path``."""
        # Explicit UTF-8: ensure_ascii=False emits non-ASCII file paths
        # verbatim, which a narrow locale encoding could not represent.
        with open(self.storage_path, 'w', encoding='utf-8') as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)

    def record(self, results: Dict[str, "ReviewResult"], commit_hash: str = ""):
        """Append one review run to the history and save it.

        Args:
            results: Review result per file path.
            commit_hash: Optional commit identifier stored with the entry.
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "commit": commit_hash,
            "files_reviewed": len(results),
            "total_issues": sum(len(r.issues) for r in results.values()),
            "critical_issues": sum(r.critical_count for r in results.values()),
            "avg_score": sum(r.score for r in results.values()) / max(len(results), 1),
            "files": {
                fp: {"score": r.score, "issues": len(r.issues)}
                for fp, r in results.items()
            }
        }
        self.history["reviews"].append(entry)
        self._save()

    def get_trend(self, days: int = 30) -> Dict:
        """Summarise the reviews recorded during the last ``days`` days.

        Returns:
            ``{"trend": "no_data", "days": days}`` when nothing was recorded
            in the window; otherwise score and issue statistics, where the
            score trend is ``"improving"``, ``"declining"`` or ``"stable"``
            depending on how the newest score compares with the oldest.
        """
        # ISO timestamps produced by record() sort chronologically as strings.
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        recent = [
            r for r in self.history["reviews"]
            if r["timestamp"] >= cutoff
        ]
        if not recent:
            return {"trend": "no_data", "days": days}

        scores = [r["avg_score"] for r in recent]
        issues = [r["total_issues"] for r in recent]
        criticals = [r["critical_issues"] for r in recent]

        oldest, newest = scores[0], scores[-1]
        if newest > oldest:
            direction = "improving"
        elif newest < oldest:
            direction = "declining"
        else:
            # A single review, or an unchanged score, is not a decline.
            direction = "stable"

        return {
            "days": days,
            "reviews": len(recent),
            "score": {
                "current": newest,
                "avg": sum(scores) / len(scores),
                "trend": direction
            },
            "issues": {
                "total": sum(issues),
                "avg_per_review": sum(issues) / len(issues),
                "critical_total": sum(criticals)
            }
        }

    def get_top_issues(self, limit: int = 10) -> List[Dict]:
        """Return the files with the most recorded issues.

        Issue counts are accumulated per file path across all recorded
        reviews (a simplified statistic: individual issue types are not
        stored in the history).

        Returns:
            Up to ``limit`` dicts with the keys ``file`` and ``issue_count``,
            highest count first.
        """
        issue_counts: Dict[str, int] = {}
        for review in self.history["reviews"]:
            for file_path, file_data in review.get("files", {}).items():
                issue_counts[file_path] = issue_counts.get(file_path, 0) + file_data.get("issues", 0)
        ranked = sorted(issue_counts.items(), key=lambda x: x[1], reverse=True)
        return [
            {"file": f, "issue_count": c}
            for f, c in ranked[:limit]
        ]
# Usage: constructing the tracker reads any existing history from the
# default storage file, ".review-history.json".
tracker = ReviewTracker()
七、生产案例
7.1 CI/CD 集成
# ===== CI/CD 集成 =====
class CIReviewer:
    """Runs the configurable reviewer inside a CI/CD pipeline."""

    def __init__(self, config_path: str = ".code-review.yml"):
        """Load the configuration and wire up reviewer, tracker and reporter."""
        self.config = ReviewConfig.from_yaml(config_path)
        self.reviewer = ConfigurableReviewer(self.config)
        self.tracker = ReviewTracker()
        self.reporter = ReportGenerator()

    def run(self) -> bool:
        """Review the changed files, write the report and decide pass/fail.

        The Markdown report is written to ``review-report.md`` and the run is
        appended to the review history.

        Returns:
            True when no reviewed file has blockers and the average score
            reaches ``config.fail_score``. A run in which nothing was
            reviewable passes.
        """
        print("=" * 50)
        print("🔍 AI 代码审查 - CI 模式")
        print("=" * 50)

        changed_files = self._get_changed_files()
        print(f"\n📁 变更文件: {len(changed_files)} 个")

        results = {}
        for file_path in changed_files:
            if not self.reviewer.should_review(file_path):
                continue
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    code = f.read()
            except (UnicodeDecodeError, FileNotFoundError):
                # Binary files and files deleted by the change are skipped.
                continue
            result = self.reviewer.review(code, file_path)
            results[file_path] = result
            status = "✅" if not result.has_blockers else "❌"
            print(f" {status} {file_path}: 评分 {result.score}, 问题 {len(result.issues)} 个")

        # Report; UTF-8 is forced because it contains emoji and CJK text that
        # a runner's locale encoding may not be able to represent.
        report = self.reporter.to_markdown(results)
        with open("review-report.md", 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"\n📊 报告: review-report.md")

        self.tracker.record(results)

        has_blockers = any(r.has_blockers for r in results.values())
        if results:
            avg_score = sum(r.score for r in results.values()) / len(results)
        else:
            # Nothing reviewable (e.g. only ignored files changed): averaging
            # an empty set as 0 would fail the pipeline for no reason.
            avg_score = 100.0
        passed = not has_blockers and avg_score >= self.config.fail_score

        print(f"\n{'✅ 审查通过' if passed else '🚫 审查未通过'}")
        print(f" 平均评分: {avg_score:.1f} (阈值: {self.config.fail_score})")
        print(f" 严重问题: {sum(r.critical_count for r in results.values())}")
        return passed

    def _get_changed_files(self) -> List[str]:
        """Return the files to review.

        Uses ``git diff --name-only HEAD~1``; when git is unavailable, fails,
        or reports nothing, falls back to every reviewable file under the
        current directory.
        """
        try:
            result = subprocess.run(
                ["git", "diff", "--name-only", "HEAD~1"],
                capture_output=True, text=True
            )
        except (OSError, subprocess.SubprocessError):
            # git missing or not runnable: use the directory scan below.
            result = None
        if result is not None and result.returncode == 0 and result.stdout.strip():
            return result.stdout.strip().splitlines()

        # Fallback: scan all files
        skipped_dirs = {".git", "node_modules", "__pycache__", "venv", ".venv"}
        files = []
        for root, dirs, filenames in os.walk("."):
            dirs[:] = [d for d in dirs if d not in skipped_dirs]
            for f in filenames:
                # normpath drops the leading "./" so the paths have the same
                # shape as git's output and as the ignore patterns.
                filepath = os.path.normpath(os.path.join(root, f))
                if self.reviewer.should_review(filepath):
                    files.append(filepath)
        return files
# GitHub Actions workflow that runs the review in CI mode on pull requests
# and uploads the Markdown report as an artifact.
GITHUB_ACTIONS_YML = """
name: AI Code Review
on:
  pull_request:
    types: [opened, synchronize]
jobs:
  review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 2
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install openai pyyaml
      - name: Run AI Code Review
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: python -m code_review --ci
      - name: Upload Report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: review-report
          path: review-report.md
"""
# GitLab CI job that runs the review in CI mode and always keeps the report.
GITLAB_CI_YML = """
ai-code-review:
  stage: test
  image: python:3.11
  before_script:
    - pip install openai pyyaml
  script:
    - python -m code_review --ci
  artifacts:
    paths:
      - review-report.md
    when: always
  variables:
    OPENAI_API_KEY: $OPENAI_API_KEY
"""
# 使用
if __name__ == '__main__':
    # Exit status 0 signals a passed review, 1 a failed one.
    ci = CIReviewer()
    success = ci.run()
    exit_code = 0
    if not success:
        exit_code = 1
    sys.exit(exit_code)
7.2 VS Code 扩展
# ===== VS Code 扩展后端 =====
class VSCodeReviewServer:
    """Backend for the VS Code review extension."""

    def __init__(self):
        self.reviewer = ConfigurableReviewer()
        self.llm_reviewer = LLMReviewer()
        self._debounce_timer = None
        # Latest rule-based result per file, consulted by get_quick_fix().
        self._cache: Dict[str, ReviewResult] = {}

    def review_on_save(self, file_path: str, code: str) -> Dict:
        """Run the rule-based review and convert the issues to diagnostics.

        The result is cached under ``file_path``.

        Returns:
            Dict with ``diagnostics`` (one entry per issue, using zero-based
            line numbers), ``score`` and ``issueCount``.
        """
        result = self.reviewer.review(code, file_path)
        self._cache[file_path] = result

        diagnostics = []
        for issue in result.issues:
            location = issue.location
            # Issue locations are one-based; the diagnostics are zero-based.
            first_line = location.start_line - 1
            related = []
            if issue.suggestion:
                related.append({
                    "message": f"建议: {issue.suggestion}",
                    "location": {
                        "uri": file_path,
                        "range": {
                            "startLine": first_line,
                            "startColumn": 0,
                            "endLine": first_line,
                            "endColumn": 999
                        }
                    }
                })
            diagnostics.append({
                "severity": self._to_vscode_severity(issue.severity),
                "message": f"[{issue.rule_id}] {issue.title}: {issue.description}",
                "range": {
                    "startLine": first_line,
                    "startColumn": location.start_column,
                    "endLine": location.end_line - 1,
                    # An unset end column (0) is widened to 999.
                    "endColumn": location.end_column or 999
                },
                "source": "AI Code Review",
                "code": issue.rule_id,
                "relatedInformation": related
            })
        return {
            "diagnostics": diagnostics,
            "score": result.score,
            "issueCount": len(result.issues)
        }

    async def deep_review(self, file_path: str, code: str) -> Dict:
        """Run the LLM-based review.

        The language is detected from ``file_path`` and defaults to
        ``"python"`` when detection yields nothing.
        """
        language = CodeParser().detect_language(file_path) or "python"
        result = self.llm_reviewer.review(code, language=language)
        return {
            "issues": [i.to_dict() for i in result.issues],
            "score": result.score,
            "summary": result.to_report()
        }

    def get_quick_fix(self, file_path: str, issue_index: int) -> Optional[Dict]:
        """Return an edit that applies the fix for one cached issue.

        Args:
            file_path: File whose cached review result is consulted.
            issue_index: Zero-based position of the issue in that result.

        Returns:
            Dict with ``title`` and ``edit``, or ``None`` when the file has no
            cached result, the index is out of range, or the issue carries no
            fix code.
        """
        result = self._cache.get(file_path)
        # The lower bound matters: a negative index would otherwise wrap
        # around to another issue or raise IndexError.
        if not result or not 0 <= issue_index < len(result.issues):
            return None
        issue = result.issues[issue_index]
        if not issue.fix_code:
            return None
        return {
            "title": f"修复: {issue.title}",
            "edit": {
                "range": {
                    "startLine": issue.location.start_line - 1,
                    "endLine": issue.location.end_line - 1
                },
                "newText": issue.fix_code
            }
        }

    @staticmethod
    def _to_vscode_severity(severity: "Severity") -> int:
        """Map a review severity to the numeric diagnostic severity."""
        mapping = {
            Severity.CRITICAL: 0,  # Error
            Severity.MAJOR: 1,     # Warning
            Severity.MINOR: 2,     # Information
            Severity.INFO: 3       # Hint
        }
        return mapping[severity]
# Example extension.js for the VS Code client, kept as a string constant.
# It publishes diagnostics on save and registers a deep-review command.
# NOTE(review): the snippet calls reviewFile, deepReview, statusBarItem and
# reviewPanel without defining them — presumably they are supplied elsewhere
# in the real extension; confirm before using it as-is.
VSCODE_EXTENSION_JS = """
const vscode = require('vscode');
function activate(context) {
const diagnosticCollection = vscode.languages.createDiagnosticCollection('ai-code-review');
// 保存时审查
vscode.workspace.onDidSaveTextDocument(async (doc) => {
const result = await reviewFile(doc.uri.fsPath, doc.getText());
const diagnostics = result.diagnostics.map(d => {
const range = new vscode.Range(
d.range.startLine, d.range.startColumn,
d.range.endLine, d.range.endColumn
);
const diag = new vscode.Diagnostic(
range, d.message, d.severity
);
diag.source = d.source;
diag.code = d.code;
return diag;
});
diagnosticCollection.set(doc.uri, diagnostics);
// 状态栏显示评分
statusBarItem.text = `$(shield) ${result.score}/100`;
statusBarItem.show();
});
// 深度审查命令
const deepReviewCmd = vscode.commands.registerCommand(
'aiCodeReview.deepReview',
async () => {
const editor = vscode.window.activeTextEditor;
if (!editor) return;
vscode.window.withProgress(
{ location: vscode.ProgressLocation.Notification, title: 'AI 深度审查中...' },
async () => {
const result = await deepReview(
editor.document.uri.fsPath,
editor.document.getText()
);
// 在侧边面板显示结果
reviewPanel.update(result);
}
);
}
);
context.subscriptions.push(deepReviewCmd);
}
"""
7.3 全栈审查平台
# ===== 全栈审查平台 =====
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
class CodeReviewPlatform:
    """Project-wide review service combining rules, history and reporting."""

    def __init__(self):
        self.rule_reviewer = ConfigurableReviewer()
        self.llm_reviewer = LLMReviewer()
        self.tracker = ReviewTracker()
        self.reporter = ReportGenerator()

    def review_project(self, project_path: str) -> Dict:
        """Review every reviewable text file below ``project_path``.

        Statistics (file count, average score, history entry, Markdown
        header) cover all reviewed files, so a clean project is reported with
        its real file count and score. The ``results`` entry lists only the
        files that have issues.

        Args:
            project_path: Root directory of the project.

        Returns:
            Summary dict with the keys ``project_path``, ``files_reviewed``,
            ``total_issues``, ``avg_score``, ``category_stats``,
            ``severity_stats``, ``results`` and ``report_markdown``.
        """
        skipped_dirs = {
            ".git", "node_modules", "__pycache__", "venv",
            ".venv", "dist", "build", ".idea", ".vscode"
        }
        reviewed = {}
        for root, dirs, files in os.walk(project_path):
            # Prune in place so os.walk does not descend into these directories.
            dirs[:] = [d for d in dirs if d not in skipped_dirs]
            for filename in files:
                file_path = os.path.join(root, filename)
                rel_path = os.path.relpath(file_path, project_path)
                if not self.rule_reviewer.should_review(rel_path):
                    continue
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        code = f.read()
                except (UnicodeDecodeError, FileNotFoundError):
                    # Binary or vanished files are skipped.
                    continue
                reviewed[rel_path] = self.rule_reviewer.review(code, rel_path)

        flagged = {fp: r for fp, r in reviewed.items() if r.issues}

        # Record history
        self.tracker.record(reviewed)

        # Totals
        total_issues = sum(len(r.issues) for r in flagged.values())
        avg_score = sum(r.score for r in reviewed.values()) / max(len(reviewed), 1)

        # Per-category and per-severity counts
        category_stats = {}
        severity_stats = {}
        for result in flagged.values():
            for issue in result.issues:
                category_stats[issue.category.value] = category_stats.get(issue.category.value, 0) + 1
                severity_stats[issue.severity.value] = severity_stats.get(issue.severity.value, 0) + 1

        return {
            "project_path": project_path,
            "files_reviewed": len(reviewed),
            "total_issues": total_issues,
            "avg_score": round(avg_score, 1),
            "category_stats": category_stats,
            "severity_stats": severity_stats,
            "results": {fp: {"score": r.score, "issues": len(r.issues)} for fp, r in flagged.items()},
            "report_markdown": self.reporter.to_markdown(reviewed)
        }
# API 服务
class ReviewAPIHandler(BaseHTTPRequestHandler):
    """HTTP API for the review platform.

    Endpoints (all POST, JSON in and out):
        /api/review/file     review a single file's source
        /api/review/diff     review a unified diff
        /api/review/project  review a directory on the server
    """

    # Shared by all requests; created once when the class is defined.
    platform = CodeReviewPlatform()

    def do_POST(self):
        """Dispatch to the endpoint handler, or answer 404 for unknown paths."""
        routes = {
            "/api/review/file": self._handle_file_review,
            "/api/review/diff": self._handle_diff_review,
            "/api/review/project": self._handle_project_review,
        }
        handler = routes.get(self.path)
        if handler is None:
            # Without an explicit reply the client would get no response at all.
            self._json_response({"error": f"unknown endpoint: {self.path}"}, status=404)
            return
        handler()

    def _read_json_body(self) -> Optional[dict]:
        """Parse the request body as a JSON object.

        Returns:
            The decoded object, or ``None`` after a 400 response has been
            sent because the body was not a valid JSON object.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            body = json.loads(self.rfile.read(content_length))
        except ValueError:
            # Covers a non-numeric Content-Length, malformed JSON and
            # undecodable bytes.
            self._json_response({"error": "request body must be valid JSON"}, status=400)
            return None
        if not isinstance(body, dict):
            self._json_response({"error": "request body must be a JSON object"}, status=400)
            return None
        return body

    def _handle_file_review(self):
        """Review the submitted source with the rule engine or the LLM."""
        body = self._read_json_body()
        if body is None:
            return
        code = body.get("code", "")
        file_path = body.get("file_path", "unknown")
        use_llm = body.get("use_llm", False)
        if use_llm:
            language = CodeParser().detect_language(file_path) or "python"
            result = self.platform.llm_reviewer.review(code, language=language)
        else:
            result = self.platform.rule_reviewer.review(code, file_path)
        self._json_response({
            "score": result.score,
            "issues": [i.to_dict() for i in result.issues],
            "report": result.to_report()
        })

    def _handle_diff_review(self):
        """Review the submitted diff with a fresh rule engine."""
        body = self._read_json_body()
        if body is None:
            return
        diff = body.get("diff", "")
        engine = RuleEngine()
        results = engine.review_diff(diff)
        self._json_response({
            "files_reviewed": len(results),
            "results": [
                {"file": r.file_path, "score": r.score, "issues": len(r.issues)}
                for r in results
            ]
        })

    def _handle_project_review(self):
        """Review a directory named by the request."""
        body = self._read_json_body()
        if body is None:
            return
        # NOTE(review): project_path is taken verbatim from the request, so a
        # caller can point the server at any readable directory. Restrict it
        # to an allowed root before exposing this endpoint beyond localhost.
        project_path = body.get("project_path", ".")
        summary = self.platform.review_project(project_path)
        self._json_response(summary)

    def _json_response(self, data: dict, status: int = 200):
        """Send ``data`` as a UTF-8 JSON response with the given status."""
        payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        # An explicit length lets clients detect the end of the body.
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
# 启动服务
if __name__ == '__main__':
    # NOTE(review): '0.0.0.0' listens on every network interface and the API
    # has no authentication; bind to '127.0.0.1' unless remote access is
    # really intended.
    # The with-block closes the listening socket when the server stops.
    with HTTPServer(('0.0.0.0', 8765), ReviewAPIHandler) as server:
        print("🔍 AI 代码审查平台启动: http://localhost:8765")
        print(" POST /api/review/file - 审查文件")
        print(" POST /api/review/diff - 审查 Diff")
        print(" POST /api/review/project - 审查项目")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the server; exit without a traceback.
            pass
八、总结
8.1 AI 代码审查架构全景
8.2 审查维度总览
| 维度 | 规则引擎 | LLM | 示例 |
|---|---|---|---|
| Bug 检测 | ⚠️ 基础 | ✅ 深度 | 竞态条件、空指针 |
| 安全漏洞 | ✅ 强 | ✅ 强 | SQL 注入、硬编码密钥 |
| 性能问题 | ❌ 有限 | ✅ 强 | N+1、内存泄漏 |
| 代码风格 | ✅ 强 | ⚠️ 一般 | 魔法数字、命名 |
| 复杂度 | ✅ 精确 | ⚠️ 一般 | 圈复杂度、函数长度 |
| 错误处理 | ✅ 规则 | ✅ 上下文 | 空 except、资源泄漏 |
| 最佳实践 | ❌ 有限 | ✅ 强 | 设计模式、SOLID |
8.3 最佳实践
| 实践 | 说明 |
|---|---|
| 分层审查 | 规则引擎快速扫描 + LLM 深度审查 |
| 增量审查 | 只审查变更部分,减少开销 |
| 可配置 | 项目级规则配置,团队共享 |
| 去重 | 规则引擎和 LLM 结果合并去重 |
| 快速修复 | 提供可操作的修复代码 |
| 趋势追踪 | 长期跟踪代码质量变化 |
| CI 集成 | 自动化审查,阻断低质量代码 |
8.4 方案对比
| 方案 | 速度 | 深度 | 成本 | 准确率 |
|---|---|---|---|---|
| 规则引擎 | ⚡ 毫秒 | 中 | 免费 | 高(低误报) |
| LLM 审查 | 🚀 秒级 | 深 | 按调用 | 中(可能误报) |
| 混合方案 | 🚀 秒级 | 最深 | 中 | 最高 |
| 人工审查 | 🐢 小时 | 最深 | 人力 | 最高 |
本文涵盖 AI 代码审查的完整技术栈:从规则引擎到 LLM 深度审查,从 Git Hook 到 CI/CD 集成,从 IDE 扩展到全栈平台。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)