I am setting up a project using NServiceBus and SQL Server with a publish/subscribe model, and I'm running into an issue that makes no sense. I have two messages, OpportunityMessage and PageVisitMessage, both of which are standard POCO classes that are flagged with IEvent and look like this:
public class OpportunityMessage : IEvent
{
    /// <summary>
    /// Gets or sets the name of the form
    /// </summary>
    public string FormName { get; set; }
    /// <summary>
    /// Gets or sets the page identifier
    /// </summary>
    public byte[] PageIdentifier { get; set; }
    /// <summary>
    /// Gets or sets any supplemental data with a form
    /// </summary>
    public string SupplementalData { get; set; }
}
public class PageVisitMessage : IEvent
{
    /// <summary>
    /// Gets or sets the IP address of the requestor
    /// </summary>
    public string IpAddress { get; set; }
    /// <summary>
    /// Gets or sets the URL that generated a page visit
    /// </summary>
    public string NavigateUrl { get; set; }
    /// <summary>
    /// Gets or sets the referrer
    /// </summary>
    public string Referrer { get; set; }
    /// <summary>
    /// Gets or sets the user agent
    /// </summary>
    public string UserAgent { get; set; }
}
From a publishing perspective, everything works as expected - messages get published based on what's stored in the Subscriptions table I've configured; however, that's where I get confused. Each message type is destined for a different queue, messaging_opportunity_in and messaging_visitors_in, but whenever either message is fired, it shows up in both queues.
I did some digging and looked at the configured Subscriptions table, and each message is configured to go to both queues, so it makes sense that it shows up in both; however, it's not set up that way from a code perspective. Each message type has a dedicated "handler": one for PageVisitMessage and one for OpportunityMessage. The class structure for those looks like this:
public class OpportunityMessageHandler : BaseMessageHandler, IHandleMessages<OpportunityMessage>
{
    //configured in appsettings.json as "messaging_opportunity_in"
    public override string QueueName => this.Configuration["Messaging:OpportunityPublishQueue"];
    public Task Handle(OpportunityMessage message, IMessageHandlerContext context)
    {
        //code to process the message
        return Task.CompletedTask;
    }
}
public class VisitorMessageHandler : BaseMessageHandler, IHandleMessages<PageVisitMessage>
{
    //configured in appsettings.json as "messaging_visitors_in"
    public override string QueueName => this.Configuration["Messaging:VisitorPublishQueue"];
    public Task Handle(PageVisitMessage message, IMessageHandlerContext context)
    {
        //code to process the message
        return Task.CompletedTask;
    }
}
Each of those inherits from BaseMessageHandler which is where the endpoint is configured to listen to the configured queue name via an Initialize method that looks like this:
/// <summary>
/// Initializes this message handler.
/// </summary>
/// <param name="configuration">The <see cref="IConfiguration"/> instance containing the application's configuration.</param>
public async Task Initialize(IConfiguration configuration)
{
    this.Configuration = configuration;
    //setup our base listener
    var listener = new EndpointConfiguration(this.QueueName);
    listener.EnableInstallers();
    listener.SendFailedMessagesTo($"{this.QueueName}_errors");
    //configure the SQL Server transport
    var transport = new SqlServerTransport(Configuration.GetConnectionString("CS"))
    {
        DefaultSchema = this.DefaultSchemaName
    };
    transport.SchemaAndCatalog.UseSchemaForQueue($"{this.QueueName}_errors", this.DefaultSchemaName);
    listener.UseTransport(transport);
    //configure the subscription listener
    transport.Subscriptions.DisableCaching = true;
    transport.Subscriptions.SubscriptionTableName = new NServiceBus.Transport.SqlServer.SubscriptionTableName(this.Configuration["Messaging:Subscriptions"], schema: this.DefaultSchemaName);
    //spin it up
    this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);
}
What I'm trying to track down is why these two handlers get set up to listen to messages they aren't configured for. When I run the backend message processor that spins these up, the configured Subscriptions table adds both endpoints (messaging_opportunity_in and messaging_visitors_in) as listening to all messages that exist, even though their handlers are configured to listen to a specific queue and not all of them. This essentially means every message is duplicated between the two queues, and since the handlers are running, they end up processing everything twice even though each should be responsible solely for its own message type.
Any help would be greatly appreciated!
Nowhere on the NServiceBus documentation website is anything like your BaseMessageHandler class used on message handlers, and this is where the problem starts.
You're not providing all the code: you refer to a BaseMessageHandler but only show the Initialize() method. I'm assuming that this method is inside the BaseMessageHandler class, but the code you shared doesn't show how and when that method is called.
Let's dive into this line:
var listener = new EndpointConfiguration(this.QueueName);
First of all, the object listener should be called configuration or endpointConfiguration, because it'll never listen to anything. Also, the constructor parameter is endpointName; it does not expect a queue name. For example, in SQL Server your actual queue name would be something like opportunity@dbo@nservicebus.
Since you mention messaging_opportunity_in and messaging_visitors_in, without having any additional context, I would name the endpoints opportunities and visitors.
Now let's look at the following line:
this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);
The Endpoint.Start() method returns the actual endpoint instance, the thing that processes messages. But before it's started, it scans for all types that implement the IHandleMessages<T> interface. If T is an event (in your case, a class marked with IEvent), it creates a subscription for that event.
Now, I don't know how the Initialize() method on those base classes is executed, because you did not include that code. But you execute it twice: once for opportunity and once for visitors. This means the opportunity endpoint starts scanning for classes that implement IHandleMessages<T> and finds two:
- OpportunityMessageHandler
- VisitorMessageHandler
It sees two messages, both of which are events:
- OpportunityMessage
- PageVisitMessage
The opportunity endpoint stores the fact that it wants to subscribe to both messages, resulting in the subscription you're seeing.
The next thing that happens is that the visitors endpoint is started and does EXACTLY THE SAME THING. It starts scanning, finds 2 message handlers and 2 events, and creates another subscription.
How to solve this
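One possible way to act on the explanation above, as a minimal sketch rather than a definitive fix: give each endpoint its own name and keep the other endpoint's handler out of assembly scanning, so that automatic subscription only sees the events that endpoint should handle. The EndpointFactory class, the StartAsync method, and the endpoint names opportunities and visitors are hypothetical names chosen for illustration. The sketch assumes both handlers live in the same assembly and process, and it omits the schema, subscription table, and serializer settings, which would carry over from your Initialize() method.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper, not part of NServiceBus.
public static class EndpointFactory
{
    public static Task<IEndpointInstance> StartAsync(
        string endpointName,              // a logical endpoint name, not a queue name
        string connectionString,
        params Type[] handlersToExclude)  // handlers that belong to other endpoints
    {
        var endpointConfiguration = new EndpointConfiguration(endpointName);
        endpointConfiguration.EnableInstallers();
        endpointConfiguration.SendFailedMessagesTo($"{endpointName}_errors");

        // Types excluded here are not registered as handlers, so this endpoint
        // does not auto-subscribe to the events they handle.
        endpointConfiguration.AssemblyScanner().ExcludeTypes(handlersToExclude);

        // Remaining transport settings (schema, subscription table) omitted;
        // assumed to match the configuration shown in the question.
        endpointConfiguration.UseTransport(new SqlServerTransport(connectionString));

        return Endpoint.Start(endpointConfiguration);
    }

    // Usage: each endpoint excludes the handler that belongs to the other one.
    public static async Task StartBothAsync(string connectionString)
    {
        var opportunities = await StartAsync(
            "opportunities", connectionString, typeof(VisitorMessageHandler));
        var visitors = await StartAsync(
            "visitors", connectionString, typeof(OpportunityMessageHandler));
    }
}
```

Putting each handler in its own assembly or process would make the exclusion unnecessary. Rows that were already written to the Subscriptions table are likely to stay there until they are removed, so the duplicate deliveries may continue until the table is cleaned up.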
This assumes you want 2 components, visitors and opportunities, where messages are sent from visitors to opportunities and vice versa.
I recommend following the tutorial on the NServiceBus website and contacting Particular Software, the makers of NServiceBus. If you mention my name, we can set up a call and run through this together, and you can even request free help with a proof of concept if you'd like.