突破 Elasticsearch 性能天花板:ELK 优化海量并发日志吞吐的工程机制

信息图

一、第一层瓶颈是I/O,第二层瓶颈是什么?

1.1 ES线程池模型

在Elasticsearch中,不同类型的操作由不同的线程池处理:

线程池 职责 队列类型 默认队列大小
write bulk写入 fixed 200
search 查询 fixed 1000
get 实时获取 fixed 1000
analyze 分析 fixed 16
refresh 刷新 scaling -
force_merge 段合并 fixed 1

当我们的写入吞吐达到新高后,write线程池的队列深度频繁超过200,导致新到来的bulk请求被拒绝:

# 查看ES线程池状态
curl -s 'http://es-data-01:9200/_cat/thread_pool/write?v&h=node_name,name,active,queue,rejected'

# 输出
node     name   active queue rejected
es-data-01 write      8    212       43
es-data-02 write      8    198       28
es-data-03 write      8    235       56

rejected列不为0,说明有请求被丢弃了——这不是丢日志,而是性能瓶颈的明确信号。

二、写入线程池调优

2.1 动态调整线程池大小

// ES集群设置 — 调整write线程池
PUT /_cluster/settings
{
  "persistent": {
    "thread_pool.write.size": 16,
    "thread_pool.write.queue_size": 1000
  }
}

把write线程数从默认的(CPU核数)调整为16,队列从200增加到1000。

但注意:线程数不是越大越好。线程数的上限取决于磁盘的并发IOPS能力。我们的NVMe SSD的随机写IOPS约500K,16个线程足够压满。

2.2 线程池监控与动态扩缩

我们写了一个监控脚本,当检测到rejected请求时自动调整:

# threadpool_autoscaler.py — 自动扩缩线程池
import requests
import time
import json

ES_HOST = 'http://es-data-01:9200'

class ThreadPoolAutoScaler:
    """ES线程池自动扩缩器"""
    
    def __init__(self, pool_name='write'):
        self.pool_name = pool_name
        self.min_size = 8
        self.max_size = 32
        self.current_size = 8
    
    def get_pool_stats(self):
        """获取线程池状态"""
        resp = requests.get(f'{ES_HOST}/_cat/thread_pool/{self.pool_name}'
                           f'?v&h=node_name,active,queue,rejected&format=json')
        return resp.json()
    
    def scale(self):
        """根据负载自动调整线程池大小"""
        stats = self.get_pool_stats()
        
        total_rejected = sum(int(node['rejected']) for node in stats)
        total_queue = sum(int(node['queue']) for node in stats)
        
        # 扩容条件:有rejected或队列深度超过阈值
        if total_rejected > 0 or total_queue > 500:
            new_size = min(self.current_size + 4, self.max_size)
            if new_size != self.current_size:
                self._apply_settings(new_size)
                self.current_size = new_size
                print(f"[SCALE UP] thread_pool.{self.pool_name}.size: {self.current_size} → {new_size}")
        
        # 缩容条件:队列空闲且无rejected
        elif total_queue == 0 and total_rejected == 0 and self.current_size > self.min_size:
            new_size = max(self.current_size - 2, self.min_size)
            if new_size != self.current_size:
                self._apply_settings(new_size)
                self.current_size = new_size
                print(f"[SCALE DOWN] thread_pool.{self.pool_name}.size: {self.current_size} → {new_size}")
    
    def _apply_settings(self, size):
        """应用ES设置"""
        payload = {
            "persistent": {
                f"thread_pool.{self.pool_name}.size": size
            }
        }
        requests.put(f'{ES_HOST}/_cluster/settings', 
                     json=payload,
                     headers={'Content-Type': 'application/json'})

scaler = ThreadPoolAutoScaler()
while True:
    scaler.scale()
    time.sleep(60)  # 每分钟检查一次

三、分片策略的再优化

3.1 分片大小的黄金法则

Elasticsearch社区有一个广泛接受的分片大小建议:每个分片20-50GB。但我们之前的索引因为数据量增长,分片已经膨胀到80GB+。

// ILM策略 — 按分片大小自动rollover
PUT /_ilm/policy/logs_rollover_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "40GB",
            "max_age": "1d"
          },
          "set_priority": {
            "priority": 100
          }
        }
      }
    }
  }
}

// 应用到索引模板
PUT /_index_template/logs_template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 5,
      "number_of_replicas": 1,
      "routing.allocation.total_shards_per_node": 3,
      "sort.field": "@timestamp",
      "sort.order": "desc"
    }
  },
  "composed_of": ["logs_rollover_policy"]
}

关键调整:

  1. max_size: 40GB:分片到40GB就rollover
  2. routing.allocation.total_shards_per_node: 3:每个节点最多3个分片,防止热点
  3. sort.field: @timestamp:按时间排序,提高时间范围查询效率

3.2 Routing优化

对于日志场景,我们不需要跨分片做聚合查询时,可以指定路由:

# Logstash输出 — 按服务名路由
output {
  elasticsearch {
    hosts => ["es:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    # 按服务名路由,同服务的日志落到同一个分片
    document_id => "%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
    routing => "%{[service][name]}"
  }
}

在查询时也指定路由:

GET logs-2026.06.01/_search?routing=payment
{
  "query": {
    "match": {
      "service": "payment"
    }
  }
}

路由带来的性能提升:查询只扫描1个分片而不是5个分片,性能提升约5倍。

四、深度优化:索引排序与分段合并

4.1 索引排序

ES 7.x+支持索引级别的排序,将同类型数据物理上相邻存储:

{
  "settings": {
    "index.sort.field": "@timestamp",
    "index.sort.order": "desc"
  }
}

按时间倒序排序后,最近的日志在段的前部,查询最新日志时只需要扫描少量的段。Grafana看板中对最近1小时的查询,性能提升了60%。

4.2 强制合并调度

定期对Warm阶段的索引做force merge,减少段数量:

// ILM Warm阶段 — force merge到1个段
{
  "warm": {
    "min_age": "7d",
    "actions": {
      "forcemerge": {
        "max_num_segments": 1
      },
      "shrink": {
        "number_of_shards": 1
      },
      "allocate": {
        "number_of_replicas": 1,
        "require": {
          "box_type": "warm"
        }
      }
    }
  }
}

force merge之后,索引从50+段合并为1个段,查询性能提示约40%,磁盘占用减少15%(因为去掉了删除标记)。

五、优化效果

指标 第一轮优化后 第二轮优化后 提升
ES写入吞吐 180MB/s 320MB/s 78%
bulk拒绝率 0.3% 0% 100%
写入P99延迟 550ms 180ms 67%
查询P99延迟 220ms 95ms 57%
分片平均大小 80GB 35GB 56%
段数量/索引 50+ 1(force merge后) 98%

结语

ELK优化是一个持续迭代的过程。第一轮解决的是"磁盘I/O打满"的显性问题,第二轮解决的是"线程池和分片"的结构性问题。

我发现很多团队在做了第一轮优化(调refresh_interval、translog)之后就停下来了。但其实当业务量继续增长时,线程池模型、分片策略、索引排序这些更深层的优化机制才是支撑更高并发的关键。

记住一句话:能扛住当前2倍流量的系统,才算优化完成

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐