
Events from internal buffer are not sent after consumer reconnect #311

Open
stefanc18 opened this issue Apr 29, 2025 · 2 comments

@stefanc18

Hi,

We've encountered unexpected behavior with GenStage's buffering. According to the Buffering section of the GenStage docs, events are placed in the internal buffer whenever the producer tries to send events but there are no consumers, and once consumers become available again they receive the buffered events.

However, this does not seem to happen. When a consumer goes down, events are queued as expected, but once the consumer comes back up, the buffered events are never sent; they stay in the buffer indefinitely.

Here is an example that reproduces the issue, with a producer and a consumer running on two different nodes.

producer.ex:

defmodule Producer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [])
  end

  def init(_args) do
    {:producer, %{events: [], pending_demand: 0}}
  end

  def handle_cast({:add_event, event}, state) do
    new_events = state.events ++ [event]

    {events_to_dispatch, remaining_events} = Enum.split(new_events, state.pending_demand)

    events_to_dispatch_count = length(events_to_dispatch)
    new_state = %{state | events: remaining_events, pending_demand: state.pending_demand - events_to_dispatch_count}

    {:noreply, events_to_dispatch, new_state}
  end

  def handle_demand(demand, state) do
    {events_to_dispatch, remaining_events} = Enum.split(state.events, demand)

    events_to_dispatch_count = length(events_to_dispatch)
    new_state = %{state | events: remaining_events, pending_demand: state.pending_demand + demand - events_to_dispatch_count}

    {:noreply, events_to_dispatch, new_state}
  end
end

consumer.ex:

defmodule Consumer do
  use GenStage

  def start_link(consumer_index) do
    GenStage.start_link(__MODULE__, consumer_index)
  end

  @impl GenStage
  def init(_args) do
    {:consumer, :ok, subscribe_to: [{{:global, :producer}, max_demand: 1, min_demand: 0}]}
  end

  @impl GenStage
  def handle_events(events, _from, state) do
    IO.inspect("Handling events: #{inspect(events)}")

    {:noreply, [], state}
  end
end

Both modules were placed in a Mix project. Steps to reproduce:

  1. Open 2 terminals
  2. Run iex --sname node_1 -S mix run --no-halt in the first terminal and iex --sname node_2 -S mix run --no-halt in the second terminal
  3. Start the producer in the first terminal: GenStage.start_link(Producer, [], name: {:global, :producer})
  4. Connect the nodes and start the consumer from the second terminal: Node.connect(:"node_1@myuser") and GenStage.start_link(Consumer, :ok)
  5. Send an event from the first terminal to verify that everything works: GenServer.whereis({:global, :producer}) |> GenServer.cast({:add_event, "event1"}). The consumer's log line should appear in the second terminal.
  6. Stop the second node (Ctrl+C in the second terminal). The consumer is now stopped and only the producer is running.
  7. Send another event from the first terminal while there is no consumer.
  8. Run GenServer.whereis({:global, :producer}) |> :sys.get_state() in the first terminal and notice that the event just sent is now in the producer's internal buffer (for example: buffer: {{["event2"], []}, 1, 10000}).
  9. Start the consumer again by repeating step 4. Notice that even though the consumer is back up, it does not receive the event sent in step 7. Running GenServer.whereis({:global, :producer}) |> :sys.get_state() in the first terminal again shows that the event is still in the internal buffer.

My expectation, at least from my reading of the docs, is that as soon as the consumer reconnects to the producer, it receives the buffered event. It does not. The consumer effectively becomes stale and does not demand any more events after this.

I looked into the dispatcher code (GenStage.DemandDispatcher, the default dispatcher) and noticed that if I change this line of code

from this:

{:ok, 0, {demands, current + pending, max, shuffle_demand}}

to this:

{:ok, 0, {demands, pending, max, shuffle_demand}}

the problem goes away. I'm not sure whether this is the correct fix or what other implications it has, but the change keeps pending at 0, so when the consumer reconnects and asks for an event again, the buffered event is sent to and received by the consumer.
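To make the arithmetic concrete, here is a stripped-down model of the two dispatcher steps involved. It assumes, from our reading of GenStage.DemandDispatcher, that the quoted line sits in the cancel callback (moving the cancelled consumer's unmet demand, current, into pending) and that the ask callback later subtracts pending from new demand via min(pending, counter) before the producer drains its buffer. PendingModel and its functions are hypothetical names for illustration, not GenStage API.

```elixir
defmodule PendingModel do
  @moduledoc """
  Hypothetical model of the `pending` arithmetic discussed in this issue.
  Not GenStage code: only the numbers involved are kept.
  """

  # Cancel: the cancelled consumer's unmet demand (`current`) is either
  # carried into `pending` (original line) or dropped (proposed change).
  def cancel(current, pending, :original), do: current + pending
  def cancel(_current, pending, :proposed), do: pending

  # Ask (assumed behavior): demand already covered by `pending` is not
  # forwarded again. Returns {demand_forwarded, new_pending}; the forwarded
  # demand is what the producer would use to drain its buffer.
  def ask(counter, pending) do
    already_sent = min(pending, counter)
    {counter - already_sent, pending - already_sent}
  end
end

# Steps 6-9 of the reproduction: the consumer still had 1 unit of unmet
# demand when its node went down, then it reconnects and asks for 1 again.
for variant <- [:original, :proposed] do
  pending = PendingModel.cancel(1, 0, variant)
  {forwarded, _pending} = PendingModel.ask(1, pending)
  IO.puts("#{variant}: demand forwarded after reconnect = #{forwarded}")
end
```

Running it prints "original: demand forwarded after reconnect = 0" and "proposed: demand forwarded after reconnect = 1". In this model, the original line leaves no demand to pull "event2" out of the buffer, which matches step 9, while the proposed change forwards one unit.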

I'm opening this issue because this looks like a bug in the dispatcher's implementation, but maybe I'm missing something. Looking forward to your thoughts.

@josevalim
Member

Thank you for the issue. Can you please open up a PR? If you can add some dispatcher tests, even better :)

@stefanc18
Author

stefanc18 commented May 5, 2025

I opened PR #312, which also adds tests for this behavior. Feel free to have a look when you have time. Thank you!

I took a different approach from the one described in this issue, so that both of these cases work:

Case 1 (already worked before my change):

  1. Consumer subscribes and asks for an event
  2. No events are dispatched from the producer
  3. Consumer cancels, then subscribes again
  4. No new events are demanded, because they were already demanded

Case 2 (did not work before; works with the changes in the PR):

  1. Consumer subscribes and asks for an event
  2. No events are dispatched from the producer
  3. Consumer cancels
  4. Producer dispatches an event => it is added to the internal buffer, as no consumers are available
  5. Consumer subscribes again
  6. An event is demanded, because one is already in the internal buffer
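The two cases can be compared in a slightly larger model that also tracks the producer's buffer. This is a hypothetical single-consumer sketch, not GenStage's code and not necessarily what #312 implements: DispatchModel, its fields, and the third variant :buffer_consumes_pending (which lowers pending whenever events land in the buffer) are illustrative assumptions.

```elixir
defmodule DispatchModel do
  @moduledoc """
  Hypothetical single-consumer model of a producer's buffer plus the
  dispatcher's demand bookkeeping. Not GenStage code; names are illustrative.
  """

  # variant: :original | :proposed | :buffer_consumes_pending
  def new(variant) do
    %{variant: variant, demand: 0, pending: 0, buffer: [], delivered: [], forwarded: 0}
  end

  # A (re)subscribed consumer starts with no outstanding demand.
  def subscribe(s), do: %{s | demand: 0}

  # The consumer asks for `n`. Demand already covered by `pending` is not
  # forwarded again; what is forwarded first drains the buffer, and only the
  # remainder reaches the producer's handle_demand (counted in :forwarded).
  def ask(s, n) do
    already = min(s.pending, n)
    counter = n - already
    {from_buffer, rest} = Enum.split(s.buffer, counter)

    %{s |
      demand: s.demand + n - length(from_buffer),
      pending: s.pending - already,
      buffer: rest,
      delivered: s.delivered ++ from_buffer,
      forwarded: s.forwarded + counter - length(from_buffer)}
  end

  # Cancel: :proposed drops the consumer's unmet demand; the other variants
  # carry it into `pending`, like `current + pending` in the original line.
  def cancel(%{variant: :proposed} = s), do: %{s | demand: 0}
  def cancel(s), do: %{s | demand: 0, pending: s.demand + s.pending}

  # Events go to the consumer up to its demand; leftovers are buffered.
  # Only :buffer_consumes_pending lets buffered events use up `pending`.
  def dispatch(s, events) do
    {now, later} = Enum.split(events, s.demand)

    used =
      if s.variant == :buffer_consumes_pending,
        do: min(s.pending, length(later)),
        else: 0

    %{s |
      demand: s.demand - length(now),
      pending: s.pending - used,
      buffer: s.buffer ++ later,
      delivered: s.delivered ++ now}
  end
end

# Case 1: subscribe, ask, cancel, resubscribe, ask.
# Case 2: same, but one event is dispatched while no consumer is subscribed.
run = fn variant, case_no ->
  s =
    variant
    |> DispatchModel.new()
    |> DispatchModel.subscribe()
    |> DispatchModel.ask(1)
    |> DispatchModel.cancel()

  s = if case_no == 2, do: DispatchModel.dispatch(s, ["event2"]), else: s
  s = s |> DispatchModel.subscribe() |> DispatchModel.ask(1)
  {s.forwarded, s.delivered, s.buffer}
end

for variant <- [:original, :proposed, :buffer_consumes_pending], case_no <- [1, 2] do
  {forwarded, delivered, buffer} = run.(variant, case_no)

  IO.puts("#{variant}, case #{case_no}: forwarded=#{forwarded} " <>
            "delivered=#{inspect(delivered)} buffer=#{inspect(buffer)}")
end
```

In this model, :original handles Case 1 (forwarded=1) but strands "event2" in the buffer in Case 2. The one-line change from the issue delivers "event2" in Case 2 but forwards the same demand to the producer twice in Case 1 (forwarded=2), which is consistent with needing a different approach. The :buffer_consumes_pending variant handles both cases under the model's simplifications.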
