We are using changestream using c# mongodb driver to do some realtime work. We are listening few databases collection data insert and update operation. We received following exception after few hours every time.
$Index was outside the bounds of the array.
at Microsoft.Extensions.Logging.LogValuesFormatter.GetValue(Object[] values, Int32 index) at Microsoft.Extensions.Logging.FormattedLogValues.GetEnumerator()+MoveNext() at Serilog.Extensions.Logging.SerilogLogger.Log[TState](LogLevel logLevel, EventId eventId, TState state, Exception exception, Func3 formatter) at Microsoft.Extensions.Logging.Logger1.Microsoft.Extensions.Logging.ILogger.Log[TState](LogLevel logLevel, EventId eventId, TState state, Exception exception, Func3 formatter) at Microsoft.Extensions.Logging.LoggerExtensions.Log(ILogger logger, LogLevel logLevel, EventId eventId, Exception exception, String message, Object[] args) at SyncServer.Services.MultiDatabaseChangeStreamManager.<>cDisplayClass9_0.<<StartChangeStreams>b6>d.MoveNext() in /app/jenkins/workspace/flecx-prod-changestream/src/Host/SyncServer/Services/MultiDatabaseChangeStreamManager.cs:line 167 — End of stack trace from previous location — at MongoDB.Driver.IAsyncCursorExtensions.ForEachAsync[TDocument](IAsyncCursor1 source, Func`3 processor, CancellationToken cancellationToken) at SyncServer.Services.MultiDatabaseChangeStreamManager.StartChangeStreams() in /app/jenkins/workspace/flecx-prod-changestream/src/Host/SyncServer/Services/MultiDatabaseChangeStreamManager.cs:line 109
On line 109 here is the code
Sharing function code
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Insert || change.OperationType == ChangeStreamOperationType.Update)
.Match(y => client.SyncSettings.WatchedDatabases.Contains(y.DatabaseNamespace.DatabaseName))
.Match(x => client.SyncSettings.WatchedCollections.Contains(x.CollectionNamespace.CollectionName));
var resumeToken = new BsonDocument();
var logCollection = database.GetCollection<Logstream>("logstream");
var lastLog = logCollection.AsQueryable().Where(x => x.Status == LogstreamStatus.Completed).OrderByDescending(o => o.CreatedOn).FirstOrDefault();
if (lastLog is not null)
{
_logger.LogInformation("Last failed log found");
resumeToken = lastLog.ResumeToken;
}
else
{
_logger.LogInformation("No last log found, fetching resume token from mongodb directly");
var previousCursor = await database.WatchAsync();
resumeToken = previousCursor.GetResumeToken();
_logger.LogInformation("Resume token fetched");
previousCursor.Dispose();
}
var options = new ChangeStreamOptions { ResumeAfter = resumeToken, BatchSize = 5 };
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
CancellationToken token = cancelTokenSource.Token;
try
{
using (var cursor = await mongoclient.WatchAsync(pipeline, options, cancellationToken: token))
{
await cursor.ForEachAsync(async change =>
{
_logger.LogInformation("Inspecting change for | {documentId} | {operationType}", change.DocumentKey["_id"], change.OperationType);
var log = Logstream.Create(client.ClientId, change.DocumentKey, change.FullDocument, change.OperationType == ChangeStreamOperationType.Insert ? null : change.UpdateDescription.UpdatedFields, change.ResumeToken, change.OperationType, change.CollectionNamespace.CollectionName);
try
{
switch (change.OperationType)
{
case ChangeStreamOperationType.Insert:
{
_logger.LogInformation($"Insert object = {change.FullDocument.ToJson()}");
_logger.LogInformation($"ReplayChangeToTarget start");
await ReplayChangeToTarget(change, client.SyncSettings, change.CollectionNamespace.CollectionName);
_logger.LogInformation($"ReplayChangeToTarget end");
_logger.LogInformation($"Inserting logs");
await logCollection.InsertOneAsync(log);
_logger.LogInformation($"logs inserted");
break;
}
case ChangeStreamOperationType.Update:
{
_logger.LogInformation($"Update object = {change.UpdateDescription.UpdatedFields.ToJson()}");
_logger.LogInformation($"ReplayChangeToTarget start");
await ReplayChangeToTarget(change, client.SyncSettings, change.CollectionNamespace.CollectionName);
_logger.LogInformation($"ReplayChangeToTarget end");
_logger.LogInformation($"Inserting logs");
await logCollection.InsertOneAsync(log);
_logger.LogInformation($"logs inserted");
break;
}
}
}
catch (MongoWriteException ex)
{
if (ex.WriteError.Code == 11000)
{
Console.WriteLine($"Exception Dup key: {change.FullDocument.GetValue("_id")}, ignore...");
log.Status = LogstreamStatus.Completed;
await logCollection.InsertOneAsync(log);
}
else
{
_logger.LogInformation($"MongoException Exception :: {ex.Message}", ex.ToString());
await SendEmailReport(client.ClientName, change.CollectionNamespace.CollectionName, ex.Message);
cancelTokenSource.Cancel();
}
}
catch (Exception ex)
{
_logger.LogInformation($"ReplayChangeToTarget Exception :: {ex.Message}", ex.ToString());
await SendEmailReport(client.ClientName, change.CollectionNamespace.CollectionName, ex.Message);
cancelTokenSource.Cancel();
}
}, cancellationToken: token);
}
}
catch (Exception ex)
{
await SendEmailReport("App", "App", ex.Message);
_logger.LogInformation($"Collection.WatchAsync Exception :: ${ex.Message}", ex.ToString());
_logger.LogInformation(ex.StackTrace);
}
private async Task ReplayChangeToTarget(ChangeStreamDocument<BsonDocument> change, SyncSettings settings, string collectionName)
{
_logger.LogInformation($"Sync target started for {collectionName}");
IMongoDatabase database;
IMongoClient client;
if (settings.IsSSLEnable)
{
var path = Environment.CurrentDirectory + Path.DirectorySeparatorChar + "Certificates" + Path.DirectorySeparatorChar + settings.CertificateFilePath;
client = _dbManager.GetClientFromUrl(settings.ConnectionString, path, settings.Password);
database = client.GetDatabase(settings.Database);
}
else
{
database = _dbManager.GetMongoDbFromUrl(settings.ConnectionString);
}
var collection = database.GetCollection<BsonDocument>(collectionName);
switch (change.OperationType)
{
case ChangeStreamOperationType.Insert:
{
_logger.LogInformation($"Inserting to target collection = {collectionName}");
await collection.InsertOneAsync(change.FullDocument);
_logger.LogInformation($"Inserted successfully to target collection = {collectionName}");
break;
}
case ChangeStreamOperationType.Update:
{
_logger.LogInformation($"Updating to target collection = {collectionName}");
var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey["_id"]);
var fields = change.UpdateDescription.UpdatedFields;
var updateDefination = new List<UpdateDefinition<BsonDocument>>();
foreach (var dataField in fields)
{
updateDefination.Add(Builders<BsonDocument>.Update.Set(dataField.Name, dataField.Value));
}
if (updateDefination.Count > 0)
{
var combinedUpdate = Builders<BsonDocument>.Update.Combine(updateDefination);
await collection.UpdateOneAsync(filter, combinedUpdate);
_logger.LogInformation($"Updated successfully to target collection = {collectionName}");
}
break;
}
}
_logger.LogInformation($"Sync target ended for {collectionName}");
}
