Last active
July 24, 2024 17:17
-
-
Save drewmccormack/fc3c26f0e3fe630c7b712b90ab8089fa to your computer and use it in GitHub Desktop.
Proof of concept for a serializing queue for async funcs in Swift. It orders the calls, but also ensures each call is executed to completion before the next starts, thus avoiding interleaving races which can arise in multiple calls to an async func.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A queue that executes async functions in order, and atomically. | |
/// That is, each enqueued func executes fully before the next one is started. | |
struct AsyncSerialQueue { | |
typealias Block = () async throws -> [Int] | |
private struct Item { | |
let block: Block | |
let continuation: CheckedContinuation<[Int], Error> | |
} | |
private var stream: AsyncStream<Item> | |
private var streamContinuation: AsyncStream<Item>.Continuation | |
init() { | |
var cont: AsyncStream<Item>.Continuation? = nil | |
let stream: AsyncStream<Item> = AsyncStream { continuation in | |
cont = continuation | |
} | |
self.stream = stream | |
self.streamContinuation = cont! | |
Task { | |
for await item in stream { | |
do { | |
let returned = try await item.block() | |
item.continuation.resume(returning: returned) | |
} catch { | |
item.continuation.resume(throwing: error) | |
} | |
} | |
} | |
} | |
func enqueue(_ block: @escaping Block) async throws -> [Int] { | |
try await withCheckedThrowingContinuation { continuation in | |
let item = Item(block: block, continuation: continuation) | |
streamContinuation.yield(item) | |
} | |
} | |
static let serializedQueue = AsyncSerialQueue() | |
/// Passing a function to this will execute it in the queue atomically, and in order. | |
static func serialized(_ wrapped: @escaping Block) async throws -> [Int] { | |
try await serializedQueue.enqueue(wrapped) | |
} | |
} | |
// Example: Deliberately introducing shared data to show an interleaving race | |
// when a queue is not used. | |
var shared: [Int] = [] | |
/// Shoulld returrn the numbers 1 to 10 if working properly | |
func numbersToTen() async throws -> [Int] { | |
shared = [] | |
for i in 1...10 { | |
shared.append(i) | |
try? await Task.sleep(for: .seconds(TimeInterval.random(in: 0.01...0.02))) | |
} | |
return shared | |
} | |
Task { | |
print("INTERLEAVING") | |
async let i: [Int] = try await numbersToTen() | |
async let j: [Int] = try await numbersToTen() | |
let a = try await [i, j] | |
print("\(a[0])") | |
print("\(a[1])") | |
print() | |
print("SERIALIZED") | |
async let k: [Int] = try await AsyncSerialQueue.serialized(numbersToTen) | |
async let l: [Int] = try await AsyncSerialQueue.serialized(numbersToTen) | |
let m = try await [k, l] | |
print("\(m[0])") | |
print("\(m[1])") | |
} | |
// Sample Output | |
// | |
// INTERLEAVING | |
// [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 7, 9, 8, 10, 9] | |
// [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 8, 7, 9, 8, 10, 9, 10] | |
// | |
// SERIALIZED | |
// [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] | |
// [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] | |
// |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment