I want to count the number of messages in a given Kafka topic between two timestamps. I tried doing this with kafkacat, using the following command:
# START_DATE = 01.04.2022 02:00:00Z
# END_DATE = 01.04.2022 02:05:00Z
$ kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -o e@1648778700000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1
In fact, this is the same approach that is listed as the answer in a very similar question.
According to kafkacat --help:
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
s@<value> (timestamp in ms to start at)
e@<value> (timestamp in ms to stop at (not included))
Accordingly, I would expect the above command to give me the first record whose timestamp is at least s@<value> and smaller than e@<value>. Instead, it gives me a record with a timestamp prior to s@<value> (in fact, it just gives me the first record in partition 0):
# output of above command
[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4
Am I misunderstanding the consumer options s@<value> and e@<value>?
Kafkacat version:
Version 1.5.0 (JSON, librdkafka 1.2.1 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer)
Additionally, I'm seeing some odd behaviour even with just s@<value>. For example:
kafkacat -C -b broker:9092 -t mytopic -o s@1648778400000 -p 0 -f '[ts %T] [partition %p] [offset %o] %k\n' -e -c 1
should, as I understand it, output the first record with record.timestamp ≥ 1648778400000. The actual output is different:
[ts 1648692486141] [partition 0] [offset 2] 643b0013-b3e1-47a5-a9d3-7478c0e91ca4
and contains a timestamp prior to the one I set (31.03.2022 02:08:06Z vs. 01.04.2022 02:00:00Z).
The output is the same when I test with docker run edenhill/kcat:1.7.1 (the run above used the Ubuntu kafkacat package).
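As a sanity check on the epoch values used above, here is a small Python sketch that converts the dates to milliseconds and back. It is independent of kafkacat, and the helper names to_epoch_ms and from_epoch_ms are only illustrative.

```python
from datetime import datetime, timezone

def to_epoch_ms(text):
    """Parse 'DD.MM.YYYY HH:MM:SSZ' (UTC) into epoch milliseconds."""
    dt = datetime.strptime(text, "%d.%m.%Y %H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

def from_epoch_ms(ms):
    """Convert epoch milliseconds to a UTC datetime (sub-second part dropped)."""
    return datetime.fromtimestamp(ms // 1000, tz=timezone.utc)

print(to_epoch_ms("01.04.2022 02:00:00Z"))  # 1648778400000 (START_DATE)
print(to_epoch_ms("01.04.2022 02:05:00Z"))  # 1648778700000 (END_DATE)
print(from_epoch_ms(1648692486141))         # 2022-03-31 02:08:06+00:00
```

So the s@ and e@ values do match the intended dates, and the returned record really is about a day older than the requested start.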
I don't think you can provide -o multiple times. Therefore, your options include reading one message from partition 0 with a timestamp less than 1648778700000.
To properly consume between timestamps, find the offsets for the start timestamp, commit them to a consumer group, then start a consumer in the group with your end timestamp.
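If only the count is needed, a different route from the consumer-group approach above is to look up the offsets for both timestamps and subtract them, without consuming anything. Below is a rough Python sketch of that idea. It assumes a consumer object exposing offsets_for_times and end_offsets the way kafka-python's KafkaConsumer does, and it assumes offsets in the partition are contiguous and timestamps are non-decreasing (compacted or transactional topics would make the count only an upper bound). The function name count_between is hypothetical.

```python
def count_between(consumer, tp, start_ms, end_ms):
    """Estimate how many records in partition `tp` have start_ms <= ts < end_ms.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer:
      offsets_for_times({tp: ms}) -> {tp: object with .offset, or None}
      end_offsets([tp])           -> {tp: next offset to be written}
    """
    # Earliest offset whose timestamp is >= start_ms (None if there is none).
    first = consumer.offsets_for_times({tp: start_ms})[tp]
    if first is None:
        return 0

    # Earliest offset whose timestamp is >= end_ms marks the exclusive end.
    # If no such record exists, the window runs to the end of the partition.
    last = consumer.offsets_for_times({tp: end_ms})[tp]
    end_offset = consumer.end_offsets([tp])[tp] if last is None else last.offset

    # Assumption: no offset gaps between the two positions.
    return max(0, end_offset - first.offset)

# Possible usage with kafka-python (needs a reachable broker, not run here):
#   from kafka import KafkaConsumer, TopicPartition
#   consumer = KafkaConsumer(bootstrap_servers="broker:9092")
#   tp = TopicPartition("mytopic", 0)
#   print(count_between(consumer, tp, 1648778400000, 1648778700000))
```

This covers a single partition; for a whole topic, the same lookup would be repeated per partition and the results summed.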