Skip to content

Instantly share code, notes, and snippets.

@thecatontheflat
Last active May 20, 2023 20:07
Show Gist options
  • Save thecatontheflat/316aee98aed3156a7104ddd81ef87824 to your computer and use it in GitHub Desktop.
Save thecatontheflat/316aee98aed3156a7104ddd81ef87824 to your computer and use it in GitHub Desktop.
const { Client } = require('pg');
const losslessJson = require('lossless-json');
// Name of the logical replication slot to read; taken from the SLOT_NAME
// environment variable and interpolated into the slot queries in run().
const SLOT_NAME = process.env.SLOT_NAME;
// Only changes whose `schema` equals this value are replayed; run() skips the rest.
const SCHEMA_NAME = 'public';
// Lookup from shard name to table name. Starts empty and is filled at startup
// from the result of the citus_shards query; run() consults it for worker changes.
const SHARDS_MAPPING = {};
/**
 * Replays a decoded "insert" change against the target database.
 *
 * Builds a parameterized INSERT with one `$n` placeholder per entry of
 * `change.columnvalues`, logs the statement, and executes it.
 *
 * @param {{ query: Function }} targetClient - Client whose `query(sql, values)` is awaited.
 * @param {object} change - Decoded change; reads `schema`, `columnnames` and `columnvalues`.
 * @param {string} tableName - Destination table name, interpolated exactly as given.
 * @returns {Promise<void>} Resolves once the statement has been executed.
 */
async function processInsert(targetClient, change, tableName) {
  // Column names are double-quoted (embedded quotes doubled) so mixed-case or
  // reserved-word columns are not case-folded or misparsed by PostgreSQL.
  // NOTE(review): schema and tableName are left as-is because tableName may
  // already arrive quoted/qualified from the shard mapping - confirm.
  const columnList = change.columnnames
    .map((name) => `"${String(name).replace(/"/g, '""')}"`)
    .join(',');
  const placeholders = change.columnvalues
    .map((_, idx) => `$${idx + 1}`)
    .join(',');
  const sql = `INSERT INTO ${change.schema}.${tableName} (${columnList}) VALUES (${placeholders})`;
  console.log(sql);
  await targetClient.query(sql, change.columnvalues);
}
/**
 * Replays a decoded "delete" change against the target database.
 *
 * Matches the row by comparing the old key columns, as a row value, against
 * one `$n` placeholder per entry of `change.oldkeys.keyvalues`.
 *
 * @param {{ query: Function }} targetClient - Client whose `query(sql, values)` is awaited.
 * @param {object} change - Decoded change; reads `schema`, `oldkeys.keynames` and `oldkeys.keyvalues`.
 * @param {string} tableName - Destination table name, interpolated exactly as given.
 * @returns {Promise<void>} Resolves once the statement has been executed.
 */
async function processDelete(targetClient, change, tableName) {
  const { keynames, keyvalues } = change.oldkeys;
  // Key column names are double-quoted (embedded quotes doubled) so mixed-case
  // or reserved-word columns are not case-folded or misparsed by PostgreSQL.
  // NOTE(review): schema and tableName are left as-is because tableName may
  // already arrive quoted/qualified from the shard mapping - confirm.
  const keyList = keynames
    .map((name) => `"${String(name).replace(/"/g, '""')}"`)
    .join(',');
  const placeholders = keyvalues.map((_, idx) => `$${idx + 1}`).join(',');
  const sql = `DELETE FROM ${change.schema}.${tableName} WHERE (${keyList}) IN ((${placeholders}))`;
  console.log(sql);
  await targetClient.query(sql, keyvalues);
}
/**
 * Replays a decoded "update" change against the target database.
 *
 * Every column not listed in `change.pk.pknames` becomes a `col = $n`
 * assignment; the row is located by comparing the old key columns against the
 * placeholders that follow the assignment placeholders.
 *
 * @param {{ query: Function }} targetClient - Client whose `query(sql, values)` is awaited.
 * @param {object} change - Decoded change; reads `schema`, `columnnames`,
 *   `columnvalues`, `pk.pknames`, `oldkeys.keynames` and `oldkeys.keyvalues`.
 * @param {string} tableName - Destination table name, interpolated exactly as given.
 * @returns {Promise<void>} Resolves once the statement has been executed.
 */
async function processUpdate(targetClient, change, tableName) {
  // Column names are double-quoted (embedded quotes doubled) so mixed-case or
  // reserved-word columns are not case-folded or misparsed by PostgreSQL.
  // NOTE(review): schema and tableName are left as-is because tableName may
  // already arrive quoted/qualified from the shard mapping - confirm.
  const quoteIdent = (name) => `"${String(name).replace(/"/g, '""')}"`;
  const { keynames, keyvalues } = change.oldkeys;

  const assignments = [];
  const setValues = [];
  // entries() walks numeric indices only; for...in would also visit any
  // enumerable property inherited through Array.prototype.
  for (const [idx, value] of change.columnvalues.entries()) {
    const colName = change.columnnames[idx];
    // Primary-key columns are never assigned; they only appear in the WHERE clause.
    if (change.pk.pknames.includes(colName)) continue;
    setValues.push(value);
    assignments.push(`${quoteIdent(colName)} = $${setValues.length}`);
  }

  // Key placeholders continue numbering after the assignment placeholders.
  const keyOffset = setValues.length;
  const keyTokens = keyvalues.map((_, idx) => `$${keyOffset + idx + 1}`);
  const params = [...setValues, ...keyvalues];

  const sql = `UPDATE ${change.schema}.${tableName} SET ${assignments.join(',')} WHERE (${keynames.map(quoteIdent).join(',')}) IN ((${keyTokens.join(',')}))`;
  console.log(sql, params);
  await targetClient.query(sql, params);
}
/**
 * Performs one replication pass from a single source node into the target.
 *
 * Peeks at the pending changes of the slot named by SLOT_NAME, replays every
 * change whose schema equals SCHEMA_NAME, and advances the slot to a row's LSN
 * only after all of that row's changes have been applied.
 *
 * @param {object} sourceClient - Connected client for the node owning the slot.
 * @param {object} targetClient - Connected client for the database receiving the changes.
 * @param {boolean} isWorker - When true, `change.table` is translated through
 *   SHARDS_MAPPING; otherwise it is used as the table name directly.
 * @returns {Promise<void>} Resolves when every fetched row has been processed.
 */
async function run(sourceClient, targetClient, isWorker) {
  const result = await sourceClient.query(`SELECT * FROM pg_logical_slot_peek_changes('${SLOT_NAME}', NULL, NULL, 'include-pk', '1')`);
  console.log(`Fetched ${result.rows.length} rows from ${isWorker ? 'worker' : 'coordinator'}`);

  for (const { lsn, data } of result.rows) {
    // The third argument is an identity callback - presumably the number
    // parser, keeping numeric literals as their original text; confirm against
    // the installed lossless-json version.
    const parsedData = losslessJson.parse(data, undefined, (value) => value);

    for (const change of parsedData.change) {
      if (change.schema !== SCHEMA_NAME) continue;

      // NOTE(review): a worker table missing from SHARDS_MAPPING yields
      // `undefined` here and is interpolated into the SQL as such.
      const tableName = isWorker ? SHARDS_MAPPING[change.table] : change.table;

      switch (change.kind) {
        case 'insert':
          await processInsert(targetClient, change, tableName);
          break;
        case 'delete':
          await processDelete(targetClient, change, tableName);
          break;
        case 'update':
          await processUpdate(targetClient, change, tableName);
          break;
        default:
          console.error('NOT IMPLEMENTED', change.kind);
          console.log(change);
          // Exit non-zero so supervisors and scripts see this as a failure;
          // a bare process.exit() would report success.
          process.exit(1);
      }
    }

    console.log('All consumed, advancing', lsn);
    await sourceClient.query(`SELECT * FROM pg_replication_slot_advance('${SLOT_NAME}', $1)`, [lsn]);
  }
}
/**
 * Pauses the calling async flow for the given duration.
 *
 * @param {number} ms - Delay in milliseconds, handed to setTimeout.
 * @returns {Promise<void>} Resolves with no value once the timer fires.
 */
async function wait(ms) {
  await new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
 * Builds a pg Client for the given connection string; the client is only
 * constructed here, not connected.
 *
 * @param {string} connectionString - Connection string handed to the Client.
 * @returns {Client} A new client instance.
 */
function createClient(connectionString) {
  return new Client({
    connectionString,
    // NOTE(review): rejectUnauthorized: false turns off TLS certificate
    // verification, so the server's identity is not checked - confirm this is
    // intended for every environment this runs in.
    ssl: {
      rejectUnauthorized: false,
    },
  });
}

// Fail with a clear message instead of a TypeError from calling .split on undefined.
if (process.env.SOURCE_WORKERS_DATABASE_URLS === undefined) {
  throw new Error('SOURCE_WORKERS_DATABASE_URLS is not set');
}
// Split on any run of whitespace and drop empty pieces, so repeated or trailing
// spaces do not produce clients with an empty connection string.
const connectionStrings = process.env.SOURCE_WORKERS_DATABASE_URLS
  .split(/\s+/)
  .filter((url) => url !== '');
// One client per worker connection string.
const sourceWorkerClients = connectionStrings.map((url) => createClient(url));
// Client for SOURCE_DATABASE_URL; it is the client later run as the coordinator.
const sourceClient = createClient(process.env.SOURCE_DATABASE_URL);
// Client for TARGET_DATABASE_URL; all replayed statements are sent through it.
const targetClient = createClient(process.env.TARGET_DATABASE_URL);
// Entry point: connect every client, load the shard mapping, then poll forever.
Promise.all([
  // Spread the worker promises so Promise.all actually awaits them; a nested
  // array would be treated as one already-resolved value and any worker
  // connection failure would go unobserved.
  ...sourceWorkerClients.map((client) => client.connect()),
  sourceClient.connect(),
  targetClient.connect(),
])
  .then(async () => {
    // Populate the shard-name -> table-name lookup used for worker changes.
    const shardsResult = await sourceClient.query(`SELECT table_name, shard_name, citus_table_type FROM citus_shards`);
    for (const row of shardsResult.rows) {
      SHARDS_MAPPING[row['shard_name']] = row['table_name'];
    }

    while (true) {
      // Coordinator first, then each worker in turn, all into the same target.
      await run(sourceClient, targetClient, false);
      for (const client of sourceWorkerClients) {
        await run(client, targetClient, true);
      }
      await wait(1000);
    }
  })
  .catch((err) => {
    // Surface connection or replication failures explicitly and exit non-zero
    // rather than relying on the unhandled-rejection default.
    console.error(err);
    process.exit(1);
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment