What's the right way to implement ValueTaskSource.SetCompleted


We have a class implementing IValueTaskSource. This code cannot be written with async/await because there's nothing to await on. We send a message to another running thread and get back a ValueTask that the caller can await to be notified when the other thread has finished processing the message. However, the other thread already exists and is already doing work. It receives the message through an entirely separate channel, processes it, and then needs to tell the thread-pool thread that sent it that processing has finished. Hence IValueTaskSource.

There is no stock ValueTaskSource (not getting into whether there should be one; in this case a stock version would be of questionable utility). What we actually have looks very much like this:

class Message : IValueTaskSource {
    public ValueTask Send()
    {
        /* how the message is sent is irrelevant */
        return new ValueTask(this, 0);
    }

    private Action<object> continuation;
    private object continuationState;

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short _, ValueTaskSourceOnCompletedFlags __)
    {
         lock(this) {
              if (GetStatus(_) == ValueTaskSourceStatus.Pending)
              {
                  this.continuation = continuation;
                  this.continuationState = state;
                  return;
              }
              continuation(state); /* Suspect */
         }
    }

    public void SetCompleted()
    {
        lock (this)
        {
             /* set state completed omitted for brevity */
             continuation?.Invoke(continuationState); /* Suspect */             
        }
    }
}

I think I'm doing this wrong. Imagine a long chain of these; it seems like it would build up too much stack. In particular, the lines marked /* Suspect */ are exactly that, and ValueTaskSourceOnCompletedFlags is unused. It does have the nicety that an exception thrown by the continuation always goes somewhere, assuming that's even a real issue.

Right now the code works because there are only three of them, and the continuations that use them don't care which thread they run on.


There are 2 best solutions below

On BEST ANSWER

Based on the link to ManualResetValueTaskSource provided by Stephen Cleary and the corresponding source code I was able to produce an answer.

ManualResetValueTaskSourceCore<T> provides the complete logic needed to implement both IValueTaskSource and IValueTaskSource<T>. There is currently no non-generic (void) version, so you build the void case on the generic one with a dummy type argument. There's some general debate on whether bool or object is the best dummy type, but I think it doesn't really matter because member padding of T will force alignment anyway.

So the answer is to forward all the methods.

    public ValueTask Send()
    {
        /* how the message is sent is irrelevant */
        return CreateValueTask();
    }

    private ManualResetValueTaskSourceCore<object> taskSource;

    private ValueTask CreateValueTask() => new ValueTask(this, taskSource.Version);
    public void GetResult(short version) => taskSource.GetResult(version);
    public ValueTaskSourceStatus GetStatus(short version) => taskSource.GetStatus(version);
    public void OnCompleted(Action<object> continuation, object state, short version, ValueTaskSourceOnCompletedFlags flags) => taskSource.OnCompleted(continuation, state, version, flags);
    public void SetCompleted() => taskSource.SetResult(null);

In this case each message is in its own object so there's no pooling. Doesn't matter. Calling the existing implementation is so much easier than trying to write down the smallest correct implementation that it's still the better way.
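For reference, here is a minimal sketch of the forwarding approach assembled into one compilable class. Two things in it are my own additions rather than part of the snippet above: setting RunContinuationsAsynchronously, which I believe addresses the stack build-up worry from the question by queueing the continuation instead of invoking it inline inside SetCompleted, and the ForwardingDemo class with its stand-in worker thread, which is purely hypothetical.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

public sealed class Message : IValueTaskSource
{
    // Mutable struct: keep it in a non-readonly field and never copy it.
    private ManualResetValueTaskSourceCore<object?> taskSource;

    public Message()
    {
        // Assumption (not in the answer above): queue the continuation rather than
        // running it on the stack of whichever thread calls SetCompleted.
        taskSource.RunContinuationsAsynchronously = true;
    }

    public ValueTask Send()
    {
        /* how the message is sent is irrelevant */
        return new ValueTask(this, taskSource.Version);
    }

    // Called by the worker thread once the message has been processed.
    public void SetCompleted() => taskSource.SetResult(null);

    // Everything the interface needs is forwarded to the core, flags included.
    void IValueTaskSource.GetResult(short token) => taskSource.GetResult(token);

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => taskSource.GetStatus(token);

    void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state,
        short token, ValueTaskSourceOnCompletedFlags flags)
        => taskSource.OnCompleted(continuation, state, token, flags);
}

public static class ForwardingDemo
{
    public static async Task Main()
    {
        var message = new Message();
        ValueTask pending = message.Send();

        // Hypothetical stand-in for the pre-existing worker thread.
        var worker = new Thread(() => { Thread.Sleep(50); message.SetCompleted(); });
        worker.Start();

        await pending;
        Console.WriteLine("completed");
        worker.Join();
    }
}
```

Because the flags are passed through untouched, the core decides whether to capture the caller's context, which is the part the hand-written version in the question ignored.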

I'm pretty sure if I were pooling value task sources the correct way would be to call Reset() inside CreateValueTask().
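A rough sketch of what that reuse could look like, assuming a message is only sent again after its previous ValueTask has been awaited. ReusableMessage and ReuseDemo are hypothetical names; nothing here is from the original code. Reset() clears the completed state and increments Version, so each Send() hands out a fresh token.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

public sealed class ReusableMessage : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<object?> taskSource;

    public ValueTask Send()
    {
        // Assumption: the ValueTask from the previous Send() has already been awaited.
        taskSource.Reset(); // clears the old result and bumps Version
        /* how the message is sent is irrelevant */
        return new ValueTask(this, taskSource.Version);
    }

    public void SetCompleted() => taskSource.SetResult(null);

    void IValueTaskSource.GetResult(short token) => taskSource.GetResult(token);

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => taskSource.GetStatus(token);

    void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state,
        short token, ValueTaskSourceOnCompletedFlags flags)
        => taskSource.OnCompleted(continuation, state, token, flags);
}

public static class ReuseDemo
{
    public static async Task Main()
    {
        var message = new ReusableMessage();

        ValueTask first = message.Send();
        message.SetCompleted();
        await first;

        // Second round on the same object: Reset() makes it pending again.
        ValueTask second = message.Send();
        message.SetCompleted();
        await second;

        Console.WriteLine("reused twice");
    }
}
```

The version token is what makes this reasonably safe: a ValueTask left over from an earlier round carries an old token, so the core can reject it instead of silently observing the new operation.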

On

Here is an example using the INotifyCompletion interface to get the notification, instead of the heavier IValueTaskSource+ValueTask mechanism. The Message class is amended with just one additional instance field, an Action, and it has become awaitable by exposing a GetAwaiter method. Each Message instance is intended to be awaited only once.

using System;
using System.Runtime.CompilerServices;
using System.Threading;

public class Message : INotifyCompletion
{
    private static readonly Action _completedSentinel = new(() => { });
    private Action _continuation;

    public Message GetAwaiter() { return this; }
    public bool IsCompleted
        => ReferenceEquals(Volatile.Read(ref _continuation), _completedSentinel);
    public void OnCompleted(Action continuation)
    {
        Action original = Interlocked.CompareExchange(ref _continuation,
            continuation, null);
        if (original is null) return; // Normal case
        if (ReferenceEquals(original, _completedSentinel))
            continuation(); // Rare case
        else
            throw new InvalidOperationException("Double await");
    }
    public void GetResult() { }

    public void SetCompleted()
    {
        Action continuation = Interlocked.Exchange(ref _continuation,
            _completedSentinel);
        if (continuation is null) return;
        ThreadPool.QueueUserWorkItem(state => ((Action)state).Invoke(), continuation);
    }
}


The static _completedSentinel field is used in order to resolve a race condition that might occur, between the thread that awaits and the thread that invokes the SetCompleted method. Normally the await will happen first, but the implementation above will not break if it happens after the SetCompleted, or even if the SetCompleted is invoked between the IsCompleted/OnCompleted calls (these are called by the async/await machinery).
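To make the two orderings concrete, here is a small sketch that exercises both. The Message class is repeated from above (with the sentinel written as a plain lambda) only so the snippet compiles on its own; RaceDemo and the sleeping worker thread are hypothetical and stand in for the real message-processing thread.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public class Message : INotifyCompletion
{
    private static readonly Action _completedSentinel = () => { };
    private Action _continuation;

    public Message GetAwaiter() { return this; }
    public bool IsCompleted
        => ReferenceEquals(Volatile.Read(ref _continuation), _completedSentinel);
    public void OnCompleted(Action continuation)
    {
        Action original = Interlocked.CompareExchange(ref _continuation,
            continuation, null);
        if (original is null) return; // Normal case: continuation stored
        if (ReferenceEquals(original, _completedSentinel))
            continuation(); // Rare case: completed between IsCompleted and OnCompleted
        else
            throw new InvalidOperationException("Double await");
    }
    public void GetResult() { }

    public void SetCompleted()
    {
        Action continuation = Interlocked.Exchange(ref _continuation,
            _completedSentinel);
        if (continuation is null) return; // Nobody is awaiting yet
        ThreadPool.QueueUserWorkItem(state => ((Action)state).Invoke(), continuation);
    }
}

public static class RaceDemo
{
    public static async Task Main()
    {
        // Ordering 1: SetCompleted runs first, so IsCompleted is already true
        // and the await continues synchronously.
        var early = new Message();
        early.SetCompleted();
        await early;
        Console.WriteLine("completed before await");

        // Ordering 2: the await normally registers its continuation first,
        // and SetCompleted later queues it on the thread pool.
        var late = new Message();
        var worker = new Thread(() => { Thread.Sleep(50); late.SetCompleted(); });
        worker.Start();
        await late;
        Console.WriteLine("completed after await");
        worker.Join();
    }
}
```

The third ordering, where SetCompleted lands exactly between the IsCompleted check and the OnCompleted call, is hard to force in a demo; it is the branch marked "Rare case" in OnCompleted.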