向量化执行引擎中的 AI 辅助查询优化:从代价模型到执行计划选择

cover

一、查询优化器的"代价盲区":传统代价模型的局限性

数据库查询优化器的核心任务是:为一条 SQL 选择执行代价最低的执行计划。传统优化器基于统计信息(行数估计、唯一值数量、数据分布直方图)计算代价,但统计信息是采样得到的,可能不准确。当统计信息过期或数据倾斜严重时,优化器可能选择次优计划,导致查询性能下降数十倍。

AI 辅助查询优化的核心思路是:用机器学习模型替代或增强传统代价模型。模型从历史查询的执行数据中学习"查询特征 → 执行代价"的映射关系,能够捕捉传统统计信息无法表达的复杂相关性。

二、AI 辅助查询优化的架构

flowchart TD
    A[SQL 查询] --> B[传统优化器: 生成候选计划]
    B --> C[计划 1: Hash Join]
    B --> D[计划 2: Nested Loop]
    B --> E[计划 3: Merge Join]
    C & D & E --> F[AI 代价模型: 预测各计划执行时间]
    F --> G{AI 预测 vs 传统代价}
    G -->|AI 更可信| H[选择 AI 推荐的计划]
    G -->|传统更可信| I[选择传统代价最低的计划]
    H & I --> J[执行查询]
    J --> K[记录实际执行时间]
    K --> L[反馈训练: 更新 AI 模型]

三、AI 代价模型的代码实现

3.1 查询特征提取

@dataclass
class PlanFeatures:
    """执行计划特征:用于 AI 代价预测"""
    # 扫描特征
    scan_type: str           # seq_scan / index_scan / bitmap_scan
    estimated_rows: float    # 估计扫描行数
    table_size_mb: float     # 表大小

    # Join 特征
    join_type: str           # hash / nested_loop / merge
    join_cardinality: float  # Join 结果估计行数

    # 聚合特征
    has_group_by: bool
    group_by_columns: int
    has_distinct: bool

    # 排序特征
    has_order_by: bool
    sort_columns: int

    # 输出特征
    result_rows: float       # 估计结果行数

class PlanFeatureExtractor:
    """从执行计划中提取特征向量"""

    def extract(self, explain_json: dict) -> PlanFeatures:
        plan = explain_json['plan']
        return PlanFeatures(
            scan_type=self._detect_scan_type(plan),
            estimated_rows=plan.get('rows', 0),
            table_size_mb=self._get_table_size(plan),
            join_type=self._detect_join_type(plan),
            join_cardinality=self._estimate_join_cardinality(plan),
            has_group_by='Group' in plan.get('node_type', ''),
            group_by_columns=len(plan.get('group_key', [])),
            has_distinct='Distinct' in plan.get('node_type', ''),
            has_order_by='Sort' in plan.get('node_type', ''),
            sort_columns=len(plan.get('sort_key', [])),
            result_rows=plan.get('rows', 0),
        )

    def to_vector(self, features: PlanFeatures) -> list:
        """将特征转换为数值向量"""
        scan_map = {'seq_scan': 0, 'index_scan': 1, 'bitmap_scan': 2}
        join_map = {'hash': 0, 'nested_loop': 1, 'merge': 2, 'none': 3}

        return [
            scan_map.get(features.scan_type, 0),
            np.log1p(features.estimated_rows),
            np.log1p(features.table_size_mb),
            join_map.get(features.join_type, 3),
            np.log1p(features.join_cardinality),
            int(features.has_group_by),
            features.group_by_columns,
            int(features.has_distinct),
            int(features.has_order_by),
            features.sort_columns,
            np.log1p(features.result_rows),
        ]

3.2 AI 代价预测模型

import torch
import torch.nn as nn

class QueryCostModel(nn.Module):
    """
    查询代价预测模型:基于执行计划特征预测执行时间
    使用多层感知机,输入为计划特征向量,输出为预测执行时间(ms)
    """

    def __init__(self, input_dim: int = 11, hidden_dim: int = 64):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # 预测 log(执行时间),避免大值主导损失
        return self.network(x)

class AICostEstimator:
    """AI 代价估计器:集成到查询优化器中"""

    def __init__(self, model_path: str):
        self.model = QueryCostModel()
        self.model.load_state_dict(torch.load(model_path))
        self.model.eval()
        self.feature_extractor = PlanFeatureExtractor()

    @torch.no_grad()
    def predict_cost(self, explain_json: dict) -> float:
        """预测执行计划的代价(执行时间 ms)"""
        features = self.feature_extractor.extract(explain_json)
        vector = self.feature_extractor.to_vector(features)
        x = torch.tensor([vector], dtype=torch.float32)
        log_time = self.model(x).item()
        return np.exp(log_time)  # 反归一化

    def select_best_plan(self, plans: List[dict]) -> int:
        """从多个候选计划中选择 AI 预测代价最低的"""
        costs = []
        for plan in plans:
            cost = self.predict_cost(plan)
            costs.append(cost)

        best_idx = np.argmin(costs)
        return best_idx

3.3 模型训练与反馈循环

class CostModelTrainer:
    """代价模型训练器:从历史查询执行数据中学习"""

    def train(self, training_data: List[tuple]):
        """
        training_data: [(plan_features, actual_time_ms), ...]
        """
        X = torch.tensor(
            [feat for feat, _ in training_data],
            dtype=torch.float32
        )
        y = torch.tensor(
            [np.log(time) for _, time in training_data],
            dtype=torch.float32
        ).unsqueeze(1)

        model = QueryCostModel()
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
        loss_fn = nn.MSELoss()

        for epoch in range(100):
            pred = model(X)
            loss = loss_fn(pred, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        torch.save(model.state_dict(), 'cost_model.pt')

四、AI 查询优化的边界分析与架构权衡

模型的冷启动问题。AI 代价模型需要大量历史查询数据训练,新部署的数据库没有足够数据。建议在冷启动阶段使用传统代价模型,积累足够数据后再切换到 AI 模型。

模型泛化性。训练数据覆盖的查询模式有限,遇到新模式时预测可能不准确。建议设置置信度阈值:当模型对预测结果不确定时,回退到传统代价模型。

推理延迟。AI 模型的推理时间(约 1-5ms)增加了查询优化阶段的开销。对于短查询(执行时间 <10ms),优化阶段的额外开销可能超过查询本身的执行时间。建议对短查询跳过 AI 优化。

适用边界:AI 查询优化最适合查询模式复杂、数据倾斜严重、传统优化器频繁选错计划的场景。对于简单查询或数据分布均匀的场景,传统优化器已经足够。

五、总结

AI 辅助查询优化通过机器学习模型增强传统代价模型,从历史执行数据中学习"查询特征 → 执行代价"的映射。落地时需关注冷启动问题、模型泛化性和推理延迟。建议采用"传统优先 + AI 辅助"的混合策略,在模型置信度高时使用 AI 预测,低时回退到传统代价模型。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐