I'd like to be able to change a compute pipeline dynamically at runtime, but it seems that GenStage requires the compute graph to be defined at compile time through the subscribe_to: [...] mechanism. Is there a way to create dynamic compute graphs? For example, in the code below, I'd like to switch, at runtime, between the "subtract 7" and "subtract 4" vertices in my pipeline graph.
Is this possible using GenStage? I will likely have very complex pipelines, so I need a solution that scales to changing graphs in complex ways, as opposed to ad-hoc solutions such as, in this case, parameterising the integer to subtract. I'd like to be able to add or remove entire subtrees, switch between subtrees, and add nodes to the graph, including splicing them into the middle of any subtree or of the main tree.
Please refer to the EDIT further down.
Here is the initial producer:
defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end
Here is one of the producer_consumers:
defmodule GenstageTest.PcTimesFive do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state, subscribe_to: [GenstageTest.PcAddOne]}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 * 5))

    {:noreply, numbers, state}
  end
end
and here is the final consumer:
defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [GenstageTest.PcDivTwo]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event, state})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
It is all modelled on the Elixir School GenStage tutorial.
All the modules and the mix.exs can be found on GitHub.
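On the compile-time concern specifically: as far as I can tell, the `subscribe_to:` option returned from `init/1` is evaluated when the stage process starts, so the upstream stage can be passed in as an argument instead of being hard-coded. A minimal sketch, assuming the gen_stage dependency is available; `GenstageTest.PcSubtractSeven` and its `upstream` argument are hypothetical names, not part of the modules above:

```elixir
# Hypothetical stage for illustration; assumes gen_stage is a dependency.
defmodule GenstageTest.PcSubtractSeven do
  use GenStage

  # `upstream` is a pid or registered name chosen by the caller at runtime.
  def start_link(upstream) do
    GenStage.start_link(__MODULE__, upstream)
  end

  def init(upstream) do
    # The subscription target comes from the start_link argument,
    # so it is decided when the process starts, not when the module compiles.
    {:producer_consumer, :state_doesnt_matter, subscribe_to: [upstream]}
  end

  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &(&1 - 7)), state}
  end
end
```

For example, `GenstageTest.PcSubtractSeven.start_link(GenstageTest.PcAddOne)` would attach it below the add-one stage, while passing a different pid or registered name would attach it elsewhere. This only fixes the upstream at start time; it does not by itself re-wire a stage that is already running.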
EDIT, 3 days later, after a partial answer from @AquarHEAD L.:
I have managed to get runtime subscriptions working. Here are some modified producers, producer_consumers, and consumers respectively:
Producer:
defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end
end
Producer_consumer:
defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))

    {:noreply, numbers, state}
  end
end
Consumer:
defmodule GenstageTest.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect event
      #File.write("/home/tbrowne/scratch/output.txt",
      #  Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end
Once these are all available in the lib directory (remember to add {:gen_stage, "~> 0.11"} to your mix.exs deps), or copied and pasted into IEx, the following works perfectly:
{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1, cancel: :transient)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1, cancel: :transient)
The remaining issue is that I still don't know how to cancel the subscription. There is a cancel function and there is also a stop function. GenStage.stop(c), for example, seems to do nothing, while my various attempts at GenStage.cancel/3 only give errors.
To recap, what I need now is to be able to stop certain stages and replace them with others. What is the syntax for cancelling a subscription, and from where is it called? It is not well explained in the docs, as there is no concrete example.
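For reference, my current reading of the GenStage docs is that `GenStage.cancel/3` expects the `{producer_pid, subscription_tag}` pair rather than the bare tag returned by `sync_subscribe`, and that the request is meant to come from the consumer side. Below is a minimal sketch under those assumptions, in which the consumer records its own subscriptions and cancels them on request. `GenstageTest.DetachableConsumer` and `detach/1` are hypothetical names, and this is not a confirmed solution:

```elixir
# Hypothetical consumer for illustration; assumes gen_stage is a dependency.
defmodule GenstageTest.DetachableConsumer do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, [])

  # Ask the consumer to cancel every subscription it currently holds.
  def detach(consumer), do: GenStage.call(consumer, :detach)

  # State: list of {producer_pid, subscription_tag} pairs.
  def init(subscriptions), do: {:consumer, subscriptions}

  # Invoked for each new subscription; `from` is {producer_pid, subscription_tag}.
  def handle_subscribe(:producer, _opts, from, subscriptions) do
    {:automatic, [from | subscriptions]}
  end

  # Invoked when a subscription ends; drop it from the list.
  def handle_cancel(_reason, from, subscriptions) do
    {:noreply, [], List.delete(subscriptions, from)}
  end

  def handle_call(:detach, _caller, subscriptions) do
    # Cancellation is asynchronous: the entries are removed later,
    # in handle_cancel/3, once the producer confirms.
    Enum.each(subscriptions, &GenStage.cancel(&1, :shutdown))
    {:reply, :ok, [], subscriptions}
  end

  def handle_events(events, _from, subscriptions) do
    Enum.each(events, &IO.inspect/1)
    # As a consumer we never emit events
    {:noreply, [], subscriptions}
  end
end
```

The intended usage would be to subscribe with `GenStage.sync_subscribe(c, to: a1, cancel: :temporary)`, call `GenstageTest.DetachableConsumer.detach(c)`, and then subscribe `c` to a different stage. The `cancel: :temporary` option matters here: according to the docs, a `:permanent` consumer (the default) exits when its subscription is cancelled, whereas a `:temporary` one never does.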
Why not implement your own GenStage.Dispatcher? Here is behaviour