Skip to content

Instantly share code, notes, and snippets.

@darrarski
Last active May 6, 2025 11:44
Show Gist options
  • Save darrarski/93120b33fceb314c23689528446edefc to your computer and use it in GitHub Desktop.
Save darrarski/93120b33fceb314c23689528446edefc to your computer and use it in GitHub Desktop.
Swift Serial Queue for async/await operations
import Foundation
/// A first-in, first-out queue that runs asynchronous operations one at a time.
///
/// Every added operation starts only after the previously added operation has finished.
///
/// Example usage:
/// ```swift
/// let queue = AsyncSerialQueue()
/// for number in 1...100_000 {
///   queue.addOperation {
///     let task = Task { print(number) }
///     await task.value
///   }
/// }
/// // prints numbers from 1 to 100k in incremental order.
/// ```
public final class AsyncSerialQueue: Sendable {
  /// Asynchronous operation executed on the queue.
  public typealias Operation = @Sendable () async -> Void

  /// Producer side of the stream; every added operation is yielded through it.
  private let continuation: AsyncStream<Operation>.Continuation

  /// Consumer task that iterates the stream and awaits each operation in turn.
  private let task: Task<Void, any Error>

  /// Creates a new, empty serial queue and starts the task that consumes it.
  public init() {
    let (operations, continuation) = AsyncStream<Operation>.makeStream()
    self.continuation = continuation
    task = Task {
      try Task.checkCancellation()
      for await operation in operations {
        // Re-check before each operation: once the task is cancelled, the loop
        // exits by throwing instead of starting the next operation.
        try Task.checkCancellation()
        await operation()
      }
    }
  }

  /// Cancels the consumer task when the queue is deallocated.
  deinit {
    task.cancel()
  }

  /// Adds an asynchronous operation to the end of the queue.
  ///
  /// The operation runs after all previously added operations.
  ///
  /// > Important: Operations added to a cancelled queue are ignored and won't start executing.
  ///
  /// - Parameter operation: Asynchronous operation to execute on the queue.
  public func addOperation(_ operation: @escaping Operation) {
    if task.isCancelled { return }
    continuation.yield(operation)
  }

  /// Cancels the queue.
  ///
  /// > Note:
  /// > - The currently running operation will be cancelled.
  /// > - All pending operations won't start executing.
  /// > - All operations added afterwards won't start executing.
  public func cancel() {
    task.cancel()
  }

  /// A Boolean value that indicates whether the queue was cancelled.
  ///
  /// > Important: Operations added to a cancelled queue are ignored and won't start executing.
  public var isCancelled: Bool {
    task.isCancelled
  }
}
import ConcurrencyExtras
import CustomDump
import Testing
@testable import <#Module#>
/// Tests for `AsyncSerialQueue`: execution order and cancellation behavior.
struct AsyncSerialQueueTests {
  /// Verifies that operations run one at a time, in the order they were added.
  @Test
  func queue() async {
    let queue = AsyncSerialQueue()
    let numbers = LockIsolated<[Int]>([])
    let count = 100_000
    for number in 0..<count {
      queue.addOperation {
        // The append happens in a separate task; the operation finishes only
        // after that task has completed.
        let task = Task { numbers.withValue { $0.append(number) } }
        await task.value
      }
    }

    // Spin until every operation has appended its number. There is no timeout,
    // so a queue that drops operations would hang here rather than fail.
    while numbers.count < count {
      await Task.yield()
    }

    #expect(queue.isCancelled == false)

    // Record only the first out-of-order element instead of one issue per element.
    if let mismatch = numbers.value.enumerated().first(where: { $0.offset != $0.element }) {
      Issue.record(
        "Expected number \(mismatch.offset) at index \(mismatch.offset), got number \(mismatch.element)"
      )
    }
  }

  /// Verifies that cancelling the queue cancels the running operation, skips
  /// pending operations, and ignores operations added afterwards.
  @Test
  func cancellation() async {
    let queue = AsyncSerialQueue()
    let events = LockIsolated<[String]>([])

    let firstOperationContinuation = addSuspendedOperation(named: "first", to: queue, events: events)
    let secondOperationContinuation = addSuspendedOperation(named: "second", to: queue, events: events)
    let thirdOperationContinuation = addSuspendedOperation(named: "third", to: queue, events: events)

    await yieldRepeatedly()
    firstOperationContinuation.value?.resume()
    await yieldRepeatedly()
    queue.cancel()
    await yieldRepeatedly()
    secondOperationContinuation.value?.resume()
    await yieldRepeatedly()
    // Resuming is a no-op when the operation never started (continuation is still nil).
    thirdOperationContinuation.value?.resume()
    await yieldRepeatedly()

    // Added after cancellation.
    let fourthOperationContinuation = addSuspendedOperation(named: "fourth", to: queue, events: events)
    await yieldRepeatedly()
    fourthOperationContinuation.value?.resume()
    await yieldRepeatedly()

    expectNoDifference(
      events.value.joined(separator: "\n"),
      """
      started first operation
      started second operation
      second operation cancelled
      """
    )
    #expect(queue.isCancelled == true)
  }

  /// Adds an operation to `queue` that suspends on an unsafe continuation until the test resumes it.
  ///
  /// When the operation starts it appends `"started <name> operation"` to `events`;
  /// when its task is cancelled it appends `"<name> operation cancelled"`.
  ///
  /// - Parameters:
  ///   - name: Label used in the recorded event strings (for example `"first"`).
  ///   - queue: Queue the operation is added to.
  ///   - events: Shared log of events recorded by the operation.
  /// - Returns: A box that holds the operation's continuation once the operation has started,
  ///   and `nil` until then.
  private func addSuspendedOperation(
    named name: String,
    to queue: AsyncSerialQueue,
    events: LockIsolated<[String]>
  ) -> LockIsolated<UnsafeContinuation<Void, Never>?> {
    let operationContinuation = LockIsolated<UnsafeContinuation<Void, Never>?>(nil)
    queue.addOperation {
      await withTaskCancellationHandler {
        await withUnsafeContinuation { continuation in
          events.withValue { $0.append("started \(name) operation") }
          operationContinuation.setValue(continuation)
        }
      } onCancel: {
        events.withValue { $0.append("\(name) operation cancelled") }
      }
    }
    return operationContinuation
  }

  /// Yields the current task repeatedly to give queued work a chance to make progress.
  ///
  /// - Parameter times: Number of `Task.yield()` calls to perform.
  private func yieldRepeatedly(times: Int = 100) async {
    for _ in 0..<times {
      await Task.yield()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment