Skip to content

Instantly share code, notes, and snippets.

@polRk
Last active January 23, 2025 05:15
Show Gist options
  • Save polRk/a9d3631c6f710ad980a131ded1c1e479 to your computer and use it in GitHub Desktop.
Save polRk/a9d3631c6f710ad980a131ded1c1e479 to your computer and use it in GitHub Desktop.
YDB Butch Insert without data helpers
import {
Column,
Driver,
getCredentialsFromEnv,
getLogger,
Logger,
Session,
TableDescription,
Types,
Ydb,
} from 'ydb-sdk';
const TABLE_NAME = 'log_messages';
const BATCH_SIZE = 1000;
const now = Date.now();
function getLogBatch(offset: number) {
let messages = [];
for (let i = 0; i < BATCH_SIZE; i++) {
let message = Ydb.Value.create({
items: [
{
textValue: `App_${Math.trunc(i / 256)}`,
},
{
textValue: `192.168.0.${offset % 256}`,
},
{
// Timestamp in microseconds
uint64Value: (now + offset * 1000 + (i % 1000)) * 1000,
},
{
uint32Value: 200,
},
{
textValue: i % 2 === 0 ? 'GET / HTTP/1.1' : 'GET /images/logo.png HTTP/1.1',
},
],
});
messages.push(message);
}
// It's like a table, but first, we describe the table header (column types).
// And then, push lists of values as rows of the table.
// Table in YDB represents as list of structs.
return Ydb.TypedValue.create({
type: {
// Define list (rows)
listType: {
item: {
// Define struct type (a row)
structType: {
members: [
{
name: 'app',
type: Types.UTF8,
},
{
name: 'host',
type: Types.UTF8,
},
{
name: 'timestamp',
type: Types.TIMESTAMP,
},
{
name: 'httpCode',
type: Types.UINT32,
},
{
name: 'message',
type: Types.UTF8,
},
],
},
},
},
},
// Values
value: {
items: messages,
},
});
}
async function createTable(session: Session, logger: Logger) {
logger.info('Dropping old table...');
await session.dropTable(TABLE_NAME);
logger.info('Creating table...');
await session.createTable(
TABLE_NAME,
new TableDescription()
.withColumn(new Column('app', Types.optional(Types.UTF8)))
.withColumn(new Column('timestamp', Types.optional(Types.TIMESTAMP)))
.withColumn(new Column('host', Types.optional(Types.UTF8)))
.withColumn(new Column('http_code', Types.optional(Types.UINT32)))
.withColumn(new Column('message', Types.optional(Types.UTF8)))
.withPrimaryKeys('app', 'timestamp', 'host'),
);
}
async function run(logger: Logger, connectionString: string) {
const authService = getCredentialsFromEnv();
logger.info('Driver initializing...');
const driver = new Driver({connectionString, authService});
const timeout = 10000;
if (!(await driver.ready(timeout))) {
logger.fatal(`Driver has not become ready in ${timeout}ms!`);
process.exit(1);
}
await driver.tableClient.withSession(async (session) => {
await createTable(session, logger);
for (let offset = 0; offset < 1000; offset++) {
const logs = getLogBatch(offset);
logger.info(`Write log batch with offset ${offset}`);
await session.bulkUpsert(TABLE_NAME, logs);
}
logger.info('Done');
});
await driver.destroy();
}
const logger = getLogger();
run(logger, process.env['YDB_CONNECTION_STRING'] || 'grpc://localhost:2136').catch((err) => {
logger.error(err);
process.exit(1);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment