Dynamically add label to Pods in Kubernetes with Client-go

2.2k views

I want to create a controller that listens for pod events, and when a new pod is created (by a deployment), adds all of the deployment's labels to the created pod. Is this possible at scale with client-go?

2

There are 2 best solutions below

2
On

What about doing it manually with kubectl, like this?

Here are the pods before they are labeled manually:

❯ kubectl get po --show-labels
NAME                            READY   STATUS    RESTARTS   AGE   LABELS
nginx-deploy-6bdc4445fd-5qlhg   1/1     Running   0          13h   app=nginx,pod-template-hash=6bdc4445fd
nginx-deploy-6bdc4445fd-pgkhb   1/1     Running   0          13h   app=nginx,pod-template-hash=6bdc4445fd
nginx-deploy-6bdc4445fd-xdz59   1/1     Running   0          13h   app=nginx,pod-template-hash=6bdc4445fd

Here, I label the pod "nginx-deploy-6bdc4445fd-5qlhg" with the label "test=2":

❯ kubectl label pod nginx-deploy-6bdc4445fd-5qlhg test=2
pod/nginx-deploy-6bdc4445fd-5qlhg labeled

Here are the pods after they are labeled manually:

❯ kubectl get po --show-labels
NAME                            READY   STATUS    RESTARTS   AGE   LABELS
nginx-deploy-6bdc4445fd-5qlhg   1/1     Running   0          13h   app=nginx,pod-template-hash=6bdc4445fd,test=2
nginx-deploy-6bdc4445fd-pgkhb   1/1     Running   0          13h   app=nginx,pod-template-hash=6bdc4445fd
nginx-deploy-6bdc4445fd-xdz59   1/1     Running   0          13h   app=nginx,pod-template-hash=6bdc4445fd
0
On

In order to observe pod events, you need to use informers. Informers have built-in optimizations to avoid overloading API servers.

There is a patch method available in the PodInterface that allows you to add a label to a pod.

Here is a sample code for your reference. In the main function, the informer code is added, and the LabelPod function implements the label logic.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

// patchStringValue describes a single JSON-Patch operation (RFC 6902)
// whose value is a string — used here to add a label to a pod via
// types.JSONPatchType.
type patchStringValue struct {
    Op    string `json:"op"`    // patch operation, e.g. "add", "replace", "remove"
    Path  string `json:"path"`  // JSON pointer to the target field, e.g. "/metadata/labels/foo"
    Value string `json:"value"` // string value to set at Path
}

// main wires up a shared informer restricted to the pods selected by a
// deployment's label selector, registers an Add handler that labels each
// new pod, and then blocks so the controller keeps running.
func main() {
	clientSet := GetK8sClient()

	// Only watch pods matching the deployment's selector, so the informer
	// does not list/watch every pod in the namespace.
	labelOptions := informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
		opts.LabelSelector = GetLabelSelectorForDeployment("deployment-name", "namespace-name")
	})
	// Named "factory" (not "informers") to avoid shadowing the imported
	// informers package, which the original code did.
	factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 10*time.Second, informers.WithNamespace("namespace-name"), labelOptions)

	podInformer := factory.Core().V1().Pods()

	podInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: handleAdd,
		},
	)

	factory.Start(wait.NeverStop)
	factory.WaitForCacheSync(wait.NeverStop)

	// Block forever: the original returned right after the cache sync,
	// terminating the process before any events could be handled.
	select {}
}

// GetLabelSelectorForDeployment returns the label selector declared by the
// named Deployment as a selector string (e.g. "app=nginx").
// It panics if the Deployment cannot be fetched: the original discarded the
// error and then dereferenced a nil deployment, producing an uninformative
// nil-pointer panic instead.
func GetLabelSelectorForDeployment(Name string, Namespace string) string {
	clientSet := GetK8sClient()

	deployment, err := clientSet.AppsV1().Deployments(Namespace).Get(context.Background(), Name, metav1.GetOptions{})
	if err != nil {
		panic(fmt.Sprintf("getting deployment %s/%s: %v", Namespace, Name, err))
	}

	labelSet := labels.Set(deployment.Spec.Selector.MatchLabels)
	// String() already yields a string; the original's string(...) wrapper
	// was redundant.
	return labelSet.AsSelector().String()
}

func handleAdd(obj interface{}) {
    k8sClient := GetK8sClient().CoreV1()
    pod := obj.(*v1.Pod)
    fmt.Println("Pod", pod.GetName(), pod.Spec.NodeName, pod.Spec.Containers)
    payload := []patchStringValue{{
        Op:    "replace",
        Path:  "/metadata/labels/testLabel",
        Value: "testValue",
    }}
    payloadBytes, _ := json.Marshal(payload)

    _, updateErr := k8sClient.Pods(pod.GetNamespace()).Patch(context.Background(), pod.GetName(), types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
    if updateErr == nil {
        fmt.Println(fmt.Sprintf("Pod %s labelled successfully.", pod.GetName()))
    } else {
        fmt.Println(updateErr)
    }

}

// GetK8sClient builds a Kubernetes clientset from the in-cluster
// service-account configuration. It panics if the process is not running
// inside a cluster or the clientset cannot be constructed, since the
// controller cannot do anything without an API client.
func GetK8sClient() *kubernetes.Clientset {
	cfg, cfgErr := rest.InClusterConfig()
	if cfgErr != nil {
		panic(cfgErr.Error())
	}

	// Build the typed clientset from the in-cluster config.
	cs, csErr := kubernetes.NewForConfig(cfg)
	if csErr != nil {
		panic(csErr.Error())
	}
	return cs
}