《2》动手写一个operator

说明

我们昨天已经明确项目以及创建好基础代码框架了并运行起来了。今天我们主要探讨下这个项目想要做成什么样,以及一些crd的设计。

介绍

首先我们熟悉下代码目录,我这边只讲你会用到的,其他的我不一定能讲的明白。
png2
图中1、2两个文件,是我们需要修改和主要编写的文件。

  1. 第一个文件主要是定义你的crd文件的参数,包括specstatus参数.
  2. 第二个文件主要是写你controller的逻辑,比如你监听到了合适的事件消息,如何去相应,不论是调用三方接口或者去创建你的cr资源,都是响应的一种形式。

设计

针对我们这个项目,我们是想要实现自定义cr资源能够管理我们的定时任务,并触发我们的deployment滚动更新。
所以我们的cr资源字段应该这样设计:

  1. spec,我们涉及3个字段,一个是这个cr资源绑定的deployment,另一个是定时任务表达式,最后一个是是否启动这个定时任务
  2. status,我们涉及到的字段包含状态和条件,而条件是一个array,每一条都记录着当前的状态信息。

png3

开发

  1. 首先确定我们要监听的资源

修改pkg/controller/eru/eru_controller.go文件,在这里可以看到我们监听两个资源,一个你自定义的cr资源,一个是deployment资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func add(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New("eru-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

err = c.Watch(&source.Kind{Type: &mtsrev1.Eru{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

return nil
}
  1. 判断deployment和eru自定义资源是否存在,我们的逻辑是,如果deployment不存在了,那么我们就删除与其绑定的eru资源.

修改pkg/controller/eru/eru_controller.go文件,Reconcile这个作为主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// deployment 和 eru 实例
deployment_instance := &appsv1.Deployment{}
eru_instance := &mtsrev1.Eru{}

// deploymentExist 和 eruExist 实例
deploymentExist := r.client.Get(context.TODO(), request.NamespacedName, deployment_instance)
eruExist := r.client.Get(context.TODO(),request.NamespacedName,eru_instance)

// *****判断deployment是否存在*****
//如果不存在,那么就要一同删除绑定的eru cr资源
if deploymentExist != nil && errors.IsNotFound(deploymentExist) {
if eruExist == nil {
erulog.Warningf("this deployment : %v not found,maybe deleted,so eru cr need to deleted",request.String())
if deletestatus ,err := r.DeleteEruCr(eru_instance); deletestatus {
return reconcile.Result{},nil
} else {
return reconcile.Result{}, err
}
}
}

删除自定义cr资源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 删除eru cr资源
func (r *ReconcileEru) DeleteEruCr(eru *mtsrev1.Eru) (bool, error) {
// 删除aom 策略
// 华为云目前观测到有定时去清理aom规则,如果deployment被删除的话。
retryError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.client.Delete(context.TODO(), eru)
})
// 判断删除cr资源是否成功
if retryError != nil {
erulog.Errorf("eru delete failure, %v/%v", eru.Namespace, eru.Name)
return false, retryError
} else {
erulog.Infof("eru delete success, %v/%v", eru.Namespace, eru.Name)
return true, nil
}
}
  1. 当我们的deployment存在,我们需要去判断他是否含有对应的annotations,约定的是mtsre.meitu.com/eru: "true"
1
2
3
4
5
6
7
8
9
10
//如果存在,请继续
// 我们继续检查deployment是否有设置符合条件的annotations,我们约定是否含有`mtsre.meitu.com/eru: "true"`,就添加
deploymentAnnotation := deployment_instance.Annotations
if _,ok := deploymentAnnotation["mtsre.meitu.com/eru"]; ok {
fmt.Println("yes have")
// 判断eru资源是否存在,如果不存在,那么就创建
} else {
// 该deployment没有设置对应的annotations,所以我们不做处理,进行下一次循环
return reconcile.Result{}, nil
}
  1. 创建cr资源,我们主要做3个事情,第一创建cr资源,第二更新cr资源的annotation,第三更新cr资源的status
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 创建eru cr资源
func (r *ReconcileEru) CreateEruCr(deployment *appsv1.Deployment) (bool,error) {
// 如果创建失败,那么就更新status,说失败了
eru := eru_res.NewEru(deployment)
if err := r.client.Create(context.TODO(),eru); err != nil {
erulog.Errorf("create eru cr failure,reason: %v",err)
failureCondition := mtsrev1.EruCondition{
Ready: false,
Reason: "create eru cr failure",
Message: "create eru cr failure",
LastedTranslationTime: v1.Now(),
}
if updateerror := r.updateStatus(eru,failureCondition,mtsrev1.EruFailure); updateerror != nil {
return false,updateerror
}
return false,err
}
// 如果创建cr成功,我们需要更新annotation、更新status,默认cr的定时任务是停止的,我们需要用户手动去触发是否启动,并修改对应的定时任务表达式
// 这里是更新annotation
deploymentData,enabledData,scheduleData := toString(eru)
eru.Annotations = map[string]string{
"mtsre.meitu.com/deploymentName": deploymentData,
"mtsre.meitu.com/enabled": enabledData,
"mtsre.meitu.com/schedule": scheduleData,
}
r.UpdateAnnotations(eru)
// 这里是更新status
stopCondition := mtsrev1.EruCondition{
Ready: true,
Reason: "create eru cr success,wait to start",
Message: "create eru cr success,wait to start",
LastedTranslationTime: v1.Time{},
}
if updateerror := r.updateStatus(eru,stopCondition,mtsrev1.EruStoping); updateerror != nil {
return false,updateerror
}
return true,nil
}

eru_res.NewEru这个方法很简单,初始化了一个cr对象,并给这个cr对象设置一些初始值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func NewEru(deployment *appsv1.Deployment) *mtsrev1.Eru {
return &mtsrev1.Eru{
TypeMeta: v1.TypeMeta{
Kind: "Eru",
APIVersion: "v1",
},
ObjectMeta: v1.ObjectMeta{
Name: deployment.Name,
Namespace: deployment.Namespace,
},
Spec: mtsrev1.EruSpec{
DeploymentName: deployment.Name,
Enabled: false,
Schedule: "0 0 * * *", // 默认每天凌晨执行一次
},
}
}

更新status和更新annotation都单独抽离了方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//更新cr的status函数
func (r *ReconcileEru) updateStatus(eru *mtsrev1.Eru, condition mtsrev1.EruCondition, phase mtsrev1.EruPhase) error {
eru.Status.Phase = phase
eru.Status.Conditions = append(eru.Status.Conditions, condition)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.client.Status().Update(context.TODO(), eru)
})

}
// 更新cr的annotation函数,更新一般不会失败,所以不做判断处理
func (r *ReconcileEru) UpdateAnnotations(eru *mtsrev1.Eru) {
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.client.Update(context.TODO(), eru)
})
}

测试运行

因为我们修改了types.go这个文件,所以需要执行如下代码去生成代码:

1
$ operator-sdk generate k8s

最后我们就可以运行试下了:

1
operator-sdk run --local  --namespace your-namespace

可以看到输出如下:
png4

我们创建一个deployment试试,然后我们先不带annotation,看看有什么效果
测试发现,只会监听到你这个deployment的信息,由于没有任何配置,所以不会创建cr资源等。
png5

我们删除deployment,创建一个带正确annotation的deployment测试,看看效果
png6
png7
可以看到,我们首先是检测到了annotation,然后就开始创建cr资源

接下来我们看下创建的cr资源是否正常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
➜  kubectl get eru -n lb6
NAME AGE
opsnatmonitor-v1 5m
➜ kubectl get eru opsnatmonitor-v1 -n lb6 -o yaml
apiVersion: mtsre.meitu.com/v1
kind: Eru
metadata:
annotations:
mtsre.meitu.com/deploymentName: '"opsnatmonitor-v1"'
mtsre.meitu.com/enabled: "false"
mtsre.meitu.com/schedule: '"0 0 * * *"'
creationTimestamp: "2020-12-23T07:54:43Z"
generation: 1
name: opsnatmonitor-v1
namespace: lb6
resourceVersion: "147596357"
selfLink: /apis/mtsre.meitu.com/v1/namespaces/lb6/erus/opsnatmonitor-v1
uid: 1e4a28d9-44f4-11eb-89e4-fa163ebf9460
spec:
deploymentname: opsnatmonitor-v1
enabled: false
schedule: 0 0 * * *
status:
conditions:
- lastedTranslationTime: null
message: create eru cr success,wait to start
ready: true
reason: create eru cr success,wait to start
phase: Stoping

最后我们可以测试下删除deployment,会不会一起删除我们的cr资源

1
➜ kubectl delete deployment *** -n ***

png8

controller完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package eru

import (
"context"
"encoding/json"
"eru-operator/resource/eru_res"
mtsrev1 "eru-operator/pkg/apis/mtsre/v1"
erulog "eru-operator/tools/log"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"


"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

)

var log = logf.Log.WithName("controller_eru")

func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr))
}

func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileEru{client: mgr.GetClient(), scheme: mgr.GetScheme()}
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New("eru-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

err = c.Watch(&source.Kind{Type: &mtsrev1.Eru{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

return nil
}

var _ reconcile.Reconciler = &ReconcileEru{}

type ReconcileEru struct {
client client.Client
scheme *runtime.Scheme
}

func (r *ReconcileEru) Reconcile(request reconcile.Request) (reconcile.Result, error) {
erulog.Infof("Reconciling Eru,%v",request.String())
// deployment 和 eru 实例
deployment_instance := &appsv1.Deployment{}
eru_instance := &mtsrev1.Eru{}

// deploymentExist 和 eruExist 实例
deploymentExist := r.client.Get(context.TODO(), request.NamespacedName, deployment_instance)
eruExist := r.client.Get(context.TODO(),request.NamespacedName,eru_instance)

// *****判断deployment是否存在*****
//如果不存在,那么就要一同删除绑定的eru cr资源
if deploymentExist != nil && errors.IsNotFound(deploymentExist) {
if eruExist == nil {
erulog.Warningf("this deployment : %v not found,maybe deleted,so eru cr need to deleted",request.String())
if deletestatus ,err := r.DeleteEruCr(eru_instance); deletestatus {
return reconcile.Result{},nil
} else {
return reconcile.Result{}, err
}
}
}

//如果存在,请继续
// 我们继续检查deployment是否有设置符合条件的annotations,我们约定是否含有`mtsre.meitu.com/eru: "true"`,就添加
deploymentAnnotation := deployment_instance.Annotations
if _,ok := deploymentAnnotation["mtsre.meitu.com/eru"]; ok {
// 判断eru资源是否存在,如果不存在,那么就创建
if deploymentAnnotation["mtsre.meitu.com/eru"] == "true" && eruExist != nil {
erulog.Infof("check annotation success,now to create cr resource with deployment : %v",request.String())
if _, createcrerror := r.CreateEruCr(deployment_instance); createcrerror != nil {
erulog.Errorf("create cr resource with deployment : %v , failure : %v",request.String(),createcrerror)
return reconcile.Result{}, createcrerror
}
erulog.Infof("create cr resource with deployment : %v success",request.String())
return reconcile.Result{},nil
} else if deploymentAnnotation["mtsre.meitu.com/eru"] == "true" && eruExist == nil {
erulog.Infof("Start checking eru for updates...")
} else {
erulog.Infof("annotations not exist or config error")
return reconcile.Result{},nil
}
} else {
// 该deployment没有设置对应的annotations,所以我们不做处理,进行下一次循环
return reconcile.Result{}, nil
}

// 这边就判断我们的cr资源是否需要更新了
return reconcile.Result{}, nil
}

// 创建eru cr资源
func (r *ReconcileEru) CreateEruCr(deployment *appsv1.Deployment) (bool,error) {
// 如果创建失败,那么就更新status,说失败了
eru := eru_res.NewEru(deployment)
if err := r.client.Create(context.TODO(),eru); err != nil {
erulog.Errorf("create eru cr failure,reason: %v",err)
failureCondition := mtsrev1.EruCondition{
Ready: false,
Reason: "create eru cr failure",
Message: "create eru cr failure",
LastedTranslationTime: v1.Now(),
}
if updateerror := r.updateStatus(eru,failureCondition,mtsrev1.EruFailure); updateerror != nil {
return false,updateerror
}
return false,err
}
// 如果创建cr成功,我们需要更新annotation、更新status,默认cr的定时任务是停止的,我们需要用户手动去触发是否启动,并修改对应的定时任务表达式
// 这里是更新annotation
deploymentData,enabledData,scheduleData := toString(eru)
eru.Annotations = map[string]string{
"mtsre.meitu.com/deploymentName": deploymentData,
"mtsre.meitu.com/enabled": enabledData,
"mtsre.meitu.com/schedule": scheduleData,
}
r.UpdateAnnotations(eru)
// 这里是更新status
stopCondition := mtsrev1.EruCondition{
Ready: true,
Reason: "create eru cr success,wait to start",
Message: "create eru cr success,wait to start",
LastedTranslationTime: v1.Time{},
}
if updateerror := r.updateStatus(eru,stopCondition,mtsrev1.EruStoping); updateerror != nil {
return false,updateerror
}
return true,nil
}

// 删除eru cr资源
func (r *ReconcileEru) DeleteEruCr(eru *mtsrev1.Eru) (bool, error) {
// 删除aom 策略
// 华为云目前观测到有定时去清理aom规则,如果deployment被删除的话。
retryError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.client.Delete(context.TODO(), eru)
})
// 判断删除cr资源是否成功
if retryError != nil {
erulog.Errorf("eru delete failure, %v/%v", eru.Namespace, eru.Name)
return false, retryError
} else {
erulog.Infof("eru delete success, %v/%v", eru.Namespace, eru.Name)
return true, nil
}
}


//更新cr的status函数
func (r *ReconcileEru) updateStatus(eru *mtsrev1.Eru, condition mtsrev1.EruCondition, phase mtsrev1.EruPhase) error {
eru.Status.Phase = phase
eru.Status.Conditions = append(eru.Status.Conditions, condition)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.client.Status().Update(context.TODO(), eru)
})

}
// 更新cr的annotation函数,更新一般不会失败,所以不做判断处理
func (r *ReconcileEru) UpdateAnnotations(eru *mtsrev1.Eru) {
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.client.Update(context.TODO(), eru)
})
}

// 把spec转化成string
func toString(eru *mtsrev1.Eru) (string,string,string) {
deploymentData,_ := json.Marshal(eru.Spec.DeploymentName)
enabledData,_ := json.Marshal(eru.Spec.Enabled)
scheduleData,_ := json.Marshal(eru.Spec.Schedule)
return string(deploymentData),string(enabledData),string(scheduleData)
}
// 把string转化成spec

计划

下一篇我们主要实现更新cr资源的逻辑,如何让这个定时任务run起来以及修改定时任务表达式之后,怎么生效。