実践入門 Kubernetes カスタムコントローラーへの道
Chapter 1: CRDs and Controllers
Kubernetes has two kinds of nodes
Master node
Authentication and authorization
Resource management
Container scheduling
Worker node
Running containers
The master node runs a number of components
etcd
kube-apiserver
kube-scheduler
kube-controller-manager
cloud-controller-manager
kube-dns
The worker node runs
kubelet
kube-proxy
Container runtime
kube-apiserver
Creates, updates, and deletes objects such as Deployments and Services when it receives requests from kubectl
When a request is accepted, the information is persisted to etcd
When the scheduler or the controller-manager manipulates resources, they also go through the api-server
In short, the api-server is always the only component that can touch etcd, the data store
Control Loop(Reconciliation Loop)
The controller-manager manages the objects of standard Kubernetes resources such as Deployments and Services
Inside the controller-manager, multiple controllers such as the Deployment Controller and the Service Controller are bundled into a single binary
Each controller runs a process called the control loop
A control loop is the logic that compares the desired state with the actual state and moves the actual state closer to the desired one
It is also called a reconciliation loop
Processing flow
Read the current state of the resource
Change the current state to match the desired state
Update the resource's status
As an example, consider the case where the ReplicaSet Controller creates Pods
Processing flow
Apply a ReplicaSet
The ReplicaSet Controller detects that a ReplicaSet was created and, in its main logic, creates Pods with an empty spec.nodeName
The scheduler detects that spec.nodeName is empty and adds the Pod to the scheduler queue
The kubelet notices the new Pod but skips starting containers because spec.nodeName is empty
The scheduler takes the Pod off the scheduler queue and assigns it to a node that is schedulable and has sufficient system resources. On assignment it updates spec.nodeName via the api-server
The kubelet detects the Pod update event and checks whether spec.nodeName matches its own node. If it matches, it starts the containers and updates the status via the api-server
Suppose the Pod then terminates (for whatever reason)
The kubelet updates the Pod's status.condition to terminated via the api-server
The ReplicaSet Controller detects the Pod's termination, deletes the Pod via the api-server, and creates a new one
And so on, in a loop
Although the controllers and the other master components are said to cooperate, in practice none of them sends commands to the others or coordinates explicitly, except through the api-server
Each controller concentrates solely on turning its own control loop
Implementing this control loop is also the main job when writing a custom controller; a sketch follows below
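As an illustration only (not code from the book), the shape of a control loop can be sketched in a few lines of Go; the state struct is a made-up stand-in for what a real controller reads and writes through the api-server:
code:control-loop-sketch.go
package main

import (
	"fmt"
	"time"
)

// state stands in for cluster state: a desired replica count and the actual one.
type state struct{ desired, actual int }

// reconcile compares desired and actual state and takes one corrective step.
func reconcile(s *state) {
	switch {
	case s.actual < s.desired:
		s.actual++ // "create a pod"
	case s.actual > s.desired:
		s.actual-- // "delete a pod"
	}
	fmt.Printf("desired=%d actual=%d\n", s.desired, s.actual)
}

// The control loop itself: observe, compare, act, repeat.
func main() {
	s := &state{desired: 3, actual: 0}
	for i := 0; i < 5; i++ {
		reconcile(s)
		time.Sleep(time.Second)
	}
}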
Kubernetes extension mechanisms
With CRDs you can add your own resources
1. Admission Webhook
A way to add webhooks that mutate or validate API requests
2. CRD
An API extension mechanism for defining your own resources
3. API Aggregation
A way to extend the API by implementing additional APIs and registering them with the aggregation layer
Admission Webhook
A plugin-style webhook mechanism
Mutating admission webhook
Mutates API requests
Validating admission webhook
Validates API requests
For example, even if you omit pod.spec.imagePullPolicy when creating a Pod object, the created object's spec has pod.spec.imagePullPolicy: Always set by default
That is the effect of an admission controller called AlwaysPullImages
Likewise, if you want your own logic that sets defaults on or rewrites API requests, a mutating webhook is the way to do it
Validating webhooks validate API requests
They check fields such as labels and spec, and return an error when a check fails
Just like the built-in admission plugins, when you want to customize mutation or validation of API requests yourself, you implement the plugin API and register it with the kube-apiserver as an admission webhook; a sketch follows below
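As a rough sketch of what such a webhook server looks like (not from the book; the rule that rejects a label named forbidden is made up, the cert paths are placeholders, and the server would additionally need to be registered via a ValidatingWebhookConfiguration):
code:validating-webhook-sketch.go
package main

import (
	"encoding/json"
	"net/http"

	admissionv1 "k8s.io/api/admission/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// validate denies any object carrying a "forbidden" label (a made-up rule).
func validate(w http.ResponseWriter, r *http.Request) {
	var review admissionv1.AdmissionReview
	if err := json.NewDecoder(r.Body).Decode(&review); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Decode only the metadata of the incoming object.
	var obj struct {
		Metadata metav1.ObjectMeta `json:"metadata"`
	}
	_ = json.Unmarshal(review.Request.Object.Raw, &obj)

	resp := &admissionv1.AdmissionResponse{UID: review.Request.UID, Allowed: true}
	if _, ok := obj.Metadata.Labels["forbidden"]; ok {
		resp.Allowed = false
		resp.Result = &metav1.Status{Message: "label 'forbidden' is not allowed"}
	}
	review.Response = resp
	json.NewEncoder(w).Encode(review)
}

func main() {
	http.HandleFunc("/validate", validate)
	// kube-apiserver only talks to webhooks over TLS; cert paths are placeholders.
	http.ListenAndServeTLS(":8443", "tls.crt", "tls.key", nil)
}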
API Aggregation
A way to extend Kubernetes by implementing a custom api-server alongside kube-apiserver
Used to implement APIs such as Service Catalog, which handles resources outside the cluster (managed services offered by cloud providers), and Metrics Server
With a CRD, the main logic (the reconciliation loop) runs asynchronously after the API server has finished processing; API Aggregation hooks into the middle of the API server's processing, so it allows more flexible behavior than CRDs, which come with several restrictions
But it is hard, so extension via CRDs is apparently the mainstream choice
CRDs and CRs
A CRD defines a Custom Resource
GA since Kubernetes v1.16
CRD
code:sample-crd.yaml
apiVersion: apiextensions.k8s.io/v1beta1
# required
kind: CustomResourceDefinition
metadata:
name: samples.stable.example.com
spec:
group: stable.example.com
versions:
- name: v1alpha
served: true
storage: true
scope: Namespaced
names:
kind: Sample
plural: samples
singular: sample
kubectl explain
A command for inspecting the fields of an API resource
kubectl explain <resourcename> or kubectl explain <resourcename>.spec shows each field's description and whether it is required
code:bash
$ k explain deployment.spec
KIND: Deployment
VERSION: apps/v1
RESOURCE: spec <Object>
DESCRIPTION:
Specification of the desired behavior of the Deployment.
DeploymentSpec is the specification of the desired behavior of the
Deployment.
FIELDS:
minReadySeconds <integer>
Minimum number of seconds for which a newly created pod should be ready
without any of its container crashing, for it to be considered available.
Defaults to 0 (pod will be considered available as soon as it is ready)
paused <boolean>
Indicates that the deployment is paused.
progressDeadlineSeconds <integer>
The maximum time in seconds for a deployment to make progress before it is
considered to be failed. The deployment controller will continue to process
failed deployments and a condition with a ProgressDeadlineExceeded reason
will be surfaced in the deployment status. Note that progress will not be
estimated during the time a deployment is paused. Defaults to 600s.
replicas <integer>
Number of desired pods. This is a pointer to distinguish between explicit
zero and not specified. Defaults to 1.
revisionHistoryLimit <integer>
The number of old ReplicaSets to retain to allow rollback. This is a
pointer to distinguish between explicit zero and not specified. Defaults to
10.
selector <Object> -required-
Label selector for pods. Existing ReplicaSets whose pods are selected by
this will be the ones affected by this deployment. It must match the pod
template's labels.
strategy <Object>
The deployment strategy to use to replace existing pods with new ones.
template <Object> -required-
Template describes the pods that will be created.
The fields of the CRD spec
spec.group
Specifies the API group the created CRs belong to
spec.versions.served
Flag for whether this version is served via the REST API
spec.versions.storage
Flag for whether to register this version as the storage version
The storage version is the one persisted to etcd
When multiple versions exist (v1alpha, v1beta, v1, ...), exactly one of them must be selected
spec.scope
Specifies whether the created resource is cluster-scoped or applies only within a namespace
spec.names.kind
Specifies the resource's kind
In CamelCase
spec.names.plural
Specifies the plural form of the resource name
spec.names.singular
The singular name
Basically the same as kind
All lowercase
spec.names.shortNames
Abbreviated names (example fragment below)
The especially important ones are spec.versions and spec.scope
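For instance, adding shortNames looks like this (a hedged fragment of the sample CRD above; the short name sp matches the kubectl get sp output that appears later in these notes):
code:crd-shortnames.yaml
  names:
    kind: Sample
    plural: samples
    singular: sample
    shortNames:
    - sp
With this in place, kubectl get sp behaves the same as kubectl get samples.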
CR
Once defined via a CRD, you can create one like any normal resource: write the YAML and apply it
code:example-cr.yaml
apiVersion: "stable.example.com/v1alpha"
kind: Sample
metadata:
name: my-cr-sample-object
spec:
image: my-cr-image
message: "Hello Custom Resource!"
Once created, you can fetch it like this
code:bash
$ k get sample -o yaml (docker-desktop/default)
apiVersion: v1
items:
- apiVersion: stable.example.com/v1alpha
kind: Sample
metadata:
annotations:
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"stable.example.com/v1alpha","kind":"Sample","metadata":{"annotations":{},"name":"my-cr-sample-object","namespace":"default"},"spec":{"image":"my-cr-image","message":"Hello Custom Resource!"}}
creationTimestamp: "2021-06-24T16:35:21Z"
generation: 1
name: my-cr-sample-object
namespace: default
resourceVersion: "1490495"
selfLink: /apis/stable.example.com/v1alpha/namespaces/default/samples/my-cr-sample-object
uid: ea5711ca-52e7-4416-a424-79dc62e549bc
spec:
image: my-cr-image
message: Hello Custom Resource!
kind: List
metadata:
resourceVersion: ""
selfLink: ""
However, nothing happens with just this
To actually do something with it, you need a custom controller that runs a control loop (though, as sketched below, you can already read the CR programmatically)
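A minimal reading example (my own, not from the book) using client-go's dynamic client; the group/version/resource match the sample CRD above, and kubeconfig handling is reduced to the default path:
code:list-samples.go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load kubeconfig from the default location ($HOME/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	// The dynamic client can handle arbitrary resources, including CRs.
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	gvr := schema.GroupVersionResource{
		Group:    "stable.example.com",
		Version:  "v1alpha",
		Resource: "samples",
	}
	list, err := client.Resource(gvr).Namespace("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, item := range list.Items {
		fmt.Println(item.GetName())
	}
}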
Advanced CRD features
validation
A feature that validates the fields of a CR's spec
Enabled by writing it into the CRD
Still, writing this in YAML looks painful osamtimizer.icon
It can't be used unless apiVersion is apiextensions.k8s.io/v1
code:sample-crd.validation-ga.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: samples.stable.example.com
spec:
group: stable.example.com
scope: Namespaced
names:
kind: Sample
plural: samples
singular: sample
versions:
- name: v1alpha
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
image:
type: string
message:
type: string
# depending on the version, this apparently has to be specified as an array rather than a map
code:bash
$ k apply -f cr-validation.yaml
The Sample "my-cr-sample-object" is invalid:
* spec.image: Invalid value: "integer": spec.image in body must be of type string: "integer"
* message: Required value
You should always set up validation
In fact, it has apparently become mandatory
That said, Kubebuilder generates all of this for you, so you don't need to sweat the exact syntax
Additional Printer Columns
A feature for displaying arbitrary fields when fetching objects with kubectl
code:bash
❯ k get sp (docker-desktop/default)
NAME AGE
my-cr-sample-object 59m
Use it when you want to show something other than NAME and AGE
code:crd-printcolumn-ga.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: samples.stable.example.com
spec:
group: stable.example.com
scope: Namespaced
names:
kind: Sample
plural: samples
singular: sample
versions:
- name: v1alpha
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
image:
type: string
message:
type: string
additionalPrinterColumns:
- name: message
type: string
description: message content which want to show
jsonPath: .spec.message
- name: AGE
type: date
jsonPath: .metadata.creationTimestamp
code:bash
❯ k get sp (docker-desktop/default)
NAME MESSAGE AGE
my-cr-sample-object Hello Custom Resource! 61m
SubResource
A way to add special endpoints to a resource
A Pod is normally accessed at the HTTP path /api/v1/namespaces/<namespace>/pods/<name>
Adding subresources such as /logs, /portforward, and /status on top of that enables the following endpoints
/api/v1/namespaces/<namespace>/pods/<name>/logs
/api/v1/namespaces/<namespace>/pods/<name>/portforward
/api/v1/namespaces/<namespace>/pods/<name>/status
You can add subresources to CRs as well
Only two can be added to a CR: status and scale
If you want anything else, you have to implement an extension API via API Aggregation
Status SubResource
A subresource for using /status
Providing a /status endpoint on an API resource lets you update only the status field without touching the other fields
This enables a separation of responsibilities: users define and update the spec, while the controller updates the status
To enable it, add the subresources.status field to the CRD (a small UpdateStatus sketch follows after the YAML below)
code:crd-status-ga.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: samples.stable.example.com
spec:
group: stable.example.com
scope: Namespaced
names:
kind: Sample
plural: samples
singular: sample
versions:
- name: v1alpha
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
image:
type: string
message:
type: string
subresources:
status: {}
scale:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
labelSelectorPath: .status.labelSelector
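Once the status subresource is enabled, the typed clientset generated for the CR gains an UpdateStatus method that writes only the status block; a hedged sketch reusing the sample-controller packages that appear in chapter 3 of these notes:
code:update-status-sketch.go
package controller

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
	clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
)

// updateStatusOnly writes just the status block of a Foo via the /status
// endpoint; spec changes sent to this endpoint are ignored by the server.
func updateStatusOnly(cs clientset.Interface, foo *samplev1alpha1.Foo, available int32) error {
	fooCopy := foo.DeepCopy() // never mutate cached objects directly
	fooCopy.Status.AvailableReplicas = available
	_, err := cs.SamplecontrollerV1alpha1().
		Foos(foo.Namespace).
		UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{})
	return err
}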
Scale SubResources
Uses /scale
With this, you can scale the resource's replicas up and down, just like a Deployment
The example itself is in the YAML above
Enabling it makes spec.replicas required on the CR
code:cr.yaml
apiVersion: "stable.example.com/v1alpha"
kind: Sample
metadata:
name: my-cr-sample-object
spec:
image: my-cr-image
message: "hoge"
hoge: "fuga"
replicas: 10
code:bash
k describe sample|grep Replicas (docker-desktop/default)
Replicas: 10
You can also change it directly (not declarative, though)
code:bash
k scale sample/my-cr-sample-object --replicas 5 (docker-desktop/default)
sample.stable.example.com/my-cr-sample-object scaled
Structural Schema
The structured CRD schema format
A format that follows the rules of OpenAPI v3.0 schemas
From v1.16 on, CRDs must be written in this format
Pruning
A feature that discards spec fields set in a CR manifest that are not defined in the schema
Being a structural schema is a prerequisite (see the fragment below for opting out)
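If you deliberately want to keep unknown fields, the schema can opt out of pruning per subtree with x-kubernetes-preserve-unknown-fields (a hedged fragment, not from the book):
code:pruning-optout.yaml
          spec:
            type: object
            # fields not listed under properties survive pruning in this subtree
            x-kubernetes-preserve-unknown-fields: true
            properties:
              image:
                type: string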
Defaulting
Assigns default values to the spec fields defined in the CR manifest
code:yaml
schema:
  openAPIV3Schema:
    type: object
    properties:
      spec:
        type: object
        properties:
          image:
            type: string
            default: "nginx:latest"
Conversion
Converts between different versions of a CRD
When a single CRD has two or more versions (v1beta, v1, ...), this handles the conversion between them; see the fragment below
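Conversion is configured on the CRD itself; a hedged fragment showing the Webhook strategy (the service name and path are placeholders, not from the book):
code:crd-conversion.yaml
spec:
  conversion:
    strategy: Webhook
    webhook:
      conversionReviewVersions: ["v1"]
      clientConfig:
        service:
          namespace: default
          name: sample-conversion-webhook
          path: /convert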
What you need to build your own controller and CRD
1. Kubernetes Way(client-go + code-generator)
2. Kubebuilder
3. Operator SDK
Kubernetes way
Implement the controller with client-go and a library called code-generator
This is the traditional approach, used to implement Deployment, Service, and the rest
You work with low-level APIs, so it is hard but gives you the most freedom
For ordinary development, go with 2 or 3
Kubebuilder
Uses the operator-development framework developed by a SIG (Special Interest Group)
Lets you concentrate on the controller logic alone
Provides not just the controller but the whole set of pieces needed to run as an operator
Internally it uses controller-runtime and controller-tools
Operator SDK
Again uses controller-runtime and controller-tools internally
Not much explanation of this one lol
So what's the difference between a Controller and an Operator?
There is no strict definition
Controller
A controller that manages CRs
A component that runs a control loop
Operator
A set of CRDs and custom controllers; software that automates the management of a specific piece of software, like etcd operator
Chapter 2: client-go and the surrounding concepts worth knowing
client-go and apimachinery
client-go is the Go client library for Kubernetes
Mainly used for access to the api-server
It is also used in the development of Kubernetes itself
apimachinery
Provides features for Kubernetes API objects such as scheme, typing, encoding, decoding, and conversion; a small decoding sketch follows below
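A small sketch of what that means in practice (my own example, not from the book): the client-go scheme package maps apiVersion/kind to Go types, so a YAML manifest can be decoded into a typed object:
code:decode-with-scheme.go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/kubernetes/scheme"
)

const manifest = `
apiVersion: apps/v1
kind: Deployment
metadata:
  name: demo
spec:
  replicas: 2
  selector:
    matchLabels: {app: demo}
  template:
    metadata:
      labels: {app: demo}
    spec:
      containers:
      - name: demo
        image: nginx
`

func main() {
	// The deserializer uses the scheme to map apiVersion/kind to a Go type.
	obj, gvk, err := scheme.Codecs.UniversalDeserializer().Decode([]byte(manifest), nil, nil)
	if err != nil {
		panic(err)
	}
	deploy := obj.(*appsv1.Deployment)
	fmt.Println(gvk.Kind, deploy.Name, *deploy.Spec.Replicas)
}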
Let's fetch the list of Pods with client-go
code:go
package main
import (
"context"
"flag"
"fmt"
"path/filepath"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var defaultKubeConfigPath string
if home := homedir.HomeDir(); home != "" {
// build kubeconfig path from $HOME dir
defaultKubeConfigPath = filepath.Join(home, ".kube", "config")
}
// set kubeconfig flag
// if flag is not set, use $HOME/.kube/config as default file path
kubeconfig := flag.String("kubeconfig", defaultKubeConfigPath, "kubeconfig config file")
flag.Parse()
// this method generates config for k8s from kubeconfig file.
config, _ := clientcmd.BuildConfigFromFlags("", *kubeconfig)
// get clientset for kubernetes resources.
// clientset is the set of clients for k8s.
// however, this set doesn't include a client for custom resource.
clientset, _ := kubernetes.NewForConfig(config)
// Get list of pod objects
pods, _ := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
// show pod object to stdout
for i, pod := range pods.Items {
	// format inferred from the captured output below
	fmt.Printf("Pod Name %d %s\n", i, pod.Name)
}
}
code:bash
❯ go run main.go (docker-desktop/default)
Pod Name 2 ingress-nginx-controller-57cb5bf694-rxtn8
Surrounding concepts worth knowing
There are a few things worth knowing when building a custom controller with libraries such as client-go and apimachinery (a sketch of how they fit together follows after this list)
Informer
Watches objects on the cluster for changes and stores the data in an in-memory cache
Lister
Reads data out of the in-memory cache
Workqueue
A queue holding the items the controller should process
runtime.Object
The interface common to all API objects
Scheme
The bridge between the Kubernetes API and Go types
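As a rough sketch of how these pieces fit together (not the book's code; errors are mostly ignored for brevity): an informer fills the cache and feeds keys into a workqueue, and a lister reads from the cache instead of the api-server:
code:informer-lister-workqueue.go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	client, _ := kubernetes.NewForConfig(config)

	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	factory := kubeinformers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods()

	// Informer: watch Pods and push namespace/name keys onto the workqueue.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
	})

	stopCh := make(chan struct{})
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced)

	// Lister: read from the in-memory cache, not from the api-server.
	pods, _ := podInformer.Lister().List(labels.Everything())
	fmt.Println("cached pods:", len(pods))

	// Worker: drain one item from the queue and mark it done.
	key, _ := queue.Get()
	fmt.Println("processing:", key)
	queue.Done(key)
	close(stopCh)
}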
Chapter 3: A walkthrough of sample-controller
sample-controller is a controller implemented the Kubernetes way
The implementation style differs greatly from Kubebuilder or Operator SDK
What does this controller do?
It manages Foo, a resource that sits above Deployment
The Foo controller manages a Deployment running the nginx image with an arbitrary number of replicas
Let's read the code
Starting with the CRDs
code:crd.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: foos.samplecontroller.k8s.io
spec:
group: samplecontroller.k8s.io
version: v1alpha1
names:
kind: Foo
plural: foos
scope: Namespaced
code:crd-validation.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: foos.samplecontroller.k8s.io
spec:
group: samplecontroller.k8s.io
version: v1alpha1
names:
kind: Foo
plural: foos
scope: Namespaced
validation:
openAPIV3Schema:
properties:
spec:
properties:
replicas:
type: integer
minimum: 1
maximum: 10
code:crd-subresource.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: foos.samplecontroller.k8s.io
spec:
group: samplecontroller.k8s.io
version: v1alpha1
names:
kind: Foo
plural: foos
scope: Namespaced
subresources:
status: {}
types.go
After the CRD, let's look at types.go, which defines the concrete fields of the Foo resource
code:types.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Define the Foo struct
// It wraps FooSpec and FooStatus
// Foo is a specification for a Foo resource
type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec   FooSpec   `json:"spec"`
	Status FooStatus `json:"status"`
}
// Define the FooSpec struct
// It has DeploymentName and Replicas fields,
// so users can freely choose the name and replica count of the managed Deployment
// FooSpec is the spec for a Foo resource
type FooSpec struct {
	DeploymentName string `json:"deploymentName"`
	Replicas       *int32 `json:"replicas"`
}
// Define the FooStatus struct
// FooStatus is the status for a Foo resource
type FooStatus struct {
	AvailableReplicas int32 `json:"availableReplicas"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// FooList is a list of Foo resources
type FooList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`
	Items []Foo `json:"items"`
}
main func
Its main job is to build the controller and start it
code:main.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"time"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
informers "k8s.io/sample-controller/pkg/generated/informers/externalversions"
"k8s.io/sample-controller/pkg/signals"
)
var (
masterURL string
kubeconfig string
)
func main() {
klog.InitFlags(nil)
flag.Parse()
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
// Build a clientset from the kubeconfig (cfg)
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
// This builds the clientset that was auto-generated for the Foo resource
exampleClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building example clientset: %s", err.Error())
}
// An informer keeps object data in an in-memory cache
// 30 seconds is passed here, so the in-memory cache is resynced every 30 seconds
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
// NewController builds the controller
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
// The Start method starts the Deployment informer and the Foo informer
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
// controller.Run starts the controller
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
Defining the Controller
code:controller.go
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
samplescheme "k8s.io/sample-controller/pkg/generated/clientset/versioned/scheme"
informers "k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1"
listers "k8s.io/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1"
)
const controllerAgentName = "sample-controller"
const (
// SuccessSynced is used as part of the Event 'reason' when a Foo is synced
SuccessSynced = "Synced"
// ErrResourceExists is used as part of the Event 'reason' when a Foo fails
// to sync due to a Deployment of the same name already existing.
ErrResourceExists = "ErrResourceExists"
// MessageResourceExists is the message used for Events when a resource
// fails to sync due to a Deployment already existing
MessageResourceExists = "Resource %q already exists and is not managed by Foo"
// MessageResourceSynced is the message used for an Event fired when a Foo
// is synced successfully
MessageResourceSynced = "Foo synced successfully"
)
// Definition of the Controller struct
// Controller is the controller implementation for Foo resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// sampleclientset is a clientset for our own API group
sampleclientset clientset.Interface
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}
// Function that builds the Foo controller
// NewController returns a new sample controller
func NewController(
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
// First of all, register the types with the Scheme via utilruntime.Must
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
// Create an EventRecorder as the recorder
// As the name suggests, it creates and records Events
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
}
klog.Info("Setting up event handlers")
// Set up an event handler for when Foo resources change
// Define the functions to be called when events fire
// Both AddFunc and UpdateFunc are set up to call the enqueueFoo function
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
// Set up an event handler for when Deployment resources change. This
// handler will lookup the owner of the given Deployment, and if it is
// owned by a Foo resource will enqueue that Foo resource for
// processing. This way, we don't need to implement custom logic for
// handling Deployment resources. More info on this pattern:
// Set up the Deployment informer's event handlers
// Here AddFunc, UpdateFunc, and DeleteFunc are all registered to call the handleObject function
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
// Start the informer factories to begin populating the informer caches
klog.Info("Starting Foo controller")
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
klog.Info("Starting workers")
// Launch two workers to process Foo resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
// syncHandler carries the controller's main logic
// This logic is called Reconcile; it is invoked from processNextWorkItem, the function that drains the workqueue
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get the Foo resource with this namespace/name
foo, err := c.foosLister.Foos(namespace).Get(name)
if err != nil {
// The Foo resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
return nil
}
return err
}
deploymentName := foo.Spec.DeploymentName
if deploymentName == "" {
// We choose to absorb the error here as the worker would requeue the
// resource otherwise. Instead, the next time the resource is updated
// the resource will be queued again.
utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
return nil
}
// Get the deployment with the name specified in Foo.spec
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
}
// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and return error msg.
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
}
// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
fooCopy := foo.DeepCopy()
fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
// If the CustomResourceSubresources feature gate is not enabled,
// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
// UpdateStatus will not allow changes to the Spec of the resource,
// which is ideal for ensuring nothing other than resource status has been updated.
_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(context.TODO(), fooCopy, metav1.UpdateOptions{})
return err
}
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
// enqueueFoo is implemented on the Controller
func (c *Controller) enqueueFoo(obj interface{}) {
var key string
var err error
// cache.MetaNamespaceKeyFunc derives a namespace/name key for the object, and that key is added to the workqueue
// Items added this way are processed by the main logic (Reconcile)
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(key)
}
// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}
klog.V(4).Infof("Processing object: %s", object.GetName())
// GetControllerOf reads the Deployment object's owner reference to check whether the parent resource is a Foo,
// and if so enqueues that Foo on the workqueue
// If the object is not managed by a Foo, the function just returns without doing anything
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
// If this object is not owned by a Foo, we should not do anything more
// with it.
if ownerRef.Kind != "Foo" {
return
}
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
return
}
c.enqueueFoo(foo)
return
}
}
// newDeployment creates a new Deployment for a Foo resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the Foo resource that 'owns' it.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: foo.Spec.DeploymentName,
Namespace: foo.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: foo.Spec.Replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:latest",
},
},
},
},
},
}
}
Main logic (Reconcile)
This logic (syncHandler) is called from the processNextWorkItem function
processNextWorkItem runs repeatedly in a loop
code:controller.go
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
The flow of the main logic is as follows
runWorker
processNextWorkItem
syncHandler
and back to runWorker
The processNextWorkItem function
Takes items off the workqueue and processes them
It handles the items added to the workqueue by the handleObject and enqueueFoo functions called from the event handlers
These are the handlers registered earlier in controller.go
https://gyazo.com/43fe8615d7c7603fc4e7051158faac7e
code:controller.go
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
// Take an item from the queue
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
// defer c.workqueue.Done(obj) marks the workqueue item as Done when processNextWorkItem returns
// Once an item is done, it (the namespace/name) is removed from the workqueue
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
// c.syncHandler performs the Reconcile processing
// A key in namespace/name form (e.g. default/example-foo) is passed as the argument
// Incidentally, the Reconcile function (syncHandler) can be given any name you like
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
// If an error occurs, the item is requeued after a rate-limited back-off
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
// If the item was processed successfully, stop tracking it
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
// Once processing completes, the deferred Done removes the item from the workqueue entirely
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
The syncHandler function
This corresponds to the Reconcile processing
code:controller.go
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
// Split the key passed in from processNextWorkItem into namespace and name
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get the Foo resource with this namespace/name
// Fetch the foo object with the key's namespace and name from the in-memory cache
foo, err := c.foosLister.Foos(namespace).Get(name)
if err != nil {
// The Foo resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
return nil
}
return err
}
// Read the DeploymentName from the foo object's spec
// The spec is the same thing you write in the YAML manifest
deploymentName := foo.Spec.DeploymentName
if deploymentName == "" {
// We choose to absorb the error here as the worker would requeue the
// resource otherwise. Instead, the next time the resource is updated
// the resource will be queued again.
utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
return nil
}
// Get the deployment with the name specified in Foo.spec
// Try to fetch the Deployment named foo.Spec.DeploymentName; if it doesn't exist, create it
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
}
// If an error occurs during Get/Create, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// If the Deployment is not controlled by this Foo resource, we should log
// a warning to the event recorder and return error msg.
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// If this number of the replicas on the Foo resource is specified, and the
// number does not equal the current desired replicas on the Deployment, we
// should update the Deployment resource.
// Check whether the current state matches the desired state
// If not, bring it in line with Update
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
}
// If an error occurs during Update, we'll requeue the item so we can
// attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil {
return err
}
// Finally, we update the status block of the Foo resource to reflect the
// current state of the world
// Finally, update the foo object's status via updateFooStatus
err = c.updateFooStatus(foo, deployment)
if err != nil {
return err
}
// Record a SuccessSynced event
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use DeepCopy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
// Make a DeepCopy before modifying
fooCopy := foo.DeepCopy()
fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
// If the CustomResourceSubresources feature gate is not enabled,
// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
// UpdateStatus will not allow changes to the Spec of the resource,
// which is ideal for ensuring nothing other than resource status has been updated.
_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(context.TODO(), fooCopy, metav1.UpdateOptions{})
return err
}
Chapter 5: Implementing the sample controller with Kubebuilder
Build the Foo controller covered in chapter 3
Development flow
1. Initialize the project with kubebuilder
2. Scaffold the API object and the controller
3. Define the API object in types.go
4. Implement Reconcile in controller.go
5. Edit main.go
6. Run the operator
Initialization
kubebuilder init --domain k8s.io
go mod tidy
make
kubebuilder create api
This is supposed to generate the files for the operator, but...
code:bash
❯ kubebuilder create api
y
y
Error: failed to create API: unable to inject the resource to "base.go.kubebuilder.io/v3": version cannot be empty
Usage:
  kubebuilder create api [flags]
It seemed that --version and --kind were required
code:zsh
➜ kubebuilder create api --group samplecontroller --version v1alpha1 --kind Foo
y
y
Writing kustomize manifests for you to edit...
Writing scaffold for you to edit...
api/v1alpha1/foo_types.go
controllers/foo_controller.go
Update dependencies:
$ go mod tidy
Running make:
$ make generate
go: creating new go.mod: module tmp
This adds a few directories and files
api/
controllers/
config/crd
config/sample
Let's look at the generated foo_types.go and foo_controller.go
code:api/v1/alpha1/foo_types.go
/*
Copyright 2021.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// As the comment below says, this file is just scaffolding and has to be edited
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// FooSpec defines the desired state of Foo
type FooSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// Foo is an example field of Foo. Edit foo_types.go to remove/update
Foo string `json:"foo,omitempty"`
}
// FooStatus defines the observed state of Foo
type FooStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// Foo is the Schema for the foos API
type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec   FooSpec   `json:"spec,omitempty"`
	Status FooStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// FooList contains a list of Foo
type FooList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items []Foo `json:"items"`
}
func init() {
SchemeBuilder.Register(&Foo{}, &FooList{})
}
code:controllers/foo_controller.go
/*
Copyright 2021.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
samplecontrollerv1alpha1 "github.com/osamtimizer/kubebuilder-sample/api/v1alpha1"
)
// FooReconciler reconciles a Foo object
type FooReconciler struct {
client.Client
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=samplecontroller.example.com,resources=foos,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=samplecontroller.example.com,resources=foos/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=samplecontroller.example.com,resources=foos/finalizers,verbs=update
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the Foo object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
// The controller's logic has to be written inside this Reconcile function
// your logic here
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *FooReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&samplecontrollerv1alpha1.Foo{}).
Complete(r)
}
Neither of them does anything yet, so some implementation is needed
Edit types.go to define the API object
Editing foo_types.go implements the Foo resource's spec and status
The spec has DeploymentName and Replicas
The status has AvailableReplicas
code:foo_types.go
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type FooSpec struct {
	// The comments below are "markers"
	// They declare validation for the field
	// Running make manifests makes controller-gen parse the markers
	// and generate the manifest set (CRD, RBAC, and so on)
	// The generated CRD then contains the validation specified by the markers
	// +kubebuilder:validation:Required
	// +kubebuilder:validation:Format:=string
	// the name of deployment which is owned by foo
	DeploymentName string `json:"deploymentName"`
	// +kubebuilder:validation:Required
	// +kubebuilder:validation:Minimum=0
	// the replicas of deployment which is owned by foo
	Replicas *int32 `json:"replicas"`
}
type FooStatus struct {
	// this equals deployment.status.availableReplicas
	// the marker below declares the field optional
	// +optional
	AvailableReplicas int32 `json:"availableReplicas"`
}
// The subresource marker below makes make manifests auto-generate
// a CRD manifest with the status subresource enabled
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// Foo is the Schema for the foos API
type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec   FooSpec   `json:"spec,omitempty"`
	Status FooStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
type FooList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items []Foo `json:"items"`
}
func init() {
SchemeBuilder.Register(&Foo{}, &FooList{})
}
Edit controller.go to implement Reconcile
First, write the packages to import
code:controllers/foo_controller.go
import (
	// The notes end mid-sentence here; this is a plausible import set for the
	// Reconcile sketch that follows (an assumption, not the book's listing).
	"context"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	samplecontrollerv1alpha1 "github.com/osamtimizer/kubebuilder-sample/api/v1alpha1"
)
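// NOTE: the notes break off at this point. What follows is a hedged sketch,
// not the book's code: it follows the same desired-vs-actual pattern as
// chapter 3's syncHandler, and newDeployment mirrors the chapter 3 helper.
func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the Foo; it may already be gone by the time the event is handled.
	var foo samplecontrollerv1alpha1.Foo
	if err := r.Get(ctx, req.NamespacedName, &foo); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Fetch the Deployment named in the spec; create it if it doesn't exist.
	var deploy appsv1.Deployment
	key := client.ObjectKey{Namespace: foo.Namespace, Name: foo.Spec.DeploymentName}
	if err := r.Get(ctx, key, &deploy); err != nil {
		if errors.IsNotFound(err) {
			logger.Info("creating deployment", "name", foo.Spec.DeploymentName)
			return ctrl.Result{}, r.Create(ctx, newDeployment(&foo))
		}
		return ctrl.Result{}, err
	}

	// Converge the replica count toward the desired state.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deploy.Spec.Replicas {
		deploy.Spec.Replicas = foo.Spec.Replicas
		if err := r.Update(ctx, &deploy); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Mirror the observed state into the Foo's status subresource.
	foo.Status.AvailableReplicas = deploy.Status.AvailableReplicas
	if err := r.Status().Update(ctx, &foo); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

// newDeployment builds an nginx Deployment owned by foo (cf. chapter 3).
func newDeployment(foo *samplecontrollerv1alpha1.Foo) *appsv1.Deployment {
	labels := map[string]string{"app": "nginx", "controller": foo.Name}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, samplecontrollerv1alpha1.GroupVersion.WithKind("Foo")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "nginx", Image: "nginx:latest"}},
				},
			},
		},
	}
}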