Skip to content

Instantly share code, notes, and snippets.

@marcogrcr
Last active May 7, 2025 20:35
Show Gist options
  • Save marcogrcr/37062b4df6fad8f17a1cada9b52d4018 to your computer and use it in GitHub Desktop.
Save marcogrcr/37062b4df6fad8f17a1cada9b52d4018 to your computer and use it in GitHub Desktop.
Streams JSON objects from a Readable in a memory-efficient manner
import type { Readable } from "node:stream";
export interface StreamJsonObjectsInput {
/** The {@link Readable} to extract JSON objects from. */
readonly readable: Readable;
}
export interface StreamJsonObjectsOutput<T> {
/** The parsed JSON object. */
readonly object: T;
/** The raw JSON object. */
readonly raw: Uint8Array;
}
/** The {@link TextDecoder} used for decoding data. */
const TEXT_DECODER = new TextDecoder();
/** The `UTF-8` representation of the `{` character. */
const JSON_OBJECT_BEGIN = 0x7b;
/** The `UTF-8` representation of the `}` character. */
const JSON_OBJECT_END = 0x7d;
/** The `UTF-8` representation of the `"` character. */
const JSON_STRING_DELIMITER = 0x22;
/** The `UTF-8` representation of the `\` character. */
const JSON_STRING_ESCAPE = 0x5c;
/**
* Reads a {@link Readable} that yields {@link Uint8Array} or `string` chunks and streams encountered JSON objects.
* @example
* async function* data() {
* yield '[{"foo":"bar"},{"foo":';
* yield Buffer.from('"baz"}]');
* }
*
* // prints:
* // { foo: "bar" }
* // { foo: "baz" }
* for await (const { object } of streamJsonObjects({ readable: Readable.from(data()) })) {
* console.log(object);
* }
*/
export async function* streamJsonObjects<T>(
input: StreamJsonObjectsInput,
): AsyncGenerator<StreamJsonObjectsOutput<T>> {
/** Counts curly braces: increases on `{`, decreases on `}`. */
let braceCount = 0;
/** Indicates whether the parsing is inside a JSON string. */
let inString = false;
/** If the previous chunk was processed before finding a complete JSON object, this will contain the "leftover". */
const incompleteChunks: Uint8Array[] = [];
for await (const c of input.readable) {
/** Indicates where to start a `Buffer.subarray()` call to obtain a `JSON` object string. */
let indexStart = 0;
if (!(c instanceof Uint8Array) && typeof c !== "string") {
throw new Error("Readable must provide Uint8Array or string chunks.");
}
const chunk = c instanceof Uint8Array ? c : Buffer.from(c);
for (let i = 0; i < chunk.length; ++i) {
switch (true) {
case !inString && chunk[i] === JSON_OBJECT_BEGIN: {
++braceCount;
// encountered first `{`: start subarray on this index
if (braceCount === 1) {
indexStart = i;
}
break;
}
case !inString && chunk[i] === JSON_OBJECT_END: {
--braceCount;
/**
* Example:
* ```
* const chunks = [
* '[{"hello":"world"},{"foo":',
* '"bar"}]',
* ];
* ```
* First execution:
* - `incompleteChunks` is `[]`
* - `indexStart` is `1`
* - `i` is `17`
* - `subArray` contains `{"hello":"world"}`
* - `json` is equal to `subArray`
*
* Second execution:
* - `incompleteChunks[0]` is `{"foo":`
* - `indexStart` is `0`
* - `i` is `5`
* - `subArray` contains `"bar"}`
* - `json` contains `{"foo":"bar"}`
*/
if (braceCount <= 0) {
const subArray = chunk.subarray(indexStart, i + 1);
const json =
incompleteChunks.length > 0
? Buffer.concat([...incompleteChunks, subArray])
: subArray;
// encountered a `}` without a corresponding `{`: throw error
if (braceCount < 0) {
throw new Error(
`Uneven number of curly braces: ${TEXT_DECODER.decode(json)}`,
);
}
// we've encountered the `}` corresponding to the first `{`: return the JSON object
try {
const object = JSON.parse(TEXT_DECODER.decode(json));
yield { object, raw: json };
} catch {
throw new Error(
`Invalid JSON object: ${TEXT_DECODER.decode(json)}`,
);
}
// remove all incomplete chunks
incompleteChunks.splice(0, incompleteChunks.length);
}
break;
}
case braceCount > 0 && chunk[i] === JSON_STRING_DELIMITER: {
// check if delimiter was escaped
if (inString) {
const escapedInSameChunk =
i > 0 && chunk[i - 1] === JSON_STRING_ESCAPE;
const escapedInPreviousChunk =
!escapedInSameChunk &&
i === 0 &&
incompleteChunks.length > 0 &&
incompleteChunks.at(-1)?.at(-1) === JSON_STRING_ESCAPE;
if (escapedInSameChunk || escapedInPreviousChunk) {
break;
}
}
// if delimiter was not escaped: enter or exit string mode
inString = !inString;
break;
}
}
}
if (braceCount > 0) {
/**
* If we're here it means we ran out of `chunk` bytes before we found a complete JSON object.
* For example:
* ```
* const chunks = [
* '[{"hello":"world"},{"foo":',
* '"bar"}]',
* ];
* ```
* After processing the first chunk:
* - `indexStart` is `20`
* - `subArray` contains `{"foo":`
*/
const subArray = chunk.subarray(indexStart);
incompleteChunks.push(subArray);
}
}
if (incompleteChunks.length) {
throw new Error(
`Incomplete JSON object: ${incompleteChunks.map((c) => TEXT_DECODER.decode(c)).join("")}}`,
);
}
}
import { Readable } from "node:stream";
import { describe, expect, it } from "vitest";
import { streamJsonObjects } from "../../../src/util/stream-json-objects";
function createReadable(...data: unknown[]) {
return Readable.from(
(async function* () {
for (const datum of data) {
yield datum;
}
})(),
);
}
describe("streamJsonObject", () => {
describe("with JSON objects", () => {
it("yields encountered JSON objects", async () => {
const readable = createReadable(
// chunk 1: noise, complete object, noise, partial object
'noise { "foo": "1" } noise { "foo":',
// chunk 2: finish partial object, noise
Buffer.from(' "2" } noise'),
// chunk 3: noise, complete object, noise
new TextEncoder().encode('noise { "foo": "3" } noise'),
// chunk 4: partial object
'{ "foo":',
// chunk 5: partial object
' "4" ',
// chunk 6: finish partial object
"}",
// chunk 7: closing curly braces inside string
'{ "foo": "5}" }',
// chunk 8: noise with a double quote, complete object with escaped double quotes
'"noise { "foo": "6\\"}" }',
// chunk 9: partial object with escaped character
'{ "foo": "7\\',
// chunk 10: complete object with escaped double quotes
'"}" }',
);
const decoder = new TextDecoder();
const actual = [];
for await (const output of streamJsonObjects({ readable })) {
actual.push(output);
}
expect(
actual.map(({ object, raw }) => ({
object,
json: decoder.decode(raw),
})),
).toStrictEqual(
["1", "2", "3", "4", "5}", '6"}', '7"}'].map((x) => ({
object: { foo: x },
json: `{ "foo": ${JSON.stringify(x)} }`,
})),
);
});
});
describe("without JSON objects", () => {
it("yields no JSON objects", async () => {
const readable = createReadable("noise", "noise");
const actual = [];
for await (const output of streamJsonObjects({ readable })) {
actual.push(output);
}
expect(actual).toHaveLength(0);
});
});
describe("invalid chunks", () => {
it("throws error", async () => {
const readable = createReadable(1, 2, 3);
const actual = streamJsonObjects({ readable }).next();
await expect(actual).rejects.toThrow(
"Readable must provide Uint8Array or string chunks.",
);
});
});
describe("uneven closing brackets", () => {
it("throws error", async () => {
const readable = createReadable('"foo": "bar" }');
const actual = streamJsonObjects({ readable }).next();
await expect(actual).rejects.toThrow(
'Uneven number of curly braces: "foo": "bar" }',
);
});
});
describe("invalid JSON object", () => {
it("throws error", async () => {
const readable = createReadable('noise { "foo": { "bar" } }');
const actual = streamJsonObjects({ readable }).next();
await expect(actual).rejects.toThrow(
'Invalid JSON object: { "foo": { "bar" } }',
);
});
});
describe("incomplete JSON object", () => {
it("throws error", async () => {
const readable = createReadable('noise { "foo": "bar"');
const actual = streamJsonObjects({ readable }).next();
await expect(actual).rejects.toThrow(
'Incomplete JSON object: { "foo": "bar"',
);
});
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment