containerd 2.1.8 超深度分析 — CLI入口 + Server + Plugin加载 + 配置

源码:cmd/containerd/main.go (32行) + cmd/containerd/command/main.go (约200行) + cmd/containerd/server/server.go (600行) + cmd/containerd/server/config/config.go (497行) + plugins/types.go (105行) + cmd/containerd/builtins/


一、main.go — 最简入口

package main

import (
    "fmt"
    "os"
    "github.com/containerd/containerd/v2/cmd/containerd/command"
    _ "github.com/containerd/containerd/v2/cmd/containerd/builtins"  // 侧导入:注册所有内置插件
)

func main() {
    app := command.App()         // 构建 urfave/cli 应用
    if err := app.Run(os.Args); err != nil {
        fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
        os.Exit(1)
    }
}

关键设计_ "builtins" 是 Go 侧导入模式。builtins 包的 init() 函数调用 plugin.Register() 注册所有内置插件。这样:

  1. 编译时决定包含哪些插件(通过 build tags)
  2. 运行时无需动态加载
  3. 插件注册通过 Go 的 init 链自动完成

二、command.App — CLI 命令注册

func App() *cli.App {
    app := cli.NewApp()
    app.Name = "containerd"
    app.Version = version.Version
    app.Usage = usage  // ASCII art logo

    app.Flags = []cli.Flag{
        cli.StringFlag{Name: "config", Value: defaultConfigPath, Usage: "path to configuration file"},
        cli.StringFlag{Name: "address", Value: defaults.DefaultAddress, Usage: "Address for containerd's GRPC server"},
        cli.StringFlag{Name: "root", Value: defaults.DefaultRootDir, Usage: "containerd root directory"},
        cli.StringFlag{Name: "state", Value: defaults.DefaultStateDir, Usage: "containerd state directory"},
        cli.StringFlag{Name: "log-level", Value: "info", Usage: "log level"},
        cli.StringFlag{Name: "log-format", Usage: "log format (text|json)"},
        // ... 更多 flags
    }

    app.Commands = []*cli.Command{
        configCommand,   // containerd config (dump/default/migrate)
        publishCommand,  // containerd publish (事件发布)
        ociHookCommand,  // containerd oci-hook
    }

    app.Action = func(cliContext *cli.Context) error {
        // ─── 主入口:启动 daemon ───
        return serverAction(cliContext)
    }
    return app
}

三、serverAction — Daemon 启动主流程

func serverAction(cliContext *cli.Context) error {
    // ─── Step 1: 读取配置 ───
    configPath := cliContext.String("config")
    config, err := srvconfig.LoadConfig(ctx, configPath)

    // ─── Step 2: 命令行覆盖 ───
    if root := cliContext.String("root"); root != "" {
        config.Root = root
    }
    if state := cliContext.String("state"); state != "" {
        config.State = state
    }
    if address := cliContext.String("address"); address != "" {
        config.GRPC.Address = address
    }

    // ─── Step 3: 创建顶层目录 ───
    if err := server.CreateTopLevelDirectories(config); err != nil {
        return err
    }
    // root: /var/lib/containerd (0700)
    // state: /run/containerd (0711)
    // TMPDIR: 设置临时目录

    // ─── Step 4: 设置日志 ───
    logLevel := cliContext.String("log-level")
    logFormat := cliContext.String("log-format")

    // ─── Step 5: 创建 Server ───
    srv, err := server.New(ctx, config)
    // 内部: 加载插件 → 初始化 → 注册 gRPC/ttrpc 服务

    // ─── Step 6: 注册信号处理 ───
    signals := make(chan os.Signal, 2048)
    signal.Notify(signals, ...)

    // ─── Step 7: 启动所有服务 ───
    // gRPC listener
    grpcListener, _ := net.Listen("unix", config.GRPC.Address)
    go srv.ServeGRPC(grpcListener)

    // ttrpc listener
    ttrpcListener, _ := net.Listen("unix", config.TTRPC.Address)
    go srv.ServeTTRPC(ttrpcListener)

    // Metrics listener (可选)
    if config.Metrics.Address != "" {
        metricsListener, _ := net.Listen("tcp", config.Metrics.Address)
        go srv.ServeMetrics(metricsListener)
    }

    // Debug listener (可选)
    if config.Debug.Address != "" {
        debugListener, _ := net.Listen("tcp", config.Debug.Address)
        go srv.ServeDebug(debugListener)
    }

    // ─── Step 8: 就绪通知 ───
    if config.GRPC.Address != "" {
        // 通知 systemd: READY=1 (sd-notify)
    }

    // ─── Step 9: 等待退出信号 ───
    select {
    case <-signals:
        // 收到 SIGTERM/SIGINT → 优雅关闭
    }
    srv.Stop()
}

Daemon 启动时序

Signal Handler Metrics Listener ttrpc Listener gRPC Listener server.New() CreateTopLevelDirectories Config.Load serverAction() App() main() Signal Handler Metrics Listener ttrpc Listener gRPC Listener server.New() CreateTopLevelDirectories Config.Load serverAction() App() main() 注册 flags + commands 加载插件 → 初始化 → 注册服务 阻塞等待信号 App() app.Run(os.Args) serverAction(ctx) LoadConfig(configPath) CreateTopLevelDirectories() server.New(ctx, config) *Server Listen("unix", address) Listen("unix", ttrpcAddress) ServeGRPC + ServeTTRPC + ServeMetrics signal.Notify() SIGTERM Stop()

四、server.New — 核心初始化

4.1 完整流程

func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
    // ─── Step 1: 配置迁移 ───
    if config.Version < version.ConfigVersion {
        config.MigrateConfig(ctx)
    }

    // ─── Step 2: 应用配置 ───
    apply(ctx, config)  // 解析 timeouts 等

    // ─── Step 3: 加载插件 ───
    loaded, err := LoadPlugins(ctx, config)
    // 1. 注册 proxy 插件 (config.ProxyPlugins)
    // 2. 生成拓扑排序的依赖图
    // 3. 过滤掉 DisabledPlugins

    // ─── Step 4: 注册 stream processors ───
    for id, p := range config.StreamProcessors {
        diff.RegisterProcessor(diff.BinaryHandler(id, ...))
    }

    // ─── Step 5: 创建 gRPC/ttrpc Server ───
    grpcServer := grpc.NewServer(serverOpts...)
    ttrpcServer := newTTRPCServer()
    tcpServer := grpc.NewServer(tcpServerOpts...)

    // ─── Step 6: 初始化所有插件 ───
    for _, p := range loaded {
        initContext := plugin.NewContext(ctx, initialized, properties)
        // 设置 root/state/grpc-address/ttrpc-address

        // 加载插件配置
        if p.Config != nil {
            pc, _ := config.Decode(ctx, p.URI(), p.Config)
            initContext.Config = pc
        }

        // 初始化插件
        result := p.Init(initContext)
        instance, _ := result.Instance()

        // 类型断言:注册到对应 Server
        if src, ok := instance.(grpcService); ok {
            grpcServices = append(grpcServices, src)
        }
        if src, ok := instance.(ttrpcService); ok {
            ttrpcServices = append(ttrpcServices, src)
        }
        if src, ok := instance.(tcpService); ok {
            tcpServices = append(tcpServices, src)
        }
    }

    // ─── Step 7: 注册服务 ───
    for _, service := range grpcServices {
        service.Register(grpcServer)
    }
    for _, service := range ttrpcServices {
        service.RegisterTTRPC(ttrpcServer)
    }
    for _, service := range tcpServices {
        service.RegisterTCP(tcpServer)
    }

    return &Server{...}, nil
}

4.2 Plugin 初始化时序

Yes+fail

No

plugin.Register()
init() 中注册

registry.Graph(filter)
拓扑排序

遍历 sorted plugins

NewContext(initialized, props)

config.Decode()

p.Init(initContext)

result.Instance()

类型断言

grpcService → Register

ttrpcService → RegisterTTRPC

tcpService → RegisterTCP

SkipPlugin → continue

Required?

返回错误


五、LoadPlugins — 插件加载与拓扑排序

func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Registration, error) {
    // ─── Step 1: 注册 Proxy 插件 ───
    clients := &proxyClients{}
    for name, pp := range config.ProxyPlugins {
        // 根据类型创建不同的 proxy
        switch pp.Type {
        case string(plugins.SnapshotPlugin):
            // → ssproxy.NewSnapshotter
        case string(plugins.ContentPlugin):
            // → csproxy.NewContentStore
        case string(plugins.SandboxControllerPlugin):
            // → sbproxy.NewSandboxController
        case string(plugins.DiffPlugin):
            // → diffproxy.NewDiffApplier
        }
        registry.Register(&plugin.Registration{Type: t, ID: name, InitFn: ...})
    }

    // ─── Step 2: 生成依赖图 ───
    filter := srvconfig.V2DisabledFilter
    return registry.Graph(filter(config.DisabledPlugins)), nil
    // registry.Graph 内部:
    //   1. 收集所有注册的 Registration
    //   2. 根据 Requires 字段建立依赖关系
    //   3. 拓扑排序
    //   4. 过滤掉 DisabledPlugins
}

插件依赖图示例

Requires

Requires

Requires

Requires

Requires

Requires

Requires

Requires

Requires

Requires

Requires

MetadataPlugin
(io.containerd.metadata.v1)

ContentPlugin
(io.containerd.content.v1)

SnapshotPlugin
(io.containerd.snapshotter.v1)

DiffPlugin
(io.containerd.differ.v1)

GCPlugin
(io.containerd.gc.v1)

GRPC-Containers
(io.containerd.grpc.v1)

GRPC-Tasks
(io.containerd.grpc.v1)

GRPC-Images
(io.containerd.grpc.v1)

RuntimeV2
(io.containerd.runtime.v2)

CRIService
(io.containerd.cri.v1)


六、Config — 配置结构

6.1 顶层 Config

type Config struct {
    Version    int                // 配置版本
    Root       string             // 根目录 (/var/lib/containerd)
    State      string             // 状态目录 (/run/containerd)
    TempDir    string             // 临时目录
    PluginDir  string             // 插件目录
    GRPC       GRPCConfig         // gRPC 配置
    TTRPC      TTRPCConfig        // ttrpc 配置
    Debug      DebugConfig        // Debug 配置
    Metrics    MetricsConfig      // Metrics 配置
    DisabledPlugins []string      // 禁用的插件
    RequiredPlugins []string      // 必需的插件
    Plugins    map[string]Plugin  // 插件配置 (key = plugin URI)
    ProxyPlugins map[string]ProxyPlugin  // 代理插件
    StreamProcessors map[string]StreamProcessor  // 流处理器
    Timeouts   map[string]string  // 超时配置
    OOMScore   int                // OOM score
    CgroupPath string             // Cgroup 路径
}

6.2 GRPCConfig

type GRPCConfig struct {
    Address        string         // Unix socket 地址 (/run/containerd/containerd.sock)
    TCPTLSCert     string         // TCP TLS 证书
    TCPTLSKey      string         // TCP TLS 私钥
    TCPTLSCA       string         // TCP TLS CA
    MaxRecvMsgSize int            // 最大接收消息大小 (默认 16MB)
    MaxSendMsgSize int            // 最大发送消息大小 (默认 16MB)
}

七、builtins — 内置插件注册

7.1 builtins 目录结构

cmd/containerd/builtins/
├── builtins.go          // 导入所有内置模块
├── builtins_linux.go    // Linux 特有
├── builtins_unix.go     // Unix 特有
├── builtins_windows.go  // Windows 特有

builtins.go 核心导入

package builtins

import (
    _ "github.com/containerd/containerd/v2/plugins/cri"        // CRI 插件
    _ "github.com/containerd/containerd/v2/plugins/gc"         // GC 调度器
    _ "github.com/containerd/containerd/v2/plugins/metadata"   // Metadata
    _ "github.com/containerd/containerd/v2/plugins/restart"    // Restart 监控
    _ "github.com/containerd/containerd/v2/plugins/sandbox"    // Sandbox
    _ "github.com/containerd/containerd/v2/plugins/streaming"  // Streaming
    _ "github.com/containerd/containerd/v2/plugins/transfer"   // Transfer
    _ "github.com/containerd/containerd/v2/plugins/leases"     // Leases
    _ "github.com/containerd/containerd/v2/plugins/events"     // Events
    _ "github.com/containerd/containerd/v2/plugins/services"   // gRPC services
    _ "github.com/containerd/containerd/v2/plugins/nri"        // NRI
    _ "github.com/containerd/containerd/v2/plugins/imageverifier" // Image Verifier
    // ... runtime, snapshotter, differ, content 等
)

每个包的 init() 调用 plugin.Register(),注册自身到全局 registry。

7.2 插件注册示例 (metadata plugin)

// plugins/metadata/plugin.go
func init() {
    plugin.Register(&plugin.Registration{
        Type: plugins.MetadataPlugin,
        ID:   "bolt",
        Requires: []plugin.Type{
            plugins.ContentPlugin,
            plugins.SnapshotPlugin,
        },
        Config: &srvconfig.BoltConfig{ContentSharingPolicy: "shared"},
        InitFn: func(ic *plugin.InitContext) (interface{}, error) {
            // 获取依赖的 Content 和 Snapshot 插件实例
            contentProvider, _ := getPlugin[content.Store](ic, plugins.ContentPlugin)
            snapshotter, _ := getPlugin[snapshots.Snapshotter](ic, plugins.SnapshotPlugin)

            // 打开 BoltDB
            db, _ := bolt.Open(path, 0o600, &bolt.Options{Timeout: 10 * time.Second})

            // 返回 DB 实例
            return metadata.NewDB(db, contentProvider, map[string]snapshots.Snapshotter{...})
        },
    })
}

八、Namespace 拦截器 — 多租户隔离

func unaryNamespaceInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // 从 gRPC metadata 提取 namespace
    ns, _ := namespaces.NamespaceFrom(ctx)
    if ns == "" {
        ns = namespaces.Default  // "default"
    }
    // 注入 namespace 到 context
    ctx = namespaces.WithNamespace(ctx, ns)
    return handler(ctx, req)
}

设计意图:containerd 通过 namespace 实现多租户隔离。Kubernetes 的每个 namespace 对应 containerd 的一个 namespace。BoltDB 中不同 namespace 的数据完全隔离。


九、设计模式总结

# 模式 体现
1 Plugin 体系 Registration + InitContext + Graph + 依赖注入
2 侧导入 _ "builtins" 触发 init() 注册
3 拓扑排序 registry.Graph() 解析依赖
4 类型断言分发 grpcService/ttrpcService/tcpService 接口
5 Proxy 模式 ProxyPlugins → gRPC proxy 到远端
6 多租户 Namespace 拦截器 + BoltDB bucket 隔离
7 双协议 gRPC (外部API) + ttrpc (shim通信)
8 配置迁移 MigrateConfig + ConfigVersion
9 必选/禁用 RequiredPlugins / DisabledPlugins
10 优雅关闭 Stop() 反向遍历 plugins + Close()
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐