I am implementing the request/response pattern for MassTransit using RabbitMQ using an IBusControl object and only 1 Queue.
We will have 1 Request originator app (currently a Unit Test) running and multiple consumer (console) apps running. Each consumer app will ONLY be responding to messages with a given MyId (command-line argument).
I am assuming that I should accomplish this by adding a filter to the ReceiveEndpoint definition. I am using the latest MassTransit (8.1.3.0).
I tried creating a ConsumeContext filter (MyFilter1) in the code below (e.UseFilter(new MyFilter1(myId));) This filter was never hit. Now I am trying to create a ConsumerConsumeContext filter (MyFilter). I believe that this is the type of filter that I want.
All of the code samples I find seem to be using an older version of MassTransit, and they are able to call UseConsumeFilter passing in the new filter object.
like so...
e.UseConsumeFilter(new MyFilter1(myId));
e.UseConsumeFilter now takes in a type and a registration context, not a filter object.
I can add a static variable to Program class and give it a MyId.
But what do I send in for a registration context?
Or is there a better way to filter out messages that don't apply?
Also, if I get the filter to work, will MassTransit take this request and send it to the next consumer or will it be lost?
Here is the code. I have changed the class names and removed all try/catch blocks and some functions for readability.
Consumer App
static async Task<int> Main(string[] args)
{
Console.WriteLine("Consumer started ...");
if (args.Length == 0)
{
Console.WriteLine("You have given no command-line arguments. Expected <myId> ...");
return -1;
}
string myId = args[0];
string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
string rabbitPassword= System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"];
Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUserName);
h.Password(rabbitPassword);
});
x.ReceiveEndpoint(rabbitQueue,
e =>{
e.Consumer<MyConsumer>( );
!!!!INSERT LINE HERE!!!!
//the following line never hits the filter before consumer
//gets called
//e.UseFilter(new MyFilter1(myId));
//e.UseConsumeFilter(typeof (MyFilter),null);
}
);
});
Console.WriteLine($"Finished Creating Bus..MyId={myId}");
}
public class MyFilter:
IFilter<ConsumerConsumeContext<MyConsumer,IMyRequest>>
{
private string MyId { get; set; }
public MyFilter(string myId)
{
MyId = myId;
}
public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
{
if (context.Message.MyId == MyId)
{
await next.Send(context);
}
}
public void Probe(ProbeContext context)
{
var scope = context.CreateFilterScope("MyId");
scope.Add("MyId", MyId);
}
}
public class MyFilter1 :
IFilter<ConsumeContext<IMyRequest>>
{
private string MyId { get; set; }
public MyFilter1(string myId)
{
MyId = myId;
}
async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
{
if (context.Message.MyId == MyId )
{
await next.Send(context);
}
}
void IProbeSite.Probe(ProbeContext context)
{
}
}
public class MyConsumer : IConsumer<MyRequest>
{
public async Task Consume(ConsumeContext<MyRequest> context)
{
DateTime receivedTime = DateTime.Now;
Console.WriteLine($"Received Request for {context.Message.MyId}");
. . . do stuff . . .
context.Respond(
new MyResponse
{
MyId = context.Message.MyId,
ResponseStatus = ResponseStatus.Passed
});
Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
}
}
Request Originator App
const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
{
h.Username(RABBIT_USER_NAME);
h.Password(RABBIT_PASSWORD);
}));
TaskUtil.Await(() => bus.StartAsync());
var serviceAddress = new Uri(RABBIT_SERVICE);
IRequestClient<LightupRequest> client =
busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));
MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
request.MyId = myId;
request.sid = 123456;
request.tid = 123457;
request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
request.Token = "111111111111111111111111111111";
request.Version = "10.02.003.0004";
Task.Run(async () =>
{
request.StartTime = DateTime.Now;
var response = await client.GetResponse<MyResponse>(request);
//do something with response
}).Wait();
}
Request/Response Types
public interface IMyRequest
{
string MyId { get; set; }
}
public class MyRequest : IMyRequest
{
public string MyId { get; set; }
public string Token { get; set; }
public string Data { get; set; }
public int someId { get; set; }
public int someId2 { get; set; }
public string Version { get; set; }
public DateTime StartTime { get; set; }
}
public class MyResponse
{
public enum ResponseStatus { passed, failed, timeout, networkError }
public string MyId { get; set; }
public string ErrorData { get; set; }
}
I tried creating a ConsumeContext filter (MyFilter1) in the code below (e.UseFilter(new MyFilter1(myId));)
I was hoping that this filter would be hit BEFORE the consumer so that it could prevent this consumer from consuming the message if the ID isn't the ID that I am expecting.
I put a breakpoint on MyFilter1.Send and it never hit.
Furthermore, when I run 4 instances of my subscriber app, I can see that MassTransit distributes the message to only 1 of the 4 executables, and that the next message will be on a different executable.
Changes After Response
I changed the Queue Name to include myId, and I added a bind
Consumer App
static async Task<int> Main(string[] args)
{
Console.WriteLine("Consumer started ...");
if (args.Length == 0)
{
Console.WriteLine("You have given no command-line arguments. Expected <myId> ...");
return -1;
}
string myId = args[0];
string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
string rabbitPassword= System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"]+ $".{myId}";
Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
x.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUserName);
h.Password(rabbitPassword);
});
x.ReceiveEndpoint(rabbitQueue,
e =>{
e.Consumer<MyConsumer>( );
e.Bind<MyConsumer>();
}
);
});
Console.WriteLine($"Finished Creating Bus..MyId={myId}");
}
public class MyFilter:
IFilter<ConsumerConsumeContext<MyConsumer,IMyRequest>>
{
private string MyId { get; set; }
public MyFilter(string myId)
{
MyId = myId;
}
public async Task Send(ConsumerConsumeContext<MyConsumer, IMyRequest> context, IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
{
if (context.Message.MyId == MyId)
{
await next.Send(context);
}
}
public void Probe(ProbeContext context)
{
var scope = context.CreateFilterScope("MyId");
scope.Add("MyId", MyId);
}
}
public class MyFilter1 :
IFilter<ConsumeContext<IMyRequest>>
{
private string MyId { get; set; }
public MyFilter1(string myId)
{
MyId = myId;
}
async Task IFilter<ConsumeContext<IMyRequest>>.Send(ConsumeContext<IMyRequest> context, IPipe<ConsumeContext<IMyRequest>> next)
{
if (context.Message.MyId == MyId )
{
await next.Send(context);
}
}
void IProbeSite.Probe(ProbeContext context)
{
}
}
public class MyConsumer : IConsumer<MyRequest>
{
public async Task Consume(ConsumeContext<MyRequest> context)
{
DateTime receivedTime = DateTime.Now;
Console.WriteLine($"Received Request for {context.Message.MyId}");
. . . do stuff . . .
context.Respond(
new MyResponse
{
MyId = context.Message.MyId,
ResponseStatus = ResponseStatus.Passed
});
Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
}
}
Request Originator App
const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
{
h.Username(RABBIT_USER_NAME);
h.Password(RABBIT_PASSWORD);
}));
TaskUtil.Await(() => bus.StartAsync());
MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
var serviceAddress = new Uri($"{RABBIT_SERVICE}.{myId}");
IRequestClient<LightupRequest> client =
busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));
request.MyId = myId;
request.sid = 123456;
request.tid = 123457;
request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
request.Token = "111111111111111111111111111111";
request.Version = "10.02.003.0004";
Task.Run(async () =>
{
request.StartTime = DateTime.Now;
var response = await client.GetResponse<MyResponse>(request);
//do something with response
}).Wait();
}
Request/Response Types (these didn't change)
public interface IMyRequest
{
string MyId { get; set; }
}
public class MyRequest : IMyRequest
{
public string MyId { get; set; }
public string Token { get; set; }
public string Data { get; set; }
public int someId { get; set; }
public int someId2 { get; set; }
public string Version { get; set; }
public DateTime StartTime { get; set; }
}
public class MyResponse
{
public enum ResponseStatus { passed, failed, timeout, networkError }
public string MyId { get; set; }
public string ErrorData { get; set; }
}
Observations
When the consumer App Runs, the following are created on RabbitMQ
- 1 Connection
- 2 channels
- Exchange . binds from Exchange :MyConsumer (no routing key) Exchange :IMyRequest (no routing key) binds to Queue .
- Exchange :MyConsumer binds to Exchange . (no routing key)
- Exchange :IMyRequest binds to Exchange . (no routing key)
- Queue . binds from Exchange . (no routing key)
When I run a 2nd instance of a consumer app with a new myId, here's what gets added
1 connection
2 channels
Exchange .<2nd Id> binds from Exchange :MyConsumer (no routing key) Exchange :IMyRequest (no routing key) binds to Queue .<2nd Id>
Queue .<2nd Id> binds from Exchange .<2nd Id> (no routing key)
Exchange :MyConsumer gets a binding added to Exchange .<2nd Id>
Exchange :IMyRequest gets a binding added to Exchange .<2nd Id>
Final Question
I really like the easy usage of this pattern, and it is an easy way to ensure that your requests don't go into a black hole. However, there is 1 queue and 1 exchange created per id. And I am seeing 2 channels and 1 connection each time the consumer app is ran. Is this the best way to ensure that our responses don't get dropped?
After Chris Patterson answered, Here is the final code
Consumer App
This is a console App.
Request Originator App In my case, this app is a C# Unit Test, but in your case, it could be anything from a console app, to a unit test, to a website.
Request/Response Types (these didn't change)