I have an azure worker role with an event processor host connected to an azure event hub. For some unknown reason - it will not get any messages.
logs show that it opens an EventProcessor
for every partition - and there are no errors - but ProcessEventsAsync
is never called.
using Service Bus Explorer I can see that it receives messages when the processor is down and when it's on it throws an Exception that a receiver is on.
- I did get it to work once, but after a restart it didn't continue working
I have no idea where to look next - this is however the code for the worker role
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost _eventProcessorHost;
private IEventProcessorFactory _processorFactory;
private ConfigurationProvider configuration = new ConfigurationProvider();
private string _eventHubConnectionString;
private string _storageAccountConnectionString;
private string _dbConnectionString;
public override void Run()
{
Trace.TraceInformation("EventHubWorker is running");
try
{
RunAsync(_cancellationTokenSource.Token).Wait();
}
finally
{
_runCompleteEvent.Set();
}
}
public override bool OnStart()
{
Trace.TraceInformation("EventHubWorker is starting");
CompositeResolver.RegisterAndSetAsDefault(FormattersResolver.Instance, ContractlessStandardResolver.Instance, StandardResolver.Instance);
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
SqlMapper.AddTypeHandler(new DateTimeHandler());
_eventHubConnectionString = configuration.EventHubConnectionString;
_dbConnectionString = configuration.DbConnectionString;
_storageAccountConnectionString = configuration.StorageConnectionString;
string hostName = Guid.NewGuid().ToString();
var eventClient = EventHubClient.CreateFromConnectionString(_eventHubConnectionString, configuration.EventHubName);
_eventProcessorHost = new EventProcessorHost(hostName, eventClient.Path, configuration.ConsumerGroupName,
_eventHubConnectionString, _storageAccountConnectionString);
var partitionOptions = new PartitionManagerOptions()
{
LeaseInterval = new TimeSpan(0, 5, 0)
};
_processorFactory = new EventProcessorFactory(/* some data for dependency injection */);
return base.OnStart();
}
public override void OnStop()
{
Trace.TraceInformation("EventHubWorker is stopping");
_cancellationTokenSource.Cancel();
_runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("EventHubWorker has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
int retryCount = 0;
var exceptions = new List<Exception>();
async Task StartProcessing()
{
if (retryCount > 5)
{
throw new AggregateException($"failed to run service, tried {retryCount} times",exceptions);
}
try
{
await _eventProcessorHost.RegisterEventProcessorFactoryAsync(_processorFactory, new EventProcessorOptions
{
InitialOffsetProvider = o => DateTime.UtcNow,
MaxBatchSize = 100,
PrefetchCount = 10,
ReceiveTimeOut = TimeSpan.FromSeconds(20),
});
}
catch(MessagingException e) when (e.IsTransient)
{
retryCount++;
exceptions.Add(e);
await StartProcessing();
}
}
var options = new EventProcessorOptions();
options.ExceptionReceived += Options_ExceptionReceived;
await StartProcessing();
cancellationToken.WaitHandle.WaitOne();
await _eventProcessorHost.UnregisterEventProcessorAsync();
}
private void Options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
Trace.TraceError(e.Exception.Message);
}
}
This is the EventProcessor Code - The factory itself seems irrelevant
class EventProcessor : IEventProcessor
{
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
//never logged
Trace.TraceInformation($"Partition {context.Lease.PartitionId} Closed");
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
else
{
Trace.TraceError(reason.ToString());
}
}
public Task OpenAsync(PartitionContext context)
{
//always logs with the expected lease information
Trace.TraceInformation($"Partition {context.Lease.PartitionId} initiailized with epoch {context.Lease.Epoch}");
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
Trace.TraceInformation("processing event"); //never called
// processing code
}
PartitionManagerOptions has a maximum lease interval of 60 seconds, (same as blob lease) EventProcessorHost won't throw exceptions when initially acquiring leases. Try setting lease interval to 60 seconds instead of 5 minutes.