Skip to content

Instantly share code, notes, and snippets.

@Tostino
Last active April 9, 2026 21:26
Show Gist options
  • Select an option

  • Save Tostino/5ccfd6a60bcc38e1caff3949a28aa31a to your computer and use it in GitHub Desktop.

Select an option

Save Tostino/5ccfd6a60bcc38e1caff3949a28aa31a to your computer and use it in GitHub Desktop.
Postgres MV Partial Refresh Demo
-- demo.sql
-- Materialized view, triggers, and test cases.
-- Requires PostgreSQL with WHERE clause support on REFRESH MATERIALIZED VIEW.
-- Run setup.sql first.
----------------------------------------------------------------------
-- TEARDOWN: Clean slate for re-runnability
----------------------------------------------------------------------
-- Teardown runs top to bottom; every DROP is guarded with IF EXISTS so the
-- section is safe on a database where demo.sql has never been run.

-- 1. Materialized view (CASCADE also removes objects that depend on it,
--    such as its unique index).
DROP MATERIALIZED VIEW IF EXISTS invoice_totals CASCADE;

-- 2. Immediate-maintenance triggers, one per event per base table.
DROP TRIGGER IF EXISTS refresh_on_line_insert    ON invoice_lines;
DROP TRIGGER IF EXISTS refresh_on_line_update    ON invoice_lines;
DROP TRIGGER IF EXISTS refresh_on_line_delete    ON invoice_lines;
DROP TRIGGER IF EXISTS refresh_on_invoice_insert ON invoices;
DROP TRIGGER IF EXISTS refresh_on_invoice_update ON invoices;
DROP TRIGGER IF EXISTS refresh_on_invoice_delete ON invoices;

-- 3. Deferred-maintenance triggers and their staging table.
DROP TRIGGER IF EXISTS queue_on_line_change    ON invoice_lines;
DROP TRIGGER IF EXISTS queue_on_invoice_change ON invoices;
DROP TABLE IF EXISTS invoice_refresh_queue CASCADE;

-- 4. Trigger and drain functions.
DROP FUNCTION IF EXISTS refresh_invoice_totals() CASCADE;
DROP FUNCTION IF EXISTS queue_invoice_refresh() CASCADE;
DROP FUNCTION IF EXISTS drain_invoice_refresh_queue() CASCADE;

-- 5. Remove the line items that the test cases below insert, identified by
--    their descriptions, so repeated demo runs start from the same data.
DELETE FROM invoice_lines AS ln
WHERE ln.description IN (
    'Extra widget',
    'Handling charge',
    'Bonus item',
    'State Tax',
    'Queued item'
);

-- 6. Re-balance the headers of the invoices the tests touch: header_total is
--    recomputed from the remaining lines (0 when an invoice has no lines).
UPDATE invoices AS inv
SET header_total = COALESCE(
        (
            SELECT SUM(ln.quantity * ln.unit_price)
            FROM invoice_lines AS ln
            WHERE ln.invoice_id = inv.invoice_id
        ),
        0
    )::numeric(12,2)
WHERE inv.invoice_id IN (1, 2, 3, 4, 10);
----------------------------------------------------------------------
-- Materialized view: per-invoice totals with balance check
----------------------------------------------------------------------
-- One row per invoice (invoices LEFT JOIN invoice_lines, so invoices without
-- lines still appear with zero totals).
--
-- Columns:
--   invoice_id, vendor_id, invoice_date, header_total  -- copied from invoices
--   sku_total, lumpsum_total, tax_total,
--   freight_total, discount_total   -- SUM(quantity * unit_price) per line_type
--   line_total                      -- SUM over all lines, rounded to numeric(12,2)
--   is_balanced                     -- header_total = line_total
--
-- There is one per-type column for every value allowed by the line_type CHECK
-- constraint on invoice_lines, so the per-type columns always add up to
-- line_total (before rounding).
CREATE MATERIALIZED VIEW invoice_totals AS
SELECT
    i.invoice_id,
    i.vendor_id,
    i.invoice_date,
    i.header_total,
    COALESCE(SUM(l.quantity * l.unit_price)
        FILTER (WHERE l.line_type = 'sku'), 0)      AS sku_total,
    COALESCE(SUM(l.quantity * l.unit_price)
        FILTER (WHERE l.line_type = 'lumpsum'), 0)  AS lumpsum_total,
    COALESCE(SUM(l.quantity * l.unit_price)
        FILTER (WHERE l.line_type = 'tax'), 0)      AS tax_total,
    COALESCE(SUM(l.quantity * l.unit_price)
        FILTER (WHERE l.line_type = 'freight'), 0)  AS freight_total,
    COALESCE(SUM(l.quantity * l.unit_price)
        FILTER (WHERE l.line_type = 'discount'), 0) AS discount_total,
    COALESCE(SUM(l.quantity * l.unit_price), 0)::numeric(12,2)
        AS line_total,
    -- Compare against the rounded sum, i.e. the same expression as line_total.
    i.header_total = COALESCE(SUM(l.quantity * l.unit_price), 0)::numeric(12,2)
        AS is_balanced
FROM invoices AS i
LEFT JOIN invoice_lines AS l USING (invoice_id)
-- Grouping by the primary key alone is enough for the other invoices columns.
GROUP BY i.invoice_id;

-- Unique key on the view, one entry per invoice.
CREATE UNIQUE INDEX IF NOT EXISTS invoice_totals_idx ON invoice_totals (invoice_id);
----------------------------------------------------------------------
-- Immediate View Maintenance: Synchronous statement-level triggers
----------------------------------------------------------------------
-- Statement-level trigger function used for both invoices and invoice_lines.
--
-- Reads the transition tables named new_rows / old_rows (whichever the firing
-- trigger declares), collects the distinct invoice_id values the statement
-- touched, and refreshes only those rows of invoice_totals.
-- Always returns NULL.
CREATE OR REPLACE FUNCTION refresh_invoice_totals()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
    touched_ids int[];
BEGIN
    CASE TG_OP
        WHEN 'INSERT' THEN
            SELECT array_agg(DISTINCT n.invoice_id)
            INTO touched_ids
            FROM new_rows AS n;

        WHEN 'UPDATE' THEN
            -- Both the pre- and post-update ids matter: a row may have been
            -- moved from one invoice to another.
            SELECT array_agg(DISTINCT both_sides.invoice_id)
            INTO touched_ids
            FROM (
                SELECT o.invoice_id FROM old_rows AS o
                UNION ALL
                SELECT n.invoice_id FROM new_rows AS n
            ) AS both_sides;

        WHEN 'DELETE' THEN
            SELECT array_agg(DISTINCT o.invoice_id)
            INTO touched_ids
            FROM old_rows AS o;

        ELSE
            -- Any other operation leaves touched_ids NULL.
            NULL;
    END CASE;

    -- array_agg over zero rows yields NULL: the statement touched nothing,
    -- so there is nothing to refresh.
    IF touched_ids IS NULL THEN
        RETURN NULL;
    END IF;

    EXECUTE E'REFRESH MATERIALIZED VIEW invoice_totals\nWHERE invoice_id = ANY($1)'
        USING touched_ids;

    RETURN NULL;
END;
$$;
-- All six triggers are AFTER ... FOR EACH STATEMENT and call
-- refresh_invoice_totals(). Each one exposes only the transition tables its
-- event provides, under the names the function expects (new_rows / old_rows).

-- invoice_lines triggers
CREATE OR REPLACE TRIGGER refresh_on_line_insert
    AFTER INSERT ON invoice_lines
    REFERENCING NEW TABLE AS new_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_invoice_totals();

CREATE OR REPLACE TRIGGER refresh_on_line_update
    AFTER UPDATE ON invoice_lines
    REFERENCING OLD TABLE AS old_rows NEW TABLE AS new_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_invoice_totals();

CREATE OR REPLACE TRIGGER refresh_on_line_delete
    AFTER DELETE ON invoice_lines
    REFERENCING OLD TABLE AS old_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_invoice_totals();

-- invoices triggers
CREATE OR REPLACE TRIGGER refresh_on_invoice_insert
    AFTER INSERT ON invoices
    REFERENCING NEW TABLE AS new_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_invoice_totals();

CREATE OR REPLACE TRIGGER refresh_on_invoice_update
    AFTER UPDATE ON invoices
    REFERENCING OLD TABLE AS old_rows NEW TABLE AS new_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_invoice_totals();

CREATE OR REPLACE TRIGGER refresh_on_invoice_delete
    AFTER DELETE ON invoices
    REFERENCING OLD TABLE AS old_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_invoice_totals();
----------------------------------------------------------------------
-- Immediate View Maintenance: Test cases
----------------------------------------------------------------------
-- Test 1: baseline, no invoice should be flagged as unbalanced.
\echo '\n=== Immediate Test 1: All invoices start balanced ==='
\echo 'Verify: The unbalanced_count below must be exactly 0.'
SELECT count(*) AS unbalanced_count
FROM invoice_totals AS t
WHERE NOT t.is_balanced;

-- Test 2: a single-row INSERT on invoice_lines fires the insert trigger.
\echo '\n=== Immediate Test 2: INSERT makes invoice 1 unbalanced ==='
\echo 'Action: Inserting 1 new line item (75.00 total) to invoice 1.'
INSERT INTO invoice_lines (invoice_id, line_type, description, quantity, unit_price)
VALUES
    (1, 'sku', 'Extra widget', 3, 25.00);
\echo 'Verify: is_balanced should be "f" because header_total is unchanged, but line_total grew.'
SELECT
    t.invoice_id,
    t.header_total,
    t.line_total,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id = 1;

-- Test 3: an UPDATE on invoices fires the header-side trigger.
\echo '\n=== Immediate Test 3: Fix header (trigger on invoices fires automatically) ==='
\echo 'Action: Updating invoice 1 header_total to match the new actual line sum.'
UPDATE invoices AS inv
SET header_total = (
        SELECT SUM(ln.quantity * ln.unit_price)::numeric(12,2)
        FROM invoice_lines AS ln
        WHERE ln.invoice_id = 1
    )
WHERE inv.invoice_id = 1;
\echo 'Verify: is_balanced should now be "t" (true) again.'
SELECT
    t.invoice_id,
    t.header_total,
    t.line_total,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id = 1;

-- Test 4: a DELETE on invoice_lines fires the delete trigger.
\echo '\n=== Immediate Test 4: DELETE fires the trigger ==='
\echo 'Action: Deleting the extra line item we just added to invoice 1.'
DELETE FROM invoice_lines AS ln
WHERE ln.invoice_id = 1
  AND ln.description = 'Extra widget';
\echo 'Verify: is_balanced should be "f" (false) because the header still includes the 75.00 we just deleted.'
SELECT
    t.invoice_id,
    t.header_total,
    t.line_total,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id = 1;

-- Test 5: one multi-row INSERT spanning three invoices is one statement,
-- so the statement-level trigger runs once for all of them.
\echo '\n=== Immediate Test 5: Bulk insert across multiple invoices ==='
\echo 'Action: Inserting 3 different line items into invoices 2, 3, and 4.'
INSERT INTO invoice_lines (invoice_id, line_type, description, quantity, unit_price)
VALUES
    (2, 'lumpsum', 'Handling charge', 1, 150.00),
    (3, 'sku',     'Bonus item',      2, 30.00),
    (4, 'tax',     'State Tax',       1, 25.00);
\echo 'Verify: All three invoices should have is_balanced = "f" (updated in a single trigger firing).'
SELECT
    t.invoice_id,
    t.header_total,
    t.line_total,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id IN (2, 3, 4)
ORDER BY t.invoice_id;

-- Test 6: an invoice none of the statements above referenced.
\echo '\n=== Immediate Test 6: Verify untouched invoices were not refreshed ==='
\echo 'Action: Checking invoice 500, which we never touched.'
\echo 'Verify: is_balanced should still be "t" (true).'
SELECT
    t.invoice_id,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id = 500;
-- Test 7: the per-type totals must add up to line_total.
-- invoice_lines.line_type also allows 'freight' and 'discount', so summing
-- only sku/lumpsum/tax falls short of line_total whenever invoice 5 has such
-- lines. The remaining types are summed straight from invoice_lines
-- (other_total) so the identity holds for every invoice.
\echo '\n=== Immediate Test 7: Per-type totals sum to line_total ==='
\echo 'Action: Calculating sum of individual type columns (plus other line types summed from invoice_lines) vs the line_total column.'
\echo 'Verify: sum_of_types must exactly match line_total.'
SELECT
    t.invoice_id,
    t.sku_total,
    t.lumpsum_total,
    t.tax_total,
    o.other_total,
    (t.sku_total + t.lumpsum_total + t.tax_total + o.other_total)::numeric(12,2)
        AS sum_of_types,
    t.line_total
FROM invoice_totals AS t
CROSS JOIN LATERAL (
    -- Every line whose type is not already covered by the three columns above.
    -- line_type is NOT NULL, so NOT IN is safe here.
    SELECT COALESCE(SUM(l.quantity * l.unit_price), 0) AS other_total
    FROM invoice_lines AS l
    WHERE l.invoice_id = t.invoice_id
      AND l.line_type NOT IN ('sku', 'lumpsum', 'tax')
) AS o
WHERE t.invoice_id = 5;
----------------------------------------------------------------------
-- Deferred View Maintenance: Asynchronous refresh via staging table
-- Drop the immediate triggers first to avoid conflicts.
----------------------------------------------------------------------
-- Switch modes: remove the six immediate-refresh triggers so that only the
-- queueing triggers defined below react to writes.
DROP TRIGGER IF EXISTS refresh_on_line_insert    ON invoice_lines;
DROP TRIGGER IF EXISTS refresh_on_line_update    ON invoice_lines;
DROP TRIGGER IF EXISTS refresh_on_line_delete    ON invoice_lines;
DROP TRIGGER IF EXISTS refresh_on_invoice_insert ON invoices;
DROP TRIGGER IF EXISTS refresh_on_invoice_update ON invoices;
DROP TRIGGER IF EXISTS refresh_on_invoice_delete ON invoices;

-- Staging table: one row per invoice awaiting a refresh. The primary key is
-- what lets the queueing trigger collapse repeated changes to one entry.
CREATE UNLOGGED TABLE IF NOT EXISTS invoice_refresh_queue (
    invoice_id integer,
    PRIMARY KEY (invoice_id)
);
-- Single row-level trigger function handling all three operations via TG_OP.
--
-- Records in invoice_refresh_queue every invoice whose materialized row may
-- now be out of date:
--   INSERT -> NEW.invoice_id
--   DELETE -> OLD.invoice_id
--   UPDATE -> both OLD.invoice_id and NEW.invoice_id, because an UPDATE can
--             move a line to a different invoice and the invoice it left
--             needs a refresh too (this matches refresh_invoice_totals(),
--             which combines old_rows and new_rows on UPDATE).
-- Duplicates are absorbed by the queue's primary key (ON CONFLICT DO NOTHING).
-- Always returns NULL.
CREATE OR REPLACE FUNCTION queue_invoice_refresh()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
    -- Pre-change side: only UPDATE and DELETE have an OLD row.
    IF TG_OP IN ('UPDATE', 'DELETE') THEN
        INSERT INTO invoice_refresh_queue (invoice_id)
        VALUES (OLD.invoice_id)
        ON CONFLICT DO NOTHING;
    END IF;

    -- Post-change side: only INSERT and UPDATE have a NEW row. When an UPDATE
    -- leaves invoice_id unchanged this insert conflicts with the one above
    -- and is silently skipped.
    IF TG_OP IN ('INSERT', 'UPDATE') THEN
        INSERT INTO invoice_refresh_queue (invoice_id)
        VALUES (NEW.invoice_id)
        ON CONFLICT DO NOTHING;
    END IF;

    RETURN NULL;
END;
$$;
-- Row-level queueing triggers: one per base table, each covering all three
-- write operations and delegating to queue_invoice_refresh().
CREATE OR REPLACE TRIGGER queue_on_line_change
    AFTER INSERT OR UPDATE OR DELETE
    ON invoice_lines
    FOR EACH ROW
    EXECUTE FUNCTION queue_invoice_refresh();

CREATE OR REPLACE TRIGGER queue_on_invoice_change
    AFTER INSERT OR UPDATE OR DELETE
    ON invoices
    FOR EACH ROW
    EXECUTE FUNCTION queue_invoice_refresh();
-- Drain function: atomic dequeue + partial refresh.
--
-- Empties invoice_refresh_queue, capturing the removed ids in a single
-- DELETE ... RETURNING statement, then refreshes only those invoices in
-- invoice_totals. Does nothing when the queue is empty. Returns void.
CREATE OR REPLACE FUNCTION drain_invoice_refresh_queue()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    pending_ids int[];
BEGIN
    -- Dequeue and collect in one statement so the ids that get refreshed are
    -- exactly the rows that were removed.
    WITH dequeued AS (
        DELETE FROM invoice_refresh_queue AS q
        RETURNING q.invoice_id
    )
    SELECT array_agg(d.invoice_id)
    INTO pending_ids
    FROM dequeued AS d;

    -- array_agg over zero rows is NULL: the queue was empty.
    IF pending_ids IS NULL THEN
        RETURN;
    END IF;

    EXECUTE E'REFRESH MATERIALIZED VIEW invoice_totals\nWHERE invoice_id = ANY($1)'
        USING pending_ids;
END;
$$;
----------------------------------------------------------------------
-- Deferred View Maintenance: Test cases
----------------------------------------------------------------------
-- Test 1: snapshot of invoice 10 as currently materialized.
\echo '\n=== Deferred Test 1: Record initial state ==='
\echo 'Action: Looking at invoice 10 before making changes.'
\echo 'Verify: It should be balanced (t) with matching totals.'
SELECT
    t.invoice_id,
    t.header_total,
    t.line_total,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id = 10;

-- Test 2: two writes against two different base tables, same invoice.
\echo '\n=== Deferred Test 2: Perform multiple writes (cross-table) ==='
\echo 'Action: Inserting a +99.00 line item AND updating the invoice header_total to +99.00.'
-- 2a. New line item worth 99.00.
INSERT INTO invoice_lines (invoice_id, line_type, description, quantity, unit_price)
VALUES
    (10, 'sku', 'Queued item', 1, 99.00);
-- 2b. Raise the header by the same amount so the invoice stays balanced.
UPDATE invoices AS inv
SET header_total = inv.header_total + 99.00
WHERE inv.invoice_id = 10;

-- Test 3: both writes queued invoice 10; the queue key keeps a single row.
\echo '\n=== Deferred Test 3: Verify the queue de-duplicates changes ==='
\echo 'Verify: The queue should have EXACTLY 1 row for invoice 10.'
\echo '(Both triggers fired, but ON CONFLICT DO NOTHING collapsed them).'
SELECT q.invoice_id
FROM invoice_refresh_queue AS q
WHERE q.invoice_id = 10;

-- Test 4: nothing has refreshed the view yet.
\echo '\n=== Deferred Test 4: Verify MV remains completely stale before draining ==='
\echo 'Verify: The output below should EXACTLY match Deferred Test 1.'
\echo 'The MV knows nothing about our recent +99.00 changes yet.'
SELECT
    t.invoice_id,
    t.header_total,
    t.line_total,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id = 10;

-- Test 5: drain, then confirm the queue is empty.
\echo '\n=== Deferred Test 5: Drain the queue ==='
\echo 'Action: Running drain_invoice_refresh_queue().'
SELECT drain_invoice_refresh_queue();
\echo 'Verify: The queue should now be empty (0).'
SELECT count(*) AS queue_remaining
FROM invoice_refresh_queue;

-- Test 6: the view now shows both changes.
\echo '\n=== Deferred Test 6: Verify MV reflects all collapsed changes ==='
\echo 'Verify: Both header_total and line_total should be 99.00 higher than in Test 1, and is_balanced = t.'
SELECT
    t.invoice_id,
    t.header_total,
    t.line_total,
    t.is_balanced
FROM invoice_totals AS t
WHERE t.invoice_id = 10;
-- setup.sql
-- Base schema and sample data for testing Incremental View Maintenance (IVM).
-- Both the Immediate and the Deferred maintenance patterns in demo.sql build
-- on the two tables created here.
--
-- Number of invoices to generate (psql variable; change it here):
\set num_invoices 100000

-- Start from scratch. invoice_lines goes first because it references invoices;
-- CASCADE also removes anything built on top of the tables.
DROP TABLE IF EXISTS invoice_lines CASCADE;
DROP TABLE IF EXISTS invoices CASCADE;

-- Staging table used by the Deferred View Maintenance pattern.
DROP TABLE IF EXISTS invoice_refresh_queue CASCADE;

-- Invoice headers: header_total is the amount stated on the invoice, which
-- the demo compares against the sum of the invoice's lines.
CREATE TABLE invoices (
    invoice_id   int           GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    vendor_id    int           NOT NULL,
    invoice_date date          NOT NULL DEFAULT current_date,
    header_total numeric(12,2) NOT NULL
);

-- Invoice lines: each line contributes quantity * unit_price to its invoice.
CREATE TABLE invoice_lines (
    line_id     int           GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    invoice_id  int           NOT NULL REFERENCES invoices(invoice_id),
    line_type   text          NOT NULL
                              CHECK (line_type IN ('sku', 'lumpsum', 'freight', 'tax', 'discount')),
    description text,
    quantity    numeric(10,2) NOT NULL DEFAULT 1,
    unit_price  numeric(12,2) NOT NULL
);

-- Lookup of lines by invoice (joins, per-invoice aggregation).
CREATE INDEX ON invoice_lines (invoice_id);
-- Generate :num_invoices invoice headers with a random vendor and a random
-- date within roughly the last two years (730 days).
INSERT INTO invoices (vendor_id, invoice_date, header_total)
SELECT
    (random() * 200 + 1)::int               AS vendor_id,
    current_date - (random() * 730)::int    AS invoice_date,
    0                                       AS header_total  -- placeholder, corrected after lines
FROM generate_series(1, :num_invoices) AS seq;
-- Generate 3-25 lines per invoice with varied type distribution
-- ~44% sku, ~22% lumpsum, ~11% each freight/tax/discount
-- (the array below holds 4 sku, 2 lumpsum and 1 each of the other types,
-- and one of its 9 slots is picked uniformly).
INSERT INTO invoice_lines (invoice_id, line_type, description, quantity, unit_price)
SELECT
    i.invoice_id,
    (ARRAY['sku','sku','sku','sku','lumpsum','lumpsum','freight','tax','discount'])[
        floor(random() * 9 + 1)::int
    ],
    'Line ' || g,
    -- ~60% of lines get a fractional quantity between 1 and 51, the rest 1.
    CASE
        WHEN random() < 0.6 THEN (random() * 50 + 1)::numeric(10,2)
        ELSE 1
    END,
    -- ~10% of lines get a negative unit price.
    CASE
        WHEN random() < 0.1 THEN -(random() * 100 + 1)::numeric(12,2)
        ELSE (random() * 500 + 1)::numeric(12,2)
    END
FROM invoices AS i
CROSS JOIN LATERAL generate_series(
    1,
    -- "+ 0 * i.invoice_id" adds nothing to the value. It makes the series
    -- bound depend on the outer row so that random() is re-evaluated for
    -- every invoice. Without a reference to i, the set-returning function is
    -- computed once and the same series (same line count) is reused for all
    -- invoices.
    (random() * 22 + 3)::int + 0 * i.invoice_id
) AS g;
-- Set header_total = sum of lines so all invoices start balanced.
-- Only invoices that have at least one line are updated; the others keep the
-- placeholder value they were inserted with.
WITH line_sums AS (
    SELECT
        ln.invoice_id,
        COALESCE(SUM(ln.quantity * ln.unit_price), 0)::numeric(12,2) AS total
    FROM invoice_lines AS ln
    GROUP BY ln.invoice_id
)
UPDATE invoices AS inv
SET header_total = line_sums.total
FROM line_sums
WHERE line_sums.invoice_id = inv.invoice_id;

\echo 'Setup complete. Base tables are ready for IVM pattern testing.'
\echo ''

-- Summary of the generated data: row counts, then lines per type.
SELECT count(*) AS invoice_count
FROM invoices;

SELECT count(*) AS line_count
FROM invoice_lines;

SELECT
    ln.line_type,
    count(*) AS cnt
FROM invoice_lines AS ln
GROUP BY ln.line_type
ORDER BY cnt DESC;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment