I use the Event Hubs EventProcessorHost to receive and process events from Event Hubs. For better performance, I call checkpoint every 3 minutes instead of every time I receive events:
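    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // do something
        }

        // Checkpoint at most once every 3 minutes instead of after every batch.
        // (No ';' after the condition, otherwise the block would run on every call.)
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(3))
        {
            this.checkpointStopWatch.Restart();
            await context.CheckpointAsync();
        }
    }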
But the problem is that some events might never be checkpointed if no new events are sent to the event hub, because ProcessEventsAsync isn't invoked when there are no new messages.
Any suggestions for making sure all processed events get checkpointed, while still checkpointing only every few minutes?
Update: Per Sreeram's suggestion, I updated the code as follows:
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // do something
        }

        // Count events processed since the last checkpoint (Count() needs System.Linq).
        this.lastProcessedEventsCount += messages.Count();

        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(3))
        {
            this.checkpointStopWatch.Restart();

            // Skip the checkpoint write if nothing new was processed.
            if (this.lastProcessedEventsCount > 0)
            {
                await context.CheckpointAsync();
                this.lastProcessedEventsCount = 0;
            }
        }
    }
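To make the remaining gap concrete, here is a minimal sketch that pulls the decision logic of the updated code into a class with an injectable clock, so it can be exercised without a real event hub. CheckpointThrottle, OnBatch, and the checkpoint callback are hypothetical names, not part of the Event Hubs SDK; the callback only stands in for context.CheckpointAsync().

```csharp
using System;

// Hypothetical helper, not part of the Event Hubs SDK: it mirrors the decision
// logic of the updated ProcessEventsAsync, with the clock passed in explicitly.
public sealed class CheckpointThrottle
{
    private readonly TimeSpan interval;
    private readonly Action checkpoint;   // stands in for await context.CheckpointAsync()
    private TimeSpan lastRestart;         // clock reading at the last stopwatch restart

    // Events processed since the last checkpoint (lastProcessedEventsCount in the question).
    public int PendingEvents { get; private set; }

    public CheckpointThrottle(TimeSpan interval, Action checkpoint, TimeSpan start)
    {
        this.interval = interval;
        this.checkpoint = checkpoint;
        this.lastRestart = start;
    }

    // Call once per ProcessEventsAsync invocation, after the batch is processed.
    public void OnBatch(int eventCount, TimeSpan now)
    {
        PendingEvents += eventCount;
        if (now - lastRestart > interval)
        {
            lastRestart = now;            // equivalent of checkpointStopWatch.Restart()
            if (PendingEvents > 0)
            {
                checkpoint();
                PendingEvents = 0;
            }
        }
    }
}
```

With a 3-minute interval and batches of 10 events at minutes 1, 4, and 5, the throttle checkpoints once (at minute 4), and the 10 events from minute 5 stay pending until another batch arrives. Because the decision only runs inside OnBatch, an idle partition never gets the chance to flush them.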
That's a great case to cover!
You could lose event checkpoints (and, as a result, see events replayed) in the two cases below:

1. When you have a sparse data flow (for example, a batch of messages every 5 minutes with a checkpoint interval of 3 minutes) and the EventProcessorHost instance closes for some reason, you could see 2 minutes of EventData being re-processed. To handle that case, keep track of the lastProcessedEvent after IEventProcessor.onEvents / IEventProcessor.ProcessEventsAsync completes, and checkpoint when you are notified of the close via IEventProcessor.onClose / IEventProcessor.CloseAsync.

2. There might be a case where no more events arrive at a specific EventHubs partition. In that case, with your checkpointing strategy, the last event would never be checkpointed. However, this is uncommon when you have a continuous flow of EventData and you are not sending to a specific EventHubs partition (EventHubClient.send(EventData_Without_PartitionKey)). If you think you could run into this situation, use the flag that wakes up processEventsAsync every so often. Then keep track of LastProcessedEventData and LastCheckpointedEventData, and decide whether to checkpoint when no events are received by comparing the EventData.SequenceNumber property of those two events.
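A minimal sketch of both suggestions for a single partition, assuming the host has been configured (via the wake-up flag mentioned above) to call ProcessEventsAsync with an empty batch when nothing arrives. PartitionCheckpointTracker, CloseKind, and their members are hypothetical names, not SDK types; the checkpoint callback stands in for writing a checkpoint at the given sequence number. Skipping the checkpoint when the lease was lost is an extra assumption on my part: once another host owns the partition, a checkpoint from this host would be expected to fail.

```csharp
using System;

// Hypothetical stand-in for the reason passed to CloseAsync.
public enum CloseKind { Shutdown, LeaseLost }

// Hypothetical per-partition tracker, not part of the Event Hubs SDK. It records the
// sequence number of the last processed event and of the last checkpointed event.
public sealed class PartitionCheckpointTracker
{
    private readonly TimeSpan interval;
    private readonly Action<long> checkpoint; // stands in for checkpointing at that sequence number
    private TimeSpan lastCheckpointAttempt;   // clock reading when the interval check last fired

    public long? LastProcessedSequenceNumber { get; private set; }
    public long? LastCheckpointedSequenceNumber { get; private set; }

    public PartitionCheckpointTracker(TimeSpan interval, Action<long> checkpoint, TimeSpan start)
    {
        this.interval = interval;
        this.checkpoint = checkpoint;
        this.lastCheckpointAttempt = start;
    }

    // True when an event was processed after the last checkpoint was written.
    private bool HasUncheckpointedEvents =>
        LastProcessedSequenceNumber.HasValue &&
        (!LastCheckpointedSequenceNumber.HasValue ||
         LastProcessedSequenceNumber.Value > LastCheckpointedSequenceNumber.Value);

    // Call at the end of every ProcessEventsAsync invocation with the SequenceNumber of the
    // last event in the batch, or null when the host woke the processor with an empty batch.
    public void OnProcessed(long? lastSequenceNumberInBatch, TimeSpan now)
    {
        if (lastSequenceNumberInBatch.HasValue)
        {
            LastProcessedSequenceNumber = lastSequenceNumberInBatch;
        }

        // Same time-based throttle as in the question, but it also runs on empty wake-ups.
        if (now - lastCheckpointAttempt > interval)
        {
            lastCheckpointAttempt = now;
            TryCheckpoint();
        }
    }

    // Call from CloseAsync so the last processed events are not replayed after a clean shutdown.
    public void OnClose(CloseKind reason)
    {
        // Assumption: after a lost lease another host owns the partition, so skip the write.
        if (reason == CloseKind.Shutdown)
        {
            TryCheckpoint();
        }
    }

    private void TryCheckpoint()
    {
        if (!HasUncheckpointedEvents)
        {
            return;
        }

        checkpoint(LastProcessedSequenceNumber.Value);
        LastCheckpointedSequenceNumber = LastProcessedSequenceNumber;
    }
}
```

In ProcessEventsAsync you would call OnProcessed with the SequenceNumber of the last event in the batch (or null for an empty wake-up batch), and in CloseAsync you would call OnClose. Comparing sequence numbers, rather than counting events, means an idle wake-up writes a checkpoint only when something was processed since the previous one.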