I have a simple Faust agent. It consumes JSON messages from a Kafka topic and parses them into dicts using Faust's default serializer:
@app.agent(source_topic, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_some_stuff(record)
        yield result
The deserialization itself happens outside my code; it is managed by the Faust framework, not by me. How can I catch and handle deserialization exceptions, e.g. in the case of invalid JSON?
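You could deserialize the data manually and catch the error yourself, for example by having the topic deliver raw bytes (value_serializer='raw') and calling json.loads inside a try/except in the agent.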
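A minimal sketch of that idea follows. It assumes the source topic is declared so that the agent receives undecoded bytes (in Faust, something like app.topic('my-topic', value_serializer='raw'), where the topic name is hypothetical). The @app.agent decorator is left out so the snippet runs without Faust installed, and do_some_stuff is a stand-in for the function from the question.

```python
import json
import logging

logger = logging.getLogger(__name__)


def do_some_stuff(record):
    # Placeholder for the question's processing step (assumption: identity).
    return record


# In a real Faust app this would be decorated with
#   @app.agent(source_topic, sink=[destination_topic])
# where source_topic delivers raw bytes instead of parsed dicts.
async def fetch(records):
    async for raw in records:
        try:
            # json.loads accepts bytes and raises on malformed input.
            record = json.loads(raw)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # Handle the bad message here: log it, count it, or forward it
            # to a dead-letter topic. This sketch just logs and skips it.
            logger.warning("Skipping invalid message %r: %s", raw, exc)
            continue
        yield do_some_stuff(record)
```

With this layout the bad message never reaches do_some_stuff, and the except branch is the single place to decide what happens to it.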
source: https://faust.readthedocs.io/en/latest/userguide/models.html