CQRS, multiple write nodes for a single aggregate entry, while maintaining concurrency

Let's say I have a command to edit a single entry of an article, called ArticleEditCommand.

  • User 1 issues an ArticleEditCommand based on V1 of the article.
  • User 2 issues an ArticleEditCommand based on V1 of the same article.

If I can ensure that my nodes process the older ArticleEditCommand first, I can be sure that User 2's command will fail, because User 1's command will already have bumped the version of the article to V2.

However, if I have two nodes processing ArticleEditCommand messages concurrently, then even though the commands are taken off the queue in the correct order, I cannot guarantee that the first command will actually finish before the second one starts, due to a spike in CPU or something similar. I could use a SQL transaction to update the article where version = expectedVersion and check the number of rows affected, but my rules are more complex and can't live solely in SQL. I would like the entire command-processing logic to run consecutively, never concurrently, for ArticleEditCommand messages that alter the same article.

I don't want to lock the queue while I process the command, because the point of having multiple command handlers is to handle commands concurrently for scalability. With that said, I don't mind these commands being processed consecutively, but only for a single instance/id of an article. I don't expect a high volume of ArticleEditCommand messages to be sent for a single article.

With that said, here is the question.

Is there a way to handle commands consecutively across multiple nodes for a single unique object (database record), but handle all other commands (distinct database records) concurrently?

Or, is this a problem I created myself because of a lack of understanding of CQRS and concurrency?

Is this a problem that message brokers typically have solved? Such as Windows Service Bus, MSMQ/NServiceBus, etc?

EDIT: I think I know how to handle this now. When User 2 issues the ArticleEditCommand, an exception should be thrown to let them know that there is a pending operation on that article that must complete before they can queue another ArticleEditCommand. That way, there are never two ArticleEditCommand messages in the queue that affect the same article.
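A minimal sketch of that gate, assuming a single shared gatekeeper component in front of the queue (the ArticleEditGate type, its members, and the command properties are hypothetical, not taken from any particular framework):

using System;
using System.Collections.Concurrent;

// Hypothetical command DTO, just enough for the example.
public class ArticleEditCommand
{
    public Guid ArticleId { get; set; }
    public int ExpectedVersion { get; set; }
    public string NewBody { get; set; }
}

// Hypothetical gate in front of the queue: it refuses to enqueue a new
// ArticleEditCommand while another edit of the same article is still pending.
public class ArticleEditGate
{
    // articleId -> marker that an edit is currently queued or being processed
    private readonly ConcurrentDictionary<Guid, byte> _pending =
        new ConcurrentDictionary<Guid, byte>();

    public void Enqueue(ArticleEditCommand command, Action<ArticleEditCommand> sendToQueue)
    {
        // TryAdd fails if an edit for this article is already pending.
        if (!_pending.TryAdd(command.ArticleId, 0))
            throw new InvalidOperationException(
                "A pending operation exists for this article; retry once it has completed.");

        sendToQueue(command);
    }

    // Called by the command handler when processing finishes (success or failure).
    public void MarkCompleted(Guid articleId)
    {
        byte ignored;
        _pending.TryRemove(articleId, out ignored);
    }
}

Note that an in-process dictionary like this only helps if every node submits its ArticleEditCommand messages through the same shared gate; in a multi-node setup the pending check itself would have to live somewhere centralized (the database, a distributed cache, etc.).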

There are 3 solutions below.

Solution 1

First let me say, if you don't expect a high volume of ArticleEditCommand messages being sent, this sounds like premature optimization.

In other solutions, this problem is usually solved not by the message broker but by optimistic locking enforced by the persistence implementation. I don't understand why a simple version field for optimistic locking, which can be handled trivially in SQL, conflicts with complicated business logic/updates; maybe you could elaborate?
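To illustrate the point, here is a minimal ADO.NET sketch (the Articles table, its columns, and the ArticleEditHandler type are assumptions): all of the complicated rules run in C#, and only the final write is made conditional on the version that was originally read:

using System;
using System.Data.SqlClient;

public class ArticleEditHandler
{
    private readonly string _connectionString;

    public ArticleEditHandler(string connectionString)
    {
        _connectionString = connectionString;
    }

    public void Handle(Guid articleId, int expectedVersion, string newBody)
    {
        // ...run all the complicated business rules in C# here; only the final
        // write needs to be guarded by the version check...

        using (var conn = new SqlConnection(_connectionString))
        using (var cmd = new SqlCommand(
            @"UPDATE Articles
              SET Body = @body, Version = Version + 1
              WHERE Id = @id AND Version = @expectedVersion", conn))
        {
            cmd.Parameters.AddWithValue("@body", newBody);
            cmd.Parameters.AddWithValue("@id", articleId);
            cmd.Parameters.AddWithValue("@expectedVersion", expectedVersion);

            conn.Open();
            if (cmd.ExecuteNonQuery() == 0)
            {
                // 0 rows affected: someone else changed the article since it was read.
                throw new InvalidOperationException(
                    "The article was modified concurrently; reject or retry the command.");
            }
        }
    }
}

However elaborate the rules that produced newBody were, the concurrency guard itself is just one extra predicate in the WHERE clause plus the rows-affected check.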

Solution 2

It's actually quite simple, and I did exactly that. Basically, it looks like this (pseudocode):

// message handler
ModelTools.TryUpdateEntity(() =>
{
    var entity = _repo.Get(myId);
    entity.Do(whateverCommand);
    _repo.Save(entity);
},
10); // retry up to 10 times before giving up

// repository
long? _version;

public MyObject Get(Guid id)
{
    // query data and its version from the db
    _version = data.version;
    return data.ToMyObject();
}

public void Save(MyObject data)
{
    // update the row in the db where version = _version.Value
    // (rowsUpdated = number of rows affected by that update)

    if (rowsUpdated == 0)
    {
        // things have changed since we retrieved the object
        throw new NewerVersionExistsException();
    }
}

ModelTools.TryUpdateEntity and NewerVersionExistsException are part of my CavemanTools generic purpose library (available on Nuget).

The idea is to try doing things normally; if the object's version (rowversion/timestamp in SQL) has changed in the meantime, we retry the whole operation after waiting a couple of milliseconds. That is exactly what the TryUpdateEntity() method does, and you can tweak how long to wait between tries and how many times to retry the operation.
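A retry wrapper along those lines looks roughly like the sketch below; this is only an illustration of the idea, not the actual CavemanTools implementation, and the OptimisticRetry name is made up:

using System;
using System.Threading;

// Stand-in for the exception thrown by Save() on a version conflict.
public class NewerVersionExistsException : Exception { }

public static class OptimisticRetry
{
    // Runs the action; on a version conflict it waits briefly and retries,
    // up to maxTries attempts in total, then rethrows.
    public static void Run(Action action, int maxTries = 10, int delayMs = 50)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                action();
                return; // success
            }
            catch (NewerVersionExistsException)
            {
                if (attempt >= maxTries) throw; // give up, let the caller decide
                Thread.Sleep(delayMs);          // back off for a moment, then retry
            }
        }
    }
}

Because each retry calls _repo.Get again, the command is re-applied against the latest version of the object instead of blindly overwriting it.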

If you need to notify the user, then forget about retrying; just catch the exception directly and tell the user to refresh or something.

Solution 3

Partition-based solution

Achieve node stickiness by routing each incoming command based on the object's ID (e.g. articleId modulo your number of nodes) so that the commands from User 1 and User 2 end up on the same node, then process the commands consecutively there. You can choose to process all commands one by one, or, if you want to parallelize execution, partition the commands by something like ID, odd/even, country, or similar.
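A minimal sketch of such a router (the queue naming scheme and the nodeCount parameter are assumptions):

using System;

public static class CommandRouter
{
    // Deterministically map an article to one of nodeCount worker queues so that
    // all commands for the same article end up on the same node, in order.
    public static string QueueFor(Guid articleId, int nodeCount)
    {
        // Mask off the sign bit so the modulo result is always a valid index.
        var partition = (articleId.GetHashCode() & int.MaxValue) % nodeCount;
        return "article-commands-" + partition; // e.g. "article-commands-3"
    }
}

// Usage at the sending side (the bus API here is made up):
// var queueName = CommandRouter.QueueFor(command.ArticleId, nodeCount: 4);
// bus.Send(queueName, command);

Any stable hash of the ID works; the important property is that the same article always maps to the same partition. If the number of partitions ever changes, let the in-flight messages drain first, or commands for the same article can momentarily land on two different nodes.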

Grid-based solution

Use an in-memory data grid (e.g. Hazelcast or Coherence) with a distributed executor service (http://docs.hazelcast.org/docs/2.0/manual/html/ch09.html#DistributedExecution) or similar to coordinate the command processing across the cluster.

Regardless: before adding this kind of complexity, you should of course ask yourself whether it is really a problem if User 2's command were accepted and User 1 got a concurrency error back. As long as User 1's changes are not lost and can be re-applied after a refresh of the article, it might be perfectly fine.