My GenStage consumer looks like this:
```elixir
defmodule Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [Broadcaster]}
  end

  def handle_events(documents, _from, state) do
    # Start one task per document; each task sends {ref, result} back here.
    for document <- documents do
      Task.async(fn -> Processor.process_document(document) end)
    end

    {:noreply, [], state}
  end

  # Task reply. Demonitor with :flush so the task's normal :DOWN message
  # does not reach the clause below and stop the consumer.
  def handle_info({ref, _result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, [], state}
  end

  def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
    {:stop, reason, state}
  end
end
```
I want to split the `Processor` tasks (i.e., the tasks started in `handle_events/3`) across multiple machines: if one machine is busy, i.e., still waiting for a task to finish, the next task should go to another machine. How do I go about doing that?
I've read the guide on distributed tasks; I'm just not sure how to design this so that (1) it splits the work across machines, and (2) it knows when to split, i.e., when a machine is busy:
```elixir
task = Task.Supervisor.async({Something, :"server@other-machine"}, fn ->
  {:ok, node()}
end)
```
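For point (1) alone, one option is to pick the target node round-robin and start the work under a `Task.Supervisor` on that node. The sketch below assumes every node runs a `Task.Supervisor` registered under the same name (for example a hypothetical `Processor.TaskSupervisor`, started with `{Task.Supervisor, name: Processor.TaskSupervisor}` in each node's supervision tree); `NodeDispatcher` and its functions are illustrative names, not an existing library. Note that round-robin does not know whether a node is busy.

```elixir
defmodule NodeDispatcher do
  # Hypothetical helper: spreads jobs over nodes round-robin.
  # It does NOT check whether a node is busy.

  # Every node that can take work: this node plus all connected nodes,
  # sorted so that every caller sees the same order.
  def nodes, do: Enum.sort([node() | Node.list()])

  # Node for the n-th job (n starting at 0), cycling through the list.
  def pick_node([_ | _] = nodes, n) when is_integer(n) and n >= 0 do
    Enum.at(nodes, rem(n, length(nodes)))
  end

  # Run apply(module, fun, args) under the Task.Supervisor registered as
  # `supervisor_name` on `target`. Module/function/args is generally safer
  # across nodes than an anonymous function, which needs identical compiled
  # code on both sides.
  def async_on(supervisor_name, target, module, fun, args) do
    Task.Supervisor.async({supervisor_name, target}, module, fun, args)
  end
end
```

Inside `handle_events/3` this could be called as `NodeDispatcher.async_on(Processor.TaskSupervisor, NodeDispatcher.pick_node(NodeDispatcher.nodes(), i), Processor, :process_document, [document])`, where `i` is a counter kept in the consumer's state.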
I know it involves an arrangement like this, but I'm not sure where to start. Does anyone have experience with this kind of thing?
Update
The reason I need this is that `Processor.process_document(document)` takes about 30 seconds to complete, and only one can run at a time on a given machine. Adding a second node that takes half the workload therefore roughly halves the total processing time.
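For point (2), "busy" can be tracked explicitly: keep a list of idle nodes, give each document to an idle node, queue documents while every node is busy, and mark a node idle again when its task replies. Below is a minimal sketch of that idea as a plain `GenServer`; `JobScheduler`, its options, and the supervisor name are hypothetical, and it assumes a `Task.Supervisor` with the same registered name runs on every node. It uses `Task.Supervisor.async_nolink` so a crashing job frees its node instead of killing the scheduler (the failed document is simply dropped here).

```elixir
defmodule JobScheduler do
  # Hypothetical sketch: at most one job per node slot; the rest wait in a queue.
  use GenServer

  # Options (assumptions for this sketch):
  #   :nodes      - node slots, e.g. [node() | Node.list()]
  #   :supervisor - name of a Task.Supervisor running on every node
  #   :job        - {module, function}; each document is the only argument
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def submit(server, document), do: GenServer.cast(server, {:submit, document})

  @impl true
  def init(opts) do
    {:ok,
     %{
       idle: Keyword.fetch!(opts, :nodes),
       supervisor: Keyword.fetch!(opts, :supervisor),
       job: Keyword.fetch!(opts, :job),
       queue: :queue.new(),
       running: %{}
     }}
  end

  @impl true
  def handle_cast({:submit, document}, state) do
    {:noreply, dispatch(%{state | queue: :queue.in(document, state.queue)})}
  end

  @impl true
  # Finished task: drop its monitor (flushing the normal :DOWN) and free its node.
  def handle_info({ref, _result}, %{running: running} = state)
      when is_map_key(running, ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, free(state, ref)}
  end

  # Crashed task: free its node; the document is not retried in this sketch.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{running: running} = state)
      when is_map_key(running, ref) do
    {:noreply, free(state, ref)}
  end

  def handle_info(_other, state), do: {:noreply, state}

  defp free(state, ref) do
    {target, running} = Map.pop(state.running, ref)
    dispatch(%{state | running: running, idle: [target | state.idle]})
  end

  # Start queued documents while an idle node is available.
  defp dispatch(%{idle: [target | idle]} = state) do
    case :queue.out(state.queue) do
      {{:value, document}, queue} ->
        {mod, fun} = state.job
        task = Task.Supervisor.async_nolink({state.supervisor, target}, mod, fun, [document])
        running = Map.put(state.running, task.ref, target)
        dispatch(%{state | idle: idle, queue: queue, running: running})

      {:empty, _queue} ->
        state
    end
  end

  defp dispatch(state), do: state
end
```

The consumer could then call `JobScheduler.submit(scheduler, document)` for each document, with `job: {Processor, :process_document}`. Because `submit/2` is a cast, this queue is unbounded and bypasses GenStage's back-pressure. An alternative that stays inside GenStage is to run one consumer per node, each subscribing to the producer with `max_demand: 1` and calling `Processor.process_document/1` synchronously in `handle_events/3`, so a document only goes to a consumer that has finished its previous one. That needs the producer to use the default `GenStage.DemandDispatcher`; with `GenStage.BroadcastDispatcher`, every consumer receives every event.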