I am running Python 3.9 with Confluent Kafka 1.7.0, avro-python3 1.10.0 and fastavro 1.4.1.
The following code uses Avro schema encoder in order to encode a message, which succeeds only if we transform the resulting schema encoding by getting rid of the MappingProxyType
:
from confluent_kafka import Producer
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer
from fastavro.schema import parse_schema
from fastavro.validation import validate
from types import MappingProxyType
from typing import Any
import sys
def transformMap(item: Any) -> Any:
if type(item) in {dict, MappingProxyType}:
return {k:transformMap(v) for k,v in item.items()}
elif type(item) is list:
return [transformMap(v) for v in item]
else:
return item
def main(argv = None):
msgType = 'InstrumentIdMsg'
idFigi = 'BBG123456789'
head = {'sDateTime': 1, 'msgType': msgType, 'srcSeq': 1,
'rDateTime': 1, 'src': 'Brownstone', 'reqID': None,
'sequence': 1}
msgMap = {'head': head, 'product': 'Port', 'idIsin': None, 'idFigi': idFigi,
'idBB': None, 'benchmark': None, 'idCusip': None,'idCins': None}
registryClient = CachedSchemaRegistryClient(url = 'http://local.KafkaRegistry.com:8081')
schemaId, schema, version = registryClient.get_latest_schema(msgType)
serializer = MessageSerializer(registry_client = registryClient)
schemaMap = schema.to_json()
# NOTE:
# schemaMap cannot be used since it uses mappingproxy
# which causes validate() and parse_schema() to throw
schemaDict = transformMap(schemaMap)
isValid = validate(datum = msgMap, schema = schemaDict, raise_errors = True)
parsed_schema = parse_schema(schema = schemaDict)
msg = serializer.encode_record_with_schema_id(schema_id = schemaId,
record = msgMap)
producer = Producer({'bootstrap.servers': 'kafkaServer:9092'})
producer.produce(key = idFigi,
topic = 'TOPIC_NAME',
value = msg)
return 0
if __name__ == '__main__':
sys.exit(main())
The transformation basically leaves everything unchanged except altering MappingProxyType
to dict
instances.
- Is there a problem in the way I am calling the standard library which causes mapping proxy to be used, which in turn causes
fastavro
to throw? Can this be fixed by something as a user, or is this really a bug in the Confluent Kafka library? - In addition, the output
schemaId
fromregistryClient.get_latest_schema()
is marked in the docs to returnstr
but returnsint
. If I understand correctly, this is the intended input into theschema_id
parameter ofserializer.encode_record_with_schema_id()
(and it works correctly if I call it), which is also marked asint
. Is that a typo in the docs? In other words, it seems eitherregistryClient.get_latest_schema()
should return an integer, orserializer.encode_record_with_schema_id()
should take a string, or I am doing something incorrectly :) Which one is it?
Thank you very much.