Create an arrow-js table from a CSV file using Node streams
const assert = require('assert');
const fs = require('fs');
const stream = require('stream');
const _ = require('lodash');
const fastCsv = require('fast-csv');
const { metrohash64 } = require('metrohash');
const {
  Bool, Utf8, Int32, Int64, Float32, Float64, Struct, Dictionary,
  Field, Builder, RecordBatch, RecordBatchWriter, RecordBatchReader,
  Table,
} = require('apache-arrow');
// TODO: set to the path of the CSV file to convert.
const CSV_PATH = null;
const BUILDER_OPTIONS_BY_RAW_TYPE = {
  'int32': { type: new Int32() },
  'float32': { type: new Float32() },
  'bool': { type: new Bool() },
  'string': {
    // Dictionary-encode string columns, hashing entries with metrohash64.
    type: new Dictionary(new Utf8(), new Int32()),
    dictionaryHashFunction: metrohash64,
  },
};
function isInteger(value) {
  const intValue = parseInt(value, 10);
  const floatValue = parseFloat(value);
  return (_.isFinite(intValue)
          && _.isFinite(floatValue)
          && (floatValue === intValue));
}
function isFloat(value) {
  return _.isFinite(parseFloat(value));
}
// NOTE: values parsed from CSV are strings, so this only matches actual JS
// booleans; columns of 'true'/'false' strings fall through to 'string'.
const isBoolean = _.isBoolean;
async function inferArrowColumnType(column) {
  const values = column.toArray();
  // Check from most to least specific; every integer column also passes
  // the float check, so the order matters.
  if (values.every(isInteger)) {
    return 'int32';
  }
  if (values.every(isFloat)) {
    return 'float32';
  }
  if (values.every(isBoolean)) {
    return 'bool';
  }
  return 'string';
}
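// Example (illustrative):
//   ['1', '2']    -> 'int32'
//   ['1', '2.5']  -> 'float32'
//   ['a', 'b']    -> 'string'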
async function inferArrowTableTypes(table) {
  if (!Array.isArray(table.schema.fields)) {
    throw {
      name: "NotImplementedError",
      message: "NOTE(hartikainen): Currently only supports flat tables.",
    };
  }
  const columnNames = Array.from(
    { length: table.numCols }, (x, i) => table.getColumnAt(i).name);
  const tableTypesList = await Promise.all(columnNames.map(
    (name) => inferArrowColumnType(table.getColumn(name))));
  const tableTypes = Object.fromEntries(_.zip(columnNames, tableTypesList));
  return tableTypes;
}
async function typifyTable(table) {
  const inferredTypes = await inferArrowTableTypes(table);
  const { childFields, children } = _.reduce(
    inferredTypes,
    (memo, columnType, columnName) => {
      const { type, ...childBuilderOptions } = BUILDER_OPTIONS_BY_RAW_TYPE[columnType];
      memo.childFields.push(new Field(columnName, type, true));
      memo.children[columnName] = childBuilderOptions;
      return memo;
    },
    { childFields: [], children: {} });
  const type = new Struct(childFields);
  const builderTransform = Builder.throughAsyncIterable({
    type,
    children,
    // Flush chunks once their size grows beyond 64 KiB.
    queueingStrategy: 'bytes',
    highWaterMark: 1 << 16,
    nullValues: [null, undefined, 'n/a', 'NULL'],
  });
  const typeConversionStream = stream.pipeline(
    stream.Readable.from(table),
    builderTransform,
    new stream.Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        // Re-wrap each built struct chunk as a RecordBatch of its child columns.
        return callback(
          null, RecordBatch.new(chunk.data.childData, chunk.type.children));
      },
    }),
    RecordBatchWriter.throughNode(),
    (err) => {
      if (err) {
        console.error('Pipeline failed.', err);
      }
    });
  const typifiedTable = await Table.from(typeConversionStream);
  return typifiedTable;
}
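// Example (illustrative): a first-pass table whose columns are all
// dictionary-encoded strings, e.g. rows like { a: '1', b: '2.5', c: 'x' },
// comes back with a: Int32, b: Float32, and c still Dictionary<Utf8, Int32>.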
async function csvToArrowTable(filePath) {
  // First pass: parse a single row to infer the builder options. fast-csv
  // parses every field as a string at this point.
  const csvToJSONStream = fastCsv.parseFile(
    filePath, { headers: true, maxRows: 1 });
  let firstRow;
  for await (const row of csvToJSONStream) {
    assert(firstRow === undefined);
    firstRow = row;
  }
  assert(firstRow !== undefined);
  const { type, ...otherBuilderOptions } = jsValueToArrowBuilderOptions(firstRow);
  const builderTransform = Builder.throughAsyncIterable({
    type,
    ...otherBuilderOptions,
    // Flush chunks once their size grows beyond 64 KiB.
    queueingStrategy: 'bytes',
    highWaterMark: 1 << 16,
    nullValues: [null, undefined, 'n/a', 'NULL'],
  });
  const csvToArrowStream = stream.pipeline(
    fastCsv.parseFile(filePath, { headers: true }),
    builderTransform,
    new stream.Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        return callback(
          null, RecordBatch.new(chunk.data.childData, chunk.type.children));
      },
    }),
    RecordBatchWriter.throughNode(),
    (err) => {
      if (err) {
        console.error('Pipeline failed.', err);
      }
    });
  const table = await Table.from(csvToArrowStream);
  // Second pass: re-infer concrete column types from the all-string table.
  const typifiedTable = await typifyTable(table);
  return typifiedTable;
}
function jsValueToArrowBuilderOptions(value) {
  if (value) {
    switch (typeof value) {
      case 'bigint':
        return { type: new Int64() };
      case 'boolean':
        return { type: new Bool() };
      case 'number':
        return { type: new Float64() };
      case 'string':
        return {
          type: new Dictionary(new Utf8(), new Int32()),
          dictionaryHashFunction: metrohash64,
        };
      case 'object': {
        const { childFields, childBuilderOptions } = Object.keys(value).reduce((memo, name) => {
          const { type, ...options } = jsValueToArrowBuilderOptions(value[name]);
          if (type) {
            memo.childBuilderOptions.push(options);
            memo.childFields.push(new Field(name, type, true));
          }
          return memo;
        }, { childFields: [], childBuilderOptions: [] });
        if (Array.isArray(value)) {
          return { type: new Struct(childFields), children: childBuilderOptions };
        }
        return {
          type: new Struct(childFields),
          // Key each child's builder options by its field name.
          children: childBuilderOptions.reduce((children, options, childIndex) => ({
            ...children, [childFields[childIndex].name]: options,
          }), {}),
        };
      }
    }
  }
  return {};
}
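// Example (illustrative): for a parsed CSV row such as
// { id: '1', name: 'foo' }, every value is a string, so this returns a
// Struct of dictionary-encoded Utf8 children:
//   jsValueToArrowBuilderOptions({ id: '1', name: 'foo' })
//   // => { type: Struct<...>, children: { id: {...}, name: {...} } }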
csvToArrowTable(CSV_PATH)
  .catch((err) => console.error('Failed to create arrow table.', err));
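// A minimal usage sketch (the './data.csv' path is hypothetical; assumes
// the legacy arrow-js Table API used above):
//
// csvToArrowTable('./data.csv').then((table) => {
//   table.schema.fields.forEach(
//     (field) => console.log(`${field.name}: ${field.type}`));
//   console.log(`rows: ${table.length}`);
// });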