AI 驱动的索引推荐系统:从工作负载特征到自动索引创建

cover

一、索引管理的"经验盲区":DBA 的隐性知识难以传承

数据库索引是查询性能的关键杠杆,但索引的选择高度依赖 DBA 的经验——哪些列组合需要联合索引、索引的列顺序如何决定、何时需要覆盖索引避免回表。这些决策需要同时理解查询模式、数据分布和写入代价。一个 100 张表、500 条慢查询的系统中,可能的索引组合是指数级的,DBA 只能凭经验挑选"最可能有效"的索引。更关键的是,当 DBA 离职后,这些隐性知识随之流失,新 DBA 需要重新积累。AI 驱动的索引推荐系统将工作负载分析自动化,从慢查询日志中提取索引候选,评估收益和代价,输出可执行的索引创建建议。

二、索引推荐的问题建模与搜索空间

2.1 从工作负载到索引候选的搜索流程

flowchart TB
    A[慢查询日志] --> B[查询特征提取]
    B --> C[谓词列提取]
    B --> D[Join 列提取]
    B --> E[排序/分组列提取]

    C & D & E --> F[索引候选生成]
    F --> G[单列索引]
    F --> H[联合索引<br/>排列组合]
    F --> I[覆盖索引<br/>包含 SELECT 列]

    G & H & I --> J[代价模型评估]
    J --> K[查询加速收益]
    J --> L[写入代价增加]
    J --> M[存储空间开销]

    K & L & M --> N[收益-代价排序]
    N --> O[推荐索引列表<br/>+ 预期收益]

    subgraph 约束条件
        P[最大索引数量限制]
        Q[写入比例阈值]
        R[存储空间预算]
    end

    N --> P & Q & R

2.2 索引搜索空间的规模

# 搜索空间分析
# 假设一张表有 N 个列,索引最多包含 K 列
# 单列索引:C(N,1) = N 种
# 2列联合索引:C(N,2) × 2! = N×(N-1) 种(列顺序不同是不同索引)
# K列联合索引:C(N,K) × K! 种

# 一张 20 列的表,最多 3 列联合索引
# 搜索空间 = C(20,1) + C(20,2)×2 + C(20,3)×6
#           = 20 + 380 + 6840 = 7240 种可能索引

# 全库 100 张表:724,000 种候选索引
# 需要高效的剪枝策略

三、AI 索引推荐系统实现

3.1 工作负载特征提取

import re
from dataclasses import dataclass
from typing import List

@dataclass
class QueryFeature:
    """查询特征提取结果"""
    query_id: str
    tables: List[str]
    predicate_columns: List[dict]   # [{table, column, op, selectivity}]
    join_columns: List[dict]        # [{left_table, left_col, right_table, right_col}]
    order_by_columns: List[dict]    # [{table, column, direction}]
    group_by_columns: List[dict]    # [{table, column}]
    select_columns: List[dict]      # [{table, column}]
    execution_time_ms: float
    scan_rows: int

class WorkloadAnalyzer:
    """从慢查询日志提取工作负载特征"""

    def analyze_slow_query_log(self, log_entries: list) -> list:
        """分析慢查询日志,提取每条查询的特征"""
        features = []

        for entry in log_entries:
            query = entry['query']
            feature = QueryFeature(
                query_id=entry['id'],
                tables=self._extract_tables(query),
                predicate_columns=self._extract_predicates(query),
                join_columns=self._extract_joins(query),
                order_by_columns=self._extract_order_by(query),
                group_by_columns=self._extract_group_by(query),
                select_columns=self._extract_select(query),
                execution_time_ms=entry.get('query_time', 0),
                scan_rows=entry.get('rows_examined', 0),
            )
            features.append(feature)

        return features

    def _extract_predicates(self, query: str) -> list:
        """提取 WHERE 子句中的谓词列"""
        predicates = []
        # 简化实现:正则匹配 column = value / column > value 等模式
        pattern = r'(\w+)\.(\w+)\s*(=|!=|>|<|>=|<=|LIKE|IN)\s*[\?\d\'"]'
        for match in re.finditer(pattern, query, re.IGNORECASE):
            predicates.append({
                'table': match.group(1),
                'column': match.group(2),
                'op': match.group(3).upper(),
            })
        return predicates

3.2 索引候选生成与代价评估

from itertools import combinations, permutations

class IndexRecommender:
    """索引推荐引擎"""

    def __init__(self, db_stats: dict, max_index_columns: int = 3):
        self.db_stats = db_stats
        self.max_columns = max_index_columns

    def generate_candidates(self, workload: list) -> list:
        """为工作负载生成索引候选"""
        candidates = {}

        for query in workload:
            # 等值谓词列优先级最高(索引最左前缀原则)
            eq_cols = [p for p in query.predicate_columns if p['op'] == '=']
            range_cols = [p for p in query.predicate_columns if p['op'] in ('>', '<', '>=', '<=')]
            join_cols = query.join_columns
            order_cols = query.order_by_columns
            group_cols = query.group_by_columns

            # 生成联合索引候选:等值 → 范围 → 排序/分组
            index_cols = (
                [c['column'] for c in eq_cols] +
                [c['column'] for c in range_cols] +
                [c['column'] for c in order_cols] +
                [c['column'] for c in group_cols]
            )

            # 截断到最大列数
            for k in range(1, min(self.max_columns + 1, len(index_cols) + 1)):
                for perm in permutations(index_cols[:k + 2], k):
                    # 只保留前缀有序的排列(等值列顺序不影响,但范围列必须最后)
                    key = perm
                    if key not in candidates:
                        candidates[key] = {
                            'columns': list(perm),
                            'benefited_queries': [],
                            'estimated_benefit': 0,
                        }
                    candidates[key]['benefited_queries'].append(query.query_id)

        return list(candidates.values())

    def estimate_benefit(self, candidate: dict, workload: list) -> dict:
        """估算索引的查询加速收益"""
        total_benefit_ms = 0

        for qid in candidate['benefited_queries']:
            query = next(q for q in workload if q.query_id == qid)
            # 简化模型:索引可将全表扫描变为索引扫描
            # 收益 = 扫描行数 × 每行扫描时间 - 索引查找时间
            full_scan_cost = query.scan_rows * 0.001  # 假设每行 0.001ms
            index_lookup_cost = 50 * 0.001  # B+树 3-4 层查找约 50 次比较
            benefit = max(0, full_scan_cost - index_lookup_cost)
            total_benefit_ms += benefit

        # 写入代价:每次 INSERT/UPDATE/DELETE 需维护索引
        table = candidate['columns'][0].split('.')[0] if '.' in candidate['columns'][0] else None
        write_ratio = self.db_stats.get(table, {}).get('write_ratio', 0.3)
        write_cost_per_txn = len(candidate['columns']) * 0.05  # 每列约 0.05ms 维护代价
        daily_writes = self.db_stats.get(table, {}).get('daily_writes', 10000)
        daily_write_cost_ms = daily_writes * write_cost_per_txn * write_ratio

        return {
            'columns': candidate['columns'],
            'query_benefit_ms': total_benefit_ms,
            'daily_write_cost_ms': daily_write_cost_ms,
            'net_benefit_ms': total_benefit_ms - daily_write_cost_ms,
            'benefited_query_count': len(candidate['benefited_queries']),
        }

    def rank_recommendations(self, candidates: list, top_k: int = 10) -> list:
        """按净收益排序,输出 Top-K 推荐"""
        evaluated = [self.estimate_benefit(c, []) for c in candidates]
        # 过滤负收益索引
        positive = [c for c in evaluated if c['net_benefit_ms'] > 0]
        # 按净收益降序排序
        positive.sort(key=lambda x: x['net_benefit_ms'], reverse=True)
        return positive[:top_k]

3.3 自动化索引创建与回滚

class IndexAutomation:
    """索引自动创建与回滚"""

    def __init__(self, db_connection, monitor):
        self.db = db_connection
        self.monitor = monitor

    def apply_recommendation(self, recommendation: dict) -> dict:
        """创建推荐索引并监控效果"""
        table = recommendation['table']
        columns = recommendation['columns']
        index_name = f"idx_auto_{'_'.join(columns)}"

        # 1. 创建前采集基线指标
        baseline = self.monitor.get_query_metrics(
            recommendation['benefited_query_count']
        )

        # 2. 使用 ONLINE 创建,不阻塞写入
        create_sql = (
            f"CREATE INDEX CONCURRENTLY {index_name} "
            f"ON {table} ({', '.join(columns)})"
        )

        try:
            self.db.execute(create_sql)
        except Exception as e:
            return {'status': 'failed', 'error': str(e)}

        # 3. 等待统计信息更新
        self.db.execute(f"ANALYZE {table}")

        # 4. 采集创建后指标
        after_metrics = self.monitor.get_query_metrics(
            recommendation['benefited_query_count']
        )

        # 5. 评估实际收益
        actual_improvement = (
            (baseline['avg_time_ms'] - after_metrics['avg_time_ms'])
            / baseline['avg_time_ms'] * 100
        )

        # 6. 收益不达预期则回滚
        if actual_improvement < 10:  # 改善低于 10% 则回滚
            self.db.execute(f"DROP INDEX CONCURRENTLY {index_name}")
            return {
                'status': 'rolled_back',
                'improvement_pct': actual_improvement,
                'reason': '收益不达预期',
            }

        return {
            'status': 'applied',
            'index_name': index_name,
            'improvement_pct': actual_improvement,
        }

四、边界分析与架构权衡

4.1 联合索引的列顺序敏感性

联合索引 (a, b, c)(b, a, c) 是不同的索引,适用不同的查询模式。AI 推荐系统需要根据谓词类型(等值 vs 范围)和频率确定最优列顺序。等值谓词列应在最左前缀,范围谓词列在最后。但实际工作负载中,同一列可能在不同查询中既作为等值谓词又作为范围谓词,列顺序的决策需要全局权衡。

4.2 冗余索引检测

(a, b)(a, b, c) 的前缀,前者是冗余索引。推荐系统需要检测并过滤冗余索引,避免浪费存储和写入带宽。但某些场景下,较短的前缀索引因为更紧凑(更多条目/页),在特定查询中反而更快。

4.3 写入密集场景的索引代价

在写入比例超过 70% 的表上,每个额外索引都会显著增加写入延迟。推荐系统需要设置写入代价阈值,在写入密集表上只推荐收益极高的索引,或建议使用部分索引(Partial Index)减少维护范围。

4.4 模型推荐的置信度

AI 推荐基于历史工作负载,对新型查询模式(如新上线的功能)无法预测。推荐结果应附带置信度评分,低置信度的建议需要 DBA 人工审核。

五、总结

AI 驱动的索引推荐系统将 DBA 的隐性知识显性化、自动化。从慢查询日志提取谓词、Join、排序特征,生成联合索引候选,通过代价模型评估查询加速收益和写入代价,输出按净收益排序的推荐列表。自动化流程包括在线创建、效果监控和收益不达预期时的自动回滚。工程实践中需注意联合索引列顺序的敏感性、冗余索引检测、写入密集场景的代价控制,以及模型推荐的置信度评估。AI 推荐最适合作为 DBA 的辅助工具,在高置信度场景下自动执行,低置信度场景下交由人工决策。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐