Last active
September 25, 2018 09:58
-
-
Save siyomai/7c20416f17d94bd96b31ff96a502075a to your computer and use it in GitHub Desktop.
GenServersProducerConsumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule QueueGenserver do | |
use GenServer | |
@moduledoc """ | |
Documentation for ProducerConsumerGenserver. | |
""" | |
def start_link(producers \\ 120, consumers \\ 120, size \\ 10) do | |
GenServer.start_link(__MODULE__, {[], size}) | |
for id <- 1..producers, do: ProducerGenserver.start_link(id, self()) | |
for id <- 1..consumers, do: ConsumerGenserver.start_link(id, self()) | |
end | |
def produce(pid, job, producer_id, size) do | |
GenServer.cast(pid, {:produce, job, producer_id, size}) | |
end | |
def consume(pid, jobs, consumer_id}) do | |
GenServer.call(pid, {:consume, consumer_id, jobs, size}) | |
end | |
#callbacks | |
def handle_cast({:consume, consumer_id, jobs, size}, _from, state) do | |
consume_job(jobs, consumer_id, size) | |
end | |
def handle_cast({:produce, job, producer_id, size}, state) do | |
process_job({:job, job, producer_id}) | |
end | |
def handle_info(:work, state) do | |
IO.inspect state | |
end | |
defp consume_job(jobs, consumers, size) when jobs == [] do | |
{:nojobs, "there are no jobs available"} | |
end | |
defp consume_job([job | jobs], consumers, size) do | |
process_job(job, jobs, consumers, size) | |
end | |
defp process_job({:job, job, producer_id}, jobs, consumers, size) when length(jobs) >= size and consumers == [] do | |
IO.puts "Q #{stats(jobs, consumers)}: Full. Discarding job #{job} from PRODUCER #{producer_id}" | |
loop(jobs, consumers, size) | |
end | |
defp process_job({:job, job, producer_id} = new_job, jobs, consumers, size) when consumers == [] do | |
jobs = [new_job | jobs] | |
IO.puts "Q #{stats(jobs, consumers)}: Queueing job #{job} from PRODUCER #{producer_id}" | |
loop(jobs, consumers, size) | |
end | |
defp process_job({:job, job, producer_id} = new_job, jobs, consumers, size) do | |
[{:consumer, consumer_id, consumer_pid} | consumers] = consumers | |
IO.puts "Q #{stats(jobs, consumers)}: Sending job #{job} from PRODUCER #{producer_id} to CONSUMER #{consumer_id}" | |
send(consumer_pid, new_job) | |
loop(jobs, consumers, size) | |
end | |
defp loop(jobs, consumers, size) do | |
Process.send_after(self(), :work, 2 * 60 * 60 * 1000) | |
end | |
# def loop({id, q}) do | |
# :random.seed(System.os_time) | |
# job = :random.uniform(10) * 100 | |
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Generating job #{job}" | |
# Process.sleep(job) | |
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Sending job #{job} to the queue" | |
# send(q, {:producer, job, id}) | |
# GenServer.cast(Producer, {:add, unit}) | |
# loop(id, q) | |
# end | |
end | |
defmodule ConsumerGenserver do | |
use GenServer | |
@moduledoc """ | |
Documentation for ProducerConsumerGenserver. | |
""" | |
def start_link(id, q) do | |
GenServer.start_link(__MODULE__, {id, q}) | |
QueueGenserver.consume(q, {:consume, id}) | |
end | |
#callbacks | |
def handle_call(:consume, _from, state) do | |
IO.inspect state | |
end | |
# def loop({id, q}) do | |
# :random.seed(System.os_time) | |
# job = :random.uniform(10) * 100 | |
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Generating job #{job}" | |
# Process.sleep(job) | |
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Sending job #{job} to the queue" | |
# send(q, {:producer, job, id}) | |
# GenServer.cast(Producer, {:add, unit}) | |
# loop(id, q) | |
# end | |
end | |
defmodule ProducerGenserver do | |
use GenServer | |
@moduledoc """ | |
Documentation for ProducerConsumerGenserver. | |
""" | |
def start_link(producer_id, q) do | |
GenServer.start_link(__MODULE__, {producer_id, q}, name: Producer) | |
end | |
# callbacks | |
def handle_cast({:produce, job}, state) do | |
IO.inspect state | |
IO.inspect job | |
end | |
# def loop({id, q}) do | |
# :random.seed(System.os_time) | |
# job = :random.uniform(10) * 100 | |
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Generating job #{job}" | |
# Process.sleep(job) | |
# IO.puts "\t\t\t\t\tPRODUCER #{id}: Sending job #{job} to the queue" | |
# send(q, {:producer, job, id}) | |
# GenServer.cast(Producer, {:add, unit}) | |
# loop(id, q) | |
# end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment