Skip to content

Instantly share code, notes, and snippets.

@eastlondoner
Created March 27, 2023 11:26
Show Gist options
  • Save eastlondoner/2cd54e7521c499f850c675c1cc534b61 to your computer and use it in GitHub Desktop.
Save eastlondoner/2cd54e7521c499f850c675c1cc534b61 to your computer and use it in GitHub Desktop.
Process Tasks using a Concurrent Queue in JavaScript
import { Readable } from 'node:stream';
/**
 * A unit of work accepted by the queue.
 */
export type ConcurrentQueueTask<T> = {
  /**
   * Factory that starts the work and returns its promise.
   * It is invoked once per attempt, so every retry gets a fresh promise.
   */
  getPromise: () => Promise<T>;
  /**
   * How many times the task may be re-queued after a rejected attempt.
   * `0` means the first rejection is final.
   */
  maxRetries: number;
};
/**
 * Options accepted when creating a queue. Every field is optional.
 */
export type CreateConcurrentQueueParams = {
  /**
   * If set to true, the queue will not terminate itself when all tasks have been processed.
   * See tests for an example.
   */
  keepAlive?: boolean;
  /**
   * Maximum number of tasks that can be running at the same time.
   * default: 100
   */
  maxConcurrency?: number;
};
// Concurrency limit used when the caller does not supply `maxConcurrency`.
const DEFAULT_MAX_CONCURRENCY = 100;
/**
 * A task queue with bounded concurrency, exposed as an object-mode `Readable`.
 *
 * Tasks are started lazily: nothing runs until the stream is read (`for await`, `results()`,
 * `waitForTermination()`, ...). Every resolved value is pushed to the stream. Once no task is
 * queued or in flight and `keepAlive` is off, the stream ends.
 *
 * Readable isn't generic, so values read directly from the stream are typed as `any` by the
 * compiler. To get typed values, use the `results` method.
 *
 * NOTE: a task that resolves to `null` is handed to `push(null)`, which `Readable` interprets
 * as end-of-stream. Tasks should not resolve to `null`.
 */
export class ConcurrentQueue<T> extends Readable {
  /** Promises of the tasks currently running, keyed by an internal id. */
  private readonly inFlight = new Map<string, Promise<unknown>>();
  private internalId = 0;
  /** True while the work loop is allowed to launch tasks (false before the first read and while paused). */
  private isStarted = false;
  private isTerminated = false;
  /** Set once the stream has been read at least once. */
  private iteratorCalled = false;
  private keepAlive: boolean;
  private readonly maxConcurrency: number;
  /** Tasks waiting for a free slot, in FIFO order. */
  private tasks: Array<ConcurrentQueueTask<T>> = [];

  /**
   * @param params queue options; may be omitted to use the defaults
   * @throws Error if `maxConcurrency` is not a number greater than or equal to 1
   */
  public constructor(params: CreateConcurrentQueueParams = {}) {
    super({ objectMode: true });
    this.maxConcurrency = params.maxConcurrency ?? DEFAULT_MAX_CONCURRENCY;
    // Written as a negated `>=` so that NaN is rejected as well as values below 1.
    if (!(this.maxConcurrency >= 1)) {
      throw new Error('maxConcurrency must be greater than 0');
    }
    this.keepAlive = params.keepAlive ?? false;
  }

  /**
   * Promise-based interface to the queue: consumes the stream, discarding the values.
   *
   * @returns a promise that resolves when the stream ends
   * @throws Error 'ConcurrentQueue failed', with the underlying error in `cause`
   */
  public async waitForTermination(): Promise<void> {
    try {
      for await (const _ of this) {
        // we don't care about the values, we just want to wait for the stream to end
      }
    } catch (error) {
      // `cause` is attached by assignment so this also type-checks against pre-ES2022 libs.
      throw Object.assign(new Error('ConcurrentQueue failed'), { cause: error });
    }
  }

  /**
   * Add a task to the queue. It is started as soon as a slot is free and the stream is being read.
   *
   * @throws Error if the queue has already terminated
   */
  public enqueue(task: ConcurrentQueueTask<T>): void {
    if (this.isTerminated) {
      throw new Error('Cannot enqueue a task after the queue has been terminated');
    }
    this.tasks.push(task);
    this.work();
  }

  /**
   * Number of tasks that are either waiting or in flight.
   */
  public get queueSize(): number {
    return this.tasks.length + this.inFlight.size;
  }

  /**
   * True when no task is either in flight or queued.
   */
  public get isDrained(): boolean {
    return this.queueSize === 0;
  }

  /**
   * Turn `keepAlive` off so the queue terminates as soon as it is drained
   * (immediately, if it already is).
   */
  public terminateWhenDrained(): void {
    this.keepAlive = false;
    this.work();
  }

  /**
   * Readable entry point, called by the stream machinery when the consumer wants data.
   * This is what lets the queue be used with `for await (const ... of queue)`.
   * It (re)starts the work loop.
   */
  public _read(): void {
    this.iteratorCalled = true;
    this.start();
  }

  /**
   * Same values as iterating the stream directly; the only difference is that the compiler
   * knows the type of the yielded values.
   */
  public async *results(): AsyncGenerator<T, void, undefined> {
    for await (const value of this) {
      yield value as T;
    }
  }

  /**
   * Snapshot of the internal counters and flags, used for debugging.
   */
  public info() {
    return {
      inFlight: this.inFlight.size,
      queued: this.tasks.length,
      maxConcurrency: this.maxConcurrency,
      isStarted: this.isStarted,
      isTerminated: this.isTerminated,
    };
  }

  /**
   * Report a fatal task error by destroying the stream.
   * Nothing is destroyed if the stream has never been read.
   */
  private fail(error: Error): void {
    if (this.iteratorCalled) {
      this.destroy(error);
    }
  }

  private getId(): string {
    return `${this.internalId++}`;
  }

  /**
   * One step of the work loop: ends the stream if the queue is drained, otherwise
   * fills every free concurrency slot with queued tasks.
   */
  private work(): void {
    if (this.isDrained && !this.keepAlive) {
      this.terminateWithSuccess();
      return;
    }
    if (!this.isStarted) {
      // Either the stream has not been read yet or we are paused for backpressure;
      // the next `_read` call restarts the loop.
      return;
    }
    const emptySlots = this.maxConcurrency - this.inFlight.size;
    for (const task of this.tasks.splice(0, emptySlots)) {
      this.runTask(task);
    }
  }

  /**
   * Start one attempt of `task` and track it in `inFlight` until it settles.
   */
  private runTask(task: ConcurrentQueueTask<T>): void {
    const id = this.getId();
    let attempt: Promise<T>;
    try {
      attempt = task.getPromise();
    } catch (error) {
      // A synchronous throw is treated like a rejected attempt. Letting it escape
      // would drop this task and the rest of the batch.
      attempt = Promise.reject(error);
    }
    this.inFlight.set(
      id,
      attempt
        .then((result) => {
          // `push` returns false once the internal buffer has reached the high-water mark.
          const canAcceptMore = this.push(result);
          // A destroyed stream never calls `_read` again, so pausing it would strand
          // the remaining tasks.
          if (!canAcceptMore && !this.destroyed) {
            this.pauseWork();
          }
        })
        .catch((error: unknown) => {
          if (task.maxRetries > 0) {
            this.enqueue({
              ...task,
              maxRetries: task.maxRetries - 1,
            });
          } else {
            this.fail(error instanceof Error ? error : new Error(error as string));
          }
        })
        .finally(() => {
          this.inFlight.delete(id);
          this.work();
        }),
    );
  }

  // Start work if not yet started or paused. It's safe to call this multiple times.
  // Calling this does not guarantee that the whole queue will be processed, that is why it is private.
  // To process the whole queue use `waitForTermination` or consume the stream as a readable stream.
  private start(): void {
    if (this.isStarted) {
      return;
    }
    this.isStarted = true;
    this.work();
  }

  // Stop launching new tasks until the next `_read`. Tasks already in flight are not interrupted.
  private pauseWork(): void {
    this.isStarted = false;
  }

  // Mark the queue as terminated and end the stream on the next event-loop turn.
  // Idempotent: only the first call schedules the end-of-stream marker.
  private terminateWithSuccess(): void {
    if (this.isTerminated) {
      return;
    }
    this.isTerminated = true;
    setImmediate(() => {
      this.push(null);
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment