I have setup MassTransit with Kafka and Outbox pattern
var confluentConfiguration = builder.Configuration.GetSection("Kafka").ToConfluentConfiguration();
builder.Services.AddSingleton<ISchemaRegistryClient>(new CachedSchemaRegistryClient(confluentConfiguration));
builder.Services.AddMassTransit(busConfigurator =>
{
busConfigurator.AddEntityFrameworkOutbox<DatabaseContext>(outboxConfigurator =>
{
outboxConfigurator.DuplicateDetectionWindow = outboxOptions.DuplicateDetectionWindow;
outboxConfigurator.IsolationLevel = outboxOptions.IsolationLevel;
outboxConfigurator.QueryDelay = outboxOptions.QueryDelay;
outboxConfigurator.QueryMessageLimit = outboxOptions.QueryMessageLimit;
outboxConfigurator.QueryTimeout = outboxOptions.QueryTimeout;
outboxConfigurator.UsePostgres();
outboxConfigurator.UseBusOutbox();
});
busConfigurator.UsingInMemory(); // mass transit needs a bus, so we use the in-memory bus
busConfigurator.AddRider(rider =>
{
rider.AddProducer<string, UserCreated>("user-created", (context, config) =>
{
config.SetValueSerializer(new ProtobufSerializer<UserCreated>(context.GetRequiredService<ISchemaRegistryClient>()));
});
rider.UsingKafka(new ProducerConfig(confluentConfiguration), (context, config) =>
{
config.UseSendFilter(typeof(CustomCloudEventsHeaderFilter<>), context);
});
});
});
Then, I inject ITopicProducer<string, UserCreated > userCreatedProducer into my minimal-api method and call await userCreatedProducer.Produce(...). This produces the following exception
System.InvalidOperationException: No such configuration property: "schema.registry.url"
at Confluent.Kafka.Impl.SafeConfigHandle.Set(String name, String value)
at Confluent.Kafka.Producer`2.<>c__DisplayClass56_0.<.ctor>b__5(KeyValuePair`2 kvp)
at System.Collections.Generic.List`1.ForEach(Action`1 action)
at Confluent.Kafka.Producer`2..ctor(ProducerBuilder`2 builder)
at Confluent.Kafka.ProducerBuilder`2.Build()
at MassTransit.KafkaIntegration.KafkaProducerContext..ctor(ProducerBuilder`2 producerBuilder, IHostConfiguration hostConfiguration, CancellationToken cancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaProducerContext.cs:line 24
at MassTransit.KafkaIntegration.ProducerContextFactory.<>c__DisplayClass7_0.<CreateProducer>g__Create|0(ClientContext clientContext, CancellationToken createCancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ProducerContextFactory.cs:line 55
the key schema.registry.url is set in confluentConfiguration (if not, the constructor of CachedSchemaRegistryClient is throwing already).
Anything else I am missing?
Do I need to call config.Host(...)?
For completness, the appsettings.json:
"Kafka": {
"bootstrap.servers": "localhost:9092",
"security.protocol": "SaslPlaintext",
"sasl.mechanism": "PLAIN",
"sasl.username": "***",
"sasl.password": "***",
"schema.registry.url": "http://localhost:8081"
}
which is read as is using the ToConfluentConfiguration extension method.
Kafka doesn't know about the property
schema.registry.url- so it's telling you about it via that exception.Also, FYI, the MassTransit outbox doesn't work with Kafka. Only bus transports.