摘要:本文深入剖析一个近4000行的生产级SQL生成与验证系统,揭示其如何通过多轮反思、并行候选、智能裁判、断点续跑等机制,实现从自然语言到可执行SQL的高精度转换。我们将拆解其核心架构、关键技术点和工程实践,为构建可靠的AI数据查询系统提供参考。


一、系统概览:不只是"LLM生成SQL"那么简单

当你看到 validate_sql.py 这个3869行的文件时,第一反应可能是:"为什么一个简单的Text-to-SQL任务需要这么多代码?"

答案在于:生产环境与Demo的本质区别

1.1 三大核心模式

该系统支持三种工作模式:

模式 功能 应用场景
模式1 JSONL批量验证 已有SQL数据集的回归测试
模式2 Excel生成+验证 从问题列表自动生成SQL(主流程)
模式3 Excel已有SQL评估 对历史SQL进行质量审计

1.2 核心能力矩阵


plaintext

┌─────────────────────────────────────────┐
│         validate_sql.py 核心能力          │
├─────────────────────────────────────────┤
│ ✓ 多轮反思纠错(最多N轮迭代)              │
│ ✓ n-best并行候选生成                     │
│ ✓ 双模型协作(小模型生成 + 大模型裁判)     │
│ ✓ 细粒度错误诊断(15种标签体系)           │
│ ✓ 数据库断连自动重连                      │
│ ✓ 断点续跑与周期落盘                      │
│ ✓ LLM服务降级与重试机制                   │
│ ✓ SQL清洗与安全防护                       │
└─────────────────────────────────────────┘

二、架构设计:分层解耦的工程哲学

2.1 三层架构

虽然代码在一个文件中,但逻辑上分为三个层次:


plaintext

┌──────────────────────────────────────┐
│   Layer 1: 流程编排层                  │
│   - run_generate_validate_for_xlsx() │
│   - run_judge_existing_sql_for_xlsx()│
│   - main()                           │
├──────────────────────────────────────┤
│   Layer 2: 业务逻辑层                  │
│   - build_generate_messages()        │
│   - build_reflect_messages()         │
│   - _call_llm_judge_realistic()      │
│   - execute_sql_server()             │
├──────────────────────────────────────┤
│   Layer 3: 工具函数层                  │
│   - _strip_code_fences()             │
│   - _ensure_single_statement()       │
│   - _extract_exec_error_facts()      │
│   - _truncate_text()                 │
└──────────────────────────────────────┘

2.2 关键数据结构


python

@dataclass
class XlsxRow:
    """Excel行数据结构"""
    表名: str
    问题种类: str
    问题: str
    sql: str

@dataclass
class SqlTask:
    """SQL验证任务"""
    idx: int
    obj: Dict[str, Any]
    key: str  # sql / sql_turn1 / sql_turn2
    sql: str

三、核心技术深度解析

3.1 多轮反思机制(Reflection Loop)

这是系统的灵魂所在。传统Text-to-SQL是"一次生成",而本系统采用生成-执行-评估-修正的闭环:


python

for r in range(max_rounds):  # 默认3轮
    if r == 0:
        # 第1轮:初始生成
        msgs = build_generate_messages(...)
    else:
        # 后续轮次:基于上一轮反馈反思
        fb = _summarize_exec_feedback_for_small(last_exec)
        tags = last_judge.get("error_tags")
        advice = last_judge.get("advice")
        msgs = build_reflect_messages(
            prev_sql=prev_sql,
            exec_feedback=fb,
            error_tags=tags,
            judge_advice=advice,
            ...
        )
    
    gen_sql = call_llm_sql(client=small_client, messages=msgs)
    exec_ret = execute_sql_server(sql=gen_sql)
    
    if exec_ret["success"] and exec_ret["row_count"] > 0:
        break  # 成功则退出
    
    # 失败则进入下一轮反思
    last_judge = _call_llm_judge_realistic(...)
    prev_sql = gen_sql

关键洞察:

  • 每轮反思都携带结构化反馈(非原始报错)
  • 裁判模型输出错误标签修复建议
  • 小模型根据标签定向修正(而非盲目重试)

3.2 n-best并行候选生成

首轮生成不是单条SQL,而是并行生成N个候选(默认5个):


python

n_best = max(1, int(gen_n))  # 默认5
max_workers = min(int(gen_parallel), n_best)

with ThreadPoolExecutor(max_workers=max_workers) as ex:
    futs = {ex.submit(_gen_one, i + 1): (i + 1) for i in range(n_best)}
    for fut in as_completed(futs):
        sql_out = fut.result()
        if sql_out:
            gen_sqls.append(sql_out)

优势:

  • 提高首轮命中率(多样性采样)
  • 避免串行等待(并行加速)
  • 去重后保留唯一候选

3.3 双模型协作架构

系统采用大小模型分工策略:

角色 模型 职责 温度
生成模型 qwen3-coder(小模型) SQL生成/修正 0.0-0.3
裁判模型 qwen3(大模型) 结果评估/错误诊断 0.01

裁判模型的两大功能:

功能1:完整诊断(_call_llm_judge_realistic

输出JSON结构:


json

{
  "is_correct": false,
  "score": 45,
  "error_tags": ["COLUMN_NAME_FIX"],
  "repair_task": "sql_correct",
  "comment": "列名拼写错误",
  "advice": "将PART_NM改为PART_NAME_C"
}
功能2:简化判断(_call_llm_judge_mode3_answer_only

仅输出:


json

{
  "is_correct": true,
  "comment": "SQL正确回答了问题"
}

设计考量:

  • 完整诊断用于反思阶段(指导修正)
  • 简化判断用于最终评估(快速筛选)

3.4 细粒度错误标签体系

系统定义了15种诊断标签,覆盖常见SQL错误:


python

DIAG_TAGS = [
    # 语法/拼写
    "SYNTAX_FIX_KEYWORD_TYPO",      # 关键字拼写错误
    "SYNTAX_FIX_PUNCTUATION",       # 标点/括号问题
    "SYNTAX_FIX_IDENTIFIER_QUOTE",  # 标识符引号
    
    # 列/表/别名
    "ALIAS_ADD_FOR_EXPR",           # 表达式缺别名
    "COLUMN_NAME_FIX",              # 列名错误
    "TABLE_NAME_FIX",               # 表名错误
    "AMBIGUOUS_COLUMN_ADD_QUALIFIER", # 歧义列
    
    # 类型/空值
    "TYPE_CAST_NUMERIC",            # 数值类型转换
    "TYPE_CAST_DATE",               # 日期类型转换
    "NULL_COALESCE_ADD",            # NULL处理
    "DIVIDE_BY_ZERO_GUARD",         # 除零保护
    
    # 语义/逻辑
    "AGGREGATION_FIX",              # 聚合/GROUP BY
    "FILTER_CONDITION_FIX",         # WHERE条件
    "TIME_WINDOW_FIX",              # 时间窗口
    "SELECT_COLUMNS_FIX",           # SELECT列选择
    "ORDER_BY_FIX",                 # ORDER BY排序
]

标签映射规则:

  • 每个错误只分配1个最相关标签
  • 标签驱动修复策略(repair_task
  • 提供针对性建议(advice字段)

3.5 SQL清洗与安全防护

LLM输出的SQL往往包含噪声,系统实现了8步清洗流水线


python

def _strip_code_fences(text: str) -> str:
    # 1. 去除Markdown代码块(```sql ... ```)
    # 2. 去除<think>推理块
    # 3. 识别并剔除推理开场白("好的,我需要...")
    # 4. 从第一个SQL关键字开始截取
    # 5. 去除EOS标记(</s>, <|eot_id|>等)
    # 6. 去除外层双引号
    # 7. 清理末尾分号
    # 8. 清除推理结束标记("因此SQL为:")
    return cleaned_sql

安全校验:


python

def _ensure_single_statement(sql: str) -> Optional[str]:
    """确保只有一条SELECT语句"""
    # 禁止多条语句(防止注入)
    # 禁止非SELECT操作(INSERT/UPDATE/DELETE)
    # 允许安全前缀(SET NOCOUNT ON; DECLARE @var)

3.6 数据库容错机制

生产环境数据库连接不稳定,系统实现了三层防护


python

# 1. 预检机制
cur.execute("SELECT 1")  # 启动时验证连接

# 2. 断连检测
def _is_db_disconnected(exec_ret):
    err = str(exec_ret.get("error"))
    return any(k in err for k in [
        "Not connected to any MS SQL server",
        "DBPROCESS is dead",
        "Connection reset",
    ])

# 3. 自动重连
if _is_db_disconnected(exec_ret):
    _close_db(conn)
    conn = _connect_db()
    exec_ret = execute_sql_server(...)  # 重试

四、工程实践亮点

4.1 断点续跑(Resume)

处理大规模数据集时,中断后可无缝续跑


python

# 跳过已成功的行
if resume and existing_sql and _is_true(existing_judge):
    skipped_cnt += 1
    continue

# 周期落盘(每N条保存一次)
if attempted % save_every == 0:
    wb.save(target_xlsx)
    ff.flush()

收益:

  • 避免重复计算(节省成本)
  • 故障恢复(网络中断/服务重启)
  • 增量处理(新数据追加)

4.2 LLM服务降级

面对不稳定的LLM服务,系统实现多级回退


python

# 1. 重试机制(最多N次)
for attempt in range(llm_retries + 1):
    try:
        resp = client.chat.completions.create(...)
        break
    except Exception as e:
        if _is_retryable_llm_error(e):
            time.sleep(1.0)
            continue
        raise

# 2. response_format降级
try:
    resp = client.chat.completions.create(
        response_format={"type": "json_object"}
    )
except Exception:
    resp = client.chat.completions.create()  # 不带format

# 3. 裁判失败兜底
if judge_parse_failed:
    last_judge = _fallback_diagnosis_when_judge_down(...)

4.3 执行结果压缩

为避免Prompt过长,系统对执行结果做智能压缩


python

def _execution_summary(exec_result, max_preview_rows=3):
    return {
        "success": True/False,
        "error": "错误信息(失败时)",
        "row_count": 10,
        "columns": ["col1", "col2"],
        "rows_preview": [前3行数据],  # 截断长文本
        "elapsed_ms": 125
    }

压缩策略:

  • 成功且行数少 → 全量返回
  • 行数过多 → 仅返回摘要(前3行预览)
  • 字符串超过120字符 → 截断

4.4 进度跟踪与日志

系统提供实时进度反馈


python

[PROGRESS] attempted=50 filled=42 skipped=5 fail=3 (rows=200)
[JUDGE_FALSE] row=23 row_count=0 truncated=False comment=过滤条件过严
[EXEC_FAIL] row=45 err=Invalid column name 'PART_NAM'

五、性能优化技巧

5.1 并行化策略

环节 并行度 说明
n-best生成 5线程 同时生成5个候选SQL
Judge评估 8线程 并行判定多个候选
数据库执行 串行 避免DB压力过大

5.2 Token控制


python

# 限制Judge输出长度
judge_max_tokens = int(os.getenv("JUDGE_MAX_TOKENS", "256"))

# 避免temperature=0触发后端冲突
temp_for_judge = max(0.01, temperature)

# 文本截断
def _truncate_text(s, max_chars=2000):
    if len(s) <= max_chars:
        return s
    return s[:max_chars] + "...<truncated>"

5.3 缓存机制


python

# 表列名缓存(避免重复查询系统表)
cols_cache: Dict[str, List[str]] = {}
if table not in cols_cache:
    cols_cache[table] = get_table_columns(conn, table)

六、典型错误案例与修复

案例1:列名拼写错误

错误SQL:


sql

SELECT PART_NAM, PRICE FROM VIEW_PART_USER_HISTORY

裁判诊断:


json

{
  "error_tags": ["COLUMN_NAME_FIX"],
  "advice": "将PART_NAM改为PART_NAME_C"
}

修复后:


sql

SELECT PART_NAME_C, PRICE FROM VIEW_PART_USER_HISTORY

案例2:时间函数不兼容

错误SQL:


sql

SELECT * FROM orders WHERE QUARTER(order_date) = 1

裁判诊断:


json

{
  "error_tags": ["TIME_WINDOW_FIX"],
  "advice": "SQL Server不支持QUARTER(),改用DATEPART(QUARTER, order_date)"
}

案例3:GROUP BY缺失

错误SQL:


sql

SELECT category, SUM(amount) FROM orders

裁判诊断:


json

{
  "error_tags": ["AGGREGATION_FIX"],
  "advice": "非聚合列category需加入GROUP BY"
}

七、使用指南

7.1 基本用法


bash

# 模式2:从Excel生成SQL
python validate_sql.py \
    --questions-xlsx questions.xlsx \
    --ref-xlsx sql3.xlsx \
    --db-host localhost \
    --db-user sa \
    --db-password your_password \
    --db-name your_database \
    --small-base-url http://your-llm-api \
    --small-model qwen3-coder \
    --judge-model qwen3 \
    --max-rounds 3 \
    --gen-n 5

7.2 关键参数

参数 默认值 说明
--max-rounds 3 最大反思轮次
--gen-n 5 n-best候选数量
--gen-parallel 5 并行生成线程数
--judge-parallel 8 并行Judge线程数
--save-every 20 每N条落盘一次
--resume True 启用断点续跑
--strict-health True 严格健康检查

7.3 输出文件


plaintext

questions_result_20260414.xlsx  # 带SQL和判定的Excel
sql_gen_success.jsonl            # 成功轨迹
sql_gen_failures.jsonl           # 失败轨迹(含每轮尝试)

八、局限性与改进方向

8.1 当前局限

  1. 单表限制:主要针对单表查询,复杂JOIN场景支持有限
  2. Schema静态:表结构硬编码在代码中(STATIC_SCHEMA_COLUMNS
  3. 依赖pymssql:仅支持SQL Server,未抽象数据库接口
  4. 单文件臃肿:3869行代码维护成本高(已模块化重构)

8.2 改进建议

  1. 多表JOIN推理:引入Schema链接(Schema Linking)技术
  2. 动态Schema加载:从数据库系统表自动获取列信息
  3. 数据库抽象层:支持MySQL/PostgreSQL等多引擎
  4. 向量检索增强:集成FAISS实现few-shot样例智能检索
  5. 执行计划分析:结合EXPLAIN PLAN优化复杂查询

九、总结

validate_sql.py 展现了一个工业级Text-to-SQL系统应有的工程素养:

✅ 鲁棒性:断连重连、服务降级、异常捕获
✅ 可维护性:分层架构、清晰命名、详细注释
✅ 可扩展性:模块化设计、配置驱动、插件式标签
✅ 可观测性:进度跟踪、轨迹记录、详细日志

它告诉我们:优秀的AI应用不仅是算法创新,更是工程艺术的体现


附录:核心代码统计

模块 行数 占比
SQL生成与反思 ~800 21%
裁判模型评估 ~600 16%
数据库执行 ~200 5%
工具函数 ~500 13%
流程编排 ~1200 31%
其他 ~569 14%
总计 3869 100%
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐