見出し画像

kube-rsを使ってRustでKubernetes Operator を作成する

要件

  • rustでKubernetes Operatorを作成する

  • kube-rsを使用する

タスク

  • rustでKubernetes Operatorを作成する

参考記事

▼kubernetesとrustがやりとりするためには、kube-rsを使用する

▼kube-rsの使い方 わかりやすい!

▼rustでCRDを作成する方法


▼kube-rsの動画でわかりやすい


▼kube-rsのcontrollerに関して書いてあります!


▼kube-rsのreconcileの実装方法

▼kube-rsのサンプルコードとてもシンプルでわかりやすい!!


▼Finalizerの仕組み
operatorの作成において、rustよりgoの方がライブラリーが多いので、実はgoのライブラリーの方がドキュメンtのがわかりやすかったりします 自分はkubebuilderのドキュメントからFinalizerの仕組みを理解しました!


学べること

  • rustでOperatorを作成する方法

  • kube-rsの使い方

ヒント

Kubernetes Operatorとは何なのか?
イメージは自分独自の「Custom Resource」と「Custom Controller」を作りよしなに自分の理想状態にもっていってくれる仕組みづくり。

そもそもCustom Resourceとは何なのかといえば、これは簡単にいえばパラメーターだ。プログラミング的に言えば引数で、目標の状態を記載するものと捉えれば分かりやすい。

そして、Custom ControllerとはCustom Resourceで掲げた目標の状態と現状を比較して、目標の状態に近づけてくれるものだ。

もっと単純なイメージで言えば、ダイエット中の人がいて、
その人が体重50キロ目指すぞ、食事は2回、毎日運動するぞ、
みたいな目標シートを書いている人がいるとする。

そしてそれをチェックをするパーソナルトレーナがいるとする。
このパーソナルトレーナーは、ダイエット中の人を監視して、目標に近づけようとする。これがまさにoperatorと同じ仕組みです

目標シートがCustom Resourceで
パーソナルトレーナーがCustom Controllerです

この仕組みをプログラムで行なっていると考えればわかりやすいはずです!

そして今回すごいシンプルなコードを書いてみました。
最小限のoperatorです。参考になれば幸いです。

main.rsのコードとtree

main.rsのコードとtree

(1)main.rsのファイル

mod crds;
mod create_cr;
mod create_crds;
mod sample_controller;

#[tokio::main]
async fn main() {
    // CRDの作成
    create_crds::exec().await;

    // コントローラの起動
    sample_controller::run().await;

    // カスタムリソースの作成
    create_cr::exec().await;

    println!("-----問題なし完了------")
}

(2)まずはCRDの定義 crd.rs

use kube::CustomResource;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;

// rustのstructがCRDになる
#[derive(CustomResource, Deserialize, Debug, Clone, Serialize, JsonSchema)]
#[kube(
    group = "test.dev",
    version = "v2",
    kind = "SampleCustomResource",
    namespaced
)]
#[kube(status = "SampleStatus")]
pub struct SampleSpec {
    pub name: String,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct SampleStatus {
    pub state: SampleState,
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum SampleState {
    WaitForInit,
}

impl SampleStatus {
    pub fn init() -> Self {
        Self {
            state: SampleState::WaitForInit,
        }
    }
}

(4)crdの作成  create_crds.rs

use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::{
    api::{Patch, PatchParams},
    Api, Client, CustomResourceExt,
};

use crate::crds::SampleCustomResource;

// =================================
// CRDの作成
// =================================

// ※crd名はKindの小文字で複数系になる
const CRD_NAME: &str = "samplecustomresources.test.dev";

pub async fn exec() {
    let crd_client = Client::try_default().await.expect("error");

    // Create the CRD controllerを実施者を決める
    let pp = PatchParams::apply("sample_apply");
    // 0. Ensure the CRD is installed, could do this once
    let crd_client: Api<CustomResourceDefinition> = Api::all(crd_client.clone());

    crd_client
        .patch(CRD_NAME, &pp, &Patch::Apply(SampleCustomResource::crd()))
        .await
        .expect("error");

    println!("CRDの作成完了");
}


(4) sample_controller.rs

use crate::crds::{SampleCustomResource, SampleStatus};
use futures::stream::StreamExt;
use kube::{
    api::{Patch, PatchParams},
    runtime::{controller::Action, Controller},
    Api, Client,
};
use serde_json::json;
use std::{io::BufRead, sync::Arc, time::Duration};

#[derive(thiserror::Error, Debug)]
pub enum Error {}
pub type Result<T, E = Error> = std::result::Result<T, E>;

pub struct Context {
    client: Client,
}

pub async fn run() {
    println!("contrllerの起動");

    let client = Client::try_default()
        .await
        .expect("failed to create kube Client");

    // 監視するカスタムリソース
    let api = Api::<SampleCustomResource>::all(client.clone());


    Controller::new(api.clone(), Default::default())
        .shutdown_on_signal()
        .run(reconcile, error_policy, Arc::new(Context { client }))
        .for_each(|_| futures::future::ready(()))
        .await;
}

// error時の挙動を制御するためのもの
fn error_policy(_: Arc<SampleCustomResource>, _error: &Error, _ctx: Arc<Context>) -> Action {
    Action::requeue(Duration::from_secs(5))
}

async fn reconcile(
    sample_custom_resource: Arc<SampleCustomResource>,
    ctx: Arc<Context>,
) -> Result<Action> {
    println!("--reconcileが動いたよ---");

    // 監視しているカスタムリソースを取得
    let api = Api::<SampleCustomResource>::namespaced(ctx.client.clone(), "default");

    // カスタムリソースの名前を取得
    let cr_name = sample_custom_resource.metadata.name.clone().expect("error");

    // カスタムリソースの初期値のステータス
    let status = json!({ "status": SampleStatus::init() });

    api.patch_status(&cr_name, &PatchParams::default(), &Patch::Merge(&status))
        .await
        .expect("error");

    Ok(Action::requeue(Duration::from_secs(3)))
}

(5) カスタムリソースの作成 create_cr.rs

use crate::crds::{SampleCustomResource, SampleSpec};
use kube::{core::ObjectMeta, Api, Client};

// =================================
// カスタムリソースの作成
// =================================
// カスタムリソースはコードで生成する場合もあれば、yamlでapplyする場合もある

pub async fn exec() {
    let client = Client::try_default()
        .await
        .expect("failed to create kube Client");

    // 作成するカスタムリソースの値
    let sample_custom_resource = SampleCustomResource {
        metadata: ObjectMeta {
            name: Some("hoge".to_string()),
            ..Default::default()
        },
        spec: SampleSpec {
            name: "foo".to_string(),
        },
        status: None,
    };

    let api: Api<SampleCustomResource> = Api::namespaced(client.clone(), "default");

    // カスタムリソースを作成
    api.create(&Default::default(), &sample_custom_resource)
        .await
        .expect("error");

    println!("カスタムリソースの作成完了");
}

Cargo.toml

[package]
name = "sample-operator-rust"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
kube = { version = "0.86.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.20.0", features = ["latest"] }
tokio = { version = "1.32.0", features = ["full"] }

serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"

# Each data format lives in its own crate; the sample code below uses JSON
# but you may be using a different one.
serde_json = "1.0"

schemars = { version = "0.8", features = ["chrono"] }

futures = "0.3.28"


thiserror = "1.0.48"


ハマりポイント


関連タグ

#rust #Operator #kubernetes #kube_rs

この記事が気に入ったらサポートをしてみませんか?