Items are continuously added to the database and need to be processed. I want items of different types to be processed in parallel.
For example:
Type A Items: Item 1, Item 2, Item 3
Type B Items: Item 4, Item 5, Item 6
Type C Items: Item 7, Item 8, Item 9
Item 1, Item 4 and Item 7 should be processed in parallel.
As more items of a type get added to the database, they should be picked up and queued, but processed only after the earlier items of that type have finished.
I think I can do this with a static TaskFactory and a CustomTaskScheduler that starts a new task only after the previous task of the same type has finished. My question is: what should my CustomTaskScheduler look like?
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    class test
    {
        private static void Main()
        {
            // List of items from the database
            var itemList = new List<Item>();
            itemList.Add(new Item(1, "A"));
            itemList.Add(new Item(2, "A"));
            itemList.Add(new Item(3, "A"));
            itemList.Add(new Item(4, "B"));
            itemList.Add(new Item(5, "B"));
            itemList.Add(new Item(6, "B"));
            itemList.Add(new Item(7, "C"));
            itemList.Add(new Item(8, "C"));
            itemList.Add(new Item(9, "C"));
            // This needs to be run on a timer, picking up new items from the database every time
            new ProcessQueue().ProcessAllItems(itemList);
            Console.ReadLine();
        }
    }
    public class ProcessQueue
    {
        // CustomTaskScheduler is the class I am asking about; it does not exist yet
        private static CustomTaskScheduler customTaskScheduler = new CustomTaskScheduler(1);
        private static TaskFactory factory = new TaskFactory(customTaskScheduler);

        public void ProcessAllItems(List<Item> itemList)
        {
            var cts = new CancellationTokenSource();
            foreach (var item in itemList)
            {
                factory.StartNew(
                    o => executeTask(item.Id, item.ItemType),
                    item.ItemType, // state object: identifies the item type so the scheduler can group tasks
                    cts.Token);
            }
        }

        public void executeTask(int id, string parentId)
        {
            // {2} is the thread id (the original format string repeated {1} by mistake)
            Console.WriteLine("Item - {0} ItemType - {1} on thread {2} ", id, parentId,
                Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(5000); // simulate work
        }
    }
    public class Item
    {
        public Item(int id, string itemType)
        {
            Id = id;
            ItemType = itemType;
        }

        public int Id { get; set; }
        public string ItemType { get; set; }
    }
How about this, adapted from "How to: Create a Task Scheduler That Limits Concurrency"? It uses Task.AsyncState, which in your example is Item.ItemType, to decide which tasks must wait for each other: tasks with the same state run one at a time.
I'm not sure how thread safety works with GetScheduledTasks, though.
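A minimal sketch of such a scheduler, assuming the state object passed to StartNew is the item type (a non-null value with sensible Equals/GetHashCode, such as a string). The class name KeyedSerialTaskScheduler and its private members are hypothetical names chosen for illustration. Unlike the CustomTaskScheduler(1) call in the question, it takes no constructor argument, because the limit of one running task per type is built in. For GetScheduledTasks it uses the non-blocking Monitor.TryEnter pattern and throws NotSupportedException if the lock cannot be taken, since that method is intended for debugger use.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: tasks with equal AsyncState run one at a time in FIFO
// order; tasks with different AsyncState values may run in parallel.
public class KeyedSerialTaskScheduler : TaskScheduler
{
    private readonly object _gate = new object();

    // One FIFO queue per key. A key is present only while a worker is draining it.
    private readonly Dictionary<object, Queue<Task>> _queues =
        new Dictionary<object, Queue<Task>>();

    protected override void QueueTask(Task task)
    {
        // Assumption: every task is started with a non-null state object.
        object key = task.AsyncState
            ?? throw new InvalidOperationException("Task has no AsyncState to use as a key.");

        lock (_gate)
        {
            if (_queues.TryGetValue(key, out var queue))
            {
                // A worker for this key is already running; it will pick this task up.
                queue.Enqueue(task);
                return;
            }
            queue = new Queue<Task>();
            queue.Enqueue(task);
            _queues[key] = queue;
        }

        // First task for this key: start a worker that drains the key's queue.
        ThreadPool.QueueUserWorkItem(_ => Drain(key));
    }

    private void Drain(object key)
    {
        while (true)
        {
            Task next;
            lock (_gate)
            {
                var queue = _queues[key];
                if (queue.Count == 0)
                {
                    // Nothing left: remove the key so the next task starts a new worker.
                    _queues.Remove(key);
                    return;
                }
                next = queue.Dequeue();
            }
            // Run outside the lock so other keys are not blocked.
            TryExecuteTask(next);
        }
    }

    // Never inline: an inlined task could overlap with a running task of the same key.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Debugger support only: try the lock without blocking, give up otherwise.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(_gate, ref lockTaken);
            if (!lockTaken) throw new NotSupportedException();
            return _queues.Values.SelectMany(q => q).ToArray();
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_gate);
        }
    }
}
```

With this in place, the factory in the question could be built as new TaskFactory(new KeyedSerialTaskScheduler()), and the existing StartNew call, which passes item.ItemType as the state, would not need to change. Each key occupies one thread-pool thread while it has work queued, so this suits a modest number of item types.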