Skip to content

Instantly share code, notes, and snippets.

@boltzj
Created November 8, 2016 17:44
Show Gist options
  • Save boltzj/f03aef5465772e20531a95cc5664d1af to your computer and use it in GitHub Desktop.
Save boltzj/f03aef5465772e20531a95cc5664d1af to your computer and use it in GitHub Desktop.
var fs = require('fs');
var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
var util = require('util');
util.inherits(DatasetParser, Transform);

/**
 * Transform stream that reads delimited text (one record per line, first
 * line is the header) and produces a single grid object of the form
 * `{ header: string[], rows: string[][] }` holding a window of the rows.
 *
 * May be called with or without `new`.
 *
 * @param {Object} options - Stream options plus the parser settings below.
 *   The object is not modified; a shallow copy is handed to `Transform`.
 * @param {string} options.delimiter - Column separator (required).
 * @param {number|string} [options.offset=0] - Number of data rows to skip.
 * @param {number|string} [options.limit=100] - Maximum number of rows kept.
 * @throws {Error} 'Delimiter missing.' when no options or no delimiter is given.
 */
function DatasetParser(options) {
  if (!(this instanceof DatasetParser)) {
    return new DatasetParser(options);
  }

  // Validate before touching `options`, so a missing argument reports the
  // intended error instead of a TypeError on property assignment.
  if (!options || !options.delimiter) {
    throw new Error('Delimiter missing.');
  }

  // Work on a shallow copy: the caller's object must not gain `objectMode`.
  var streamOptions = {};
  Object.keys(options).forEach(function (key) {
    streamOptions[key] = options[key];
  });
  streamOptions.objectMode = true; // the stream emits the grid object, not bytes
  Transform.call(this, streamOptions);

  this._decoder = new StringDecoder('utf8');

  // Parser settings
  this._delimiter = options.delimiter;
  // Explicit radix so values are always read as decimal.
  this._offset = parseInt(options.offset, 10) || 0;
  this._limit = parseInt(options.limit, 10) || 100;

  // Decoded text that has not been consumed yet
  this._buffer = '';
  // Pending text split into lines
  this._current = [];
  // Running count of line breaks seen so far (header line included)
  this._chunkLastLine = 0;
  // Result object pushed downstream
  this._grid = {
    // Columns
    header: [],
    // Rows
    rows: []
  };
}
/**
 * Decode a chunk, append it to the pending text and re-split that text
 * into lines.
 *
 * After the call `this._current` holds every pending line; its last element
 * is the (possibly empty) text after the final line break. `_chunkLastLine`
 * is advanced by the number of line breaks found in the pending text.
 *
 * @param {Buffer|string} chunk - Raw data handed to `_transform`.
 */
DatasetParser.prototype._bufferize = function (chunk) {
  var text = this._decoder.write(chunk);

  if (text.length > 0) {
    // A CRLF pair may be cut in half by a chunk boundary. The '\r' that
    // closed the previous chunk was already counted as a line break, so the
    // '\n' opening this chunk must not be counted as a second, empty line.
    if (this._endedWithCR && text.charAt(0) === '\n') {
      text = text.slice(1);
    }
    // charAt() on an empty string yields '', which resets the flag.
    this._endedWithCR = text.charAt(text.length - 1) === '\r';
  }

  // Append chunk to previous buffer
  this._buffer += text;
  // Split on any line-ending convention (CRLF, CR or LF)
  this._current = this._buffer.split(/\r\n|\r|\n/g);
  // Every element but the last is terminated by a line break
  this._chunkLastLine += this._current.length - 1;
};
/**
 * Push the accumulated grid downstream, at most once per parser.
 *
 * @param {DatasetParser} parser - Parser whose `_grid` is delivered.
 * @param {boolean} endReadable - When true, also close the readable side so
 *   consumers get 'end' without waiting for the rest of the input.
 */
function deliverGrid(parser, endReadable) {
  if (parser._gridSent) {
    return;
  }
  parser._gridSent = true;
  parser.push(parser._grid);
  if (endReadable) {
    // push(null) is the supported way to end a readable; emitting 'end' by
    // hand bypasses the stream's own state tracking.
    parser.push(null);
  }
}

/**
 * Consume one chunk: extract the header from the first complete line, skip
 * `_offset` data rows, then collect up to `_limit` rows into `_grid.rows`.
 * Once the last wanted row has been seen the grid is delivered and any
 * further input is ignored.
 *
 * @param {Buffer|string} chunk - Raw data.
 * @param {string} encoding - Unused; decoding is done by `_bufferize`.
 * @param {Function} callback - Called once the chunk has been processed.
 */
DatasetParser.prototype._transform = function (chunk, encoding, callback) {
  // Result already delivered: drain remaining input without re-sending it.
  if (this._gridSent) {
    callback();
    return;
  }

  // Parse chunk into a split buffer
  this._bufferize(chunk);
  var buffer = this._current;

  // Data rows consumed before this buffer. The value is -1 until the first
  // line break of the input has been seen, i.e. while the header is pending.
  this.linesRead = this._chunkLastLine - buffer.length;

  if (this.linesRead === -1) {
    if (buffer.length < 2) {
      // The header line is not complete yet; keep it buffered and wait for
      // more data instead of parsing a truncated header.
      this.linesRead = 0;
      callback();
      return;
    }
    this._grid.header = buffer.shift().split(this._delimiter);
    this.linesRead = 0;
  }

  // Absolute index (in data rows) one past the last wanted row
  var lastLine = this._offset + this._limit;
  // The last buffer element is an unterminated line: never parse it here
  var endOfBuffer = buffer.length - 1;
  var consumed;

  if (this.linesRead + buffer.length <= this._offset) {
    // Everything in this buffer lies before the offset: drop the complete
    // lines, keep only the unterminated tail for the next chunk.
    consumed = endOfBuffer;
  } else {
    // First wanted row inside this buffer
    consumed = Math.max(this._offset - this.linesRead, 0);
    // Stop at the end of the complete lines or at the limit, whichever is first
    var stop = Math.min(endOfBuffer, lastLine - this.linesRead);
    while (consumed < stop) {
      this._grid.rows.push(buffer[consumed].split(this._delimiter));
      consumed += 1;
    }
  }

  // Account for the consumed lines and keep the rest for the next chunk
  this.linesRead += consumed;
  buffer.splice(0, consumed);
  this._buffer = buffer.join('\n');

  // `_chunkLastLine` counts the header too, hence the strict comparison
  if (this._chunkLastLine > lastLine) {
    deliverGrid(this, true);
  }
  callback();
};

/**
 * End of input: account for a final line that has no trailing line break,
 * deliver the grid if that has not happened yet, and let the stream finish.
 *
 * @param {Function} [callback] - Completion callback supplied by the stream.
 */
DatasetParser.prototype._flush = function (callback) {
  if (!this._gridSent) {
    var tail = this._buffer + this._decoder.end();
    if (tail.length > 0) {
      if (this._chunkLastLine === 0) {
        // No line break was ever seen: the whole input is the header line.
        this._grid.header = tail.split(this._delimiter);
      } else {
        // Index of the unterminated last line among the data rows
        var tailIndex = this._chunkLastLine - 1;
        if (tailIndex >= this._offset && tailIndex < this._offset + this._limit) {
          this._grid.rows.push(tail.split(this._delimiter));
        }
      }
    }
    // The stream ends the readable side itself once the callback has run.
    deliverGrid(this, false);
  }
  if (typeof callback === 'function') {
    callback();
  }
};
// Expose the constructor as a named export of this CommonJS module.
exports.DatasetParser = DatasetParser;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment