I'm using an Oracle DB to store Avro messages of different schemas that represent Kafka events in my application. Since there are multiple message types, I use a single table with a few columns and store each message's whole body in a CLOB column named BODY. I now need to export all of these messages into separate files that are valid Avro files.
When I fetch this data from the DB using something like:
# Return str/bytes instead of a LOB locator (for CLOB columns such as BODY)
oracledb.defaults.fetch_lobs = False
...
cursor.execute(
    "select BODY from EVENTS"
    + (f" where EVENT_TYPE='{event_type}'" if event_type else "")
    + " order by CREATED_ON"
)
for (body,) in cursor:  # each row is a one-element tuple
    filename = f'./exported_events/event_{SOME_UNIQUE_ID}.avro'
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'wb') as file:
        fastavro.writer(
            file,
            fastavro_schema.load_schema(f"./event_{event_type}.avsc"),
            [body]
        )
I get an error saying, basically, that body is a bytes object which fastavro cannot iterate over.
It seems I would have to deserialize the body first, only to re-serialize it again when writing it to the Avro file. Is there a way to avoid that?
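For context, the roundabout approach I'm trying to avoid looks roughly like this (the body value and field names here are made up; I'm assuming the CLOB holds the record as JSON text):

```python
import json

# Hypothetical CLOB content: the event body stored as JSON text
body = '{"event_id": 1, "note": "example"}'

# The extra step I'd like to avoid: deserialize the JSON back into a dict...
record = json.loads(body)
print(record["event_id"])  # -> 1

# ...only so fastavro can re-serialize it as Avro:
# fastavro.writer(file, schema, [record])
```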
P.S. I have also tried converting the body to a dict using json.loads(body), but the problem with that approach is that my messages contain bytes and timestamp-millis fields, which the json module obviously converts incorrectly, so the records then raise errors when validated against the schema.
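A minimal illustration of the type mismatch I mean (made-up body; the field name is just an example):

```python
import json

# Hypothetical JSON body; per my schema, "payload" is an Avro "bytes" field
body = '{"payload": "\\u0000\\u0001"}'
record = json.loads(body)

# json.loads hands back a str, not bytes, so the record no longer
# matches the schema's "bytes" type
print(type(record["payload"]))  # -> <class 'str'>
```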