Created
November 8, 2016 13:17
-
-
Save TobiaszCudnik/831b23031251b7a09ed7ef5ad8f6779c to your computer and use it in GitHub Desktop.
IterableEmitter for ES6/TypeScript - async iterator for an event stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { EventEmitter } from "events" | |
/**
 * Transforms a stream of emitter events into a synchronous iterable that
 * produces promises, one per event value (a poor-man's async iterator).
 *
 * Every event payload is passed through `unpack`. A falsy result (`undefined`,
 * `null`, `0`, `''`, `false`, ...) marks the end of the stream and is never
 * buffered; any other result is buffered until a consumer asks for it.
 *
 * Listening only begins once `start()` is called; call `dispose()` to detach
 * from the emitter again.
 *
 * A promise that is still waiting when the stream ends resolves with
 * `undefined`, so a consumer that is awaiting the "next" item at that moment
 * should treat `undefined` as the end marker.
 *
 * Usage:
 * ```
 * async function consume() {
 *   const events = new IterableEmitter(emitter, 'event_name')
 *   events.start()
 *   for (const pending of events) {
 *     const item = await pending
 *     if (item === undefined) break // stream ended while we were waiting
 *     console.log(item)
 *   }
 *   events.dispose()
 *   console.log('done')
 * }
 * ```
 */
class IterableEmitter<T> implements Iterable<Promise<T>> {
  /** Unpacked event values that no consumer has taken yet, oldest first. */
  queue: T[] = [];
  /** Becomes true once an empty (falsy) event value has been received. */
  done = false;
  /** When true, `log()` writes diagnostics to the console. */
  debug = false;
  /** Handler registered by `start()`, or null while not listening. */
  listener: ((raw: unknown) => void) | null = null;
  /**
   * Resolvers of promises that were handed out while the buffer was empty,
   * oldest first. A list (rather than a single slot) is required so that
   * several promises requested back-to-back are all eventually settled.
   */
  protected waiters: Array<(value: T | undefined) => void> = [];

  /**
   * @param emitter    Source of the events.
   * @param event_name Name of the event to listen for.
   * @param unpack     Converts the raw event payload into an item; defaults
   *                   to the identity function.
   */
  constructor(
    public emitter: EventEmitter,
    public event_name: string,
    // `any` is intentional: the payload type of an EventEmitter is not known.
    public unpack: (value: any) => T = val => val
  ) {
    // empty
  }

  /**
   * Oldest pending resolver, or null when no consumer is waiting.
   * Kept as an accessor for compatibility with subclasses that used the
   * former single-slot `resolve` field.
   */
  protected get resolve(): ((value: T) => void) | null {
    return this.waiters.length ? this.waiters[0] : null
  }

  protected set resolve(fn: ((value: T) => void) | null) {
    // Resolvers only ever receive `undefined` at end of stream, which the
    // original single-slot implementation did as well.
    this.waiters = fn ? [fn as (value: T | undefined) => void] : []
  }

  /**
   * Current size of the buffer queue.
   */
  get length() {
    return this.queue.length
  }

  /** Prints `msg` to the console when `debug` is enabled. */
  log(msg: string) {
    if (this.debug) {
      console.log(msg)
    }
  }

  /**
   * Starts listening for `event_name` on the emitter.
   * Calling it again while already listening is a no-op, so events are never
   * buffered twice and the registered handler can always be removed.
   */
  start() {
    if (this.listener) {
      return
    }
    const onEvent = (raw: unknown): void => {
      const item = this.unpack(raw)
      this.log(`emitter.on('${this.event_name}')`)
      if (!item) {
        // an empty value signals the end of the stream
        this.done = true
        this.log('emitter done')
      } else {
        this.queue.push(item)
      }
      this.settleWaiters()
    }
    this.listener = onEvent
    this.emitter.on(this.event_name, onEvent)
  }

  /**
   * Stops listening on the emitter. Safe to call when `start()` was never
   * called or when already disposed.
   */
  dispose() {
    if (!this.listener) {
      return
    }
    this.emitter.removeListener(this.event_name, this.listener)
    this.listener = null
  }

  /**
   * Hands buffered items to waiting consumers in arrival order. Once the
   * stream is done, releases any consumers still waiting with `undefined`
   * so that their promises do not stay pending forever.
   */
  protected settleWaiters(): void {
    while (this.waiters.length && this.queue.length) {
      const settle = this.waiters.shift()
      if (settle) {
        settle(this.queue.shift())
      }
    }
    if (this.done) {
      for (const settle of this.waiters.splice(0)) {
        settle(undefined)
      }
    }
  }

  /**
   * Yields one promise per item until the stream is done and the buffer has
   * been drained. Each promise resolves immediately from the buffer when
   * possible, otherwise with the next value delivered by the emitter.
   */
  *[Symbol.iterator]() {
    this.log('iterator start')
    while (!this.done || this.queue.length) {
      yield new Promise<T>(resolve => {
        if (this.queue.length) {
          // resolve from the buffer; the length check guarantees an item
          resolve(this.queue.shift() as T)
        } else {
          // wait for data; at end of stream `undefined` is delivered even
          // though the promise is typed as T (see class documentation)
          this.waiters.push(value => resolve(value as T))
        }
      })
    }
    this.log('iterator end')
  }
}
export default IterableEmitter |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment