【Elasticsearch从入门到精通】第17篇:Elasticsearch并发控制——refresh参数与乐观并发控制
上一篇【第16篇】Elasticsearch批量操作API——Bulk、Reindex与跨集群索引
下一篇【第18篇】Elasticsearch搜索入门——搜索API与URI查询模式
摘要
在分布式系统中,并发控制是保证数据一致性的关键。Elasticsearch作为分布式搜索引擎,提供了完善的并发控制机制来应对多线程、多节点环境下的数据竞争问题。本文深入解析了Elasticsearch并发控制的两大核心主题:refresh参数(控制数据可见性的三种模式——true立即刷新、false延迟刷新、wait_for等待刷新)的选择策略与性能影响,以及乐观并发控制(基于_seq_no和_primary_term的序列号机制)的完整用法。同时涵盖强制刷新的代价分析、if_seq_no + if_primary_term条件写入、外部版本号(version_type=external)、retry_on_conflict版本冲突重试策略以及高并发场景下的并发策略选型建议。掌握这些内容将帮助你构建可靠、高效的Elasticsearch应用。
一、refresh参数详解
1.1 数据可见性机制
在Elasticsearch中,索引操作(Index)、更新操作(Update)、删除操作(Delete)和批量操作(Bulk)写入的数据并不会立即对搜索可见。数据先被写入内存缓冲区,再定期刷新(refresh)到Lucene的段(Segment)中,只有被刷新到段中的数据才能被搜索到。
refresh 参数用于控制请求所做的更改何时对搜索可见。
1.2 refresh参数三个值
索引、更新、删除和批量API都支持 refresh 参数,允许的值如下:
1. true(或空)——立即刷新
操作发生后立即刷新相关的主分片和副本分片,使更新的文档立即可搜索。
PUT test/_doc/1?refresh=true
{
"test": "test"
}
PUT test/_doc/2?refresh
{
"test": "test"
}
2. false(默认)——不刷新
不执行与刷新相关的操作。请求所做的更改将在下一次自动刷新后可见(默认1秒)。
PUT test/_doc/3
{
"test": "test"
}
PUT test/_doc/4?refresh=false
{
"test": "test"
}
3. wait_for ——等待刷新
在返回结果之前,等待刷新使请求所做的更改可见。不会强制立即刷新,而是等待下一次自动刷新发生。
PUT test/_doc/5?refresh=wait_for
{
"test": "test"
}
1.3 refresh参数对比
| 参数值 | 行为 | 可见性保证 | 性能影响 | 响应时间 |
|---|---|---|---|---|
true |
立即强制刷新 | 立即可搜索 | 高(触发段创建) | 较慢 |
false(默认) |
不做任何操作 | 不保证 | 最低 | 最快 |
wait_for |
等待下次自动刷新 | 等待后可见 | 低 | 取决于刷新间隔 |
1.4 如何选择refresh的值
默认选择:除非有充分的理由需要数据立即可见,否则始终使用 refresh=false(即不设置refresh参数)。
需要立即可见:当必须使请求所做的更改与请求同步可见时,选择 refresh=true 或 refresh=wait_for。
| 场景 | 推荐值 | 原因 |
|---|---|---|
| 批量数据导入 | false |
性能最优,导入完成后手动刷新 |
| 实时写入+立即查询 | true 或 wait_for |
保证写入后立即可搜索 |
| 一般业务写入 | false |
默认1秒内自动可见 |
| 单元测试 | true |
确保断言时数据可见 |
选择建议:
refresh=true会增加系统负载,影响索引和搜索性能;refresh=wait_for会增加响应等待时间。需要结合实际业务需求决定。
1.5 强制刷新的代价
自动刷新间隔:Elasticsearch自动刷新的频率由 index.refresh_interval 设置控制,默认为1秒。这个设置是动态的,可以随时修改:
PUT twitter/_settings
{
"index.refresh_interval": "30s"
}
强制刷新的开销:
- 段创建:每次刷新都会创建新的Lucene段,产生新的文件
- 资源消耗:频繁刷新会增加CPU和I/O负担
- 段合并压力:大量小段会触发后台段合并,进一步消耗资源
- max_refresh_listeners限制:默认为1000个,当等待刷新的请求达到此数量时,新请求会强制刷新
PUT test/_doc/6?refresh=wait_for
{
"test": "test"
}
如果因监听器槽用完而强制刷新,响应中会包含 "forced_refresh": true。
注意:批量请求在每个分片上只占用一个刷新监听器槽,不管它修改分片多少次。这与单条请求相比,是批量操作的另一个优势。
手动强制刷新:可以通过API手动刷新整个索引:
POST twitter/_refresh
或刷新所有索引:
POST _refresh
最佳实践:批量导入数据时,建议先关闭自动刷新(
refresh_interval: -1),导入完成后再手动刷新并恢复自动刷新。
二、乐观并发控制原理
2.1 为什么需要并发控制
Elasticsearch是分布式系统,创建、更新或删除文档时,必须将文档的新版本复制到集群中的其他节点。同时,Elasticsearch也是异步和并发的,复制请求是并行发送的,并且可能不按顺序到达目的地。
Elasticsearch需要一种机制来确保旧版本的文档永远不会覆盖新版本的文档。
2.2 传统的版本号机制
在早期版本中,Elasticsearch使用 _version 字段进行并发控制。每次文档更新时 _version 递增,通过指定版本号来确保操作的是正确版本:
PUT twitter/_doc/1?version=2
{
"message": "updated message"
}
如果当前版本不是2,操作将返回版本冲突错误。
2.3 序列号机制(_seq_no + _primary_term)
从Elasticsearch 6.x/7.x开始,引入了更可靠的序列号机制替代传统版本号。对文档执行的每个操作都由主分片分配一个序列号(_seq_no),序列号随每个操作递增,确保新操作的序列号一定比旧操作高。
_primary_term 用于标识主分片的任期,每当主分片发生重新分配时(如重启、Primary选举等),_primary_term 递增1。
创建文档时自动分配:
PUT products/_doc/1567
{
"product": "phone"
}
响应中包含分配的序列号:
{
"_index": "products",
"_type": "_doc",
"_id": "1567",
"_version": 1,
"result": "created",
"_seq_no": 0,
"_primary_term": 1,
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}
GET API返回序列号:
GET products/_doc/1567
{
"_index": "products",
"_type": "_doc",
"_id": "1567",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"product": "phone"
}
}
搜索API返回序列号:设置 seq_no_primary_term 参数可以在搜索结果中返回每个命中文档的序列号:
GET products/_search?seq_no_primary_term=true
{
"query": {
"match": {
"product": "phone"
}
}
}
三、条件写入(if_seq_no + if_primary_term)
3.1 基本用法
_seq_no 和 _primary_term 唯一标识一个变更。通过记录这两个值,可以确保在获取文档后没有被其他操作修改的前提下才执行变更。
PUT products/_doc/1567?if_seq_no=0&if_primary_term=1
{
"product": "phone",
"tag": "new"
}
如果序列号匹配,操作成功;如果不匹配(表示文档已被其他操作修改),操作返回版本冲突错误:
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[1567]: version conflict"
}
]
},
"status": 409
}
3.2 条件写入流程
1. GET 文档 → 获取 _seq_no=0, _primary_term=1
2. 修改文档内容(客户端)
3. PUT 文档?if_seq_no=0&if_primary_term=1
├─ 成功 → 序列号匹配,写入成功
└─ 失败 → 序列号不匹配,需要重新GET并重试
3.3 条件删除
同样可以用于条件删除:
DELETE products/_doc/1567?if_seq_no=0&if_primary_term=1
四、外部版本号
4.1 version_type=external
当使用外部版本控制系统(如数据库自增ID、时间戳等)时,可以使用 version_type=external 参数:
PUT twitter/_doc/1?version=2&version_type=external
{
"message": "updated with external version"
}
4.2 外部版本号的行为
| 参数 | 行为 |
|---|---|
version_type=internal(默认) |
版本号必须严格等于当前版本+1 |
version_type=external |
版本号必须大于当前版本号 |
version_type=external_gte |
版本号必须大于等于当前版本号 |
4.3 版本控制策略对比
| 策略 | 参数 | 并发保证 | 适用场景 |
|---|---|---|---|
| 序列号机制 | if_seq_no + if_primary_term |
最强(唯一标识变更) | ES内部并发控制(推荐) |
| 内部版本号 | version |
版本严格递增 | 旧版兼容 |
| 外部版本号 | version_type=external |
版本单调递增 | 外部系统同步 |
| 无控制 | 不指定参数 | 无保证 | 单线程写入 |
五、retry_on_conflict重试策略
5.1 版本冲突的重试
在高并发场景下,多个客户端可能同时读取并尝试更新同一个文档,导致版本冲突。retry_on_conflict 参数指定在发生版本冲突时自动重试的次数。
Update API中的使用:
POST twitter/_update/1?retry_on_conflict=3
{
"script": {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params": {
"count": 1
}
}
}
Bulk API中的使用:
POST _bulk
{"update":{"_id":"1","_index":"test","retry_on_conflict":3}}
{"script":{"source":"ctx._source.counter += 1","lang":"painless"}}
5.2 重试次数建议
| 场景 | 推荐重试次数 | 说明 |
|---|---|---|
| 低并发写入 | 0-1 | 冲突概率低 |
| 中等并发写入 | 3-5 | 一般业务场景 |
| 高并发写入 | 5-10 | 热点数据更新 |
注意:
retry_on_conflict仅适用于Update API。对于Index API的条件写入(if_seq_no),需要由客户端自行实现重试逻辑。
5.3 客户端重试模式示例
对于使用 if_seq_no + if_primary_term 的场景,客户端需要自行实现重试:
max_retries = 3
for i in range(max_retries):
doc = GET /index/_doc/id
seq_no = doc._seq_no
primary_term = doc._primary_term
# 修改文档内容
doc._source.field = new_value
response = PUT /index/_doc/id?if_seq_no=seq_no&if_primary_term=primary_term
if response.status == 200:
break # 成功
elif response.status == 409:
continue # 版本冲突,重试
else:
raise Exception("操作失败")
else:
raise Exception("超过最大重试次数")
六、高并发场景下的并发策略选型
6.1 策略选择决策树
是否需要防止并发覆盖?
├── 否 → 不使用并发控制(默认行为)
└── 是
├── ES内部操作 → 使用 if_seq_no + if_primary_term
├── 外部系统同步 → 使用 version_type=external
└── Update API → 使用 retry_on_conflict
6.2 不同场景的推荐策略
| 场景 | 推荐策略 | 说明 |
|---|---|---|
| 单文档计数器更新 | retry_on_conflict + 脚本 |
自动重试,简单可靠 |
| 读-改-写模式 | if_seq_no + if_primary_term + 客户端重试 |
最严格的并发保证 |
| 数据库同步 | version_type=external |
使用数据库版本号 |
| 批量导入 | 无并发控制 | 单线程顺序写入 |
| 热点文档 | 路由+分片 | 减少单分片压力 |
6.3 数据刷新与可见性策略对比
| 策略 | 数据安全性 | 查询实时性 | 性能 | 适用场景 |
|---|---|---|---|---|
| refresh=false | 最高 | 最差(延迟1s) | 最优 | 批量导入 |
| refresh=wait_for | 高 | 中等(等待刷新) | 中等 | 需要确认可见 |
| refresh=true | 中等 | 最好(即时可见) | 最差 | 测试/调试 |
七、总结与最佳实践
7.1 核心要点回顾
- refresh参数 控制数据的搜索可见性,
false(默认)性能最优,true实时性最强 - 强制刷新代价高昂,会导致频繁创建段和触发段合并,应谨慎使用
- _seq_no + _primary_term 是Elasticsearch推荐的并发控制机制,替代了传统的_version
- if_seq_no + if_primary_term 条件写入提供最强的并发保证,适合读-改-写模式
- 外部版本号 适用于与外部系统集成的场景,版本号单调递增即可
- retry_on_conflict 是Update API的高并发利器,自动处理版本冲突
7.2 生产环境最佳实践
- 默认使用refresh=false:除非业务明确需要实时可见,否则不要设置refresh参数
- 批量导入优化:临时关闭自动刷新(
refresh_interval: -1),导入完成后再恢复 - 优先使用序列号机制:新项目应使用
if_seq_no + if_primary_term而非_version - 合理设置重试次数:根据并发量设置
retry_on_conflict,避免无限重试 - 分而治之:对于极高并发的热点文档,考虑使用自定义路由分散到更多分片
上一篇【第16篇】Elasticsearch批量操作API——Bulk、Reindex与跨集群索引
下一篇【第18篇】Elasticsearch搜索入门——搜索API与URI查询模式
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)