向量化执行引擎中的 AI 辅助查询优化:从代价模型到执行计划选择
向量化执行引擎中的 AI 辅助查询优化:从代价模型到执行计划选择

一、查询优化器的"代价盲区":传统代价模型的局限性
数据库查询优化器的核心任务是:为一条 SQL 选择执行代价最低的执行计划。传统优化器基于统计信息(行数估计、唯一值数量、数据分布直方图)计算代价,但统计信息是采样得到的,可能不准确。当统计信息过期或数据倾斜严重时,优化器可能选择次优计划,导致查询性能下降数十倍。
AI 辅助查询优化的核心思路是:用机器学习模型替代或增强传统代价模型。模型从历史查询的执行数据中学习"查询特征 → 执行代价"的映射关系,能够捕捉传统统计信息无法表达的复杂相关性。
二、AI 辅助查询优化的架构
flowchart TD
A[SQL 查询] --> B[传统优化器: 生成候选计划]
B --> C[计划 1: Hash Join]
B --> D[计划 2: Nested Loop]
B --> E[计划 3: Merge Join]
C & D & E --> F[AI 代价模型: 预测各计划执行时间]
F --> G{AI 预测 vs 传统代价}
G -->|AI 更可信| H[选择 AI 推荐的计划]
G -->|传统更可信| I[选择传统代价最低的计划]
H & I --> J[执行查询]
J --> K[记录实际执行时间]
K --> L[反馈训练: 更新 AI 模型]
三、AI 代价模型的代码实现
3.1 查询特征提取
@dataclass
class PlanFeatures:
"""执行计划特征:用于 AI 代价预测"""
# 扫描特征
scan_type: str # seq_scan / index_scan / bitmap_scan
estimated_rows: float # 估计扫描行数
table_size_mb: float # 表大小
# Join 特征
join_type: str # hash / nested_loop / merge
join_cardinality: float # Join 结果估计行数
# 聚合特征
has_group_by: bool
group_by_columns: int
has_distinct: bool
# 排序特征
has_order_by: bool
sort_columns: int
# 输出特征
result_rows: float # 估计结果行数
class PlanFeatureExtractor:
"""从执行计划中提取特征向量"""
def extract(self, explain_json: dict) -> PlanFeatures:
plan = explain_json['plan']
return PlanFeatures(
scan_type=self._detect_scan_type(plan),
estimated_rows=plan.get('rows', 0),
table_size_mb=self._get_table_size(plan),
join_type=self._detect_join_type(plan),
join_cardinality=self._estimate_join_cardinality(plan),
has_group_by='Group' in plan.get('node_type', ''),
group_by_columns=len(plan.get('group_key', [])),
has_distinct='Distinct' in plan.get('node_type', ''),
has_order_by='Sort' in plan.get('node_type', ''),
sort_columns=len(plan.get('sort_key', [])),
result_rows=plan.get('rows', 0),
)
def to_vector(self, features: PlanFeatures) -> list:
"""将特征转换为数值向量"""
scan_map = {'seq_scan': 0, 'index_scan': 1, 'bitmap_scan': 2}
join_map = {'hash': 0, 'nested_loop': 1, 'merge': 2, 'none': 3}
return [
scan_map.get(features.scan_type, 0),
np.log1p(features.estimated_rows),
np.log1p(features.table_size_mb),
join_map.get(features.join_type, 3),
np.log1p(features.join_cardinality),
int(features.has_group_by),
features.group_by_columns,
int(features.has_distinct),
int(features.has_order_by),
features.sort_columns,
np.log1p(features.result_rows),
]
3.2 AI 代价预测模型
import torch
import torch.nn as nn
class QueryCostModel(nn.Module):
"""
查询代价预测模型:基于执行计划特征预测执行时间
使用多层感知机,输入为计划特征向量,输出为预测执行时间(ms)
"""
def __init__(self, input_dim: int = 11, hidden_dim: int = 64):
super().__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(hidden_dim, 1),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# 预测 log(执行时间),避免大值主导损失
return self.network(x)
class AICostEstimator:
"""AI 代价估计器:集成到查询优化器中"""
def __init__(self, model_path: str):
self.model = QueryCostModel()
self.model.load_state_dict(torch.load(model_path))
self.model.eval()
self.feature_extractor = PlanFeatureExtractor()
@torch.no_grad()
def predict_cost(self, explain_json: dict) -> float:
"""预测执行计划的代价(执行时间 ms)"""
features = self.feature_extractor.extract(explain_json)
vector = self.feature_extractor.to_vector(features)
x = torch.tensor([vector], dtype=torch.float32)
log_time = self.model(x).item()
return np.exp(log_time) # 反归一化
def select_best_plan(self, plans: List[dict]) -> int:
"""从多个候选计划中选择 AI 预测代价最低的"""
costs = []
for plan in plans:
cost = self.predict_cost(plan)
costs.append(cost)
best_idx = np.argmin(costs)
return best_idx
3.3 模型训练与反馈循环
class CostModelTrainer:
"""代价模型训练器:从历史查询执行数据中学习"""
def train(self, training_data: List[tuple]):
"""
training_data: [(plan_features, actual_time_ms), ...]
"""
X = torch.tensor(
[feat for feat, _ in training_data],
dtype=torch.float32
)
y = torch.tensor(
[np.log(time) for _, time in training_data],
dtype=torch.float32
).unsqueeze(1)
model = QueryCostModel()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
for epoch in range(100):
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.save(model.state_dict(), 'cost_model.pt')
四、AI 查询优化的边界分析与架构权衡
模型的冷启动问题。AI 代价模型需要大量历史查询数据训练,新部署的数据库没有足够数据。建议在冷启动阶段使用传统代价模型,积累足够数据后再切换到 AI 模型。
模型泛化性。训练数据覆盖的查询模式有限,遇到新模式时预测可能不准确。建议设置置信度阈值:当模型对预测结果不确定时,回退到传统代价模型。
推理延迟。AI 模型的推理时间(约 1-5ms)增加了查询优化阶段的开销。对于短查询(执行时间 <10ms),优化阶段的额外开销可能超过查询本身的执行时间。建议对短查询跳过 AI 优化。
适用边界:AI 查询优化最适合查询模式复杂、数据倾斜严重、传统优化器频繁选错计划的场景。对于简单查询或数据分布均匀的场景,传统优化器已经足够。
五、总结
AI 辅助查询优化通过机器学习模型增强传统代价模型,从历史执行数据中学习"查询特征 → 执行代价"的映射。落地时需关注冷启动问题、模型泛化性和推理延迟。建议采用"传统优先 + AI 辅助"的混合策略,在模型置信度高时使用 AI 预测,低时回退到传统代价模型。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)