Created
January 7, 2023 02:26
-
-
Save kriskowal/c91006189f4aac26dea8f3f2d4815a95 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { makeStream, nullQueue } from '@endo/stream'; | |
/** | |
* @template TValue | |
*/ | |
const makePubSub = () => { | |
// Request pubsub async queue internals | |
let { promise: tailPromise, resolve: tailResolve } = makePromiseKit(); | |
/** | |
* @param {TValue} value | |
*/ | |
const pub = value => { | |
const { resolve, promise } = makePromiseKit(); | |
tailResolve(Object.freeze({ value, promise })); | |
tailResolve = resolve; | |
// Unlike a queue, advance the read head for future subscribers. | |
tailPromise = promise; | |
}; | |
const sub = () => { | |
// Capture the read head for the next published value. | |
let cursor = tailPromise; | |
return () => { | |
const promise = cursor.then(next => next.value); | |
cursor = cursor.then(next => next.promise); | |
return promise; | |
}; | |
}; | |
return harden({ pub, sub }); | |
}; | |
export const makeTopic = () => { | |
const { pub, sub } = makePubSub(); | |
const publisher = makeStream(nullQueue, pub); | |
return harden({ | |
publisher, | |
subscribe: () => makeStream(sub(), nullQueue), | |
}); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment