Post messages from async threads to main thread in F#

There is a subscription to an observable that sends out log messages. Some of the log messages come from other threads because they are raised inside F# async blocks. I need to be able to write out the messages from the main thread.

Here is the code that currently filters out many of the log messages because they are not on the main thread:

member x.RegisterTrace() =
    Logging.verbose <- x.Verbose
    let id = Threading.Thread.CurrentThread.ManagedThreadId
    Logging.subscribe (fun trace ->
        if id = Threading.Thread.CurrentThread.ManagedThreadId then
            match trace.Level with
            | TraceLevel.Warning -> x.WriteWarning trace.Text
            | TraceLevel.Error -> x.WriteWarning trace.Text
            | _ -> x.WriteObject trace.Text
        else
            Diagnostics.Debug.Write(sprintf "not on main PS thread: %A" trace)
    )

I have tried various forms of using System.Threading.SynchronizationContext: .Current, .SetSynchronizationContext, .Send, and .Post. I've also dabbled with System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext, and I've tried Async.SwitchToContext. No matter what I do, System.Threading.Thread.CurrentThread.ManagedThreadId ends up being different and PowerShell complains. Am I going about this wrong?

Here is the work-in-progress pull request and more details about the problem.

UPDATE 2015-06-16 Tue 11:45 AM PST

@RCH Thank you, but using Async.SwitchToContext to set the SynchronizationContext does not appear to work. Here is the code and debugging output when I do Paket-Restore -Force:

member x.RegisterTrace() =
    let a = Thread.CurrentThread.ManagedThreadId
    Logging.verbose <- x.Verbose
    let ctx = SynchronizationContext.Current
    Logging.subscribe (fun trace ->
        let b = Thread.CurrentThread.ManagedThreadId
        async {
            let c = Thread.CurrentThread.ManagedThreadId
            do! Async.SwitchToContext ctx
            let d = Thread.CurrentThread.ManagedThreadId
            Debug.WriteLine (sprintf "%d %d %d %d %s" a b c d trace.Text)
        } |> Async.Start
    )

Debug Output

An expert at work recommended another solution that I'm going to try that involves passing in the context when subscribing.

UPDATE 2015-06-16 Tue 5:30 PM PST

I got help creating an IObservable.SubscribeOn that allows the SynchronizationContext to be passed in. Unfortunately, it doesn't solve the problem either, but it may be part of the solution. Maybe a custom SynchronizationContext is needed, like SingleThreadSynchronizationContext. I would love help making one, but before I do that, I'm going to try out System.Reactive's Observable.ObserveOn(Scheduler.CurrentThread).

UPDATE 2015-06-16 Tue 8:30 PM PST

I haven't been able to get Rx to work either. Scheduler.CurrentThread doesn't behave the way I was hoping. I then tried out these changes and the callback doesn't get called.

member x.RegisterTrace() =
    Logging.verbose <- x.Verbose
    let a = Threading.Thread.CurrentThread.ManagedThreadId
    let ctx = match SynchronizationContext.Current with null -> SynchronizationContext() | sc -> sc
    let sch = SynchronizationContextScheduler ctx
    Logging.event.Publish.ObserveOn sch
    |> Observable.subscribe (fun trace ->
        let b = Threading.Thread.CurrentThread.ManagedThreadId
        Debug.WriteLine(sprintf "%d %d %s" a b trace.Text)
    )

A custom SynchronizationContext may be what is needed. :/

2

There are 2 best solutions below

0
On BEST ANSWER

I ended up creating an EventSink that has a queue of callbacks that are executed on the main PowerShell thread via Drain(). I put the main computation on another thread. The pull request has the full code and more details.
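The idea above can be sketched roughly as follows. This is not the code from the pull request: the `EventSink` shape, the `Post` and `Complete` members, and the use of `BlockingCollection` are all assumptions made for illustration. Only the name `EventSink` and the `Drain()` call come from the answer.

```fsharp
open System.Collections.Concurrent
open System.Threading

/// Hypothetical sink: any thread may Post a callback, but callbacks
/// only run on the thread that calls Drain().
type EventSink() =
    let queue = new BlockingCollection<(unit -> unit)>()

    /// Safe to call from any thread; only enqueues the callback.
    member x.Post(callback: unit -> unit) = queue.Add(callback)

    /// Signals that no more callbacks will be posted, which lets Drain() return.
    member x.Complete() = queue.CompleteAdding()

    /// Blocks the calling thread and runs queued callbacks on it
    /// until Complete() has been called and the queue is empty.
    member x.Drain() =
        for callback in queue.GetConsumingEnumerable() do
            callback ()

let sink = EventSink()
let mainId = Thread.CurrentThread.ManagedThreadId
let seen = ResizeArray<int * string>()

// Stand-in for the main computation, which runs on another thread.
let work () =
    for i in 1 .. 3 do
        let text = sprintf "message %d" i
        // The callback records the id of the thread that eventually runs it.
        sink.Post(fun () -> seen.Add((Thread.CurrentThread.ManagedThreadId, text)))
    sink.Complete()

let worker = Thread(ThreadStart(work))
worker.Start()
sink.Drain()   // callbacks execute here, on the thread that called Drain()
worker.Join()
```

In a cmdlet, the posted callbacks would call x.WriteWarning or x.WriteObject instead of recording thread ids, and Drain() would be called from the cmdlet's own processing method so that the writes happen on the PowerShell thread.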

3
On

For UI applications (Forms, WPF, F# interactive, ...), capturing SynchronizationContext.Current and switching to it with Async.SwitchToContext, as in your trials, together with some code borrowed from Paket, is enough.

For console applications, however, there is no SynchronizationContext and thus no thread the continuations can be Posted to, so they will end up on the thread pool. A possible workaround is found on MSDN Blogs.
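A minimal sketch of that kind of workaround, assuming a hand-rolled single-threaded context with a message pump, might look like this. The type name and its `RunOnCurrentThread` and `Complete` members are illustrative assumptions, not a copy of the MSDN code; `Send` is left at its default behaviour.

```fsharp
open System.Collections.Concurrent
open System.Threading

/// Hypothetical context that queues posted work and runs it on
/// whichever thread calls RunOnCurrentThread().
type SingleThreadSynchronizationContext() =
    inherit SynchronizationContext()
    let queue = new BlockingCollection<SendOrPostCallback * obj>()

    /// Post only enqueues; nothing runs on the posting thread.
    override x.Post(callback, state) = queue.Add((callback, state))

    /// Pumps the queue on the calling thread until Complete() is called.
    member x.RunOnCurrentThread() =
        for (callback, state) in queue.GetConsumingEnumerable() do
            callback.Invoke(state)

    /// Stops the pump once the queue has been emptied.
    member x.Complete() = queue.CompleteAdding()

let ctx = SingleThreadSynchronizationContext()
let mainId = Thread.CurrentThread.ManagedThreadId
let observed = ResizeArray<int>()

async {
    // Async.Start begins this block on a thread-pool thread.
    do! Async.SwitchToContext(ctx)
    // The continuation was posted to ctx, so it runs inside the pump below.
    observed.Add(Thread.CurrentThread.ManagedThreadId)
    ctx.Complete()
} |> Async.Start

ctx.RunOnCurrentThread()   // blocks the main thread and runs the continuation on it
```

The catch is that the main thread has to sit in the pump for continuations to arrive on it, so the real work must be started on another thread first.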

Solution for UI applications only:

Having

module Logging

open System.Diagnostics

type Trace = { Level: TraceLevel; Text: string }

let public event = Event<Trace>()

let subscribe callback = Observable.subscribe callback event.Publish

and

[<AutoOpen>]
module CmdletExt

open System.Diagnostics
open System.Threading

type PSCmdletStandalone() =
    member x.RegisterTrace() =
        let syncContext = SynchronizationContext.Current
        Logging.subscribe (fun trace ->
            async {
                do! Async.SwitchToContext syncContext
                let threadId = Thread.CurrentThread.ManagedThreadId
                match trace.Level with
                | TraceLevel.Warning -> printfn "WARN (on %i): %s" threadId trace.Text
                | TraceLevel.Error -> printfn "ERROR (on %i): %s" threadId trace.Text
                | _ -> printfn "(on %i): %s" threadId trace.Text
            } |> Async.Start // or Async.StartImmediate
        )

then, registering on the main thread

#load "Logging.fs"
#load "SO.fs"

open System.Diagnostics
open System.Threading
open System

(new PSCmdletStandalone()).RegisterTrace()

printfn "Main: %i" Thread.CurrentThread.ManagedThreadId

for i in Enum.GetValues(typeof<TraceLevel>) do
    async {
        let workerId = Thread.CurrentThread.ManagedThreadId
        do Logging.event.Trigger { Level = unbox i; Text = sprintf "From %i" workerId }
    } |> Async.Start

yields e.g.

Main: 1
WARN (on 1): From 23
(on 1): From 25
ERROR (on 1): From 22
(on 1): From 13
(on 1): From 14