AI 驱动的 ClickHouse 物化视图智能推荐:从查询模式到预计算策略
AI 驱动的 ClickHouse 物化视图智能推荐:从查询模式到预计算策略
一、ClickHouse 查询优化的核心矛盾:实时计算与响应延迟
ClickHouse 以列式存储和向量化执行引擎著称,单表扫描性能极强。但在实际业务中,随着数据量和查询复杂度的增长,性能瓶颈依然会出现。最常见的场景是:多维度聚合查询需要在数十亿行数据上执行 GROUP BY + 聚合函数,即使有 ClickHouse 的并行扫描能力,查询延迟也可能达到数秒甚至数十秒。
物化视图(Materialized View)是 ClickHouse 中解决这类问题的核心手段。物化视图在数据写入时自动触发预计算,将聚合结果持久化存储,查询时直接读取预计算结果,将秒级查询优化到毫秒级。但物化视图的难点不在于创建语法,而在于"创建什么"——需要分析查询模式,确定哪些维度组合值得预计算。
在一个拥有 200+ 张表的 ClickHouse 集群中,DBA 需要分析数千条查询日志,识别高频聚合模式,评估每个物化视图的收益(查询加速比)和成本(存储开销、写入放大)。这个过程耗时且容易遗漏。AI 驱动的物化视图智能推荐方案,通过分析查询日志自动提取聚合模式,并基于收益-成本模型推荐最优的物化视图组合。
二、查询模式分析与物化视图推荐的底层机制
2.1 查询日志解析与模式提取
ClickHouse 的 system.query_log 表记录了每条查询的完整信息:查询 SQL、执行时间、扫描行数、内存消耗等。推荐系统的第一步是从查询日志中提取聚合模式。
聚合模式的定义是:GROUP BY 字段集合 + 聚合函数集合。例如,SELECT city, product_category, SUM(amount) FROM orders GROUP BY city, product_category 的聚合模式是 {city, product_category} + {SUM(amount)}。
flowchart TD
A[system.query_log] --> B[SQL 解析器]
B --> C[提取 GROUP BY 字段集合]
B --> D[提取聚合函数集合]
C --> E[聚合模式聚类]
D --> E
E --> F[模式频率统计]
F --> G[收益-成本模型]
G --> H[推荐物化视图列表]
H --> I[生成 CREATE MATERIALIZED VIEW DDL]
subgraph 收益-成本模型
J[收益 = Σ 加速比 × 查询频率]
K[成本 = 存储增量 + 写入放大率]
L[综合评分 = 收益 / 成本]
end
2.2 收益-成本模型
物化视图的收益计算公式:
收益 = Σ (原始查询时间 - 物化视图查询时间) × 查询频率
物化视图的成本包括两部分:
- 存储增量:物化视图占用的磁盘空间。可以通过采样数据估算:取源表的 1% 数据,创建物化视图,测量其大小,然后线性外推。
- 写入放大率:每条 INSERT 需要同时更新源表和所有物化视图。写入放大率 = 1 + 物化视图数量。如果 INSERT 吞吐量是瓶颈,写入放大是最大的成本。
综合评分 = 收益 / 成本,按评分降序排列,推荐评分最高的物化视图组合。
2.3 维度覆盖与视图合并
多个查询可能共享部分 GROUP BY 字段。例如,查询 A 按 {city, product_category} 聚合,查询 B 按 {city, date} 聚合。如果创建一个按 {city, product_category, date} 聚合的物化视图,可以同时满足查询 A 和 B 的需求——查询 A 在读取物化视图后做一次额外的 GROUP BY 即可。
但视图合并有代价:维度越多,物化视图的基数越大,存储成本越高。推荐系统需要在视图合并(减少物化视图数量)和维度拆分(降低单个视图的存储成本)之间找到平衡。
三、生产级代码实现
3.1 查询日志解析与模式提取
import re
from dataclasses import dataclass, field
from collections import defaultdict
from typing import List, Set, Tuple
@dataclass
class AggregationPattern:
"""聚合模式:GROUP BY 字段 + 聚合函数"""
group_by_fields: Tuple[str, ...]
agg_functions: Tuple[str, ...]
query_count: int = 0 # 该模式出现的次数
total_duration_ms: float = 0.0 # 总执行时间
avg_scan_rows: float = 0.0 # 平均扫描行数
@property
def pattern_key(self) -> str:
return f"GB:{','.join(sorted(self.group_by_fields))}|AGG:{','.join(sorted(self.agg_functions))}"
class QueryLogParser:
"""从 ClickHouse 查询日志中提取聚合模式"""
def __init__(self):
# 简化的 SQL 解析:提取 GROUP BY 和聚合函数
self.agg_func_pattern = re.compile(
r'\b(SUM|COUNT|AVG|MIN|MAX|UNIQ|ANY)\s*\(', re.IGNORECASE
)
self.group_by_pattern = re.compile(
r'\bGROUP\s+BY\s+([^\s]+(?:\s*,\s*[^\s]+)*)', re.IGNORECASE
)
def parse_query(self, sql: str, duration_ms: float, scan_rows: float) -> AggregationPattern:
"""解析单条查询 SQL,提取聚合模式"""
# 提取 GROUP BY 字段
gb_match = self.group_by_pattern.search(sql)
if not gb_match:
return None
group_by = tuple(f.strip() for f in gb_match.group(1).split(','))
# 提取聚合函数
agg_funcs = tuple(set(
m.group(1).upper() for m in self.agg_func_pattern.finditer(sql)
))
return AggregationPattern(
group_by_fields=group_by,
agg_functions=agg_funcs,
query_count=1,
total_duration_ms=duration_ms,
avg_scan_rows=scan_rows,
)
def cluster_patterns(
self, patterns: List[AggregationPattern]
) -> List[AggregationPattern]:
"""将相同模式的查询聚合"""
cluster: dict[str, AggregationPattern] = {}
for p in patterns:
if p is None:
continue
key = p.pattern_key
if key in cluster:
existing = cluster[key]
existing.query_count += p.query_count
existing.total_duration_ms += p.total_duration_ms
existing.avg_scan_rows = (
(existing.avg_scan_rows * (existing.query_count - 1)
+ p.avg_scan_rows) / existing.query_count
)
else:
cluster[key] = AggregationPattern(
group_by_fields=p.group_by_fields,
agg_functions=p.agg_functions,
query_count=p.query_count,
total_duration_ms=p.total_duration_ms,
avg_scan_rows=p.avg_scan_rows,
)
return sorted(cluster.values(), key=lambda x: x.total_duration_ms, reverse=True)
3.2 收益-成本评估模型
@dataclass
class MVRecommendation:
"""物化视图推荐结果"""
pattern: AggregationPattern
estimated_storage_gb: float
write_amplification: float
benefit_score: float
cost_score: float
composite_score: float
ddl: str
class MVRecommender:
"""基于收益-成本模型的物化视图推荐"""
def __init__(
self,
source_table: str,
source_table_size_gb: float,
source_table_rows: float,
insert_qps: float,
avg_insert_rows: float,
):
self.source_table = source_table
self.source_size_gb = source_table_size_gb
self.source_rows = source_table_rows
self.insert_qps = insert_qps
self.avg_insert_rows = avg_insert_rows
def recommend(
self,
patterns: List[AggregationPattern],
top_k: int = 10,
) -> List[MVRecommendation]:
"""推荐 Top-K 物化视图"""
recommendations = []
for pattern in patterns:
# 估算物化视图存储:基于维度基数和聚合粒度
cardinality = self._estimate_cardinality(pattern)
storage_gb = self._estimate_storage(cardinality, pattern)
# 估算查询加速比
speedup = self._estimate_speedup(pattern)
# 收益 = 加速比 × 查询频率 × 平均查询时间
benefit = speedup * pattern.query_count * (pattern.total_duration_ms / pattern.query_count)
# 成本 = 存储增量 + 写入放大惩罚
write_penalty = self.insert_qps * self.avg_insert_rows * 0.001 # 每行额外 0.001ms
cost = storage_gb * 10 + write_penalty # 存储权重 10
composite = benefit / max(cost, 1.0)
# 生成 DDL
ddl = self._generate_ddl(pattern)
recommendations.append(MVRecommendation(
pattern=pattern,
estimated_storage_gb=storage_gb,
write_amplification=1.0 + storage_gb / self.source_size_gb,
benefit_score=benefit,
cost_score=cost,
composite_score=composite,
ddl=ddl,
))
recommendations.sort(key=lambda x: x.composite_score, reverse=True)
return recommendations[:top_k]
def _estimate_cardinality(self, pattern: AggregationPattern) -> float:
"""估算 GROUP BY 字段组合的基数"""
# 简化估算:假设每个字段的独立基数,组合基数为乘积的衰减
per_field_cardinality = 1000 # 平均每个字段 1000 个不同值
n_fields = len(pattern.group_by_fields)
# 衰减因子:字段越多,实际基数越小于笛卡尔积
decay = 0.3 ** (n_fields - 1)
return (per_field_cardinality ** n_fields) * decay
def _estimate_storage(self, cardinality: float, pattern: AggregationPattern) -> float:
"""估算物化视图存储大小(GB)"""
bytes_per_row = 50 + len(pattern.group_by_fields) * 20 + len(pattern.agg_functions) * 16
return (cardinality * bytes_per_row) / (1024 ** 3)
def _estimate_speedup(self, pattern: AggregationPattern) -> float:
"""估算查询加速比"""
if pattern.avg_scan_rows < 1_000_000:
return 2.0 # 小数据量加速不明显
elif pattern.avg_scan_rows < 100_000_000:
return 10.0
else:
return 50.0 # 大数据量加速显著
def _generate_ddl(self, pattern: AggregationPattern) -> str:
"""生成 CREATE MATERIALIZED VIEW DDL"""
gb_fields = ', '.join(pattern.group_by_fields)
agg_selects = []
for func in pattern.agg_functions:
if func == 'COUNT':
agg_selects.append(f'{func}(*) AS cnt')
else:
agg_selects.append(f'{func}(value) AS {func.lower()}_val')
agg_clause = ', '.join(agg_selects)
return (
f"CREATE MATERIALIZED VIEW mv_{self.source_table}_"
f"{'_'.join(pattern.group_by_fields)}\n"
f"ENGINE = SummingMergeTree()\n"
f"ORDER BY ({gb_fields})\n"
f"AS SELECT {gb_fields}, {agg_clause}\n"
f"FROM {self.source_table}\n"
f"GROUP BY {gb_fields}"
)
四、Trade-offs:智能推荐的局限与风险
4.1 基数估算的不确定性
收益-成本模型的核心输入是维度基数的估算,而基数估算本身存在较大误差。如果实际基数远大于估算值,物化视图的存储成本会远超预期。解决方案是在创建物化视图前,用 SELECT COUNT(DISTINCT ...) 精确计算维度基数,但这本身就是一个重查询。
4.2 写入放大的累积效应
每增加一个物化视图,INSERT 延迟都会增加。在写入密集的场景下,10 个物化视图可能导致写入吞吐量下降 30-50%。推荐系统需要设置写入放大的上限,当总写入放大超过阈值时停止推荐新的物化视图。
4.3 适用边界
物化视图智能推荐适用于以下场景:查询模式相对稳定(以聚合查询为主)、数据量在亿级以上、有充足的查询日志可供分析。不适用于:点查询为主的场景(物化视图无收益)、查询模式频繁变化、写入延迟敏感的业务。
五、总结
AI 驱动的物化视图推荐,将 DBA 从手动分析查询日志的繁琐工作中解放出来。核心落地步骤如下:
- 采集查询日志:从
system.query_log中提取聚合查询的 SQL、执行时间和扫描行数。 - 提取聚合模式:解析 SQL 中的 GROUP BY 字段和聚合函数,按模式聚类统计频率。
- 评估收益与成本:基于维度基数估算存储成本,基于扫描行数估算加速比,计算综合评分。
- 生成 DDL 并验证:自动生成 CREATE MATERIALIZED VIEW 语句,在测试环境验证加速效果。
- 监控写入影响:上线后持续监控 INSERT 延迟和存储增长,必要时回滚低收益的物化视图。
物化视图的本质是用写入时的预计算换取查询时的加速。AI 推荐的价值在于自动找到"收益最高、成本最低"的预计算策略,而非盲目创建视图。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)