How to debug Kafka in Kubernetes (Minikube) using Telepresence


I have Kafka running inside Minikube. Kafka itself works fine, but I want to use it from my Java app, which runs on my local machine (not in Minikube). I know that it's possible to proxy requests from a local machine to a Kubernetes cluster using Telepresence.

I'm trying to run telepresence intercept -n kafka kafka-service 9092:9092, but it fails with telepresence: error: workload "kafka-service.kafka" not found. I know the actual broker addresses look like kafka-0.kafka-service.kafka.svc.cluster.local:9092, but I don't understand how to make this work.
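
My understanding is that telepresence intercept targets a workload (a Deployment or StatefulSet), not a Service, and that telepresence connect on its own should make cluster DNS names resolvable from my machine. A rough sketch of what I have been trying (assuming Telepresence 2.x; exact flag names may differ between versions):

# Connect my machine to the cluster network; after this, names like
# kafka-0.kafka-service.kafka.svc.cluster.local should resolve locally.
telepresence connect

# List interceptable workloads in the kafka namespace; I expect the
# StatefulSet "kafka" to show up here rather than the Service name.
telepresence list -n kafka

# Intercept the StatefulSet by its workload name (I'm not sure this is
# the right invocation for a StatefulSet, so treat it as a guess):
telepresence intercept kafka -n kafka --port 9092:9092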

kafka.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: kafka
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-volume
  labels:
    type: local
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: '/mnt/data'
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pv-claim
  namespace: kafka
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: kafka
  labels:
    app: kafka
spec:
  selector:
    app: kafka
  clusterIP: None
  ports:
    - port: 9092
      protocol: TCP
      targetPort: 9092
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka
  namespace: kafka
spec:
  serviceName: kafka-service
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      volumes:
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pv-claim
      containers:
        - name: kafka-container
          image: evmusonov/kafka-kraft
          resources: {}
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '3'
            - name: SERVICE
              value: kafka-service
            - name: NAMESPACE
              value: kafka
            - name: SHARE_DIR
              value: /mnt/kafka
          volumeMounts:
            - name: kafka-storage
              mountPath: /mnt/kafka
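
For completeness, this is how the pods and the per-pod DNS names from the headless Service can be checked from inside the cluster (the getent call assumes the tool is present in the broker image, which should be the case for a Debian-based openjdk image):

# Pods and the headless Service
kubectl -n kafka get pods
kubectl -n kafka get svc kafka-service

# Resolve one of the per-pod DNS names from inside a broker
kubectl -n kafka exec kafka-0 -- getent hosts kafka-0.kafka-service.kafka.svc.cluster.local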

Dockerfile:

FROM openjdk:11

ENV KAFKA_VERSION=3.2.0
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin

LABEL name="kafka" version=${KAFKA_VERSION}

RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
 && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME} \
 && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz

COPY ./entrypoint.sh /
RUN ["chmod", "+x", "/entrypoint.sh"]
ENTRYPOINT ["/entrypoint.sh"]
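
The image is built locally and made available to Minikube roughly like this (minikube image load is one option; pushing it to a registry would also work):

# Build the broker image used by the StatefulSet above
docker build -t evmusonov/kafka-kraft .

# Make the image available inside Minikube (alternative: push to a registry)
minikube image load evmusonov/kafka-kraft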

And the entrypoint.sh script:

#!/bin/bash

# Derive the broker ordinal from the pod hostname, e.g. "kafka-0" -> "0"
NODE_ID=${HOSTNAME:6}
LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093"
# Advertise the stable per-pod DNS name provided by the headless Service
ADVERTISED_LISTENERS="PLAINTEXT://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:9092"

# Build controller.quorum.voters as "0@kafka-0...:9093,1@kafka-1...:9093,..."
# The loop runs one extra iteration so the last pass strips the trailing comma.
CONTROLLER_QUORUM_VOTERS=""
for i in $( seq 0 $REPLICAS); do
    if [[ $i != $REPLICAS ]]; then
        CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
    else
        CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
    fi
done

mkdir -p $SHARE_DIR/$NODE_ID

# Broker 0 generates the KRaft cluster id on the shared volume;
# the other brokers read the same id from there.
if [[ ! -f "$SHARE_DIR/cluster_id" && "$NODE_ID" = "0" ]]; then
    CLUSTER_ID=$(kafka-storage.sh random-uuid)
    echo $CLUSTER_ID > $SHARE_DIR/cluster_id
else
    CLUSTER_ID=$(cat $SHARE_DIR/cluster_id)
fi

sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
-e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
-e "s+^listeners=.*+listeners=$LISTENERS+" \
-e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
-e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
/opt/kafka/config/kraft/server.properties > server.properties.updated \
&& mv server.properties.updated /opt/kafka/config/kraft/server.properties

kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties

exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties
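
For node kafka-0 with REPLICAS=3, the substitutions above should produce values along these lines (reconstructed by hand from the script, not copied from a running pod):

node.id=0
controller.quorum.voters=0@kafka-0.kafka-service.kafka.svc.cluster.local:9093,1@kafka-1.kafka-service.kafka.svc.cluster.local:9093,2@kafka-2.kafka-service.kafka.svc.cluster.local:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka-0.kafka-service.kafka.svc.cluster.local:9092
log.dirs=/mnt/kafka/0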

Spring Boot application.properties

spring.kafka.producer.bootstrap-servers = kafka-0.kafka-service.kafka.svc.cluster.local:9092,kafka-1.kafka-service.kafka.svc.cluster.local:9092,kafka-2.kafka-service.kafka.svc.cluster.local:9092
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.bootstrap-servers = kafka-0.kafka-service.kafka.svc.cluster.local:9092,kafka-1.kafka-service.kafka.svc.cluster.local:9092,kafka-2.kafka-service.kafka.svc.cluster.local:9092
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id = custom
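
Since a Kafka client bootstraps and then connects to whatever advertised.listeners the brokers return, my assumption is that once telepresence connect is active and those DNS names resolve locally, I should be able to smoke-test connectivity from my machine with the stock Kafka CLI before involving the Spring app (assuming a local Kafka 3.2 download on the PATH; test-topic is just a placeholder name):

# List topics via one of the brokers
kafka-topics.sh --bootstrap-server kafka-0.kafka-service.kafka.svc.cluster.local:9092 --list

# Produce a test message
kafka-console-producer.sh --bootstrap-server kafka-0.kafka-service.kafka.svc.cluster.local:9092 --topic test-topic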