I'm working with the Argo Events Kafka EventSource and am not able to set up SASL_SSL with it.
Below is the EventSource manifest:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kafka
  namespace: MY_NAMESPACE
spec:
  eventBusName: MY_EVENTBUS
  kafka:
    kafka-event:
      # kafka broker url
      url: MY_BROKER_URL
      # name of the kafka topic
      topic: MY_TOPIC
      # jsonBody specifies that all event body payload coming from this
      # source will be JSON
      jsonBody: true
      # partition id
      # partition: "1"
      # optional backoff time for connection retries.
      # if not provided, default connection backoff time will be used.
      connectionBackoff:
        # duration in nanoseconds, or strings like "3s", "2m". following value is 10 seconds
        duration: 10s
        # how many backoffs
        steps: 5
        # factor to increase on each step.
        # setting factor > 1 makes backoff exponential.
        factor: 2
        jitter: 0.2
      # Use a consumer group, if this is used you do not need to specify a "partition: <id>"
      consumerGroup:
        groupName: argo-test-group
        oldest: false
        rebalanceStrategy: range
      limitEventsPerSecond: 1
      version: "2.5.0"
      # Enable SASL authentication (not to be used with TLS)
      sasl:
        mechanism: SCRAM-SHA-512
        passwordSecret:
          key: password
          name: kafka-sasl-creds
        userSecret:
          key: username
          name: kafka-sasl-creds
But I'm getting the error below:
{"level":"info","ts":1679338599.3659816,"logger":"argo-events.eventsource","caller":"kafka/start.go:81","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"kafka-event"}
{"level":"error","ts":1679338600.1314366,"logger":"argo-events.eventsource","caller":"kafka/start.go:122","msg":"Error creating consumer group client: kafka: client has run out of available brokers to talk to: unexpected EOF","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"kafka-event","stacktrace":"github.com/argoproj/argo-events/eventsources/sources/kafka.(*EventListener).consumerGroupConsumer\n\t/home/runner/work/argo-events/argo-events/eventsources/sources/kafka/start.go:122\ngithub.com/argoproj/argo-events/eventsources/sources/kafka.(*EventListener).StartListening\n\t/home/runner/work/argo-events/argo-events/eventsources/sources/kafka/start.go:87\ngithub.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).run.func3.1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:503\ngithub.com/argoproj/argo-events/common.DoWithRetry.func1\n\t/home/runner/work/argo-events/argo-events/common/retry.go:106\nk8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:220\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:233\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:226\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:421\ngithub.com/argoproj/argo-events/common.DoWithRetry\n\t/home/runner/work/argo-events/argo-events/common/retry.go:105\ngithub.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:502"}
But when I add the following to the manifest to skip TLS verification, it works:
      tls:
        insecureSkipVerify: true
However, I don't want to make it insecure.
Can someone please help me figure out what I'm doing wrong?
Thanks
Argo Events Version: v1.7.6
I have it working now, so I'm posting the solution in case someone faces the same issue. I found both the documentation and the error message misleading, and I had to read the source code to figure it out.
Issue: The problem is with the Kafka connection itself. My Kafka brokers use certificates signed by a private CA. That's why, if I neither enable insecureSkipVerify nor provide the CA cert, the EventSource fails to establish the connection with Kafka. So all we need to do is pass our private CA along with the SASL creds.
Fix: We can use the tls and sasl blocks together, since there is no validation logic that prevents it, and pass our CA cert in the tls block alongside the sasl settings. This way we provide our CA cert instead of enabling insecureSkipVerify. You need to create the k8s secrets beforehand:
es-cacert-secret.yaml
es-sasl-secret.yaml
EventSource.yaml
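The contents of these three files are not reproduced here, so the following is only a minimal sketch of what they might look like, not the exact manifests I used. The CA secret name (kafka-ca-cert) and its key (ca.crt) are hypothetical; kafka-sasl-creds and its username/password keys are taken from the manifest in the question. The caCertSecret field is, as far as I can tell, how the Argo Events tls block references a CA stored in a secret, but check it against the spec for your version.

```yaml
# es-cacert-secret.yaml (sketch; secret name and key are assumptions)
apiVersion: v1
kind: Secret
metadata:
  name: kafka-ca-cert            # hypothetical name
  namespace: MY_NAMESPACE
type: Opaque
stringData:
  ca.crt: |                      # hypothetical key
    -----BEGIN CERTIFICATE-----
    <PEM-encoded private CA certificate>
    -----END CERTIFICATE-----
---
# es-sasl-secret.yaml (sketch; values are placeholders)
apiVersion: v1
kind: Secret
metadata:
  name: kafka-sasl-creds
  namespace: MY_NAMESPACE
type: Opaque
stringData:
  username: MY_KAFKA_USERNAME
  password: MY_KAFKA_PASSWORD
---
# EventSource.yaml (sketch; tls and sasl used together)
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kafka
  namespace: MY_NAMESPACE
spec:
  eventBusName: MY_EVENTBUS
  kafka:
    kafka-event:
      url: MY_BROKER_URL
      topic: MY_TOPIC
      jsonBody: true
      consumerGroup:
        groupName: argo-test-group
        oldest: false
        rebalanceStrategy: range
      version: "2.5.0"
      # Trust the private CA instead of setting insecureSkipVerify
      tls:
        caCertSecret:
          name: kafka-ca-cert    # must match the CA secret above
          key: ca.crt
      # SASL credentials, unchanged from the question
      sasl:
        mechanism: SCRAM-SHA-512
        userSecret:
          name: kafka-sasl-creds
          key: username
        passwordSecret:
          name: kafka-sasl-creds
          key: password
```

Apply the two secrets before the EventSource so that the EventSource pod can resolve them when it starts.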