|
import * as cbor from "https://deno.land/x/[email protected]/index.js"; |
|
import * as nats from "https://deno.land/x/[email protected]/src/mod.ts"; |
|
import * as zstd from "https://deno.land/x/[email protected]/deno/zstd.ts"; |
|
import * as cli from "https://deno.land/x/[email protected]/index.ts"; |
|
import { crayon } from "https://deno.land/x/[email protected]/mod.ts"; |
|
|
|
/**
 * Command-line parser for the Marmot watcher.
 *
 * Declares an early-exit `help` flag followed by five text options:
 * `compressed`, `username`, `password`, `servers` and `prefix`.
 */
const argsParser = cli.args
  .describe('Marmot watcher')
  // --help: print the generated usage text and terminate the process.
  .with(
    cli.EarlyExitFlag('help', {
      describe: 'Show help',
      exit() {
        console.log(argsParser.help());
        return Deno.exit();
      },
    }),
  )
  // --compressed / -c: parsed as text; the message handler compares it with "true".
  .with(
    cli.PartialOption('compressed', {
      type: cli.values.Text,
      alias: ["c"],
      describe: "Stream messages are zstd compressed",
      default: "true",
    }),
  )
  // --username / -u: optional, no default.
  .with(
    cli.PartialOption('username', {
      type: cli.values.Text,
      alias: ["u"],
      describe: "Username for NATS stream",
      default: undefined,
    }),
  )
  // --password / -p: optional, no default.
  .with(
    cli.PartialOption('password', {
      type: cli.values.Text,
      alias: ["p"],
      describe: "Password for NATS stream",
      default: undefined,
    }),
  )
  // --servers / -s: main() splits this value on commas.
  .with(
    cli.PartialOption('servers', {
      type: cli.values.Text,
      alias: ["s"],
      describe: "NATS server URL",
      default: "localhost:4222",
    }),
  )
  // --prefix: main() uses it to select which stream subjects to watch.
  .with(
    cli.PartialOption('prefix', {
      type: cli.values.Text,
      describe: "Prefix for marmot subjects",
      default: "marmot-change-log-",
    }),
  );
|
|
|
/** The three change kinds a published row may carry. */
type MarmotChangeType = "insert" | "update" | "delete";

/** Column values keyed by column name; values are left untyped. */
// deno-lint-ignore no-explicit-any
type MarmotRowColumns = Record<string, any>;

/**
 * Shape that a decoded stream message is cast to in this file.
 *
 * NOTE(review): field meanings are inferred from their names only
 * (FromNodeId presumably identifies the publishing node) — confirm against
 * the publisher.
 */
interface MarmotPublishedRow {
  FromNodeId: number;
  Payload: {
    Id: number;
    Type: MarmotChangeType;
    TableName: string;
    Row: MarmotRowColumns;
  };
}
|
|
|
/**
 * Holds a NATS connection together with its JetStream manager and exposes
 * the few lookups the watcher needs.
 *
 * Until `connect()` has completed, `subjects()` resolves to `[]` and
 * `streams()` / `subscribe()` resolve to `undefined`.
 */
export class MarmotStreamListener {
  #nc: nats.NatsConnection | null = null;
  #jsm: nats.JetStreamManager | null = null;

  /**
   * Opens the NATS connection and obtains its JetStream manager.
   *
   * @param serverConfig Options forwarded unchanged to `nats.connect`.
   */
  async connect(serverConfig: nats.ConnectionOptions): Promise<void> {
    this.#nc = await nats.connect(serverConfig);
    this.#jsm = await this.#nc.jetstreamManager();
  }

  /**
   * Lists the subjects of every stream that has exactly one subject and
   * whose subject starts with `prefix`.
   *
   * @param prefix Required leading text of the subject.
   * @returns Matching subjects, or `[]` when not connected.
   */
  async subjects(prefix: string): Promise<string[]> {
    const streams = await this.streams();
    if (!streams) {
      return [];
    }

    return streams
      .map((info) => info.config.subjects)
      .filter((subjects) => subjects.length === 1 && subjects[0].startsWith(prefix))
      .flat();
  }

  /**
   * Lists all JetStream streams known to the server.
   *
   * The listing is paged, so pages are requested until an empty one comes
   * back; asking for a single page would silently truncate the result.
   *
   * @returns Every stream, or `undefined` when not connected.
   */
  async streams(): Promise<nats.StreamInfo[] | undefined> {
    const manager = this.#jsm;
    if (!manager) {
      return undefined;
    }

    const lister = manager.streams.list();
    const collected: nats.StreamInfo[] = [];
    for (let page = await lister.next(); page.length > 0; page = await lister.next()) {
      collected.push(...page);
    }
    return collected;
  }

  /**
   * Subscribes to a subject on the current connection.
   *
   * @param subj Subject to subscribe to.
   * @returns The subscription, or `undefined` when not connected.
   */
  async subscribe(subj: string): Promise<nats.Subscription | undefined> {
    return await this.#nc?.subscribe(subj);
  }
}
|
|
|
/**
 * Serialises one column value as JSON, rendering bigint values as decimal
 * strings wherever they occur (top level or nested), because
 * `JSON.stringify` throws on a raw bigint.
 */
function stringifyColumnValue(value: unknown): string {
  return JSON.stringify(
    value,
    (_key, inner) => (typeof inner === "bigint" ? inner.toString() : inner),
  );
}

/** Renders a row as space-separated `column=<json>` pairs ordered by column name. */
function formatRowColumns(row: Record<string, unknown>): string {
  return Object.entries(row)
    .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0))
    .map(([column, value]) => `${column}=${stringifyColumnValue(value)}`)
    .join(" ");
}

/**
 * Decodes one stream message and prints a one-line coloured summary.
 *
 * The message data is zstd-decompressed when the `compressed` CLI option
 * equals "true", then CBOR-decoded. Any failure while decoding or formatting
 * is written to `console.error` and the message is skipped; nothing is
 * thrown to the caller.
 *
 * @param m Message received from a subscription.
 */
function onStreamMessage(m: nats.Msg) {
  try {
    const raw: Uint8Array | ArrayBuffer =
      cliArgs.value?.compressed === "true" ? zstd.decompress(m.data) : m.data;

    const payload = cbor.decode(raw) as MarmotPublishedRow;
    // A missing row is printed as an empty column list instead of failing the message.
    const tupleString = formatRowColumns(payload.Payload.Row ?? {});

    console.log(
      crayon.cyan("🖥️ ", payload.FromNodeId),
      crayon.lightBlue("📨", m.subject),
      crayon.lightGreen("⚡", payload.Payload.TableName),
      crayon.lightRed("🔹", payload.Payload.Type),
      crayon.lightWhite("▶️ ", tupleString),
    );
  } catch (e) {
    console.error(e);
  }
}
|
|
|
|
|
// Parse the process arguments once at module load; onStreamMessage and main
// both read their options from this result.
const cliArgs = argsParser.parse(Deno.args);
|
/**
 * Connects to NATS, finds the stream subjects matching the configured
 * prefix, subscribes to each and forwards every message to
 * `onStreamMessage`.
 *
 * Resolves once the first subscription loop ends.
 *
 * @throws Error when no subject matches the prefix.
 * @throws AggregateError when none of the subscriptions could be consumed.
 */
async function main() {
  const natsConfig = {
    user: cliArgs.value?.username,
    pass: cliArgs.value?.password,
    // `??` rather than `||`: only a missing value should fall back to the default.
    servers: cliArgs.value?.servers.split(",").map(s => s.trim()) ?? "localhost:4222",
  };

  // Log the connection settings without echoing the password.
  console.log(" 🚀 Connecting to server ", {
    ...natsConfig,
    pass: natsConfig.pass === undefined ? undefined : "<redacted>",
  });
  const mst = new MarmotStreamListener();
  await mst.connect(natsConfig);

  const prefix = cliArgs.value?.prefix || "marmot-change-log";
  const subs = await mst.subjects(prefix);
  console.log(crayon.lightYellow(" 🤙 Starting subscription"), subs);

  if (subs.length === 0) {
    // Promise.any([]) would reject with an opaque AggregateError; say what is wrong instead.
    throw new Error(`No stream subjects found with prefix "${prefix}"`);
  }

  // Each callback calls subscribe() before its first await, so all
  // subscriptions are requested up front, in order.
  const consumers = subs.map(async (subject) => {
    const subscription = await mst.subscribe(subject);
    if (!subscription) {
      // Reject this consumer so a missing subscription is not mistaken for success.
      throw new Error(`Unable to subscribe to "${subject}"`);
    }

    for await (const m of subscription) {
      onStreamMessage(m);
    }
  });

  // Await so that failures reach the caller instead of becoming an unhandled rejection.
  await Promise.any(consumers);
}
|
|
|
// Learn more at https://deno.land/manual/examples/module_metadata#concepts
// Script entry: report argument errors, otherwise initialise zstd and run main().
if (import.meta.main) {
  const parseFailure = cliArgs.error;
  if (parseFailure) {
    console.error('Failed to parse CLI arguments');
    console.error(parseFailure.toString());
    Deno.exit(1);
  }

  // zstd is initialised before main() so it is ready when messages arrive.
  await zstd.init();
  await main();
}