Good afternoon, I have only recently started working with Kafka and have a question about the producer in connection with schemas.
Initially I tried to build a simple producer without a schema in C#. This works so far; the code is given below in shortened form.
Code of the producer without a schema:
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    BrokerAddressFamily = BrokerAddressFamily.V4,
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    for (int i = 0; i < 3; i++)
    {
        producer.Produce("topic", new Message<Null, string> { Value = "Value: " + i + "..." });
    }
    producer.Flush(); // wait until all outstanding messages have been delivered
}
Using a schema, however, causes me problems (see the next section).
Suppose I have a consumer, say in Python, that uses the following schema to receive integers:
{"type": "record",
"name": "Sample",
"fields": [{"name": "Value", "type": ["int"]}]
}
I now want to create a C# producer that uses this schema and sends messages to the Python consumer. According to the schema, the messages should contain only numbers.
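For concreteness, such a Python consumer could look roughly like this. This is a sketch assuming the confluent_kafka library (>= 1.4); the group id, topic name, and both URLs are assumptions:

```python
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_str = """
{"type": "record",
 "name": "Sample",
 "fields": [{"name": "Value", "type": ["int"]}]}
"""

# The deserializer resolves writer schemas through the registry client
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})

consumer = DeserializingConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "sample-group",          # assumed name
    "auto.offset.reset": "earliest",
    "value.deserializer": AvroDeserializer(schema_registry, schema_str),
})
consumer.subscribe(["topic"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    print(msg.value())  # a dict such as {"Value": 0}
```

The important detail for the question below is that this deserializer only understands messages framed in the Confluent Schema Registry wire format.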
I tried to build a producer that uses the schema, but unfortunately many tutorials require the "schema registry url" to run the producer. The following is what I have, but I could not avoid using the "schema registry url"...
Code of the producer that uses the schema:
using Avro;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using System.Threading.Tasks;

namespace producer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // Nothing further is set up here
            string schemaRegistryUrl = "localhost:8081";
            var schema = (RecordSchema)RecordSchema.Parse(
                @"{
                    ""type"": ""record"",
                    ""name"": ""Sample"",
                    ""fields"": [
                        {""name"": ""Value"", ""type"": [""int""]}
                    ]
                }"
            );
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092",
                BrokerAddressFamily = BrokerAddressFamily.V4,
            };
            using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
            using (var producer = new ProducerBuilder<Null, GenericRecord>(config)
                .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
                .Build())
            {
                for (int i = 0; i < 3; i++)
                {
                    var record = new GenericRecord(schema);
                    record.Add("Value", i);
                    // Await each send so serialization and delivery errors actually surface
                    await producer.ProduceAsync("topic", new Message<Null, GenericRecord> { Value = record });
                }
                producer.Flush();
            }
        }
    }
}
This leads to two questions: how can one build a producer without the "schema registry url"? I have already found something like this, but unfortunately in Java. How would it look in C#? I am mainly interested in a producer that sends numbers to the Python consumer using the schema from above, preferably without using the "schema registry url". However, I would also be interested in how to get the variant with the "schema registry url" to work.
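For what it's worth, a producer without the registry client would have to write the expected byte layout itself: a magic byte 0x00, a 4-byte big-endian schema ID, then the Avro-encoded record. A rough, untested C# sketch using the Apache Avro classes (the hard-coded schema ID is hypothetical; the consumer must still be able to resolve that ID in a registry, or use a custom deserializer):

```csharp
using System;
using System.IO;
using Avro;
using Avro.Generic;
using Avro.IO;
using Confluent.Kafka;

// Serializes a GenericRecord into the Confluent wire format without a registry client.
static byte[] SerializeWithWireFormat(RecordSchema schema, GenericRecord record, int schemaId)
{
    using var stream = new MemoryStream();
    stream.WriteByte(0);                                // magic byte
    var id = BitConverter.GetBytes(schemaId);
    if (BitConverter.IsLittleEndian) Array.Reverse(id); // schema ID, big-endian
    stream.Write(id, 0, 4);
    var writer = new GenericWriter<GenericRecord>(schema);
    writer.Write(record, new BinaryEncoder(stream));    // Avro-encoded record body
    return stream.ToArray();
}

// Usage with a plain byte[] producer (no registry client involved):
// var payload = SerializeWithWireFormat(schema, record, schemaId: 1); // ID 1 is made up
// producer.Produce("topic", new Message<Null, byte[]> { Value = payload });
```

Note that this only sidesteps the registry on the producer side; the framing still carries a schema ID that the consumer will try to look up.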
Just as a hint: if the producer sends a message to the Python consumer without a schema, the consumer does register it, but it cannot display the sent number, because the simple producer does not use a schema. I am referring to the code of the very first producer!
I hope my questions are understandable; I am looking forward to your answers!
The confluent_kafka Python library requires that the data adheres to the Confluent Schema Registry wire format. This means you're required to have the producer match that contract as well, so you cannot bypass the registry without writing your own implementation of the AvroSerializer you've referenced in the code.
This doesn't happen inside the confluent_kafka library itself: the consumer fetches a schema from the registry based on the ID embedded in the message. Therefore, unless you're using some other deserializer, you'll need to write your own implementation there as well.
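To make the wire-format point concrete, here is a stdlib-only sketch that builds and parses such a message for the schema above. The schema ID 1 in the example is hypothetical (a real ID is assigned by the registry); this is essentially what a custom serializer/deserializer pair would have to implement:

```python
import struct

def zigzag_varint(n: int) -> bytes:
    """Avro encodes int/long values as zigzag-encoded varints."""
    z = (n << 1) ^ (n >> 63)  # zigzag with 64-bit semantics
    out = bytearray()
    while True:
        if z >= 0x80:
            out.append((z & 0x7F) | 0x80)
            z >>= 7
        else:
            out.append(z)
            return bytes(out)

def read_varint(data: bytes, pos: int):
    """Inverse of zigzag_varint; returns (value, next_position)."""
    shift = z = 0
    while True:
        b = data[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            return (z >> 1) ^ -(z & 1), pos
        shift += 7

def encode_message(schema_id: int, value: int) -> bytes:
    # Confluent wire format: magic byte 0x00, 4-byte big-endian schema ID, Avro body.
    # For the record {"Value": ["int"]} the body is the union branch index (0) + the int.
    body = zigzag_varint(0) + zigzag_varint(value)
    return b"\x00" + struct.pack(">I", schema_id) + body

def decode_message(data: bytes):
    if data[0] != 0:
        raise ValueError("missing Confluent magic byte")
    schema_id = struct.unpack(">I", data[1:5])[0]
    _branch, pos = read_varint(data, 5)
    value, _ = read_varint(data, pos)
    return schema_id, value
```

On the consumer side, confluent_kafka performs exactly this kind of parsing first and then looks up the schema for the extracted ID in the registry.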