Skip to content

Instantly share code, notes, and snippets.

@jasocox
Created May 22, 2019 13:32
Show Gist options
  • Save jasocox/6190d68f679e40364ee0a5bbbd0fb0c9 to your computer and use it in GitHub Desktop.
Save jasocox/6190d68f679e40364ee0a5bbbd0fb0c9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env node
const { Readable } = require("stream");
const { createReadStream, createWriteStream } = require("fs");
const readline = require("readline");
const DATA_MAX = 10000;
const DURATION_MAX = 10000;
function genDataStreamInner(dataMax, count, output) {
if (count < dataMax) {
var dataObj = {
id: count,
name: "Data Object " + count,
val: count
};
console.log("Generated:", dataObj);
output.push(JSON.stringify(dataObj) + "\n");
setTimeout(genDataStreamInner, 0, dataMax, count + 1, output);
}
else {
console.log("Done generating data");
output.push(null);
}
}
function genDataStream(dataMax) {
var ret = new Readable({read() {}});
genDataStreamInner(dataMax, 0, ret);
return ret;
}
function genDataFile(filename, dataStream) {
var file = createWriteStream(filename);
dataStream.pipe(file);
dataStream.on("end", () => {
file.end();
});
return file;
}
// Test data stream
var dataStream = genDataStream(DATA_MAX);
dataStream.on("end", () => {
console.log("Done with data source");
});
// Output Test Data
//dataStream.pipe(process.stdout);
// Write Test Data to file
var testFile = genDataFile("TESTING.txt", dataStream);
testFile.on("close", () => {
console.log("Done writing test file");
});
// Read X at a time
function readXLinesAtATime(x, dataStream) {
var ret = new Readable({read() {}});
var processing = 0;
var paused = false;
var rl = readline.createInterface({
input: dataStream
});
var pause = () => {
console.log("!!! Paused !!! Processing:", processing);
paused = true;
rl.pause();
};
var unpause = () => {
console.log("!!! Unpaused !!! Processing:", processing);
paused = false;
rl.resume();
};
var done = () => {
processing -= 1;
console.log("DONE: processing:", processing);
if (paused && processing < (x * 0.8)) unpause();
};
rl.on('line', line => {
processing += 1;
console.log("Received:", line);
console.log("Currently Processing:", processing);
processLine(line, done);
if (!paused && processing >= x) pause();
});
}
function processLine(line, done) {
setTimeout(() => {
console.log("Processed:", line);
done();
}, Math.floor(Math.random() * DURATION_MAX));
}
//testFile.on("close", startReading);
setTimeout(startReading, 50);
function startReading() {
var throttled = readXLinesAtATime(
50,
createReadStream("TESTING.txt", {
highWaterMark: 1
})
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment