I'm trying to set up a Flink installation with High Availability backed by ZooKeeper, using Docker Compose.
Without High Availability enabled everything works smoothly, but if I enable ZooKeeper I get this exception when the TaskManager starts:
2023-12-19 20:01:49,999 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Registration at ResourceManager failed due to an error
flink-taskmanager-1 | java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message RemoteFencedMessage(00000000000000000000000000000000, RemoteRpcInvocation(ResourceManagerGateway.registerTaskExecutor(TaskExecutorRegistration, Time))) because the fencing token 00000000000000000000000000000000 did not match the expected fencing token 891fa0b72e1acee18bf55f03e07941c1.
This is my docker-compose.yml file:
version: "2.2"
services:
  jobmanager:
    image: flink:java8
    volumes:
      - /home/ubuntu/flink/jobmanager/:/tmp/
      - /home/ubuntu/flink/jobmanager/:/opt/flink/flink-web
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        jobmanager.rpc.port: 6123
        blob.server.port: 6124
        high-availability.zookeeper.client.acl: open
        zookeeper.sasl.disable: true
        spec.job.savepointsDir: /tmp/savepoints
        spec.job.restartPolicy: FromSavepointOnFailure
        jobmanager.archive.fs.dir: /tmp/archive
        web.upload.dir: /opt/flink/flink-web
        high-availability.type: zookeeper
        high-availability.jobmanager.port: 6123
        high-availability.zookeeper.quorum: zookeeper:2181
        high-availability.zookeeper.path.root: /flink
        high-availability.cluster-id: /cluster
        high-availability.storageDir: file:///tmp/recovery
  taskmanager:
    image: flink:java8
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - /home/ubuntu/flink/taskmanager/:/tmp/
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        jobmanager.rpc.port: 6123
        blob.server.port: 6124
        taskmanager.numberOfTaskSlots: 5
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
    volumes:
      - /home/ubuntu/flink/zookeeper-data:/var/lib/zookeeper/data
      - /home/ubuntu/flink/zookeeper-logs:/var/lib/zookeeper/log
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
    volumes:
      - /home/ubuntu/flink/kafka-data:/var/lib/kafka/data
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://10.11.5.101:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  akhq:
    image: tchiotludo/akhq
    restart: unless-stopped
    logging:
      driver: "json-file"
      options:
        max-size: "100k"
        max-file: "5"
    depends_on:
      - kafka
    ports:
      - 9080:8080
    volumes:
      - /home/ubuntu/flink/akhq/application.yml:/app/application.yml
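For completeness, there is nothing special about how I run it; I just use the standard Docker Compose commands to bring the stack up and watch the TaskManager log:

```shell
# Start the whole stack in the background
docker compose up -d

# Follow the TaskManager log; the FencingTokenException
# appears shortly after the TaskManager starts registering
docker compose logs -f taskmanager
```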
Before this exception the log shows no other errors. The TaskManager tries to register n times and then shuts down. I also tried restarting the JobManager and the TaskManager after ZooKeeper had completely booted, but nothing changes.
I'm using a single ZooKeeper node. The goal is to have a Flink JobManager where I can load different Flink jobs and then crash/recover it, so I can develop a monitoring/recovery procedure, but at the moment I'm not able to bootstrap a working TaskManager.
Thanks, Alessio