Rebus can't serialize message that is published with dotnet CAP library in RabbitMQ

60 Views Asked by At

I am using dotnet CAP library for publishing events and Rebus for reading events from RabbitMQ. Publishing with CAP works, but I have problem with reading with Rebus (works perfectly with CAP and RabbitMQ.Client libraries).

So, publishing is configured:

        builder.Services.AddCap(options =>
        {
            options.FailedRetryInterval = capConfig.FailedRetryIntervalSeconds;
            options.FailedRetryCount = capConfig.FailedRetryCount;

            options.UseEntityFramework<MyDbContext>();

            options.UseRabbitMQ(options =>
            {
                options.HostName = rabbitMQConfig.Host;
            });
        });

        builder.Services.AddScoped<IMyPublisher, MyPublisher>();

and MyPublisher has Publish method where MyEvent.NAME is full name of the class with namespace:

public async Task Publish(MyEvent myEvent)
    {
        using (var transaction = _dbContext.Database.BeginTransaction(_capPublisher, autoCommit: false))
        {
            try
            {
                //some transaction logic
                await _capPublisher.PublishAsync(MyEvent.NAME, myEvent);

                await transaction.CommitAsync();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, ex.Message);
                await transaction.RollbackAsync();
            }
        }
    }

I publish this by injecting IMyPublisher and calling Publish method.

Event is read perfectly with CAP:

    [HttpPost("CAPUNUSED")]
    [CapSubscribe(MyEvent.NAME)]
    public async Task TestCAPSubscribe(MyEvent myEvent)
    {
        await Console.Out.WriteLineAsync($"{myEvent.Id}");
    }

On the other side, I am having problem with deserialization of the message using Rebus. (Tried with couple of versions, same problem, but currenlty using Rebus with version 8.2.2).

...(other nonRebus setup)
 services.AddRebus(configure => configure
    .Transport(c =>
    {
        c.UseRabbitMq(rabbitMqConfig.ConnectionString, rabbitMqConfig.QueueName)
            .InputQueueOptions(o =>
            {
                o.SetAutoDelete(false);
                o.AddArgument("x-message-ttl", rabbitMqConfig.TimeToLive);
            });
    })
        //.Serialization(s => s.Register(c => new CustomJsonSerializer())));
        //.Serialization(s => s.UseNewtonsoftJson()));

services.AutoRegisterHandlersFromAssemblyOf<MyEventHandler>();

var bus = services.BuildServiceProvider().GetRequiredService<IBus>();
await bus.Subscribe<MyEvent>();
...(other nonRebus setup)

and my (testing) handler:

public class MyEventHandler : IHandleMessages<MyEvent>
{
    public Task Handle(MyEvent message)
    {
        Console.WriteLine("IT WORKS");

        return Task.CompletedTask;
    }
}

So, 3 cases:

  1. without explicit serializer (looks like System.Text.Json is by default) I receive in error queue the following exception:
5 unhandled exceptions: 2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)

2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)

2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)

2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)

2/5/2024 11:43:03 AM +01:00: System.FormatException: Could not deserialize JSON text: '{"Id":34282,"CollabInfoId":132,"Application":"application","CompanyId":"d6b4f3bc-0f15-46a1-af86-b3dd369158c4","Name":"Name","Extension":"Extension","SizeQuantity":123,"Path":"path","Type":"Type","CreatedBy":"aecdbdc0-8cbc-48e7-a97c-6d91c1b01a6c","DateCreated":"0001-01-01T00:00:00","DateModified":"0001-01-01T00:00:00","FailedIterationsCounter":0,"MaxIterationsNumber":0}'
---> System.ArgumentNullException: Value cannot be null. (Parameter 'returnType')
at System.Text.Json.ThrowHelper.ThrowArgumentNullException(String parameterName)
at System.Text.Json.JsonSerializer.Deserialize(String json, Type returnType, JsonSerializerOptions options)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
--- End of inner exception stack trace ---
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(String bodyString, Type type)
at Rebus.Serialization.Json.SystemTextJsonSerializer.GetMessage(TransportMessage transportMessage, Encoding bodyEncoding)
at Rebus.Serialization.Json.SystemTextJsonSerializer.Deserialize(TransportMessage transportMessage)
at Rebus.Compression.UnzippingSerializerDecorator.Deserialize(TransportMessage transportMessage)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
  1. with .Serialization(s => s.UseNewtonsoftJson()));
1 unhandled exceptions: 2/5/2024 11:46:54 AM +01:00: Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID knuth-13090383562881270373 and type Newtonsoft.Json.Linq.JObject, Newtonsoft.Json could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)
at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)
at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)
  1. with following code for explicit serializer (.Serialization(s => s.Register(c => new CustomJsonSerializer())));), it works, but I don't want to hardcode serialization process
    public class CustomJsonSerializer : ISerializer
    {
        private readonly JsonSerializerOptions _jsonSerializerOptions;
    
        public CustomJsonSerializer()
        {
            _jsonSerializerOptions = new JsonSerializerOptions
            {
                PropertyNameCaseInsensitive = true,
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
            };
        }
    
        public Task<Message> Deserialize(TransportMessage transportMessage)
        {
            if (transportMessage == null)
                throw new ArgumentNullException(nameof(transportMessage));
    
            var jsonString = Encoding.UTF8.GetString(transportMessage.Body);
    
            //works with and without serialization options
            var body = JsonSerializer.Deserialize(jsonString, typeof(MyEvent), _jsonSerializerOptions);
            //var body = JsonSerializer.Deserialize(jsonString, typeof(MyEvent));
    
            var message = new Message(transportMessage.Headers, body);
    
            return Task.FromResult(message);
        }
    
        public Task<TransportMessage> Serialize(Message message)
        {
            throw new NotImplementedException();
        }
    }

I have tried different serialization methods. System.Text.Json (provided exception), Newtonsoft (provided exception) and custom serializer, but I would like to avoid custom serializers since I would like other system to be able to use my event without explicit definition of serializer. Any advice?

0

There are 0 best solutions below