-
-
Save mxstbr/e490c116993d5c54b98e79093d2a05a6 to your computer and use it in GitHub Desktop.
A reusable utility to turn a callback-based listener into an async iterable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// @flow | |
// Turn a callback-based listener into many async iterators without buffering | |
import { $$asyncIterator } from 'iterall'; | |
type Listener = ((arg: any) => void) => Promise<any>; | |
const defaultOnError = (err: Error) => { | |
throw new Error(err); | |
}; | |
type Options = {| | |
onError?: (err: Error) => void, | |
filter?: (arg: any) => boolean, | |
|}; | |
type Watcher = { | |
filter?: (arg: any) => boolean, | |
callback?: ({ done: boolean, value: any }) => void, | |
}; | |
const asyncify = (listener: Listener) => { | |
let watchers: Array<Watcher> = []; | |
listener(value => { | |
watchers.forEach(watcher => { | |
if (watcher.callback && (!watcher.filter || watcher.filter(value))) { | |
watcher.callback({ done: false, value }); | |
} | |
}); | |
}); | |
return ({ filter, onError = defaultOnError }: Options = {}) => { | |
let watcher: Watcher = { filter }; | |
let watching = true; | |
const cleanup = () => { | |
if (watching) { | |
watching = false; | |
watchers = watchers.filter(w => w !== watcher); | |
} | |
}; | |
try { | |
return { | |
next: () => | |
new Promise(resolve => { | |
watcher.callback = resolve; | |
watchers.push(watcher); | |
}), | |
return: () => { | |
cleanup(); | |
return Promise.resolve({ done: true }); | |
}, | |
throw: err => { | |
cleanup(); | |
onError(err); | |
return Promise.reject(err); | |
}, | |
[$$asyncIterator]() { | |
return this; | |
}, | |
}; | |
} catch (err) { | |
onError(err); | |
} | |
}; | |
}; | |
export default asyncify; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncify from './asyncify'; | |
// Stardard callback-based listener | |
const listenToNewMessages = (cb) => { | |
return onNewMessageInDb(message => cb(message)); | |
}; | |
const getMessageListener = asyncify(listenToNewMessages); | |
const asyncIterator = getMessageListener({ | |
filter: message => message.threadId === threadId, | |
onError: (err) => { /* handle errors */ }, | |
}); | |
await asyncIterator.next(); | |
await asyncIterator.next(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment