I have some code that uses the Service Bus `EventData` class, and I suspect that I need to use its `Offset` property because my program currently is (or seems to be) re-reading the same Event Hub data over and over again.
My code is as follows:
```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

public class EventHubListener : IEventProcessor
{
    private static EventHubClient _eventHubClient;
    private const string EhConnectionStringNoPath = "Endpoint=...";
    private const string EhConnectionString = EhConnectionStringNoPath + ";...";
    private const string EhEntityPath = "...";

    public void Start()
    {
        _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
        EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
        EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);

        // Register one processor per partition, each with its own lease.
        foreach (string partitionId in eventHub.PartitionIds)
        {
            defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
            {
                PartitionId = partitionId
            }, new EventProcessorCheckpointManager());
            Console.WriteLine("Processing : " + partitionId);
        }
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            // Decode the event body as UTF-8 JSON.
            string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
            MyData data = JsonConvert.DeserializeObject<MyData>(bytes);
            // ... (remainder of the loop and method omitted)
        }
    }

    // OpenAsync and CloseAsync omitted
}
```
As I get the same messages over and over again, I suspect that I need to do something like this:

```csharp
string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);
```

However, `Offset` is a string, even though it seems to be a numeric value (`"12345"`, for example). The documentation on `context.CheckpointAsync()` made it seem like that might be the answer; however, issuing that at the end of the loop seems to make no difference.
So, I have a two-part question:
- What is offset? Is it what I think it is (i.e. a numeric marker to a point in the stream) and, if so, why is it a string?
- Why would I be getting the same messages over again? As I understand Event Hubs, although they guarantee at-least-once delivery, once a checkpoint has been issued I shouldn't be getting the same messages back.
EDIT:
After a while of messing about, I've come up with something that avoids this problem; however, I certainly wouldn't claim it's a solution:
```csharp
var filteredMessages =
    messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
            .OrderBy(a => a.EnqueuedTimeUtc);
```
Using the `EventProcessorHost` seemed to actually make the problem worse; that is, not only were historical events being replayed, but they seemed to be replayed in a random order.
EDIT:
I came across this excellent article by @Mikhail, which does seem to address my exact issue. However, and this is presumably the root of my problem (or one of them; assuming it is correct, I'm unsure why using the `EventProcessorHost` doesn't just work out of the box, as @Mikhail said himself in the comments), the ServiceBus version of `ICheckpointManager` only has a single interface method:
```csharp
namespace Microsoft.ServiceBus.Messaging
{
    public interface ICheckpointManager
    {
        Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
    }
}
```
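Since that single method is the only hook, whatever implements it has to persist the latest position for each partition itself. A minimal sketch of what that could look like, assuming an in-memory store is acceptable for illustration: `InMemoryCheckpointStore` and `TryGetCheckpoint` are hypothetical names, and the method takes a partition id instead of a `Lease` so the example compiles without the `Microsoft.ServiceBus` package. A real implementation would write to durable storage so the position survives a restart.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, SDK-free stand-in for a checkpoint manager. The real
// ICheckpointManager.CheckpointAsync receives a Lease; here the partition id
// is passed directly. State lives in memory only (not durable).
public sealed class InMemoryCheckpointStore
{
    // Last checkpointed (offset, sequence number) per partition id.
    private readonly ConcurrentDictionary<string, (string Offset, long SequenceNumber)> _checkpoints =
        new ConcurrentDictionary<string, (string Offset, long SequenceNumber)>();

    public Task CheckpointAsync(string partitionId, string offset, long sequenceNumber)
    {
        // Only move forward: a checkpoint older than the stored one is ignored.
        _checkpoints.AddOrUpdate(
            partitionId,
            (Offset: offset, SequenceNumber: sequenceNumber),
            (_, existing) => sequenceNumber > existing.SequenceNumber
                ? (Offset: offset, SequenceNumber: sequenceNumber)
                : existing);
        return Task.CompletedTask;
    }

    // Returns false if this partition has never been checkpointed.
    public bool TryGetCheckpoint(string partitionId, out (string Offset, long SequenceNumber) checkpoint) =>
        _checkpoints.TryGetValue(partitionId, out checkpoint);
}
```

Comparing on the `long` sequence number rather than the string offset keeps the "only move forward" check a plain numeric comparison.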
> What is offset? Is it what I think it is (i.e. a numeric marker to a point in the stream) and, if so, why is it a string?
The offset is the pointer within a stream. The offset of an event changes as events get removed from your Event Hub when the Message Retention policy has elapsed. So a message that was once at offset 10 may be at offset 0 several days later, because older messages were dropped from the stream. This has a good diagram: Event Hubs: Stream Offsets.
> Why would I be getting the same messages over again? As I understand Event Hubs, although they guarantee at-least-once delivery, once a checkpoint has been issued I shouldn't be getting the same messages back.
You may be getting the same messages again if you are using the low-level `EventReceiver` and relying on `offset`, since messages expire from the Event Hub when the Message Retention policy elapses (the default is 1 day). The sequence number is a better field to leverage because it does not change. When checkpointing succeeds, it records the last event that was successfully processed, so you shouldn't get that event back: when the client starts, it creates a stream at the position in the event stream just after that event. If you still see duplicates after a successful checkpoint, you can file an issue on GitHub.
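To illustrate why a stable sequence number is convenient, here is a minimal client-side sketch that drops any event at or below the highest sequence number already handled for a partition. It is a guard against replays, not a substitute for checkpointing, and it keeps state in memory only. `SimpleEvent` and `SequenceNumberDeduplicator` are hypothetical types; the real `EventData` exposes `SequenceNumber` as a `long`, which is the assumption this relies on.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal event shape standing in for EventData.
public sealed class SimpleEvent
{
    public SimpleEvent(long sequenceNumber, string body)
    {
        SequenceNumber = sequenceNumber;
        Body = body;
    }

    public long SequenceNumber { get; }
    public string Body { get; }
}

public sealed class SequenceNumberDeduplicator
{
    // Highest sequence number processed so far, per partition id.
    private readonly Dictionary<string, long> _lastProcessed = new Dictionary<string, long>();

    // Returns only events newer than the last processed one for this partition,
    // sorted by sequence number, and records the new high-water mark.
    public IReadOnlyList<SimpleEvent> FilterNew(string partitionId, IEnumerable<SimpleEvent> batch)
    {
        // -1 means "nothing seen yet" for this partition.
        long last = _lastProcessed.TryGetValue(partitionId, out var seen) ? seen : -1;

        List<SimpleEvent> fresh = batch
            .Where(e => e.SequenceNumber > last)
            .OrderBy(e => e.SequenceNumber)
            .ToList();

        if (fresh.Count > 0)
        {
            _lastProcessed[partitionId] = fresh[fresh.Count - 1].SequenceNumber;
        }

        return fresh;
    }
}
```

Unlike the `EnqueuedTimeUtc` filter in the question, this does not depend on a start date, and ordering by sequence number gives a deterministic processing order within a partition.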
`EventProcessorHost` is helpful as it tries to balance the processing of partitions across the running instances. (For example, consider a 6-partition Event Hub: if you have 2 `EventProcessorHost`s connected to the same Event Hub, reading with the same consumer group, they'll end up balancing the processing of those partitions with 3 each.) It also reconnects when there are transient failures like network loss, and it supports checkpointing to durable storage like Azure Storage Blob. Here is a sample: Process Events using an EventProcessorClient.
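The 6-partitions-over-2-hosts arithmetic can be sketched as a simple round-robin assignment. This is only an illustration of the resulting balance, not how `EventProcessorHost` works internally (it converges dynamically by acquiring and stealing leases). `PartitionBalancer` and `Assign` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PartitionBalancer
{
    // Deals partitions out round-robin so each host ends up with either
    // floor(P/H) or ceil(P/H) partitions.
    public static Dictionary<string, List<string>> Assign(
        IReadOnlyList<string> partitionIds, IReadOnlyList<string> hostNames)
    {
        if (hostNames.Count == 0)
        {
            throw new ArgumentException("At least one host is required.", nameof(hostNames));
        }

        // One initially empty bucket per host.
        Dictionary<string, List<string>> assignment =
            hostNames.ToDictionary(h => h, h => new List<string>());

        // Partition i goes to host (i mod H).
        for (int i = 0; i < partitionIds.Count; i++)
        {
            assignment[hostNames[i % hostNames.Count]].Add(partitionIds[i]);
        }

        return assignment;
    }
}
```

With partitions `"0"` to `"5"` and two hosts, each host receives three partitions (`0,2,4` and `1,3,5`), matching the 3-each split described in the answer.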