Last active
November 15, 2021 17:14
-
-
Save kikuchy/d96a45d0b22b0baa437ccca69ad6679b to your computer and use it in GitHub Desktop.
Just a little study for making the function like `combineLatest2` of RxDart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://gist.github.com/kikuchy/d96a45d0b22b0baa437ccca69ad6679b | |
import 'dart:async'; | |
typedef Combinator<T1, T2, R> = R Function(T1, T2); | |
Stream<R> combineLatestN<R>( | |
List<Stream<dynamic>> streams, R Function(List<dynamic>) combinator) { | |
final combined = StreamController<R>(); | |
final subscriptions = <StreamSubscription>{}; | |
final completed = <Object>{}; | |
final values = List<dynamic>.filled(streams.length, null); | |
final received = <Object>{}; | |
void Function(dynamic) onDataFor(Stream<dynamic> key) { | |
final index = streams.indexOf(key); | |
return (v) { | |
values[index] = v; | |
received.add(key); | |
if (received.length == streams.length) { | |
combined.add(combinator(values)); | |
} | |
}; | |
} | |
void Function() onDoneFor(Object key) { | |
return () { | |
completed.add(key); | |
if (completed.length == streams.length) { | |
combined.close(); | |
} | |
}; | |
} | |
subscriptions.addAll(streams.map((s) => s.listen( | |
onDataFor(s), | |
onError: combined.addError, | |
onDone: onDoneFor(s), | |
))); | |
combined.onCancel = | |
() async => Future.wait(subscriptions.map((s) => s.cancel())); | |
return combined.stream; | |
} | |
Stream<R> combineLatest2<T1, T2, R>( | |
Stream<T1> s1, Stream<T2> s2, Combinator<T1, T2, R> combinator) { | |
return combineLatestN<R>([s1, s2], (values) { | |
return combinator(values.first as T1, values.last as T2); | |
}); | |
} | |
void main() async { | |
final Stream<int> two = Stream.value(2).asBroadcastStream(); | |
final Stream<int> a = Stream.value(1); | |
combineLatest2<int, int, String>(a, two, (x, y) => "${x + y}").listen( | |
print, | |
onError: (e, s) { | |
print(e); | |
print(s); | |
}, | |
onDone: () => print("a Done!"), | |
); | |
final Stream<int> b = Stream.fromIterable([10, 20, 30, 40]); | |
combineLatest2<int, int, String>(b, two, (x, y) => "${x + y}").listen( | |
print, | |
onError: (e, s) { | |
print(e); | |
print(s); | |
}, | |
onDone: () => print("b Done!"), | |
); | |
final Stream<int> c = Stream.error(Error(), StackTrace.current); | |
combineLatest2<int, int, String>(c, two, (x, y) => "${x + y}").listen( | |
print, | |
onError: (e, s) { | |
print(e); | |
print(s); | |
}, | |
onDone: () => print("c Done!"), | |
); | |
final Stream<int?> d = Stream.value(null); | |
combineLatest2<int?, int, String>(d, two, (x, y) => "${x ?? 0 + y}").listen( | |
print, | |
onError: (e, s) { | |
print(e); | |
print(s); | |
}, | |
onDone: () => print("d Done!"), | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment