To Implement watch operation on multiple mongo databases in c# using changestreams

393 Views Asked by At
var options= new ChangeStreamOptions {
    FullDocument = ChangeFullStreamOption.UpdateLookup
};
                                      
var enumerator = newclient
    .GetDatabase(databaseName)
    .Watch(options)
    .ToEnumerable()
    .GetEnumerator();

enumerator.MoveNext(); 

Above code is watching single database , need to watch the updates in multiple database parallelly

1

There are 1 best solutions below

4
On

I would suggest using channels for this as they are designed for synchronising parallel sources for processing

This would look something like this

using MongoDB.Bson;
using MongoDB.Driver;
using System.Threading.Channels;


var dblist = new string[]{
    "db1",
    "db2",
    "db3"
};


var options = new ChangeStreamOptions
{
    FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};

var client = new MongoClient();

var watcher = Channel.CreateUnbounded<ChangeStreamDocument<BsonDocument>>();
var tokenSource = new CancellationTokenSource();
List<Task> tasks = new List<Task>();
foreach (var db in dblist)
{
    //create a monitor for each database
    tasks.Add(
        monitorDB(db,client, watcher.Writer, tokenSource.Token)
    );

}
//create the processor for your changes
var processor = processChanges(watcher.Reader);
//wait for all the monitors to complete
await Task.WhenAll(tasks);
// when all monitors have ended mark the channel as completed
watcher.Writer.Complete();
//wait for the processorto complete
await processor;


async Task monitorDB(string name, IMongoClient client, ChannelWriter<ChangeStreamDocument<BsonDocument>> writer, CancellationToken token)
{
    var enumerator = client
        .GetDatabase(name)
        .Watch(options, token)
        .ToEnumerable()
        ;
    foreach (var change in enumerator)
    {
        // wait for the channel to be ready to write
        await writer.WaitToWriteAsync();
        // write the change to the channel
        await writer.WriteAsync(change);
        //if cancelled exit
        if (token.IsCancellationRequested)
            break;
    }

}

async Task processChanges(ChannelReader<ChangeStreamDocument<BsonDocument>> reader)
{
    //while the channel isn't complete, wait for a new document to be received
    while(await reader.WaitToReadAsync())
    {
        //get the waiting document
        ChangeStreamDocument<BsonDocument>> doc = await reader.ReadAsync();
        //do something with doc ...
    }
}