My application streams from a camera using the RtspClientSharp library. An OnFramesReceived event is raised whenever a decoded frame is ready. I was converting the decoded frame to a Bitmap inside that event handler; the conversion is a blocking call that takes more than 100 ms, and it was slowing the frame rate down to 10 FPS.
To solve this I used the Task Queue code from here, which queues up the ProcessFrame event (it contains the code that converts the decoded frame to a Bitmap) using Task.ContinueWith(...).Unwrap(). My aim is to execute the ProcessFrame calls sequentially, in the order the frames were received. Using the Task Queue removed the blocking call and now I'm able to process 30 frames per second.
However, I'm now having a memory issue: the longer my application runs, the more memory usage gradually increases. ANTS Memory Profiler says (see screenshot) that ContinuationResultTaskFromTask is the largest class in Gen 2.
Update: Some facts I'd like to add: I have 10 such cameras connected to my application, and each camera has its own instance of the camera class. I'm using a 16-core processor with hyper-threading and 32 GB of RAM. Still, if the CPU can't handle the load I would prefer to drop the FPS to 10.
private void OnFramesReceived(object sender, IDecodedVideoFrame decodedFrame)
{
    // Queue the frame so the expensive Bitmap conversion runs off the receive thread.
    taskQueue.Enqueue(() => Task.Run(() => ProcessFrame?.Invoke(this, decodedFrame)));
}
private void HandleProcessFrame(object sender, IDecodedVideoFrame decodedFrame)
{
    try
    {
        using (Bitmap bmpBitmap = new Bitmap(m_Width, m_Height))
        {
            BitmapData bmpData = bmpBitmap.LockBits(
                new Rectangle(0, 0, bmpBitmap.Width, bmpBitmap.Height),
                ImageLockMode.WriteOnly, bmpBitmap.PixelFormat);
            try
            {
                // Copy the decoded frame into the bitmap's pixel buffer.
                decodedFrame.TransformTo(
                    bmpData.Scan0,
                    bmpData.Stride,
                    _transformParameters);
            }
            finally
            {
                bmpBitmap.UnlockBits(bmpData);
            }
            base.OnNewFrameEvent(this, bmpBitmap);
            decodedFrame = null;
        }
    }
    catch (Exception ex)
    {
        Logging.LogError(ex);
    }
}
public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private object key = new object();

    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }

    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator(),
                TaskContinuationOptions.ExecuteSynchronously).Unwrap();
            previous = next;
            return next;
        }
    }
}
By using continuations you are creating a queue that is not centrally controlled, and also one that is not memory efficient. You are paying 200-300 bytes of overhead for each continuation, on top of the actual payload (the RawFrame). I suggest switching to something more organized and efficient, like the TPL Dataflow library.

Below is an example of using the TPL Dataflow library. A single ActionBlock, the simplest component of this library, provides the horsepower for the computations. You can configure the size of its internal queue by setting the BoundedCapacity option. When the queue becomes full, incoming messages will be dropped (the Post method will return false). You can also configure the MaxDegreeOfParallelism: you can either utilize all the available cores/processors of the machine, or leave a core or two free to do other work. The TPL Dataflow library is built into .NET Core, and is available as a NuGet package for .NET Framework.
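A minimal sketch of that ActionBlock setup, adapted to this scenario, might look like the code below. It assumes the IDecodedVideoFrame type from the question; the FramePipeline class, its member names, and the capacity of 10 are illustrative choices, not part of the library.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class FramePipeline
{
    private readonly ActionBlock<IDecodedVideoFrame> _block;

    public FramePipeline(Action<IDecodedVideoFrame> processFrame)
    {
        _block = new ActionBlock<IDecodedVideoFrame>(processFrame,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 10,       // at most 10 frames queued per camera (illustrative value)
                MaxDegreeOfParallelism = 1  // keep the frames of one camera in order
            });
    }

    // Post returns false when the queue is full, so the frame is simply dropped.
    public bool TryEnqueue(IDecodedVideoFrame frame) => _block.Post(frame);

    // Signal that no more frames are coming and wait for the queue to drain.
    public Task ShutdownAsync()
    {
        _block.Complete();
        return _block.Completion;
    }
}

With one FramePipeline per camera, OnFramesReceived reduces to a single TryEnqueue call, and the bounded queue caps the memory held by pending frames instead of letting continuations accumulate.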