I have the following scenario:
```csharp
public class MyHostedService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IEventSource _eventSource;
    private readonly ILogger _logger;
    private IDisposable? _subscription;

    public MyHostedService(
        IServiceProvider serviceProvider,
        IEventSource eventSource,
        ILogger<MyHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _eventSource = eventSource;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _subscription = _eventSource.CreateEventsObservable()
            .Buffer(TimeSpan.FromSeconds(1), 100)
            .Where(batch => batch.Count is not 0)
            .Select((pes, batchNumber) => Observable.FromAsync(() => ProcessBatch(batchNumber, pes, stoppingToken)))
            .Concat()
            .Subscribe(
                onNext: (u) => { },
                onError: (e) => _logger.LogSubscriptionFailure(e),
                onCompleted: () => _logger.LogCompletedSubscription()
            );
    }

    private async Task ProcessBatch(
        int batchNumber,
        IList<MyEvent> events,
        CancellationToken cancellationToken)
    {
        try
        {
            using var scope = _serviceProvider.CreateScope();
            var processor = scope.ServiceProvider.GetRequiredService<IProcessor>();
            await processor.Process(events, cancellationToken);
            _logger.LogProcessedBatch(batchNumber, events.Count);
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            _logger.LogBatchError(e, batchNumber, events.Count);
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken) => await base.StopAsync(stoppingToken);

    public override void Dispose()
    {
        _subscription?.Dispose();
        base.Dispose();
    }
}
```
There are a few things that bother me:

- I noticed that the processing is a bit slow: even though the source observable (`_eventSource`) produces lots of events, the batches don't come in very fast. I wonder if I could improve the throughput here. Maybe `ProcessBatch` should use `Task.Run` internally to run the processor?
- There are lots of events. Since `IProcessor` has some transient dependencies (like typed HTTP clients), I think I shouldn't just inject `IProcessor` and use that instance for the whole lifetime of the app. Instead, I'm creating a scope every time a batch is processed. Is that OK for performance? Could it be better?
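On the throughput question: with `Concat`, the next batch's `FromAsync` is only subscribed to after the previous one has completed, so at most one batch is ever in flight, and moving the processor call into `Task.Run` would not change that. If the processor can safely run several batches at once, one option is Rx's `Merge(maxConcurrent)` overload in place of `Concat()`. The sketch below assumes the System.Reactive NuGet package; `ConcurrentBatching`, `ProcessBatches`, `processBatchAsync` and `maxConcurrency` are hypothetical names standing in for the question's `ProcessBatch` wiring.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentBatching
{
    // Hypothetical helper: buffers events like the question's pipeline, but lets up to
    // maxConcurrency batches be processed at the same time. Emits each processed batch's size.
    public static IObservable<int> ProcessBatches<T>(
        IObservable<T> events,
        Func<int, IList<T>, CancellationToken, Task> processBatchAsync,
        int maxConcurrency,
        CancellationToken cancellationToken)
    {
        return events
            .Buffer(TimeSpan.FromSeconds(1), 100)
            .Where(batch => batch.Count is not 0)
            // FromAsync is deferred: a batch only starts once Merge subscribes to it.
            .Select((batch, batchNumber) => Observable.FromAsync(async () =>
            {
                await processBatchAsync(batchNumber, batch, cancellationToken);
                return batch.Count;
            }))
            // Merge(n) keeps at most n inner observables subscribed at once,
            // whereas Concat() subscribes to them strictly one after another.
            .Merge(maxConcurrency);
    }
}
```

Batches may then complete out of order, and anything shared between them (loggers, counters, singletons) must tolerate concurrent use. With the question's per-batch scope, each concurrent batch still resolves its own `IProcessor`.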
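It's always best to have a full Rx solution. Calling `Subscribe` in your `ExecuteAsync` is not the best way to go: the task returned by `ExecuteAsync` completes as soon as the subscription is set up, so the host never sees the pipeline finish or fail. You can await an observable, you know.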
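For illustration, here is a minimal sketch of awaiting the pipeline instead of storing the subscription. It assumes the System.Reactive NuGet package; `AwaitedPipeline.RunAsync` and `processBatchAsync` are hypothetical names, and the batching operators are copied from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitedPipeline
{
    // Hypothetical helper: runs the buffered pipeline to completion and returns how many
    // events were processed. Meant to be awaited (or returned) directly from ExecuteAsync.
    public static async Task<int> RunAsync<T>(
        IObservable<T> events,
        Func<IList<T>, CancellationToken, Task> processBatchAsync,
        CancellationToken stoppingToken)
    {
        int processed = 0;

        await events
            .Buffer(TimeSpan.FromSeconds(1), 100)
            .Where(batch => batch.Count is not 0)
            // The token FromAsync passes in is cancelled when this subscription is disposed.
            .Select(batch => Observable.FromAsync(async ct =>
            {
                await processBatchAsync(batch, ct);
                processed += batch.Count; // no lock needed: Concat runs one batch at a time
            }))
            .Concat()
            // Awaiting an empty observable throws InvalidOperationException;
            // DefaultIfEmpty guarantees at least one element.
            .DefaultIfEmpty()
            // When stoppingToken fires, the task is cancelled and the subscription disposed.
            .ToTask(stoppingToken);

        return processed;
    }
}
```

In the question's service, `ExecuteAsync` could return this task directly, passing a delegate that creates the scope and calls `IProcessor.Process`, which would make the `_subscription` field and the `Dispose` override unnecessary. Because the question's `ProcessBatch` already catches non-cancellation exceptions, a single failing batch would not fault the awaited pipeline.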
That's close. Not quite your original query, but it should do the same processing and it's only creating one scope.
You need this extension method: