Created
March 4, 2025 02:42
-
-
Save hunterboerner/867d62bd6aaa5ef3dde759338eb73470 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule TheronsErp.Scheduler do | |
@moduledoc """ | |
The Scheduler module is responsible for scheduling POs, MOs, | |
and movements based on route rules. | |
# How it works | |
The scheduler takes all of the BoMs and ensures there are no cycles. Then the | |
scheduler creates processes for each location. | |
""" | |
require Ash.Query | |
alias TheronsErp.Scheduler.SchedulerAgent | |
@doc """
Runs a single scheduling pass over every inventory location.

The pass performs these steps, in order:

  1. Reads all locations and all movements in the `:ready` state.
  2. Starts one `SchedulerAgent` per location.
  3. Loads products (with their routes) and hands them to `init_products/3`.
  4. Gives every ready movement to the agent of its `to_location_id`.
  5. Reads purchase orders in the `:created` state and sales orders in the
     `:ready`/`:invoiced` states, then calls `pull_up_pos/3` once per sales
     line, with sales orders sorted by `process_date`.
  6. Feeds the resulting purchase orders and the sales lines to the agents and
     asks each agent to process, generate accounts and persist.
  7. Applies every `{:update, changeset}` / `{:insert, changeset}` returned by
     the agents inside one `TheronsErp.Repo` transaction.
  8. Stops every agent.

Returns the list of `GenServer.stop/1` results, one per location. Raises if
any of the bang (`!`) reads or writes fails.
"""
def schedule() do
  # TODO ensure no cycles. For now just set a timeout.
  # NOTE(review): the pass as a whole is not wrapped in a transaction (an
  # earlier `TheronsErp.Repo.transaction(fn ->` was commented out here), while
  # `pull_up_pos/3` writes to the data layer directly.
  locations = Ash.read!(TheronsErp.Inventory.Location)

  movements =
    TheronsErp.Inventory.Movement
    |> Ash.Query.filter(state == :ready)
    |> Ash.read!()

  location_map =
    Map.new(locations, fn location ->
      {:ok, pid} = SchedulerAgent.start_link(location_id: location.id)

      # NOTE(review): `Ecto.Adapters.SQL.Sandbox` is a test-time facility;
      # confirm this call is safe when the repo is not using the sandbox pool.
      Ecto.Adapters.SQL.Sandbox.allow(TheronsErp.Repo, self(), pid)
      {location.id, pid}
    end)

  agents = Map.values(location_map)

  # Link agents by routes. Products have one route whereas routes have many products.
  products =
    TheronsErp.Inventory.Product
    |> Ash.Query.for_read(:list)
    |> Ash.read!(load: [:routes])

  # Initialize each location with its actual product inventory
  init_products(location_map, locations, products)

  Enum.each(movements, fn movement ->
    SchedulerAgent.add_movement(location_map[movement.to_location_id], movement)
  end)

  purchase_orders =
    TheronsErp.Purchasing.PurchaseOrder
    |> Ash.Query.filter(state == :created)
    |> Ash.read!(load: [items: [:replenishment]])

  sales_orders =
    TheronsErp.Sales.SalesOrder
    |> Ash.Query.filter(state in [:ready, :invoiced])
    |> Ash.read!(load: [sales_lines: [:pull_location, product: [:routes, :replenishment]]])

  # Pair each sales line with its order's process date instead of adding an
  # ad-hoc `:date` key to the sales line struct.
  purchase_orders =
    sales_orders
    |> Enum.sort_by(& &1.process_date, Date)
    |> Enum.flat_map(fn %{sales_lines: lines, process_date: date} ->
      Enum.map(lines, &{&1, date})
    end)
    |> Enum.reduce(purchase_orders, fn {sales_line, date}, pos ->
      pull_up_pos(pos, sales_line.product_id,
        by_date: date,
        quantity: Decimal.new(sales_line.quantity)
      )
    end)

  Enum.each(purchase_orders, fn purchase_order ->
    SchedulerAgent.add_purchase_order(
      location_map[purchase_order.destination_location_id],
      purchase_order
    )
  end)

  Enum.each(agents, &SchedulerAgent.process/1)

  for sales_order <- sales_orders, sales_line <- sales_order.sales_lines do
    SchedulerAgent.add_sales_line(location_map[sales_line.pull_location_id], sales_line)
  end

  Enum.each(agents, &SchedulerAgent.generate_accounts/1)

  change_list =
    agents
    |> Enum.map(&SchedulerAgent.persist/1)
    |> List.flatten()

  # All agent-produced changes are applied atomically; the match crashes the
  # pass if the transaction does not return `{:ok, _}`.
  {:ok, _applied} =
    TheronsErp.Repo.transaction(fn ->
      for change <- change_list do
        case change do
          {:update, changeset} ->
            Ash.update!(changeset)

          {:insert, changeset} ->
            Ash.create!(changeset)
        end
      end
    end)

  # Shut down agents
  for agent <- agents do
    GenServer.stop(agent)
  end
end
# Seeds the location agents for every product.
#
# For each product/location pair this looks up the ledger account behind the
# `:actual` inventory identifier and passes `{product, balance_as_of}` to the
# location's agent. It then walks `product.routes` and, for each entry in a
# route's `routes`, registers the peer agent: on the `from` agent via
# `add_to_peer/2` when the route type is `:push`, otherwise on the `to` agent
# via `add_peer/2`.
#
# `location_map` maps a location id to its agent pid.
defp init_products(location_map, locations, products) do
  for product <- products do
    for location <- locations do
      account_identifier =
        TheronsErp.Inventory.Movement.get_inv_identifier(
          :actual,
          location.id,
          product.identifier
        )

      # Result is discarded. NOTE(review): presumably called for a side effect
      # such as making sure the account exists — confirm.
      TheronsErp.Inventory.Movement.get_acct_id(account_identifier)

      account =
        Ash.get!(TheronsErp.Ledger.Account, %{identifier: account_identifier},
          load: :balance_as_of
        )

      SchedulerAgent.set_product_inventory(
        location_map[location.id],
        {product, Map.get(account, :balance_as_of)}
      )
    end

    for route <- product.routes do
      # Entries may be nil/false; those are skipped.
      if route do
        for hop <- route.routes do
          to_agent = location_map[hop.to_location_id]
          from_agent = location_map[hop.from_location_id]

          case route.type do
            :push ->
              SchedulerAgent.add_to_peer(from_agent, {hop.to_location_id, product.id, to_agent})

            _other ->
              SchedulerAgent.add_peer(to_agent, {hop.from_location_id, product.id, from_agent})
          end
        end
      end
    end
  end
end
@doc """
Takes a list of Purchase Orders (POs) and a specified `:quantity` and `:by_date` to move the POs up to.

The POs are visited in `order_date` order and each PO's items are re-planned by
`pull_up_poitem/5`. The plan is then written back:

  * existing line items that no longer appear in the plan are destroyed;
  * planned items are grouped by `{date, lead_time, vendor_id, product_id}`;
    a group reuses the first purchase order id found among its items, or gets
    exactly one newly created purchase order;
  * surviving line items are updated with their planned quantity and
    purchase order;
  * planned items without an `existing_line_item` are created once.

## Options

  * `:quantity` - `Decimal` amount of `product_id` that is needed.
  * `:by_date` - `Date` the quantity is needed by.

Returns the purchase orders referenced by the plan, reloaded with
`items: [:replenishment]`. Raises if any data-layer call fails.
"""
def pull_up_pos(poses, product_id, opts \\ []) do
  by_date = Keyword.get(opts, :by_date)

  {_remaining, po_items} =
    poses
    |> Enum.sort_by(& &1.order_date, Date)
    |> Enum.reduce({Keyword.get(opts, :quantity), []}, fn po, {remaining, planned} ->
      # Never ask for a negative amount once the demand has been exceeded.
      qty_wanted =
        if Decimal.compare(Decimal.new(0), remaining) == :lt do
          remaining
        else
          Decimal.new(0)
        end

      items =
        pull_up_poitem(po.items, product_id, po, po.order_date,
          quantity: qty_wanted,
          by_date: by_date
        )

      total = Enum.reduce(items, Decimal.new(0), &Decimal.add(&1.quantity, &2))
      {Decimal.sub(remaining, total), items ++ planned}
    end)

  existing_items = Enum.flat_map(poses, & &1.items)

  destroy_dropped_items(existing_items, po_items)

  # NOTE(review): the previous revision also tried to delete "extra" purchase
  # orders after consolidation, but it subtracted two lists built from the same
  # set of ids, so nothing was ever deleted. That no-op is omitted here; decide
  # whether emptied purchase orders should really be destroyed.
  po_id_by_key = resolve_purchase_orders(po_items)

  update_kept_items(existing_items, po_items, po_id_by_key)
  create_new_items(po_items, po_id_by_key)

  po_ids = Map.values(po_id_by_key)

  TheronsErp.Purchasing.PurchaseOrder
  |> Ash.Query.filter(id in ^po_ids)
  |> Ash.read!(load: [items: [:replenishment]])
end

# Single source of truth for the grouping key, so lookups and inserts can
# never disagree on the order of `vendor_id` and `product_id`.
defp po_group_key(item) do
  {item.date, item.lead_time, item.vendor_id, item.product_id}
end

# Destroys every existing line item that the plan no longer references.
defp destroy_dropped_items(existing_items, po_items) do
  kept_ids =
    po_items
    |> Enum.map(& &1.existing_line_item)
    |> Enum.reject(&is_nil/1)
    |> MapSet.new()

  for item <- existing_items, not MapSet.member?(kept_ids, item.id) do
    Ash.destroy!(item)
  end
end

# Maps each group key to a purchase order id, creating one purchase order per
# group that has no existing purchase order among its items.
defp resolve_purchase_orders(po_items) do
  po_items
  |> Enum.group_by(&po_group_key/1)
  |> Map.new(fn {key = {date, lead_time, vendor_id, _product_id}, group} ->
    po_id =
      case Enum.find_value(group, & &1.po_id) do
        nil ->
          po =
            TheronsErp.Purchasing.PurchaseOrder
            |> Ash.Changeset.for_create(:create, %{
              order_date: date,
              delivery_date: Date.add(date, lead_time),
              destination_location_id: List.last(group).destination_location_id,
              vendor_id: vendor_id
            })
            |> Ash.create!()

          po.id

        existing_id ->
          existing_id
      end

    {key, po_id}
  end)
end

# Writes the planned quantity and purchase order onto every surviving line item.
defp update_kept_items(existing_items, po_items, po_id_by_key) do
  records_by_id = Map.new(existing_items, &{&1.id, &1})

  for planned <- po_items, not is_nil(planned.existing_line_item) do
    records_by_id
    |> Map.fetch!(planned.existing_line_item)
    |> Ash.Changeset.for_update(:update, %{
      quantity: planned.quantity,
      purchase_order_id: Map.fetch!(po_id_by_key, po_group_key(planned))
    })
    |> Ash.update!()
  end
end

# Creates a line item for every planned item that has no existing record.
defp create_new_items(po_items, po_id_by_key) do
  for planned <- po_items, is_nil(planned.existing_line_item) do
    TheronsErp.Purchasing.PurchaseOrderItem
    |> Ash.Changeset.for_create(:create, %{
      quantity: planned.quantity,
      unit_price: planned.unit_price,
      purchase_order_id: Map.fetch!(po_id_by_key, po_group_key(planned)),
      product_id: planned.product_id,
      replenishment_id: planned.replenishment_id
    })
    |> Ash.create!()
  end
end
@doc """
Plans the line items of a single purchase order `po` against a demand for
`product_id`.

Items are visited in order while a running total of matching quantity is
kept. Each item becomes one or two plain maps:

  * other products: carried over unchanged, dated `po.delivery_date`, kept on
    `po`;
  * demand already covered: carried over unchanged, dated `date`;
  * item that crosses the demand: replaced by a pulled-up part (rounded up to
    the quantity multiple, dated `:by_date` minus the lead time but not before
    today) and, if anything is left, a remainder dated `po.order_date`;
  * item entirely needed: kept on `po`, dated the earlier of the pulled-up
    date and `po.delivery_date`.

Every map has the keys `:date`, `:quantity`, `:lead_time`,
`:existing_line_item`, `:unit_price`, `:product_id`, `:replenishment_id`,
`:vendor_id`, `:destination_location_id` and `:po_id`.

## Options

  * `:quantity` - `Decimal` amount still needed (must be given when an item
    matches `product_id`).
  * `:by_date` - `Date` the quantity is needed by.

Returns the planned maps in reverse visiting order; `[]` for no items.
"""
def pull_up_poitem(po_items, product_id, po, date, opts \\ [])

def pull_up_poitem([], _product_id, _po, _date, _opts) do
  []
end

def pull_up_poitem(po_items = [_ | _], product_id, po, date, opts) do
  # A previous `Keyword.validate/2` call here discarded its result and so had
  # no effect; unknown options are still tolerated.
  quantity = Keyword.get(opts, :quantity)
  by_date = Keyword.get(opts, :by_date)

  {_covered, planned} =
    Enum.reduce(po_items, {Decimal.new(0), []}, fn item, {covered, planned} ->
      # Terms are resolved per item: items on one purchase order may belong to
      # different products and replenishments.
      {lead_time, multiple, vendor_id} = replenishment_terms(item)

      base = %{
        quantity: item.quantity,
        lead_time: lead_time,
        unit_price: item.unit_price,
        product_id: item.product_id,
        replenishment_id: item.replenishment_id,
        vendor_id: vendor_id,
        destination_location_id: po.destination_location_id
      }

      cond do
        # Product id doesn't match: keep the item where it is and leave the
        # running total untouched.
        item.product_id != product_id ->
          untouched =
            Map.merge(base, %{
              date: po.delivery_date,
              existing_line_item: item.id,
              po_id: po.id
            })

          {covered, [untouched | planned]}

        # if covered >= quantity do
        Decimal.compare(covered, quantity) in [:gt, :eq] ->
          # NOTE(review): `po_id` is nil here although the item is unchanged,
          # which lets the caller move it to another purchase order — confirm
          # that this is intended.
          unchanged =
            Map.merge(base, %{date: date, existing_line_item: item.id, po_id: nil})

          {covered, [unchanged | planned]}

        # if covered + item.quantity >= quantity do
        Decimal.compare(Decimal.add(covered, item.quantity), quantity) in [:gt, :eq] ->
          needed = Decimal.to_float(quantity) - Decimal.to_float(covered)
          pulled_qty = multiple * ceil(needed / multiple)
          remainder = create_decimal(Decimal.to_float(item.quantity) - pulled_qty)

          pulled =
            Map.merge(base, %{
              date: floor_date(Date.add(by_date, -lead_time)),
              quantity: create_decimal(pulled_qty),
              existing_line_item: nil,
              po_id: nil
            })

          rest =
            Map.merge(base, %{
              date: po.order_date,
              quantity: remainder,
              existing_line_item: nil,
              po_id: nil
            })

          # Rounding up to the multiple can consume the whole item (or more);
          # only emit a remainder line when something is actually left.
          split =
            if Decimal.gt?(remainder, Decimal.new(0)) do
              [pulled, rest]
            else
              [pulled]
            end

          {Decimal.add(covered, item.quantity), split ++ planned}

        true ->
          pulled_date = floor_date(Date.add(by_date, -lead_time))

          order_date =
            if Date.before?(po.delivery_date, pulled_date) do
              po.delivery_date
            else
              pulled_date
            end

          whole =
            Map.merge(base, %{
              date: order_date,
              existing_line_item: item.id,
              po_id: po.id
            })

          {Decimal.add(covered, item.quantity), [whole | planned]}
      end
    end)

  planned
end

# Returns `{lead_time_days, quantity_multiple, vendor_id}` for a line item,
# read from its replenishment when it has one and from the item otherwise.
defp replenishment_terms(item) do
  if item.replenishment_id do
    replenishment = item.replenishment
    {replenishment.lead_time_days, replenishment.quantity_multiple, replenishment.vendor_id}
  else
    {item.lead_time_days, item.quantity_multiple, item.vendor_id}
  end
end
# Clamps `date` so that it is never earlier than `MyDate.today()`: returns
# today when today is strictly after `date`, otherwise `date` itself.
defp floor_date(date) do
  case Date.compare(MyDate.today(), date) do
    :gt -> MyDate.today()
    _not_after -> date
  end
end
# Builds a `Decimal` from `amount`. Floats go through `Decimal.from_float/1`;
# every other value is handed to `Decimal.new/1`.
defp create_decimal(amount) do
  if is_float(amount) do
    Decimal.from_float(amount)
  else
    Decimal.new(amount)
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment