Sync-over-async is bad. I know. But is there a sync-over-async problem when calling TaskCompletionSource.Task.Wait()? Does the answer change if the TaskCompletionSource was created with TaskCreationOptions.RunContinuationsAsynchronously?
Update
To answer the questions from the comments: not all Tasks are equal. The Task type was introduced long before async/await and was used for parallel programming. For example, there is nothing wrong with the following code, as it doesn't do any async work.
var task = Task.Run(() => Thread.Sleep(10_000));
task.Wait();
For context: the Kafka client has a sync method for producing messages, which accepts an action that reports the delivery status asynchronously:
void Produce(
    TopicPartition topicPartition,
    Message<TKey, TValue> message,
    Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);
In a few scenarios, I need to wait for the delivery report before continuing the work, and that can happen in either a sync or an async context. For that, I have the following class:
internal class DeliveryReportAwaiter<TKey, TValue> : IDisposable
{
    private const int WaitForDeliveryGracePeriodFactor = 2;
    private readonly int _waitDeliveryReportTimeoutMs;
    private readonly ILogger _logger;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly TaskCompletionSource _taskCompletionSource;
    private bool _disposed;

    public DeliveryReportAwaiter(int waitDeliveryReportTimeoutMs, ILogger logger)
    {
        _logger = logger;
        _waitDeliveryReportTimeoutMs = waitDeliveryReportTimeoutMs *
            WaitForDeliveryGracePeriodFactor;
        _taskCompletionSource = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _cancellationTokenSource = new CancellationTokenSource();
        // in case OnDeliveryReportReceived was never called
        _cancellationTokenSource.Token.Register(SetTaskTimeoutException);
    }

    public void WaitForDeliveryReport(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        _cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
        // Is this considered sync-over-async?
        _taskCompletionSource.Task.Wait(token);
    }

    public Task WaitForDeliveryReportAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        _cancellationTokenSource.CancelAfter(_waitDeliveryReportTimeoutMs);
        return _taskCompletionSource.Task.WaitAsync(token);
    }

    public void OnDeliveryReportReceived(DeliveryReport<TKey, TValue> deliveryReport,
        Action<DeliveryReport<TKey, TValue>> handleReportAction)
    {
        if (_disposed)
        {
            _logger.LogWarning(
                "The delivery report for the message {Key} on topic {Topic} arrived " +
                "after the awaiter was disposed due to timeout or cancellation. " +
                "The delivery status is {Status}",
                deliveryReport.Key,
                deliveryReport.Topic,
                deliveryReport.Status);
            return;
        }
        if (!_cancellationTokenSource.TryReset())
        {
            SetTaskTimeoutException();
        }
        else
        {
            handleReportAction?.Invoke(deliveryReport);
            _taskCompletionSource.TrySetResult();
        }
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;
        _cancellationTokenSource.Dispose();
    }

    private void SetTaskTimeoutException()
    {
        var errorMessage = $"Producer timed out while waiting for publish " +
            $"confirm for {_waitDeliveryReportTimeoutMs}ms!";
        _taskCompletionSource.TrySetException(new KafkaTimeoutException(errorMessage));
    }
}
See the WaitForDeliveryReport method implementation.
Now the question is much longer but I hope it will help people to understand the reason behind it.
Initializing the TaskCompletionSource with the TaskCreationOptions.RunContinuationsAsynchronously option means that the Task will be completed on the ThreadPool, and not on the thread that called the TrySetException method. This is useful in case the Task is awaited, so that the completing thread is not hijacked by the continuation(s) after the await. Having your thread "stolen" after calling SetResult is a common cause of bugs in asynchronous programming. It is something highly unexpected by most programmers before they experience it for the first time (usually at the end of a long and unpleasant debugging session).
But in case the Task is Waited synchronously, completing it on the ThreadPool offers no advantage. The continuation after the Wait will run on the waiting thread, and not on the completing thread. All that the completing thread will have to do is to signal an internal ManualResetEventSlim, which is a negligible amount of work.
So you should avoid initializing the TaskCompletionSource with this option if you intend to Wait it synchronously. Doing so not only adds overhead for no reason, but also risks delaying the completion of the task in case the ThreadPool is saturated. In extreme cases it's even possible for a complete deadlock to occur, in case the ThreadPool is saturated and has reached its maximum size.
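To make the "stolen thread" effect concrete, here is a minimal sketch for a console-style program with no SynchronizationContext. The class and method names (ContinuationDemo, ContinuationRunsOnCompletingThread, WaitSynchronously) are made up for illustration and are not part of the question's code. The first method records which thread runs the code after an await; the second shows a plain synchronous Wait on a TaskCompletionSource created with default options.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical helper: returns true when the code after the await
    // ran on the same thread that called SetResult.
    public static bool ContinuationRunsOnCompletingThread(TaskCreationOptions options)
    {
        var tcs = new TaskCompletionSource<int>(options);
        int continuationThreadId = -1;
        int completingThreadId = -1;

        // The consumer suspends at the await until the task completes.
        async Task ConsumeAsync()
        {
            await tcs.Task.ConfigureAwait(false);
            continuationThreadId = Environment.CurrentManagedThreadId;
        }
        Task consumer = ConsumeAsync();

        // Complete the task from a dedicated (non-ThreadPool) thread.
        var completer = new Thread(() =>
        {
            completingThreadId = Environment.CurrentManagedThreadId;
            tcs.SetResult(42);
        });
        completer.Start();
        completer.Join();

        // Blocking is acceptable here: this is only a demo harness.
        consumer.Wait();
        return continuationThreadId == completingThreadId;
    }

    // Hypothetical helper: a synchronous wait with a timeout on a
    // TaskCompletionSource created without RunContinuationsAsynchronously.
    public static bool WaitSynchronously(int timeoutMs)
    {
        var tcs = new TaskCompletionSource<int>();
        var completer = new Thread(() => tcs.SetResult(1));
        completer.Start();
        // Task.Wait(int) returns true if the task completed within the timeout.
        return tcs.Task.Wait(timeoutMs);
    }
}
```

Under these assumptions, calling ContinuationRunsOnCompletingThread(TaskCreationOptions.None) should report that the continuation ran inline on the completing thread, while passing TaskCreationOptions.RunContinuationsAsynchronously should report that it ran elsewhere (on the ThreadPool). WaitSynchronously resumes on the waiting thread either way, which is why the option buys nothing in that case.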