Federation v2 is split into two major parts: configuration and propagation. KubeFed's biggest change is removing the dedicated API Server and instead extending Federated Resources through CRDs. The KubeFed Controller manages these CRDs and implements resource synchronization, cross-cluster orchestration, and related features. The code layout is clear:

   pkg/core defines the core CRDs, including:

| NAME | API Group | KIND | Desc |
| --- | --- | --- | --- |
| clusterpropagatedversions | core.kubefed.io | ClusterPropagatedVersion | Holds version information about the state propagated from the KubeFed APIs (configured by FederatedTypeConfig resources) to member clusters. Its name encodes the kind and name of the resource it stores information for, i.e. `<lowercase kind>-<resource name>`. |
| federatedservicestatuses | core.kubefed.io | FederatedServiceStatus | FederatedServiceClusterStatus is the observed status of the resource for a named cluster. |
| federatedtypeconfigs | core.kubefed.io | FederatedTypeConfig | Programs KubeFed with a single API type that the user wants to federate (detailed below). |
| kubefedclusters | core.kubefed.io | KubeFedCluster | Configures KubeFed to be aware of a Kubernetes cluster and encapsulates the details necessary to communicate with it. |
| kubefedconfigs | core.kubefed.io | KubeFedConfig | Configuration information. |
| propagatedversions | core.kubefed.io | PropagatedVersion | Like ClusterPropagatedVersion, holds version information about state propagated to member clusters; its name encodes the kind and name of the tracked resource (`<lowercase kind>-<resource name>`). |

FederatedTypeConfig programs KubeFed with a single API type (the "target type") that the user wants to federate. For each target type there is a corresponding FederatedType with the following fields:

The template field specifies the base definition of the federated resource.

The placement field specifies where the federated resource should be placed.

The overrides field specifies how the target resource should vary across clusters.
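As a concrete illustration of the template, placement, and overrides fields, a FederatedDeployment typically looks like the sketch below (resource, namespace, and cluster names are illustrative):

```yaml
apiVersion: types.kubefed.io/v1beta1
kind: FederatedDeployment
metadata:
  name: test-deployment
  namespace: test-namespace
spec:
  template:            # base definition propagated to every selected cluster
    metadata:
      labels:
        app: nginx
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: nginx
      template:
        metadata:
          labels:
            app: nginx
        spec:
          containers:
          - image: nginx
            name: nginx
  placement:           # which member clusters receive the resource
    clusters:
    - name: admin
    - name: test-2
  overrides:           # per-cluster variations, applied as JSON patches
  - clusterName: test-2
    clusterOverrides:
    - path: "/spec/replicas"
      value: 5
```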

    pkg/apis/scheduling defines only one CRD, ReplicaSchedulingPreference (RSP)

| Resource | API Group | KIND |
| --- | --- | --- |
| replicaschedulingpreferences | scheduling.kubefed.io | ReplicaSchedulingPreference |

ReplicaSchedulingPreference provides an automated mechanism for distributing and maintaining the total replica count of deployment- or replicaset-based federated workloads across the joined clusters.

The RSP controller works in a sync loop, watching both RSP resources and the FederatedDeployment or FederatedReplicaSet resources that match the same namespace/name pair.

    The CRD types created when an application is deployed; their API Group is types.kubefed.io:

| NAME | SHORTNAMES | API Group | NAMESPACED | KIND |
| --- | --- | --- | --- | --- |
| federatedapplications | fapp | types.kubefed.io | true | FederatedApplication |
| federatedclusterrolebindings | | types.kubefed.io | false | FederatedClusterRoleBinding |
| federatedclusterroles | | types.kubefed.io | false | FederatedClusterRole |
| federatedconfigmaps | fcm | types.kubefed.io | true | FederatedConfigMap |
| federateddeployments | fdeploy | types.kubefed.io | true | FederatedDeployment |
| federatedglobalrolebindings | | types.kubefed.io | false | FederatedGlobalRoleBinding |
| federatedglobalroles | | types.kubefed.io | false | FederatedGlobalRole |
| federatedgroupbindings | | types.kubefed.io | false | FederatedGroupBinding |
| federatedgroups | | types.kubefed.io | false | FederatedGroup |
| federatedingresses | fing | types.kubefed.io | true | FederatedIngress |
| federatedjobs | | types.kubefed.io | true | FederatedJob |
| federatedlimitranges | flimits | types.kubefed.io | true | FederatedLimitRange |
| federatednamespaces | fns | types.kubefed.io | true | FederatedNamespace |
| federatednotificationconfigs | | types.kubefed.io | false | FederatedNotificationConfig |
| federatednotificationreceivers | | types.kubefed.io | false | FederatedNotificationReceiver |
| federatedpersistentvolumeclaims | fpvc | types.kubefed.io | true | FederatedPersistentVolumeClaim |
| federatedreplicasets | frs | types.kubefed.io | true | FederatedReplicaSet |
| federatedsecrets | | types.kubefed.io | true | FederatedSecret |
| federatedserviceaccounts | fsa | types.kubefed.io | true | FederatedServiceAccount |
| federatedservices | fsvc | types.kubefed.io | true | FederatedService |
| federatedstatefulsets | fsts | types.kubefed.io | true | FederatedStatefulSet |
| federatedusers | | types.kubefed.io | false | FederatedUser |
| federatedworkspacerolebindings | | types.kubefed.io | false | FederatedWorkspaceRoleBinding |
| federatedworkspaceroles | | types.kubefed.io | false | FederatedWorkspaceRole |
| federatedworkspaces | | types.kubefed.io | false | FederatedWorkspace |

main
   | --> NewControllerManagerCommand
            | --> Run
                    | --> NewKubeFedLeaderElector
                             | --> startControllers
                                      | --> StartClusterController
                                                 | --> newClusterController
                                                 | --> Run
                                      | --> StartSchedulingManager
                                                 | --> newSchedulingManager
                                                 | --> Run
                                      | --> StartController
                                                | --> newController
                                                | --> Run

1. kubefedcluster.StartClusterController

    Implemented in pkg/controller/kubefedcluster/controller.go

   1.1 The newClusterController function

    Instantiates the ClusterController struct. NewGenericInformerWithEventHandler registers for the fedv1b1.KubeFedCluster resource object, with AddFunc/UpdateFunc/DeleteFunc handlers that mainly maintain the clusterDataMap inside ClusterController. cache.NewInformer comes from client-go; it periodically resyncs resources and invokes the registered handlers.

   1.2 The updateClusterStatus function

    Lists all kubefedclusters; any cluster not yet in the controller's clusterDataMap is added via addToClusterSet, then updateIndividualClusterStatus refreshes each cluster's status.

   1.3 updateIndividualClusterStatus

     clusterClient.GetClusterHealthStatus() fetches the cluster's health, ultimately calling kubeClient.DiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(context.Background()).Raw(); a response of ok means the cluster is running normally. The status of each kubefedcluster is updated periodically:

status:
  conditions:
  - lastProbeTime: "2021-08-04T01:00:31Z"
    lastTransitionTime: "2021-08-03T07:47:43Z"
    message: /healthz responded with ok
    reason: ClusterReady
    status: "True"
    type: Ready

2. schedulingmanager.StartSchedulingManager

    Implemented in pkg/controller/schedulingmanager/controller.go. RegisterSchedulingType populates the global typeRegistry map; only deployments.apps and replicasets.apps are registered, and reconcile later calls GetSchedulingType to check whether a type is a scheduling type. The registered schedulingType is used below: its Kind is RSP, and its SchedulerFactory instantiates the scheduler, defined in pkg/schedulingtypes/replicascheduler.go:

func init() {
	schedulingType := SchedulingType{
		Kind:             RSPKind,
		SchedulerFactory: NewReplicaScheduler,
	}
	RegisterSchedulingType("deployments.apps", schedulingType)
	RegisterSchedulingType("replicasets.apps", schedulingType)
}

    ReplicaScheduler watches pod resources:

func NewReplicaScheduler(controllerConfig *ctlutil.ControllerConfig, eventHandlers SchedulerEventHandlers) (Scheduler, error) {
	client := genericclient.NewForConfigOrDieWithUserAgent(controllerConfig.KubeConfig, "replica-scheduler")
	scheduler := &ReplicaScheduler{
		plugins:          ctlutil.NewSafeMap(),
		controllerConfig: controllerConfig,
		eventHandlers:    eventHandlers,
		client:           client,
	}

	// TODO: Update this to use a typed client from single target informer.
	// As of now we have a separate informer for pods, whereas all we need
	// is a typed client.
	// We ignore the pod events in this informer from clusters.
	var err error
	scheduler.podInformer, err = ctlutil.NewFederatedInformer(
		controllerConfig,
		client,
		PodResource,
		func(runtimeclient.Object) {},
		eventHandlers.ClusterLifecycleHandlers,
	)
	if err != nil {
		return nil, err
	}

	return scheduler, nil
}

   2.1 The newSchedulingManager function instantiates the SchedulingManager struct

    2.1.1 NewReconcileWorker

     Instantiates the asyncWorker struct, including the core handler function reconcile:

func (c *SchedulingManager) reconcile(qualifiedName util.QualifiedName) util.ReconciliationStatus {
	defer metrics.UpdateControllerReconcileDurationFromStart("schedulingmanagercontroller", time.Now())

	key := qualifiedName.String()

     schedulingtypes.GetSchedulingType(typeConfigName) uses the types registered in init above to filter out every resource type except deployments.apps and replicasets.apps; the kind is ReplicaSchedulingPreference.

     schedulingpreference.StartSchedulingPreferenceController is analyzed in section 2.2. It runs on first startup, and the resulting scheduler is stored in the SchedulingManager's schedulers map:

klog.Infof("Starting schedulingpreference controller for %s", schedulingKind)
stopChan := make(chan struct{})
schedulerInterface, err := schedulingpreference.StartSchedulingPreferenceController(c.config, *schedulingType, stopChan)
if err != nil {
	runtime.HandleError(errors.Wrapf(err, "Error starting schedulingpreference controller for %s", schedulingKind))
	return util.StatusError
}
abstractScheduler = newSchedulerWrapper(schedulerInterface, stopChan)
c.schedulers.Store(schedulingKind, abstractScheduler)

    2.1.2 The NewGenericInformer function

    The watched resource object is corev1b1.FederatedTypeConfig.

   2.2 StartSchedulingPreferenceController

    newSchedulingPreferenceController instantiates the SchedulingPreferenceController struct, and NewReconcileWorker instantiates the asyncWorker whose core handler function is reconcile.

    intersectWithClusterSelector: if set to true, the placement of the target kind is determined by the intersection of the RSP placement scheduling result and the clusterSelector specified on the target kind (spec.placement.clusterSelector). If set to false or left undefined, the RSP placement result overrides the cluster list in the target resource's spec.placement.clusters.

apiVersion: scheduling.kubefed.io/v1alpha1
kind: ReplicaSchedulingPreference
metadata:
  name: test-deployment
  namespace: test-namespace
spec:
  targetKind: FederatedDeployment
  totalReplicas: 9
  intersectWithClusterSelector: false

    2.2.1 The reconcile function

    Currently only FederatedDeployment and FederatedReplicaSet are supported. The corresponding resource object is looked up by the RSP's metadata namespace and name; if the fdeploy or frs has not been created yet, the RSP is left alone until the next cycle. Otherwise scheduler.Reconcile is invoked, initialized at the start of section 2 and defined in pkg/schedulingtypes/replicascheduler.go:

func (s *ReplicaScheduler) Reconcile(obj runtimeclient.Object, qualifiedName ctlutil.QualifiedName) ctlutil.ReconciliationStatus {
	rsp, ok := obj.(*fedschedulingv1a1.ReplicaSchedulingPreference)
	if !ok {
		runtime.HandleError(errors.Errorf("Incorrect runtime object for RSP: %v", rsp))
		return ctlutil.StatusError
	}

	fedClusters, err := s.podInformer.GetReadyClusters()
	if err != nil {
		runtime.HandleError(errors.Wrap(err, "Failed to get cluster list"))
		return ctlutil.StatusError
	}

    GetReadyClusters returns all ready clusters:

s.podInformer.GetReadyClusters()

    If rsp.spec.intersectWithClusterSelector is set to true, the intersection of the RSP placement scheduling result and the target kind's spec.placement.clusterSelector determines the placement, per the rules in https://github.com/kubernetes-sigs/kubefed/blob/master/docs/userguide.md#using-cluster-selector:

if rsp.Spec.IntersectWithClusterSelector {
	klog.V(3).Infof("Computing placement of resource %q", qualifiedName)

	resultClusters, err := plugin.(*Plugin).GetResourceClusters(qualifiedName, fedClusters)
	if err != nil {
		runtime.HandleError(errors.Wrapf(err, "Failed to get preferred clusters while reconciling RSP named %q", key))
		return ctlutil.StatusError
	}

	preferredClusters := []string{}
	for clusterName := range resultClusters {
		preferredClusters = append(preferredClusters, clusterName)
	}
	if len(preferredClusters) == 0 {
		return ctlutil.StatusAllOK
	}
	clusterNames = preferredClusters

	klog.V(3).Infof("Preferred clusters %q", clusterNames)
}

    Next, plugin.Reconcile writes the scheduling result into the federated resource's spec, as placement and overrides:

err = plugin.(*Plugin).Reconcile(qualifiedName, result)
if err != nil {
	runtime.HandleError(errors.Wrapf(err, "Failed to reconcile federated targets for RSP named %q", key))
	return ctlutil.StatusError
}

spec:
  overrides:
  - clusterName: admin
    clusterOverrides:
    - path: /spec/replicas
      value: 5
  - clusterName: test-2
    clusterOverrides:
    - path: /spec/replicas
      value: 4
  placement:
    clusters:
    - name: test-2
    - name: admin

3. federatedtypeconfig.StartController

    Implemented in pkg/controller/federatedtypeconfig/controller.go. NewReconcileWorker instantiates an asyncWorker whose core handler function is reconcile. For the target type deployments.apps, the corresponding FederatedTypeConfig (federated kind FederatedDeployment) is defined as:

spec:
  federatedType:
    group: types.kubefed.io
    kind: FederatedDeployment
    pluralName: federateddeployments
    scope: Namespaced
    version: v1beta1
  propagation: Enabled
  targetType:
    group: apps
    kind: Deployment
    pluralName: deployments
    scope: Namespaced
    version: v1
status:
  observedGeneration: 1
  propagationController: Running
  statusController: NotRunning

   3.1 The reconcile function

    3.1.1 The startSyncController function

    Each FederatedTypeConfig resource type must first be initialized by calling startSyncController; registering it in stopChannels marks its sync as started.

    It instantiates the KubeFedSyncController struct and executes its Run method, following the same pattern: periodically running its reconcile method. The informer watches the Kubernetes resource type, i.e. the target type the FederatedTypeConfig refers to.

    3.1.1.1 syncToClusters ensures the resource object is synchronized to the member clusters:

// syncToClusters ensures that the state of the given object is
// synchronized to member clusters.
func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) util.ReconciliationStatus {
	// Enable raw resource status collection if the statusCollection is enabled for that type
	// and the feature is also enabled.
	enableRawResourceStatusCollection := s.typeConfig.GetStatusEnabled() && s.rawResourceStatusCollection

    Get all kubefedclusters, compute the selected cluster names from the resource's placement, and instantiate managedDispatcherImpl via NewManagedDispatcher, implemented in pkg/controller/sync/dispatch/managed.go:

clusters, err := s.informer.GetClusters()
selectedClusterNames, err := fedResource.ComputePlacement(clusters)

dispatch.NewManagedDispatcher(s.informer.GetClientForCluster, fedResource, s.skipAdoptingResources, enableRawResourceStatusCollection)

    3.1.1.1.1 Create if absent, update if present

    Iterate over all selected clusters; this is the propagation to member clusters:

// TODO(marun) Consider waiting until the result of resource
// creation has reached the target store before attempting
// subsequent operations.  Otherwise the object won't be found
// but an add operation will fail with AlreadyExists.
if clusterObj == nil {
	dispatcher.Create(clusterName)
} else {
	dispatcher.Update(clusterName, clusterObj)
}

    3.1.1.1.2 Write the updated versions back to the API:

// Write updated versions to the API.
updatedVersionMap := dispatcher.VersionMap()
err = fedResource.UpdateVersions(selectedClusterNames.List(), updatedVersionMap)
if err != nil {
	// Versioning of federated resources is an optimization to
	// avoid unnecessary updates, and failure to record version
	// information does not indicate a failure of propagation.
	runtime.HandleError(err)
}

fedResource.UpdateVersions
    | --> versionManager.Update
             | --> m.writeVersion

References:

    https://github.com/kubernetes-sigs/kubefed
