I wrote a consumer in Python as below:
from kafka import KafkaConsumer
import avro.schema
import avro.io
import io
# To consume messages
consumer = KafkaConsumer('test',
                         group_id='',
                         bootstrap_servers=['kafka:9092'])
schema = """
{
  "namespace": "com.martinkl.bottledwater.dbschema.public",
  "type": "record",
  "name": "test",
  "fields": [
    {"name": "id", "type": ["int", "null"]},
    {"name": "value", "type": ["string", "null"]}
  ]
}
"""
schema = avro.schema.parse(schema)
for msg in consumer:
    bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    hello = reader.read(decoder)
    print hello
Everything seems OK, but when I insert data into Postgres:
postgres=# insert into test (value) values('hello world!');
The output of the consumer contains only empty values:
$ python consumer_bottledwater-pg.py
{u'id': 0, u'value': u''}
Please help me fix this. Thank you in advance.
The Avro-encoded messages that Bottled Water publishes to Kafka are prefixed with a 5-byte header. The first byte is always zero (reserved for future use), and the next 4 bytes are a big-endian 32-bit number indicating the ID of the schema.
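For example, you can unpack that header with the struct module before handing the rest of the message to the Avro decoder. This is a minimal sketch of the header layout described above; parse_header is a hypothetical helper, not part of any library:

import struct

def parse_header(message_bytes):
    # First byte: the "magic" byte, always zero in the current format.
    # Next 4 bytes: the schema ID as a big-endian 32-bit integer.
    magic, schema_id = struct.unpack('>bI', message_bytes[:5])
    if magic != 0:
        raise ValueError('Unexpected magic byte: %d' % magic)
    # Everything after the 5-byte header is the Avro-encoded payload.
    return schema_id, message_bytes[5:]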
In your example you've hard-coded the schema in the Python application, but that approach would break down as soon as the upstream database schema changes. That's why Bottled Water is best used in conjunction with a schema registry. When you read a message from Kafka you first decode the header to find the schema ID, and if you haven't seen that schema ID before, you query the registry to find the schema. Then you can decode the rest of the message using that schema. Schemas can be cached in the consumer, since the registry guarantees that the schema for a particular ID is immutable.
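A registry lookup with caching could be sketched like this. I'm assuming a Confluent-style schema registry at http://localhost:8081 with its GET /schemas/ids/&lt;id&gt; endpoint, and Python 2 to match your code; fetch_schema is a hypothetical helper:

import json
import urllib2
import avro.schema

SCHEMA_REGISTRY_URL = 'http://localhost:8081'  # assumption: point this at your registry
_schema_cache = {}  # maps schema ID -> parsed Avro schema

def fetch_schema(schema_id):
    # The registry guarantees the schema for a given ID is immutable,
    # so it is safe to cache it for the lifetime of the consumer.
    if schema_id not in _schema_cache:
        url = '%s/schemas/ids/%d' % (SCHEMA_REGISTRY_URL, schema_id)
        response = json.load(urllib2.urlopen(url))
        _schema_cache[schema_id] = avro.schema.parse(response['schema'])
    return _schema_cache[schema_id]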
You can also look at the code of the KafkaAvroDeserializer that ships with the schema registry, to see how this decoding is done in Java. You can do the same in Python.
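Putting the two sketches above together, your consumer loop would look roughly like this (parse_header and fetch_schema are the hypothetical helpers from the previous snippets):

import io
import avro.io

for msg in consumer:
    schema_id, payload = parse_header(msg.value)
    schema = fetch_schema(schema_id)
    reader = avro.io.DatumReader(schema)
    decoder = avro.io.BinaryDecoder(io.BytesIO(payload))
    print reader.read(decoder)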