Created
September 4, 2020 09:13
-
-
Save natpenguin/f52ceeeacb329059fe60e61aa337dee9 to your computer and use it in GitHub Desktop.
An combine operator that pass the stream event with previous stream event
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extension Publishers { | |
struct Previous<Upstream: Publisher>: Publisher { | |
typealias Output = (Upstream.Output?, Upstream.Output) | |
typealias Failure = Upstream.Failure | |
private let upstream: Upstream | |
init(upstream: Upstream) { | |
self.upstream = upstream | |
} | |
func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input { | |
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber)) | |
} | |
} | |
} | |
extension Publishers.Previous { | |
class Subscription<Downstream: Subscriber>: Combine.Subscription where (Upstream.Output?, Upstream.Output) == Downstream.Input, Downstream.Failure == Failure { | |
private var sink: Sink<Upstream, Downstream>? | |
init(upstream: Upstream, downstream: Downstream) { | |
sink = .init(upstream: upstream, downstream: downstream) | |
} | |
func request(_: Subscribers.Demand) {} | |
func cancel() { | |
sink = nil | |
} | |
} | |
class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where (Upstream.Output?, Upstream.Output) == Downstream.Input, Upstream.Failure == Downstream.Failure { | |
typealias Input = Upstream.Output | |
typealias Failure = Upstream.Failure | |
private var downstream: Downstream | |
private var previousInput: Input? | |
init(upstream: Upstream, downstream: Downstream) { | |
self.downstream = downstream | |
upstream.subscribe(self) | |
} | |
func receive(_ input: Input) -> Subscribers.Demand { | |
let demand = downstream.receive((previousInput, input)) | |
previousInput = input | |
return demand | |
} | |
func receive(completion: Subscribers.Completion<Upstream.Failure>) { | |
downstream.receive(completion: completion) | |
} | |
func receive(subscription: Combine.Subscription) { | |
subscription.request(.unlimited) | |
} | |
} | |
} | |
extension Publisher { | |
func previous() -> Publishers.Previous<Self> { | |
Publishers.Previous(upstream: self) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment