MongoDB C# Changestream cursor $Index was outside the bounds of the array

69 Views Asked by At

We are using changestream using c# mongodb driver to do some realtime work. We are listening few databases collection data insert and update operation. We received following exception after few hours every time.

$Index was outside the bounds of the array.

at Microsoft.Extensions.Logging.LogValuesFormatter.GetValue(Object[] values, Int32 index) at Microsoft.Extensions.Logging.FormattedLogValues.GetEnumerator()+MoveNext() at Serilog.Extensions.Logging.SerilogLogger.Log[TState](LogLevel logLevel, EventId eventId, TState state, Exception exception, Func3 formatter) at Microsoft.Extensions.Logging.Logger1.Microsoft.Extensions.Logging.ILogger.Log[TState](LogLevel logLevel, EventId eventId, TState state, Exception exception, Func3 formatter) at Microsoft.Extensions.Logging.LoggerExtensions.Log(ILogger logger, LogLevel logLevel, EventId eventId, Exception exception, String message, Object[] args) at SyncServer.Services.MultiDatabaseChangeStreamManager.<>cDisplayClass9_0.<<StartChangeStreams>b6>d.MoveNext() in /app/jenkins/workspace/flecx-prod-changestream/src/Host/SyncServer/Services/MultiDatabaseChangeStreamManager.cs:line 167 — End of stack trace from previous location — at MongoDB.Driver.IAsyncCursorExtensions.ForEachAsync[TDocument](IAsyncCursor1 source, Func`3 processor, CancellationToken cancellationToken) at SyncServer.Services.MultiDatabaseChangeStreamManager.StartChangeStreams() in /app/jenkins/workspace/flecx-prod-changestream/src/Host/SyncServer/Services/MultiDatabaseChangeStreamManager.cs:line 109

On line 109 here is the code

enter image description here

Sharing function code

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
               .Match(change => change.OperationType == ChangeStreamOperationType.Insert || change.OperationType == ChangeStreamOperationType.Update)
               .Match(y => client.SyncSettings.WatchedDatabases.Contains(y.DatabaseNamespace.DatabaseName))
               .Match(x => client.SyncSettings.WatchedCollections.Contains(x.CollectionNamespace.CollectionName));

        var resumeToken = new BsonDocument();
        var logCollection = database.GetCollection<Logstream>("logstream");
        var lastLog = logCollection.AsQueryable().Where(x => x.Status == LogstreamStatus.Completed).OrderByDescending(o => o.CreatedOn).FirstOrDefault();

        if (lastLog is not null)
        {
            _logger.LogInformation("Last failed log found");
            resumeToken = lastLog.ResumeToken;
        }
        else
        {
            _logger.LogInformation("No last log found, fetching resume token from mongodb directly");

            var previousCursor = await database.WatchAsync();

            resumeToken = previousCursor.GetResumeToken();

            _logger.LogInformation("Resume token fetched");

            previousCursor.Dispose();
        }
        var options = new ChangeStreamOptions { ResumeAfter = resumeToken, BatchSize = 5 };

        CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
        CancellationToken token = cancelTokenSource.Token;

        try
        {
            using (var cursor = await mongoclient.WatchAsync(pipeline, options, cancellationToken: token))
            {
                await cursor.ForEachAsync(async change =>
                {
                    _logger.LogInformation("Inspecting change for | {documentId} | {operationType}", change.DocumentKey["_id"], change.OperationType);

                    var log = Logstream.Create(client.ClientId, change.DocumentKey, change.FullDocument, change.OperationType == ChangeStreamOperationType.Insert ? null : change.UpdateDescription.UpdatedFields, change.ResumeToken, change.OperationType, change.CollectionNamespace.CollectionName);

                    try
                    {
                        switch (change.OperationType)
                        {
                            case ChangeStreamOperationType.Insert:
                                {
                                    _logger.LogInformation($"Insert object = {change.FullDocument.ToJson()}");
                                    _logger.LogInformation($"ReplayChangeToTarget start");
                                    await ReplayChangeToTarget(change, client.SyncSettings, change.CollectionNamespace.CollectionName);
                                    _logger.LogInformation($"ReplayChangeToTarget end");
                                    _logger.LogInformation($"Inserting logs");
                                    await logCollection.InsertOneAsync(log);
                                    _logger.LogInformation($"logs inserted");
                                    break;
                                }
                            case ChangeStreamOperationType.Update:
                                {
                                    _logger.LogInformation($"Update object = {change.UpdateDescription.UpdatedFields.ToJson()}");
                                    _logger.LogInformation($"ReplayChangeToTarget start");
                                    await ReplayChangeToTarget(change, client.SyncSettings, change.CollectionNamespace.CollectionName);
                                    _logger.LogInformation($"ReplayChangeToTarget end");
                                    _logger.LogInformation($"Inserting logs");
                                    await logCollection.InsertOneAsync(log);
                                    _logger.LogInformation($"logs inserted");
                                    break;
                                }
                        }

                    }
                    catch (MongoWriteException ex)
                    {

                        if (ex.WriteError.Code == 11000)
                        {
                            Console.WriteLine($"Exception Dup key: {change.FullDocument.GetValue("_id")}, ignore...");
                            log.Status = LogstreamStatus.Completed;
                            await logCollection.InsertOneAsync(log);
                        }
                        else
                        {
                            _logger.LogInformation($"MongoException Exception :: {ex.Message}", ex.ToString());
                            await SendEmailReport(client.ClientName, change.CollectionNamespace.CollectionName, ex.Message);
                            cancelTokenSource.Cancel();
                        }
                    }
                    catch (Exception ex)
                    {

                        _logger.LogInformation($"ReplayChangeToTarget Exception :: {ex.Message}", ex.ToString());
                        await SendEmailReport(client.ClientName, change.CollectionNamespace.CollectionName, ex.Message);
                        cancelTokenSource.Cancel();
                    }
                }, cancellationToken: token);
            }
        }
        catch (Exception ex)
        {
            await SendEmailReport("App", "App", ex.Message);
            _logger.LogInformation($"Collection.WatchAsync Exception :: ${ex.Message}", ex.ToString());
            _logger.LogInformation(ex.StackTrace);
        }




 private async Task ReplayChangeToTarget(ChangeStreamDocument<BsonDocument> change, SyncSettings settings, string collectionName)
        {
            _logger.LogInformation($"Sync target started for {collectionName}");
            IMongoDatabase database;
            IMongoClient client;
            if (settings.IsSSLEnable)
            {
                var path = Environment.CurrentDirectory + Path.DirectorySeparatorChar + "Certificates" + Path.DirectorySeparatorChar + settings.CertificateFilePath;
                client = _dbManager.GetClientFromUrl(settings.ConnectionString, path, settings.Password);
                database = client.GetDatabase(settings.Database);
            }
            else
            {
                database = _dbManager.GetMongoDbFromUrl(settings.ConnectionString);
            }

            var collection = database.GetCollection<BsonDocument>(collectionName);

            switch (change.OperationType)
            {
                case ChangeStreamOperationType.Insert:
                    {
                        _logger.LogInformation($"Inserting to target collection = {collectionName}");
                        await collection.InsertOneAsync(change.FullDocument);
                        _logger.LogInformation($"Inserted successfully to target collection = {collectionName}");
                        break;
                    }
                case ChangeStreamOperationType.Update:
                    {
                        _logger.LogInformation($"Updating to target collection = {collectionName}");

                        var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey["_id"]);
                        var fields = change.UpdateDescription.UpdatedFields;
                        var updateDefination = new List<UpdateDefinition<BsonDocument>>();
                        foreach (var dataField in fields)
                        {
                            updateDefination.Add(Builders<BsonDocument>.Update.Set(dataField.Name, dataField.Value));
                        }
                        if (updateDefination.Count > 0)
                        {
                            var combinedUpdate = Builders<BsonDocument>.Update.Combine(updateDefination);
                            await collection.UpdateOneAsync(filter, combinedUpdate);
                            _logger.LogInformation($"Updated successfully to target collection = {collectionName}");
                        }
                        break;
                    }
            }
            _logger.LogInformation($"Sync target ended for {collectionName}");
        }
0

There are 0 best solutions below