I am using dotnet CAP library for publishing events and Rebus for reading events from RabbitMQ. Publishing with CAP works, but I have problem with reading with Rebus (works perfectly with CAP and RabbitMQ.Client libraries).
So, publishing is configured:
builder.Services.AddCap(options =>
{
options.FailedRetryInterval = capConfig.FailedRetryIntervalSeconds;
options.FailedRetryCount = capConfig.FailedRetryCount;
options.UseEntityFramework<MyDbContext>();
options.UseRabbitMQ(options =>
{
options.HostName = rabbitMQConfig.Host;
});
});
builder.Services.AddScoped<IMyPublisher, MyPublisher>();
and MyPublisher has Publish method where MyEvent.NAME is full name of the class with namespace:
public async Task Publish(MyEvent myEvent)
{
using (var transaction = _dbContext.Database.BeginTransaction(_capPublisher, autoCommit: false))
{
try
{
//some transaction logic
await _capPublisher.PublishAsync(MyEvent.NAME, myEvent);
await transaction.CommitAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);
await transaction.RollbackAsync();
}
}
}
I publish this by injecting IMyPublisher and calling Publish method.
Event is read perfectly with CAP:
[HttpPost("CAPUNUSED")]
[CapSubscribe(MyEvent.NAME)]
public async Task TestCAPSubscribe(MyEvent myEvent)
{
await Console.Out.WriteLineAsync($"{myEvent.Id}");
}
On the other side, I am having problem with deserialization of the message using Rebus. (Tried with couple of versions, same problem, but currenlty using Rebus with version 8.2.2).
...(other nonRebus setup)
services.AddRebus(configure => configure
.Transport(c =>
{
c.UseRabbitMq(rabbitMqConfig.ConnectionString, rabbitMqConfig.QueueName)
.InputQueueOptions(o =>
{
o.SetAutoDelete(false);
o.AddArgument("x-message-ttl", rabbitMqConfig.TimeToLive);
});
})
//.Serialization(s => s.Register(c => new CustomJsonSerializer())));
//.Serialization(s => s.UseNewtonsoftJson()));
services.AutoRegisterHandlersFromAssemblyOf<MyEventHandler>();
var bus = services.BuildServiceProvider().GetRequiredService<IBus>();
await bus.Subscribe<MyEvent>();
...(other nonRebus setup)
and my (testing) handler:
public class MyEventHandler : IHandleMessages<MyEvent>
{
public Task Handle(MyEvent message)
{
Console.WriteLine("IT WORKS");
return Task.CompletedTask;
}
}
So, 3 cases:
- without explicit serializer (looks like System.Text.Json is by default) I receive in error queue the following exception:
5 unhandled exceptions: 2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
- with .Serialization(s => s.UseNewtonsoftJson()));
1 unhandled exceptions: 2/5/2024 11:46:54 AM +01:00: Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID knuth-13090383562881270373 and type Newtonsoft.Json.Linq.JObject, Newtonsoft.Json could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)
at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
- with following code for explicit serializer (.Serialization(s => s.Register(c => new CustomJsonSerializer())));), it works, but I don't want to hardcode serialization process
public class CustomJsonSerializer : ISerializer
{
private readonly JsonSerializerOptions _jsonSerializerOptions;
public CustomJsonSerializer()
{
_jsonSerializerOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};
}
public Task<Message> Deserialize(TransportMessage transportMessage)
{
if (transportMessage == null)
throw new ArgumentNullException(nameof(transportMessage));
var jsonString = Encoding.UTF8.GetString(transportMessage.Body);
//works with and without serialization options
var body = JsonSerializer.Deserialize(jsonString, typeof(MyEvent), _jsonSerializerOptions);
//var body = JsonSerializer.Deserialize(jsonString, typeof(MyEvent));
var message = new Message(transportMessage.Headers, body);
return Task.FromResult(message);
}
public Task<TransportMessage> Serialize(Message message)
{
throw new NotImplementedException();
}
}
I have tried different serialization methods. System.Text.Json (provided exception), Newtonsoft (provided exception) and custom serializer, but I would like to avoid custom serializers since I would like other system to be able to use my event without explicit definition of serializer. Any advice?