从rgw到rocksdb的调用链
rgw层调用
librados::IoCtxImpl::operate()
↓
objecter->prepare_mutate_op(...)
↓ Create Op object with callback
↓
objecter->op_submit(op) // Public API
↓
_op_submit_with_budget(op, rl, ...) // Throttle and timeout handling
↓
_op_submit(op, sul, ptid) // Calculate PG mapping, get session
↓
_get_session(osd, &s, sul) // Get or create OSDSession
↓
_calc_target(&op->target) // Calculate PG → OSD mapping
↓
_session_op_assign(s, op) // Save Op to session->ops[tid]
↓
to->ops[op->tid] = op; // Key: insert into map
↓
_send_op(op)
与librados::IoCtxImpl::operate 对应的还有一个librados::IoCtxImpl::aio_operate。有什么区别呢?
- librados::IoCtxImpl::aio_operate:立即返回,没有阻塞的
- librados::IoCtxImpl::operate:同步执行,如果下层没有返回,就直接阻塞
回调逻辑
librados::IoCtxImpl::operate
下面有cond.wait(l, [&done] { return done;});
objecter->op_submit(objecter_op);
Context *oncommit = new C_SafeCond(mylock, cond, &done, &r);
里面有finish 设定了down
librados::IoCtxImpl::aio_operate
下面有装到aio_write_list里面
objecter->op_submit(op, &c->tid);
Context *oncomplete = new C_aio_Complete(c);
里面也有finish 清理了 aio_write_waiters
那具体怎么回调呢?
在Objecter.cc的Objecter::_session_op_assign里面有如下代码:
to->ops[op->tid] = op;
把op保存到本地
等下层osd完成任务,会回调Objecter::handle_osd_op_reply方法,传递MOSDOpReply,在里面的
map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
那重新拿到op,然后调用context的finish。
osd层调用
PGOpItem::run
OSD::dequeue_op
PrimaryLogPG:do_request
PrimaryLogPG:do_op
PrimaryLogPG:execute_ctx
PrimaryLogPG::issue_repop
ReplicatedBackend::submit_transaction
PrimaryLogPG::queue_transactions
BlueStore::queue_transactions
而在PrimaryLogPG:execute_ctx里面就有如下代码:
ctx->register_on_commit(
[m, ctx, this](){
if (ctx->op)
log_op_stats(*ctx->op, ctx->bytes_written, ctx->bytes_read);
if (m && !ctx->sent_reply) {
// CEPH_MSG_OSD_OPREPLY here
// ctx->reply define at prepare the reply
MOSDOpReply *reply = ctx->reply;
ctx->reply = nullptr;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending reply on " << *m << " " << reply << dendl;
osd->send_message_osd_client(reply, m->get_connection());
ctx->sent_reply = true;
ctx->op->mark_commit_sent();
}
});
在ReplicatedBackend::submit_transaction也有
op_t.register_on_commit(
parent->bless_context(
new C_OSD_OnOpCommit(this, &op)));
PrimaryLogPG::issue_repop有
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
这里都注册了回调,osd给rgw发消息就是MOSDOpReply 这里。
主完成了journal的写入: C_OSD_OnOpCommit pg->op_commit(op); ondisk ondisk_finishers; 此时可继续写?
主完成data写入: C_OSD_OnOpApplied pg->op_applied(op) onreadable apply_finishers; 此时写入的数据可读?
副本完成了journal的写入: C_OSD_RepModifyCommit pg->repop_commit(rm) send_message_osd_cluster发送到主
副本完成data写入: C_OSD_RepModifyApply pg->repop_applied(rm) send_message_osd_cluster发送到主
完成所有副本journal写入:all_committed on_all_commit C_OSD_RepopCommit repop_all_committed waiting_for_ondisk //called when all commit
完成所有副本data写入: all_applied on_all_applied C_OSD_RepopApplied repop_all_applied waiting_for_ack //called when all acked
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);//on_all_commit
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);//on_all_acked
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(xxx); //on_local_applied_sync 本地端sync完成
所有端完成了journal的写入后,此时数据已经写到journal盘,会处理waiting_for_ondisk list,ondisk状态。环境一旦崩溃,可以journal replay方式回放恢复;
所有端完成了data的写入后,即写入到了filestore层,此时代表apply成功,会处理waiting_for_ack list,向client端发送ack通知写完成,此时数据处于可读状态onreadable;
bluestore
op_t.register_on_commit(
parent->bless_context(
new C_OSD_OnOpCommit(this, &op)));
注册了回调
这个回调最后把消息 传给rgw
在bluestore层C_OSD_OnOpCommit最终被传递给了
TransContext *txc = new TransContext(cct, c, osr, on_commits);
而在BlueStore::_txc_committed_kv里面
if (txc->ch->commit_queue) {
txc->ch->commit_queue->queue(txc->oncommits);
} else {
// see PG_SendMessageOnConn
finisher.queue(txc->oncommits);
}
被执行
参考资料
https://blog.51cto.com/wendashuai/2511231
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)