I am new to Pulsar and am exploring its features for a new project. I am trying a very basic example: sending data from a producer using a schema. For background, my goal is to send data from Apache Pulsar to a ClickHouse database. I have set up the sink connector and verified it with the commands below:
bin/pulsar-admin sinks status --tenant public --namespace default --name jdbc-clickhouse-sink
bin/pulsar-admin sinks list --tenant public --namespace default
Output: [ "jdbc-clickhouse-sink" ]
I have created a table in ClickHouse, and I want data sent to a topic to be saved in that table. To keep the data consistent, I want to set up a schema on the producer. Sample code below:
import pulsar
from pulsar.schema import *

class Example(Record):
    a = Integer()
    b = Integer()
    c = Integer()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    topic='my-topic',
    schema=AvroSchema(Example))

producer.send(Example(a=444, b=62, c=999))
When I run the above code, I get the following error
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-114-3b0aa7d0415f> in <module>
9
10 client = pulsar.Client('pulsar://localhost:6650')
---> 11 producer = client.create_producer(
12 topic='my-topic',
13 schema=AvroSchema(Example) )
~/opt/anaconda3/lib/python3.8/site-packages/pulsar/__init__.py in
create_producer(self, topic, producer_name, schema, initial_sequence_id,
send_timeout_millis, compression_type, max_pending_messages,
max_pending_messages_across_partitions, block_if_queue_full, batching_enabled,
batching_max_messages, batching_max_allowed_size_in_bytes,
batching_max_publish_delay_ms, message_routing_mode, properties, batching_type)
560
561 p = Producer()
--> 562 p._producer = self._client.create_producer(topic, conf)
563 p._schema = schema
564 return p
Exception: Pulsar error: IncompatibleSchema
Can someone help me figure out what I am missing here?
Make sure you have the Pulsar Python client installed with Avro support:
pip3 install fastavro
pip3 install pytz
pip3 install pulsar-client[avro]
Check out my Python example that uses a schema: https://github.com/tspannhw/FLiP-Pi-Weather/blob/main/weather.py
Check out my ClickHouse examples: https://github.com/tspannhw/FLiP-Stream2Clickhouse
Check the schema currently registered on your topic:
bin/pulsar-admin schemas get persistent://public/default/my-topic
Python docs:
https://pulsar.apache.org/api/python/
https://pulsar.apache.org/api/python/schema/schema.m.html#pulsar.schema.schema.AvroSchema
Features available per Client for Pulsar https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit
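Once you have the output of the schemas get command, one way to use it is to compare the registered schema with the one the producer is about to declare. The sketch below is a plain-Python, field-level comparison only. It is not Pulsar's own compatibility check, which depends on the compatibility strategy configured for the namespace. The helper names field_types and diff_schemas are hypothetical, and the registered schema shown is a made-up stand-in for whatever your cluster actually returns.

```python
import json


def field_types(avro_schema):
    """Map each field name of an Avro record schema to its declared type."""
    return {f["name"]: f["type"] for f in avro_schema.get("fields", [])}


def diff_schemas(registered, expected):
    """Return human-readable differences between two Avro record schemas."""
    reg, exp = field_types(registered), field_types(expected)
    problems = []
    for name in sorted(set(reg) | set(exp)):
        if name not in reg:
            problems.append(f"{name}: missing from registered schema")
        elif name not in exp:
            problems.append(f"{name}: missing from producer schema")
        elif reg[name] != exp[name]:
            problems.append(
                f"{name}: registered {reg[name]!r}, producer {exp[name]!r}"
            )
    return problems


# Assumption: a made-up schema standing in for the one registered on the topic.
registered = json.loads("""
{"type": "record", "name": "Example", "fields": [
  {"name": "a", "type": "int"},
  {"name": "b", "type": "string"}
]}
""")

# Assumption: a plain Avro rendering of the Example record from the question.
expected = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "a", "type": "int"},
        {"name": "b", "type": "int"},
        {"name": "c", "type": "int"},
    ],
}

for line in diff_schemas(registered, expected):
    print(line)
```

If the diff is non-empty, a mismatch like this is one plausible reason for the broker to reject the producer with IncompatibleSchema. An empty diff does not prove compatibility, since the schema type (Avro vs JSON) and nullability rules also matter.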
You may need to generate the class from an actual Avro schema file, which is what is usually done in Java.
See this example:
https://github.com/ta1meng/pulsar-python-avro-schema-examples
If you don't need Avro, JsonSchema does not require this extra step.
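For reference, an Avro schema file is just JSON. The sketch below shows what such a file might contain for the three-integer Example record from the question, built with the standard library only. It is an illustration of the file format, not the method used in the linked repository. The helper name record_schema is hypothetical, and the non-nullable "int" field type is an assumption.

```python
import json


def record_schema(name, int_fields):
    """Build an Avro record schema whose fields are all plain ints."""
    return {
        "type": "record",
        "name": name,
        "fields": [{"name": field, "type": "int"} for field in int_fields],
    }


# Assumption: the record and field names mirror the Example class in the question.
schema = record_schema("Example", ["a", "b", "c"])

# This text is what would be saved as an .avsc schema file.
avsc_text = json.dumps(schema, indent=2)
print(avsc_text)
```

Keeping the schema in a file like this makes it easier to confirm that the producer and the table behind the sink agree on field names and types.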