Cumulative totals example combining pre-computed values with real-time values, in the spirit of continuous aggregates.
-- 1. Create the extension (if not already created)
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- 2. Create the base order table; the cumulative totals are filled in by a trigger (step 7), not by the application
DROP TABLE IF EXISTS table_order CASCADE;
CREATE TABLE table_order (
    time            TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    address         TEXT        NOT NULL,
    customer        TEXT        NOT NULL,
    amount_order_A  INTEGER     NOT NULL,
    amount_order_B  INTEGER     NOT NULL,
    total_amount_A  INTEGER     NOT NULL DEFAULT 0,
    total_amount_B  INTEGER     NOT NULL DEFAULT 0,
    total_orders    INTEGER     NOT NULL DEFAULT 0
);

-- 3. Convert to a TimescaleDB hypertable with an optimized chunk size
SELECT create_hypertable('table_order', 'time',
    chunk_time_interval => INTERVAL '1 day',
    if_not_exists       => TRUE
);
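
-- Optional sanity check (a sketch, assuming the TimescaleDB 2.x information views):
-- confirm the hypertable was created and see how many chunks it currently has.
SELECT hypertable_name, num_chunks
FROM timescaledb_information.hypertables
WHERE hypertable_name = 'table_order';
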
-- 4. Create optimized indices for various query patterns
CREATE INDEX IF NOT EXISTS idx_address_time     ON table_order (address, time DESC);
CREATE INDEX IF NOT EXISTS idx_customer_time    ON table_order (customer, time DESC);
CREATE INDEX IF NOT EXISTS idx_time_address     ON table_order (time DESC, address);
CREATE INDEX IF NOT EXISTS idx_address_customer ON table_order (address, customer);
-- 5. Set up table compression with segmentation
--    (timescaledb.compress must be enabled for the compression policy below to work)
ALTER TABLE table_order SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'address,customer',
    timescaledb.compress_orderby   = 'time DESC'
);

-- 6. Add compression policy (after 7 days)
SELECT add_compression_policy('table_order', INTERVAL '7 days');
-- 7. Create trigger function to maintain cumulative values.
--    The totals are derived from the most recent earlier row for the same
--    (address, customer), so rows are expected to arrive in time order.
CREATE OR REPLACE FUNCTION update_cumulative_values()
RETURNS TRIGGER AS $$
DECLARE
    last_total_a INTEGER;
    last_total_b INTEGER;
    last_orders  INTEGER;
BEGIN
    -- Get the last cumulative values for this address and customer.
    -- If no earlier row exists, the variables stay NULL and the COALESCEs
    -- below start the totals from zero.
    SELECT total_amount_A, total_amount_B, total_orders
    INTO last_total_a, last_total_b, last_orders
    FROM table_order
    WHERE address = NEW.address
      AND customer = NEW.customer
      AND time < NEW.time
    ORDER BY time DESC
    LIMIT 1;

    -- Update the new row with cumulative values
    NEW.total_amount_A := COALESCE(last_total_a, 0) + NEW.amount_order_A;
    NEW.total_amount_B := COALESCE(last_total_b, 0) + NEW.amount_order_B;
    NEW.total_orders   := COALESCE(last_orders, 0) + 1;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 8. Create the trigger
DROP TRIGGER IF EXISTS trg_update_cumulative ON table_order;
CREATE TRIGGER trg_update_cumulative
    BEFORE INSERT ON table_order
    FOR EACH ROW
    EXECUTE FUNCTION update_cumulative_values();
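
-- Quick trigger sanity check (illustrative only; 'address_test' / 'customer_test'
-- are made-up values, and the DO block in step 11 truncates the table anyway).
-- The two inserts should produce running totals of 10/5/1 and then 13/9/2.
INSERT INTO table_order (time, address, customer, amount_order_A, amount_order_B)
VALUES (clock_timestamp(), 'address_test', 'customer_test', 10, 5);
INSERT INTO table_order (time, address, customer, amount_order_A, amount_order_B)
VALUES (clock_timestamp(), 'address_test', 'customer_test', 3, 4);
SELECT time, total_amount_A, total_amount_B, total_orders
FROM table_order
WHERE address = 'address_test' AND customer = 'customer_test'
ORDER BY time;
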
-- 9. Add retention policy (90 days)
SELECT add_retention_policy('table_order', INTERVAL '90 days');
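
-- Optional: verify that the compression and retention policies are registered as
-- background jobs (a sketch, assuming the timescaledb_information.jobs view).
SELECT job_id, proc_name, schedule_interval, config
FROM timescaledb_information.jobs
WHERE hypertable_name = 'table_order';
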
-- 10. Function to simulate real-world ingestion patterns
CREATE OR REPLACE FUNCTION simulate_real_workload(
    p_duration_seconds INTEGER DEFAULT 10,
    p_rows_per_second  INTEGER DEFAULT 50,
    p_num_addresses    INTEGER DEFAULT 250000
) RETURNS TABLE (
    duration_seconds    INTEGER,
    total_rows_inserted INTEGER,
    avg_rows_per_second NUMERIC,
    min_latency_ms      NUMERIC,
    max_latency_ms      NUMERIC,
    p95_latency_ms      NUMERIC
) AS $$
DECLARE
    start_time      TIMESTAMPTZ;
    end_time        TIMESTAMPTZ;
    iteration_start TIMESTAMPTZ;
    iteration_end   TIMESTAMPTZ;
    total_rows      INTEGER := 0;
    j               INTEGER;
    latencies       NUMERIC[];
    v_address       TEXT;
    v_customer      TEXT;
    v_amount_a      INTEGER;
    v_amount_b      INTEGER;
    batch_latency   NUMERIC;
BEGIN
    start_time := clock_timestamp();

    -- Simulate workload for p_duration_seconds
    WHILE clock_timestamp() < start_time + make_interval(secs := p_duration_seconds) LOOP
        iteration_start := clock_timestamp();

        -- Insert p_rows_per_second rows
        FOR j IN 1..p_rows_per_second LOOP
            -- Generate realistic data
            v_address  := 'address_'  || (random() * p_num_addresses)::INTEGER;
            v_customer := 'customer_' || (random() * 1000)::INTEGER;
            v_amount_a := (random() * 100)::INTEGER;
            v_amount_b := (random() * 100)::INTEGER;

            -- Insert data
            INSERT INTO table_order (time, address, customer, amount_order_A, amount_order_B)
            VALUES (clock_timestamp(), v_address, v_customer, v_amount_a, v_amount_b);
            total_rows := total_rows + 1;
        END LOOP;

        iteration_end := clock_timestamp();
        batch_latency := extract(epoch from (iteration_end - iteration_start)) * 1000;
        latencies := array_append(latencies, batch_latency);

        -- Sleep if the batch completed in under 1 second
        IF iteration_end < iteration_start + interval '1 second' THEN
            PERFORM pg_sleep(extract(epoch from (iteration_start + interval '1 second' - iteration_end)));
        END IF;
    END LOOP;
    end_time := clock_timestamp();

    -- Return the statistics as a result row
    RETURN QUERY
    SELECT
        p_duration_seconds,
        total_rows,
        (total_rows::NUMERIC / extract(epoch from (end_time - start_time)))::NUMERIC,
        (SELECT min(l) FROM unnest(latencies) l)::NUMERIC,
        (SELECT max(l) FROM unnest(latencies) l)::NUMERIC,
        (SELECT percentile_cont(0.95) WITHIN GROUP (ORDER BY l) FROM unnest(latencies) l)::NUMERIC;

    -- Log performance statistics (RETURN QUERY only appends the row; execution
    -- continues, so these notices are still emitted)
    RAISE NOTICE 'Performance Report:';
    RAISE NOTICE '----------------';
    RAISE NOTICE 'Duration: % seconds', p_duration_seconds;
    RAISE NOTICE 'Total Rows: %', total_rows;
    RAISE NOTICE 'Avg Rows/Second: %', (total_rows::NUMERIC / extract(epoch from (end_time - start_time)))::NUMERIC;
    RAISE NOTICE 'Min Latency: % ms', (SELECT min(l) FROM unnest(latencies) l);
    RAISE NOTICE 'Max Latency: % ms', (SELECT max(l) FROM unnest(latencies) l);
    RAISE NOTICE 'P95 Latency: % ms', (SELECT percentile_cont(0.95) WITHIN GROUP (ORDER BY l) FROM unnest(latencies) l);
END;
$$ LANGUAGE plpgsql;
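
-- The simulator can also be called standalone; the parameters below are
-- illustrative (5 seconds at ~100 rows/second across 10,000 addresses), not part
-- of the POC in step 11, so the call is left commented out.
-- SELECT * FROM simulate_real_workload(5, 100, 10000);
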
-- 11. Run POC demonstration
DO $$
DECLARE
    v_address    TEXT;
    v_customer   TEXT;
    v_cum_a      INTEGER;
    v_cum_b      INTEGER;
    v_cum_orders INTEGER;
BEGIN
    -- Clear existing data
    RAISE NOTICE 'Starting POC demonstration...';
    RAISE NOTICE '----------------------------';
    TRUNCATE table_order;

    -- Test 1: Basic ingestion rate (10 seconds, 50 rows/sec)
    RAISE NOTICE 'Test 1: Basic ingestion rate (10 seconds)';
    PERFORM simulate_real_workload(10, 50, 250000);

    -- Test 2: High-volume test (5 seconds, 200 rows/sec)
    RAISE NOTICE 'Test 2: High-volume test (5 seconds, 200 rows/sec)';
    PERFORM simulate_real_workload(5, 200, 250000);

    -- Test 3: Burst test (2 seconds, 1000 rows/sec)
    RAISE NOTICE 'Test 3: Burst test (2 seconds, 1000 rows/sec)';
    PERFORM simulate_real_workload(2, 1000, 250000);

    -- Final report
    RAISE NOTICE 'POC Summary:';
    RAISE NOTICE '-----------';
    RAISE NOTICE 'Total rows in table: %', (SELECT count(*) FROM table_order);
    RAISE NOTICE 'Unique addresses: %', (SELECT count(DISTINCT address) FROM table_order);

    -- Demonstrate the cumulative values
    RAISE NOTICE 'Cumulative Values Example:';
    RAISE NOTICE '-------------------------';
    FOR v_address, v_customer, v_cum_a, v_cum_b, v_cum_orders IN
        SELECT address, customer, total_amount_A, total_amount_B, total_orders
        FROM table_order
        WHERE total_amount_A > 0
        ORDER BY time DESC
        LIMIT 5
    LOOP
        RAISE NOTICE 'Address: %, Customer: %, Cumulative A: %, Cumulative B: %, Total Orders: %',
            v_address, v_customer, v_cum_a, v_cum_b, v_cum_orders;
    END LOOP;
END $$;
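
-- 12. (Sketch) Continuous aggregate combining pre-computed buckets with real-time
--     values, as the description suggests. The view name 'order_totals_hourly' and
--     the 1-hour bucket are illustrative choices, not part of the original script.
CREATE MATERIALIZED VIEW order_totals_hourly
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    address,
    customer,
    sum(amount_order_A) AS sum_amount_a,
    sum(amount_order_B) AS sum_amount_b,
    count(*)            AS orders
FROM table_order
GROUP BY bucket, address, customer
WITH NO DATA;

-- Keep buckets between 1 day and 1 hour old materialized, refreshing every hour;
-- with materialized_only = false, newer rows are aggregated at query time.
SELECT add_continuous_aggregate_policy('order_totals_hourly',
    start_offset      => INTERVAL '1 day',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');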