Sending request reponse message on Artemis using C#

849 Views Asked by At

I am trying to implement a request response pattern in C# with the ArtemisNetClient, but having a bit of trouble finding out how to do so in a more generic way in a real solution.

I was able to do something like this in two console applications based on some Java examples:

Sender

static async System.Threading.Tasks.Task Main(string[] args)
{
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    string guid = new Guid().ToString();

    var requestAddress = "TRADE REQ1";
    var responseAddress = "TRADE RESP";

    Message message = new Message("BUY AMD 1000 SHARES");
    message.SetCorrelationId(guid);
    message.ReplyTo = responseAddress;

    var producer = await connection.CreateProducerAsync(requestAddress, RoutingType.Anycast);
    await producer.SendAsync(message);

    var consumer = await connection.CreateConsumerAsync(responseAddress, RoutingType.Anycast);
    var responseMessage = await consumer.ReceiveAsync();

    Console.WriteLine(responseMessage.GetBody<string>());
    
}

Receiver

static async System.Threading.Tasks.Task Main(string[] args)
{
    // Create connection
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    var requestAddress = "TRADE REQ1";

    // Create consumer to receive trade request messages
    var consumer = await connection.CreateConsumerAsync(requestAddress, RoutingType.Anycast);
    var message = await consumer.ReceiveAsync();

    Console.WriteLine($"Received message: {message.GetBody<string>()}");

    // Confirm trade request and ssend response message
    if (!string.IsNullOrEmpty(message.ReplyTo))
    {
        Message responseMessage = new Message("Confirmed trade request");
        responseMessage.SetCorrelationId(message.CorrelationId);
        var producer = await connection.CreateProducerAsync(message.ReplyTo);
        await producer.SendAsync(responseMessage);
    }
}

This worked as expected, but I'd like to have something more down the line of what is described in this article, except it doesn't have any examples of a request response pattern.

To elaborate, I currently have two services that I want to communicate across.

In service 1 I want to create and publish a message and then wait for a response to enrich the instance object and save it to a database. I currently have this, but it lacks the await response message.

public async Task<Instance> CreateInstance(Instance instance)
{
    await _instanceCollection.InsertOneAsync(instance);

    var @event = new InstanceCreated
    {
        Id = instance.Id,
        SiteUrl = instance.SiteUrl
    };

    await _messageProducer.PublishAsync(@event);

    return instance;
}

I figured I might need to setup a temporary queue/connection or something in the PublishAsync() and change it to e.g. Task<Message> to support returning a response message. But how would I go about doing that? Would I have to do a new connectionfactory + CreateConsumerAsync etc. like in the console app example?

public class MessageProducer
{
    private readonly IAnonymousProducer _producer;

    public MessageProducer(IAnonymousProducer producer)
    {
        _producer = producer;
    }

    public async Task PublishAsync<T>(T message, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = typeof(T).Name;
        var msg = new Message(serialized);
        if (replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }

    public async Task PublishAsync<T>(T message, string routeName, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = routeName;
        var msg = new Message(serialized);
        if(replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }
}

In Service 2 I have a InstanceCreatedConsumer which receives messages, but again it lacks a way to return response messages.

public class InstanceCreatedConsumer : ITypedConsumer<InstanceCreated>
{
    private readonly MessageProducer _messageProducer;
    public InstanceCreatedConsumer(MessageProducer messageProducer)
    {
        _messageProducer = messageProducer;
    }
    public async Task ConsumeAsync(InstanceCreated message, CancellationToken cancellationToken)
    {
        // consume message and return response
    }
}

I figured I might be able to extend the ActiveMqExtensions class with a ConsumeAsync and HandleMessage that handles the response message with a return value, but I haven't gotten as far yet.

public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder,
    RoutingType routingType)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    builder.Services.TryAddScoped<TConsumer>();
    builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
    return builder;
}

private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    try
    {
        var msg = JsonConvert.DeserializeObject<TMessage>(message.GetBody<string>());
        using var scope = serviceProvider.CreateScope();
        var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
        await typedConsumer.ConsumeAsync(msg, token);
        await consumer.AcceptAsync(message);
    }
    catch(Exception ex)
    {
        // todo
    }
}

Am I totally wrong in what I am trying to achieve here, or is it just not possible with the ArtemisNetClient?

Maybe someone has an example or can confirm whether I am down the right path, or maybe I should be using a different framework.

I am new to this kind of communication through messages like ActiveMQ Artemis, so any guidance is appreciated.

2

There are 2 best solutions below

2
On BEST ANSWER

With version 2.7.0 ArtemisNetClient introduces IRequestReplyClient interface that can be used to implement a request-response messaging pattern. With ArtemisNetClient.Extensions.DependencyInjection this may look as follows:

Client Side:

First you need to register your typed request-reply client in DI:

public void ConfigureServices(IServiceCollection services)
{
    /*...*/ 
    var endpoints = new[] { Endpoint.Create(host: "localhost", port: 5672, "guest", "guest") };
    services.AddActiveMq("bookstore-cluster", endpoints)
            .AddRequestReplyClient<MyRequestReplyClient>();
    /*...*/
}

MyRequestReplyClient is your custom class that expects the IRequestReplyClient to be injected via the constructor. Once you have your custom class, you can either expose the IRequestReplyClient directly or encapsulate sending logic inside of it:

public class MyRequestReplyClient
{
    private readonly IRequestReplyClient _requestReplyClient;
    public MyRequestReplyClient(IRequestReplyClient requestReplyClient)
    {
        _requestReplyClient = requestReplyClient;
    }

    public async Task<TResponse> SendAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken)
    {
        var serialized = JsonSerializer.Serialize(request);
        var address = typeof(TRequest).Name;
        var msg = new Message(serialized);
        var response = await _requestReplyClient.SendAsync(address, msg, cancellationToken);
        return JsonSerializer.Deserialize<TResponse>(response.GetBody<string>());
    }
}

That's it regarding the client-side.

Worker side

To implement the worker side you can (as you suggested), change the ITypedConsumer interface to return the message that would be sent back, or you can provide the additional data (ReplyTo and CorrelationId headers) so you can send the response back as part of your consumer logic. I prefer the latter as it's a more flexible option in my opinion.

Modified ITypedConsumer might look like that:

public interface ITypedConsumer<in T>
{
    public Task ConsumeAsync(T message, MessageContext context, CancellationToken cancellationToken);
}

Where MessageContext is just a simple dto:

public class MessageContext
{
    public string ReplyTo { get; init; }
    public string CorrelationId { get; init; }
}

HandleMessage extension method:

private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    var msg = JsonSerializer.Deserialize<TMessage>(message.GetBody<string>());
    using var scope = serviceProvider.CreateScope();
    var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
    var messageContext = new MessageContext
    {
        ReplyTo = message.ReplyTo,
        CorrelationId = message.CorrelationId
    };
    await typedConsumer.ConsumeAsync(msg, messageContext, token);
    await consumer.AcceptAsync(message);
}

MessageProducer has to be slightly changed as well, so you can explicitly pass address and CorrelationId:

public class MessageProducer
{
    private readonly IAnonymousProducer _producer;

    public MessageProducer(IAnonymousProducer producer)
    {
        _producer = producer;
    }

    public async Task PublishAsync<T>(string address, T message, MessageContext context, CancellationToken cancellationToken)
    {
        var serialized = JsonSerializer.Serialize(message);
        var msg = new Message(serialized);
        if (!string.IsNullOrEmpty(context.CorrelationId))
        {
            msg.CorrelationId = context.CorrelationId;
        }

        await _producer.SendAsync(address, msg, cancellationToken);
    }
}

And finally, the exemplary consumer could work like that:

public class CreateBookConsumer : ITypedConsumer<CreateBook>
{
    private readonly MessageProducer _messageProducer;

    public CreateBookConsumer(MessageProducer messageProducer)
    {
        _messageProducer = messageProducer;
    }

    public async Task ConsumeAsync(CreateBook message, MessageContext context, CancellationToken cancellationToken)
    {
        var @event = new BookCreated
        {
            Id = Guid.NewGuid(),
            Title = message.Title,
            Author = message.Author,
            Cost = message.Cost,
            InventoryAmount = message.InventoryAmount,
            UserId = message.UserId,
            Timestamp = DateTime.UtcNow
        };

        await _messageProducer.PublishAsync(context.ReplyTo, @event, new MessageContext
        {
            CorrelationId = context.CorrelationId
        }, cancellationToken);
    }
}
1
On

I don't see anything in the ArtemisNetClient that would simplify the request/response pattern from your application's point of view. One might expect something akin to JMS' QueueRequestor, but I don't see anything like that in the code, and I don't see anything like that listed in the documentation.

I recommend you simply do in your application what you did in your example (i.e. manually create the consumer & producer to deal with the responses on each end respectively). The only change I would recommend is to re-use connections so you create as few as possible. A connection pool would be ideal here.


For what it's worth, it looks to me like the first release of ArtemisNetClient was just 3 months ago and according to GitHub all but 2 of the commits to the code-base came from one developer. ArtemisNetClient may grow into a very successful C# client implementation, but at this point it seems relatively immature. Even if the existing code is high quality if there isn't a solid community around the client then chances are it won't have the support necessary to get timely bug fixes, new features, etc. Only time will tell.