The JsonLineParser class implements a transform stream (using Node.js v20.2.0's Stream API) to parse a file that contains JSON objects separated by newlines (see data.ndjson).
It's adapted from Young and Harter's book Node.js in Practice, specifically Technique 31: Implementing a readable stream.
Last active
June 4, 2023 21:12
-
-
Save uzluisf/4774bb39a5d2db131393c0b48d43678b to your computer and use it in GitHub Desktop.
Node.js Transform Stream Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"id":1,"name":"O Brother, Where Art Thou?"} | |
{"id":2,"name":"Home for the Holidays"} | |
{"id":3,"name":"The Firm"} | |
{"id":4,"name":"Broadcast News"} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example usage: stream an NDJSON file through JsonLineParser and print the
// `name` field of every parsed object.
const fs = require('fs');
const JsonLineParser = require('./parser');

const filename = './data.ndjson';

// A deliberately tiny highWaterMark (10 bytes per read) so that records span
// several chunks and the parser's line buffering is actually exercised.
const readStream = fs.createReadStream(filename, { highWaterMark: 10 });
const parser = new JsonLineParser();

// `pipe()` does not forward errors, so each stream needs its own listener.
readStream.on('error', (err) => {
  console.error(`Failed to read ${filename}: ${err.message}`);
  process.exitCode = 1;
});

parser.on('error', (err) => {
  console.error(`Failed to parse ${filename}: ${err.message}`);
  process.exitCode = 1;
});

parser.on('object', (obj) => {
  console.log(obj.name);
});

readStream.pipe(parser);

// The parser also push()es every record to its readable side. Nothing here
// reads that output, so switch it to flowing mode to discard it; otherwise the
// readable buffer fills up and backpressure stalls the pipe on larger files.
parser.resume();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
/**
 * Transform stream that parses newline-delimited JSON (NDJSON).
 *
 * Incoming bytes are decoded as UTF-8 and buffered until a full line is
 * available. For every non-blank line the parsed value is
 *   1. emitted through an `'object'` event, and
 *   2. re-serialised with `JSON.stringify` and pushed to the readable side
 *      (no delimiter is appended between pushed records).
 *
 * A line that is not valid JSON makes the stream fail with the `SyntaxError`
 * thrown by `JSON.parse`.
 */
class JsonLineParser extends stream.Transform {
  /**
   * @param {object} [options] Options forwarded to `stream.Transform`.
   *   `encoding` is always forced to `'utf8'`; the caller's object is left
   *   untouched.
   */
  constructor(options) {
    // Copy instead of assigning onto `options` so the caller's object is not mutated.
    super({ ...(options || {}), encoding: 'utf8' });
    this._buffer = '';
    this._DELIMITER = '\n';
    // Streaming decoder: keeps the trailing bytes of a multi-byte character
    // that is split across chunks instead of turning them into U+FFFD.
    this._decoder = new TextDecoder('utf-8');
  }

  /**
   * Decodes one incoming chunk and processes every complete line it yields.
   *
   * @param {Buffer|string} chunk Raw input chunk.
   * @param {string} enc Chunk encoding (unused; input is treated as UTF-8).
   * @param {function(Error=): void} cb Completion callback.
   */
  _transform(chunk, enc, cb) {
    try {
      // Chunks arrive as strings only when the stream was created with
      // `decodeStrings: false`; those need no decoding.
      const text = typeof chunk === 'string'
        ? chunk
        : this._decoder.decode(chunk, { stream: true });
      this._processLine(text);
      cb();
    } catch (err) {
      cb(err);
    }
  }

  /**
   * Handles end of input: the last record is still parsed even when the
   * source does not end with a newline.
   *
   * @param {function(Error=): void} cb Completion callback.
   */
  _flush(cb) {
    try {
      // Calling decode() without arguments drains any bytes still held back.
      const remainder = (this._buffer + this._decoder.decode()).trim();
      this._buffer = '';
      if (remainder) {
        this._emitRecord(remainder);
      }
      cb();
    } catch (err) {
      cb(err);
    }
  }

  /**
   * Appends decoded text to the internal buffer and emits every complete
   * line found in it. Blank (whitespace-only) lines are skipped.
   *
   * @param {string} chunk Decoded text to append.
   * @throws {SyntaxError} When a line is not valid JSON.
   */
  _processLine(chunk) {
    this._buffer += chunk;

    let delimiterIndex = this._buffer.indexOf(this._DELIMITER);
    while (delimiterIndex !== -1) {
      const line = this._buffer.slice(0, delimiterIndex).trim();
      // Consume the whole line (blank or not) before parsing, so a bad
      // record is never re-parsed on the next chunk.
      this._buffer = this._buffer.slice(delimiterIndex + this._DELIMITER.length);
      if (line) {
        this._emitRecord(line);
      }
      delimiterIndex = this._buffer.indexOf(this._DELIMITER);
    }
  }

  /**
   * Parses a single non-empty line and publishes the result.
   *
   * @param {string} line Trimmed, non-empty JSON text.
   * @throws {SyntaxError} When `line` is not valid JSON.
   */
  _emitRecord(line) {
    const result = JSON.parse(line);
    this.emit('object', result);
    this.push(JSON.stringify(result));
  }
}
module.exports = JsonLineParser; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment