What is the best way to cancel a task that is in a blocking state?

I have tasks running that call a method that reads from RabbitMQ. When there is nothing in the queue, the method simply blocks. So the tasks have a "running" status, but aren't actually doing anything. Is there any way to gracefully end these tasks?

The code that accesses the queue is as follows:

    private void FindWork(CancellationToken ct)
    {
        if (ct.IsCancellationRequested)
            return;

        bool result = false;
        bool process = false;
        bool queueResult = false;
        Work_Work work = null;

        try
        {
            using (Queue workQueue = new Queue(_workQueue))
            {
                // Look for work on the work queue
                workQueue.Open(Queue.Mode.Consume);
                work = workQueue.ConsumeWithBlocking<Work_Work>();

                // Do some work with the message ...

                return;

The tasks are created as follows:

    private void Run()
    {
        while (!_stop)
        {
            // Remove any stopped tasks from the pool
            List<int> removeThreads = new List<int>();

            lock (_tasks)
            {
                foreach (KeyValuePair<int, Task> task in _tasks)
                {
                    if (task.Value.Status != TaskStatus.Running)
                    {
                        task.Value.Wait();
                        removeThreads.Add(task.Value.Id);
                    }
                }

                foreach (int taskID in removeThreads)
                    _tasks.Remove(taskID);
            }

            CancellationToken ct = _cts.Token;
            TaskFactory factory = new TaskFactory(ct, TaskCreationOptions.LongRunning, TaskContinuationOptions.LongRunning, null);

            // Create new tasks if we have room in the pool
            while (_tasks.Count < _runningMax)
            {
                Task task = factory.StartNew(() => FindWork(ct));

                lock (_tasks)
                    _tasks.Add(task.Id, task);
            }

            // Take a rest so we don't run the CPU to death
            Thread.Sleep(1000);
        }
    }

Currently I have changed my task creation code to look like the following so that I can abort the tasks. I know this is not a good solution, but I don't know what else to do.

            while (_tasks.Count < _runningMax)
            {
                Task task = factory.StartNew(() =>
                    {
                        try
                        {
                            using (_cts.Token.Register(Thread.CurrentThread.Abort))
                            {
                                FindWork(ct);
                            }
                        }
                        catch (ThreadAbortException)
                        {
                            return;
                        }
                    }, _cts.Token);

                _tasks.Add(task.Id, task);
            }

Best answer

Could the following work in your scenario?

Instead of spawning multiple threads and having them all wait on the queue, I would have a single thread in an infinite polling loop that starts a new task whenever a new piece of work comes in. You can add a semaphore to limit the number of tasks running at once. See the sample code below; I've used a BlockingCollection in place of RabbitMQ.

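    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class QueueManager
    {
        public BlockingCollection<Work> blockingCollection = new BlockingCollection<Work>();
        private const int _maxRunningTasks = 3;

        // Limits how many work items run at the same time
        static SemaphoreSlim _sem = new SemaphoreSlim(_maxRunningTasks);

        public void Queue()
        {
            blockingCollection.Add(new Work());
        }

        public void Consume()
        {
            while (true)
            {
                // Blocks until a work item is available
                Work work = blockingCollection.Take();

                // Blocks until one of the running tasks frees a slot
                _sem.Wait();

                Task t = Task.Factory.StartNew(work.DoWork);
            }
        }

        public class Work
        {
            public void DoWork()
            {
                try
                {
                    Thread.Sleep(5000); // Simulates the actual work
                    Console.WriteLine("Finished work");
                }
                finally
                {
                    // Always free the slot, even if the work throws
                    _sem.Release();
                }
            }
        }
    }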

And my test class:

    class Test
    {
        static void Main(string[] args)
        {
            // The class defined above is QueueManager, not Consumer
            QueueManager c = new QueueManager();
            Task t = Task.Factory.StartNew(c.Consume);

            c.Queue();
            c.Queue();
            c.Queue();
            c.Queue();
            c.Queue();

            Thread.Sleep(1000);
            Console.ReadLine();
        }
    }
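
Note that the Consume loop in this sample never exits, so on its own it does not answer the shutdown part of the question. Here is a minimal sketch of one way to make it cancellable, assuming the same in-memory BlockingCollection stand-in rather than RabbitMQ. Both BlockingCollection<T>.Take and SemaphoreSlim.Wait have overloads that take a CancellationToken and throw OperationCanceledException when it is signalled. The class name CancellableQueueManager and the use of Action as the work item are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of QueueManager; all names here are illustrative only.
public class CancellableQueueManager
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly SemaphoreSlim _sem = new SemaphoreSlim(3);

    public void Queue(Action work)
    {
        _queue.Add(work);
    }

    // Returns normally once the token is cancelled.
    public void Consume(CancellationToken ct)
    {
        try
        {
            while (true)
            {
                Action work = _queue.Take(ct); // blocks, but wakes up on cancellation
                _sem.Wait(ct);                 // same for the concurrency limit

                Task.Run(() =>
                {
                    try { work(); }
                    finally { _sem.Release(); } // free the slot even if work throws
                });
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested while waiting: leave the loop.
        }
    }

    public static void Main()
    {
        var manager = new CancellableQueueManager();
        var cts = new CancellationTokenSource();

        Task consumer = Task.Factory.StartNew(
            () => manager.Consume(cts.Token), TaskCreationOptions.LongRunning);

        manager.Queue(() => Console.WriteLine("Finished work"));
        Thread.Sleep(500); // give the work item time to run

        cts.Cancel();      // unblocks Take() even though the queue is now empty
        consumer.Wait();   // the consumer task ends without being aborted
    }
}
```

One caveat of this sketch: if cancellation arrives after Take has returned but while the loop is waiting on the semaphore, that one work item is dropped. With a real broker you would probably want to requeue or leave such a message unacknowledged.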

Another answer

To make this work, you'd need to change ConsumeWithBlocking to support cancellation. I'm not familiar with RabbitMQ, but apparently it supports cancellation on the consumer channel.

So, instead of calling Thread.CurrentThread.Abort from the Token.Register callback, do the right thing and cancel the operation gracefully via the proper RabbitMQ API.
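
As a rough illustration of that pattern, here is a sketch that uses a BlockingCollection as a stand-in for the queue, since the asker's Queue wrapper and the exact RabbitMQ calls are not shown. The Token.Register callback asks the queue itself to shut down (CompleteAdding here, where the real code would cancel or close the consumer), which makes the blocked Take call throw InvalidOperationException so the method can return normally. The names GracefulCancelSketch, FindWork and ConsumeWithBlocking below are illustrative and are not the asker's real implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class GracefulCancelSketch
{
    // Stand-in for a blocking consume call that has no CancellationToken parameter.
    // Returns null when the queue has been shut down.
    static string ConsumeWithBlocking(BlockingCollection<string> queue)
    {
        try
        {
            return queue.Take();
        }
        catch (InvalidOperationException)
        {
            // Take() throws this once CompleteAdding() was called and the queue is empty.
            return null;
        }
    }

    // Returns true if a message was processed, false if the wait was cancelled.
    public static bool FindWork(BlockingCollection<string> queue, CancellationToken ct)
    {
        // On cancellation, ask the queue to stop instead of aborting a thread.
        using (ct.Register(queue.CompleteAdding))
        {
            string work = ConsumeWithBlocking(queue);
            if (work == null)
                return false;

            Console.WriteLine("Processing " + work);
            return true;
        }
    }

    public static void Main()
    {
        var queue = new BlockingCollection<string>();
        var cts = new CancellationTokenSource();

        Task<bool> worker = Task.Factory.StartNew(
            () => FindWork(queue, cts.Token), TaskCreationOptions.LongRunning);

        Thread.Sleep(200); // the worker is now blocked on the empty queue
        cts.Cancel();      // the callback runs and unblocks the worker

        Console.WriteLine("Worker processed a message: " + worker.Result);
    }
}
```

The design point is that the blocked thread is released by the queue's own shutdown mechanism, so no ThreadAbortException is involved and the using block around the queue can dispose it normally.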

On a side note, the thread you're currently trying to abort is most likely not the one which is blocked by ConsumeWithBlocking.