Last active
January 17, 2020 05:47
-
-
Save slightair/3a2c02a990c3b176fa2feadcd26e6e15 to your computer and use it in GitHub Desktop.
Apple Combine + withLatestFrom operator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let just = Just<Int>(999) | |
let a = [1, 2, 3, 4, 5].publisher | |
.withLatestFrom(just) | |
.sink(receiveCompletion: { | |
print($0) | |
}, receiveValue: { | |
print($0) | |
}) | |
print() | |
let x = PassthroughSubject<Int, Never>() | |
let y = PassthroughSubject<String, Never>() | |
let b = x.withLatestFrom(y) | |
.sink(receiveCompletion: { | |
print($0) | |
}, receiveValue: { | |
print($0) | |
}) | |
x.send(100) | |
y.send("A") | |
y.send("B") | |
x.send(200) | |
x.send(300) | |
y.send("C") | |
x.send(400) | |
x.send(completion: .finished) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
extension Publishers { | |
struct WithLatestFrom<Upstream: Publisher, P: Publisher, Output>: Publisher { | |
typealias Failure = Error | |
typealias ResultSelector = (Upstream.Output, P.Output) throws -> Output | |
let upstream: Upstream | |
let publisher: P | |
let resultSelector: ResultSelector | |
init(upstream: Upstream, publisher: P, resultSelector: @escaping ResultSelector) { | |
self.upstream = upstream | |
self.publisher = publisher | |
self.resultSelector = resultSelector | |
} | |
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { | |
let subscription = WithLatestFromSubscription(subscriber: subscriber, upstream: upstream, publisher: publisher, resultSelector: resultSelector) | |
subscriber.receive(subscription: subscription) | |
} | |
private final class WithLatestFromSubscription<SubscriberType: Subscriber>: Subscription where SubscriberType.Input == Output, SubscriberType.Failure == Failure { | |
let combineIdentifier = CombineIdentifier() | |
let upstream: Upstream | |
let publisher: P | |
let resultSelector: ResultSelector | |
var subscriber: SubscriberType? | |
var upstreamSubscription: AnyCancellable? | |
var publisherSubscription: AnyCancellable? | |
var latest: P.Output? | |
init(subscriber: SubscriberType, upstream: Upstream, publisher: P, resultSelector: @escaping ResultSelector) { | |
self.subscriber = subscriber | |
self.upstream = upstream | |
self.publisher = publisher | |
self.resultSelector = resultSelector | |
} | |
func request(_ demand: Subscribers.Demand) { | |
publisherSubscription = publisher.sink( | |
receiveCompletion: { completion in | |
switch completion { | |
case .finished: | |
break | |
case let .failure(error): | |
self.subscriber?.receive(completion: .failure(error)) | |
self.cancel() | |
} | |
}, | |
receiveValue: { output in | |
self.latest = output | |
}) | |
upstreamSubscription = upstream.sink( | |
receiveCompletion: { completion in | |
switch completion { | |
case .finished: | |
self.subscriber?.receive(completion: .finished) | |
self.cancel() | |
case let .failure(error): | |
self.subscriber?.receive(completion: .failure(error)) | |
self.cancel() | |
} | |
}, | |
receiveValue: { output in | |
guard let latest = self.latest else { return } | |
do { | |
let result = try self.resultSelector(output, latest) | |
self.subscriber?.receive(result) | |
} catch let error { | |
self.subscriber?.receive(completion: .failure(error)) | |
self.cancel() | |
} | |
} | |
) | |
} | |
func cancel() { | |
publisherSubscription = nil | |
upstreamSubscription = nil | |
subscriber = nil | |
} | |
} | |
} | |
} | |
extension Publisher { | |
func withLatestFrom<P: Publisher, ResultType>(_ publisher: P, resultSelector: @escaping (Output, P.Output) throws -> ResultType) -> Publishers.WithLatestFrom<Self, P, ResultType> { | |
Publishers.WithLatestFrom(upstream: self, publisher: publisher, resultSelector: resultSelector) | |
} | |
func withLatestFrom<P: Publisher>(_ publisher: P) -> Publishers.WithLatestFrom<Self, P, P.Output> { | |
withLatestFrom(publisher, resultSelector: { $1 }) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment