共计 4349 个字符,预计需要花费 11 分钟才能阅读完成。
Kubernetes 自定义调度器示例代码
1. 多调度器方式
采用这种方式时,我们需要编写一个完整的调度器程序,并将其与默认调度器并行部署。
// main.go
package main
import (
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// main builds an in-cluster Kubernetes client and starts a scheduler that is
// given the name "custom-scheduler". Any failure while preparing the client
// aborts the process with a panic.
//
// NOTE(review): the argument list passed to scheduler.New and the
// argument-less sched.Run call depend on the Kubernetes release this snippet
// targets — verify against the vendored version. time.Second is used below,
// but "time" is not in this snippet's import list.
func main() {
	// Obtain the in-cluster REST configuration.
	cfg, cfgErr := rest.InClusterConfig()
	if cfgErr != nil {
		panic(cfgErr)
	}

	// Build the Kubernetes client from that configuration.
	client, clientErr := kubernetes.NewForConfig(cfg)
	if clientErr != nil {
		panic(clientErr)
	}

	// Construct the scheduler; the first string is the custom scheduler name.
	sched := scheduler.New(
		client,
		"custom-scheduler",
		"custom-scheduler",
		scheduler.NewDefaultAlgorithmSource(),
		30*time.Second,
	)

	// Start scheduling.
	sched.Run()
}
部署配置:
# Deployment that runs the custom scheduler binary as a single replica in
# kube-system, with leader election switched off.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-scheduler
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: custom-scheduler
  template:
    metadata:
      labels:
        app: custom-scheduler
    spec:
      containers:
        - name: custom-scheduler
          image: custom-scheduler:v1
          args:
            - --leader-elect=false
2. 调度器扩展程序(Scheduler Extender)
这种方式需要实现一个 HTTP 服务来处理调度请求。
// main.go
package main
import (
	"encoding/json"
	"log"
	"net/http"

	v1 "k8s.io/api/core/v1"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)
// filter returns the candidate nodes from args on which the pod may run, as
// decided by fits.
//
// If the request carries no node list (args.Nodes is nil, for example because
// the payload was empty), the result reports the problem through its Error
// field instead of dereferencing the nil pointer.
func filter(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
	if args.Nodes == nil {
		return &schedulerapi.ExtenderFilterResult{
			Error: "extender filter: request contains no node list",
		}
	}

	candidates := args.Nodes.Items
	// At most every candidate is kept, so reserve that much up front.
	kept := make([]v1.Node, 0, len(candidates))
	// Index instead of ranging by value to avoid an extra copy of each node.
	for i := range candidates {
		if fits(args.Pod, candidates[i]) {
			kept = append(kept, candidates[i])
		}
	}

	return &schedulerapi.ExtenderFilterResult{
		Nodes: &v1.NodeList{Items: kept},
	}
}
// prioritize assigns each candidate node in args a score computed by score
// (the custom scoring logic) and returns one HostPriority entry per node, in
// the same order as the incoming node list.
//
// If the request carries no node list (args.Nodes is nil), an empty list is
// returned instead of dereferencing the nil pointer.
func prioritize(args schedulerapi.ExtenderArgs) *schedulerapi.HostPriorityList {
	if args.Nodes == nil {
		empty := schedulerapi.HostPriorityList{}
		return &empty
	}

	candidates := args.Nodes.Items
	priorities := make(schedulerapi.HostPriorityList, len(candidates))
	// Index instead of ranging by value to avoid an extra copy of each node.
	for i := range candidates {
		priorities[i] = schedulerapi.HostPriority{
			Host:  candidates[i].Name,
			Score: score(args.Pod, candidates[i]),
		}
	}
	return &priorities
}
// main serves the scheduler-extender HTTP endpoints on port 8888.
//
// Routes:
//   - /filter      decodes ExtenderArgs and replies with the result of filter.
//   - /prioritize  decodes ExtenderArgs and replies with the result of prioritize.
//
// A request body that cannot be decoded is answered with 400 Bad Request
// rather than being processed as zero-valued arguments. If the listener
// cannot be started or stops, the process logs the error and exits.
func main() {
	// readArgs decodes the JSON request body. On failure it answers the
	// request with 400 and reports false so the handler can stop.
	readArgs := func(w http.ResponseWriter, r *http.Request) (schedulerapi.ExtenderArgs, bool) {
		var args schedulerapi.ExtenderArgs
		if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
			http.Error(w, "invalid ExtenderArgs payload: "+err.Error(), http.StatusBadRequest)
			return args, false
		}
		return args, true
	}

	// writeResult encodes v as the JSON response. An encoding failure can
	// only be logged, because part of the response may already be written.
	writeResult := func(w http.ResponseWriter, v interface{}) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("extender: writing response: %v", err)
		}
	}

	http.HandleFunc("/filter", func(w http.ResponseWriter, r *http.Request) {
		args, ok := readArgs(w, r)
		if !ok {
			return
		}
		writeResult(w, filter(args))
	})

	http.HandleFunc("/prioritize", func(w http.ResponseWriter, r *http.Request) {
		args, ok := readArgs(w, r)
		if !ok {
			return
		}
		writeResult(w, prioritize(args))
	})

	// ListenAndServe always returns a non-nil error when it returns.
	log.Fatal(http.ListenAndServe(":8888", nil))
}
配置文件:
# Scheduler configuration that registers the HTTP extender listening on
# localhost:8888 for the filter and prioritize phases.
# NOTE(review): confirm that this apiVersion accepts `extenders` in
# KubeSchedulerConfiguration and that the field spellings match it; some
# releases configure extenders through a separate scheduler Policy file.
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
extenders:
  - urlPrefix: "http://localhost:8888"
    filterVerb: "filter"
    prioritizeVerb: "prioritize"
    weight: 1
    enableHttps: false
3. 调度框架(Scheduler Framework)
这是目前官方推荐的方式,需要实现调度框架中相应扩展点的插件接口。
// plugin.go
package main
import (
"k8s.io/kubernetes/pkg/scheduler/framework"
v1 "k8s.io/api/core/v1"
)
// Name is the plugin name. It is returned by SamplePlugin.Name and must match
// the name used when the plugin is registered and enabled.
const Name = "sample-plugin"
// SamplePlugin is a sample scheduler plugin. It stores the framework handle
// it was constructed with.
type SamplePlugin struct {
// handle is the framework handle supplied to New.
handle framework.FrameworkHandle
}
// Name implements the Plugin interface by returning the package-level Name
// constant.
func (sp *SamplePlugin) Name() string {
return Name
}
// PreFilter is the PreFilter extension point. This sample performs no work
// and always returns a Success status with an empty message.
//
// NOTE(review): the signature uses context.Context, but "context" is not in
// this snippet's import list.
func (sp *SamplePlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
// Pre-processing before scheduling would go here.
return framework.NewStatus(framework.Success, "")
}
// Filter is the Filter extension point. It asks customFitsPred whether the
// pod can be scheduled onto the node described by nodeInfo and returns
// Success when it can, or Unschedulable with the message "node not fit"
// otherwise.
func (sp *SamplePlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	// Reject first so the accepting path stays unindented.
	if !customFitsPred(pod, nodeInfo) {
		return framework.NewStatus(framework.Unschedulable, "node not fit")
	}
	return framework.NewStatus(framework.Success, "")
}
// Score is the Score extension point. It returns the value customScore
// computes for the pod on the named node, together with a Success status.
func (sp *SamplePlugin) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	return customScore(pod, nodeName), framework.NewStatus(framework.Success, "")
}
// New is the plugin factory. It ignores the configuration object and returns
// a SamplePlugin that keeps the supplied framework handle; the error is
// always nil.
//
// NOTE(review): runtime.Object is referenced here, but no "runtime" package
// is in this snippet's import list.
func New(_ runtime.Object, h framework.FrameworkHandle) (framework.Plugin, error) {
	plugin := &SamplePlugin{handle: h}
	return plugin, nil
}
注册和使用插件:
// main.go
package main
import (
"k8s.io/kubernetes/cmd/kube-scheduler/app"
)
// main builds the scheduler command with the sample plugin registered under
// Name (factory: New) and executes it, exiting with status 1 if execution
// returns an error.
//
// NOTE(review): os.Exit is used, but "os" is not in this snippet's import
// list.
func main() {
	cmd := app.NewSchedulerCommand(app.WithPlugin(Name, New))

	err := cmd.Execute()
	if err != nil {
		os.Exit(1)
	}
}
配置文件:
# Scheduler configuration that enables "sample-plugin" at the preFilter,
# filter and score extension points.
# NOTE(review): the layout below (top-level schedulerName / plugins) follows
# the apiVersion stated here; newer config versions nest these under
# `profiles` — confirm against the target Kubernetes release.
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
# Must match spec.schedulerName of the pods this scheduler should handle.
schedulerName: custom-scheduler
# Disabled so this instance does not contend for leadership with other
# schedulers in the cluster (mirrors --leader-elect=false).
leaderElection:
  leaderElect: false
plugins:
  preFilter:
    enabled:
      - name: "sample-plugin"
  filter:
    enabled:
      - name: "sample-plugin"
  score:
    enabled:
      - name: "sample-plugin"
        weight: 1
部署配置:
# Deployment that runs the plugin-enabled scheduler as a single replica in
# kube-system. The scheduler configuration is read from the "scheduler-config"
# ConfigMap mounted at /etc/kubernetes, so that ConfigMap needs a
# "scheduler-config.yaml" key for the --config path below to resolve.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-scheduler
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: custom-scheduler
  template:
    metadata:
      labels:
        app: custom-scheduler
    spec:
      containers:
        - name: scheduler
          image: custom-scheduler:v1
          args:
            - --config=/etc/kubernetes/scheduler-config.yaml
          volumeMounts:
            - name: config
              mountPath: /etc/kubernetes
      volumes:
        - name: config
          configMap:
            name: scheduler-config
使用自定义调度器:
# Example Pod that is scheduled by the custom scheduler instead of the
# default one.
apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  schedulerName: custom-scheduler # select the custom scheduler by name
  containers:
    - name: nginx
      image: nginx:1.14.2
可运行 demo 学习例子
正文完