There is a subscription to an observable that sends out log messages. Some of the log messages come from other threads because they are in F# async blocks. I need to be able to write out the messages from the main thread.
Here is the code that currently filters out many of the log messages because they are not on the main thread:
member x.RegisterTrace() =
    Logging.verbose <- x.Verbose
    let id = Threading.Thread.CurrentThread.ManagedThreadId
    Logging.subscribe (fun trace ->
        if id = Threading.Thread.CurrentThread.ManagedThreadId then
            match trace.Level with
            | TraceLevel.Warning -> x.WriteWarning trace.Text
            | TraceLevel.Error -> x.WriteWarning trace.Text
            | _ -> x.WriteObject trace.Text
        else
            Diagnostics.Debug.Write(sprintf "not on main PS thread: %A" trace)
    )
I have tried various forms of using System.Threading.SynchronizationContext: .Current, .SetSynchronizationContext, .Send, and .Post. I've also dabbled with System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext. I've also tried Async.SwitchToContext. No matter what I do, System.Threading.Thread.CurrentThread.ManagedThreadId ends up being different and PowerShell complains. Am I going about this wrong?
Here is the work-in-progress pull request and more details about the problem.
UPDATE 2015-06-16 Tue 11:45 AM PST
@RCH Thank you, but using Async.SwitchToContext to set the SynchronizationContext does not appear to work. Here is the code and debugging output when I do Paket-Restore -Force:
member x.RegisterTrace() =
    let a = Thread.CurrentThread.ManagedThreadId
    Logging.verbose <- x.Verbose
    let ctx = SynchronizationContext.Current
    Logging.subscribe (fun trace ->
        let b = Thread.CurrentThread.ManagedThreadId
        async {
            let c = Thread.CurrentThread.ManagedThreadId
            do! Async.SwitchToContext ctx
            let d = Thread.CurrentThread.ManagedThreadId
            Debug.WriteLine (sprintf "%d %d %d %d %s" a b c d trace.Text)
        } |> Async.Start
    )
An expert at work recommended another solution, which I'm going to try, that involves passing in the context when subscribing.
UPDATE 2015-06-16 Tue 5:30 PM PST
I got help creating an IObservable.SubscribeOn that allows the SynchronizationContext to be passed in. Unfortunately, it doesn't solve the problem either, but it may be part of the solution. Maybe a custom SynchronizationContext is needed, like SingleThreadSynchronizationContext. I would love help making one, but before I do that, I'm going to try out System.Reactive's Observable.ObserveOn(Scheduler.CurrentThread).
UPDATE 2015-06-16 Tue 8:30 PM PST
I haven't been able to get Rx to work either. Scheduler.CurrentThread doesn't behave the way I was hoping. I then tried out these changes and the callback doesn't get called.
member x.RegisterTrace() =
    Logging.verbose <- x.Verbose
    let a = Threading.Thread.CurrentThread.ManagedThreadId
    let ctx = match SynchronizationContext.Current with null -> SynchronizationContext() | sc -> sc
    let sch = SynchronizationContextScheduler ctx
    Logging.event.Publish.ObserveOn sch
    |> Observable.subscribe (fun trace ->
        let b = Threading.Thread.CurrentThread.ManagedThreadId
        Debug.WriteLine(sprintf "%d %d %s" a b trace.Text)
    )
A custom SynchronizationContext may be what is needed. :/
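One likely reason the attempts above never land on the original thread: the base SynchronizationContext class is not tied to any thread, and its Post method simply queues the callback on the thread pool. The following is a minimal sketch of that behavior outside PowerShell; postedThreadId is a hypothetical helper written only for this illustration.

```fsharp
open System.Threading

/// Hypothetical helper: posts one callback to the given context and
/// returns the managed thread id the callback actually ran on.
let postedThreadId (ctx: SynchronizationContext) =
    use finished = new ManualResetEventSlim(false)
    let seen = ref 0
    let callback =
        SendOrPostCallback(fun _ ->
            seen.Value <- Thread.CurrentThread.ManagedThreadId
            finished.Set())
    ctx.Post(callback, null)
    // Block until the callback has run so the id can be read safely.
    finished.Wait()
    seen.Value

// Stands in for the main PowerShell thread.
let mainId = Thread.CurrentThread.ManagedThreadId

// A plain SynchronizationContext(): Post goes to the thread pool.
let postedId = postedThreadId (SynchronizationContext())

printfn "main=%d posted=%d same=%b" mainId postedId (mainId = postedId)
```

If the two ids differ, as they should when the calling thread is not itself a pool thread, then falling back to SynchronizationContext() cannot route callbacks to the cmdlet's thread. A context only helps when some code on the target thread actively pumps its queue.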
I ended up creating an EventSink that has a queue of callbacks that are executed on the main PowerShell thread via Drain(). I put the main computation on another thread. The pull request has the full code and more details.
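For readers who want the general shape of that approach, here is a minimal sketch. It is not the code from the pull request: the Post and Complete members, and the choice of BlockingCollection, are assumptions made for illustration. Only the idea (worker threads enqueue callbacks, the main thread runs them in Drain()) comes from the description above.

```fsharp
open System.Collections.Concurrent
open System.Threading

/// Hypothetical sketch of an event sink: callbacks may be posted from any
/// thread, but they only run on the thread that calls Drain.
type EventSink() =
    let queue = new BlockingCollection<unit -> unit>()

    /// Called from worker threads: enqueue the callback instead of running it.
    member x.Post(action: unit -> unit) = queue.Add action

    /// Called by the worker once the main computation has finished.
    member x.Complete() = queue.CompleteAdding()

    /// Called on the main thread: blocks and runs callbacks until Complete is called.
    member x.Drain() =
        for action in queue.GetConsumingEnumerable() do
            action ()

// Usage: the computation runs on another thread, the main thread drains.
let sink = EventSink()
let mainId = Thread.CurrentThread.ManagedThreadId
let written = ResizeArray<int * int>()

let worker =
    Thread(ThreadStart(fun () ->
        for i in 1 .. 3 do
            // In a cmdlet this callback would call x.WriteObject or x.WriteWarning.
            sink.Post(fun () -> written.Add((i, Thread.CurrentThread.ManagedThreadId)))
        sink.Complete()))

worker.Start()
sink.Drain()   // runs every posted callback on the calling (main) thread
worker.Join()

for (i, threadId) in written do
    printfn "message %d written on thread %d (main is %d)" i threadId mainId
```

In a cmdlet, the trace subscription would call Post with a closure over the trace, and the cmdlet's processing method would start the computation on another thread and then call Drain(), so every write happens on the thread PowerShell expects.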