上一篇【第16篇】Elasticsearch批量操作API——Bulk、Reindex与跨集群索引
下一篇【第18篇】Elasticsearch搜索入门——搜索API与URI查询模式


摘要

在分布式系统中,并发控制是保证数据一致性的关键。Elasticsearch作为分布式搜索引擎,提供了完善的并发控制机制来应对多线程、多节点环境下的数据竞争问题。本文深入解析了Elasticsearch并发控制的两大核心主题:refresh参数(控制数据可见性的三种模式——true立即刷新、false延迟刷新、wait_for等待刷新)的选择策略与性能影响,以及乐观并发控制(基于_seq_no和_primary_term的序列号机制)的完整用法。同时涵盖强制刷新的代价分析、if_seq_no + if_primary_term条件写入、外部版本号(version_type=external)、retry_on_conflict版本冲突重试策略以及高并发场景下的并发策略选型建议。掌握这些内容将帮助你构建可靠、高效的Elasticsearch应用。

一、refresh参数详解

1.1 数据可见性机制

在Elasticsearch中,索引操作(Index)、更新操作(Update)、删除操作(Delete)和批量操作(Bulk)写入的数据并不会立即对搜索可见。数据先被写入内存缓冲区,再定期刷新(refresh)到Lucene的段(Segment)中,只有被刷新到段中的数据才能被搜索到。

refresh 参数用于控制请求所做的更改何时对搜索可见。

1.2 refresh参数三个值

索引、更新、删除和批量API都支持 refresh 参数,允许的值如下:

1. true(或空)——立即刷新

操作发生后立即刷新相关的主分片和副本分片,使更新的文档立即可搜索。

PUT test/_doc/1?refresh=true
{
  "test": "test"
}
PUT test/_doc/2?refresh
{
  "test": "test"
}

2. false(默认)——不刷新

不执行与刷新相关的操作。请求所做的更改将在下一次自动刷新后可见(默认1秒)。

PUT test/_doc/3
{
  "test": "test"
}
PUT test/_doc/4?refresh=false
{
  "test": "test"
}

3. wait_for ——等待刷新

在返回结果之前,等待刷新使请求所做的更改可见。不会强制立即刷新,而是等待下一次自动刷新发生。

PUT test/_doc/5?refresh=wait_for
{
  "test": "test"
}

1.3 refresh参数对比

参数值 行为 可见性保证 性能影响 响应时间
true 立即强制刷新 立即可搜索 高(触发段创建) 较慢
false(默认) 不做任何操作 不保证 最低 最快
wait_for 等待下次自动刷新 等待后可见 取决于刷新间隔

1.4 如何选择refresh的值

默认选择:除非有充分的理由需要数据立即可见,否则始终使用 refresh=false(即不设置refresh参数)。

需要立即可见:当必须使请求所做的更改与请求同步可见时,选择 refresh=truerefresh=wait_for

场景 推荐值 原因
批量数据导入 false 性能最优,导入完成后手动刷新
实时写入+立即查询 truewait_for 保证写入后立即可搜索
一般业务写入 false 默认1秒内自动可见
单元测试 true 确保断言时数据可见

选择建议refresh=true 会增加系统负载,影响索引和搜索性能;refresh=wait_for 会增加响应等待时间。需要结合实际业务需求决定。

1.5 强制刷新的代价

自动刷新间隔:Elasticsearch自动刷新的频率由 index.refresh_interval 设置控制,默认为1秒。这个设置是动态的,可以随时修改:

PUT twitter/_settings
{
  "index.refresh_interval": "30s"
}

强制刷新的开销

  1. 段创建:每次刷新都会创建新的Lucene段,产生新的文件
  2. 资源消耗:频繁刷新会增加CPU和I/O负担
  3. 段合并压力:大量小段会触发后台段合并,进一步消耗资源
  4. max_refresh_listeners限制:默认为1000个,当等待刷新的请求达到此数量时,新请求会强制刷新
PUT test/_doc/6?refresh=wait_for
{
  "test": "test"
}

如果因监听器槽用完而强制刷新,响应中会包含 "forced_refresh": true

注意:批量请求在每个分片上只占用一个刷新监听器槽,不管它修改分片多少次。这与单条请求相比,是批量操作的另一个优势。

手动强制刷新:可以通过API手动刷新整个索引:

POST twitter/_refresh

或刷新所有索引:

POST _refresh

最佳实践:批量导入数据时,建议先关闭自动刷新(refresh_interval: -1),导入完成后再手动刷新并恢复自动刷新。

二、乐观并发控制原理

2.1 为什么需要并发控制

Elasticsearch是分布式系统,创建、更新或删除文档时,必须将文档的新版本复制到集群中的其他节点。同时,Elasticsearch也是异步和并发的,复制请求是并行发送的,并且可能不按顺序到达目的地。

Elasticsearch需要一种机制来确保旧版本的文档永远不会覆盖新版本的文档。

2.2 传统的版本号机制

在早期版本中,Elasticsearch使用 _version 字段进行并发控制。每次文档更新时 _version 递增,通过指定版本号来确保操作的是正确版本:

PUT twitter/_doc/1?version=2
{
  "message": "updated message"
}

如果当前版本不是2,操作将返回版本冲突错误。

2.3 序列号机制(_seq_no + _primary_term)

从Elasticsearch 6.x/7.x开始,引入了更可靠的序列号机制替代传统版本号。对文档执行的每个操作都由主分片分配一个序列号(_seq_no),序列号随每个操作递增,确保新操作的序列号一定比旧操作高。

_primary_term 用于标识主分片的任期,每当主分片发生重新分配时(如重启、Primary选举等),_primary_term 递增1。

创建文档时自动分配

PUT products/_doc/1567
{
  "product": "phone"
}

响应中包含分配的序列号:

{
  "_index": "products",
  "_type": "_doc",
  "_id": "1567",
  "_version": 1,
  "result": "created",
  "_seq_no": 0,
  "_primary_term": 1,
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}

GET API返回序列号

GET products/_doc/1567
{
  "_index": "products",
  "_type": "_doc",
  "_id": "1567",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "product": "phone"
  }
}

搜索API返回序列号:设置 seq_no_primary_term 参数可以在搜索结果中返回每个命中文档的序列号:

GET products/_search?seq_no_primary_term=true
{
  "query": {
    "match": {
      "product": "phone"
    }
  }
}

三、条件写入(if_seq_no + if_primary_term)

3.1 基本用法

_seq_no_primary_term 唯一标识一个变更。通过记录这两个值,可以确保在获取文档后没有被其他操作修改的前提下才执行变更。

PUT products/_doc/1567?if_seq_no=0&if_primary_term=1
{
  "product": "phone",
  "tag": "new"
}

如果序列号匹配,操作成功;如果不匹配(表示文档已被其他操作修改),操作返回版本冲突错误:

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[1567]: version conflict"
      }
    ]
  },
  "status": 409
}

3.2 条件写入流程

1. GET 文档 → 获取 _seq_no=0, _primary_term=1
2. 修改文档内容(客户端)
3. PUT 文档?if_seq_no=0&if_primary_term=1
   ├─ 成功 → 序列号匹配,写入成功
   └─ 失败 → 序列号不匹配,需要重新GET并重试

3.3 条件删除

同样可以用于条件删除:

DELETE products/_doc/1567?if_seq_no=0&if_primary_term=1

四、外部版本号

4.1 version_type=external

当使用外部版本控制系统(如数据库自增ID、时间戳等)时,可以使用 version_type=external 参数:

PUT twitter/_doc/1?version=2&version_type=external
{
  "message": "updated with external version"
}

4.2 外部版本号的行为

参数 行为
version_type=internal(默认) 版本号必须严格等于当前版本+1
version_type=external 版本号必须大于当前版本号
version_type=external_gte 版本号必须大于等于当前版本号

4.3 版本控制策略对比

策略 参数 并发保证 适用场景
序列号机制 if_seq_no + if_primary_term 最强(唯一标识变更) ES内部并发控制(推荐)
内部版本号 version 版本严格递增 旧版兼容
外部版本号 version_type=external 版本单调递增 外部系统同步
无控制 不指定参数 无保证 单线程写入

五、retry_on_conflict重试策略

5.1 版本冲突的重试

在高并发场景下,多个客户端可能同时读取并尝试更新同一个文档,导致版本冲突。retry_on_conflict 参数指定在发生版本冲突时自动重试的次数。

Update API中的使用

POST twitter/_update/1?retry_on_conflict=3
{
  "script": {
    "source": "ctx._source.counter += params.count",
    "lang": "painless",
    "params": {
      "count": 1
    }
  }
}

Bulk API中的使用

POST _bulk
{"update":{"_id":"1","_index":"test","retry_on_conflict":3}}
{"script":{"source":"ctx._source.counter += 1","lang":"painless"}}

5.2 重试次数建议

场景 推荐重试次数 说明
低并发写入 0-1 冲突概率低
中等并发写入 3-5 一般业务场景
高并发写入 5-10 热点数据更新

注意retry_on_conflict 仅适用于Update API。对于Index API的条件写入(if_seq_no),需要由客户端自行实现重试逻辑。

5.3 客户端重试模式示例

对于使用 if_seq_no + if_primary_term 的场景,客户端需要自行实现重试:

max_retries = 3
for i in range(max_retries):
    doc = GET /index/_doc/id
    seq_no = doc._seq_no
    primary_term = doc._primary_term
    # 修改文档内容
    doc._source.field = new_value
    response = PUT /index/_doc/id?if_seq_no=seq_no&if_primary_term=primary_term
    if response.status == 200:
        break  # 成功
    elif response.status == 409:
        continue  # 版本冲突,重试
    else:
        raise Exception("操作失败")
else:
    raise Exception("超过最大重试次数")

六、高并发场景下的并发策略选型

6.1 策略选择决策树

是否需要防止并发覆盖?
├── 否 → 不使用并发控制(默认行为)
└── 是
    ├── ES内部操作 → 使用 if_seq_no + if_primary_term
    ├── 外部系统同步 → 使用 version_type=external
    └── Update API → 使用 retry_on_conflict

6.2 不同场景的推荐策略

场景 推荐策略 说明
单文档计数器更新 retry_on_conflict + 脚本 自动重试,简单可靠
读-改-写模式 if_seq_no + if_primary_term + 客户端重试 最严格的并发保证
数据库同步 version_type=external 使用数据库版本号
批量导入 无并发控制 单线程顺序写入
热点文档 路由+分片 减少单分片压力

6.3 数据刷新与可见性策略对比

策略 数据安全性 查询实时性 性能 适用场景
refresh=false 最高 最差(延迟1s) 最优 批量导入
refresh=wait_for 中等(等待刷新) 中等 需要确认可见
refresh=true 中等 最好(即时可见) 最差 测试/调试

七、总结与最佳实践

7.1 核心要点回顾

  1. refresh参数 控制数据的搜索可见性,false(默认)性能最优,true 实时性最强
  2. 强制刷新代价高昂,会导致频繁创建段和触发段合并,应谨慎使用
  3. _seq_no + _primary_term 是Elasticsearch推荐的并发控制机制,替代了传统的_version
  4. if_seq_no + if_primary_term 条件写入提供最强的并发保证,适合读-改-写模式
  5. 外部版本号 适用于与外部系统集成的场景,版本号单调递增即可
  6. retry_on_conflict 是Update API的高并发利器,自动处理版本冲突

7.2 生产环境最佳实践

  • 默认使用refresh=false:除非业务明确需要实时可见,否则不要设置refresh参数
  • 批量导入优化:临时关闭自动刷新(refresh_interval: -1),导入完成后再恢复
  • 优先使用序列号机制:新项目应使用 if_seq_no + if_primary_term 而非 _version
  • 合理设置重试次数:根据并发量设置 retry_on_conflict,避免无限重试
  • 分而治之:对于极高并发的热点文档,考虑使用自定义路由分散到更多分片

上一篇【第16篇】Elasticsearch批量操作API——Bulk、Reindex与跨集群索引
下一篇【第18篇】Elasticsearch搜索入门——搜索API与URI查询模式


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐