Skip to content

Instantly share code, notes, and snippets.

@josephlord
Last active July 17, 2021 19:30

Revisions

  1. josephlord revised this gist Jul 17, 2021. 1 changed file with 63 additions and 34 deletions.
    97 changes: 63 additions & 34 deletions AsyncSequencePublisherInnerActor.swift
    Original file line number Diff line number Diff line change
    @@ -1,39 +1,68 @@
    /// Ideally this wouldn't be needed and the ASPSubscriber could be an actor itself but due to issues in Xcode 13 beta 1 and 2
    /// continuations can't safely be resumed from actor contexts so this separate actor is needed to manage the demand and th
    /// continuation but to return it instead of resuming it directly. The callers of `add(demand:)` and
    /// `getContinuationToFireOnCancelation` shoudl always resume the returned value immediately
    private actor Inner {
    var demand: Subscribers.Demand = .none
    var demandUpdatedContinuation: CheckedContinuation<Void, Never>?

    /// Returns immediately if there is demand for an additional item from the subscriber or awaits an increase in demand
    /// then will return when there is some demand (or the task has been cancelled and the continuation fired)
    fileprivate func waitUntilReadyForMore() async {
    if demand > 0 {
    demand -= 1
    return
    extension PublisherAsyncSequence {
    public class Iterator {

    private let iActor = InnerActor()

    /// Due to bug [SR-14875](https://bugs.swift.org/browse/SR-14875) we need to avoid calling continuations
    /// from the actor execution context currently which is why we wrap the state in this InnerActor instead of just making the Interator
    /// and actor
    private actor InnerActor {
    /// These typealiases are just for cleaner call sites
    typealias ElementContinuation = CheckedContinuation<Element?, P.Failure>
    typealias SubsciptionContinuation = CheckedContinuation<Void, Never>

    private var subscription: Subscription?
    private var subscriptionContinuation: SubsciptionContinuation?
    private var continuation: ElementContinuation?

    func next() async throws -> Element? {
    if subscription == nil {
    await withCheckedContinuation { continuation in
    subscriptionContinuation = continuation
    }
    }

    return try await withCheckedThrowingContinuation({ continuation in
    self.continuation = continuation
    subscription?.request(.max(1))
    })
    }

    func setSubscription(subscription: Subscription) -> SubsciptionContinuation? {
    defer { subscriptionContinuation = nil }
    assert(self.subscription == nil)
    self.subscription = subscription
    return subscriptionContinuation
    }

    /// You should resume the completion immediately after calling this
    func getAndClearMainCompletion() -> ElementContinuation? {
    defer { continuation = nil }
    return continuation
    }

    /// You should resume the completion immediately after calling this
    func getAndClearSubscriptionCompletion() -> SubsciptionContinuation? {
    defer { subscriptionContinuation = nil }
    return subscriptionContinuation
    }
    }

    let _: Void = await withCheckedContinuation { continuation in
    demandUpdatedContinuation = continuation
    private func receive(compl: Subscribers.Completion<Error>) async {
    let continuation = await iActor.getAndClearMainCompletion()
    assert(continuation != nil)
    switch compl {
    case .finished:
    continuation?.resume(returning: nil)
    case .failure(let err):
    continuation?.resume(throwing: err)
    }
    }

    private func receive(input: Element) async {
    let continuation = await iActor.getAndClearMainCompletion()
    assert(continuation != nil)
    continuation?.resume(returning: input)
    }
    }

    /// Update the tracked demand for the publisher
    /// - Parameter demand: The additional demand for the publisher
    /// - Returns: A continuation that must be resumed off the actor context immediatly
    func add(demand: Subscribers.Demand) -> CheckedContinuation<Void, Never>? {
    defer { demandUpdatedContinuation = nil }
    self.demand += demand
    guard demand > 0 else { return nil }
    return demandUpdatedContinuation
    }


    /// This is used to prevent being permanently stuck awaiting the continuation if the task has been cancelled
    /// - Returns: Continuation to resume to allow cancellation to complete
    func getContinuationToFireOnCancelation() -> CheckedContinuation<Void, Never>? {
    defer { demandUpdatedContinuation = nil }
    return demandUpdatedContinuation
    }
    }
  2. josephlord created this gist Jul 7, 2021.
    39 changes: 39 additions & 0 deletions AsyncSequencePublisherInnerActor.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,39 @@
    /// Ideally this wouldn't be needed and the ASPSubscriber could be an actor itself but due to issues in Xcode 13 beta 1 and 2
    /// continuations can't safely be resumed from actor contexts so this separate actor is needed to manage the demand and th
    /// continuation but to return it instead of resuming it directly. The callers of `add(demand:)` and
    /// `getContinuationToFireOnCancelation` shoudl always resume the returned value immediately
    private actor Inner {
    var demand: Subscribers.Demand = .none
    var demandUpdatedContinuation: CheckedContinuation<Void, Never>?

    /// Returns immediately if there is demand for an additional item from the subscriber or awaits an increase in demand
    /// then will return when there is some demand (or the task has been cancelled and the continuation fired)
    fileprivate func waitUntilReadyForMore() async {
    if demand > 0 {
    demand -= 1
    return
    }

    let _: Void = await withCheckedContinuation { continuation in
    demandUpdatedContinuation = continuation
    }
    }

    /// Update the tracked demand for the publisher
    /// - Parameter demand: The additional demand for the publisher
    /// - Returns: A continuation that must be resumed off the actor context immediatly
    func add(demand: Subscribers.Demand) -> CheckedContinuation<Void, Never>? {
    defer { demandUpdatedContinuation = nil }
    self.demand += demand
    guard demand > 0 else { return nil }
    return demandUpdatedContinuation
    }


    /// This is used to prevent being permanently stuck awaiting the continuation if the task has been cancelled
    /// - Returns: Continuation to resume to allow cancellation to complete
    func getContinuationToFireOnCancelation() -> CheckedContinuation<Void, Never>? {
    defer { demandUpdatedContinuation = nil }
    return demandUpdatedContinuation
    }
    }