Created
June 23, 2018 02:10
-
-
Save long1eu/db508dcf2c1a28ec226a939ca5fe28e0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'package:rxdart/src/streams/utils.dart'; | |
typedef Stream<T> RetryWhenStreamFactory<T>(dynamic error, StackTrace s); | |
class RetryWhenStream<T> extends Stream<T> { | |
final StreamFactory<T> streamFactory; | |
final RetryWhenStreamFactory<T> retryWhenFactory; | |
StreamController<T> controller; | |
StreamSubscription<T> subscription; | |
bool _isUsed = false; | |
final List<ErrorAndStacktrace> _errors = <ErrorAndStacktrace>[]; | |
RetryWhenStream(this.streamFactory, this.retryWhenFactory); | |
@override | |
StreamSubscription<T> listen( | |
void onData(T event), { | |
Function onError, | |
void onDone(), | |
bool cancelOnError, | |
}) { | |
if (_isUsed) throw new StateError('Stream has already been listened to.'); | |
_isUsed = true; | |
controller = new StreamController<T>( | |
sync: true, | |
onListen: retry, | |
onPause: ([Future<dynamic> resumeSignal]) => subscription.pause(resumeSignal), | |
onResume: () => subscription.resume(), | |
onCancel: () => subscription.cancel()); | |
return controller.stream.listen( | |
onData, | |
onError: onError, | |
onDone: onDone, | |
cancelOnError: cancelOnError, | |
); | |
} | |
void retry() { | |
subscription = streamFactory().listen(controller.add, onError: (dynamic e, StackTrace s) { | |
subscription.cancel(); | |
_errors.add(new ErrorAndStacktrace(e, s)); | |
retryWhenFactory(e, s).listen( | |
(dynamic event) => retry(), | |
onError: (dynamic e, StackTrace s) { | |
controller.addError(new RetryError(e.toString(), _errors..add(new ErrorAndStacktrace(e, s)))); | |
controller.close(); | |
}, | |
); | |
}, onDone: controller.close, cancelOnError: false); | |
} | |
} | |
class RetryError extends Error { | |
final String message; | |
final List<ErrorAndStacktrace> errors; | |
RetryError(this.message, this.errors); | |
@override | |
String toString() => message; | |
} | |
class ErrorAndStacktrace { | |
final dynamic error; | |
final StackTrace stacktrace; | |
ErrorAndStacktrace(this.error, this.stacktrace); | |
@override | |
String toString() { | |
return 'ErrorAndStacktrace{error: $error, stacktrace: $stacktrace}'; | |
} | |
@override | |
bool operator ==(Object other) => | |
identical(this, other) || | |
other is ErrorAndStacktrace && runtimeType == other.runtimeType && error == other.error && stacktrace == other.stacktrace; | |
@override | |
int get hashCode => error.hashCode ^ stacktrace.hashCode; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment