@bsidhom
Last active April 27, 2026 23:29
JavaScript TransformStream + generator example pipeline
import * as fs from "node:fs/promises";
// Simple JavaScript stream pipeline example to demonstrate idiomatic use. This
// particular example reads binary data from a words file, decodes as utf-8,
// splits it into lines, and then aggressively normalizes and retains just
// alphanumeric characters. The entire pipeline is wrapped in a generator which
// yields the final results while handling the file handle lifecycle under the
// hood.
const main = async () => {
  const dictionary = new Set();
  for await (const line of normalizedLines("/usr/share/dict/words")) {
    // Skip lines that normalize to the empty string.
    if (line.length === 0) {
      continue;
    }
    dictionary.add(line);
  }
  console.log(dictionary.size);
};
const normalizedLines = async function* (path) {
  const handle = await fs.open(path, "r");
  try {
    // NOTE: We break the stream into lines before normalizing. This lets us run
    // arbitrary normalization/stripping without breaking word separation (e.g.,
    // space removal).
    const stream = handle
      .readableWebStream()
      .pipeThrough(new TextDecoderStream("utf-8"))
      .pipeThrough(lineStream())
      .pipeThrough(normalizerStream());
    const reader = stream.getReader();
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        return;
      }
      yield value;
    }
  } finally {
    // close() returns a promise; await it so a failure surfaces to the caller
    // instead of becoming an unhandled rejection.
    await handle.close();
  }
};
const lineStream = () => {
  let s = "";
  return new TransformStream({
    start(controller) {},
    transform(chunk, controller) {
      // Precondition: `s` is a string and does not have an embedded newline.
      // NOTE: The input chunks are typically around 16 KiB. By chunking to the
      // line level, we're blowing up the number of promises passed through the
      // stream (which translates into more generator handoffs above). If this
      // is not acceptable for performance, multiple lines could be passed in
      // each stream chunk (and generator result). Some naive benchmarks show
      // that doing so and collapsing line splitting and normalization into a
      // single transform stream gives about 4x throughput. However, it's less
      // readable and reusable (since the chunking and normalization are
      // coupled).
      let i = 0;
      while (true) {
        const j = chunk.indexOf("\n", i);
        if (j < 0) {
          s += chunk.slice(i);
          return;
        }
        // NOTE: j is the index of the next newline, which we want to _include_
        // in the output. String slicing excludes the upper index.
        const next = j + 1;
        const c = chunk.slice(i, next);
        i = next;
        // WARNING: This technically does not respect backpressure in aggregate
        // because TransformStreamDefaultController does not expose a .ready
        // promise similar to that found in WritableStreamDefaultController. As
        // such, there is no way to robustly and cleanly await resolution of
        // .desiredSize if we're already above the high water mark. The stream
        // designers apparently intended for each chunk to be transformed in
        // full at each callback invocation. This works fine if the transform
        // stream does not _expand_ chunks into substantially larger output data.
        controller.enqueue(s + c);
        s = "";
      }
    },
    flush(controller) {
      if (s.length > 0) {
        controller.enqueue(s);
        s = "";
      }
    },
  });
};
const normalizerStream = () =>
  new TransformStream({
    start(controller) {},
    transform(chunk, controller) {
      controller.enqueue(normalize(chunk));
    },
    flush(controller) {},
  });
function normalize(s) {
  return s
    .normalize("NFKD")
    .toLowerCase()
    .normalize("NFKD") // Just in case pseudo "casefolding" breaks NFKD.
    .replace(/ß/g, "ss") // German sharp s (Eszett) casefold gap
    .replace(/ς/g, "σ") // Greek final sigma casefold gap
    .replace(/[^\p{L}\p{N}]/gu, ""); // Aggressively strip non-alphanumeric characters
}
main().catch(console.log);
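
The NOTE inside `lineStream()` mentions that several lines could be passed in each stream chunk to cut down on per-line promise handoffs. A minimal sketch of that idea follows, assuming downstream consumers are adapted to accept arrays of lines. The names `batchedLineStream`, `collectLines`, and `demo` are hypothetical and not part of the original gist, and this sketch only batches the line splitting (it does not fold normalization into the same transform, so the roughly 4x figure quoted above should not be assumed to apply to it).

```javascript
// Hypothetical batched variant of lineStream(): emits one array of complete
// lines per input chunk instead of one output chunk per line.
const batchedLineStream = () => {
  let carry = ""; // Trailing partial line left over from the previous chunk.
  return new TransformStream({
    transform(chunk, controller) {
      const lines = [];
      let i = 0;
      while (true) {
        const j = chunk.indexOf("\n", i);
        if (j < 0) {
          // No more newlines: keep the remainder for the next chunk.
          carry += chunk.slice(i);
          break;
        }
        // Include the newline itself, matching lineStream() above.
        lines.push(carry + chunk.slice(i, j + 1));
        carry = "";
        i = j + 1;
      }
      // At most one enqueue per transform() call, however many lines arrived.
      if (lines.length > 0) {
        controller.enqueue(lines);
      }
    },
    flush(controller) {
      // Emit a final line that was not newline-terminated.
      if (carry.length > 0) {
        controller.enqueue([carry]);
        carry = "";
      }
    },
  });
};

// Hypothetical helper: drain a stream of line batches into one flat array.
const collectLines = async (stream) => {
  const out = [];
  const reader = stream.getReader();
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      return out;
    }
    out.push(...value);
  }
};

// Usage sketch: two string chunks with a line split across the boundary.
const demo = async () => {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue("alpha\nbe");
      controller.enqueue("ta\ngamma");
      controller.close();
    },
  });
  // Resolves to ["alpha\n", "beta\n", "gamma"].
  return collectLines(source.pipeThrough(batchedLineStream()));
};
```

Because each `transform()` call enqueues at most once, this shape also stays closer to the "transform each chunk in full per callback" model described in the WARNING comment. The trade-off is the one already noted in the gist: every downstream stage has to understand batches rather than single lines.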