I am trying to rewind input feed for one of my samza jobs with checkpoint tool as described here and here. For some reason the checkpoint tool won't output offsets as promised, however I know for a fact that the job has already consumed more than a few messages from the partition in question.
This is a truncated version of the output that the checkpoint tool gives me:
2015-06-11 16:31:04 ZkClient [INFO] zookeeper state changed (SyncConnected)
2015-06-11 16:31:04 ZkEventThread [INFO] Terminate ZkClient event thread.
2015-06-11 16:31:04 ZooKeeper [INFO] Session: 0x14de25b502e01b4 closed
2015-06-11 16:31:04 ClientCnxn [INFO] EventThread shut down
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1 already exists.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Validating checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Successfully validated checkpoint topic __samza_checkpoint_ver_1_for_test-job1_1.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Reading checkpoint for taskName Partition 0
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] No TaskName to checkpoint mapping provided. Reading for first time.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Connecting to leader pavels-mbp.it.local:9092 for topic __samza_checkpoint_ver_1_for_test-job1_1 and to fetch all checkpoint messages.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Got offset 0 for topic __samza_checkpoint_ver_1_for_test-job1_1 and partition 0. Attempting to fetch messages for checkpoint log.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Get latest offset 31 for topic __samza_checkpoint_ver_1_for_test-job1_1 and partition 0.
2015-06-11 16:31:04 KafkaCheckpointManager [INFO] Got checkpoint state for taskName Partition 0: Checkpoint [offsets={}]
This is my test_job.properties
file:
# Job
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.name=test-job1
# Task
task.class=com.xim.test.TestTaskClass
task.inputs=kafka.EnergyPurchaseEvent
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1
As you can see, checkpointing is enabled.