Managing high/low priority threads in .net

988 Views Asked by At

The scenario is as follows: There are a couple of low priority threads that can be interrupted by high priority threads. Whenever a high priority thread asks the low priority threads to pause, they will go to Wait state (if they are not in wait state already). However when a high priority thread signals that the low priority threads can Resume, the low priority threads should not resume until all the high priority threads that asked the low priority threads to pause have consented.

To solve this I am keeping a track of Pause() calls from the high priority threads to the low priority thread in a counter variable. Whenever the high priority thread asks the low priority thread to Pause(), the value of the counter is incremented by 1. If after the increment the counter has a value of 1, it means the thread was not in Wait, so ask it to go in Wait state. Otherwise just increment counter value. On the contrary when a high priority thread calls Resume() we decrement the counter value and if after the decrement the value is 0, it means the low priority threads can Resume now.

Here is a simplified implementation of my problem. The comparison operation inside if statements with Interlocked.XXX is not correct i.e.

if (Interlocked.Increment(ref _remain) == 1)

, as the read/modify and comparison operations are not atomic.

What am I missing here? I don't want to use thread priority.

using System;
using System.Collections.Generic;
using System.Threading;

namespace TestConcurrency
{

// I borrowed this class from Joe Duffy's blog and modified it
public class LatchCounter
{
 private long _remain;
 private EventWaitHandle m_event;
 private readonly object _lockObject;

public LatchCounter()
{
    _remain = 0;
    m_event = new ManualResetEvent(true);
    _lockObject = new object();
}

public void Check()
{
    if (Interlocked.Read(ref _remain) > 0)
    {
        m_event.WaitOne();
    }
}

public void Increment()
{
    lock(_lockObject)
    {
       if (Interlocked.Increment(ref _remain) == 1)
           m_event.Reset();
    }
}

public void Decrement()
{
    lock(_lockObject)
    {
       // The last thread to signal also sets the event.
       if (Interlocked.Decrement(ref _remain) == 0)
           m_event.Set();
    }
}
}



public class LowPriorityThreads
{
private List<Thread> _threads;
private LatchCounter _latch;
private int _threadCount = 1;

internal LowPriorityThreads(int threadCount)
{
    _threadCount = threadCount;
    _threads = new List<Thread>();
    for (int i = 0; i < _threadCount; i++)
    {
        _threads.Add(new Thread(ThreadProc));
    }

    _latch = new CountdownLatch();
}


public void Start()
{
    foreach (Thread t in _threads)
    {
        t.Start();
    }
}

void ThreadProc()
{
    while (true)
    {
        //Do something
        Thread.Sleep(Rand.Next());
        _latch.Check();
    }
}

internal void Pause()
{
    _latch.Increment();
}

internal void Resume()
{
    _latch.Decrement();
}
}


public class HighPriorityThreads
{
private Thread _thread;
private LowPriorityThreads _lowPriorityThreads;

internal HighPriorityThreads(LowPriorityThreads lowPriorityThreads)
{
    _lowPriorityThreads = lowPriorityThreads;
    _thread = new Thread(RandomlyInterruptLowPriortyThreads);
}

public void Start()
{
    _thread.Start();
}

void RandomlyInterruptLowPriortyThreads()
{
    while (true)
    {
        Thread.Sleep(Rand.Next());

        _lowPriorityThreads.Pause();

        Thread.Sleep(Rand.Next());
        _lowPriorityThreads.Resume();
    }
}
}

 class Program
 {
  static void Main(string[] args)
  {
    LowPriorityThreads lowPriorityThreads = new LowPriorityThreads(3);
    HighPriorityThreads highPriorityThreadOne = new HighPriorityThreads(lowPriorityThreads);
    HighPriorityThreads highPriorityThreadTwo = new HighPriorityThreads(lowPriorityThreads);

    lowPriorityThreads.Start();
    highPriorityThreadOne.Start();
    highPriorityThreadTwo.Start();
}
}


class Rand
{
internal static int Next()
{
    // Guid idea has been borrowed from somewhere on StackOverFlow coz I like it
    return new System.Random(Guid.NewGuid().GetHashCode()).Next() % 30000;
}
}
1

There are 1 best solutions below

0
On

I don't know about your requirements hence I won't discuss them here. As far as the implementation goes, I would introduce a "dispatcher" class that will handle inter-threads interaction and also acts a a factory for "runnable" objects.

The implementation, of course is very rough and open for criticism.

class Program
{
    static void Main(string[] args)
    {
        ThreadDispatcher td=new ThreadDispatcher();
        Runner r1 = td.CreateHpThread(d=>OnHpThreadRun(d,1));
        Runner r2 = td.CreateHpThread(d => OnHpThreadRun(d, 2));

        Runner l1 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 1"));
        Runner l2 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 2"));
        Runner l3 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 3"));


        l1.Start();
        l2.Start();
        l3.Start();

        r1.Start();
        r2.Start();

        Console.ReadLine();

        l1.Stop();
        l2.Stop();
        l3.Stop();

        r1.Stop();
        r2.Stop();
    }

    private static void OnHpThreadRun(ThreadDispatcher d,int number)
    {
        Random r=new Random();
        Thread.Sleep(r.Next(100,2000));
        d.CheckedIn();
        Console.WriteLine(string.Format("*** Starting High Priority Thread {0} ***",number));
        Thread.Sleep(r.Next(100, 2000));
        Console.WriteLine(string.Format("+++ Finishing High Priority Thread {0} +++", number));
        Thread.Sleep(300);
        d.CheckedOut();           
    }
}

public abstract class Runner
{
    private Thread _thread;
    protected readonly Action<ThreadDispatcher> _action;
    private readonly ThreadDispatcher _dispathcer;
    private long _running;
    readonly ManualResetEvent _stopEvent=new ManualResetEvent(false);
    protected Runner(Action<ThreadDispatcher> action,ThreadDispatcher dispathcer)
    {
        _action = action;
        _dispathcer = dispathcer;
    }

    public void Start()
    {
        _thread = new Thread(OnThreadStart);
        _running = 1;
        _thread.Start();
    }

    public void Stop()
    {
        _stopEvent.Reset();
        Interlocked.Exchange(ref _running, 0);
        _stopEvent.WaitOne(2000);
        _thread = null;
        Console.WriteLine("The thread has been stopped.");

    }
    protected virtual void OnThreadStart()
    {
        while (Interlocked.Read(ref _running)!=0)
        {
            OnStartWork();
            _action.Invoke(_dispathcer);
            OnFinishWork();
        }
        OnFinishWork();
        _stopEvent.Set();
    }

    protected abstract void OnStartWork();
    protected abstract void OnFinishWork();
}

public class ThreadDispatcher
{
    private readonly ManualResetEvent _signal=new ManualResetEvent(true);
    private int _hpCheckedInThreads;
    private readonly object _lockObject = new object();

    public void CheckedIn()
    {
        lock(_lockObject)
        {
            _hpCheckedInThreads++;
            _signal.Reset();
        }
    }
    public void CheckedOut()
    {
        lock(_lockObject)
        {
            if(_hpCheckedInThreads>0)
                _hpCheckedInThreads--;
            if (_hpCheckedInThreads == 0)
                _signal.Set();
        }
    }

    private class HighPriorityThread:Runner 
    {
        public HighPriorityThread(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher) : base(action,dispatcher)
        {
        }

        protected override void OnStartWork()
        {
        }

        protected override void OnFinishWork()
        {
        }
    }
    private class LowPriorityRunner:Runner
    {
        private readonly ThreadDispatcher _dispatcher;
        public LowPriorityRunner(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher)
            : base(action, dispatcher)
        {
            _dispatcher = dispatcher;
        }

        protected override void OnStartWork()
        {
            Console.WriteLine("LP Thread is waiting for a signal.");
            _dispatcher._signal.WaitOne();
            Console.WriteLine("LP Thread got the signal.");
        }

        protected override void OnFinishWork()
        {

        }
    }

    public Runner CreateLpThread(Action<ThreadDispatcher> action)
    {
        return new LowPriorityRunner(action, this);
    }

    public Runner CreateHpThread(Action<ThreadDispatcher> action)
    {
        return new HighPriorityThread(action, this);
    }
}

}