Confluent Kafka python schema parser causes conflict with fastavro

I am running Python 3.9 with Confluent Kafka 1.7.0, avro-python3 1.10.0 and fastavro 1.4.1.

The following code uses the Avro schema encoder to encode a message. It succeeds only if the schema returned by the registry is first transformed to get rid of the MappingProxyType instances:

from confluent_kafka      import Producer
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer

from fastavro.schema import parse_schema
from fastavro.validation import validate

from types  import MappingProxyType
from typing import Any

import sys

def transformMap(item: Any) -> Any:
    # Recursively convert MappingProxyType (and dict) into plain dicts;
    # lists are rebuilt element-wise, everything else is returned as-is.
    if type(item) in {dict, MappingProxyType}:
        return {k: transformMap(v) for k, v in item.items()}
    elif type(item) is list:
        return [transformMap(v) for v in item]
    else:
        return item

def main(argv = None):
    msgType = 'InstrumentIdMsg'
    idFigi  = 'BBG123456789'
    head = {'sDateTime': 1, 'msgType': msgType,      'srcSeq': 1,
            'rDateTime': 1, 'src':     'Brownstone', 'reqID':  None,
            'sequence':  1}
    msgMap = {'head': head, 'product':  'Port', 'idIsin': None, 'idFigi': idFigi,
              'idBB': None, 'benchmark': None,  'idCusip': None,'idCins': None}

    registryClient = CachedSchemaRegistryClient(url = 'http://local.KafkaRegistry.com:8081')
    schemaId, schema, version = registryClient.get_latest_schema(msgType)

    serializer = MessageSerializer(registry_client = registryClient)

    schemaMap = schema.to_json()

    # NOTE:
    # schemaMap cannot be used since it uses mappingproxy
    # which causes validate() and parse_schema() to throw
    schemaDict = transformMap(schemaMap)

    isValid = validate(datum = msgMap, schema = schemaDict, raise_errors = True)
    parsed_schema = parse_schema(schema = schemaDict)

    msg = serializer.encode_record_with_schema_id(schema_id = schemaId,
                                                  record    = msgMap)

    producer = Producer({'bootstrap.servers': 'kafkaServer:9092'})
    producer.produce(key = idFigi,
                     topic = 'TOPIC_NAME',
                     value = msg)
    return 0


if __name__ == '__main__':
    sys.exit(main())

The transformation leaves everything unchanged except for converting MappingProxyType objects into dict instances.
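
The incompatibility can be reproduced with the standard library alone: MappingProxyType is not a dict subclass, so code that insists on a real dict (json.dumps here, and presumably fastavro's internal type checks) rejects it, while a plain-dict copy works fine:

```python
import json
from types import MappingProxyType

proxy = MappingProxyType({'type': 'string'})

# A mappingproxy is a read-only view and is *not* a dict subclass:
print(isinstance(proxy, dict))         # False

# json.dumps rejects it for exactly that reason:
try:
    json.dumps(proxy)
except TypeError as exc:
    print(type(exc).__name__)          # TypeError

# Converting to a plain dict restores compatibility:
print(json.dumps(dict(proxy)))         # {"type": "string"}
```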

  1. Is there a problem with the way I am calling the library that causes a mapping proxy to be used, which in turn makes fastavro throw? Can this be fixed on my side as a user, or is it a bug in the Confluent Kafka library?
  2. In addition, the schemaId returned by registryClient.get_latest_schema() is documented as str but is actually an int. If I understand correctly, this value is the intended input to the schema_id parameter of serializer.encode_record_with_schema_id() (and it works correctly when I pass it), which is documented as int. Is the str in the docs a typo? In other words, it seems that either registryClient.get_latest_schema() should return an integer, or serializer.encode_record_with_schema_id() should take a string, or I am doing something incorrectly :) Which one is it?

Thank you very much.
