I am consuming a streaming data source that pushes data at intervals of 5 seconds or more. I have an observer like the one below to receive the push notifications:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class ConsoleDataObserver : IObserver<DataPipeEvent>
    {
        private DataPipeType _dataPipeType;
        // Based on the MSDN Parallel Extensions samples
        private LimitedConcurrencyLevelTaskScheduler _scheduler;
        private TaskFactory _factory;

        public ConsoleDataObserver(DataPipeType afDataPipeType)
        {
            _dataPipeType = afDataPipeType;
            _scheduler = new LimitedConcurrencyLevelTaskScheduler(5); // at most 5 tasks run concurrently
            _factory = new TaskFactory(_scheduler);
        }

        public void OnNext(DataPipeEvent dataPipeEvent)
        {
            // Queue the work on the limited scheduler and return immediately
            _factory.StartNew(() => ProcessDataNow(dataPipeEvent));
        }

        private void ProcessDataNow(DataPipeEvent dataPipeEvent)
        {
            Thread.Sleep(8000); // simulates a long-running task
        }

        public void OnError(Exception error)
        {
            Console.WriteLine("Provider has sent an error");
        }

        public void OnCompleted()
        {
            Console.WriteLine("Provider has terminated sending data");
        }
    }

I have the following requirements:
In OnNext, I don't want to block the main thread, so I want the long-running processing to happen on another thread. I am using a TaskScheduler so that the work runs on the ThreadPool. Is this a good implementation? OnNext can receive 500-1000 events per second. ProcessDataNow will log any exception it encounters.
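
To make my concern concrete, here is a minimal sketch (not my production code) that measures how much work piles up behind a concurrency-limited scheduler. The names `BacklogSketch` and `Run` are hypothetical. I am also assuming that the built-in `ConcurrentExclusiveSchedulerPair` is an acceptable stand-in for `LimitedConcurrencyLevelTaskScheduler`, so that the snippet compiles without the Parallel Extensions samples.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BacklogSketch
{
    // Submits `events` work items to a scheduler limited to `maxConcurrency`
    // concurrent tasks and returns how many items were still unfinished
    // (queued or running) right after the last one was submitted.
    public static int Run(int events, int maxConcurrency, int workMs)
    {
        // Assumption: stand-in for LimitedConcurrencyLevelTaskScheduler.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int pending = 0;
        var tasks = new Task[events];

        for (int i = 0; i < events; i++)
        {
            Interlocked.Increment(ref pending);
            tasks[i] = factory.StartNew(() =>
            {
                Thread.Sleep(workMs);               // simulated long-running work
                Interlocked.Decrement(ref pending); // item finished
            });
        }

        // Snapshot of unfinished items once the burst has been submitted.
        int backlog = Volatile.Read(ref pending);

        Task.WaitAll(tasks); // drain the queue before returning
        return backlog;
    }
}
```

Calling something like `BacklogSketch.Run(50, 5, 200)` should show that almost the whole burst is still unfinished when submission ends. With my real numbers (5 concurrent tasks, roughly 8 seconds each, 500-1000 events per second) I would expect the queue to grow far faster than it drains, which is the part I am unsure how to handle.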