I have been struggling with this scenario, so I'm posting it here in the hope that someone can point me in the right direction, as I'm not sure whether this is a Knative issue, a client-go issue, or a problem in my own code.
I have an operator (based on Knative Eventing) that needs to run on two different versions of Kubernetes (1.19 and 1.27). The operator creates HPA resources for the broker ingress and for the trigger dispatcher.
These are the autoscaling API versions available on each cluster:
1.19
autoscaling.k8s.io/v1
autoscaling.k8s.io/v1beta2
autoscaling/v1
autoscaling/v2beta1
autoscaling/v2beta2
1.27
autoscaling.k8s.io/v1
autoscaling.k8s.io/v1beta2
autoscaling/v1
autoscaling/v2
Until now we only had 1.19, and creating HPA v2beta2 worked fine, but as you can see the new cluster doesn't have autoscaling/v2beta2, so we need to migrate to either v1 or v2. v1 is not an option because it doesn't support scaling on memory. v2 is not available on the 1.19 cluster, so I first tried code similar to this:
import (
	...
	autoscalingv2listers "k8s.io/client-go/listers/autoscaling/v2"
	autoscalingv2beta2listers "k8s.io/client-go/listers/autoscaling/v2beta2"
)
type Reconciler struct {
	...
	hpaListerv2beta2 autoscalingv2beta2listers.HorizontalPodAutoscalerLister
	hpaListerv2      autoscalingv2listers.HorizontalPodAutoscalerLister
}
...
var hpaListerV2 autoscalingv2listers.HorizontalPodAutoscalerLister
var hpaListerV2beta2 autoscalingv2beta2listers.HorizontalPodAutoscalerLister
// Only fetch the lister for the version the cluster actually serves.
if shared.IsApiVersionSupported(clientSet, "autoscaling", "v2") {
	hpaListerV2 = hpainformerv2.Get(ctx).Lister()
} else {
	hpaListerV2beta2 = hpainformerv2beta2.Get(ctx).Lister()
}
reconciler := &Reconciler{
	hpaListerv2beta2: hpaListerV2beta2,
	hpaListerv2:      hpaListerV2,
}
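The helper `shared.IsApiVersionSupported` is not shown here. As a rough sketch of the selection logic only, assuming the served group/versions have already been fetched (for example through the discovery client) into a plain string slice like the lists above, the choice between `v2` and `v2beta2` could look like this. `pickHPAVersion` and its input format are hypothetical names for illustration, not part of Knative or client-go.

```go
package main

import "fmt"

// pickHPAVersion returns the first version from preferred that is served in
// the "autoscaling" group. served holds "group/version" strings such as
// "autoscaling/v2". Hypothetical helper, not a Knative or client-go API.
func pickHPAVersion(served []string, preferred ...string) (string, error) {
	available := make(map[string]bool, len(served))
	for _, gv := range served {
		available[gv] = true
	}
	// Walk the preference list in order so the newest version wins.
	for _, v := range preferred {
		if available["autoscaling/"+v] {
			return v, nil
		}
	}
	return "", fmt.Errorf("none of %v is served in group autoscaling", preferred)
}
```

Calling `pickHPAVersion(served, "v2", "v2beta2")` with the 1.19 list would select `v2beta2`, and with the 1.27 list `v2`.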
EDIT: I have also added the conditional when adding the event handler:
if shared.IsApiVersionSupported(clientSet, "autoscaling", "v2") {
	hpainformerv2.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Broker")),
		Handler:    controller.HandleAll(impl.EnqueueControllerOf),
	})
} else {
	hpainformerv2beta2.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: controller.FilterControllerGK(eventingv1.Kind("Broker")),
		Handler:    controller.HandleAll(impl.EnqueueControllerOf),
	})
}
It doesn't work as expected: even though I have the conditional, it still tries to watch the resource that is not available on the cluster, and throws these errors:
W0627 10:03:58.137362 1 reflector.go:533] knative.dev/pkg/controller/controller.go:732: failed to list *v2beta2.HorizontalPodAutoscaler: the server could not find the requested resource
E0627 10:03:58.137387 1 reflector.go:148] knative.dev/pkg/controller/controller.go:732: Failed to watch *v2beta2.HorizontalPodAutoscaler: failed to list *v2beta2.HorizontalPodAutoscaler: the server could not find the requested resource
error: http2: client connection lost
If we look at the line mentioned there, controller.go:732, it is the line that calls the Run method of the informers in the Knative source code:
func StartInformers(stopCh <-chan struct{}, informers ...Informer) error {
	for _, informer := range informers {
		informer := informer
		go informer.Run(stopCh) // Here
	}
	for i, informer := range informers {
		if ok := cache.WaitForCacheSync(stopCh, informer.HasSynced); !ok {
			return fmt.Errorf("failed to wait for cache at index %d to sync", i)
		}
	}
	return nil
}
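To make the behaviour of this function concrete, here is a self-contained simulation using only the standard library. It is a sketch under assumptions: `informer`, `fakeInformer`, `startInformers` and `syncTimeout` are hypothetical stand-ins, not Knative's types, and `cache.WaitForCacheSync` is replaced by a simple poll with a timeout. It shows that every informer passed in gets run, so a single informer whose resource is not served is enough to make startup fail.

```go
package main

import (
	"fmt"
	"time"
)

// syncTimeout bounds the wait in this simulation (an assumption made here so
// the example terminates; it is not how WaitForCacheSync is bounded).
const syncTimeout = 100 * time.Millisecond

// informer is a stand-in for the interface accepted by StartInformers.
type informer interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
}

// fakeInformer simulates an informer. served says whether the cluster serves
// its resource; when it does not, the cache never syncs.
type fakeInformer struct {
	name   string
	served bool
	synced chan struct{}
}

func newFakeInformer(name string, served bool) *fakeInformer {
	return &fakeInformer{name: name, served: served, synced: make(chan struct{})}
}

func (f *fakeInformer) Run(stopCh <-chan struct{}) {
	if f.served {
		close(f.synced) // the initial list succeeded
	}
	<-stopCh
}

func (f *fakeInformer) HasSynced() bool {
	select {
	case <-f.synced:
		return true
	default:
		return false
	}
}

// startInformers has the same shape as the function above: run everything,
// then wait for each cache in order.
func startInformers(stopCh <-chan struct{}, informers ...informer) error {
	for _, inf := range informers {
		go inf.Run(stopCh)
	}
	for i, inf := range informers {
		deadline := time.Now().Add(syncTimeout)
		for !inf.HasSynced() {
			if time.Now().After(deadline) {
				return fmt.Errorf("failed to wait for cache at index %d to sync", i)
			}
			time.Sleep(5 * time.Millisecond)
		}
	}
	return nil
}
```

In this simulation, passing only a served informer returns nil, while adding one unserved informer to the list returns the sync error for its index, regardless of which listers the reconciler actually uses.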
I have tried using an interface, and also generics, but didn't get far; I always ended up hitting some sort of limitation. When I commented out the version that is not available in the cluster, I was able to run it successfully, but of course this is not ideal.
My last attempt was spending half a day converting everything to the dynamic client. The code looked like this:
cfg, err := rest.InClusterConfig()
if err != nil {
	log.Fatalf("Error getting cluster configuration: %v", err)
}
// Create the dynamic client from the same rest config
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
	log.Fatalf("Error creating dynamic client: %v", err)
}
// Default to v2beta2 and switch to v2 when the cluster serves it.
version := "v2beta2"
if shared.IsApiVersionSupported(clientSet, "autoscaling", "v2") {
	version = "v2"
}
gvr := schema.GroupVersionResource{
	Group:    "autoscaling",
	Version:  version,
	Resource: "horizontalpodautoscalers",
}
dynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, metav1.NamespaceAll, nil)
hpaDynamicInformer := dynamicInformerFactory.ForResource(gvr)
hpaLister := hpaDynamicInformer.Lister()
...
// reconcileHorizontalPodAutoscaler reconciles the K8s HPA 'h' beta.
func (r *Reconciler) reconcileHorizontalPodAutoscaler(ctx context.Context, h interface{}) error {
	var namespace, name string
	var spec interface{}
	unstructuredHPA, err := r.hpaLister.ByNamespace(namespace).Get(name)
	unstructuredObj, ok := unstructuredHPA.(*unstructured.Unstructured)
	if !ok {
		return fmt.Errorf("failed to convert runtime.Object to *unstructured.Unstructured")
	}
	if apierrs.IsNotFound(err) {
		// create HPA
		switch h := h.(type) {
		case *v2beta2.HorizontalPodAutoscaler:
			_, err = r.kubeClientSet.AutoscalingV2beta2().HorizontalPodAutoscalers(namespace).Create(ctx, h, metav1.CreateOptions{})
		case *v2.HorizontalPodAutoscaler:
			_, err = r.kubeClientSet.AutoscalingV2().HorizontalPodAutoscalers(namespace).Create(ctx, h, metav1.CreateOptions{})
		}
		if err != nil {
			return err
		}
	} else if err != nil {
		return err
	} else {
		// update HPA if necessary
		current, err := shared.ConvertUnstructuredToHPA(unstructuredObj)
		if err != nil {
			return err
		}
		switch h := h.(type) {
		case *v2beta2.HorizontalPodAutoscaler:
			currentHPA, ok := current.(*v2beta2.HorizontalPodAutoscaler)
			if !ok {
				return fmt.Errorf("error converting unstructured to v2beta2.HorizontalPodAutoscaler")
			}
			if !equality.Semantic.DeepDerivative(spec, currentHPA.Spec) {
				desired := currentHPA.DeepCopy()
				desired.Spec = h.Spec
				_, err = r.kubeClientSet.AutoscalingV2beta2().HorizontalPodAutoscalers(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
			}
		case *v2.HorizontalPodAutoscaler:
			currentHPA, ok := current.(*v2.HorizontalPodAutoscaler)
			if !ok {
				return fmt.Errorf("error converting unstructured to v2.HorizontalPodAutoscaler")
			}
			if !equality.Semantic.DeepDerivative(spec, currentHPA.Spec) {
				desired := currentHPA.DeepCopy()
				desired.Spec = h.Spec
				_, err = r.kubeClientSet.AutoscalingV2().HorizontalPodAutoscalers(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
			}
		}
		if err != nil {
			return err
		}
	}
	return nil
}
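The update branch above compares specs once per typed version. Since the dynamic lister already returns untyped content, one possible alternative is to compare the desired and current spec directly as nested maps. The sketch below uses only the standard library and loosely mimics what `equality.Semantic.DeepDerivative` does for unset fields; `specDerivative` is a hypothetical helper, and it assumes both sides use the same Go types for numbers (for example `int64` on both).

```go
package main

import "reflect"

// specDerivative reports whether every field set in desired has the same
// value in current. Fields present only in current (for example defaults
// filled in by the API server) are ignored. Hypothetical helper operating on
// untyped content such as map[string]interface{}.
func specDerivative(desired, current interface{}) bool {
	switch d := desired.(type) {
	case nil:
		// An unset desired field matches anything.
		return true
	case map[string]interface{}:
		c, ok := current.(map[string]interface{})
		if !ok {
			return false
		}
		for k, dv := range d {
			if !specDerivative(dv, c[k]) {
				return false
			}
		}
		return true
	case []interface{}:
		c, ok := current.([]interface{})
		if !ok || len(d) != len(c) {
			return false
		}
		for i := range d {
			if !specDerivative(d[i], c[i]) {
				return false
			}
		}
		return true
	default:
		// Scalars: values and dynamic types must match exactly.
		return reflect.DeepEqual(desired, current)
	}
}
```

An update would then only be issued when `specDerivative(desiredSpec, currentSpec)` returns false, independent of whether the cluster serves `v2` or `v2beta2`.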
Only to end up with the exact same error. I'm not sure where exactly those versions are being requested at runtime, even with a conditional that only looks up resources for the versions supported on the cluster it is running on.
Any idea how to achieve this?
Knative release version
v0.37.1