How to run the continuation of a task in the taskBuilder CE in the same thread of the code before the let! operation?


I am coding a long-running operation in F#'s task CE as follows:

let longRunningTask = Task.Run(...)


// Now let's do the rest of the multi-tasking program

task {
  DO SOMETHING
  let! result = longRunningTask
  DO SOMETHING ELSE
}

The problem is DO SOMETHING ELSE appears to be running on an arbitrary thread (as observed also by printing the current thread id), whereas I absolutely need it to run on the same thread as DO SOMETHING, as I don't want any other form of concurrency except for the longRunningTask.

I've tried several ways of setting the current synchronization context, first creating a single instance of that type, but that doesn't seem to affect the result.

JL0PD On BEST ANSWER

It might be overkill, but a SynchronizationContext may help. It is used to dispatch delegates to specific threads. There are plenty of explanations of how it works (search for ConfigureAwait(false)), so I'll focus on the implementation:

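open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

type ThreadOwningSyncCtx() =
    inherit SynchronizationContext()

    // Callbacks waiting to be run on the thread that calls DoWork
    let _queue = new BlockingCollection<(SendOrPostCallback * obj)>()

    // Runs queued callbacks on the calling thread until cancellation is requested
    member _.DoWork(cancellationToken: CancellationToken) =
        while not cancellationToken.IsCancellationRequested do
            let (callback, state) = _queue.Take()
            callback.Invoke state

    override _.Post(callback, state) =
        _queue.Add((callback, state))

    // Blocks the caller until the owning thread has run the callback;
    // calling this from the owning thread itself would deadlock
    override _.Send(callback, state) =
        let tcs = TaskCompletionSource()
        // Wrap in a delegate explicitly: a plain F# function is not a SendOrPostCallback
        let cb =
            SendOrPostCallback(fun s ->
                callback.Invoke s
                tcs.SetResult())
        _queue.Add((cb, state))
        tcs.Task.Wait()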

Notes on methods:

  • Post: the asynchronous path. The Task infrastructure calls it when a C# await or an F# let!/do! completes asynchronously. The callback is queued and runs later, on the thread that calls DoWork.

  • Send: the synchronous path. The callback is expected to have run by the time this method returns, as with CancellationTokenSource.Cancel, WPF's Dispatcher.Invoke, or WinForms' Control.Invoke.

  • DoWork: blocks the current thread and runs pending callbacks on it. A thread can't be interrupted to run a callback, so it has to be waiting for work.

Usage:

let syncCtx = ThreadOwningSyncCtx()
// set the current sync ctx, so every continuation is queued back to the main thread.
// comment out this line and `printThreadId` will print different numbers
SynchronizationContext.SetSynchronizationContext syncCtx

let printThreadId() =
    printfn "%d" Thread.CurrentThread.ManagedThreadId

// create cancellation token, so app won't run indefinitely
let cts = new CancellationTokenSource()

// task to simulate some meaningful work
task {
    printThreadId()
    do! Task.Yield() // this action always completes asynchronously
    printThreadId()

    cts.Cancel() // cancel the token, so the main thread can continue its work
} |> ignore

// process all pending continuations
syncCtx.DoWork(cts.Token)
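
Applied to the scenario in the question (a Task.Run task awaited with let!), the same idea can be sketched as follows. This is a minimal, self-contained sketch and not the code above: SingleThreadSyncCtx, Complete and RunOnCurrentThread are hypothetical names, only Post is overridden, and the 100 ms sleep stands in for the real long-running work.

```fsharp
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

// Hypothetical, Post-only context: continuations are queued and later
// run on whichever thread calls RunOnCurrentThread.
type SingleThreadSyncCtx() =
    inherit SynchronizationContext()

    let queue = new BlockingCollection<SendOrPostCallback * obj>()

    override _.Post(callback, state) =
        queue.Add((callback, state))

    // Signal that no more continuations will arrive, so the pump can exit
    member _.Complete() = queue.CompleteAdding()

    // Block the calling thread and run queued continuations on it
    member _.RunOnCurrentThread() =
        for (callback, state) in queue.GetConsumingEnumerable() do
            callback.Invoke state

let ctx = SingleThreadSyncCtx()
SynchronizationContext.SetSynchronizationContext ctx

// Stand-in for the long-running operation (assumption: 100 ms of work)
let longRunningTask =
    Task.Run<int>(fun () ->
        Thread.Sleep 100
        42)

let main =
    task {
        let before = Thread.CurrentThread.ManagedThreadId // DO SOMETHING
        let! result = longRunningTask
        let after = Thread.CurrentThread.ManagedThreadId  // DO SOMETHING ELSE
        ctx.Complete()
        return (before, after, result)
    }

// Pump continuations on the main thread until Complete() is called
ctx.RunOnCurrentThread()

let (beforeId, afterId, result) = main.Result
printfn "before: %d, after: %d, result: %d" beforeId afterId result
```

The task builder captures SynchronizationContext.Current at the let!, so the continuation is posted to the queue and run by the thread that is pumping it, which is why the two thread ids should match. The main thread has to spend its waiting time inside the pump. If it is busy elsewhere, the continuation simply waits in the queue.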
Brian Berns On

If you really need to make sure that the main computation occurs on a single thread, you can just avoid the computation expression entirely:

printfn "Do something (on thread %A)" Thread.CurrentThread.ManagedThreadId
let task = startLongRunningTask ()
let result = task.Result
printfn "Do something else (on the same thread %A)" Thread.CurrentThread.ManagedThreadId

Note that Result blocks the calling thread until the task is complete, which is the behavior you seem to want. (Even simpler: You could just run the long-running task on the main thread as well, but I assume there's some reason that's not desirable.)
