Skip to content

Instantly share code, notes, and snippets.

@arekmaz
Last active July 1, 2025 23:29
Show Gist options
  • Save arekmaz/ae0bb76898c55086818c6926823cc241 to your computer and use it in GitHub Desktop.
Save arekmaz/ae0bb76898c55086818c6926823cc241 to your computer and use it in GitHub Desktop.
effect fair product of streams
// the same pairs as the Cartesian product, but enumerated in a breadth-first (fair) order
/**
 * Restricts `stream` to the single element at zero-based position `index`:
 * the first `index` elements are dropped and at most one element is kept.
 */
const takeIndex = <A, E, R>(stream: Stream.Stream<A, E, R>, index: number) => {
  const fromIndex = Stream.drop(stream, index);
  return Stream.take(fromIndex, 1);
};
/**
 * Pairs elements of `a` with elements of `b` in an interleaved ("fair") order, instead of
 * pairing one element of `a` with all of `b` before moving on.
 *
 * Example given by the author: 1,2,3 + a,b -> 1a,2b,3a,1b,2a,3b
 *
 * The result is three streams concatenated in order:
 *  1. `a` zipped with `b`, while counting the elements seen on each side;
 *  2. a continuation that keeps zipping the side that still has elements against the other
 *     side repeated with `Stream.forever`;
 *  3. further rounds, one per element of `b` after the first, that pair every element of `a`
 *     with the element of `b` selected by an index formula.
 *
 * NOTE(review): `a` and `b` are each consumed several times (`Stream.forever`, `Stream.drop`,
 * `zipWithIndex`, `takeIndex`), so this presumably expects inputs that replay the same
 * elements on every run; confirm with callers.
 * NOTE(review): `countA` / `countB` are used below as if they were the lengths of `a` and `b`,
 * but they are only incremented during phase 1; whether they reach the full lengths depends on
 * how far `Stream.zip` pulls each side. Verify.
 */
const fairCarthesianProduct = <A, B, EA, EB, RA, RB>(
a: Stream.Stream<A, EA, RA>,
b: Stream.Stream<B, EB, RB>
) =>
// the mutable state below lives inside the suspended thunk, not at module level
Stream.suspend(() => {
// number of elements that have passed through aWithCount / bWithCount so far
let countA = 0;
let countB = 0;
// set by the Stream.onEnd hooks attached to aWithCount / bWithCount
let aCounted = false;
let bCounted = false;
// index-tagged copies of the inputs, used only by phase 3
const aWithIndex = Stream.zipWithIndex(a);
const bWithIndex = Stream.zipWithIndex(b);
// `a` unchanged, but every element bumps countA and the end hook sets aCounted
const aWithCount = a.pipe(
Stream.map((a) => {
countA += 1;
return a;
}),
Stream.onEnd(
Effect.sync(() => {
aCounted = true;
})
)
);
// same instrumentation for `b`
const bWithCount = b.pipe(
Stream.map((b) => {
countB += 1;
return b;
}),
Stream.onEnd(
Effect.sync(() => {
bCounted = true;
})
)
);
// phase 1: pair the two instrumented streams element by element
return Stream.zip(aWithCount, bWithCount).pipe(
Stream.concat(
// phase 2: suspended so the flags and counters are read only after phase 1 has finished
Stream.suspend(() => {
if (aCounted && bCounted) {
// both end hooks fired: nothing to continue
return Stream.empty;
}
if (aCounted) {
// only a's end hook fired: keep pairing the rest of `b` (skipping countA elements) with `a` repeated
return Stream.zip(Stream.forever(a), Stream.drop(b, countA));
}
// otherwise: pair the rest of `a` (skipping countB elements) with `b` repeated
// NOTE(review): this branch is also taken when neither flag is set; confirm that is intended
return Stream.zip(Stream.drop(a, countB), Stream.forever(b));
})
),
Stream.concat(
// phase 3: one round per index of `b` except index 0 (dropped here)
Stream.suspend(() =>
bWithIndex.pipe(
Stream.drop(1),
Stream.flatMap(([, iB]) =>
aWithIndex.pipe(
// in the first round only, skip (countB - countA) elements of `a` when countB exceeds countA
// (presumably the pairs phase 2 already emitted; verify)
Stream.drop(iB === 1 && countB > countA ? countB - countA : 0),
Stream.flatMap(([elA, iA]) => {
// equal counts: shift by one position per round; otherwise continue the running position iB * countA + iA
const multiplier = countA === countB ? 1 : countA;
// NOTE(review): verify this reaches every pair when the counts differ but share a common factor (e.g. 2 and 4)
const bI = (iB * multiplier + iA) % countB;
// select the element of `b` at position bI by re-running `b` (repeated) and keeping one element
return Stream.zip(
Stream.succeed(elA),
takeIndex(Stream.forever(b), bI)
);
})
)
)
)
)
)
);
});
// Demo program: logs every pair produced by the fair product of two three-element streams.
const test = Effect.gen(function* () {
  const words = Stream.make("test", "something", "asdf");
  const numbers = Stream.make(1, 2, 3);
  // both streams are explored fairly
  const pairs = fairCarthesianProduct(words, numbers);
  return yield* Stream.runForEach(pairs, Console.log);
  // output:
  // [ 'test', 1 ]
  // [ 'something', 2 ]
  // [ 'asdf', 3 ]
  // [ 'test', 2 ]
  // [ 'something', 3 ]
  // [ 'asdf', 1 ]
  // [ 'test', 3 ]
  // [ 'something', 1 ]
  // [ 'asdf', 2 ]

  // for comparison, the regular Cartesian product:
  // return yield* Stream.cross(words, numbers).pipe(
  //   Stream.take(100),
  //   Stream.runForEach(Console.log)
  //   // outputs:
  //   // [ 'test', 1 ]
  //   // [ 'test', 2 ]
  //   // [ 'test', 3 ]
  //   // [ 'something', 1 ]
  //   // [ 'something', 2 ]
  //   // [ 'something', 3 ]
  //   // [ 'asdf', 1 ]
  //   // [ 'asdf', 2 ]
  //   // [ 'asdf', 3 ]
  // );
});
/**
 * Array version: returns every pair `[a[x], b[y]]` exactly once, in a breadth-first (fair)
 * order that advances through both arrays together instead of exhausting `b` for each
 * element of `a`.
 *
 * Example: [1, 2, 3] and ["a", "b"] -> 1a, 2b, 3a, 1b, 2a, 3b
 *
 * @param a - elements used as the first component; cycled through in order
 * @param b - elements used as the second component
 * @returns `a.length * b.length` pairs; an empty array when either input is empty
 */
const breadthFirstCartesian = <A, B>(a: A[], b: B[]): (readonly [A, B])[] => {
  const n = a.length;
  const m = b.length;
  if (n === 0 || m === 0) {
    return [];
  }

  // greatest common divisor of the two lengths (Euclid)
  let g = n;
  let rest = m;
  while (rest !== 0) {
    [g, rest] = [rest, g % rest];
  }
  // Cycling both arrays in lockstep repeats after lcm(n, m) pairs.
  const period = (n / g) * m;

  return b.flatMap((_, i) =>
    a.map((elA, j) => {
      // k is the position of this pair in the output; a's index is k % n === j
      const k = i * n + j;
      // Shift b by one after every full period so each period covers a fresh diagonal.
      // Without the shift, lengths sharing a common factor (e.g. 2 and 4) would repeat
      // pairs and never reach others.
      const bI = (k + Math.floor(k / period)) % m;
      return [elA, b[bI]] as const;
    })
  );
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment