I am using Bokeh server to plot live data coming from a sensor. I wrote a multiprocessing.Process subclass that reads data from the sensor and feeds it through a pipe to another Process subclass, which should update a Bokeh figure with the incoming data.
How can I make the plotting Process read from the pipe asynchronously and plot the data in Bokeh?
Consider the Grapher class, assuming another process already sends data to input_pipe:
from functools import partial
from multiprocessing import Process

from bokeh.client import push_session
from bokeh.models import ColumnDataSource
from bokeh.plotting import curdoc, figure
from tornado import gen  # needed for the @gen.coroutine decorator below


class Grapher(Process):

    def __init__(self, name, input_pipe):
        super(Grapher, self).__init__(name=name, daemon=True)
        self.input_pipe = input_pipe
        self.doc = curdoc()
        self.source = ColumnDataSource(dict(time=[], value=[]))
        self.fig = figure()
        self.fig.line(source=self.source, x='time', y='value')
        self.doc.add_root(self.fig)
        self.session = push_session(self.doc)  # To keep session updated.

    def run(self):
        while True:
            # Blocks until the sending process puts a (time, value) pair on the pipe.
            time, value = self.input_pipe.recv()
            self.doc.add_next_tick_callback(partial(self.update, time, value))

    @gen.coroutine
    def update(self, time, value):
        self.source.stream(dict(time=[time], value=[value]))
The code here is adapted from the example in the Bokeh docs. Separately, I run bokeh serve so that curdoc() has something to connect to. From the Bokeh server logs I can see that the connection is established.
However, the problem seems to be that update is never actually executed on the next tick after it is scheduled in the while loop. I checked this by adding a logging message inside update, which never gets printed.
If I replace add_next_tick_callback with a direct call to update, the function does run, but the session does not plot anything.
What could cause this? The code looks logical, and I cannot find anything in the docs that explains where this approach fails.
Thanks.
When using the bokeh.client approach to apps, if you want events to be serviced and callbacks to be called when they happen, you have to call the blocking function session.loop_until_closed() at the end. That call is what keeps the machinery that monitors events and calls callbacks running.
If calling a blocking function is problematic, and you don't want to use a bokeh serve app.py style of app, then you can try calling session.loop_until_closed() in another thread (I'm not 100% sure this will work), or start your own Tornado ioloop and piggyback a Bokeh server app on it directly. That technique will be demonstrated better in the upcoming 0.12.4 release, but you may find this notebook a useful reference for now: https://gist.github.com/bryevdv/ff84871fcd843aceea4f0197be9c57e0
Note that the notebook is a proof of concept: you will need a 0.12.4 dev release for it to work, and some of the details of usage may change.
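To make the first suggestion concrete, here is a minimal sketch, not a tested solution. It assumes a Bokeh version from around the time of this answer, where bokeh.client sessions service next-tick callbacks, and it assumes that add_next_tick_callback may be called from another thread. The names pump and run_grapher are hypothetical helpers, not part of Bokeh. The blocking pipe read is moved into a background thread so that the main thread of the plotting process is free to block in session.loop_until_closed().

```python
import threading
from functools import partial


def pump(conn, schedule, update):
    """Read (time, value) pairs from conn and pass each one to schedule.

    Hypothetical helper meant to run in a background thread. It stops when
    the sending end of the pipe is closed.
    """
    while True:
        try:
            time, value = conn.recv()
        except EOFError:
            break
        # Only schedule the update here; the event loop runs it later.
        schedule(partial(update, time, value))


def run_grapher(conn):
    """Hypothetical entry point: build the plot, then block on the session loop."""
    # Imported here so that pump() can be used without Bokeh installed.
    from bokeh.client import push_session
    from bokeh.models import ColumnDataSource
    from bokeh.plotting import curdoc, figure

    doc = curdoc()
    source = ColumnDataSource(dict(time=[], value=[]))
    fig = figure()
    fig.line(source=source, x='time', y='value')
    doc.add_root(fig)
    session = push_session(doc)

    def update(time, value):
        source.stream(dict(time=[time], value=[value]))

    reader = threading.Thread(
        target=pump,
        args=(conn, doc.add_next_tick_callback, update),
        daemon=True,
    )
    reader.start()

    # Blocks and services events, so the scheduled callbacks actually run.
    session.loop_until_closed()
```

With this layout, Grapher.run() would simply call run_grapher(self.input_pipe). The document and session are created inside the function rather than in __init__, because __init__ executes in the parent process while run() executes in the child.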