Last active
July 17, 2021 19:30
-
-
Save josephlord/1c2b9d19daed3b61daf2545d6523ee6e to your computer and use it in GitHub Desktop.
The inner actor for AsyncSequencePublisher subscription
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extension PublisherAsyncSequence {
    /// Iterator state for `PublisherAsyncSequence`.
    ///
    /// All mutable state lives in the private `InnerActor`. The `receive` helpers on this class
    /// fetch a stored continuation from the actor and resume it outside of the actor.
    public class Iterator {
        private let iActor = InnerActor()

        /// Serialises access to the subscription and the pending continuations.
        ///
        /// Due to bug [SR-14875](https://bugs.swift.org/browse/SR-14875) we currently need to avoid
        /// resuming continuations from the actor's execution context, which is why the state is
        /// wrapped in this `InnerActor` instead of just making the `Iterator` an actor. For the same
        /// reason the actor only hands continuations back to its caller and never resumes them itself.
        private actor InnerActor {
            /// These typealiases are just for cleaner call sites.
            // NOTE(review): `withCheckedThrowingContinuation` yields a continuation whose failure
            // type is `Error`, so storing it here presumably relies on `P.Failure == Error` —
            // confirm against the constraints on the enclosing type.
            typealias ElementContinuation = CheckedContinuation<Element?, P.Failure>
            typealias SubsciptionContinuation = CheckedContinuation<Void, Never>

            private var subscription: Subscription?
            /// Set while `next()` is parked waiting for the subscription to arrive.
            private var subscriptionContinuation: SubsciptionContinuation?
            /// Set while `next()` is parked waiting for an element or the completion.
            private var continuation: ElementContinuation?
            /// The terminal event that still has to be reported by `next()`, or `.finished` once
            /// it has been reported. `nil` while the publisher has not completed.
            private var terminalCompletion: Subscribers.Completion<Error>?

            /// Suspends until the next element or the completion is delivered.
            ///
            /// - Returns: The next element, or `nil` once the publisher has finished.
            /// - Throws: The error the publisher failed with. The error is thrown once; calls made
            ///   after that return `nil`.
            func next() async throws -> Element? {
                if subscription == nil {
                    // Park until `setSubscription(subscription:)` hands this continuation back to
                    // its caller to be resumed.
                    await withCheckedContinuation { continuation in
                        subscriptionContinuation = continuation
                    }
                }
                if let completion = terminalCompletion {
                    // The publisher has already completed, so no further value will ever arrive.
                    // Requesting more demand here would leave the caller suspended forever.
                    terminalCompletion = .finished
                    if case .failure(let error) = completion {
                        throw error
                    }
                    return nil
                }
                return try await withCheckedThrowingContinuation { continuation in
                    // Store the continuation before requesting demand so that it is already in
                    // place when the value is delivered.
                    self.continuation = continuation
                    subscription?.request(.max(1))
                }
            }

            /// Stores the subscription.
            ///
            /// - Returns: The continuation of a `next()` call that is waiting for the subscription,
            ///   if any. The caller is responsible for resuming it.
            func setSubscription(subscription: Subscription) -> SubsciptionContinuation? {
                defer { subscriptionContinuation = nil }
                assert(self.subscription == nil)
                self.subscription = subscription
                return subscriptionContinuation
            }

            /// Records that the publisher has completed.
            ///
            /// When a `next()` call is waiting, its continuation is returned so that the caller can
            /// deliver `completion` through it; only the fact that the sequence has ended is then
            /// remembered. Otherwise `completion` itself is kept so that the following `next()`
            /// call can report it.
            ///
            /// You should resume the returned continuation immediately after calling this.
            func finish(with completion: Subscribers.Completion<Error>) -> ElementContinuation? {
                let waiting = continuation
                continuation = nil
                if terminalCompletion == nil {
                    terminalCompletion = waiting == nil ? completion : .finished
                }
                return waiting
            }

            /// You should resume the continuation immediately after calling this.
            func getAndClearMainCompletion() -> ElementContinuation? {
                defer { continuation = nil }
                return continuation
            }

            /// You should resume the continuation immediately after calling this.
            func getAndClearSubscriptionCompletion() -> SubsciptionContinuation? {
                defer { subscriptionContinuation = nil }
                return subscriptionContinuation
            }
        }

        /// Delivers the publisher's completion.
        ///
        /// The completion is recorded in the actor first, so it is not lost when no `next()` call
        /// is suspended at this moment (a publisher may complete without any outstanding demand).
        /// A waiting continuation, if there is one, is then resumed here, outside of the actor.
        private func receive(compl: Subscribers.Completion<Error>) async {
            let continuation = await iActor.finish(with: compl)
            switch compl {
            case .finished:
                continuation?.resume(returning: nil)
            case .failure(let err):
                continuation?.resume(throwing: err)
            }
        }

        /// Delivers one value to the `next()` call that requested it.
        ///
        /// Values are only requested from inside `next()` after its continuation has been stored,
        /// so a missing continuation is treated as a programming error in debug builds.
        private func receive(input: Element) async {
            let continuation = await iActor.getAndClearMainCompletion()
            assert(continuation != nil)
            continuation?.resume(returning: input)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment