Cannot get results of TaskCompletionSource


Hello, I have the following problem:

I want to perform something similar to a transaction: I want to execute a number of async operations after I receive an external trigger. Therefore I am using a TaskCompletionSource that gets set in a method representing the trigger: TriggerTransaction.

This trigger method gets called from Main on the thread pool when I press a specific console key.

After I press the A key, TriggerTransaction gets executed and the TaskCompletionSource-s get set. Still, the main thread does not compute the sum of the two awaited tasks.

    class Program
    {
        public static Task<Task<int>> TransactionOperation1()
        {
            TaskCompletionSource<Task<int>> tcs = new TaskCompletionSource<Task<int>>();
            tasks.Add(tcs);
            Task<Task<int>> result = tcs.Task;
            return result;
        }

        public static Task<Task<int>> TransactionOperation2()
        {
            TaskCompletionSource<Task<int>> tcs = new TaskCompletionSource<Task<int>>();
            tasks.Add(tcs);
            Task<Task<int>> result = tcs.Task;
            return result;
        }

        public static async Task<int> ExecuteTransactionOnDB()
        {
            await Task.Delay(1000);
            return 5;
        }

        public static async Task TriggerTransaction()
        {
            int value = await ExecuteTransactionOnDB();
            foreach (var item in tasks)
            {
                item.SetResult(value);
            }
        }

        public static List<dynamic> tasks = new List<dynamic>();

        static async Task Main(string[] args)
        {
            Task<Task<int>> a = TransactionOperation1();
            Task<Task<int>> b = TransactionOperation2();
            Task.Run(async () =>
            {
                while (Console.ReadKey().Key != ConsoleKey.A) ;
                await TriggerTransaction();
            });
            if (!File.Exists("D:\\data.txt"))
            {
                File.Create("D:\\data.txt");
            }
            using (FileStream stream = new FileStream("data.txt", FileMode.Append, FileAccess.Write))
            {
                int sum = await await a + await await b; //thread wont pass this line when tasks are set.
                ReadOnlyMemory<byte> bytes = Encoding.UTF8.GetBytes(sum);

                stream.Write(bytes.ToArray());
            }
            Console.WriteLine(await await a + await await b);
        }
    }

P.S. If you are wondering why I used a List<dynamic> to store the TaskCompletionSource-s, it's because the TransactionOperations will differ in return type. Some of them will return int, others string, bool, etc.

For a better understanding I made a schema. As you will see, there are:
- A list where I want to store the TCS-es
- Some calls that are completed only after the external trigger is set (the transaction was executed)
As you can see in the calls, all have different return types.
[Image: schema of the TCS list and the pending calls]


Best answer

Why would you need a Task<Task<int>>? A plain Task<int> is enough, and accordingly a TaskCompletionSource<int>. That also gets rid of the awkward await await ..., which isn't required in your case either.

Note that I also added Close() to the stream returned by File.Create().
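
As an aside on the File.Create note: FileMode.Append already creates the file when it does not exist, so the existence check could probably be dropped altogether. A minimal sketch, assuming a hypothetical helper named AppendText and a temp-directory path chosen only for illustration (neither is part of the original program):

```csharp
using System;
using System.IO;
using System.Text;

static class AppendSketch
{
    // Hypothetical helper: appends UTF-8 text, creating the file if it is missing.
    public static void AppendText(string path, string text)
    {
        // FileMode.Append opens an existing file at its end or creates a new one.
        using (var stream = new FileStream(path, FileMode.Append, FileAccess.Write))
        {
            byte[] bytes = Encoding.UTF8.GetBytes(text);
            stream.Write(bytes, 0, bytes.Length);
        }
    }

    static void Main()
    {
        // Illustrative path only; the answer itself uses C:\temp\data.txt.
        string path = Path.Combine(Path.GetTempPath(), "data.txt");
        AppendText(path, "10");
        Console.WriteLine(File.Exists(path));
    }
}
```

Because no separate File.Create call is made, there is no extra stream that has to be closed before the file is opened for appending.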

Here is a working version of the program:

class Program
{
    public static Task<int> TransactionOperation1()
    {
        TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
        tasks.Add(tcs);
        return tcs.Task;
    }
    public static Task<int> TransactionOperation2()
    {
        TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
        tasks.Add(tcs);
        return tcs.Task;
    }
    public static async Task<int> ExecuteTransactionOnDB()
    {
        await Task.Delay(1000);
        return 5;
    }
    public static async Task TriggerTransaction()
    {
        int value = await ExecuteTransactionOnDB();
        foreach (var item in tasks)
        {
            item.SetResult(value);
        }
    }

    public static List<dynamic> tasks = new List<dynamic>();

    static async Task Main(string[] args)
    {
        Task<int> a = TransactionOperation1();
        Task<int> b = TransactionOperation2();
        Task input = Task.Run(async () => {
            while (Console.ReadKey().Key != ConsoleKey.A);
            await TriggerTransaction();
        });
        if (!File.Exists("C:\\temp\\data.txt"))
        {
            File.Create("C:\\temp\\data.txt").Close();
        }
        using (FileStream stream = new FileStream("C:\\temp\\data.txt", FileMode.Append, FileAccess.Write))
        {
            int sum = await a + await b; // now it works ok
            var bytes = Encoding.UTF8.GetBytes(sum.ToString());
            stream.Write(bytes);
        }

        Console.WriteLine(await a + await b);
    }
}
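
As for why the original version never got past the await: the list holds dynamic references to TaskCompletionSource<Task<int>>, and calling SetResult with an int on such an object cannot be bound at run time. In the question's code that exception is most likely raised inside the Task.Run delegate, whose task is never awaited, so it goes unnoticed and a and b simply never complete. The sketch below reproduces only the binding failure; the class and method names are made up for illustration:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.CSharp.RuntimeBinder;

static class DynamicSetResultSketch
{
    // Returns true when SetResult(int) fails to bind on a
    // TaskCompletionSource<Task<int>> accessed through dynamic.
    public static bool SetResultFailsToBind()
    {
        dynamic tcs = new TaskCompletionSource<Task<int>>();
        try
        {
            // The only overload is SetResult(Task<int>); an int does not convert to it.
            tcs.SetResult(5);
            return false;
        }
        catch (RuntimeBinderException)
        {
            return true;
        }
    }

    static void Main()
    {
        Console.WriteLine(SetResultFailsToBind());
    }
}
```

With TaskCompletionSource<int>, as in the working version, the same dynamic call binds to SetResult(int) and the awaiting code continues.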

Another answer

Check out the modified version of the code below; it produces the expected result by completing the tasks created with TaskCompletionSource. I have also made the code generic, so that you don't need the dynamic type and can fix the data type at compile time.

static async Task Main(string[] args)
{
    var a = Program<int>.TransactionOperation1();
    var b = Program<int>.TransactionOperation2();
    await Task.Run(async () =>
    {
        Console.ReadLine();
        await Program<int>.TriggerTransaction(5);
    });

    if (!File.Exists("D:\\data.txt"))
    {
        // Close the stream returned by File.Create so the file can be reopened below.
        File.Create("D:\\data.txt").Close();
    }

    using (FileStream stream = new FileStream("D:\\data.txt", FileMode.Append, FileAccess.Write))
    {
        int sum = await a + await b; // both tasks are already completed at this point
        var bytes = Encoding.UTF8.GetBytes(sum.ToString());     
        stream.Write(bytes, 0, bytes.Length);
    }

    Console.WriteLine(await a + await b);
}

class Program<T>
{
    public static Task<T> TransactionOperation1()
    {
        var tcs = new TaskCompletionSource<T>();
        tasks.Add(tcs);
        return tcs.Task;
    }

    public static Task<T> TransactionOperation2()
    {
        var tcs = new TaskCompletionSource<T>();
        tasks.Add(tcs);
        return tcs.Task;
    }

    public static async Task<T> ExecuteTransactionOnDB(T t)
    {
        return await Task.FromResult(t);
    }

    public static async Task TriggerTransaction(T t)
    {
        T value = await ExecuteTransactionOnDB(t);

        foreach (var item in tasks)
        {
            item.SetResult(value);
        }
    }

    public static List<TaskCompletionSource<T>> tasks = new List<TaskCompletionSource<T>>();

}

The important modifications are:

  1. List<dynamic> is replaced by List<TaskCompletionSource<T>>.
  2. TransactionOperation1/2 have the return type Task<T>, which is the task created by the TaskCompletionSource<T>.
  3. The Task.Run call, which executes TriggerTransaction internally, is now awaited. If you do not need to wait for console input, you can replace the following code:

     await Task.Run(async() =>
     {
       Console.ReadLine();
       await Program<int>.TriggerTransaction(5);
     });

    with await Program<int>.TriggerTransaction(5);

Now it produces the result you expect: it sums up the two integers. There are a few more small changes, such as removing Task.Delay, which is not required.
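
One thing to keep in mind with Program<T>: a static field in a generic class exists once per closed type, so Program<int>.tasks and Program<string>.tasks are separate lists and a single trigger only completes the operations of one type. If operations with different result types have to be completed by the same trigger, one possible approach is to store type-erased completion callbacks instead of the TaskCompletionSource objects themselves. The sketch below is only an illustration under that assumption; PendingOperations, Register and CompleteAll are hypothetical names, not part of the code above:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class PendingOperations
{
    // Each entry completes exactly one TaskCompletionSource<T>, whatever T is.
    private static readonly List<Action> completions = new List<Action>();

    // Registers an operation and returns the task that completes on the trigger.
    public static Task<T> Register<T>(Func<T> resultFactory)
    {
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        completions.Add(() => tcs.SetResult(resultFactory()));
        return tcs.Task;
    }

    // The "trigger": completes every registered operation, then forgets them.
    public static void CompleteAll()
    {
        foreach (Action complete in completions)
        {
            complete();
        }
        completions.Clear();
    }
}

class Demo
{
    static async Task Main()
    {
        Task<int> number = PendingOperations.Register(() => 5);
        Task<string> text = PendingOperations.Register(() => "done");

        PendingOperations.CompleteAll();

        Console.WriteLine(await number);
        Console.WriteLine(await text);
    }
}
```

The closures keep each TaskCompletionSource<T> strongly typed, so neither dynamic nor a generic static class is needed for the shared list.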

EDIT 1 - Using Task.WhenAll

static async Task Main(string[] args)
{
    var a = Program.TransactionOperation1(5);
    var b = Program.TransactionOperation2(5);

    Console.ReadLine();

    var taskResults  = await Task.WhenAll(a,b);


    dynamic finalResult = 0;

    foreach(var t in taskResults)
        finalResult += t;


    if (!File.Exists("D:\\data.txt"))
    {
        // Close the stream returned by File.Create so the file can be reopened below.
        File.Create("D:\\data.txt").Close();
    }

    using (FileStream stream = new FileStream("D:\\data.txt", FileMode.Append, FileAccess.Write))
    {
        var bytes = Encoding.UTF8.GetBytes(finalResult.ToString());
        stream.Write(bytes, 0, bytes.Length);
    }

    Console.WriteLine(finalResult);
}

class Program
{
    public static Task<dynamic> TransactionOperation1(dynamic val)
    {
        return Task<dynamic>.Run(() => val);
    }

    public static Task<dynamic> TransactionOperation2(dynamic val)
    {
        return Task<dynamic>.Run(() => val);
    }

}
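
The Task.WhenAll variant above relies on dynamic and no longer waits on a TaskCompletionSource. For comparison, here is a minimal typed sketch that keeps the TaskCompletionSource-based trigger and still uses Task.WhenAll; the 100 ms delay merely stands in for the external trigger, and the class and method names are assumptions made for the example:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

static class WhenAllSketch
{
    // Completes two TaskCompletionSource<int> instances from a simulated
    // trigger and returns the sum of their results.
    public static async Task<int> SumWhenTriggeredAsync()
    {
        var first = new TaskCompletionSource<int>();
        var second = new TaskCompletionSource<int>();

        // Simulated external trigger running on the thread pool.
        Task trigger = Task.Run(async () =>
        {
            await Task.Delay(100);
            first.SetResult(5);
            second.SetResult(5);
        });

        // For Task<int> inputs, Task.WhenAll yields an int[] with the results in order.
        int[] results = await Task.WhenAll(first.Task, second.Task);
        await trigger; // observe any exception thrown by the trigger itself
        return results.Sum();
    }

    static async Task Main()
    {
        Console.WriteLine(await SumWhenTriggeredAsync()); // prints 10
    }
}
```

Awaiting the trigger task as well means that a failure inside it surfaces as an exception instead of leaving the caller waiting.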