-
-
Save robertmryan/fec5ea71c26a1098bc4c64c14eab5577 to your computer and use it in GitHub Desktop.
import Foundation
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

class Demo {
    /// Produces a stream of five integers, delivered one per second.
    ///
    /// All of the producing work runs inside a single `Task` so that the
    /// stream's `onTermination` handler has one handle it can cancel if the
    /// stream is torn down before the values are exhausted.
    func getAsyncStream() -> AsyncStream<Int> {
        AsyncStream { continuation in
            let producer = Task {
                for tick in 1...5 {
                    try await Task.sleep(for: .seconds(1))
                    continuation.yield(tick)
                }
                continuation.finish()
            }

            // Whenever the stream terminates, stop the producer task.
            continuation.onTermination = { _ in
                producer.cancel()
            }
        }
    }

    init() { print("initialised") }
    deinit { print("deinitialised") }
}

do {
    let demo = Demo()
    for await number in demo.getAsyncStream() {
        print(number)
    }
}

// Playgrounds need an explicit finish to clean up properly; see
// https://stackoverflow.com/questions/78556983/async-await-unexpected-error-in-playground
PlaygroundPage.current.finishExecution()
Thanks @robertmryan for your reply.
Regarding the second point, instead of using a fixed ClosedRange (1...5), what I have in mind is a more realistic scenario.
Assume we have an array of images—say 20 images—each with different sizes and in a random order. Rather than using sleep, each image would be downloaded from the network, and every download would take a different amount of time.
To download these images concurrently, we need to create a separate task for each image download.
I have a concern around finishing the continuation. Initially, I thought of calling finish() when the last element in the array completes. However, this approach is incorrect because earlier elements may take longer than the last one, which would prematurely end the stream and prevent some images from being propagated.
To fix this, I considered maintaining a count variable that increments after each yield, and calling finish() when count == images.count.
However, I believe this approach is unsafe, because multiple tasks could access and increment the count variable simultaneously, leading to race conditions.
To address this, I see two possible options:
Use a shared/static count variable and synchronize access to it.
Convert the surrounding class into an actor, so the count variable is accessed in a thread-safe, serialized manner.
I expect both approaches to work, but I’d like your thoughts on whether this is the best solution or if there’s a cleaner or more idiomatic way to handle this.
To download these images concurrently, we need to create a separate task for each image download.
Yes, we would want a separate task (lowercase “t”) for each. But we would avoid introducing unnecessary unstructured concurrency (a Task {…}), but rather remain within structured concurrency. It keeps life much simpler.
Assume we have an array of images—say 20 images—each with different sizes and in a random order. Rather than using sleep, each image would be downloaded from the network, and every download would take a different amount of time.
The idiomatic solution would be a “task group”:
- It won’t complete until all the child tasks are done. There is no “counter” to keep track of.
- It remains with structured concurrency (meaning that if you cancel the task group, all the child tasks will be canceled for you).
Thus, perhaps a simple solution would look like:
class SimpleImageDownloader {
    /// Streams downloaded images as they complete (in completion order, not
    /// the order of `urls`).
    ///
    /// Every download runs as a child task of one task group; because the
    /// group does not return until all children finish, the stream finishes
    /// automatically with no manual counting. Cancelling the stream cancels
    /// the hosting task and therefore every in-flight download.
    func images(for urls: [URL]) -> AsyncStream<UIImage> {
        AsyncStream { continuation in
            let work = Task {
                await withTaskGroup(of: UIImage.self) { group in
                    // One child task per download.
                    for url in urls {
                        group.addTask { await self.image(for: url) }
                    }

                    // Forward each result as its child task finishes.
                    for await downloaded in group {
                        continuation.yield(downloaded)
                    }

                    // Every child is done; end the stream.
                    continuation.finish()
                }
            }

            // If the consumer abandons the stream, tear the whole group down.
            continuation.onTermination = { reason in
                if case .cancelled = reason {
                    work.cancel()
                }
            }
        }
    }

    func image(for url: URL) async -> UIImage {…}
}
var urls: [URL] = …
let downloader = SimpleImageDownloader()
for await image in downloader.images(for: urls) {
// do something with image
}

Now, that having been said, there are two limitations in the above:
- You don’t know which image is associated with which URL (because, as you say, they may complete in any order). We might, for example, use a tuple, instead, that includes the original URL, so we can correlate the image yielded by the sequence with our original sequence of URLs.
- We’re not handling errors. We would generally expect
`image(for:)` to throw errors (e.g., a network request fails), and then the asynchronous sequence would yield a `Result<UIImage, any Error>`, instead, so the caller knows which images succeeded and which failed (while not stopping just because one of the images failed).
So, perhaps:
// Streams `(URL, Result)` pairs so the caller can (a) correlate each image
// with the URL it came from and (b) observe per-URL failures without the
// whole stream terminating.
class BetterImageDownloader {
// Returns a stream yielding one `(url, result)` tuple per input URL, in
// completion order. Errors thrown by `image(for:)` are captured in the
// `Result` rather than ending the sequence.
func images(for urls: [URL]) -> AsyncStream<(URL, Result<UIImage, any Error>)> {
AsyncStream { continuation in
// A single unstructured task hosts the task group so the stream's
// termination handler has exactly one handle to cancel.
let task = Task {
await withTaskGroup(of: (URL, Result<UIImage, any Error>).self) { group in
// One child task per download.
for url in urls {
group.addTask {
do {
let image = try await self.image(for: url)
return (url, .success(image))
} catch {
// Capture the failure; the stream itself keeps going.
return (url, .failure(error))
}
}
}
// The group only ends once every child has finished, so no
// manual counting is needed before finishing the stream.
for await urlAndResult in group {
continuation.yield(urlAndResult)
}
continuation.finish()
}
}
// Cancelling the stream cancels the hosting task, which in turn
// cancels all in-flight child downloads (structured concurrency).
continuation.onTermination = { state in
if case .cancelled = state {
task.cancel()
}
}
}
}
// Downloads a single image; implementation elided in this example.
func image(for url: URL) async throws -> UIImage {…}
}
var urls: [URL] = …
let downloader = BetterImageDownloader()
for await (url, result) in downloader.images(for: urls) {
switch result {
case .success(let image):
// do something with image
case .failure(let error):
// do something with error
}
}

This enjoys automatic tracking of when the collection of asynchronous tasks is complete, while remaining in a pattern that supports cancellation of the whole sequence.
If you want to go nuts, I might suggest a few more refinements: Notably, the above is fine if you have a dozen images. But what if you have 1,000? You really don’t want to create an unbridled number of tasks, but rather constrain it to some reasonable number that both enjoys concurrency, but limits it to some reasonable number. People often use something like 6 or 10 concurrent requests, because beyond that, you don’t see discernible performance benefits, but do suffer memory issues or overloading the system with too many tasks.
So, you might end up with something like the following, where we will await group.next() above a certain number. This will constrain the degree of concurrency to no more than n requests at a time:
class ImageDownloader {
/// Streams a `(url, result)` pair for each URL, limiting the number of
/// simultaneous downloads to `maxConcurrent`.
///
/// Once `maxConcurrent` children have been started, one result is drained
/// from the group (and yielded) before each further child is added, so no
/// more than `maxConcurrent` downloads are ever in flight at once.
func images(for urls: [URL], maxConcurrent: Int = 6) -> AsyncStream<(URL, Result<UIImage, any Error>)> {
    precondition(maxConcurrent > 0)

    return AsyncStream { continuation in
        let worker = Task {
            await withTaskGroup(of: (URL, Result<UIImage, any Error>).self) { group in
                for (started, url) in urls.enumerated() {
                    // Throttle: once the group is "full", wait for one child
                    // to finish (forwarding its result) before adding more.
                    if started >= maxConcurrent, let finished = await group.next() {
                        continuation.yield(finished)
                    }

                    group.addTaskUnlessCancelled {
                        do {
                            let image = try await self.image(for: url)
                            return (url, .success(image))
                        } catch {
                            return (url, .failure(error))
                        }
                    }
                }

                // Drain whatever is still in flight, then end the stream.
                for await remaining in group {
                    continuation.yield(remaining)
                }
                continuation.finish()
            }
        }

        // Cancelling the stream cancels the hosting task and, with it,
        // every outstanding child download.
        continuation.onTermination = { state in
            if case .cancelled = state {
                worker.cancel()
            }
        }
    }
}
/// Downloads one image, validating both the HTTP status and the payload.
///
/// - Throws: `DownloaderError.httpError` for a non-2xx HTTP status, or
///   `DownloaderError.notImage` if the payload cannot be decoded as an image.
func image(for url: URL) async throws -> UIImage {
    let (payload, response) = try await URLSession.shared.data(from: url)

    // Non-HTTP responses (e.g., file URLs) skip the status check, as before.
    if let http = response as? HTTPURLResponse, !(200..<300).contains(http.statusCode) {
        throw DownloaderError.httpError(http.statusCode)
    }

    guard let decoded = UIImage(data: payload) else {
        throw DownloaderError.notImage
    }
    return decoded
}
/// Errors thrown by the downloader.
///
/// Note: `LocalizedError`'s requirement is `var errorDescription: String?`
/// (optional). A non-optional `String` property does not witness that
/// requirement, so `localizedDescription` would silently fall back to the
/// generic default message. Declaring the property as `String?` fixes this.
enum DownloaderError: LocalizedError {
    case httpError(Int)
    case notImage

    var errorDescription: String? {
        return switch self {
        case .httpError: NSLocalizedString("Web server error", comment: "Downloader error")
        case .notImage: NSLocalizedString("Not image", comment: "Downloader error")
        }
    }
}
}

There are additional refinements one might add, but this strikes me as a good starting point. But the key is:
- Remain within structured concurrency so that you can gracefully cancel all pending child tasks if the overall sequence is cancelled.
- Use task group to simplify the process of knowing when a series of child tasks finish.
FWIW, I have not done exhaustive testing of the above, so I apologize for any errors herein. But hopefully it illustrates the idea.
Two observations:
- Call `finishExecution` to allow the playground to clean up properly.
- When using `AsyncStream` in this example, you always want to make sure that the `AsyncStream` detects cancellation and cancels the task doing the work should the stream, itself, ever be cancelled. To that end, it’s much easier to do this cleanup if you have one `Task`, rather than five. So, to that end, I’ve moved the `for` loop inside the `Task` and have added an `onTermination` handler for the continuation.