Faust Application Seeing messages dropped in Tumbling Window

I am running a Faust application that receives protobuf messages from a Kafka topic and inserts them into a tumbling-window table. The key is the user ID and the value is a Faust record containing the count, the user ID, and the log time (in seconds). On subsequent updates, I merge the incoming count into the existing record and update the table. Expired records, emitted when the tumbling window closes, are inserted into a table.

The issue is that a few records written into the tumbling-window table are never emitted when their window expires. Since the table uses relative_to_field, I checked the log time in the Faust record, and it is on time. I see this issue only in production; locally everything works fine.
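To make the log-time check above reproducible, here is a minimal stdlib-only sketch of how I map a record's log time to a 900-second window. It assumes windows are aligned to multiples of the window size counted from epoch 0; that alignment is my assumption for illustration, not a quote of Faust internals, and `window_bounds` and `same_window` are hypothetical helper names.

```python
from typing import Tuple

WINDOW_SIZE_SEC = 900.0  # same size as the tumbling window used by the table


def window_bounds(log_time_in_sec: float, size: float = WINDOW_SIZE_SEC) -> Tuple[float, float]:
    """Return the (start, end) of the fixed-size window containing the timestamp.

    Assumes windows start at multiples of `size` from epoch 0 (an assumption).
    """
    start = log_time_in_sec - (log_time_in_sec % size)
    return start, start + size


def same_window(a: float, b: float, size: float = WINDOW_SIZE_SEC) -> bool:
    """True if both timestamps fall into the same window."""
    return window_bounds(a, size)[0] == window_bounds(b, size)[0]
```

Comparing the window of a record that was not emitted with the windows of records that were emitted shows whether the missing records belong to a window that had already closed.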

Please Note:

  1. Incoming messages on the topic are in protobuf format. Using a codec (CountsProtoCodec), I convert the incoming protobuf to a faust record.
  2. The store is rocksdb.
  3. Faust version is 1.10.4

Please help me here. Below is the sample code snippet I have used.

import logging

import faust

logger = logging.getLogger(__name__)

# `app`, `table_partition`, `CountsProtoCodec` and `current_time_ms`
# are defined elsewhere in the application.


class Count(faust.Record, serializer='json'):
    user_id: str = ''
    counts: int = 0
    log_time_in_sec: float = 0

    def __abstract_init__(self) -> None:
        pass


def window_processor(event_key, event):
    # Callback passed as on_window_close; `event` is the Count stored for the closed window.
    batch_start = current_time_ms()
    try:
        ...  # body omitted in this snippet: inserts the expired record into a table
    except Exception as e:
        logger.error(f'window_processor,{event.user_id} ::  Error  {e}', exc_info=True)


app.conf.table_cleanup_interval = 1.0
app.conf.table_key_index_size = 100000

# The codec converts the incoming protobuf payload into a Count record.
faust.serializers.codecs.register('counts_proto', CountsProtoCodec())

countsProto_schema = faust.Schema(
    key_type=str,
    value_type=bytes,
    value_serializer='counts_proto',
)

topic = app.topic('counts-topic', schema=countsProto_schema,
                  partitions=table_partition)

tumbling_table = (
    app.Table(
        'combined-table',
        default=Count,
        key_type=str,
        value_type=Count,
        partitions=table_partition,
        on_window_close=window_processor,
    ).tumbling(900, expires=1).relative_to_field(Count.log_time_in_sec)
)


@app.agent(topic)
async def counts_message(msgs):
    async for key, message in msgs.items():
        try:
            if message.counts > 0:
                # Merge the incoming count with the value stored for the current window.
                window_set = tumbling_table.get(key, None)
                prev_count = window_set.value()
                message.counts += prev_count.counts
                tumbling_table[key] = message
        except Exception as e:
            # The original logged `event.user_id` here, but `event` is not defined in the agent.
            logger.error(f'counts_message,{key} ::  Error  {e}', exc_info=True)