Last active
March 3, 2025 15:40
-
-
Save achingbrain/7e65ca748326d3b87b81508020a3321d to your computer and use it in GitHub Desktop.
Streaming benchmark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { once } from 'node:events'
import { Readable, Writable } from 'node:stream'
import { pipe } from 'it-pipe'
// How many times each benchmark function is invoked, both in the warmup pass
// and in the timed pass.
const ITERATIONS = 1000
/**
 * Builds the payload that every benchmark streams: an array of distinct,
 * zero-filled byte chunks.
 *
 * @param {number} [chunks=1024] - number of chunks to create
 * @param {number} [chunkSize=1024] - byte length of each chunk
 * @returns {Uint8Array[]} `chunks` separate Uint8Arrays of `chunkSize` bytes
 */
function createData (chunks = 1024, chunkSize = 1024) {
  return Array.from({ length: chunks }, () => new Uint8Array(chunkSize))
}
/**
 * Benchmark: moves the test data from a node Readable into a node Writable
 * with `readable.pipe()`.
 *
 * @returns {Promise<void>} resolves after the writable emits 'finish' and
 *   every chunk has been received
 * @throws {Error} 'Short read' when fewer chunks arrived than were sent, or
 *   whatever error either stream emitted
 */
async function nodeStreams () {
  const data = createData()
  const output = []

  await new Promise((resolve, reject) => {
    const readable = Readable.from(data)
    const writable = new Writable({
      write: (chunk, enc, cb) => {
        output.push(chunk)
        cb()
      }
    })

    // pipe() does not forward errors between streams; without these handlers
    // a failing stream would never settle this promise
    readable.once('error', reject)
    writable.once('error', reject)

    writable.on('finish', () => {
      if (output.length !== data.length) {
        reject(new Error('Short read'))
        return
      }

      resolve()
    })

    readable.pipe(writable)
  })
}
/**
 * Benchmark: wraps a node Readable and a node Writable as web streams via
 * `Readable.toWeb()` / `Writable.toWeb()` and connects them with `pipeTo()`.
 *
 * @returns {Promise<void>} resolves once the pipe has completed and every
 *   chunk has been received
 * @throws {Error} 'Short read' when fewer chunks arrived than were sent
 */
async function nodeStreamsAsWebStreams () {
  const data = createData()
  const output = []

  const readable = Readable.from(data)
  const writable = new Writable({
    write: (chunk, enc, cb) => {
      output.push(chunk)
      cb()
    }
  })

  const readableWeb = Readable.toWeb(readable)
  const writableWeb = Writable.toWeb(writable)

  await readableWeb.pipeTo(writableWeb)

  if (output.length !== data.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: feeds the test data through a web ReadableStream into a web
 * WritableStream using `pipeTo()`.
 *
 * @param {object} [readableOpts={}] - extra underlying-source fields spread
 *   into the ReadableStream constructor argument after `pull` (so they can
 *   override it), e.g. `{ type: 'bytes' }`
 * @returns {Promise<void>} resolves once the pipe has completed and every
 *   chunk has been received
 * @throws {Error} 'Short read' when fewer chunks arrived than were sent
 */
async function webStreams (readableOpts = {}) {
  const data = createData()
  const output = []
  let index = 0

  const readable = new ReadableStream({
    pull: (controller) => {
      // one chunk per pull; close once the data is exhausted
      if (index === data.length) {
        controller.close()
        return
      }

      controller.enqueue(data[index])
      index++
    },
    ...readableOpts
  })

  const writable = new WritableStream({
    write: (chunk) => {
      output.push(chunk)
    }
  })

  await readable.pipeTo(writable)

  if (output.length !== data.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: same as `webStreams`, but the ReadableStream is created with
 * `type: 'bytes'`.
 *
 * @returns {Promise<void>}
 */
async function webByteStreams () {
  await webStreams({ type: 'bytes' })
}
/**
 * Benchmark: streams the test data through a plain async-iterator duplex
 * (`{ source, sink }`) connected to itself with `pipe`.
 *
 * @returns {Promise<void>} resolves once the sink has consumed the source and
 *   every chunk has been received
 * @throws {Error} 'Short read' when fewer chunks arrived than were sent
 */
async function itDuplex () {
  const data = createData()
  const output = []

  const it = {
    source: (async function * () {
      yield * data
    })(),
    sink: async (source) => {
      for await (const buf of source) {
        output.push(buf)
      }
    }
  }

  await pipe(it, it)

  if (output.length !== data.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: exposes a node Readable as the `source` and a node Writable as
 * the `sink` of an async-iterator duplex, connected to itself with `pipe`.
 *
 * The sink honours backpressure by waiting for 'drain' whenever `write()`
 * returns false, then ends the writable and waits for 'finish' so the stream
 * is fully flushed before the result is checked.
 *
 * @returns {Promise<void>} resolves once the writable has finished and every
 *   chunk has been received
 * @throws {Error} 'Short read' when fewer chunks arrived than were sent, or
 *   the error emitted by the writable while waiting for 'drain'/'finish'
 */
async function nodeStreamsAsItDuplex () {
  const data = createData()
  const output = []

  const readable = Readable.from(data)
  const writable = new Writable({
    write: (chunk, enc, cb) => {
      output.push(chunk)
      cb()
    }
  })

  const it = {
    source: readable,
    sink: async (source) => {
      for await (const buf of source) {
        if (!writable.write(buf)) {
          // events.once also rejects if the stream emits 'error' while we wait
          await once(writable, 'drain')
        }
      }

      // subscribe before calling end() so 'finish' cannot be missed
      const finished = once(writable, 'finish')
      writable.end()
      await finished
    }
  }

  await pipe(it, it)

  if (output.length !== data.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: exposes a web ReadableStream as the `source` and a web
 * WritableStream as the `sink` of an async-iterator duplex, connected to
 * itself with `pipe`.
 *
 * @param {object} [readableOpts={}] - extra underlying-source fields spread
 *   into the ReadableStream constructor argument after `pull` (so they can
 *   override it), e.g. `{ type: 'bytes' }`
 * @returns {Promise<void>} resolves once the writer has been closed and every
 *   chunk has been received
 * @throws {Error} 'Short read' when fewer chunks arrived than were sent
 */
async function webStreamsAsItDuplex (readableOpts = {}) {
  const data = createData()
  const output = []
  let index = 0

  const readable = new ReadableStream({
    pull: (controller) => {
      // one chunk per pull; close once the data is exhausted
      if (index === data.length) {
        controller.close()
        return
      }

      controller.enqueue(data[index])
      index++
    },
    ...readableOpts
  })

  const writable = new WritableStream({
    write: (chunk) => {
      output.push(chunk)
    }
  })

  const it = {
    source: (async function * () {
      const reader = readable.getReader()
      let drained = false

      try {
        while (true) {
          const next = await reader.read()

          if (next.done === true) {
            drained = true
            break
          }

          yield next.value
        }
      } finally {
        if (!drained) {
          // the consumer stopped early or a read failed: tell the stream no
          // more data is wanted. A cancel failure is ignored on purpose so it
          // cannot mask the error that is already propagating.
          await reader.cancel().catch(() => {})
        }

        // always give the lock back so the stream is not left locked
        reader.releaseLock()
      }
    })(),
    sink: async (source) => {
      const writer = writable.getWriter()

      try {
        for await (const buf of source) {
          await writer.ready
          await writer.write(buf)
        }

        await writer.close()
      } finally {
        writer.releaseLock()
      }
    }
  }

  await pipe(it, it)

  if (output.length !== data.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: same as `webStreamsAsItDuplex`, but the ReadableStream is
 * created with `type: 'bytes'`.
 *
 * @returns {Promise<void>}
 */
async function webByteStreamsAsItDuplex () {
  await webStreamsAsItDuplex({ type: 'bytes' })
}
// Benchmarks to run, keyed by the label shown in the "Name" column of the
// results table. Each value is an async function that performs one full run.
const tests = {
  'node streams': nodeStreams,
  'node streams as web streams': nodeStreamsAsWebStreams,
  'web streams': webStreams,
  'web byte streams': webByteStreams,
  'duplex async iterators': itDuplex,
  'node streams as duplex async iterator': nodeStreamsAsItDuplex,
  'web streams as duplex async iterator': webStreamsAsItDuplex,
  'web byte streams as duplex async iterator': webByteStreamsAsItDuplex
}
// warmup: run every benchmark ITERATIONS times without timing it, before the
// measured pass below
for (const test of Object.values(tests)) {
  for (let i = 0; i < ITERATIONS; i++) {
    await test()
  }
}
// One row per benchmark: { Name, Time, 'Ops/s' }
const results = []

// test: time ITERATIONS sequential runs of each benchmark
for (const [name, test] of Object.entries(tests)) {
  const start = Date.now()

  for (let i = 0; i < ITERATIONS; i++) {
    await test()
  }

  // total wall-clock time for all runs, in milliseconds
  const time = Date.now() - start

  results.push({
    Name: name,
    Time: time,
    // completed runs per second (runs / elapsed seconds), rounded to a whole
    // number; Infinity if the elapsed time measured as 0 ms
    'Ops/s': Math.round((ITERATIONS * 1000) / time)
  })
}
// print results, fastest (lowest total time) first; sort() reorders
// `results` in place
console.table(results.sort((a, b) => a.Time - b.Time))
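Results on node v20.2.0 show a bigger performance gap (note: in the recorded tables the "Ops/s" column is Time divided by the 1000 iterations, i.e. milliseconds per run, so lower is faster):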
% node index.js
┌─────────┬─────────────────────────────────────────────┬──────┬───────┐
│ (index) │ Name │ Time │ Ops/s │
├─────────┼─────────────────────────────────────────────┼──────┼───────┤
│ 0 │ 'node streams' │ 500 │ 0.5 │
│ 1 │ 'duplex async iterators' │ 660 │ 0.66 │
│ 2 │ 'node streams as duplex async iterator' │ 744 │ 0.744 │
│ 3 │ 'web streams' │ 1713 │ 1.713 │
│ 4 │ 'web streams as duplex async iterator' │ 1797 │ 1.797 │
│ 5 │ 'web byte streams' │ 1988 │ 1.988 │
│ 6 │ 'web byte streams as duplex async iterator' │ 2061 │ 2.061 │
│ 7 │ 'node streams as web streams' │ 2151 │ 2.151 │
└─────────┴─────────────────────────────────────────────┴──────┴───────┘
This lives here now for better visibility: https://github.com/ipfs-shipyard/js-streams-benchmark
Results on node v20.12.0 — web streams are catching up:
% node index.js
┌─────────┬─────────────────────────────────────────────┬──────┬───────┐
│ (index) │ Name │ Time │ Ops/s │
├─────────┼─────────────────────────────────────────────┼──────┼───────┤
│ 0 │ 'node streams' │ 404 │ 0.404 │
│ 1 │ 'duplex async iterators' │ 507 │ 0.507 │
│ 2 │ 'node streams as duplex async iterator' │ 745 │ 0.745 │
│ 3 │ 'web streams' │ 839 │ 0.839 │
│ 4 │ 'web streams as duplex async iterator' │ 1214 │ 1.214 │
│ 5 │ 'web byte streams' │ 1313 │ 1.313 │
│ 6 │ 'node streams as web streams' │ 1394 │ 1.394 │
│ 7 │ 'web byte streams as duplex async iterator' │ 1633 │ 1.633 │
└─────────┴─────────────────────────────────────────────┴──────┴───────┘
Results on node v22.11.0 — node streams have got faster:
% node index.js
┌─────────┬─────────────────────────────────────────────┬──────┬───────┐
│ (index) │ Name │ Time │ Ops/s │
├─────────┼─────────────────────────────────────────────┼──────┼───────┤
│ 0 │ 'node streams' │ 274 │ 0.274 │
│ 1 │ 'duplex async iterators' │ 389 │ 0.389 │
│ 2 │ 'node streams as duplex async iterator' │ 522 │ 0.522 │
│ 3 │ 'web streams' │ 826 │ 0.826 │
│ 4 │ 'web streams as duplex async iterator' │ 920 │ 0.92 │
│ 5 │ 'web byte streams' │ 952 │ 0.952 │
│ 6 │ 'node streams as web streams' │ 1066 │ 1.066 │
│ 7 │ 'web byte streams as duplex async iterator' │ 1164 │ 1.164 │
└─────────┴─────────────────────────────────────────────┴──────┴───────┘
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Results on node v18.16.0: