How to test the Elixir GenStage Consumer?

956 Views Asked by At

I found some resources on how to test a producer, but I could not find anything that shows how to test a consumer.

For the producer, I create a dummy consumer in the test and everything works fine. For the consumer, however, I am struggling to write a test.

defmodule DataProducer do
  @moduledoc """
  GenStage producer that emits consecutive integers.

  The state is the next integer to emit. Events are dispatched through
  `GenStage.BroadcastDispatcher`, so every subscribed consumer is offered
  every event.
  """
  use GenStage

  @doc """
  Starts the producer, registered under the module name, with the counter at 0.
  """
  def start_link([]) do
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  @impl true
  def init(counter) do
    {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
  end

  # Emits exactly `demand` integers starting at the current counter. The range
  # is inclusive on both ends, hence the `- 1`; the new state is the first
  # integer that has not been emitted yet, so successive batches never overlap.
  @impl true
  def handle_demand(demand, state) when demand > 0 do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

Producer Test:

 defmodule DataProducerTest do
   use ExUnit.Case

   # Starts the real producer, subscribes a throwaway consumer that forwards
   # every batch to the test process, and checks that a batch arrives.
   test "check the results" do
     {:ok, stage} = DataProducer.start_link([])
     {:ok, _cons} = TestConsumer.start_link(stage)

     # Match on a non-empty list rather than binding an unused variable.
     assert_receive {:received, [_ | _]}

     GenStage.stop(stage)
   end
 end

    defmodule TestConsumer do
      @moduledoc """
      Test-only consumer that forwards every batch of events it receives to
      the process that started it, as `{:received, events}`.
      """
      use GenStage

      @doc """
      Starts the consumer subscribed to `producer`. The calling process is
      captured here (before the stage process exists) as the owner to notify.
      """
      def start_link(producer) do
        GenStage.start_link(__MODULE__, {producer, self()})
      end

      @impl true
      def init({producer, owner}) do
        {:consumer, owner, subscribe_to: [producer]}
      end

      # Consumers never emit events, so the second element is always [].
      @impl true
      def handle_events(events, _from, owner) do
        send(owner, {:received, events})
        {:noreply, [], owner}
      end
    end

And consumer:

defmodule DataConsumer do
  @moduledoc """
  GenStage consumer that subscribes to `DataProducer` and logs each event.

  The subscription carries a `:selector`, so only integers strictly between
  50 and 100 are delivered, and asks for at most 10 events at a time.
  """
  use GenStage

  # Logger.info/1 is a macro, so the module must be required before use.
  require Logger

  @doc """
  Starts the consumer with a placeholder state.
  """
  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end

  @impl true
  def init(state) do
    subscription =
      {DataProducer, selector: fn n -> n > 50 and n < 100 end, max_demand: 10}

    {:consumer, state, subscribe_to: [subscription]}
  end

  # Logs one line per event; consumers never emit events, hence the [].
  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, fn event ->
      Logger.info(inspect({self(), event, state}))
    end)

    {:noreply, [], state}
  end
end

Thank you in advance.

2

There are 2 best solutions below

0
On BEST ANSWER

In the test for the consumer:

 # Checks that the consumer under test asks its producer for the expected
 # amount of demand. DummyProducer reports the demand it received by sending
 # {:called_back, demand} to the process registered as :test.
 test "should behave like consumer" do
   # Register first: the producer sends to :test shortly after it receives
   # demand, and sending to an unregistered name raises in the producer.
   # The name is global, so this test must not run concurrently with another
   # test that registers :test.
   Process.register(self(), :test)

   {:ok, producer} = DummyProducer.start_link(1)
   {:ok, _consumer} = Consumer.start_link(producer)

   # NOTE(review): 10 presumably matches the max_demand that Consumer
   # subscribes with; Consumer is not shown here, so confirm.
   assert_receive {:called_back, 10}
 end

Now DummyProducer

defmodule DummyProducer do
  @moduledoc """
  Test-only producer used to observe how much demand a consumer sends.

  It answers each demand with that many consecutive integers and, 1 ms later,
  reports the demand to the process registered as `:test` and stops.
  """
  use GenStage

  @doc """
  Starts the producer. The argument becomes the initial counter, i.e. the
  first integer that will be emitted.
  """
  def start_link(demand) do
    GenStage.start_link(__MODULE__, demand)
  end

  @impl true
  def init(counter) do
    {:producer, counter}
  end

  # Emits `demand` consecutive integers starting at `counter` and schedules a
  # {:stop, demand} message to self so the demand can be reported afterwards.
  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    Process.send_after(self(), {:stop, demand}, 1)
    {:noreply, events, demand + counter}
  end

  # Reports the observed demand to the process registered as :test, then
  # stops normally. The state is passed through unchanged.
  @impl true
  def handle_info({:stop, demand}, state) do
    send(:test, {:called_back, demand})
    {:stop, :normal, state}
  end
end

I think,

The point of testing the consumer is to check that the consumer sends demand to the producer and that it stays within the max demand set in the subscription.

3
On

There is no reason to use ex_mock here. It would be much easier if you made the producer that your consumer subscribes to an argument, like so:

defmodule DataConsumer do
  @moduledoc """
  GenStage consumer whose producer is injected through `start_link/1`, so a
  test can subscribe it to a stand-in producer.
  """
  use GenStage

  @doc """
  Starts the consumer and subscribes it to `producer`.
  """
  def start_link(producer) do
    GenStage.start_link(__MODULE__, producer)
  end

  # The init argument is only the producer to subscribe to; the stage state
  # itself is a placeholder atom. The selector limits delivery to integers
  # strictly between 50 and 100.
  @impl true
  def init(producer) do
    subscription =
      {producer, selector: fn n -> n > 50 and n < 100 end, max_demand: 10}

    {:consumer, :any_state, subscribe_to: [subscription]}
  end

  # NOTE(review): handle_events/3 is not part of this excerpt; a consumer
  # needs it to process the events it receives.
end

Then you could have a TestProducer:

defmodule TestProducer do
  @moduledoc """
  Manually driven producer for tests: it emits nothing on its own and only
  dispatches the events pushed into it through `notify/2`.
  """
  use GenStage

  @doc """
  Asynchronously asks the producer `pid` to emit `event`.
  """
  def notify(pid, event) do
    GenStage.cast(pid, {:notify, event})
  end

  @doc """
  Starts the producer.
  """
  def start_link do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  # Demand is never answered directly; events only come from notify/2.
  @impl true
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end

  # Emits the pushed event as a one-element batch.
  @impl true
  def handle_cast({:notify, event}, state) do
    {:noreply, [event], state}
  end
end

And subscribe to it in your test and assert on the expected outcome:

defmodule DataConsumerTest do
  use ExUnit.Case

  # Drives the consumer through a manually controlled producer so the test
  # decides exactly which events the consumer sees.
  test "consumes events" do
    {:ok, pid} = TestProducer.start_link()
    {:ok, _consumer} = DataConsumer.start_link(pid)

    # notify/2 needs the producer pid. The event is an integer between 50 and
    # 100 because the DataConsumer shown above subscribes with a selector
    # that only lets such values through; anything else would be dropped.
    TestProducer.notify(pid, 75)

    # assert thing you expected to happen happens
  end
end

TL;DR: If you're working with a lot of different consumers in your codebase, a manual/test event producer is a must. A consumer does not really care what a producer does to produce events, just that it can subscribe to and consume them. So your tests just need to make sure the consumer is able to receive events from any producer, and that you can send it the events it's looking for in the tests.