Kubernetes 自定义调度器示例代码

共计 4349 个字符,预计需要花费 11 分钟才能阅读完成。

Kubernetes 自定义调度器示例代码

1. 多调度器方式

这种方式我们需要创建一个完整的调度器程序。

// main.go
package main

import (
    "k8s.io/kubernetes/pkg/scheduler"
    "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // 创建 kubernetes 配置
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }

    // 创建 kubernetes 客户端
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // 创建调度器
    sched := scheduler.New(
        clientset,
        "custom-scheduler", // 自定义调度器名称
        "custom-scheduler",
        scheduler.NewDefaultAlgorithmSource(),
        30*time.Second,
    )

    // 运行调度器
    sched.Run()
}

部署配置:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-scheduler
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: custom-scheduler
  template:
    metadata:
      labels:
        app: custom-scheduler
    spec:
      containers:
      - name: custom-scheduler
        image: custom-scheduler:v1
        args:
        - --leader-elect=false

2. 调度器扩展程序(Scheduler Extender)

这种方式需要实现一个 HTTP 服务来处理调度请求。

// main.go
package main

import (
    "encoding/json"
    "net/http"

    schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

// 过滤函数
func filter(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
    pod := args.Pod
    nodes := args.Nodes.Items
    filteredNodes := make([]v1.Node, 0)

    for _, node := range nodes {
        if fits(pod, node) {
            filteredNodes = append(filteredNodes, node)
        }
    }

    return &schedulerapi.ExtenderFilterResult{
        Nodes: &v1.NodeList{
            Items: filteredNodes,
        },
    }
}

// 打分函数
func prioritize(args schedulerapi.ExtenderArgs) *schedulerapi.HostPriorityList {
    nodes := args.Nodes.Items
    hostPriorityList := make(schedulerapi.HostPriorityList, len(nodes))

    for i, node := range nodes {
        hostPriorityList[i] = schedulerapi.HostPriority{
            Host:  node.Name,
            Score: score(args.Pod, node), // 自定义打分逻辑
        }
    }

    return &hostPriorityList
}

func main() {
    http.HandleFunc("/filter", func(w http.ResponseWriter, r *http.Request) {
        var args schedulerapi.ExtenderArgs
        json.NewDecoder(r.Body).Decode(&args)

        result := filter(args)
        json.NewEncoder(w).Encode(result)
    })

    http.HandleFunc("/prioritize", func(w http.ResponseWriter, r *http.Request) {
        var args schedulerapi.ExtenderArgs
        json.NewDecoder(r.Body).Decode(&args)

        result := prioritize(args)
        json.NewEncoder(w).Encode(result)
    })

    http.ListenAndServe(":8888", nil)
}

配置文件:

apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
extenders:
- urlPrefix: "http://localhost:8888"
  filterVerb: "filter"
  prioritizeVerb: "prioritize"
  weight: 1
  enableHttps: false

3. 调度框架(Scheduler Framework)

这是最新推荐的方式,需要实现相应的插件接口。

// plugin.go
package main

import (
    "k8s.io/kubernetes/pkg/scheduler/framework"
    v1 "k8s.io/api/core/v1"
)

// 插件名称
const Name = "sample-plugin"

type SamplePlugin struct {
    handle framework.FrameworkHandle
}

// 实现 Plugin 接口
func (sp *SamplePlugin) Name() string { 
    return Name 
}

// PreFilter 扩展点
func (sp *SamplePlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
    // 在调度之前进行预处理
    return framework.NewStatus(framework.Success, "")
}

// Filter 扩展点
func (sp *SamplePlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    // 判断 pod 是否可以调度到该节点
    if customFitsPred(pod, nodeInfo) {
        return framework.NewStatus(framework.Success, "")
    }
    return framework.NewStatus(framework.Unschedulable, "node not fit")
}

// Score 扩展点
func (sp *SamplePlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    // 对节点进行打分
    score := customScore(pod, nodeName)
    return score, framework.NewStatus(framework.Success, "")
}

// 插件工厂函数
func New(_ runtime.Object, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &SamplePlugin{handle: h}, nil
}

注册和使用插件:

// main.go
package main

import (
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
)

func main() {
    command := app.NewSchedulerCommand(
        app.WithPlugin(Name, New),
    )

    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

配置文件:

apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
plugins:
  preFilter:
    enabled:
    - name: "sample-plugin"
  filter:
    enabled:
    - name: "sample-plugin" 
  score:
    enabled:
    - name: "sample-plugin"
      weight: 1

部署配置:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-scheduler
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: custom-scheduler
  template:
    metadata:
      labels:
        app: custom-scheduler
    spec:
      containers:
      - name: scheduler
        image: custom-scheduler:v1
        args:
        - --config=/etc/kubernetes/scheduler-config.yaml
        volumeMounts:
        - name: config
          mountPath: /etc/kubernetes
      volumes:
      - name: config
        configMap:
          name: scheduler-config

使用自定义调度器:

apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  schedulerName: custom-scheduler  # 指定使用自定义调度器
  containers:
  - name: nginx
    image: nginx:1.14.2

可运行 demo 学习例子

https://github.com/cleverhu/custom-scheduler-demo

正文完
 0
评论(没有评论)