找回密码
 立即注册
首页 业界区 业界 使用scheduler-plugins实现自定义调度器

使用scheduler-plugins实现自定义调度器

颜才 前天 16:08
一、环境说明

开发环境部署环境操作系统Windows10Centos7.9Go版本go version go1.24.2 windows/amd64go version go1.23.6 linux/amd64插件版本Master分支Docker版本Docker version 26.1.4, build 5650f9bk8s版本v1.28.0 (minikube)补充说明:
k8s环境是由minikube创建,CRI为docker,如果CRI为Containerd,也不影响,后面会说明如何部署。
二、开发

1.png

本次开发是在scheduler-plugins源码基础上进行开发。
通过上图可以看到,Filter和Score是两个核心,一般开发也是围绕着Filter和Score。
首先需要把scheduler-plugins的源码下载到本地,直接使用git进行拉取即可。
  1. git clone https://github.com/kubernetes-sigs/scheduler-plugins.git
复制代码
当然如果对版本有特定要求,请根据官方提供的readme进行分支切换。
插件的代码都放在pkg目录下, 现在需要自定义一个插件,当然也是在pkg目录下进行开发。
pkg目录下创建一个新的目录,比如叫prefernode,在prefernode目录下创建创建prefernode.go文件。
接下来就可以在prefernode.go里编写自定义调度器的核心逻辑了。
假如现在想让所有使用自定义调度器的pod都调度到指定的某个节点上,这里直接实现Score。
  1. package prefernode1
  2. import (
  3.         "context"
  4.         v1 "k8s.io/api/core/v1"
  5.         "k8s.io/kubernetes/pkg/scheduler/framework"
  6.         "k8s.io/apimachinery/pkg/runtime"
  7.         "k8s.io/klog/v2"
  8. )
  9. const Name = "PreferNode"
  10. type PreferNode struct {
  11.         handle framework.Handle
  12. }
  13. func (p *PreferNode) Name() string {
  14.         return Name
  15. }
  16. func (p *PreferNode) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
  17.         klog.V(5).Infof("Scoring pod %s on node %s", pod.Name, nodeName)
  18.         if nodeName == "minikube-m03" {
  19.                 return 100, nil
  20.         }
  21.         return 0, nil
  22. }
  23. func (p *PreferNode) ScoreExtensions() framework.ScoreExtensions {
  24.         return nil
  25. }
  26. func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
  27.         return &PreferNode{}, nil
  28. }
复制代码
以上代码,已经实现了具体需求,将所有使用我们自定义插件的Pod都调度到某个节点上。这里指定的是"minikube-m03"。
插件核心代码写好了,还需要进行注册,让框架知道我们是现在自定义插件。
返回项目根目录,进入到cmd/scheduler,编辑main.go,在command中进行注册。
  1. func main() {
  2.         // Register prefernode1 plugins to the scheduler framework.
  3.         // Later they can consist of scheduler profile(s) and hence
  4.         // used by various kinds of workloads.
  5.         command := app.NewSchedulerCommand(
  6.                 app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
  7.                 app.WithPlugin(coscheduling.Name, coscheduling.New),
  8.                 app.WithPlugin(loadvariationriskbalancing.Name, loadvariationriskbalancing.New),
  9.                 app.WithPlugin(networkoverhead.Name, networkoverhead.New),
  10.                 app.WithPlugin(topologicalsort.Name, topologicalsort.New),
  11.                 app.WithPlugin(noderesources.AllocatableName, noderesources.NewAllocatable),
  12.                 app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
  13.                 app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
  14.                 app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
  15.                 app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
  16.                 app.WithPlugin(sysched.Name, sysched.New),
  17.                 app.WithPlugin(peaks.Name, peaks.New),
  18.                 // Sample plugins below.
  19.                 // app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
  20.                 app.WithPlugin(podstate.Name, podstate.New),
  21.                 app.WithPlugin(qos.Name, qos.New),
  22.         // 这是我们自定义的插件
  23.                 app.WithPlugin(prefernode.Name, prefernode.New),
  24.         )
  25.         code := cli.Run(command)
  26.         os.Exit(code)
  27. }
复制代码
到此,开发完成。
如果你觉得上面的实现比较简陋,当然了,这里也提供一个同时实现Filter和Score的插件。
  1. package prefernode
  2. import (
  3.         "k8s.io/kubernetes/pkg/scheduler/framework"
  4.         "context"
  5.         "k8s.io/api/core/v1"
  6.         "k8s.io/klog/v2"
  7.         "fmt"
  8.         "sort"
  9.         "k8s.io/apimachinery/pkg/runtime"
  10. )
  11. const Name = "PreferNode"
  12. type PreferNode struct {
  13.         handler framework.Handle
  14. }
  15. func (p *PreferNode) Name() string {
  16.         return Name
  17. }
  18. // Filter 实现预选逻辑
  19. func (p *PreferNode) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
  20.         if nodeInfo == nil || nodeInfo.Node() == nil {
  21.                 klog.Error("@@@ node not found @@@")
  22.                 return framework.NewStatus(framework.Error, "node not found")
  23.         }
  24.         node := nodeInfo.Node()
  25.         klog.V(4).Infof("prefernode filter pod %s/%s:%s", pod.Namespace, pod.Name, node.Name)
  26.         // 检查节点是否可调度
  27.         if node.Spec.Unschedulable {
  28.                 klog.V(4).Infof("Node %s is unschedulable", node.Name)
  29.                 return framework.NewStatus(framework.Unschedulable, "node is unschedulable")
  30.         }
  31.         // 检查节点是否有足够的资源
  32.         podRequest := calculatePodResourceRequest(pod)
  33.         nodeAllocatable := node.Status.Allocatable
  34.         cpuAvailable := nodeAllocatable.Cpu().MilliValue()
  35.         memAvailable := nodeAllocatable.Memory().MilliValue()
  36.         if cpuAvailable < podRequest.cpu {
  37.                 klog.V(4).Infof("Node %s doesn't have enough CPU: required %d, available: %d", node.Name, podRequest.cpu, cpuAvailable)
  38.                 return framework.NewStatus(framework.Unschedulable, "Insufficient CPU")
  39.         }
  40.         if memAvailable < podRequest.memory {
  41.                 klog.V(4).Infof("Node %s doesn't have enough Memory: required %d, available: %d", node.Name, podRequest.memory, memAvailable)
  42.                 return framework.NewStatus(framework.Unschedulable, "Insufficient Memory")
  43.         }
  44.         // 检查节点标签是否匹配
  45.         if pod.Spec.NodeSelector != nil {
  46.                 for key, value := range pod.Spec.NodeSelector {
  47.                         nodeValue, exists := node.Labels[key]
  48.                         if !exists || nodeValue != value {
  49.                                 klog.V(4).Infof("Node %s does not have label %s=%s", node.Name, key, value)
  50.                                 return framework.NewStatus(framework.Unschedulable, "Insufficient Label")
  51.                         }
  52.                 }
  53.         }
  54.         klog.V(4).Infof("Node %s passed all filters for pod %s/%s", node.Name, pod.Namespace, pod.Name)
  55.         return framework.NewStatus(framework.Success, "")
  56. }
  57. func (p *PreferNode) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
  58.         klog.V(4).Infof("Scoring pod %s/%s on node %s", pod.Namespace, pod.Name, nodeName)
  59.         nodeInfo, err := p.handler.SnapshotSharedLister().NodeInfos().Get(nodeName)
  60.         if err != nil {
  61.                 klog.Errorf("Error getting node %s from snapshot: %v", nodeName, err)
  62.                 return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %s from snapshot: %v", nodeName, err))
  63.         }
  64.         node := nodeInfo.Node()
  65.         // 基础分 - 考虑节点负载和可用资源
  66.         score := int64(0)
  67.         // 1、计算CPU得分 - 优先选择CPU资源充足的节点
  68.         cpuCapacity := node.Status.Capacity.Cpu().MilliValue()
  69.         cpuAllocatable := node.Status.Allocatable.Cpu().MilliValue()
  70.         cpuUsed := cpuCapacity - cpuAllocatable
  71.         // 计算cpu使用率
  72.         var cpuUtilization float64
  73.         if cpuCapacity > 0 {
  74.                 cpuUtilization = float64(cpuUsed) / float64(cpuCapacity)
  75.         }
  76.         // CPU得分,使用率越低得分越高,最高40分
  77.         cpuScore := int64((1 - cpuUtilization) * 40)
  78.         // 2、计算内存得分 - 优先选择内存资源充足的节点
  79.         memCapacity := node.Status.Capacity.Memory().Value()
  80.         memAllocatable := node.Status.Allocatable.Memory().Value()
  81.         memUsed := memCapacity - memAllocatable
  82.         // 计算内存使用率
  83.         var memUtilization float64
  84.         if memCapacity > 0 {
  85.                 memUtilization = float64(memUsed) / float64(memCapacity)
  86.         }
  87.         // 内存得分, 使用率越低得分越高,最高40分
  88.         memScore := int64((1 - memUtilization) * 40)
  89.         // 2、节点标签偏好得分
  90.         labelScore := int64(0)
  91.         // 检查是否有特定角色标签
  92.         if value, exits := node.Labels["kubernetes.io/role"]; exits && value == "worker" {
  93.                 labelScore += 10
  94.         }
  95.         if nodeName == "minikube-m03" {
  96.                 labelScore += 10
  97.         }
  98.         // 计算总分
  99.         score = cpuScore + memScore + labelScore
  100.         klog.V(3).Infof("Score for pod %s/%s on node %s: %d (CPU: %d, Memory: %d, Labels: %d)",
  101.                 pod.Namespace, pod.Name, nodeName, score, cpuScore, memScore, labelScore)
  102.         return score, nil
  103. }
  104. // ScoreExtensions 返回扩展接口
  105. func (p *PreferNode) ScoreExtensions() framework.ScoreExtensions {
  106.         return p
  107. }
  108. // NormalizeScore 实现分数归一化
  109. func (p *PreferNode) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
  110.         // 找出最高分和最低分
  111.         var highest int64
  112.         var lowest = framework.MaxNodeScore
  113.         for _, nodeScore := range scores {
  114.                 if nodeScore.Score > highest {
  115.                         highest = nodeScore.Score
  116.                 }
  117.                 if nodeScore.Score < lowest {
  118.                         lowest = nodeScore.Score
  119.                 }
  120.         }
  121.         klog.V(4).Infof("Score range for pod %s/%s: [%d, %d]", pod.Namespace, pod.Name, lowest, highest)
  122.         // 如果所有节点得分相同,则不需要归一化
  123.         if highest == lowest{
  124.                 klog.V(4).Infof("No need to normalize scores as all nodes have the same score")
  125.                 return nil
  126.         }
  127.         // 归一化分数到0-100范围
  128.         for i := range scores{
  129.                 scores[i].Score = framework.MaxNodeScore * (scores[i].Score - lowest) / (highest - lowest)
  130.                 klog.V(4).Infof("Normalized score for node %s:%d",scores[i].Name,scores[i].Score)
  131.         }
  132.         // 按分数排序,记录结果
  133.         sortedScores := make(framework.NodeScoreList, len(scores))
  134.         copy(sortedScores, scores)
  135.         sort.Slice(sortedScores, func(i,j int) bool {
  136.                 return sortedScores[i].Score > sortedScores[j].Score
  137.         })
  138.         klog.V(3).Infof("Final scores for pod %s/%s",pod.Namespace,pod.Name)
  139.         for i, nodeScroe := range sortedScores {
  140.                 klog.V(5).Infof("@@@ %d. Node %s: %d",i+1, nodeScroe.Name,nodeScroe.Score)
  141.         }
  142.         return nil
  143. }
  144. // 资源请求结构体
  145. type resourceRequest struct {
  146.         cpu    int64
  147.         memory int64
  148. }
  149. // 计算Pod资源请求
  150. func calculatePodResourceRequest(pod *v1.Pod) resourceRequest {
  151.         result := resourceRequest{}
  152.         for _, container := range pod.Spec.Containers {
  153.                 if container.Resources.Requests != nil {
  154.                         result.cpu += container.Resources.Requests.Cpu().MilliValue()
  155.                         result.memory += container.Resources.Requests.Memory().Value()
  156.                 }
  157.         }
  158.         // 如果没有明确指定资源请求,使用默认值
  159.         if result.cpu == 0 {
  160.                 result.cpu = 100 // 默认100m CPU
  161.         }
  162.         if result.memory == 0 {
  163.                 result.memory = 256 * 1024 * 1024 // 默认256Mi
  164.         }
  165.         return result
  166. }
  167. // New 创建一个新的PreferNode插件实例
  168. func New(_ context.Context, _ runtime.Object, h framework.Handle) (framework.Plugin, error) {
  169.         return &PreferNode{
  170.                 handler: h,
  171.         }, nil
  172. }
复制代码
三、部署

开发完成后,在编译环境中进行编译。
进入到scheduler-plugins目录下,直接运行make。
  1. # make
  2. go build -ldflags '-X k8s.io/component-base/version.gitVersion=v0.32.5 -w' -o bin/controller cmd/controller/controller.go
  3. go build -ldflags '-X k8s.io/component-base/version.gitVersion=v0.32.5 -w' -o bin/kube-scheduler cmd/scheduler/main.go
复制代码
可以看到编译好的文件放到了同级的bin/目录下,我们需要使用的是kube-scheduler。
现在需要将我们的插件编译成Docker镜像。
  1. FROM debian:bullseye-slim
  2. COPY bin/kube-scheduler /usr/local/bin/kube-scheduler
  3. RUN chmod +x /usr/local/bin/kube-scheduler
  4. ENTRYPOINT ["/usr/local/bin/kube-scheduler"]
复制代码
执行命令进行编译,假如镜像名就叫custom-scheduler:v1.0
  1. docker build -t custom-scheduler:v1.0 .
复制代码
注意,这块需要补充一下,如果集群的容器使用containerd,则需要将docker镜像能让contained使用。
可以直接使用docker将镜像打包成tar,然后使用ctr解包。需要格外注意的是ctr需要指定-n命名空间,不然k8s识别不到。
  1. docker save -o image.tar custom-scheduler:v1.0
  2. ctr -n=k8s.io -images import image.tar
复制代码
或者使用私有仓库。
镜像准备就绪后,就可以进行下一步操作了。将进行部署到k8s集群中。
这里不得不在提k8s环境了,我的环境是minikube起的,并且多节点,所以需要将镜像导入到minikube中,如果你使用的是kind,也需要进行类似的操作。
  1. minikube image load custom-scheduler:v1.0
复制代码
加载完成后,可以使用minikube image ls检查一下。
接下来需要创建configmap,先创建scheduler-config.yaml。注意:如果使用简陋版的,则不需要配置filter。
  1. apiVersion: kubescheduler.config.k8s.io/v1
  2. kind: KubeSchedulerConfiguration
  3. clientConnection:
  4.   kubeconfig: "/etc/kubernetes/kubeconfig"
  5. leaderElection:
  6.   leaderElect: false
  7. profiles:
  8.   - schedulerName: custom-scheduler
  9.     plugins:
  10.       filter:
  11.         enabled:
  12.           - name: PreferNode
  13.       score:
  14.         enabled:
  15.           - name: PreferNode   
复制代码
同时需要准备kubeconfig文件,这个文件可以在.kube下找到。同样为了方便,生成到当前目录下。
  1. kubectl config view --flatten --minify > scheduler.kubeconfig
复制代码
现在就可以创建configMap和Secret了。(其实完全可以创建两个configMap)
创建configMap和secret。
  1. kubectl create configmap scheduler-config \
  2.   --from-file=scheduler-config.yaml=scheduler-config.yaml \
  3.   -n kube-system
  4. kubectl create secret generic scheduler-kubeconfig \
  5.   --from-file=kubeconfig=scheduler.kubeconfig \
  6.   -n kube-system
复制代码
RBAC准入这块也需要进行设置。
  1. apiVersion: v1
  2. kind: ServiceAccount
  3. metadata:
  4.   name: custom-scheduler
  5.   namespace: kube-system
  6. ---
  7. apiVersion: rbac.authorization.k8s.io/v1
  8. kind: ClusterRoleBinding
  9. metadata:
  10.   name: custom-scheduler-rolebinding
  11. roleRef:
  12.   apiGroup: rbac.authorization.k8s.io
  13.   kind: ClusterRole
  14.   name: system:kube-scheduler
  15. subjects:
  16.   - kind: ServiceAccount
  17.     name: custom-scheduler
  18.     namespace: kube-system
复制代码
到此为止,部署的准备工作基本完成了,接下来就是部署自定义调度器了。
  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4.   name: custom-scheduler
  5.   namespace: kube-system
  6. spec:
  7.   replicas: 1
  8.   selector:
  9.     matchLabels:
  10.       component: custom-scheduler
  11.   template:
  12.     metadata:
  13.       labels:
  14.         component: custom-scheduler
  15.     spec:
  16.       serviceAccountName: custom-scheduler
  17.       containers:
  18.         - name: custom-scheduler
  19.           image: docker.io/library/custom-scheduler:v1.0
  20.           args:
  21.             - --config=/etc/kubernetes/scheduler-config.yaml
  22.             - --v=5
  23.           volumeMounts:
  24.             - name: scheduler-config
  25.               mountPath: /etc/kubernetes/scheduler-config.yaml
  26.               subPath: scheduler-config.yaml
  27.             - name: scheduler-kubeconfig
  28.               mountPath: /etc/kubernetes/kubeconfig
  29.               subPath: kubeconfig
  30.       volumes:
  31.         - name: scheduler-config
  32.           configMap:
  33.             name: scheduler-config
  34.         - name: scheduler-kubeconfig
  35.           secret:
  36.             secretName: scheduler-kubeconfig
复制代码
部署完成后,查看pod的运行状况。
四、测试

这里提交一个简单的pod进行测试
  1. apiVersion: v1
  2. kind: Pod
  3. metadata:
  4.   name: test-pod
  5. spec:
  6.   schedulerName: custom-scheduler
  7.   containers:
  8.   - name: nginx
  9.     image: nginx:1.17.1
复制代码
查看,可以发现pod被调度到m03节点上了。
  1. kubectl get pod -o wide
  2. NAME       READY   STATUS    RESTARTS   AGE   IP             NODE           NOMINATED NODE   READINESS GATES
  3. test-pod   1/1     Running   0          37m   10.96.151.14   minikube-m03   <none>           <none>
复制代码
同样可以查看下自定义调度器的日志。可以看到03节点得了100分。
  1. ...
  2. I0607 08:46:21.638574       1 prefernode.go:31] prefernode filter pod default/test-pod:minikube
  3. I0607 08:46:21.638587       1 prefernode.go:67] Node minikube passed all filters for pod default/test-pod
  4. I0607 08:46:21.638597       1 prefernode.go:31] prefernode filter pod default/test-pod:minikube-m02
  5. I0607 08:46:21.638603       1 prefernode.go:67] Node minikube-m02 passed all filters for pod default/test-pod
  6. I0607 08:46:21.638610       1 prefernode.go:31] prefernode filter pod default/test-pod:minikube-m03
  7. I0607 08:46:21.638614       1 prefernode.go:67] Node minikube-m03 passed all filters for pod default/test-pod
  8. I0607 08:46:21.638759       1 prefernode.go:72] Scoring pod default/test-pod on node minikube
  9. I0607 08:46:21.638770       1 prefernode.go:128] Score for pod default/test-pod on node minikube: 80 (CPU: 40, Memory: 40, Labels: 0)
  10. I0607 08:46:21.638782       1 prefernode.go:72] Scoring pod default/test-pod on node minikube-m02
  11. I0607 08:46:21.638787       1 prefernode.go:128] Score for pod default/test-pod on node minikube-m02: 80 (CPU: 40, Memory: 40, Labels: 0)
  12. I0607 08:46:21.638797       1 prefernode.go:72] Scoring pod default/test-pod on node minikube-m03
  13. I0607 08:46:21.638808       1 prefernode.go:128] Score for pod default/test-pod on node minikube-m03: 90 (CPU: 40, Memory: 40, Labels: 10)
  14. I0607 08:46:21.638838       1 prefernode.go:154] Score range for pod default/test-pod: [80, 90]
  15. I0607 08:46:21.638844       1 prefernode.go:165] Normalized score for node minikube:0
  16. I0607 08:46:21.638847       1 prefernode.go:165] Normalized score for node minikube-m02:0
  17. I0607 08:46:21.638850       1 prefernode.go:165] Normalized score for node minikube-m03:100
  18. I0607 08:46:21.638857       1 prefernode.go:175] Final scores for pod default/test-pod
  19. I0607 08:46:21.638861       1 prefernode.go:177] @@@ 1. Node minikube-m03: 100
  20. I0607 08:46:21.638864       1 prefernode.go:177] @@@ 2. Node minikube: 0
  21. I0607 08:46:21.638867       1 prefernode.go:177] @@@ 3. Node minikube-m02: 0
  22. ...
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册