揭秘工业级 Text-to-SQL 系统:3800行代码背后的工程智慧
摘要:本文深入剖析一个近4000行的生产级SQL生成与验证系统,揭示其如何通过多轮反思、并行候选、智能裁判、断点续跑等机制,实现从自然语言到可执行SQL的高精度转换。我们将拆解其核心架构、关键技术点和工程实践,为构建可靠的AI数据查询系统提供参考。
一、系统概览:不只是"LLM生成SQL"那么简单
当你看到 validate_sql.py 这个3869行的文件时,第一反应可能是:"为什么一个简单的Text-to-SQL任务需要这么多代码?"
答案在于:生产环境与Demo的本质区别。
1.1 三大核心模式
该系统支持三种工作模式:
| 模式 | 功能 | 应用场景 |
|---|---|---|
| 模式1 | JSONL批量验证 | 已有SQL数据集的回归测试 |
| 模式2 | Excel生成+验证 | 从问题列表自动生成SQL(主流程) |
| 模式3 | Excel已有SQL评估 | 对历史SQL进行质量审计 |
1.2 核心能力矩阵
plaintext
┌─────────────────────────────────────────┐
│ validate_sql.py 核心能力 │
├─────────────────────────────────────────┤
│ ✓ 多轮反思纠错(最多N轮迭代) │
│ ✓ n-best并行候选生成 │
│ ✓ 双模型协作(小模型生成 + 大模型裁判) │
│ ✓ 细粒度错误诊断(15种标签体系) │
│ ✓ 数据库断连自动重连 │
│ ✓ 断点续跑与周期落盘 │
│ ✓ LLM服务降级与重试机制 │
│ ✓ SQL清洗与安全防护 │
└─────────────────────────────────────────┘
二、架构设计:分层解耦的工程哲学
2.1 三层架构
虽然代码在一个文件中,但逻辑上分为三个层次:
plaintext
┌──────────────────────────────────────┐
│ Layer 1: 流程编排层 │
│ - run_generate_validate_for_xlsx() │
│ - run_judge_existing_sql_for_xlsx()│
│ - main() │
├──────────────────────────────────────┤
│ Layer 2: 业务逻辑层 │
│ - build_generate_messages() │
│ - build_reflect_messages() │
│ - _call_llm_judge_realistic() │
│ - execute_sql_server() │
├──────────────────────────────────────┤
│ Layer 3: 工具函数层 │
│ - _strip_code_fences() │
│ - _ensure_single_statement() │
│ - _extract_exec_error_facts() │
│ - _truncate_text() │
└──────────────────────────────────────┘
2.2 关键数据结构
python
@dataclass
class XlsxRow:
"""Excel行数据结构"""
表名: str
问题种类: str
问题: str
sql: str
@dataclass
class SqlTask:
"""SQL验证任务"""
idx: int
obj: Dict[str, Any]
key: str # sql / sql_turn1 / sql_turn2
sql: str
三、核心技术深度解析
3.1 多轮反思机制(Reflection Loop)
这是系统的灵魂所在。传统Text-to-SQL是"一次生成",而本系统采用生成-执行-评估-修正的闭环:
python
for r in range(max_rounds): # 默认3轮
if r == 0:
# 第1轮:初始生成
msgs = build_generate_messages(...)
else:
# 后续轮次:基于上一轮反馈反思
fb = _summarize_exec_feedback_for_small(last_exec)
tags = last_judge.get("error_tags")
advice = last_judge.get("advice")
msgs = build_reflect_messages(
prev_sql=prev_sql,
exec_feedback=fb,
error_tags=tags,
judge_advice=advice,
...
)
gen_sql = call_llm_sql(client=small_client, messages=msgs)
exec_ret = execute_sql_server(sql=gen_sql)
if exec_ret["success"] and exec_ret["row_count"] > 0:
break # 成功则退出
# 失败则进入下一轮反思
last_judge = _call_llm_judge_realistic(...)
prev_sql = gen_sql
关键洞察:
- 每轮反思都携带结构化反馈(非原始报错)
- 裁判模型输出错误标签和修复建议
- 小模型根据标签定向修正(而非盲目重试)
3.2 n-best并行候选生成
首轮生成不是单条SQL,而是并行生成N个候选(默认5个):
python
n_best = max(1, int(gen_n)) # 默认5
max_workers = min(int(gen_parallel), n_best)
with ThreadPoolExecutor(max_workers=max_workers) as ex:
futs = {ex.submit(_gen_one, i + 1): (i + 1) for i in range(n_best)}
for fut in as_completed(futs):
sql_out = fut.result()
if sql_out:
gen_sqls.append(sql_out)
优势:
- 提高首轮命中率(多样性采样)
- 避免串行等待(并行加速)
- 去重后保留唯一候选
3.3 双模型协作架构
系统采用大小模型分工策略:
| 角色 | 模型 | 职责 | 温度 |
|---|---|---|---|
| 生成模型 | qwen3-coder(小模型) | SQL生成/修正 | 0.0-0.3 |
| 裁判模型 | qwen3(大模型) | 结果评估/错误诊断 | 0.01 |
裁判模型的两大功能:
功能1:完整诊断(_call_llm_judge_realistic)
输出JSON结构:
json
{
"is_correct": false,
"score": 45,
"error_tags": ["COLUMN_NAME_FIX"],
"repair_task": "sql_correct",
"comment": "列名拼写错误",
"advice": "将PART_NM改为PART_NAME_C"
}
功能2:简化判断(_call_llm_judge_mode3_answer_only)
仅输出:
json
{
"is_correct": true,
"comment": "SQL正确回答了问题"
}
设计考量:
- 完整诊断用于反思阶段(指导修正)
- 简化判断用于最终评估(快速筛选)
3.4 细粒度错误标签体系
系统定义了15种诊断标签,覆盖常见SQL错误:
python
DIAG_TAGS = [
# 语法/拼写
"SYNTAX_FIX_KEYWORD_TYPO", # 关键字拼写错误
"SYNTAX_FIX_PUNCTUATION", # 标点/括号问题
"SYNTAX_FIX_IDENTIFIER_QUOTE", # 标识符引号
# 列/表/别名
"ALIAS_ADD_FOR_EXPR", # 表达式缺别名
"COLUMN_NAME_FIX", # 列名错误
"TABLE_NAME_FIX", # 表名错误
"AMBIGUOUS_COLUMN_ADD_QUALIFIER", # 歧义列
# 类型/空值
"TYPE_CAST_NUMERIC", # 数值类型转换
"TYPE_CAST_DATE", # 日期类型转换
"NULL_COALESCE_ADD", # NULL处理
"DIVIDE_BY_ZERO_GUARD", # 除零保护
# 语义/逻辑
"AGGREGATION_FIX", # 聚合/GROUP BY
"FILTER_CONDITION_FIX", # WHERE条件
"TIME_WINDOW_FIX", # 时间窗口
"SELECT_COLUMNS_FIX", # SELECT列选择
"ORDER_BY_FIX", # ORDER BY排序
]
标签映射规则:
- 每个错误只分配1个最相关标签
- 标签驱动修复策略(
repair_task) - 提供针对性建议(
advice字段)
3.5 SQL清洗与安全防护
LLM输出的SQL往往包含噪声,系统实现了8步清洗流水线:
python
def _strip_code_fences(text: str) -> str:
# 1. 去除Markdown代码块(```sql ... ```)
# 2. 去除<think>推理块
# 3. 识别并剔除推理开场白("好的,我需要...")
# 4. 从第一个SQL关键字开始截取
# 5. 去除EOS标记(</s>, <|eot_id|>等)
# 6. 去除外层双引号
# 7. 清理末尾分号
# 8. 清除推理结束标记("因此SQL为:")
return cleaned_sql
安全校验:
python
def _ensure_single_statement(sql: str) -> Optional[str]:
"""确保只有一条SELECT语句"""
# 禁止多条语句(防止注入)
# 禁止非SELECT操作(INSERT/UPDATE/DELETE)
# 允许安全前缀(SET NOCOUNT ON; DECLARE @var)
3.6 数据库容错机制
生产环境数据库连接不稳定,系统实现了三层防护:
python
# 1. 预检机制
cur.execute("SELECT 1") # 启动时验证连接
# 2. 断连检测
def _is_db_disconnected(exec_ret):
err = str(exec_ret.get("error"))
return any(k in err for k in [
"Not connected to any MS SQL server",
"DBPROCESS is dead",
"Connection reset",
])
# 3. 自动重连
if _is_db_disconnected(exec_ret):
_close_db(conn)
conn = _connect_db()
exec_ret = execute_sql_server(...) # 重试
四、工程实践亮点
4.1 断点续跑(Resume)
处理大规模数据集时,中断后可无缝续跑:
python
# 跳过已成功的行
if resume and existing_sql and _is_true(existing_judge):
skipped_cnt += 1
continue
# 周期落盘(每N条保存一次)
if attempted % save_every == 0:
wb.save(target_xlsx)
ff.flush()
收益:
- 避免重复计算(节省成本)
- 故障恢复(网络中断/服务重启)
- 增量处理(新数据追加)
4.2 LLM服务降级
面对不稳定的LLM服务,系统实现多级回退:
python
# 1. 重试机制(最多N次)
for attempt in range(llm_retries + 1):
try:
resp = client.chat.completions.create(...)
break
except Exception as e:
if _is_retryable_llm_error(e):
time.sleep(1.0)
continue
raise
# 2. response_format降级
try:
resp = client.chat.completions.create(
response_format={"type": "json_object"}
)
except Exception:
resp = client.chat.completions.create() # 不带format
# 3. 裁判失败兜底
if judge_parse_failed:
last_judge = _fallback_diagnosis_when_judge_down(...)
4.3 执行结果压缩
为避免Prompt过长,系统对执行结果做智能压缩:
python
def _execution_summary(exec_result, max_preview_rows=3):
return {
"success": True/False,
"error": "错误信息(失败时)",
"row_count": 10,
"columns": ["col1", "col2"],
"rows_preview": [前3行数据], # 截断长文本
"elapsed_ms": 125
}
压缩策略:
- 成功且行数少 → 全量返回
- 行数过多 → 仅返回摘要(前3行预览)
- 字符串超过120字符 → 截断
4.4 进度跟踪与日志
系统提供实时进度反馈:
python
[PROGRESS] attempted=50 filled=42 skipped=5 fail=3 (rows=200)
[JUDGE_FALSE] row=23 row_count=0 truncated=False comment=过滤条件过严
[EXEC_FAIL] row=45 err=Invalid column name 'PART_NAM'
五、性能优化技巧
5.1 并行化策略
| 环节 | 并行度 | 说明 |
|---|---|---|
| n-best生成 | 5线程 | 同时生成5个候选SQL |
| Judge评估 | 8线程 | 并行判定多个候选 |
| 数据库执行 | 串行 | 避免DB压力过大 |
5.2 Token控制
python
# 限制Judge输出长度
judge_max_tokens = int(os.getenv("JUDGE_MAX_TOKENS", "256"))
# 避免temperature=0触发后端冲突
temp_for_judge = max(0.01, temperature)
# 文本截断
def _truncate_text(s, max_chars=2000):
if len(s) <= max_chars:
return s
return s[:max_chars] + "...<truncated>"
5.3 缓存机制
python
# 表列名缓存(避免重复查询系统表)
cols_cache: Dict[str, List[str]] = {}
if table not in cols_cache:
cols_cache[table] = get_table_columns(conn, table)
六、典型错误案例与修复
案例1:列名拼写错误
错误SQL:
sql
SELECT PART_NAM, PRICE FROM VIEW_PART_USER_HISTORY
裁判诊断:
json
{
"error_tags": ["COLUMN_NAME_FIX"],
"advice": "将PART_NAM改为PART_NAME_C"
}
修复后:
sql
SELECT PART_NAME_C, PRICE FROM VIEW_PART_USER_HISTORY
案例2:时间函数不兼容
错误SQL:
sql
SELECT * FROM orders WHERE QUARTER(order_date) = 1
裁判诊断:
json
{
"error_tags": ["TIME_WINDOW_FIX"],
"advice": "SQL Server不支持QUARTER(),改用DATEPART(QUARTER, order_date)"
}
案例3:GROUP BY缺失
错误SQL:
sql
SELECT category, SUM(amount) FROM orders
裁判诊断:
json
{
"error_tags": ["AGGREGATION_FIX"],
"advice": "非聚合列category需加入GROUP BY"
}
七、使用指南
7.1 基本用法
bash
# 模式2:从Excel生成SQL
python validate_sql.py \
--questions-xlsx questions.xlsx \
--ref-xlsx sql3.xlsx \
--db-host localhost \
--db-user sa \
--db-password your_password \
--db-name your_database \
--small-base-url http://your-llm-api \
--small-model qwen3-coder \
--judge-model qwen3 \
--max-rounds 3 \
--gen-n 5
7.2 关键参数
| 参数 | 默认值 | 说明 |
|---|---|---|
--max-rounds |
3 | 最大反思轮次 |
--gen-n |
5 | n-best候选数量 |
--gen-parallel |
5 | 并行生成线程数 |
--judge-parallel |
8 | 并行Judge线程数 |
--save-every |
20 | 每N条落盘一次 |
--resume |
True | 启用断点续跑 |
--strict-health |
True | 严格健康检查 |
7.3 输出文件
plaintext
questions_result_20260414.xlsx # 带SQL和判定的Excel
sql_gen_success.jsonl # 成功轨迹
sql_gen_failures.jsonl # 失败轨迹(含每轮尝试)
八、局限性与改进方向
8.1 当前局限
- 单表限制:主要针对单表查询,复杂JOIN场景支持有限
- Schema静态:表结构硬编码在代码中(
STATIC_SCHEMA_COLUMNS) - 依赖pymssql:仅支持SQL Server,未抽象数据库接口
- 单文件臃肿:3869行代码维护成本高(已模块化重构)
8.2 改进建议
- 多表JOIN推理:引入Schema链接(Schema Linking)技术
- 动态Schema加载:从数据库系统表自动获取列信息
- 数据库抽象层:支持MySQL/PostgreSQL等多引擎
- 向量检索增强:集成FAISS实现few-shot样例智能检索
- 执行计划分析:结合EXPLAIN PLAN优化复杂查询
九、总结
validate_sql.py 展现了一个工业级Text-to-SQL系统应有的工程素养:
✅ 鲁棒性:断连重连、服务降级、异常捕获
✅ 可维护性:分层架构、清晰命名、详细注释
✅ 可扩展性:模块化设计、配置驱动、插件式标签
✅ 可观测性:进度跟踪、轨迹记录、详细日志
它告诉我们:优秀的AI应用不仅是算法创新,更是工程艺术的体现。
附录:核心代码统计
| 模块 | 行数 | 占比 |
|---|---|---|
| SQL生成与反思 | ~800 | 21% |
| 裁判模型评估 | ~600 | 16% |
| 数据库执行 | ~200 | 5% |
| 工具函数 | ~500 | 13% |
| 流程编排 | ~1200 | 31% |
| 其他 | ~569 | 14% |
| 总计 | 3869 | 100% |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)