AI 驱动的 ClickHouse 物化视图智能推荐:从查询模式到预计算策略

一、ClickHouse 查询优化的核心矛盾:实时计算与响应延迟

ClickHouse 以列式存储和向量化执行引擎著称,单表扫描性能极强。但在实际业务中,随着数据量和查询复杂度的增长,性能瓶颈依然会出现。最常见的场景是:多维度聚合查询需要在数十亿行数据上执行 GROUP BY + 聚合函数,即使有 ClickHouse 的并行扫描能力,查询延迟也可能达到数秒甚至数十秒。

物化视图(Materialized View)是 ClickHouse 中解决这类问题的核心手段。物化视图在数据写入时自动触发预计算,将聚合结果持久化存储,查询时直接读取预计算结果,将秒级查询优化到毫秒级。但物化视图的难点不在于创建语法,而在于"创建什么"——需要分析查询模式,确定哪些维度组合值得预计算。

在一个拥有 200+ 张表的 ClickHouse 集群中,DBA 需要分析数千条查询日志,识别高频聚合模式,评估每个物化视图的收益(查询加速比)和成本(存储开销、写入放大)。这个过程耗时且容易遗漏。AI 驱动的物化视图智能推荐方案,通过分析查询日志自动提取聚合模式,并基于收益-成本模型推荐最优的物化视图组合。

二、查询模式分析与物化视图推荐的底层机制

2.1 查询日志解析与模式提取

ClickHouse 的 system.query_log 表记录了每条查询的完整信息:查询 SQL、执行时间、扫描行数、内存消耗等。推荐系统的第一步是从查询日志中提取聚合模式。

聚合模式的定义是:GROUP BY 字段集合 + 聚合函数集合。例如,SELECT city, product_category, SUM(amount) FROM orders GROUP BY city, product_category 的聚合模式是 {city, product_category} + {SUM(amount)}

flowchart TD
    A[system.query_log] --> B[SQL 解析器]
    B --> C[提取 GROUP BY 字段集合]
    B --> D[提取聚合函数集合]
    C --> E[聚合模式聚类]
    D --> E
    E --> F[模式频率统计]
    F --> G[收益-成本模型]
    G --> H[推荐物化视图列表]
    H --> I[生成 CREATE MATERIALIZED VIEW DDL]

    subgraph 收益-成本模型
        J[收益 = Σ 加速比 × 查询频率]
        K[成本 = 存储增量 + 写入放大率]
        L[综合评分 = 收益 / 成本]
    end

2.2 收益-成本模型

物化视图的收益计算公式:

收益 = Σ (原始查询时间 - 物化视图查询时间) × 查询频率

物化视图的成本包括两部分:

  • 存储增量:物化视图占用的磁盘空间。可以通过采样数据估算:取源表的 1% 数据,创建物化视图,测量其大小,然后线性外推。
  • 写入放大率:每条 INSERT 需要同时更新源表和所有物化视图。写入放大率 = 1 + 物化视图数量。如果 INSERT 吞吐量是瓶颈,写入放大是最大的成本。

综合评分 = 收益 / 成本,按评分降序排列,推荐评分最高的物化视图组合。

2.3 维度覆盖与视图合并

多个查询可能共享部分 GROUP BY 字段。例如,查询 A 按 {city, product_category} 聚合,查询 B 按 {city, date} 聚合。如果创建一个按 {city, product_category, date} 聚合的物化视图,可以同时满足查询 A 和 B 的需求——查询 A 在读取物化视图后做一次额外的 GROUP BY 即可。

但视图合并有代价:维度越多,物化视图的基数越大,存储成本越高。推荐系统需要在视图合并(减少物化视图数量)和维度拆分(降低单个视图的存储成本)之间找到平衡。

三、生产级代码实现

3.1 查询日志解析与模式提取

import re
from dataclasses import dataclass, field
from collections import defaultdict
from typing import List, Set, Tuple

@dataclass
class AggregationPattern:
    """聚合模式:GROUP BY 字段 + 聚合函数"""
    group_by_fields: Tuple[str, ...]
    agg_functions: Tuple[str, ...]
    query_count: int = 0          # 该模式出现的次数
    total_duration_ms: float = 0.0  # 总执行时间
    avg_scan_rows: float = 0.0      # 平均扫描行数

    @property
    def pattern_key(self) -> str:
        return f"GB:{','.join(sorted(self.group_by_fields))}|AGG:{','.join(sorted(self.agg_functions))}"

class QueryLogParser:
    """从 ClickHouse 查询日志中提取聚合模式"""

    def __init__(self):
        # 简化的 SQL 解析:提取 GROUP BY 和聚合函数
        self.agg_func_pattern = re.compile(
            r'\b(SUM|COUNT|AVG|MIN|MAX|UNIQ|ANY)\s*\(', re.IGNORECASE
        )
        self.group_by_pattern = re.compile(
            r'\bGROUP\s+BY\s+([^\s]+(?:\s*,\s*[^\s]+)*)', re.IGNORECASE
        )

    def parse_query(self, sql: str, duration_ms: float, scan_rows: float) -> AggregationPattern:
        """解析单条查询 SQL,提取聚合模式"""
        # 提取 GROUP BY 字段
        gb_match = self.group_by_pattern.search(sql)
        if not gb_match:
            return None
        group_by = tuple(f.strip() for f in gb_match.group(1).split(','))

        # 提取聚合函数
        agg_funcs = tuple(set(
            m.group(1).upper() for m in self.agg_func_pattern.finditer(sql)
        ))

        return AggregationPattern(
            group_by_fields=group_by,
            agg_functions=agg_funcs,
            query_count=1,
            total_duration_ms=duration_ms,
            avg_scan_rows=scan_rows,
        )

    def cluster_patterns(
        self, patterns: List[AggregationPattern]
    ) -> List[AggregationPattern]:
        """将相同模式的查询聚合"""
        cluster: dict[str, AggregationPattern] = {}

        for p in patterns:
            if p is None:
                continue
            key = p.pattern_key
            if key in cluster:
                existing = cluster[key]
                existing.query_count += p.query_count
                existing.total_duration_ms += p.total_duration_ms
                existing.avg_scan_rows = (
                    (existing.avg_scan_rows * (existing.query_count - 1)
                     + p.avg_scan_rows) / existing.query_count
                )
            else:
                cluster[key] = AggregationPattern(
                    group_by_fields=p.group_by_fields,
                    agg_functions=p.agg_functions,
                    query_count=p.query_count,
                    total_duration_ms=p.total_duration_ms,
                    avg_scan_rows=p.avg_scan_rows,
                )

        return sorted(cluster.values(), key=lambda x: x.total_duration_ms, reverse=True)

3.2 收益-成本评估模型

@dataclass
class MVRecommendation:
    """物化视图推荐结果"""
    pattern: AggregationPattern
    estimated_storage_gb: float
    write_amplification: float
    benefit_score: float
    cost_score: float
    composite_score: float
    ddl: str

class MVRecommender:
    """基于收益-成本模型的物化视图推荐"""

    def __init__(
        self,
        source_table: str,
        source_table_size_gb: float,
        source_table_rows: float,
        insert_qps: float,
        avg_insert_rows: float,
    ):
        self.source_table = source_table
        self.source_size_gb = source_table_size_gb
        self.source_rows = source_table_rows
        self.insert_qps = insert_qps
        self.avg_insert_rows = avg_insert_rows

    def recommend(
        self,
        patterns: List[AggregationPattern],
        top_k: int = 10,
    ) -> List[MVRecommendation]:
        """推荐 Top-K 物化视图"""
        recommendations = []

        for pattern in patterns:
            # 估算物化视图存储:基于维度基数和聚合粒度
            cardinality = self._estimate_cardinality(pattern)
            storage_gb = self._estimate_storage(cardinality, pattern)

            # 估算查询加速比
            speedup = self._estimate_speedup(pattern)

            # 收益 = 加速比 × 查询频率 × 平均查询时间
            benefit = speedup * pattern.query_count * (pattern.total_duration_ms / pattern.query_count)

            # 成本 = 存储增量 + 写入放大惩罚
            write_penalty = self.insert_qps * self.avg_insert_rows * 0.001  # 每行额外 0.001ms
            cost = storage_gb * 10 + write_penalty  # 存储权重 10

            composite = benefit / max(cost, 1.0)

            # 生成 DDL
            ddl = self._generate_ddl(pattern)

            recommendations.append(MVRecommendation(
                pattern=pattern,
                estimated_storage_gb=storage_gb,
                write_amplification=1.0 + storage_gb / self.source_size_gb,
                benefit_score=benefit,
                cost_score=cost,
                composite_score=composite,
                ddl=ddl,
            ))

        recommendations.sort(key=lambda x: x.composite_score, reverse=True)
        return recommendations[:top_k]

    def _estimate_cardinality(self, pattern: AggregationPattern) -> float:
        """估算 GROUP BY 字段组合的基数"""
        # 简化估算:假设每个字段的独立基数,组合基数为乘积的衰减
        per_field_cardinality = 1000  # 平均每个字段 1000 个不同值
        n_fields = len(pattern.group_by_fields)
        # 衰减因子:字段越多,实际基数越小于笛卡尔积
        decay = 0.3 ** (n_fields - 1)
        return (per_field_cardinality ** n_fields) * decay

    def _estimate_storage(self, cardinality: float, pattern: AggregationPattern) -> float:
        """估算物化视图存储大小(GB)"""
        bytes_per_row = 50 + len(pattern.group_by_fields) * 20 + len(pattern.agg_functions) * 16
        return (cardinality * bytes_per_row) / (1024 ** 3)

    def _estimate_speedup(self, pattern: AggregationPattern) -> float:
        """估算查询加速比"""
        if pattern.avg_scan_rows < 1_000_000:
            return 2.0  # 小数据量加速不明显
        elif pattern.avg_scan_rows < 100_000_000:
            return 10.0
        else:
            return 50.0  # 大数据量加速显著

    def _generate_ddl(self, pattern: AggregationPattern) -> str:
        """生成 CREATE MATERIALIZED VIEW DDL"""
        gb_fields = ', '.join(pattern.group_by_fields)
        agg_selects = []
        for func in pattern.agg_functions:
            if func == 'COUNT':
                agg_selects.append(f'{func}(*) AS cnt')
            else:
                agg_selects.append(f'{func}(value) AS {func.lower()}_val')
        agg_clause = ', '.join(agg_selects)

        return (
            f"CREATE MATERIALIZED VIEW mv_{self.source_table}_"
            f"{'_'.join(pattern.group_by_fields)}\n"
            f"ENGINE = SummingMergeTree()\n"
            f"ORDER BY ({gb_fields})\n"
            f"AS SELECT {gb_fields}, {agg_clause}\n"
            f"FROM {self.source_table}\n"
            f"GROUP BY {gb_fields}"
        )

四、Trade-offs:智能推荐的局限与风险

4.1 基数估算的不确定性

收益-成本模型的核心输入是维度基数的估算,而基数估算本身存在较大误差。如果实际基数远大于估算值,物化视图的存储成本会远超预期。解决方案是在创建物化视图前,用 SELECT COUNT(DISTINCT ...) 精确计算维度基数,但这本身就是一个重查询。

4.2 写入放大的累积效应

每增加一个物化视图,INSERT 延迟都会增加。在写入密集的场景下,10 个物化视图可能导致写入吞吐量下降 30-50%。推荐系统需要设置写入放大的上限,当总写入放大超过阈值时停止推荐新的物化视图。

4.3 适用边界

物化视图智能推荐适用于以下场景:查询模式相对稳定(以聚合查询为主)、数据量在亿级以上、有充足的查询日志可供分析。不适用于:点查询为主的场景(物化视图无收益)、查询模式频繁变化、写入延迟敏感的业务。

五、总结

AI 驱动的物化视图推荐,将 DBA 从手动分析查询日志的繁琐工作中解放出来。核心落地步骤如下:

  1. 采集查询日志:从 system.query_log 中提取聚合查询的 SQL、执行时间和扫描行数。
  2. 提取聚合模式:解析 SQL 中的 GROUP BY 字段和聚合函数,按模式聚类统计频率。
  3. 评估收益与成本:基于维度基数估算存储成本,基于扫描行数估算加速比,计算综合评分。
  4. 生成 DDL 并验证:自动生成 CREATE MATERIALIZED VIEW 语句,在测试环境验证加速效果。
  5. 监控写入影响:上线后持续监控 INSERT 延迟和存储增长,必要时回滚低收益的物化视图。

物化视图的本质是用写入时的预计算换取查询时的加速。AI 推荐的价值在于自动找到"收益最高、成本最低"的预计算策略,而非盲目创建视图。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐