okikio/7843b20b6f00a2236297a4e2469b3183
Last active August 21, 2024 06:10
Splits a single iterator into 2 separate async iterators by some predicate or by whether an error occurs or not.
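The docs in this file rely on two standard details of the async-generator protocol. First, the argument to the very first `next()` call is discarded, because no `yield` is paused yet to receive it (the "priming" call). Second, a value passed to a later `next(value)` becomes the result of the paused `yield`, and whatever the generator yields next is returned to that same caller. Here is a minimal self-contained sketch of both, using a hypothetical `echo` generator and `runEcho` driver that are not part of this gist:

```typescript
// Hypothetical echo generator (not part of the gist): yields back whatever is pushed into it.
async function* echo(): AsyncGenerator<string, void, string> {
  // The priming next() runs the body up to this yield; its argument is never observed.
  let pushed = yield "primed";
  while (true) {
    // A later next(value) resumes the paused yield with `value`,
    // and the string yielded here is returned to that same caller.
    pushed = yield `echo:${pushed}`;
  }
}

// Hypothetical driver: push three inputs and collect what each next() call returns.
async function runEcho(): Promise<string[]> {
  const gen = echo();
  const seen: string[] = [];
  for (const input of ["lost", "a", "b"]) {
    const { value } = await gen.next(input);
    seen.push(String(value));
  }
  // Close the generator so it does not stay suspended at its yield.
  await gen.return(undefined);
  return seen;
}

runEcho().then((seen) => console.log(seen)); // [ 'primed', 'echo:a', 'echo:b' ]
```

Because the echoed value goes back to whoever called `next()`, a second, independent reader never sees it. Splitting one source between two readers therefore needs somewhere to hold values in between.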
/**
 * An async generator that re-yields `IteratorResult` values pushed into it through `next()`.
 *
 * @template T The type of values being yielded.
 *
 * The generator yields the value of `initialResult` first. After that it yields the value of
 * every result passed to `next(result)`, and it returns as soon as it receives a result with
 * `done: true`.
 *
 * ### How it Works
 *
 * This generator pulls its values from `yield` rather than producing them. Instead of
 * generating values on its own, it waits at each `yield` for the caller to send the next
 * result through `next(result)`.
 *
 * - **Pulling from Yields**: A paused `yield` expression evaluates to whatever the caller
 *   passes to the following `next()` call. The generator treats that argument as the next
 *   `IteratorResult`, yields its value, and stops when the result is `done`.
 *
 * ### Memory and Performance
 *
 * The generator keeps no buffer of its own: each pushed value is yielded straight back, so
 * memory use stays constant. The value goes back to whoever called `next(result)`, though,
 * not to a different reader, so this pattern on its own cannot hand values to an
 * independent consumer.
 *
 * - **Priming**: The first call to `next()` only starts the generator and runs it to its
 *   first `yield`, returning the initial value; any argument passed to that call is
 *   discarded. Call `next()` once before pushing results, or the first pushed result is lost.
 *
 * - **Completion Notification**: The caller must eventually push a result with `done: true`.
 *   Until it does, the generator stays suspended at its `yield`, and anything waiting for it
 *   to finish waits indefinitely.
 *
 * @param {IteratorResult<T>} initialResult The first result, yielded by the priming `next()` call.
 *   If it is already `done`, the generator returns its value immediately.
 * @returns {AsyncGenerator<T, T | undefined, IteratorResult<T>>} An async generator that yields the pushed values.
 *
 * @example
 * const gen = asyncGeneratorWithPullYield<number>({ value: 1, done: false });
 * console.log(await gen.next()); // { value: 1, done: false } (priming call)
 * console.log(await gen.next({ value: 2, done: false })); // { value: 2, done: false }
 * console.log(await gen.next({ value: 3, done: true })); // { value: 3, done: true }
 */
async function* asyncGeneratorWithPullYield<T>(
  initialResult: IteratorResult<T>
): AsyncGenerator<T, T | undefined, IteratorResult<T>> {
  let result = initialResult;
  // Keep yielding until a result marked `done` arrives (possibly the initial one)
  while (!result.done) {
    // Yield the current value, then wait for the caller to push the next result via next(result)
    result = yield result.value;
  }
  // Return the final value so the caller's last next() reports it with done: true
  return result.value;
}
/**
 * Splits a source iterator or iterable into two separate async iterators: one for valid values and one for errors.
 *
 * @template T The type of values being iterated over by the source.
 * @template E The type of errors being thrown by the source.
 * @param {AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>} source The original source iterator or iterable to be split.
 * @returns {Promise<[AsyncGenerator<T>, AsyncGenerator<E>]>}
 *   A tuple of two async iterators: one for the values the source yields and one for the errors it throws.
 *
 * ### How it Works
 *
 * The source is read lazily. Whenever a consumer asks either branch for its next item and that
 * branch has nothing waiting, the splitter calls `next()` on the source once and routes the outcome:
 * - A yielded value goes to the first (valid) iterator.
 * - A thrown error goes to the second (error) iterator.
 *
 * A yielded `Error` object is an ordinary value and goes to the valid iterator; only errors that
 * `next()` actually throws (or rejects with) reach the error iterator. After an error the splitter
 * keeps calling `next()`; a generator that has thrown reports `done: true` on its following call,
 * which ends both branches.
 *
 * ### Buffering
 *
 * The two branches are read independently, so a value pulled on behalf of one branch may belong
 * to the other. Such a value waits in a per-branch array until its branch is read. If both branches
 * are consumed concurrently these arrays stay short; if one branch is read to the end first,
 * everything routed to the other branch is held in memory until it is read.
 *
 * ### Performance Considerations
 *
 * The source is read strictly sequentially: concurrent reads from both branches share one in-flight
 * `next()` call instead of calling the source in parallel. Nothing is read before a consumer asks for
 * it, so the returned promise resolves immediately, even for an infinite source.
 *
 * ### Completion Handling
 *
 * When the source returns `done: true`, each branch finishes as soon as its buffer is empty, so
 * `for await` loops over both branches terminate instead of waiting forever for more values.
 *
 * @example
 * async function* sourceIterator() {
 *   yield 1;
 *   yield 2;
 *   yield new Error("This is an error");
 *   throw new Error("Something went wrong");
 * }
 *
 * const [resolved, errored] = await splitAsyncIterator(sourceIterator());
 *
 * (async () => {
 *   for await (const value of resolved) {
 *     console.log("Resolved:", value); // Logs: 1, 2, Error: This is an error
 *   }
 * })();
 *
 * (async () => {
 *   for await (const error of errored) {
 *     console.log("Errored:", error); // Logs: Error: Something went wrong
 *   }
 * })();
 */
async function splitAsyncIterator<T, E = unknown>(
  source: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>
): Promise<[AsyncGenerator<T>, AsyncGenerator<E>]> {
  // Extract the iterator from the source (async iterable, sync iterable, or a bare iterator)
  const sourceIterator =
    (source as AsyncIterable<T>)[Symbol.asyncIterator]?.() ??
    (source as Iterable<T>)[Symbol.iterator]?.() ??
    (source as AsyncIterator<T>);

  // Per-branch buffers for items pulled from the source but not yet read by their branch
  const values: T[] = [];
  const errors: E[] = [];
  let done = false;
  // The in-flight pull, shared by both branches so the source is never read concurrently
  let pending: Promise<void> | undefined;

  // Read one step from the source and route it to the matching buffer
  const pullOne = async (): Promise<void> => {
    try {
      const result = await sourceIterator.next();
      if (result.done) done = true;
      else values.push(result.value);
    } catch (error) {
      errors.push(error as E);
    }
  };

  const pull = (): Promise<void> =>
    (pending ??= pullOne().finally(() => {
      pending = undefined;
    }));

  // Each branch drains its own buffer and pulls from the source only when the buffer is empty
  async function* branch<V>(buffer: V[]): AsyncGenerator<V> {
    while (true) {
      if (buffer.length > 0) yield buffer.shift() as V;
      else if (done) return;
      else await pull();
    }
  }

  return [branch(values), branch(errors)];
}
/**
 * Splits a source iterator or iterable into two separate async iterators based on a predicate function.
 *
 * @template T The type of values being iterated over by the source.
 * @template F The type of values directed to the second iterator. Defaults to `T`. This is a
 *   type-level assertion only: values are passed through unchanged at runtime.
 *
 * @param {AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>} source The original source iterator or iterable to be split.
 * @param {(value: T) => Promise<boolean>} predicate The predicate function that decides where each
 *   value goes. If it resolves to `true`, the value goes to the first iterator; otherwise, it goes
 *   to the second iterator.
 *
 * @returns {Promise<[AsyncGenerator<T>, AsyncGenerator<F>]>}
 *   A tuple of two async iterators: one for values that satisfy the predicate and
 *   one for values that do not.
 *
 * ### How it Works
 *
 * The source is read lazily. Whenever a consumer asks either branch for its next value and that
 * branch has nothing waiting, the splitter pulls one value from the source, awaits the predicate,
 * and routes the value:
 * - The first iterator receives values for which the predicate resolves to `true`.
 * - The second iterator receives values for which it resolves to `false`.
 *
 * ### Buffering
 *
 * A value pulled on behalf of one branch may belong to the other, so it waits in that branch's
 * array until the branch is read. Consuming both branches concurrently keeps these arrays short;
 * reading one branch to the end first holds every value routed to the other branch in memory.
 *
 * ### Error Handling
 *
 * If the source or the predicate throws, iteration stops. Each branch first yields the values
 * already routed to it and then throws that error, so both consumers are made aware of it.
 *
 * ### Completion Handling
 *
 * Once the source is exhausted (`done: true`), each branch finishes as soon as its buffer is empty.
 * Without this, a consumer of a branch that will receive no more values would wait forever.
 *
 * @example
 * async function* sourceIterator() {
 *   yield 1;
 *   yield 2;
 *   yield 3;
 *   yield 4;
 * }
 *
 * const isEven = async (value: number) => value % 2 === 0;
 *
 * const [evens, odds] = await splitAsyncIteratorBy<number>(sourceIterator(), isEven);
 *
 * (async () => {
 *   for await (const value of evens) {
 *     console.log("Even:", value); // Logs: 2, 4
 *   }
 * })();
 *
 * (async () => {
 *   for await (const value of odds) {
 *     console.log("Odd:", value); // Logs: 1, 3
 *   }
 * })();
 *
 * @example
 * async function* errorProneIterator() {
 *   yield 1;
 *   yield -2;
 *   throw new Error("An error occurred");
 * }
 *
 * const isPositive = async (value: number) => value > 0;
 *
 * const [positives, others] = await splitAsyncIteratorBy<number>(errorProneIterator(), isPositive);
 *
 * (async () => {
 *   try {
 *     for await (const value of positives) {
 *       console.log("Positive:", value); // Logs: 1
 *     }
 *   } catch (error) {
 *     console.log("positives failed:", error); // Logs: Error: An error occurred
 *   }
 * })();
 *
 * (async () => {
 *   try {
 *     for await (const value of others) {
 *       console.log("Other:", value); // Logs: -2
 *     }
 *   } catch (error) {
 *     console.log("others failed:", error); // Logs: Error: An error occurred
 *   }
 * })();
 */
async function splitAsyncIteratorBy<T, F = T>(
  source: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>,
  predicate: (value: T) => Promise<boolean>
): Promise<[AsyncGenerator<T>, AsyncGenerator<F>]> {
  // Extract the iterator from the source
  const sourceIterator =
    (source as AsyncIterable<T>)[Symbol.asyncIterator]?.() ??
    (source as Iterable<T>)[Symbol.iterator]?.() ??
    (source as AsyncIterator<T>);

  // Per-branch buffers for values pulled from the source but not yet read by their branch
  const matches: T[] = [];
  const rest: F[] = [];
  let done = false;
  // Set when the source or the predicate throws; both branches rethrow it once drained
  let failure: { error: unknown } | undefined;
  // The in-flight pull, shared by both branches so the source is never read concurrently
  let pending: Promise<void> | undefined;

  // Read one value from the source and route it according to the predicate
  const pullOne = async (): Promise<void> => {
    try {
      const result = await sourceIterator.next();
      if (result.done) {
        done = true;
      } else if (await predicate(result.value)) {
        matches.push(result.value);
      } else {
        // F is only asserted; the value itself is passed through unchanged
        rest.push(result.value as unknown as F);
      }
    } catch (error) {
      failure = { error };
      done = true;
    }
  };

  const pull = (): Promise<void> =>
    (pending ??= pullOne().finally(() => {
      pending = undefined;
    }));

  // Each branch drains its buffer, then rethrows a failure, then finishes; it pulls only when empty
  async function* branch<V>(buffer: V[]): AsyncGenerator<V> {
    while (true) {
      if (buffer.length > 0) yield buffer.shift() as V;
      else if (failure) throw failure.error;
      else if (done) return;
      else await pull();
    }
  }

  return [branch(matches), branch(rest)];
}
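Splitting one iterator into two independently read outputs means a value pulled on behalf of one output may belong to the other, and it has to wait somewhere until that output is read. A minimal sketch of this demand-driven routing follows, assuming a synchronous source to keep it short. It uses a hypothetical `partition` helper and a hypothetical `drain` utility (neither is part of this gist), and `partition` also reports how many values each buffer currently holds:

```typescript
// Hypothetical sketch (not part of the gist): lazily split one iterable into two iterators.
function partition<T>(
  source: Iterable<T>,
  predicate: (value: T) => boolean
): { matches: Iterator<T>; rest: Iterator<T>; buffered: () => [number, number] } {
  const it = source[Symbol.iterator]();
  // buffers[0] holds unread matches, buffers[1] holds unread non-matches
  const buffers: [T[], T[]] = [[], []];
  let done = false;

  const reader = (side: 0 | 1): Iterator<T> => ({
    next(): IteratorResult<T> {
      // Pull from the source until this side has a value or the source is exhausted
      while (buffers[side].length === 0 && !done) {
        const r = it.next();
        if (r.done) done = true;
        else buffers[predicate(r.value) ? 0 : 1].push(r.value);
      }
      return buffers[side].length > 0
        ? { value: buffers[side].shift() as T, done: false }
        : { value: undefined, done: true };
    },
  });

  return {
    matches: reader(0),
    rest: reader(1),
    buffered: () => [buffers[0].length, buffers[1].length],
  };
}

// Hypothetical helper: drain an iterator into an array.
function drain<T>(iterator: Iterator<T>): T[] {
  const out: T[] = [];
  for (let r = iterator.next(); !r.done; r = iterator.next()) out.push(r.value);
  return out;
}

const { matches, rest, buffered } = partition([1, 2, 3, 4, 5, 6], (n) => n % 2 === 0);
console.log(drain(matches)); // [ 2, 4, 6 ]
console.log(buffered()); // [ 0, 3 ]: all three odd values are still waiting
console.log(drain(rest)); // [ 1, 3, 5 ]
```

Any splitter whose two outputs are read independently, synchronous or async, faces the same trade-off. Reading both outputs concurrently keeps the buffers short, while reading one output to the end first holds everything routed to the other.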