I have an app that receives a stream of XML events from Kafka. These events have to be deserialized/parsed and otherwise converted before being handed in order to some business logic. (This logic then emits other events on the output side.)
The parsing/conversion code is stateless, while the domain code is stateful and has to receive events in order. The two steps are decoupled through a `System.Threading.Channels` channel, so that the parsing step gets a full 'thread'/'CPU' (async task).
My challenge is that the parsing is CPU-heavy and pegs one core at 100% CPU, making it the bottleneck for service throughput. I've tried multi-threading / parallel processing, and this has improved the throughput somewhat. However, my approach seems inelegant and potentially carries a lot of overhead.
In the parsing step I've used `Task.Run()` to spawn a `Task` for each item, and then added the `Task` to the output channel, ensuring the tasks are enqueued according to input order. The consumer then pulls tasks from the `Channel` one at a time and waits for each to complete with a result before continuing.
This means I'm creating and submitting a large number of `Task`s, and in general it seems like I'm performing a lot of thread-coordination operations on the hot path.
I was hoping someone here would have a good approach for processing items in parallel while still respecting the ordering of the output.
So you have a `Channel<Task<T>>` as a conveyor belt: the producer adds tasks with `channel.Writer.TryWrite(Task.Run(() => Parse(item)))`, and the consumer reads the tasks and awaits them one after the other:
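For reference, a minimal sketch of that setup. The `XmlEvent`/`ParsedEvent` records and the `Produce`/`ConsumeAsync`/`Parse` names are placeholders I'm assuming, not your actual code:

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<Task<ParsedEvent>>();

// Producer: one Task.Run per item, enqueued in input order.
void Produce(XmlEvent item) =>
    channel.Writer.TryWrite(Task.Run(() => Parse(item)));

// Consumer: awaits the tasks one after the other, so results come out
// in the same order the items went in.
async Task ConsumeAsync()
{
    await foreach (Task<ParsedEvent> task in channel.Reader.ReadAllAsync())
    {
        ParsedEvent result = await task;
        // ...hand result to the stateful, in-order domain logic...
    }
}

static ParsedEvent Parse(XmlEvent item) => new(item.Xml); // CPU-heavy in reality

// Placeholder types standing in for the real event shapes.
record XmlEvent(string Xml);
record ParsedEvent(string Data);
```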
This is quite a good setup. A disadvantage is that you are not controlling the degree of parallelism, so at some moments you might have too many `Task.Run` actions running in parallel, resulting in `ThreadPool` starvation that might negatively affect other parts of your application. You can solve this problem by scheduling the work with the more advanced `Task.Factory.StartNew` instead of `Task.Run`, configuring its `scheduler` argument with the `ConcurrentScheduler` property of a shared `ConcurrentExclusiveSchedulerPair` instance.
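A rough sketch of that change, continuing the snippet above (the concurrency limit of 2 is an arbitrary assumption; tune it for your workload):

```csharp
using System.Threading;
using System.Threading.Tasks;

// One shared pair for the whole app; its ConcurrentScheduler caps how many
// parse tasks can run at once, protecting the ThreadPool from saturation.
TaskScheduler parseScheduler = new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, maxConcurrencyLevel: 2).ConcurrentScheduler;

// Producer: Task.Factory.StartNew with an explicit scheduler, instead of
// Task.Run (which always queues to the default ThreadPool scheduler).
channel.Writer.TryWrite(Task.Factory.StartNew(
    () => Parse(item),
    CancellationToken.None,
    TaskCreationOptions.None,
    parseScheduler));
```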
Another approach is to replace the channel with a `TransformBlock<TInput,TOutput>` from the TPL Dataflow library. This component combines an input buffer, an output buffer, and a processor that transforms the `TInput` to `TOutput`. It is equipped out of the box with parallel capabilities and order preservation. Here is an example:
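Something along these lines, reusing the placeholder types from the first sketch (the option values are assumptions meant to show the relevant knobs):

```csharp
using System.Threading.Tasks.Dataflow;

TransformBlock<XmlEvent, ParsedEvent> block = new(item => Parse(item),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2, // parse on up to 2 cores
        EnsureOrdered = true,       // the default; outputs keep input order
        BoundedCapacity = 100,      // optional back-pressure on the producer
    });
```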
The producer feeds the block with `block.Post(item)`, and the consumer enumerates the output buffer of the block with the `ReceiveAllAsync` method:
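Sketched out, with both sides of the block (`Complete()` signals the end of the input; `SendAsync` is an alternative to `Post` that awaits when a bounded buffer is full):

```csharp
// Producer side: Post returns false if the bounded buffer is full.
block.Post(item);
// ...signal completion when the Kafka stream ends:
block.Complete();

// Consumer side: results arrive in the original input order.
await foreach (ParsedEvent result in block.ReceiveAllAsync())
{
    // ...hand result to the stateful domain logic...
}
await block.Completion; // propagates exceptions (see note below)
```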
The `await block.Completion;` at the end is needed because the `ReceiveAllAsync` method currently has a bug and doesn't propagate possible exceptions as part of the enumeration.
My expectation is that the `TransformBlock` approach should have less overhead and consume less memory than your current setup. The TPL Dataflow library is advertised by Microsoft as suitable for "coarse-grained dataflow and pipelining tasks". This means that your `Parse` method should be chunky. In case it is feather-weight, like parsing a single number, most likely the benefits of parallelization will be negated by the synchronization overhead. In that case the solution might be to chunkify the work using a `BatchBlock<T>`, as sketched below.
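A possible shape for that chunking, as a hedged sketch (the batch size of 100 and the block wiring are assumptions; the consumer would then receive `ParsedEvent[]` batches from `parser` and unpack them in order):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Group items into arrays of 100 so that each parallel work item is
// chunky enough to amortize the coordination overhead.
BatchBlock<XmlEvent> batcher = new(batchSize: 100);

TransformBlock<XmlEvent[], ParsedEvent[]> parser = new(
    batch => Array.ConvertAll(batch, Parse),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

batcher.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });
```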
The TPL Dataflow library is not exactly cutting-edge technology. It predates `ValueTask`s, so it doesn't take advantage of them. It also comes with some quirks, like swallowing `OperationCanceledException`s that might be thrown by the `transform` delegate, and it is very difficult to extend. Although it should be better than what you already have, it's not the optimal solution, but it might be good enough for your needs.