Last active
January 7, 2023 02:26
-
-
Save kriskowal/cc961ecf3f8cb6e9df658d326a716998 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { makeStream, nullQueue } from '@endo/stream'; | |
/**
 * Creates a publish/subscribe channel built from a chain of promises.
 *
 * Every link in the chain is a promise for a frozen `{ value, promise }`
 * record, where `value` is one published value and `promise` is the next
 * link. Publishing resolves the current tail link and installs a fresh,
 * unresolved tail. A subscriber starts reading at whichever link is the tail
 * at the moment it subscribes, so it only observes values published after
 * that point, and each subscriber advances through the chain independently.
 *
 * NOTE(review): `makePromiseKit` and `harden` are not imported in the visible
 * part of this file. Presumably they come from `@endo/promise-kit` and the
 * SES environment respectively; confirm they are in scope.
 *
 * @template TValue
 * @returns {{
 *   pub: (value: TValue) => void,
 *   sub: () => () => Promise<TValue>,
 * }} The `{ pub, sub }` pair, passed through `harden`.
 */
const makePubSub = () => {
  // Request pubsub async queue internals
  let { promise: tailPromise, resolve: tailResolve } = makePromiseKit();

  /**
   * Publishes a value to every subscriber whose cursor has not yet moved
   * past the current tail.
   *
   * @param {TValue} value
   */
  const pub = value => {
    const { resolve, promise } = makePromiseKit();
    // Settle the current tail with the value plus a link to the new tail.
    // Only the record itself is frozen; `value` is left as provided.
    tailResolve(Object.freeze({ value, promise }));
    tailResolve = resolve;
    // Unlike a queue, advance the read head for future subscribers.
    tailPromise = promise;
  };

  /**
   * Starts a subscription at the current tail.
   *
   * @returns {() => Promise<TValue>} A getter; each call returns a promise
   * for the next published value after the ones already requested.
   */
  const sub = () => {
    // Capture the read head for the next published value.
    let cursor = tailPromise;
    return () => {
      const promise = cursor.then(next => next.value);
      // Returning `next.promise` from `then` makes the new cursor adopt the
      // following link, so it settles to the following record.
      cursor = cursor.then(next => next.promise);
      return promise;
    };
  };

  return harden({ pub, sub });
};
/**
 * Creates a topic: one publisher stream plus a factory for subscriber
 * streams, all fed by a single `makePubSub()` channel.
 *
 * `makeStream(acks, data)` is handed queue-shaped objects here: it is expected
 * to call `data.put(...)` when the stream is advanced and to return
 * `acks.get()` (per the `@endo/stream` contract; confirm against the
 * installed version). The bare `pub` function and the bare getter returned by
 * `sub()` are therefore wrapped as `{ put }` and `{ get }` objects rather
 * than passed directly.
 *
 * @returns {{ publisher: unknown, subscribe: () => unknown }} A record,
 * passed through `harden`, holding the publisher stream and a `subscribe`
 * function that creates a new subscriber stream starting at the values
 * published after the call.
 */
export const makeTopic = () => {
  const { pub, sub } = makePubSub();
  // Publisher side: writes go to the pubsub, acknowledgements come from
  // `nullQueue`.
  const publisher = makeStream(nullQueue, harden({ put: pub }));
  return harden({
    publisher,
    // Subscriber side: reads come from a fresh subscription cursor, and
    // anything the subscriber sends goes to `nullQueue`.
    subscribe: () => makeStream(harden({ get: sub() }), nullQueue),
  });
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment