Skip to content

Instantly share code, notes, and snippets.

@aspett
Last active May 19, 2021 21:31
Show Gist options
  • Save aspett/aea2861356ae3a36886aea097f2ca17f to your computer and use it in GitHub Desktop.
Save aspett/aea2861356ae3a36886aea097f2ca17f to your computer and use it in GitHub Desktop.
defmodule BlockingBuffer do
@moduledoc """
A process which holds a queue and blocks when the length of the queue
exceeds a predefined number (100)
"""
use GenServer
@max_buffer 100 # Tweak as required, or move into opts
# GenServer startup
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(__opts) do
{:ok, {:queue.new(), 0}}
end
# Public API
@spec push(any) :: :ok | no_return
def push(message) do
case GenServer.call(__MODULE__, {:push, message}) do
:wait ->
wait_time = 100 # ms - consider tweaking and adding jitter
:timer.sleep(wait_time) # Calling process (KafkaEx consumer) sleeps
throttled_push(message) # Try again!
:pushed ->
:ok
end
end
@spec drain(pos_integer) :: list(any)
def drain(demand) do
GenServer.call(__MODULE__, {:drain, demand})
end
# Message callbacks
@impl true
def handle_call({:push, _message}, _from, {_queue, count} = state) when count > @max_buffer do
{:reply, :wait, state}
end
def handle_call({:push, message}, _from, {queue, count}) do
{:reply, :pushed, {:queue.in(message, queue), count + 1}}
end
def handle_call({:drain, demand}, _from, {queue, count}) when demand > 0 do
{items, queue} =
Enum.reduce((0..demand), {[], queue}, fn _i, {items, queue} ->
case :queue.out(queue) do
{{:value, value}, queue} ->
{[value | items], queue}
{:empty, queue} ->
{items, queue}
end
end)
{:reply, Enum.reverse(items), {queue, count - length(items)}}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment