I have been struggling with this scenario, so I'm posting it here in the hope that someone can point me in the right direction, as I'm not sure whether this is an issue with Knative, client-go, or my own code.

I have an operator (based on Knative Eventing) that needs to run on two different versions of Kubernetes (1.19 and 1.27). The operator creates HPA resources for the broker ingress and for the trigger dispatcher.

These are the apiversions of autoscaling on each cluster:

1.19

autoscaling.k8s.io/v1
autoscaling.k8s.io/v1beta2
autoscaling/v1
autoscaling/v2beta1
autoscaling/v2beta2

1.27

autoscaling.k8s.io/v1
autoscaling.k8s.io/v1beta2
autoscaling/v1
autoscaling/v2

Until now, we only had 1.19, and it was working fine creating HPA v2beta2, but as you can see, the new cluster doesn't have autoscaling/v2beta2, so we need to migrate to either v1 or v2. v1 is not an option, as it doesn't support scaling on memory.

v2 is not available on the 1.19 cluster, so first I tried using code similar to this:

        import (
       ...
        autoscalingv2listers "k8s.io/client-go/listers/autoscaling/v2"
    autoscalingv2beta2listers "k8s.io/client-go/listers/autoscaling/v2beta2"
         )


       // Reconciler carries one HPA lister per autoscaling API version
       // (other fields are elided in this excerpt).
       type Reconciler struct {
        ...
    // hpaListerv2beta2 lists autoscaling/v2beta2 HorizontalPodAutoscalers.
    hpaListerv2beta2   autoscalingv2beta2listers.HorizontalPodAutoscalerLister
    // hpaListerv2 lists autoscaling/v2 HorizontalPodAutoscalers.
    hpaListerv2        autoscalingv2listers.HorizontalPodAutoscalerLister
       }       

        ...
    var hpaListerV2 autoscalingv2listers.HorizontalPodAutoscalerLister
    var hpaListerV2beta2 autoscalingv2beta2listers.HorizontalPodAutoscalerLister

    if shared.IsApiVersionSupported(clientSet, "autoscaling", "v2") {
        hpaListerV2 = hpainformerv2.Get(ctx).Lister()
    } else {
        hpaListerV2beta2 = hpainformerv2beta2.Get(ctx).Lister()
    }

    reconciler := &Reconciler{
        hpaListerv2beta2: hpaListerV2beta2,
        hpaListerv2:      hpaListerV2,
    }

EDIT: I have also added the same conditional when registering the event handler:

    if shared.IsApiVersionSupported(clientSet, "autoscaling", "v2") {
        hpainformerv2.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
            FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Broker")),
            Handler:    controller.HandleAll(impl.EnqueueControllerOf),
        })
    } else {
        hpainformerv2beta2.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
            FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Broker")),
            Handler:    controller.HandleAll(impl.EnqueueControllerOf),
        })
    }

It doesn't work as expected: even though I have the conditional, it still tries to watch the resource that is not available on the cluster, throwing these errors:

W0627 10:03:58.137362       1 reflector.go:533] knative.dev/pkg/controller/controller.go:732: failed to list *v2beta2.HorizontalPodAutoscaler: the server could not find the requested resource
E0627 10:03:58.137387       1 reflector.go:148] knative.dev/pkg/controller/controller.go:732: Failed to watch *v2beta2.HorizontalPodAutoscaler: failed to list *v2beta2.HorizontalPodAutoscaler: the server could not find the requested resource
error: http2: client connection lost

If we look at the line mentioned there, controller.go:732, it is the line that calls the Run method of the informers in the Knative source code:

// StartInformers launches every given informer in its own goroutine and then
// waits, in argument order, for each informer's cache to sync. It returns an
// error naming the index of the first informer whose cache did not sync, or
// nil once all caches have synced.
func StartInformers(stopCh <-chan struct{}, informers ...Informer) error {
    // Start all informers before waiting on any of them. The receiver is
    // evaluated at the go statement, so each goroutine runs its own informer.
    for idx := range informers {
        go informers[idx].Run(stopCh)
    }

    for idx := range informers {
        if synced := cache.WaitForCacheSync(stopCh, informers[idx].HasSynced); synced {
            continue
        }
        return fmt.Errorf("failed to wait for cache at index %d to sync", idx)
    }
    return nil
}

I have tried using an interface, and also using generics, but didn't get far; I always ended up hitting some sort of limitation. When I commented out the version not available in the cluster, I was able to run it successfully, but of course this is not ideal.

My last attempt was spending half a day converting everything to the dynamic client. The code looked like this:

    // Load the REST config; abort with the underlying error if it cannot be
    // obtained.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        log.Fatalf("Error getting cluster configuration: %v", err)
    }

    // Create the dynamic client from the same rest config
    dynamicClient, err := dynamic.NewForConfig(cfg)
    if err != nil {
        log.Fatalf("Error creating dynamic client: %v", err)
    }

    // Prefer autoscaling/v2 when the cluster serves it; otherwise fall back
    // to autoscaling/v2beta2.
    version := "v2beta2"
    if shared.IsApiVersionSupported(clientSet, "autoscaling", "v2") {
        version = "v2"
    }

    gvr := schema.GroupVersionResource{
        Group:    "autoscaling",
        Version:  version,
        Resource: "horizontalpodautoscalers",
    }

    // Build a dynamic informer (all namespaces, resync period 0, no list
    // option tweaks) for the selected HPA version and take its lister.
    dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, metav1.NamespaceAll, nil)
    hpaDynamicInformer := dynamicInformerFactory.ForResource(gvr)
    hpaLister := hpaDynamicInformer.Lister()

       ...


// reconcileHorizontalPodAutoscaler reconciles the desired K8s HPA 'h' with the
// cluster. 'h' must be a *v2beta2.HorizontalPodAutoscaler or a
// *v2.HorizontalPodAutoscaler; any other type yields an error.
//
// The HPA is looked up through r.hpaLister using the namespace and name of
// 'h'. If it does not exist it is created; if it exists and its spec does not
// already cover the desired spec, it is updated with the desired spec.
// Any lister, conversion, create or update error is returned to the caller.
func (r *Reconciler) reconcileHorizontalPodAutoscaler(ctx context.Context, h interface{}) error {
    // Take the lookup key from the desired object; without it the lister
    // would be queried with an empty namespace and name.
    var namespace, name string
    switch desired := h.(type) {
    case *v2beta2.HorizontalPodAutoscaler:
        namespace, name = desired.Namespace, desired.Name
    case *v2.HorizontalPodAutoscaler:
        namespace, name = desired.Namespace, desired.Name
    default:
        return fmt.Errorf("unsupported HorizontalPodAutoscaler type %T", h)
    }

    existing, err := r.hpaLister.ByNamespace(namespace).Get(name)
    if apierrs.IsNotFound(err) {
        // create HPA
        switch desired := h.(type) {
        case *v2beta2.HorizontalPodAutoscaler:
            _, err = r.kubeClientSet.AutoscalingV2beta2().HorizontalPodAutoscalers(namespace).Create(ctx, desired, metav1.CreateOptions{})
        case *v2.HorizontalPodAutoscaler:
            _, err = r.kubeClientSet.AutoscalingV2().HorizontalPodAutoscalers(namespace).Create(ctx, desired, metav1.CreateOptions{})
        }
        return err
    }
    if err != nil {
        return err
    }

    // Only inspect the lister result once the lookup is known to have
    // succeeded; on a failed lookup there is no object to convert.
    unstructuredObj, ok := existing.(*unstructured.Unstructured)
    if !ok {
        return fmt.Errorf("failed to convert runtime.Object to *unstructured.Unstructured")
    }

    // update HPA if necessary
    current, err := shared.ConvertUnstructuredToHPA(unstructuredObj)
    if err != nil {
        return err
    }

    switch wanted := h.(type) {
    case *v2beta2.HorizontalPodAutoscaler:
        currentHPA, ok := current.(*v2beta2.HorizontalPodAutoscaler)
        if !ok {
            return fmt.Errorf("error converting unstructured to v2beta2.HorizontalPodAutoscaler")
        }
        // Compare the desired spec itself (not an unset placeholder) so that
        // drift is actually detected.
        if equality.Semantic.DeepDerivative(wanted.Spec, currentHPA.Spec) {
            return nil
        }
        desired := currentHPA.DeepCopy()
        desired.Spec = wanted.Spec
        _, err = r.kubeClientSet.AutoscalingV2beta2().HorizontalPodAutoscalers(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
    case *v2.HorizontalPodAutoscaler:
        currentHPA, ok := current.(*v2.HorizontalPodAutoscaler)
        if !ok {
            return fmt.Errorf("error converting unstructured to v2.HorizontalPodAutoscaler")
        }
        if equality.Semantic.DeepDerivative(wanted.Spec, currentHPA.Spec) {
            return nil
        }
        desired := currentHPA.DeepCopy()
        desired.Spec = wanted.Spec
        _, err = r.kubeClientSet.AutoscalingV2().HorizontalPodAutoscalers(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
    }
    return err
}

Only to end up with the exact same error. I'm not sure where exactly those versions are being requested at runtime, even with a conditional that only looks up resources of the versions supported by the cluster it is running on.

Any idea how to achieve this?

Knative release version v0.37.1

0

There are 0 best solutions below