Skip to content

Instantly share code, notes, and snippets.

@telamon
Created March 11, 2025 23:34
Show Gist options
  • Save telamon/a2064a8a11f7024cf6d32c777534c804 to your computer and use it in GitHub Desktop.
Save telamon/a2064a8a11f7024cf6d32c777534c804 to your computer and use it in GitHub Desktop.
const { Readable } = require('streamx')
const UDX = require('.')
const runtime = (globalThis.Bare || process)
async function udxStreamPair () {
const udx1 = new UDX()
const udx2 = new UDX()
const socket1 = udx1.createSocket()
const socket2 = udx2.createSocket()
socket1.bind()
socket2.bind()
const stream1 = udx1.createStream(1)
const stream2 = udx2.createStream(2)
stream1.connect(socket1, stream2.id, socket2.address().port, '127.0.0.1')
stream2.connect(socket2, stream1.id, socket1.address().port, '127.0.0.1')
const close = () => {
stream1.destroy()
stream2.destroy()
return Promise.all([socket1.close(), socket2.close()])
}
return {
udx1,
udx2,
socket1,
socket2,
stream1,
stream2,
close
}
}
async function testStreamWrite (target = 40) {
console.log('# e2e-benchmark: udx_napi_stream_write(), target transfer', target)
target = 1024 * 1024 * target // convert to MB
const { stream1, stream2, close } = await udxStreamPair()
let sent = 0
let received = 0
const reader = new Promise(resolve => {
stream2.on('data', buf => {
received += buf.byteLength
// console.log('received', received, received / target)
if (received >= target) resolve()
})
})
const payload = Buffer.alloc(255)
const rs = new Readable({
read (cb) {
sent += payload.byteLength
this.push(payload)
if (sent >= target) this.push(null)
cb(null)
}
})
rs.pipe(stream1)
const start = Date.now()
await reader
const ms = Date.now() - start
console.log('all sent time (ms)', ms, 'Mbit/s', ((sent * 8) / 1024 / 1024) / (ms / 1000))
await close()
return ms
}
async function testStreamWritev (target = 400) {
console.log('# e2e-benchmark: udx_napi_stream_writev(), target transfer', target)
target = 1024 * 1024 * target // convert to MB
const { stream1, stream2, close } = await udxStreamPair()
let sent = 0
let received = 0
const reader = new Promise(resolve => {
stream2.on('data', buf => {
received += buf.byteLength
// console.log('received', received, received / target)
if (received >= target) resolve()
})
})
const payload = Buffer.alloc(255)
const start = Date.now()
let batchSize = 0
do {
stream1.write(payload)
sent += payload.byteLength
if (++batchSize === 255) {
batchSize = 0
await stream1.flush()
}
} while (sent <= target)
await reader
const ms = Date.now() - start
console.log('all sent time (ms)', ms, 'Mbit/s', ((sent * 8) / 1024 / 1024) / (ms / 1000))
await close()
return ms
}
async function main () {
const stats = {
runtime: globalThis.Bare ? 'bare' : 'node',
argv: runtime.argv.slice(2)
}
await testStreamWrite(100) // warmup
stats.stream_write = await testStreamWrite(runtime.argv[2])
await testStreamWritev(50) // warmup
stats.stream_writev = await testStreamWritev(runtime.argv[2])
console.log(stats)
}
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment