Last active
March 30, 2020 17:08
-
-
Save fabianfett/19b2398254df7c912bec871a6837dce2 to your computer and use it in GitHub Desktop.
Run a number of tasks in a SwiftNIO EventLoop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import NIO | |
extension EventLoop { | |
public func process<Input, Output>( | |
input: [Input], | |
parallel: Int, | |
process: @escaping (Input, EventLoop) -> EventLoopFuture<Output>) | |
-> EventLoopFuture<[Result<Output, Error>]> | |
{ | |
guard self.inEventLoop else { | |
return self.flatSubmit { () in | |
self.process(input: input, parallel: parallel, process: process) | |
} | |
} | |
var slice = input[...] | |
var result = [Result<Output, Error>?](repeating: nil, count: input.count) | |
var running = 0 | |
var promise = self.makePromise(of: [Result<Output, Error>].self) | |
func scheduleNextExecution() { | |
guard running < parallel else { | |
// no more space to schedule tasks | |
return | |
} | |
let index = slice.startIndex | |
guard let element = slice.popFirst() else { | |
guard running == 0 else { | |
// there are still tasks running | |
return | |
} | |
return promise.succeed(result.map { $0! }) | |
} | |
running += 1 | |
_ = process(element, self) | |
.hop(to: self) | |
.always { (res: Result<Output, Error>) in | |
running -= 1 | |
result[index] = res | |
scheduleNextExecution() | |
} | |
// try to schedule something new immediately | |
scheduleNextExecution() | |
} | |
scheduleNextExecution() | |
return promise.futureResult | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment