MassTransit Kafka and Outbox: Getting No such configuration property: "schema.registry.url" Exception

99 Views Asked by At

I have setup MassTransit with Kafka and Outbox pattern

var confluentConfiguration = builder.Configuration.GetSection("Kafka").ToConfluentConfiguration();
builder.Services.AddSingleton<ISchemaRegistryClient>(new CachedSchemaRegistryClient(confluentConfiguration));
builder.Services.AddMassTransit(busConfigurator =>
{
     busConfigurator.AddEntityFrameworkOutbox<DatabaseContext>(outboxConfigurator =>
     {
        outboxConfigurator.DuplicateDetectionWindow = outboxOptions.DuplicateDetectionWindow;
        outboxConfigurator.IsolationLevel = outboxOptions.IsolationLevel;
        outboxConfigurator.QueryDelay = outboxOptions.QueryDelay;
        outboxConfigurator.QueryMessageLimit = outboxOptions.QueryMessageLimit;
        outboxConfigurator.QueryTimeout = outboxOptions.QueryTimeout;
            
        outboxConfigurator.UsePostgres();
        outboxConfigurator.UseBusOutbox();
     });

     busConfigurator.UsingInMemory(); // mass transit needs a bus, so we use the in-memory bus

     busConfigurator.AddRider(rider =>
     {
         rider.AddProducer<string, UserCreated>("user-created", (context, config) =>
         {
             config.SetValueSerializer(new ProtobufSerializer<UserCreated>(context.GetRequiredService<ISchemaRegistryClient>()));
         });
            
         rider.UsingKafka(new ProducerConfig(confluentConfiguration), (context, config) =>
         {
             config.UseSendFilter(typeof(CustomCloudEventsHeaderFilter<>), context);
         });
     });
});

Then, I inject ITopicProducer<string, UserCreated > userCreatedProducer into my minimal-api method and call await userCreatedProducer.Produce(...). This produces the following exception

System.InvalidOperationException: No such configuration property: "schema.registry.url"
   at Confluent.Kafka.Impl.SafeConfigHandle.Set(String name, String value)
   at Confluent.Kafka.Producer`2.<>c__DisplayClass56_0.<.ctor>b__5(KeyValuePair`2 kvp)
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Confluent.Kafka.Producer`2..ctor(ProducerBuilder`2 builder)
   at Confluent.Kafka.ProducerBuilder`2.Build()
   at MassTransit.KafkaIntegration.KafkaProducerContext..ctor(ProducerBuilder`2 producerBuilder, IHostConfiguration hostConfiguration, CancellationToken cancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/KafkaProducerContext.cs:line 24
   at MassTransit.KafkaIntegration.ProducerContextFactory.<>c__DisplayClass7_0.<CreateProducer>g__Create|0(ClientContext clientContext, CancellationToken createCancellationToken) in /_/src/Transports/MassTransit.KafkaIntegration/KafkaIntegration/ProducerContextFactory.cs:line 55

the key schema.registry.url is set in confluentConfiguration (if not, the constructor of CachedSchemaRegistryClient is throwing already).

Anything else I am missing? Do I need to call config.Host(...)?

For completness, the appsettings.json:

"Kafka": {
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "SaslPlaintext",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "***",
    "sasl.password": "***",
    "schema.registry.url": "http://localhost:8081"
  }

which is read as is using the ToConfluentConfiguration extension method.

1

There are 1 best solutions below

2
Chris Patterson On BEST ANSWER

Kafka doesn't know about the property schema.registry.url - so it's telling you about it via that exception.

Also, FYI, the MassTransit outbox doesn't work with Kafka. Only bus transports.