I am running a Faust application that receives protobuf messages from a Kafka topic and inserts them into a tumbling-window table. The key is the user ID, and the value is a Faust record containing the count, the user ID, and the logged time (in seconds). On each subsequent update, I merge the incoming count with the existing record and write the result back to the table. When a window expires, the record it emits is inserted into a separate table.
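A simplified model of the behavior described above may help make the expected semantics concrete. It is plain Python, not Faust: TumblingCounter, WINDOW_SIZE and EXPIRES are hypothetical names. It assumes windows are half-open [start, start + 900) ranges keyed by (user_id, window_start), and that a window closes once the newest event time seen passes its end plus the one-second expires grace period. Faust's real cleanup runs periodically (table_cleanup_interval), and its exact expiry arithmetic may differ from this sketch.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

WINDOW_SIZE = 900.0  # seconds, mirrors tumbling(900) (assumption for the model)
EXPIRES = 1.0        # seconds, mirrors expires=1 (assumption for the model)


@dataclass
class Count:
    user_id: str = ""
    counts: int = 0
    log_time_in_sec: float = 0.0


WindowKey = Tuple[str, float]  # (user_id, window_start)


class TumblingCounter:
    """Hypothetical model: merge counts per (user, window) and emit on close."""

    def __init__(self, on_close: Callable[[WindowKey, Count], None]) -> None:
        self.on_close = on_close
        self.windows: Dict[WindowKey, Count] = {}
        self.latest_ts = 0.0  # newest event time seen so far

    def add(self, event: Count) -> None:
        ts = event.log_time_in_sec
        start = ts - (ts % WINDOW_SIZE)  # align to the window boundary
        key = (event.user_id, start)
        prev = self.windows.get(key)
        if prev is not None:
            # Merge: add the stored count to the incoming event's count.
            event.counts += prev.counts
        self.windows[key] = event
        self.latest_ts = max(self.latest_ts, ts)
        self._close_expired()

    def _close_expired(self) -> None:
        # Iterate over a sorted copy so entries can be popped safely.
        for key in sorted(self.windows, key=lambda k: k[1]):
            if key[1] + WINDOW_SIZE + EXPIRES <= self.latest_ts:
                self.on_close(key, self.windows.pop(key))


if __name__ == "__main__":
    emitted: List[Tuple[WindowKey, int]] = []
    counter = TumblingCounter(lambda k, v: emitted.append((k, v.counts)))
    counter.add(Count("u1", 2, 10.0))
    counter.add(Count("u1", 3, 500.0))
    counter.add(Count("u1", 1, 1000.0))  # newest time now past 900 + 1
    print(emitted)  # [(('u1', 0.0), 5)]
```

In this model the first two events merge into the (u1, 0.0) window. The third event belongs to the next window and advances the newest event time past 901 seconds, so the first window closes and emits its merged count of 5, while the (u1, 900.0) window stays open.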
The problem is that a few records written to the windowed table are never emitted when their window expires. Because the table's windows are relative to a field (relative_to_field(Count.log_time_in_sec)), I checked the log time in those records, and it is on time. I see this issue only in production; locally, everything works fine.
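To cross-check whether a given record's log time is "on time", a small helper such as the one below (hypothetical, not part of Faust) computes the 900-second window that a log_time_in_sec value falls into and whether, relative to a reference time, that window should already have closed. It assumes windows aligned to multiples of the window size, half-open [start, end) bounds, and expires=1 treated as a one-second grace period after the window end; Faust's exact boundary and expiry rules may differ.

```python
import time
from typing import Optional, Tuple


def window_bounds(ts: float, size: float = 900.0) -> Tuple[float, float]:
    """Return the assumed [start, end) tumbling window containing ts (seconds)."""
    start = ts - (ts % size)
    return start, start + size


def should_have_closed(ts: float, now: Optional[float] = None,
                       size: float = 900.0, expires: float = 1.0) -> bool:
    """True if the window containing ts ended at least `expires` seconds ago."""
    if now is None:
        now = time.time()
    _, end = window_bounds(ts, size)
    return now >= end + expires


if __name__ == "__main__":
    print(window_bounds(1000.0))                   # (900.0, 1800.0)
    print(should_have_closed(1000.0, now=1801.0))  # True
```

Running this over the log times of the records that were never emitted would show whether their windows should have closed by the time you checked.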
Please note:
- Incoming messages on the topic are in protobuf format. A custom codec (CountsProtoCodec) converts each incoming protobuf message to a Faust record.
- The table store is RocksDB.
- The Faust version is 1.10.4.
Any help is appreciated. Below is a sample of the code I am using.
import logging

import faust
from faust.serializers import codecs

# app, table_partition, CountsProtoCodec and current_time_ms() are defined
# elsewhere in the application and are not shown in this snippet.
logger = logging.getLogger(__name__)


class Count(faust.Record, serializer='json'):
    def __abstract_init__(self) -> None:
        pass

    user_id: str = ""
    counts: int = 0
    log_time_in_sec: float = 0


def window_processor(event_key, event):
    # Called when a window closes; event_key is (key, (window_start, window_end)).
    batch_start = current_time_ms()
    try:
        ...  # processing of the expired record (omitted from this snippet)
    except Exception as e:
        logger.error(f'window_processor,{event.user_id} :: Error {e}', exc_info=True)


app.conf.table_cleanup_interval = 1.0
app.conf.table_key_index_size = 100000

# Register the codec that decodes protobuf payloads into Count records.
codecs.register('counts_proto', CountsProtoCodec())

countsProto_schema = faust.Schema(
    key_type=str,
    value_type=bytes,
    value_serializer='counts_proto',
)

topic = app.topic("counts-topic", schema=countsProto_schema,
                  partitions=table_partition)

tumbling_table = (
    app.Table(
        'combined-table',
        default=Count,
        key_type=str,
        value_type=Count,
        partitions=table_partition,
        on_window_close=window_processor,
    ).tumbling(900, expires=1).relative_to_field(Count.log_time_in_sec)
)


@app.agent(topic)
async def counts_message(msgs):
    async for key, message in msgs.items():
        try:
            if message.counts > 0:
                # Window set for this key; value() picks the window from
                # message.log_time_in_sec (or the default Count if empty).
                windowSet = tumbling_table[key]
                prev_count = windowSet.value()
                message.counts += prev_count.counts
                tumbling_table[key] = message
        except Exception as e:
            logger.error(f'counts_message,{key} :: Error {e}', exc_info=True)