Last active
March 22, 2017 12:10
-
-
Save th-yoo/1e9766c22554b808e9d5785c450a916a to your computer and use it in GitHub Desktop.
Promisify node stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
// Node.js version used by the author: 5.2.0 (output of "node -v")
const stream = require('stream'); | |
/**
 * Wraps a chain of piped Node.js streams and exposes it as a thenable.
 *
 * One promise is created per stream: it fulfills on 'finish' for Writable
 * streams (including Duplex/Transform) and on 'end' for Readable-only
 * streams, and it rejects on that stream's 'error' event. The Promise API
 * (then/catch/finally) operates on Promise.all() of those promises.
 *
 * N.B. Do not pipe into process.stdout nor process.stderr: they never emit
 * 'finish', so the combined promise would never settle.
 * https://nodejs.org/api/process.html#process_a_note_on_process_i_o
 */
class StreamPromise {
  /**
   * @param {stream.Readable|stream.Writable} stream - first stream of the pipeline.
   */
  constructor(stream) {
    this._promises = [];
    this._add(stream);
    this._cur = stream;

    // Fallback for runtimes without a native Promise.prototype.finally.
    // http://2ality.com/2014/10/es6-promises-api.html
    // http://exploringjs.com/es6/ch_promises.html
    if (!Promise.prototype.finally) {
      Promise.prototype.finally = function (cb) {
        const P = this.constructor;
        return this.then(
          value => P.resolve(cb()).then(() => value),
          reason => P.resolve(cb()).then(() => { throw reason; })
        );
      };
    }
  }

  /**
   * Pipes the current tail of the pipeline into `stream` and makes `stream`
   * the new tail.
   *
   * @param {stream.Writable} stream - destination stream.
   * @returns {StreamPromise} this, for chaining.
   * @throws {Error} if then/catch/finally has already been called.
   */
  pipe(stream) {
    if (this._p) {
      throw new Error('Can not pipe after calling Promise API');
    }
    this._add(stream);
    this._cur.pipe(stream);
    this._cur = stream;
    return this;
  }

  /**
   * Registers an event listener on the current tail stream.
   *
   * @param {string} ev - event name.
   * @param {Function} cb - listener.
   * @returns {StreamPromise} this, for chaining.
   */
  on(ev, cb) {
    this._cur.on(ev, cb);
    return this;
  }

  /**
   * Standard thenable entry point. Returns the derived promise (not `this`)
   * so that chained handlers observe the results and errors of earlier
   * handlers, and so that no derived promise is silently discarded.
   *
   * @param {Function} [fulfilled] - receives the array produced by Promise.all.
   * @param {Function} [rejected] - receives the first stream error.
   * @returns {Promise}
   */
  then(fulfilled, rejected) {
    return this._promise().then(fulfilled, rejected);
  }

  /**
   * @param {Function} rejected - receives the first stream error.
   * @returns {Promise}
   */
  catch(rejected) {
    return this._promise().catch(rejected);
  }

  /**
   * @param {Function} cb - invoked once the pipeline has settled either way.
   * @returns {Promise}
   */
  finally(cb) {
    return this._promise().finally(cb);
  }

  /**
   * Lazily builds and caches the combined promise. Once it exists, pipe()
   * refuses to extend the pipeline.
   *
   * @returns {Promise}
   */
  _promise() {
    if (!this._p) {
      if (this._promises.length === 0) {
        this._p = Promise.reject(new Error('No stream'));
      }
      else {
        this._p = Promise.all(this._promises);
      }
    }
    return this._p;
  }

  /**
   * Creates the completion promise for one stream and records it.
   * An object that is neither Writable nor Readable yields a promise
   * rejected with Error('Unknown stream').
   *
   * @param {*} s - stream to track.
   */
  _add(s) {
    const settled = new Promise((resolve, reject) => {
      let success;
      if (s instanceof stream.Writable) {
        success = 'finish';
      }
      else if (s instanceof stream.Readable) {
        success = 'end';
      }
      else {
        reject(new Error('Unknown stream'));
        return;
      }
      s.on(success, resolve).on('error', reject);
    });
    this._promises.push(settled);
  }
}
// When executed directly, run a small demo that streams this very file to
// stdout through StreamPromise; when required, export the class instead.
if (require.main === module) {
  // process.stdout itself must not be the pipe target because it never emits
  // 'finish', so it is wrapped in a Writable that does.
  class Stdout extends stream.Writable {
    constructor() { super({objectMode: true}); }
    _write(chunk, enc, cb) {
      // Hand the callback to stdout so this stream reports completion (or
      // an error) only after the chunk has actually been handled.
      process.stdout.write(chunk, cb);
    }
  }

  const fs = require('fs');
  const file = fs.createReadStream(require.main.filename);
  const src = new StreamPromise(file);
  src.pipe(new Stdout())
    .then(() => { console.error('streaming succeeded'); })
    .catch(err => { console.error('streaming failed:', err); })
    .finally(() => { console.error('streaming ends'); });
}
else {
  module.exports = StreamPromise;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
// Variant: a single promise for the whole pipeline, with onFulfilled / onRejected handlers
const stream = require('stream'); | |
/**
 * Wraps a chain of piped Node.js streams and exposes it as a thenable backed
 * by a single promise.
 *
 * The promise fulfills when the LAST stream of the pipeline completes
 * ('finish' for Writable/Duplex/Transform streams, 'end' for Readable-only
 * streams) and rejects when ANY stream of the pipeline emits 'error'.
 * The fulfillment value is taken from the `resolve` argument registered with
 * the last stream; `resolve` arguments of earlier streams are stored but not
 * used.
 *
 * N.B. Do not pipe into process.stdout nor process.stderr: they never emit
 * 'finish', so the promise would never settle.
 * https://nodejs.org/api/process.html#process_a_note_on_process_i_o
 */
class StreamPromise {
  /**
   * @param {stream.Readable|stream.Writable} stream - first stream of the pipeline.
   * @param {*} [resolve] - fulfillment value, or a function producing it,
   *   used if this stream ends up being the last one.
   */
  constructor(stream, resolve) {
    this._streams = [];
    this._add(stream, resolve);
    this._cur = stream;

    // Fallback for runtimes without a native Promise.prototype.finally.
    // http://2ality.com/2014/10/es6-promises-api.html
    // http://exploringjs.com/es6/ch_promises.html
    if (!Promise.prototype.finally) {
      Promise.prototype.finally = function (cb) {
        const P = this.constructor;
        return this.then(
          value => P.resolve(cb()).then(() => value),
          reason => P.resolve(cb()).then(() => { throw reason; })
        );
      };
    }
  }

  /**
   * Pipes the current tail of the pipeline into `stream` and makes `stream`
   * the new tail.
   *
   * @param {stream.Writable} stream - destination stream.
   * @param {*} [resolve] - fulfillment value, or a function producing it,
   *   used if this stream ends up being the last one.
   * @returns {StreamPromise} this, for chaining.
   */
  pipe(stream, resolve) {
    this._add(stream, resolve);
    this._cur.pipe(stream);
    this._cur = stream;
    return this;
  }

  /**
   * Registers an event listener on the current tail stream.
   *
   * @param {string} ev - event name.
   * @param {Function} cb - listener.
   * @returns {StreamPromise} this, for chaining.
   */
  on(ev, cb) {
    this._cur.on(ev, cb);
    return this;
  }

  /**
   * @param {Function} [fulfilled] - receives the fulfillment value.
   * @param {Function} [rejected] - receives the first stream error.
   * @returns {Promise}
   */
  then(fulfilled, rejected) {
    return this._promise().then(fulfilled, rejected);
  }

  /**
   * @param {Function} rejected - receives the first stream error.
   * @returns {Promise}
   */
  catch(rejected) {
    return this._promise().catch(rejected);
  }

  /**
   * @param {Function} cb - invoked once the pipeline has settled either way.
   * @returns {Promise}
   */
  finally(cb) {
    return this._promise().finally(cb);
  }

  /**
   * Lazily builds and caches the pipeline promise. Streams piped after the
   * first call are not tracked by the cached promise.
   *
   * @returns {Promise}
   */
  _promise() {
    if (!this._p) {
      this._p = new Promise((resolve, reject) => {
        const last = this._streams[this._streams.length - 1];
        // A Readable-only tail never emits 'finish'; wait for 'end' instead.
        const doneEvent = last.stream instanceof stream.Writable
          ? 'finish'
          : 'end';

        last.stream.on(doneEvent, () => {
          // The value is used as given, so falsy values such as 0 or ''
          // are valid results. A throwing producer function rejects the
          // promise instead of escaping from the event emitter.
          try {
            const value = (typeof last.resolve === 'function')
              ? last.resolve()
              : last.resolve;
            resolve(value);
          }
          catch (err) {
            reject(err);
          }
        });

        for (const s of this._streams) {
          s.stream.on('error', reject);
        }
      });
    }
    return this._p;
  }

  /**
   * Records a stream together with its optional fulfillment value.
   *
   * @param {stream.Readable|stream.Writable} stream
   * @param {*} [resolve]
   */
  _add(stream, resolve) {
    this._streams.push({ stream: stream, resolve: resolve });
  }
}
// When executed directly, run a small demo that streams this very file to
// stdout through StreamPromise; when required, export the class instead.
if (require.main === module) {
  // process.stdout itself must not be the pipe target because it never emits
  // 'finish', so it is wrapped in a Writable that does.
  class Stdout extends stream.Writable {
    constructor() { super({objectMode: true}); }
    _write(chunk, enc, cb) {
      // Hand the callback to stdout so this stream reports completion (or
      // an error) only after the chunk has actually been handled.
      process.stdout.write(chunk, cb);
    }
  }

  const fs = require('fs');
  const file = fs.createReadStream(require.main.filename);
  const src = new StreamPromise(file);
  // 'GOOD!!!' becomes the fulfillment value because Stdout is the last stream.
  src.pipe(new Stdout(), 'GOOD!!!')
    .then(msg => { console.error('streaming succeeded', msg); })
    .catch(err => { console.error('streaming failed:', err); })
    .finally(() => { console.error('streaming ends'); });
}
else {
  module.exports = StreamPromise;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment