Flink HA with Zookeeper and Docker Compose: FencingTokenException, Fencing token mismatch

108 Views Asked by At

I'm trying to put up a Flink installation with High Availability and Zookeeper, using Docker compose.

Without High Availability enabled everything works smooth, but if i enable Zookeeper i get this exception while the task manager starts:

2023-12-19 20:01:49,999 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Registration at ResourceManager failed due to an error
flink-taskmanager-1  | java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message RemoteFencedMessage(00000000000000000000000000000000, RemoteRpcInvocation(ResourceManagerGateway.registerTaskExecutor(TaskExecutorRegistration, Time))) because the fencing token 00000000000000000000000000000000 did not match the expected fencing token 891fa0b72e1acee18bf55f03e07941c1.

This is my docker-compose.yml file:

version: "2.2"
services:
  jobmanager:
    image: flink:java8
    volumes:
      - /home/ubuntu/flink/jobmanager/:/tmp/
      - /home/ubuntu/flink/jobmanager/:/opt/flink/flink-web
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        jobmanager.rpc.port: 6123
        blob.server.port: 6124
        high-availability.zookeeper.client.acl: open
        zookeeper.sasl.disable: true
        spec.job.savepointsDir: /tmp/savepoints
        spec.job.restartPolicy: FromSavepointOnFailure
        jobmanager.archive.fs.dir: /tmp/archive
        web.upload.dir: /opt/flink/flink-web
        high-availability.type: zookeeper
        high-availability.jobmanager.port: 6123
        high-availability.zookeeper.quorum: zookeeper:2181
        high-availability.zookeeper.path.root: /flink
        high-availability.cluster-id: /cluster
        high-availability.storageDir: file:///tmp/recovery
  taskmanager:
    image: flink:java8
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - /home/ubuntu/flink/taskmanager/:/tmp/
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        jobmanager.rpc.port: 6123
        blob.server.port: 6124
        taskmanager.numberOfTaskSlots: 5
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
    volumes:
      - /home/ubuntu/flink/zookeeper-data:/var/lib/zookeeper/data
      - /home/ubuntu/flink/zookeeper-logs:/var/lib/zookeeper/log
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
    volumes:
      - /home/ubuntu/flink/kafka-data:/var/lib/kafka/data
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://10.11.5.101:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  akhq:
    image: tchiotludo/akhq
    restart: unless-stopped
    logging:
      driver: "json-file"
      options:
        max-size: "100k"
        max-file: "5"
    depends_on:
      - kafka
    ports:
      - 9080:8080
    volumes:
      - /home/ubuntu/flink/akhq/application.yml:/app/application.yml

Before this exception the log is clear from other exceptions. The TaskManager tries to connect n times then shuts down. I tried also to restart the JobManager or TaskManager after Zookeeper completely boot but nothing changes.

I'm using a single Zookeeper node, the focus is to have a Flink JobManager where load different Flink Jobs and try to crash/recovery it to develop a monitoring/recovery procedure, but i'm not able at this time to bootstrap a workink TaskManager.

Thanks, Alessio

0

There are 0 best solutions below