Skip to content

Instantly share code, notes, and snippets.

@Goblinlordx
Last active August 16, 2024 06:24
Show Gist options
  • Save Goblinlordx/0e5da61658a2a2d92ae3b5f51f986bb6 to your computer and use it in GitHub Desktop.
Save Goblinlordx/0e5da61658a2a2d92ae3b5f51f986bb6 to your computer and use it in GitHub Desktop.
Transformable AsyncIterable Processor
function* range(max: number = Infinity, start: number = 0) {
for (let i = start; i < max; i++) yield i;
}
async function* asyncify<T>(items: Iterable<T>) {
for (const item of items) yield item;
}
class AlreadyProcessedError extends Error {}
type AsyncIterableTransformer<T, U> = (
items: AsyncIterable<T>,
) => AsyncIterable<U>;
type IterableTransformer<T, U> = (items: Iterable<T>) => Iterable<U>;
class Processor<T> {
private processed = false;
constructor(private _items: AsyncIterable<T>) {}
step<U>(step: AsyncIterableTransformer<T, U>): Processor<U> {
return new Processor(step(this._items));
}
async consume() {
this.collect(consume);
}
async collect<U>(collector: (items: AsyncIterable<T>) => U) {
if (this.processed) throw new AlreadyProcessedError();
try {
await collector(this._items);
} finally {
this.processed = true;
}
}
}
async function consume<T>(items: AsyncIterable<T>) {
for await (const _ of items) {
}
}
function take(size: number) {
return async function* take_step<T>(items: AsyncIterable<T>) {
let count = 0;
if (count >= size) return;
for await (const item of items) {
yield item;
count++;
if (count >= size) return;
}
};
}
function tap<T>(fn: (value: T) => void | Promise<void>) {
return async function* tap_step(items: AsyncIterable<T>) {
for await (const item of items) {
await fn(item);
yield item;
}
};
}
function even() {
return async function* even_step(items: AsyncIterable<number>) {
for await (const item of items) {
if (item % 2 === 0) yield item;
}
};
}
function batch(n: number) {
return async function* batch_step<T>(
items: AsyncIterable<T>,
): AsyncIterable<T[]> {
let batch: T[] = [];
for await (const item of items) {
batch.push(item);
if (batch.length >= n) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch;
}
};
}
const p = new Processor(asyncify(range()))
.step(take(59))
.step(even())
.step(batch(4))
.step(take(10));
void (async () => {
await p.step(tap((x) => console.log(x))).consume();
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment