Last active
February 1, 2026 20:59
-
-
Save hernandohhoyos/7a6e75f98e52cc5a9c9562b102152dfe to your computer and use it in GitHub Desktop.
SQL: Audit Log Using BlockChain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Audit logging implementing blockchain-inspired principles. ( Postgres 18 ). | |
| * | |
| * This system ensures data integrity and traceability by chaining transaction logs | |
| * through cryptographic hashing. | |
| * | |
| * Advantages: | |
| * - High Integrity: Each log entry is linked to the previous one, making tampering detectable. | |
| * - Traceability: Complete history of changes (snapshots/deltas) for auditing and recovery. | |
| * - Non-repudiation: Provides a verifiable record of operations performed on the database. | |
| * | |
| * Disadvantages: | |
| * - Performance Overhead: Additional processing required for hashing and snapshot generation on every write. | |
| * - Storage Growth: Audit logs and snapshots can consume significant disk space over time. | |
| * - Complexity: Increased system complexity for maintenance and data verification. | |
| */ | |
-- pgcrypto provides digest(), used by the audit functions to compute SHA-256.
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- 1. Table: integrations
-- One row per integration and its API key; this is the table the audit
-- trigger is attached to.
CREATE TABLE IF NOT EXISTS integrations (
    id              UUID PRIMARY KEY DEFAULT uuidv7(),
    api_key         VARCHAR(50) NOT NULL,
    is_active       BOOLEAN DEFAULT TRUE,       -- For logical deletion
    created_at      TIMESTAMPTZ DEFAULT now(),
    -- NOTE(review): defaults to the creation instant, i.e. "already expired"
    -- unless the writer supplies a value -- confirm this is intended.
    expires_at      TIMESTAMPTZ DEFAULT now(),
    owner_full_name VARCHAR(50),
    owner_email     VARCHAR(50)
);

-- Index for API key lookups (often used).
-- IF NOT EXISTS keeps the whole script re-runnable, like the CREATE TABLE
-- statements; a plain CREATE INDEX aborts on the second run.
CREATE INDEX IF NOT EXISTS idx_integrations_api_key
    ON integrations (api_key);
CREATE INDEX IF NOT EXISTS idx_integrations_is_active
    ON integrations (is_active);
CREATE INDEX IF NOT EXISTS idx_integrations_owner_email
    ON integrations (owner_email);
-- 2. Table: transactions (Blockchain - Audit Log).
-- Append-only log: one row per audited INSERT/UPDATE/DELETE. Each row's
-- snapshot_hash is computed from its snapshot plus the previous hash of the
-- same record_id, which chains the entries together.
CREATE TABLE IF NOT EXISTS transactions (
    id            UUID DEFAULT uuidv7(),
    db_user       TEXT DEFAULT current_user,
    created_at    TIMESTAMPTZ DEFAULT now(),
    -- PostgreSQL identifiers are at most 63 bytes, so 63 characters fits any
    -- table name the generic trigger can be attached to.
    table_name    VARCHAR(63) NOT NULL,
    operation     VARCHAR(10) NOT NULL,
    record_id     UUID NOT NULL,
    snapshot_hash TEXT,   -- SHA256 of the row data (Integrity) in hex format.
    snapshot      JSONB,  -- Full or delta record snapshot at time of transaction (Recovery/Audit).
    -- The partition key must be part of the primary key on a partitioned table.
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partition for February 2026 (upper bound is exclusive).
CREATE TABLE IF NOT EXISTS transactions_2026_2
    PARTITION OF transactions
    FOR VALUES FROM (make_date(2026, 2, 1)) TO (make_date(2026, 3, 1));

-- Catch-all partition. Without it, an audit row whose created_at falls outside
-- every declared range is rejected ("no partition of relation found for row"),
-- which in turn fails the audited write itself. When adding a new monthly
-- partition later, move any matching rows out of this one first.
CREATE TABLE IF NOT EXISTS transactions_default
    PARTITION OF transactions DEFAULT;
-- 3. Table: last_transaction
-- Head-of-chain lookup: stores, per audited record, the hash of its most
-- recent audit entry so the next entry can be chained to it without scanning
-- the transactions table.
CREATE TABLE IF NOT EXISTS last_transaction (
    record_id        UUID,
    transaction_hash TEXT NOT NULL,
    PRIMARY KEY (record_id)  -- implies NOT NULL on record_id
);
-- 4. Generic Audit Trigger Function.

-- jsonb_diff(l, r)
--   Returns an object holding the top-level keys of `l` whose value differs
--   from the value under the same key in `r`, or that are absent from `r`.
--   The values in the result are taken from `l`. Keys present only in `r`
--   are not reported.
--
--   Returns '{}' (never NULL) when no key differs: jsonb_object_agg yields
--   NULL over zero rows, and a NULL delta would propagate into a NULL hash
--   in the audit trigger.
CREATE OR REPLACE FUNCTION jsonb_diff(l JSONB, r JSONB)
RETURNS JSONB AS $$
BEGIN
    RETURN COALESCE(
        (
            SELECT jsonb_object_agg(a.key, a.value)
            FROM jsonb_each(l) AS a
            LEFT JOIN jsonb_each(r) AS b
                ON a.key = b.key
            WHERE a.value IS DISTINCT FROM b.value
               OR b.key IS NULL
        ),
        '{}'::JSONB
    );
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- log_transaction()
--   Row-level trigger function that appends one entry to the audit chain for
--   every INSERT, UPDATE or DELETE on the table it is attached to.
--
--   Requirements on the audited table: it must expose a UUID column named
--   `id`, which is used as the chain key (record_id).
--
--   Snapshot stored per operation:
--     INSERT -> the full new row
--     UPDATE -> only the columns whose value changed (new values)
--     DELETE -> the full old row
--
--   Hash: sha256(snapshot::text || previous hash of the same record), hex
--   encoded. The first entry of a record is chained to a fixed all-zero seed.
--
--   Returns OLD for DELETE and NEW otherwise.
CREATE OR REPLACE FUNCTION log_transaction()
RETURNS TRIGGER AS $$
DECLARE
    rec_id       UUID;
    hashed_data  TEXT;
    prev_hash    TEXT;
    rec_snapshot JSONB;
BEGIN
    -- Gets Snapshot.
    IF (TG_OP = 'UPDATE') THEN
        rec_id := NEW.id;
        rec_snapshot := jsonb_diff(to_jsonb(NEW), to_jsonb(OLD));
    ELSIF (TG_OP = 'DELETE') THEN
        rec_id := OLD.id;
        rec_snapshot := to_jsonb(OLD);
    ELSE
        rec_id := NEW.id;
        rec_snapshot := to_jsonb(NEW);
    END IF;

    -- An UPDATE that changes no column can produce a NULL delta. NULL would
    -- make the concatenation below NULL, hence a NULL hash, which violates
    -- last_transaction.transaction_hash NOT NULL and aborts the audited
    -- statement. Record such updates as an empty delta instead.
    rec_snapshot := COALESCE(rec_snapshot, '{}'::JSONB);

    -- Gets previous hash; falls back to the genesis seed for a record's
    -- first entry.
    SELECT transaction_hash
    INTO prev_hash
    FROM last_transaction
    WHERE record_id = rec_id;

    IF prev_hash IS NULL THEN
        prev_hash := '00000000000000000000000000000000';
    END IF;

    -- Calculate Hash.
    hashed_data := encode(digest(rec_snapshot::TEXT || prev_hash, 'sha256'), 'hex');

    -- Insert blockchain.
    INSERT INTO transactions (table_name, operation, record_id, snapshot_hash, snapshot)
    VALUES (TG_TABLE_NAME, TG_OP, rec_id, hashed_data, rec_snapshot);

    -- Move the head-of-chain pointer for this record to the new entry.
    INSERT INTO last_transaction (record_id, transaction_hash)
    VALUES (rec_id, hashed_data)
    ON CONFLICT (record_id)
    DO UPDATE SET
        transaction_hash = EXCLUDED.transaction_hash;

    -- IMPORTANT: NEW is not populated for DELETE, so return OLD there.
    -- (Matters if this function is ever attached as a BEFORE trigger, where a
    -- NULL return value would cancel the operation.)
    IF (TG_OP = 'DELETE') THEN
        RETURN OLD;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 5. Apply Audit Trigger to integrations.
-- Fires once per affected row, after the change has been applied, and hands
-- the row to log_transaction() so it is appended to the audit chain.
CREATE OR REPLACE TRIGGER audit_integrations_changes
    AFTER INSERT OR UPDATE OR DELETE
    ON integrations
    FOR EACH ROW
    EXECUTE FUNCTION log_transaction();
-- 6. Audit the transactions integrity.

-- verify_blockchain_integrity(record_id_)
--   Walks the audit chain of one record in chronological order and
--   recomputes every hash from the stored snapshot and the preceding entry's
--   stored hash.
--
--   Parameters:
--     record_id_  id of the audited record whose chain is checked.
--
--   Returns one row per verified entry:
--     transaction_id  id of the audit entry
--     status          'OK', or 'INVALID_HASH_DETECTION in id <id>' on mismatch
--     expected_hash   hash recomputed from the stored data
--     stored_hash     hash found in the entry
--   If the record has no audit entries, a single row is returned whose status
--   reports that (the other columns are NULL).
--
--   The oldest entry is used only as the seed for the walk and is not itself
--   verified, so it does not appear in the result.
CREATE OR REPLACE FUNCTION verify_blockchain_integrity(record_id_ UUID)
RETURNS TABLE(transaction_id UUID, status TEXT, expected_hash TEXT, stored_hash TEXT) AS $$
DECLARE
    curr_record     RECORD;
    prev_hash       TEXT;
    calculated_hash TEXT;
BEGIN
    -- Getting the first hash to get the seed.
    -- created_at defaults to now(), which is constant within a transaction,
    -- so several changes to the same record in one transaction share a
    -- timestamp. The time-ordered uuidv7 id breaks those ties and keeps the
    -- walk in insertion order (assumes ids come from the column default).
    SELECT snapshot_hash
    INTO prev_hash
    FROM transactions
    WHERE record_id = record_id_
    ORDER BY created_at ASC, id ASC
    LIMIT 1;

    -- Nothing was ever logged for this record: report it and stop.
    IF NOT FOUND THEN
        status := 'INVALID_HASH_DETECTION: There no transactions.';
        RETURN NEXT;
        RETURN;
    END IF;

    -- Go through the remaining records in chronological order.
    FOR curr_record IN
        SELECT id, snapshot_hash, snapshot
        FROM transactions
        WHERE record_id = record_id_
        ORDER BY created_at ASC, id ASC
        OFFSET 1
    LOOP
        -- Re-calculates the hash exactly as the trigger did at write time.
        calculated_hash := encode(digest(curr_record.snapshot::TEXT || prev_hash, 'sha256'), 'hex');

        transaction_id := curr_record.id;
        stored_hash    := curr_record.snapshot_hash;
        expected_hash  := calculated_hash;

        IF (calculated_hash = curr_record.snapshot_hash) THEN
            status := 'OK';
        ELSE
            -- format() builds the message; a bare "'...%s', value" is not a
            -- single-value expression and raised an error precisely when a
            -- mismatch had to be reported.
            status := format('INVALID_HASH_DETECTION in id %s', curr_record.id);
        END IF;

        -- The current stored hash becomes the 'prev_hash' for the next cycle.
        prev_hash := curr_record.snapshot_hash;

        RETURN NEXT;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment