rgw层调用

librados::IoCtxImpl::operate()
  ↓
objecter->prepare_mutate_op(...)
  ↓ Create Op object with callback
  ↓
objecter->op_submit(op)              //  Public API_op_submit_with_budget(op, rl, ...)  //  Throttle and timeout handling_op_submit(op, sul, ptid)            //  Calculate PG mapping, get session_get_session(osd, &s, sul)          // Get or create OSDSession_calc_target(&op->target)           // Calculate PG → OSD mapping_session_op_assign(s, op)           //  Save Op to session->ops[tid]
  ↓
to->ops[op->tid] = op;             // Key: insert into map_send_op(op)   

与librados::IoCtxImpl::operate 对应的还有一个librados::IoCtxImpl::aio_operate。有什么区别呢?

  • librados::IoCtxImpl::aio_operate:立即返回,没有阻塞的
  • librados::IoCtxImpl::operate:同步执行,如果下层没有返回,就直接阻塞

回调逻辑

librados::IoCtxImpl::operate
  下面有cond.wait(l, [&done] { return done;});
  objecter->op_submit(objecter_op);

  Context *oncommit = new C_SafeCond(mylock, cond, &done, &r);
  里面有finish 设定了down


librados::IoCtxImpl::aio_operate
  下面有装到aio_write_list里面
   objecter->op_submit(op, &c->tid);

 Context *oncomplete = new C_aio_Complete(c);
   里面也有finish 清理了 aio_write_waiters

那具体怎么回调呢?

在Objecter.cc的Objecter::_session_op_assign里面有如下代码:

to->ops[op->tid] = op;

把op保存到本地
等下层osd完成任务,会回调Objecter::handle_osd_op_reply方法,传递MOSDOpReply,在里面的

map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);

那重新拿到op,然后调用context的finish。

osd层调用

PGOpItem::run
      OSD::dequeue_op
        PrimaryLogPG:do_request
          PrimaryLogPG:do_op
            PrimaryLogPG:execute_ctx
              PrimaryLogPG::issue_repop
                ReplicatedBackend::submit_transaction
                  PrimaryLogPG::queue_transactions
                    BlueStore::queue_transactions

而在PrimaryLogPG:execute_ctx里面就有如下代码:

ctx->register_on_commit(
            [m, ctx, this](){
                if (ctx->op)
                    log_op_stats(*ctx->op, ctx->bytes_written, ctx->bytes_read);
                    if (m && !ctx->sent_reply) {
                    // CEPH_MSG_OSD_OPREPLY here
                    // ctx->reply define at prepare the reply
                    MOSDOpReply *reply = ctx->reply;
                    ctx->reply = nullptr;
                     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
                    dout(10) << " sending reply on " << *m << " " << reply << dendl;
                    osd->send_message_osd_client(reply, m->get_connection());
                   ctx->sent_reply = true;
                    ctx->op->mark_commit_sent();
                }
            });

在ReplicatedBackend::submit_transaction也有

op_t.register_on_commit(
        parent->bless_context(
                new C_OSD_OnOpCommit(this, &op)));

PrimaryLogPG::issue_repop有

Context *on_all_commit = new C_OSD_RepopCommit(this, repop);

这里都注册了回调,osd给rgw发消息就是MOSDOpReply 这里。

主完成了journal的写入: C_OSD_OnOpCommit pg->op_commit(op); ondisk ondisk_finishers; 此时可继续写?
主完成data写入: C_OSD_OnOpApplied pg->op_applied(op) onreadable apply_finishers; 此时写入的数据可读?

副本完成了journal的写入: C_OSD_RepModifyCommit pg->repop_commit(rm) send_message_osd_cluster发送到主
副本完成data写入: C_OSD_RepModifyApply pg->repop_applied(rm) send_message_osd_cluster发送到主

完成所有副本journal写入:all_committed on_all_commit C_OSD_RepopCommit repop_all_committed waiting_for_ondisk //called when all commit
完成所有副本data写入: all_applied on_all_applied C_OSD_RepopApplied repop_all_applied waiting_for_ack //called when all acked

Context *on_all_commit = new C_OSD_RepopCommit(this, repop);//on_all_commit
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);//on_all_acked
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(xxx); //on_local_applied_sync 本地端sync完成

所有端完成了journal的写入后,此时数据已经写到journal盘,会处理waiting_for_ondisk list,ondisk状态。环境一旦崩溃,可以journal replay方式回放恢复;
所有端完成了data的写入后,即写入到了filestore层,此时代表apply成功,会处理waiting_for_ack list,向client端发送ack通知写完成,此时数据处于可读状态onreadable;

bluestore

op_t.register_on_commit(
            parent->bless_context(
                    new C_OSD_OnOpCommit(this, &op)));

注册了回调
这个回调最后把消息 传给rgw
在bluestore层C_OSD_OnOpCommit最终被传递给了

TransContext *txc = new TransContext(cct, c, osr, on_commits);

而在BlueStore::_txc_committed_kv里面

if (txc->ch->commit_queue) {
  txc->ch->commit_queue->queue(txc->oncommits);
} else {
    // see PG_SendMessageOnConn
  finisher.queue(txc->oncommits);
}

被执行

参考资料

https://blog.51cto.com/wendashuai/2511231

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐