Skip to content

Instantly share code, notes, and snippets.

@niquola
Last active February 28, 2026 22:32
Show Gist options
  • Select an option

  • Save niquola/022d0346e343d0b388d4a8dc041c7d2c to your computer and use it in GitHub Desktop.

Select an option

Save niquola/022d0346e343d0b388d4a8dc041c7d2c to your computer and use it in GitHub Desktop.
impl1: Bun migration CLI + library, with integration tests
import {
afterAll,
beforeAll,
beforeEach,
describe,
expect,
setDefaultTimeout,
test,
} from "bun:test";
import { SQL } from "bun";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join, resolve } from "node:path";
// Integration-test database; override with TEST_DATABASE_URL when needed.
const DATABASE_URL =
process.env.TEST_DATABASE_URL ??
"postgres://postgres:postgres@localhost:55432/migrator";
// Absolute path to the CLI under test (../src/migrate.ts relative to this test file).
const CLI_PATH = resolve(import.meta.dir, "..", "src", "migrate.ts");
// Must stay in sync with the advisory-lock id hard-coded in src/migrate.ts.
const LOCK_ID = 791730412;
const db = new SQL(DATABASE_URL);
// Temp workspaces created during the run; removed in afterAll.
const tempDirs: string[] = [];
setDefaultTimeout(30_000);
// Outcome of one CLI invocation: exit code plus captured stdout/stderr.
type CliResult = {
code: number;
stdout: string;
stderr: string;
};
/**
 * Poll the database until it answers a trivial query, retrying once per
 * second for up to ~30 seconds before giving up.
 */
async function waitForDb(): Promise<void> {
  let attemptsLeft = 30;
  while (attemptsLeft > 0) {
    try {
      await db`SELECT 1`;
      return;
    } catch {
      attemptsLeft -= 1;
      await Bun.sleep(1000);
    }
  }
  throw new Error(`Postgres is not reachable at ${DATABASE_URL}`);
}
/** Drop and recreate the public schema so every test starts from a clean database. */
async function resetDb(): Promise<void> {
await db.unsafe(`
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;
GRANT ALL ON SCHEMA public TO postgres;
GRANT ALL ON SCHEMA public TO public;
`);
}
/**
 * Make a throwaway project directory containing an empty migrations/ folder.
 * The directory is registered in tempDirs so afterAll can delete it.
 */
async function createWorkspace(): Promise<string> {
  const root = await mkdtemp(join(tmpdir(), "impl1-"));
  tempDirs.push(root);
  const migrationsDir = join(root, "migrations");
  await mkdir(migrationsDir, { recursive: true });
  return root;
}
/**
 * Write the .up.sql / .down.sql pair for one migration version into the
 * workspace's migrations/ directory (up first, then down).
 */
async function writeMigration(
workspace: string,
version: string,
upSql: string,
downSql: string,
): Promise<void> {
  const migrationsDir = join(workspace, "migrations");
  await writeFile(join(migrationsDir, `${version}.up.sql`), upSql, "utf8");
  await writeFile(join(migrationsDir, `${version}.down.sql`), downSql, "utf8");
}
/**
 * Spawn `bun <CLI_PATH> ...args` inside the workspace and capture its result.
 * Exit code and both output streams are awaited concurrently: draining the
 * pipes while waiting avoids blocking the child on a full pipe buffer.
 */
async function runCli(workspace: string, ...args: string[]): Promise<CliResult> {
  const child = Bun.spawn(["bun", CLI_PATH, ...args], {
    cwd: workspace,
    env: { ...process.env, DATABASE_URL },
    stdout: "pipe",
    stderr: "pipe",
  });
  const [exitCode, out, err] = await Promise.all([
    child.exited,
    new Response(child.stdout).text(),
    new Response(child.stderr).text(),
  ]);
  return { code: exitCode, stdout: out, stderr: err };
}
// End-to-end suite: each test builds a throwaway workspace, runs the real CLI
// as a subprocess against Postgres, then inspects the database directly.
describe("migration CLI integration", () => {
beforeAll(async () => {
await waitForDb();
});
// Fresh public schema per test; workspaces are never reused across tests.
beforeEach(async () => {
await resetDb();
});
afterAll(async () => {
await db.close();
await Promise.all(
tempDirs.map((workspace) => rm(workspace, { recursive: true, force: true })),
);
});
// Two migrations with ascending timestamp versions must both apply, in order.
test("applies migrations in lexical order", async () => {
const workspace = await createWorkspace();
const v1 = "20260228120000_create_users";
const v2 = "20260228120100_create_posts";
await writeMigration(
workspace,
v1,
"CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL);",
"DROP TABLE users;",
);
await writeMigration(
workspace,
v2,
"CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER NOT NULL);",
"DROP TABLE posts;",
);
const result = await runCli(workspace, "up");
expect(result.code).toBe(0);
const rows = (await db.unsafe(
"SELECT version FROM schema_migrations ORDER BY version ASC",
)) as Array<{ version: string }>;
expect(rows.map((row) => row.version)).toEqual([v1, v2]);
// to_regclass returns NULL when the relation does not exist.
const [{ users }] = (await db.unsafe(
"SELECT to_regclass('public.users') AS users",
)) as Array<{ users: string | null }>;
const [{ posts }] = (await db.unsafe(
"SELECT to_regclass('public.posts') AS posts",
)) as Array<{ posts: string | null }>;
expect(users).not.toBeNull();
expect(posts).not.toBeNull();
});
// `down 1` must undo only the newest migration and delete its tracking row.
test("rolls back last migration", async () => {
const workspace = await createWorkspace();
const v1 = "20260228121000_create_users";
const v2 = "20260228121100_create_posts";
await writeMigration(
workspace,
v1,
"CREATE TABLE users (id INTEGER PRIMARY KEY);",
"DROP TABLE users;",
);
await writeMigration(
workspace,
v2,
"CREATE TABLE posts (id INTEGER PRIMARY KEY);",
"DROP TABLE posts;",
);
const upResult = await runCli(workspace, "up");
expect(upResult.code).toBe(0);
const downResult = await runCli(workspace, "down", "1");
expect(downResult.code).toBe(0);
const rows = (await db.unsafe(
"SELECT version FROM schema_migrations ORDER BY version ASC",
)) as Array<{ version: string }>;
expect(rows.map((row) => row.version)).toEqual([v1]);
const [{ users }] = (await db.unsafe(
"SELECT to_regclass('public.users') AS users",
)) as Array<{ users: string | null }>;
const [{ posts }] = (await db.unsafe(
"SELECT to_regclass('public.posts') AS posts",
)) as Array<{ posts: string | null }>;
expect(users).not.toBeNull();
expect(posts).toBeNull();
});
// Running `up` twice must not re-apply or duplicate tracking rows.
test("up is idempotent", async () => {
const workspace = await createWorkspace();
const v1 = "20260228122000_create_users";
await writeMigration(
workspace,
v1,
"CREATE TABLE users (id INTEGER PRIMARY KEY);",
"DROP TABLE users;",
);
const first = await runCli(workspace, "up");
const second = await runCli(workspace, "up");
expect(first.code).toBe(0);
expect(second.code).toBe(0);
const [{ count }] = (await db.unsafe(
"SELECT COUNT(*)::int AS count FROM schema_migrations",
)) as Array<{ count: number }>;
expect(count).toBe(1);
});
// Editing an already-applied .up.sql must make `status` fail the checksum check.
test("fails on checksum drift", async () => {
const workspace = await createWorkspace();
const v1 = "20260228123000_create_users";
const upPath = join(workspace, "migrations", `${v1}.up.sql`);
await writeMigration(
workspace,
v1,
"CREATE TABLE users (id INTEGER PRIMARY KEY);",
"DROP TABLE users;",
);
const upResult = await runCli(workspace, "up");
expect(upResult.code).toBe(0);
// Mutate the applied migration file so its checksum no longer matches the DB.
await writeFile(
upPath,
"CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT);",
"utf8",
);
const statusResult = await runCli(workspace, "status");
expect(statusResult.code).toBe(1);
expect(statusResult.stderr).toContain(`Checksum mismatch for ${v1}`);
});
// Deleting an applied migration's .up.sql must make `status` fail integrity checks.
test("fails when applied migration file is missing", async () => {
const workspace = await createWorkspace();
const v1 = "20260228124000_create_users";
const upPath = join(workspace, "migrations", `${v1}.up.sql`);
await writeMigration(
workspace,
v1,
"CREATE TABLE users (id INTEGER PRIMARY KEY);",
"DROP TABLE users;",
);
const upResult = await runCli(workspace, "up");
expect(upResult.code).toBe(0);
await rm(upPath);
const statusResult = await runCli(workspace, "status");
expect(statusResult.code).toBe(1);
expect(statusResult.stderr).toContain(`Applied migration ${v1} missing from disk`);
});
// Holding the advisory lock from this process must make the CLI fail fast.
test("fails fast when lock is already held", async () => {
const workspace = await createWorkspace();
const v1 = "20260228125000_create_users";
await writeMigration(
workspace,
v1,
"CREATE TABLE users (id INTEGER PRIMARY KEY);",
"DROP TABLE users;",
);
await db`SELECT pg_advisory_lock(${LOCK_ID})`;
try {
const upResult = await runCli(workspace, "up");
expect(upResult.code).toBe(1);
expect(upResult.stderr).toContain("Another migration is running");
} finally {
// Always release so later tests (and afterAll) are not blocked.
await db`SELECT pg_advisory_unlock(${LOCK_ID})`;
}
});
});
/**
* Migration System - Single-File Design Contract
*
* Source docs:
* - /Users/niquola/experiment/design.final.md
* - /Users/niquola/experiment/arch.final.md
*
* Goals:
* - One implementation file (`src/migrate.ts`)
* - Zero external deps for migration logic (Bun + bun:sql only)
* - Production-safe defaults with minimal complexity
*
* Migration format:
* - Required pair per version:
* `YYYYMMDDHHMMSS_name.up.sql`
* `YYYYMMDDHHMMSS_name.down.sql`
* - Strict filename pattern; invalid names are rejected.
* - Lexicographic ordering by version.
*
* Core safety model:
* - Tracking table: `schema_migrations(version, checksum, applied_at)`
* - Checksum: SHA-256 of `.up.sql`
* - Integrity checks before `up`, `down`, `status`:
* 1) Every applied DB version must have local `.up.sql`
* 2) Applied checksum must match current `.up.sql` content
* - Locking: fail-fast `pg_try_advisory_lock` with constant lock id
* - Transaction policy: each migration executes inside `db.begin()`
*
* CLI interface:
* - `bun src/migrate.ts create <name>`
* - `bun src/migrate.ts up [count]`
* - `bun src/migrate.ts down [count]` (default: 1)
* - `bun src/migrate.ts status`
*
* Library interface:
* - `create(name, options)`
* - `up({ databaseUrl?, migrationsDir?, logger?, db?, count? })`
* - `down({ databaseUrl?, migrationsDir?, logger?, db?, count? })`
* - `status({ databaseUrl?, migrationsDir?, logger?, db? })`
*
* File acts as both CLI and library:
* - CLI runs only under `import.meta.main`
* - Imports do not trigger process-level side effects
*/
import { SQL } from "bun";
import { lstat, mkdir, writeFile } from "node:fs/promises";
import { join, resolve } from "node:path";
// Advisory-lock id shared by every migrator process targeting one database.
const LOCK_ID = 791730412;
// Strict filename contract: 14-digit timestamp, snake_case name, .up/.down.sql.
const FILE_PATTERN = /^(\d{14}_[a-z0-9_]+)\.(up|down)\.sql$/;
type Direction = "up" | "down";
// One parsed .up.sql or .down.sql file found on disk.
type MigrationFile = {
version: string;
direction: Direction;
path: string;
checksum: string; // SHA-256 hex digest of the file's contents
};
// A version with both halves present (safe to apply or roll back).
type MigrationPair = {
version: string;
up: MigrationFile;
down: MigrationFile;
};
// Intermediate shape during discovery; either half may still be missing.
type IncompleteMigration = {
version: string;
up?: MigrationFile;
down?: MigrationFile;
};
// Row shape of the schema_migrations tracking table.
type Applied = {
version: string;
checksum: string;
applied_at: string;
};
export type StatusResult = {
applied: string[]; // versions recorded in schema_migrations
pending: string[]; // complete pairs on disk not yet applied
};
// Shared runner options: inject `db` (e.g. for tests) or supply/derive a URL.
export type RunnerOptions = {
databaseUrl?: string;
migrationsDir?: string;
logger?: (message: string) => void;
db?: SQL;
};
/** Help text shown when the CLI is invoked with an unknown or malformed command. */
function usage(): string {
  const lines = [
    "Usage:",
    " bun src/migrate.ts create <name>",
    " bun src/migrate.ts up [count]",
    " bun src/migrate.ts down [count]",
    " bun src/migrate.ts status",
  ];
  return lines.join("\n");
}
/** Hex-encoded SHA-256 digest of a migration file's contents. */
function checksum(content: string): string {
  const hasher = new Bun.CryptoHasher("sha256");
  hasher.update(content);
  return hasher.digest("hex");
}
/**
 * Parse a CLI count argument as a strictly positive integer.
 * Throws on zero, negatives, and non-numeric input (parseInt semantics:
 * a numeric prefix such as "3x" still parses as 3).
 */
function parseCount(value: string): number {
  const n = Number.parseInt(value, 10);
  if (Number.isInteger(n) && n > 0) {
    return n;
  }
  throw new Error(`Invalid count: ${value}`);
}
/**
 * Normalize a human-readable name into a lowercase snake_case slug:
 * trim, lowercase, collapse non-alphanumeric runs to "_", strip edge "_".
 * May return "" when the input has no alphanumeric characters.
 */
function slugify(name: string): string {
  const lowered = name.trim().toLowerCase();
  const snake = lowered.replace(/[^a-z0-9]+/g, "_");
  const trimmed = snake.replace(/^_+|_+$/g, "");
  return trimmed.replace(/_+/g, "_");
}
/** Current UTC time as a 14-digit YYYYMMDDHHMMSS migration-version prefix. */
function utcTimestamp(): string {
  const now = new Date();
  const two = (n: number): string => String(n).padStart(2, "0");
  return (
    String(now.getUTCFullYear()) +
    two(now.getUTCMonth() + 1) +
    two(now.getUTCDate()) +
    two(now.getUTCHours()) +
    two(now.getUTCMinutes()) +
    two(now.getUTCSeconds())
  );
}
/** Directory holding migration files; defaults to ./migrations under cwd. */
function resolveMigrationsDir(options: RunnerOptions): string {
  const { migrationsDir } = options;
  // Nullish (not truthy) check: an explicitly-passed "" is honored as-is.
  return migrationsDir ?? resolve(process.cwd(), "migrations");
}
/**
 * Pick the connection string: explicit option wins, then the DATABASE_URL
 * env var. Returns "" when a live client was injected (no URL is needed),
 * and throws when neither source provides a non-empty URL.
 */
function resolveDatabaseUrl(options: RunnerOptions): string {
  if (options.db) {
    return "";
  }
  const url = options.databaseUrl ?? process.env.DATABASE_URL;
  if (url) {
    return url;
  }
  throw new Error("DATABASE_URL is required");
}
/** Forward a message to the configured logger, if any; silent otherwise. */
function logMessage(options: RunnerOptions, message: string): void {
  options.logger?.(message);
}
/**
 * Run `fn` with a SQL client and the resolved migrations directory.
 * An injected client (options.db) is used as-is and left open for the
 * caller; otherwise a fresh connection is opened and always closed,
 * even when `fn` throws.
 */
async function withDb<T>(
options: RunnerOptions,
fn: (db: SQL, migrationsDir: string) => Promise<T>,
): Promise<T> {
  const dir = resolveMigrationsDir(options);
  const injected = options.db;
  if (injected) {
    return fn(injected, dir);
  }
  const owned = new SQL(resolveDatabaseUrl(options));
  try {
    return await fn(owned, dir);
  } finally {
    await owned.close();
  }
}
/**
 * Scan migrationsDir and group migration files by version.
 * Returns an empty map when the directory does not exist. Throws on
 * filenames that do not match FILE_PATTERN and on duplicate up/down halves.
 */
async function discoverMigrations(migrationsDir: string): Promise<Map<string, IncompleteMigration>> {
// Two globs: the second presumably picks up dotfiles that "*" skips, so
// hidden files are also validated against the strict filename pattern.
const names = new Set<string>();
try {
for (const name of new Bun.Glob("*").scanSync(migrationsDir)) {
names.add(name);
}
for (const name of new Bun.Glob(".*").scanSync(migrationsDir)) {
names.add(name);
}
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code === "ENOENT") {
// No migrations directory yet: treat as "no migrations".
return new Map();
}
throw error;
}
const byVersion = new Map<string, IncompleteMigration>();
for (const name of names) {
const path = join(migrationsDir, name);
// lstat (not stat): symlinks report as non-files and are skipped below.
let isFile = false;
try {
isFile = (await lstat(path)).isFile();
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code === "ENOENT") {
// Entry vanished between glob and lstat; ignore it.
continue;
}
throw error;
}
if (!isFile) {
continue;
}
const match = name.match(FILE_PATTERN);
if (!match) {
throw new Error(`Invalid migration filename: ${name}`);
}
const version = match[1];
const direction = match[2] as Direction;
// Read the file now so the checksum reflects current on-disk content.
const sql = await Bun.file(path).text();
const migration: MigrationFile = {
version,
direction,
path,
checksum: checksum(sql),
};
const current = byVersion.get(version) ?? { version };
if (direction === "up") {
if (current.up) {
throw new Error(`Duplicate up migration for ${version}`);
}
current.up = migration;
} else {
if (current.down) {
throw new Error(`Duplicate down migration for ${version}`);
}
current.down = migration;
}
byVersion.set(version, current);
}
return byVersion;
}
/** Create the schema_migrations tracking table if it does not exist yet. */
async function ensureTable(client: SQL): Promise<void> {
  await client.unsafe(`
CREATE TABLE IF NOT EXISTS schema_migrations (
version TEXT PRIMARY KEY,
checksum TEXT NOT NULL,
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
`);
}
/** Load all rows from schema_migrations, ordered ascending by version. */
async function getApplied(client: SQL): Promise<Applied[]> {
  const result = await client.unsafe(`
SELECT version, checksum, applied_at
FROM schema_migrations
ORDER BY version ASC;
`);
  return result as Applied[];
}
/**
 * Integrity gate run before up/down/status: every applied DB row must still
 * have its .up.sql on disk, and that file's checksum must match what was
 * recorded at apply time. Throws on the first violation found.
 */
function validate(applied: Applied[], files: Map<string, IncompleteMigration>): void {
  applied.forEach((row) => {
    const onDisk = files.get(row.version)?.up;
    if (!onDisk) {
      throw new Error(`Applied migration ${row.version} missing from disk`);
    }
    if (onDisk.checksum !== row.checksum) {
      throw new Error(`Checksum mismatch for ${row.version}`);
    }
  });
}
/**
 * Turn the discovered file map into a version-sorted list of complete pairs,
 * throwing if any version is missing its .up.sql or .down.sql half.
 */
function completePairs(files: Map<string, IncompleteMigration>): MigrationPair[] {
  const sortedVersions = Array.from(files.keys()).sort();
  return sortedVersions.map((version) => {
    const entry = files.get(version);
    if (!entry?.up || !entry.down) {
      throw new Error(`Both .up.sql and .down.sql required for ${version}`);
    }
    return { version, up: entry.up, down: entry.down };
  });
}
/**
 * Fail-fast advisory lock: try pg_try_advisory_lock(LOCK_ID) once, throw
 * "Another migration is running" if it is held elsewhere, otherwise run
 * `fn` and always release the lock afterwards.
 */
async function withLock<T>(client: SQL, fn: () => Promise<T>): Promise<T> {
  const rows = await client`
SELECT pg_try_advisory_lock(${LOCK_ID}) AS locked
`;
  const acquired = rows[0]?.locked;
  if (!acquired) {
    throw new Error("Another migration is running");
  }
  try {
    return await fn();
  } finally {
    await client`SELECT pg_advisory_unlock(${LOCK_ID})`;
  }
}
/**
 * Create a timestamped .up.sql/.down.sql pair for `name` in the migrations
 * directory (creating the directory if needed) and return the new version
 * string. Throws when the slug is empty or the version's files already exist.
 */
export async function create(name: string, options: RunnerOptions = {}): Promise<string> {
const slug = slugify(name);
if (!slug) {
throw new Error("Migration name must contain at least one alphanumeric character");
}
const migrationsDir = resolveMigrationsDir(options);
await mkdir(migrationsDir, { recursive: true });
const version = `${utcTimestamp()}_${slug}`;
const upPath = join(migrationsDir, `${version}.up.sql`);
const downPath = join(migrationsDir, `${version}.down.sql`);
// Friendly pre-check; the "wx" flag below still refuses to overwrite a file
// that appears between this check and the write.
if (await Bun.file(upPath).exists() || await Bun.file(downPath).exists()) {
throw new Error(`Migration already exists: ${version}`);
}
await writeFile(upPath, "-- Write forward migration SQL here.\n", {
encoding: "utf8",
flag: "wx",
});
await writeFile(downPath, "-- Write rollback migration SQL here.\n", {
encoding: "utf8",
flag: "wx",
});
logMessage(options, `Created ${upPath}`);
logMessage(options, `Created ${downPath}`);
return version;
}
/**
 * Apply pending migrations in lexicographic version order under the advisory
 * lock. Each migration's SQL file and its tracking-row INSERT run in one
 * transaction. Returns the versions applied (possibly empty).
 *
 * @param options runner options plus optional `count` limiting how many
 *   pending migrations to apply; omitted/undefined means "all pending".
 * @throws Error on invalid count, integrity violations, or a held lock.
 */
export async function up(
options: RunnerOptions & { count?: number } = {},
): Promise<string[]> {
  const { count } = options;
  // Fix: the previous truthy check (`options.count ? … : pending`) silently
  // applied EVERY pending migration when a library caller passed count: 0.
  // Validate like down() does, and distinguish "unset" with an explicit check.
  if (count !== undefined && (!Number.isInteger(count) || count <= 0)) {
    throw new Error(`Invalid count: ${count}`);
  }
  return withDb(options, async (dbClient, migrationsDir) => {
    return withLock(dbClient, async () => {
      await ensureTable(dbClient);
      const filesByVersion = await discoverMigrations(migrationsDir);
      const applied = await getApplied(dbClient);
      // Integrity checks: applied rows must match files on disk.
      validate(applied, filesByVersion);
      const files = completePairs(filesByVersion);
      const appliedSet = new Set(applied.map((entry) => entry.version));
      const pending = files.filter((pair) => !appliedSet.has(pair.version));
      const target = count !== undefined ? pending.slice(0, count) : pending;
      if (target.length === 0) {
        logMessage(options, "No pending migrations.");
        return [];
      }
      // One transaction per migration: the .up.sql and the bookkeeping
      // INSERT commit (or roll back) together.
      for (const migration of target) {
        await dbClient.begin(async (tx) => {
          await tx.file(migration.up.path);
          await tx`
INSERT INTO schema_migrations (version, checksum)
VALUES (${migration.version}, ${migration.up.checksum})
`;
        });
        logMessage(options, `Applied ${migration.version}`);
      }
      logMessage(options, `Applied ${target.length} migration(s).`);
      return target.map((migration) => migration.version);
    });
  });
}
/**
 * Roll back the most recent `count` applied migrations (default 1), newest
 * first. Each rollback runs its .down.sql and deletes the tracking row in a
 * single transaction. Returns the rolled-back versions in rollback order.
 */
export async function down(
options: RunnerOptions & { count?: number } = {},
): Promise<string[]> {
const count = options.count ?? 1;
if (!Number.isInteger(count) || count <= 0) {
throw new Error(`Invalid count: ${count}`);
}
return withDb(options, async (dbClient, migrationsDir) => {
return withLock(dbClient, async () => {
await ensureTable(dbClient);
const filesByVersion = await discoverMigrations(migrationsDir);
const applied = await getApplied(dbClient);
// Integrity checks: applied rows must match files on disk.
validate(applied, filesByVersion);
const files = completePairs(filesByVersion);
const completeByVersion = new Map(files.map((pair) => [pair.version, pair]));
// `applied` is sorted ascending, so slice(-count).reverse() is newest-first.
const targets = applied.slice(-count).reverse();
if (targets.length === 0) {
logMessage(options, "No applied migrations to roll back.");
return [];
}
for (const target of targets) {
const migration = completeByVersion.get(target.version);
if (!migration) {
// Defensive: validate() + completePairs() should make this unreachable.
throw new Error(`Applied migration ${target.version} missing from disk`);
}
await dbClient.begin(async (tx) => {
// Rollback SQL and tracking-row removal commit (or fail) atomically.
await tx.file(migration.down.path);
await tx`DELETE FROM schema_migrations WHERE version = ${target.version}`;
});
logMessage(options, `Rolled back ${target.version}`);
}
logMessage(options, `Rolled back ${targets.length} migration(s).`);
return targets.map((target) => target.version);
});
});
}
/**
 * Report applied and pending migration versions. Runs the same integrity
 * checks — and takes the same advisory lock — as up/down, so it fails when
 * files drifted or another migration is in flight.
 */
export async function status(options: RunnerOptions = {}): Promise<StatusResult> {
  return withDb(options, (dbClient, migrationsDir) =>
    withLock(dbClient, async () => {
      await ensureTable(dbClient);
      const discovered = await discoverMigrations(migrationsDir);
      const appliedRows = await getApplied(dbClient);
      validate(appliedRows, discovered);
      const pairs = completePairs(discovered);
      const applied = appliedRows.map((row) => row.version);
      const done = new Set(applied);
      const pending: string[] = [];
      for (const pair of pairs) {
        if (!done.has(pair.version)) {
          pending.push(pair.version);
        }
      }
      return { applied, pending };
    }),
  );
}
/**
 * Command-line entry point: parse argv and dispatch to create/up/down/status.
 * Throws Error(usage()) for unknown commands or wrong argument counts; the
 * import.meta.main wrapper turns that into stderr output and exit code 1.
 */
export async function runCli(argv: string[] = process.argv.slice(2)): Promise<void> {
  const [command, ...rest] = argv;
  const cliOptions: RunnerOptions = { logger: (message) => console.log(message) };
  // Shared formatter for the two sections printed by `status`.
  const printSection = (header: string, versions: string[]): void => {
    console.log(header);
    if (versions.length === 0) {
      console.log(" (none)");
    } else {
      for (const version of versions) {
        console.log(` ${version}`);
      }
    }
  };
  switch (command) {
    case "create": {
      if (rest.length !== 1) {
        throw new Error(usage());
      }
      await create(rest[0], cliOptions);
      return;
    }
    case "up": {
      if (rest.length > 1) {
        throw new Error(usage());
      }
      await up({ ...cliOptions, count: rest[0] ? parseCount(rest[0]) : undefined });
      return;
    }
    case "down": {
      if (rest.length > 1) {
        throw new Error(usage());
      }
      await down({ ...cliOptions, count: rest[0] ? parseCount(rest[0]) : 1 });
      return;
    }
    case "status": {
      if (rest.length !== 0) {
        throw new Error(usage());
      }
      const result = await status(cliOptions);
      printSection("Applied:", result.applied);
      printSection("Pending:", result.pending);
      return;
    }
    default:
      throw new Error(usage());
  }
}
// CLI side effects run only when executed directly (`bun src/migrate.ts ...`);
// importing this file as a library never triggers them. Errors print their
// message only (no stack) and set a non-zero exit code.
if (import.meta.main) {
void runCli().catch((error) => {
console.error(error instanceof Error ? error.message : String(error));
process.exitCode = 1;
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment