Skip to content

Instantly share code, notes, and snippets.

@hackergrrl
Last active February 1, 2019 23:50

Revisions

  1. @noffle noffle revised this gist Feb 1, 2019. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions counter.js
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,8 @@
    var stream = require('stream')

    module.exports = function (id) {
    // opts.allowHalfDuplex is _very_ important! Otherwise ending the readable
    // half of the duplex will leave the writable side open!
    var counter = new stream.Duplex({allowHalfOpen:false})
    counter.processed = []

  2. @noffle noffle revised this gist Feb 1, 2019. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions usage.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,6 @@
    var counterDuplex = require('./counter')

    var a = counterDuplex('a')
    var b = counterDuplex('b')

    a.pipe(b).pipe(a)
  3. @noffle noffle created this gist Feb 1, 2019.
    55 changes: 55 additions & 0 deletions counter.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,55 @@
    var stream = require('stream')

    module.exports = function (id) {
    var counter = new stream.Duplex({allowHalfOpen:false})
    counter.processed = []

    var payloadsLeft = 3

    counter._read = function () {
    if (!payloadsLeft) return this.push(null)

    var payload = JSON.stringify({ sender: id, left: payloadsLeft })

    var prefix = Buffer.alloc(4)
    prefix.writeUInt32LE(payload.length, 0)

    this.push(prefix)
    this.push(payload)

    payloadsLeft--
    }

    var expected = 0
    var accum = Buffer.alloc(0)
    counter._write = function (chunk, enc, next) {
    accum = Buffer.concat([accum, chunk])
    tryParse()
    next()
    }

    function tryParse () {
    // haven't recv'd prefix len yet
    if (!expected && accum.length < 4) return

    // read prefix length
    if (!expected) {
    expected = accum.readUInt32LE(0)
    accum = accum.slice(4)
    }

    // read next chunk
    if (accum.length >= expected) {
    var buf = accum.slice(0, expected)
    var value = JSON.parse(buf.toString())
    counter.processed.push(value)
    accum = accum.slice(expected)
    expected = 0
    tryParse()
    }

    }

    // exposes '.processed' so you can examine the payloads received
    return counter
    }