【containerd 2.1.8】(Part 2)containerd 2.1.8 超深度分析 — CLI入口 + Server + Plugin加载 + 配置
·
containerd 2.1.8 超深度分析 — CLI入口 + Server + Plugin加载 + 配置
源码:
cmd/containerd/main.go(32行) +cmd/containerd/command/main.go(约200行) +cmd/containerd/server/server.go(600行) +cmd/containerd/server/config/config.go(497行) +plugins/types.go(105行) +cmd/containerd/builtins/
一、main.go — 最简入口
package main
import (
"fmt"
"os"
"github.com/containerd/containerd/v2/cmd/containerd/command"
_ "github.com/containerd/containerd/v2/cmd/containerd/builtins" // 侧导入:注册所有内置插件
)
func main() {
app := command.App() // 构建 urfave/cli 应用
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
os.Exit(1)
}
}
关键设计:_ "builtins" 是 Go 侧导入模式。builtins 包的 init() 函数调用 plugin.Register() 注册所有内置插件。这样:
- 编译时决定包含哪些插件(通过 build tags)
- 运行时无需动态加载
- 插件注册通过 Go 的 init 链自动完成
二、command.App — CLI 命令注册
func App() *cli.App {
app := cli.NewApp()
app.Name = "containerd"
app.Version = version.Version
app.Usage = usage // ASCII art logo
app.Flags = []cli.Flag{
cli.StringFlag{Name: "config", Value: defaultConfigPath, Usage: "path to configuration file"},
cli.StringFlag{Name: "address", Value: defaults.DefaultAddress, Usage: "Address for containerd's GRPC server"},
cli.StringFlag{Name: "root", Value: defaults.DefaultRootDir, Usage: "containerd root directory"},
cli.StringFlag{Name: "state", Value: defaults.DefaultStateDir, Usage: "containerd state directory"},
cli.StringFlag{Name: "log-level", Value: "info", Usage: "log level"},
cli.StringFlag{Name: "log-format", Usage: "log format (text|json)"},
// ... 更多 flags
}
app.Commands = []*cli.Command{
configCommand, // containerd config (dump/default/migrate)
publishCommand, // containerd publish (事件发布)
ociHookCommand, // containerd oci-hook
}
app.Action = func(cliContext *cli.Context) error {
// ─── 主入口:启动 daemon ───
return serverAction(cliContext)
}
return app
}
三、serverAction — Daemon 启动主流程
func serverAction(cliContext *cli.Context) error {
// ─── Step 1: 读取配置 ───
configPath := cliContext.String("config")
config, err := srvconfig.LoadConfig(ctx, configPath)
// ─── Step 2: 命令行覆盖 ───
if root := cliContext.String("root"); root != "" {
config.Root = root
}
if state := cliContext.String("state"); state != "" {
config.State = state
}
if address := cliContext.String("address"); address != "" {
config.GRPC.Address = address
}
// ─── Step 3: 创建顶层目录 ───
if err := server.CreateTopLevelDirectories(config); err != nil {
return err
}
// root: /var/lib/containerd (0700)
// state: /run/containerd (0711)
// TMPDIR: 设置临时目录
// ─── Step 4: 设置日志 ───
logLevel := cliContext.String("log-level")
logFormat := cliContext.String("log-format")
// ─── Step 5: 创建 Server ───
srv, err := server.New(ctx, config)
// 内部: 加载插件 → 初始化 → 注册 gRPC/ttrpc 服务
// ─── Step 6: 注册信号处理 ───
signals := make(chan os.Signal, 2048)
signal.Notify(signals, ...)
// ─── Step 7: 启动所有服务 ───
// gRPC listener
grpcListener, _ := net.Listen("unix", config.GRPC.Address)
go srv.ServeGRPC(grpcListener)
// ttrpc listener
ttrpcListener, _ := net.Listen("unix", config.TTRPC.Address)
go srv.ServeTTRPC(ttrpcListener)
// Metrics listener (可选)
if config.Metrics.Address != "" {
metricsListener, _ := net.Listen("tcp", config.Metrics.Address)
go srv.ServeMetrics(metricsListener)
}
// Debug listener (可选)
if config.Debug.Address != "" {
debugListener, _ := net.Listen("tcp", config.Debug.Address)
go srv.ServeDebug(debugListener)
}
// ─── Step 8: 就绪通知 ───
if config.GRPC.Address != "" {
// 通知 systemd: READY=1 (sd-notify)
}
// ─── Step 9: 等待退出信号 ───
select {
case <-signals:
// 收到 SIGTERM/SIGINT → 优雅关闭
}
srv.Stop()
}
Daemon 启动时序
四、server.New — 核心初始化
4.1 完整流程
func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
// ─── Step 1: 配置迁移 ───
if config.Version < version.ConfigVersion {
config.MigrateConfig(ctx)
}
// ─── Step 2: 应用配置 ───
apply(ctx, config) // 解析 timeouts 等
// ─── Step 3: 加载插件 ───
loaded, err := LoadPlugins(ctx, config)
// 1. 注册 proxy 插件 (config.ProxyPlugins)
// 2. 生成拓扑排序的依赖图
// 3. 过滤掉 DisabledPlugins
// ─── Step 4: 注册 stream processors ───
for id, p := range config.StreamProcessors {
diff.RegisterProcessor(diff.BinaryHandler(id, ...))
}
// ─── Step 5: 创建 gRPC/ttrpc Server ───
grpcServer := grpc.NewServer(serverOpts...)
ttrpcServer := newTTRPCServer()
tcpServer := grpc.NewServer(tcpServerOpts...)
// ─── Step 6: 初始化所有插件 ───
for _, p := range loaded {
initContext := plugin.NewContext(ctx, initialized, properties)
// 设置 root/state/grpc-address/ttrpc-address
// 加载插件配置
if p.Config != nil {
pc, _ := config.Decode(ctx, p.URI(), p.Config)
initContext.Config = pc
}
// 初始化插件
result := p.Init(initContext)
instance, _ := result.Instance()
// 类型断言:注册到对应 Server
if src, ok := instance.(grpcService); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(ttrpcService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if src, ok := instance.(tcpService); ok {
tcpServices = append(tcpServices, src)
}
}
// ─── Step 7: 注册服务 ───
for _, service := range grpcServices {
service.Register(grpcServer)
}
for _, service := range ttrpcServices {
service.RegisterTTRPC(ttrpcServer)
}
for _, service := range tcpServices {
service.RegisterTCP(tcpServer)
}
return &Server{...}, nil
}
4.2 Plugin 初始化时序
五、LoadPlugins — 插件加载与拓扑排序
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Registration, error) {
// ─── Step 1: 注册 Proxy 插件 ───
clients := &proxyClients{}
for name, pp := range config.ProxyPlugins {
// 根据类型创建不同的 proxy
switch pp.Type {
case string(plugins.SnapshotPlugin):
// → ssproxy.NewSnapshotter
case string(plugins.ContentPlugin):
// → csproxy.NewContentStore
case string(plugins.SandboxControllerPlugin):
// → sbproxy.NewSandboxController
case string(plugins.DiffPlugin):
// → diffproxy.NewDiffApplier
}
registry.Register(&plugin.Registration{Type: t, ID: name, InitFn: ...})
}
// ─── Step 2: 生成依赖图 ───
filter := srvconfig.V2DisabledFilter
return registry.Graph(filter(config.DisabledPlugins)), nil
// registry.Graph 内部:
// 1. 收集所有注册的 Registration
// 2. 根据 Requires 字段建立依赖关系
// 3. 拓扑排序
// 4. 过滤掉 DisabledPlugins
}
插件依赖图示例
六、Config — 配置结构
6.1 顶层 Config
type Config struct {
Version int // 配置版本
Root string // 根目录 (/var/lib/containerd)
State string // 状态目录 (/run/containerd)
TempDir string // 临时目录
PluginDir string // 插件目录
GRPC GRPCConfig // gRPC 配置
TTRPC TTRPCConfig // ttrpc 配置
Debug DebugConfig // Debug 配置
Metrics MetricsConfig // Metrics 配置
DisabledPlugins []string // 禁用的插件
RequiredPlugins []string // 必需的插件
Plugins map[string]Plugin // 插件配置 (key = plugin URI)
ProxyPlugins map[string]ProxyPlugin // 代理插件
StreamProcessors map[string]StreamProcessor // 流处理器
Timeouts map[string]string // 超时配置
OOMScore int // OOM score
CgroupPath string // Cgroup 路径
}
6.2 GRPCConfig
type GRPCConfig struct {
Address string // Unix socket 地址 (/run/containerd/containerd.sock)
TCPTLSCert string // TCP TLS 证书
TCPTLSKey string // TCP TLS 私钥
TCPTLSCA string // TCP TLS CA
MaxRecvMsgSize int // 最大接收消息大小 (默认 16MB)
MaxSendMsgSize int // 最大发送消息大小 (默认 16MB)
}
七、builtins — 内置插件注册
7.1 builtins 目录结构
cmd/containerd/builtins/
├── builtins.go // 导入所有内置模块
├── builtins_linux.go // Linux 特有
├── builtins_unix.go // Unix 特有
├── builtins_windows.go // Windows 特有
builtins.go 核心导入
package builtins
import (
_ "github.com/containerd/containerd/v2/plugins/cri" // CRI 插件
_ "github.com/containerd/containerd/v2/plugins/gc" // GC 调度器
_ "github.com/containerd/containerd/v2/plugins/metadata" // Metadata
_ "github.com/containerd/containerd/v2/plugins/restart" // Restart 监控
_ "github.com/containerd/containerd/v2/plugins/sandbox" // Sandbox
_ "github.com/containerd/containerd/v2/plugins/streaming" // Streaming
_ "github.com/containerd/containerd/v2/plugins/transfer" // Transfer
_ "github.com/containerd/containerd/v2/plugins/leases" // Leases
_ "github.com/containerd/containerd/v2/plugins/events" // Events
_ "github.com/containerd/containerd/v2/plugins/services" // gRPC services
_ "github.com/containerd/containerd/v2/plugins/nri" // NRI
_ "github.com/containerd/containerd/v2/plugins/imageverifier" // Image Verifier
// ... runtime, snapshotter, differ, content 等
)
每个包的 init() 调用 plugin.Register(),注册自身到全局 registry。
7.2 插件注册示例 (metadata plugin)
// plugins/metadata/plugin.go
func init() {
plugin.Register(&plugin.Registration{
Type: plugins.MetadataPlugin,
ID: "bolt",
Requires: []plugin.Type{
plugins.ContentPlugin,
plugins.SnapshotPlugin,
},
Config: &srvconfig.BoltConfig{ContentSharingPolicy: "shared"},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
// 获取依赖的 Content 和 Snapshot 插件实例
contentProvider, _ := getPlugin[content.Store](ic, plugins.ContentPlugin)
snapshotter, _ := getPlugin[snapshots.Snapshotter](ic, plugins.SnapshotPlugin)
// 打开 BoltDB
db, _ := bolt.Open(path, 0o600, &bolt.Options{Timeout: 10 * time.Second})
// 返回 DB 实例
return metadata.NewDB(db, contentProvider, map[string]snapshots.Snapshotter{...})
},
})
}
八、Namespace 拦截器 — 多租户隔离
func unaryNamespaceInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 从 gRPC metadata 提取 namespace
ns, _ := namespaces.NamespaceFrom(ctx)
if ns == "" {
ns = namespaces.Default // "default"
}
// 注入 namespace 到 context
ctx = namespaces.WithNamespace(ctx, ns)
return handler(ctx, req)
}
设计意图:containerd 通过 namespace 实现多租户隔离。Kubernetes 的每个 namespace 对应 containerd 的一个 namespace。BoltDB 中不同 namespace 的数据完全隔离。
九、设计模式总结
| # | 模式 | 体现 |
|---|---|---|
| 1 | Plugin 体系 | Registration + InitContext + Graph + 依赖注入 |
| 2 | 侧导入 | _ "builtins" 触发 init() 注册 |
| 3 | 拓扑排序 | registry.Graph() 解析依赖 |
| 4 | 类型断言分发 | grpcService/ttrpcService/tcpService 接口 |
| 5 | Proxy 模式 | ProxyPlugins → gRPC proxy 到远端 |
| 6 | 多租户 | Namespace 拦截器 + BoltDB bucket 隔离 |
| 7 | 双协议 | gRPC (外部API) + ttrpc (shim通信) |
| 8 | 配置迁移 | MigrateConfig + ConfigVersion |
| 9 | 必选/禁用 | RequiredPlugins / DisabledPlugins |
| 10 | 优雅关闭 | Stop() 反向遍历 plugins + Close() |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)