Access Queue<T> from two delegate.BeginInvoke Async Methods Simultaneously

539 Views Asked by At

I'm inexperienced with threading and asynchronous processing so I'm not sure if what I'm doing is safe.

UPDATE: This is restricted to .Net 3.5 unfortunately so no ConcurrentQueue (unless someone's implemented it for .Net 3.5 or knows an alternative implementation?)

UPDATE 2: I found this approach using the source from a Mono implementation of ConcurrentQueue (which is perfectly compatible code for .Net 3.5).

I have one object who's job it is to manage a Queue<MyObject> internally. This class needs to do its operations asynchronously (it will be running continuously between the OnStart and OnStop events of a Windows Service). Anyway, I'll let the code do the talking:

public class MyObjectQueueManager : IMyObjectQueueManager
{
    private bool shouldContinueEnqueuing;
    private readonly Queue<MyObject> queue;
    private readonly IMyObjectIdentifier myObjectIdentifier;

    public MyObjectQueueManager( IMyObjectIdentifier myObjectIdentifier )
    {
        this.myObjectIdentifier = myObjectIdentifier;
        queue = new Queue<MyObject>();
    }

    public void StartQueue()
    {
        shouldContinueEnqueuing = true;

        var enqueuer = new MyObjectEnqueuer( EnqueueMyObjects );
        enqueuer.BeginInvoke( myObjectIdentifier, queue, StopEnqueuingMyObjects, new object() );

        var dequeuer = new MyObjectDequeuer( DequeueMyObjects );
        dequeuer.BeginInvoke( queue, StopDequeuingMyObjects, new object() );
    }

    public void StopQueue()
    {
        shouldContinueEnqueuing = false;
    }

    public event EventHandler<NextMyObjectEventArgs> OnNextMyObjectAvailable;

    private void EnqueueMyObjects( IMyObjectIdentifier identifier, Queue<MyObject> queue )
    {
        while ( shouldContinueEnqueuing )
        {
            var myObjects = identifier.GetMyObjects();

            foreach ( var myObject in myObjects )
            {
                queue.Enqueue( myObject );
            }

            WaitForQueueToShrink( queue, 1000 /* arbitrary queue limiter - will come from settings. */ );
        }
    }

    private void DequeueMyObjects( Queue<MyObject> queue )
    {
        while ( queue.Count > 0 || shouldContinueEnqueuing )
        {
            if ( queue.Count > 0 )
            {
                OnNextMyObjectAvailable( this, new NextMyObjectEventArgs( queue.Dequeue() ) );
            }

            Thread.Sleep( 1 );
        }
    }

    private static void WaitForQueueToShrink( ICollection queue, int queueLimit )
    {
        while ( queue.Count > queueLimit )
        {
            Thread.Sleep( 10 );
        }
    }

    private static void StopEnqueuingMyObjects( IAsyncResult result )
    {
        var asyncResult = ( AsyncResult ) result;
        var x = ( MyObjectEnqueuer ) asyncResult.AsyncDelegate;

        x.EndInvoke( asyncResult );
    }

    private static void StopDequeuingMyObjects( IAsyncResult result )
    {
        var asyncResult = ( AsyncResult ) result;
        var x = ( MyObjectDequeuer ) asyncResult.AsyncDelegate;

        x.EndInvoke( asyncResult );
    }

    private delegate void MyObjectEnqueuer( IMyObjectIdentifier myObjectIdentifier, Queue<MyObject> queue );

    private delegate void MyObjectDequeuer( Queue<MyObject> queue );
}

My notes/thoughts/concerns:

  • If I understand delegate.BeginInvoke correctly, I'm spawning two threads and both are accessing the Queue (through the invocation methods parameters).
  • One thread is only enqueuing items.
  • One thread is only dequeuing items, and it always checks that items are in the queue first.
  • I'm unsure if this is thread safe - will the queue get locked by one thread and not be accessible from the other thread?
  • If so, what alternative approach/technique could I use which would allow me to independently enqueue/dequeue items without holding up the main worker thread?
  • If this approach is actually somehow sound (you never know) will it scale to handle large volume? I.e. if the IMyObjectIdentifier is returning thousands of items per second will the scenario work (of course, I will be doing volume testing to confirm).

As a bonus, I've noticed that there is similar code being used to call EndInvoke from the StopEnqueuingMyObjects and StopDequeuingMyObjects methods - I'd like to refactor to use a single method if I can but when I tried using a generic method like this:

private static void StopProcessingMyObjects<T>( IAsyncResult result )
{
    var asyncResult = ( AsyncResult ) result;
    var x = ( T ) asyncResult.AsyncDelegate;

    x.EndInvoke( asyncResult ); 
}

It doesn't like this because it doesn't know that T is a delgate and so EndInvoke can't be called. Is it possible to do this?

1

There are 1 best solutions below

3
On BEST ANSWER

Try the ConcurrentQueue<T>. This is a thread safe implementation of a Queue. The API is slightly different so it might require some modification of current code to implement.