Created
June 13, 2022 09:08
-
-
Save jsdw/8c03faf83fb9fbddf30a2d754ed7b807 to your computer and use it in GitHub Desktop.
A JS/TS Promise Queue implementation (run N tasks concurrently)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A PromiseQueue, enforcing that no more than `maxTasks` number of tasks | |
* are running at a given time. | |
*/ | |
class PromiseQueue<T> { | |
// How many tasks are allowed to run concurrently? | |
#maxTasks: number; | |
// How many tasks are currently running concurrently? | |
#runningTasks: number = 0; | |
// The queued tasks waiting to run | |
#tasks: LinkedList<() => void>; | |
constructor(maxTasks: number) { | |
this.#maxTasks = maxTasks; | |
this.#tasks = new LinkedList(); | |
} | |
// Try to run the next task in the queue. | |
private tryRunNextTask(): void { | |
if (this.#runningTasks >= this.#maxTasks) { | |
return; | |
} | |
let nextTask = this.#tasks.popFront(); | |
if (nextTask) { | |
nextTask(); | |
this.#runningTasks += 1; | |
} | |
} | |
// Take a task and package it up to run, triggering | |
// the next task when it completes (or errors), and returning the | |
// result in the returned promise. | |
private submitTaskToRun(task: () => Promise<T>): Promise<T> { | |
return new Promise((resolve, reject) => { | |
let onFinish = () => { | |
this.#runningTasks -= 1; | |
this.tryRunNextTask(); | |
} | |
let taskToRun = () => { | |
task() | |
.then((item) => { | |
resolve(item); | |
onFinish(); | |
}) | |
.catch((err) => { | |
reject(err); | |
onFinish(); | |
}) | |
}; | |
this.#tasks.pushBack(taskToRun); | |
this.tryRunNextTask(); | |
}) | |
} | |
/** | |
* Push a new task onto the queue. It will run when there are fewer | |
* than `maxTasks` running. | |
*/ | |
run(task: () => Promise<T>): Promise<T> { | |
return this.submitTaskToRun(task) | |
} | |
} | |
/** | |
* A quick LinkedList queue implementation; we can add items to the back | |
* or remove them from the front. | |
*/ | |
class LinkedList<T> { | |
#first: LinkedListItem<T> | null = null; | |
#last: LinkedListItem<T> | null = null; | |
private init(item: T): void { | |
this.#first = this.#last = { item, next: null }; | |
} | |
pushBack(item: T) { | |
if(!this.#first) return this.init(item); | |
const entry = { item, next: null }; | |
this.#last!.next = entry; | |
this.#last = entry; | |
} | |
popFront(): T | null { | |
if(!this.#first) return null; | |
const entry = this.#first; | |
this.#first = this.#first.next; | |
return entry.item; | |
} | |
clear(): void { | |
this.#first = this.#last = null; | |
} | |
empty(): boolean { | |
return this.#first === null; | |
} | |
} | |
type LinkedListItem<T> = { item: T, next: null | LinkedListItem<T> } | |
// Example usage | |
async function examples() { | |
// A helper which returns a promise that resolves after the ms passes. | |
function resolveAfter(ms: number): Promise<void> { | |
return new Promise(resolve => { | |
setInterval(resolve , ms) | |
}) | |
} | |
// the first 4 will resolve at a similar time, the next 4 1s later, then 'i' 1s later again. | |
let q = new PromiseQueue(4); | |
q.run(() => resolveAfter(1000).then(() => console.log("a"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("b"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("c"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("d"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("e"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("f"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("g"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("h"))); | |
q.run(() => resolveAfter(1000).then(() => console.log("i"))); | |
// (we could add an 'after' method to wrap the last thing in the queue and return when it resolves, | |
// but for the sake of laziness we just wait an appropriate time for the above to all finish up) | |
await resolveAfter(3000); | |
// promises are returned which resolve when complete, so this is basically the same as the above: | |
let q2 = new PromiseQueue(4); | |
q2.run(() => resolveAfter(1000)).then(() => console.log("a2")); | |
q2.run(() => resolveAfter(1000)).then(() => console.log("b2")); | |
q2.run(() => resolveAfter(1000)).then(() => console.log("c2")); | |
q2.run(() => resolveAfter(1000)).then(() => console.log("d2")); | |
q2.run(() => resolveAfter(1000)).then(() => console.log("e2")); | |
} | |
examples(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Paste into https://www.typescriptlang.org/play to see it run.