How can I auto-generate a Pulsar AvroSchema class from an existing model?

I'm running Apache Pulsar without schemas because the message content changes often. For some specific data, however, I've written "data" classes (derived from SQLModel, which doesn't really matter in this case). Since these models (data classes) already exist and will always lead the way, I want to use them for Apache Pulsar too.

I want to use pulsar.schema.AvroSchema over JsonSchema because the models are complex. I am very certain I'm not the first to run into this requirement.

I searched the internet but could not find a library that

  • copies the existing model (with all its fields and subclasses, at runtime) into a pulsar.schema.Record
  • and then populates that Pulsar schema Record with the values of the existing model instance.
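
The second step, at least, can be done generically. Here is a minimal sketch, assuming plain dataclasses as stand-ins for both the model and the record class (copy_fields, NewMetering, and NewMeteringRecord are hypothetical names used for illustration, not part of Pulsar):

```python
from dataclasses import dataclass, fields


@dataclass
class NewMetering:
    """Stand-in for the existing model (an SQLModel class in my case)."""
    id: int
    speed: float
    light: float


@dataclass
class NewMeteringRecord:
    """Stand-in for the record class that mirrors the model."""
    id: int = 0
    speed: float = 0.0
    light: float = 0.0


def copy_fields(source, target):
    """Copy every dataclass field of `source` that `target` also has."""
    for field in fields(source):
        if hasattr(target, field.name):
            setattr(target, field.name, getattr(source, field.name))
    return target


new_metering = NewMetering(id=1, speed=12.5, light=0.75)
record = copy_fields(new_metering, NewMeteringRecord())
print(record)
```

The same attribute loop should carry over to a real Pulsar Record, but the record class itself would still have to be generated from the model, which is the part I'm missing.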

I really want to avoid the manual process, which would look like this: first I would define NewMetering_AVRO, which is derived from the NewMetering model.

from pulsar.schema import Record, Integer, Float

class NewMetering_AVRO(Record):
    id = Integer()
    speed = Float()
    light = Float()

Then create the producer.

producer = client.create_producer(
    topic='persistent://my_tenant/my_namespace/new_metering',
    schema=AvroSchema(NewMetering_AVRO),
)

Right after that, I would have to build the record manually and finally send it.

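new_metering_AVRO_record = NewMetering_AVRO()
# id is declared as Integer(), so copy the model's id rather than a UUID
new_metering_AVRO_record.id = new_metering.id
new_metering_AVRO_record.speed = new_metering.speed
new_metering_AVRO_record.light = new_metering.light
# send the Avro record (not the original model); the partition key must be a string
producer.send(content=new_metering_AVRO_record, partition_key=str(new_metering.id))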

This manual process is error-prone and cumbersome. I really don't want to go down this road.

Does anyone have any hints on libraries or approaches? Otherwise I'm going to write my own public library.

There is 1 solution below

The only challenge is to translate the Python class into an Avro schema in JSON format (the schema definition). The Avro schema is then passed to Pulsar's AvroSchema class. There is a library for this, see: How can I create an Avro schema from a python class?

This schema generator not only translates regular Python classes; it can also translate pydantic and SQLModel classes (SQLModel being based on pydantic and SQLAlchemy).
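
To illustrate what such a translation involves, here is a minimal hand-rolled sketch that uses only the standard library and a plain dataclass as a stand-in for the model. It is not the linked library's API: avro_schema_for and the type mapping are hypothetical and cover only a few primitive types plus nested dataclasses.

```python
import json
from dataclasses import dataclass, fields, is_dataclass
from typing import get_type_hints

# Assumed mapping from Python types to Avro primitive type names.
_PRIMITIVES = {
    bool: "boolean",
    int: "long",
    float: "double",
    str: "string",
    bytes: "bytes",
}


def avro_schema_for(cls):
    """Build an Avro record schema (as a dict) from a dataclass."""
    hints = get_type_hints(cls)
    avro_fields = []
    for field in fields(cls):
        py_type = hints[field.name]
        if is_dataclass(py_type):
            avro_type = avro_schema_for(py_type)  # nested record
        else:
            avro_type = _PRIMITIVES[py_type]  # KeyError for unsupported types
        avro_fields.append({"name": field.name, "type": avro_type})
    return {"type": "record", "name": cls.__name__, "fields": avro_fields}


@dataclass
class NewMetering:
    """Stand-in for the existing model."""
    id: int
    speed: float
    light: float


schema_definition = avro_schema_for(NewMetering)
print(json.dumps(schema_definition, indent=2))
```

The resulting dict is the schema definition mentioned above. How it is handed to Pulsar's AvroSchema depends on the client version, so check the constructor signature of the version you have installed.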