Created
January 18, 2021 22:14
-
-
Save fuunnx/30846090b99369e0a846aeb59248edaa to your computer and use it in GitHub Desktop.
Xstream operators
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Adapted from RxJS shareReplay: https://github.com/ReactiveX/rxjs/blob/b25db9f369b07f26cf2fc11714ec1990b78a4536/src/internal/operators/shareReplay.ts#L26-L37
import xs, { Stream, MemoryStream, Subscription } from 'xstream'
/**
 * Wraps `source` in a stream that subscribes to it once, forwards its events
 * through an internal memory subject, and hands the most recent value to
 * listeners that attach after that value was emitted.
 *
 * Behaviour carried over from the RxJS `shareReplay` snippet this was ported
 * from:
 * - the subscription to `source` is created lazily, the first time the
 *   returned stream is started;
 * - it is only torn down when the returned stream stops *after* `source` has
 *   completed, so the last value survives periods without listeners;
 * - after `source` errors, the next start creates a fresh subject and
 *   subscribes to `source` again.
 *
 * @param source Stream whose events are shared and whose last value is replayed.
 * @returns A stream that emits the remembered value (if any) to each new
 *   listener and then mirrors `source`.
 */
export function replay<T>(source: Stream<T>): Stream<T> {
  let subject: MemoryStream<T> | undefined
  let refCount = 0
  let subscription: Subscription | undefined
  let innerSub: Subscription | undefined
  let hasError = false
  let hasValue = false
  let lastValue: T
  let isComplete = false
  // Mirrors whether the *current* subject still remembers a value, i.e. it has
  // received one and has not been torn down by an error or completion since.
  // NOTE(review): relies on a producer-less MemoryStream keeping its memory
  // after its last listener is removed — confirm against the xstream version
  // in use.
  let subjectHasValue = false

  const stream = xs.create<T>({
    start(listener) {
      refCount++
      if (!subject || hasError) {
        hasError = false
        subjectHasValue = false
        const current = xs.createWithMemory<T>()
        subject = current
        subscription = source.subscribe({
          next(value) {
            hasValue = true
            subjectHasValue = true
            lastValue = value
            current.shamefullySendNext(value)
          },
          error(err) {
            hasError = true
            subjectHasValue = false
            current.shamefullySendError(err)
          },
          complete() {
            isComplete = true
            subjectHasValue = false
            current.shamefullySendComplete()
          },
        })
      }
      // When the subject already holds a value, subscribing replays it
      // synchronously to `listener`.
      innerSub = subject.subscribe(listener)
    },
    stop() {
      refCount--
      if (innerSub) {
        innerSub.unsubscribe()
      }
      // Keep listening to `source` unless it has already completed, so the
      // latest value stays current while nobody is subscribed.
      if (subscription && refCount === 0 && isComplete) {
        subscription.unsubscribe()
      }
    },
  })

  // Listeners that join while the stream is already running do not trigger
  // `start`, so nothing would replay the last value to them; push it manually
  // before registering the listener.
  const add = stream._add.bind(stream)
  stream._add = function (il) {
    // With no active run (`refCount === 0`) registering the listener invokes
    // `start`, which re-subscribes to the subject. If the subject still holds
    // the value it will replay it itself, and pushing it here as well would
    // deliver the same value twice.
    const startWillReplay = refCount === 0 && subjectHasValue
    if (hasValue && !startWillReplay) {
      il._n(lastValue)
    }
    add(il)
  }

  return stream
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment