Created
March 4, 2023 11:01
-
-
Save LiamKarlMitchell/b1c724896978683c2a4b439eaaf279d6 to your computer and use it in GitHub Desktop.
Flutter Bloc Delayed Event Emit - Restartable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This example will use Restartable event transformer and cancel/ignore previous events that are emitted only accepting the latest event. | |
// Can kind of see this in console and added a random color on build of text. | |
// This code is distributed under the MIT License. | |
// Copyright (c) 2018 Felix Angelov. | |
// You can find the original at https://github.com/felangel/bloc. | |
import 'package:flutter/material.dart'; | |
import 'package:flutter_bloc/flutter_bloc.dart'; | |
// import 'package:bloc_concurrency/bloc_concurrency.dart'; (Unsupported import on DartPad) so I'll just copy it below. | |
import 'dart:math'; | |
// Copied code for restartable EventTransformer as can't import bloc_concurrency on DartPad. | |
// import 'package:stream_transform/stream_transform.dart'; (Unsupported import on DartPad) | |
// Copied switchMap extension. | |
// https://github.com/dart-lang/stream_transform/blob/master/lib/src/switch.dart | |
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file | |
// for details. All rights reserved. Use of this source code is governed by a | |
// BSD-style license that can be found in the LICENSE file. | |
import 'dart:async'; | |
// Copied asyncExpanded. | |
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file | |
// for details. All rights reserved. Use of this source code is governed by a | |
// BSD-style license that can be found in the LICENSE file. | |
//import 'dart:async'; | |
// import 'switch.dart'; | |
/// Alternatives to [asyncExpand]. | |
/// | |
/// The built in [asyncExpand] will not overlap the inner streams and every | |
/// event will be sent to the callback individually. | |
/// | |
/// - [concurrentAsyncExpand] allow overlap and merges inner streams without | |
/// ordering guarantees. | |
extension AsyncExpand<T> on Stream<T> { | |
/// Like [asyncExpand] but the [convert] callback may be called for an element | |
/// before the [Stream] emitted by the previous element has closed. | |
/// | |
/// Events on the result stream will be emitted in the order they are emitted | |
/// by the sub streams, which may not match the order of this stream. | |
/// | |
/// Errors from [convert], the source stream, or any of the sub streams are | |
/// forwarded to the result stream. | |
/// | |
/// The result stream will not close until the source stream closes and all | |
/// sub streams have closed. | |
/// | |
/// If the source stream is a broadcast stream, the result will be as well, | |
/// regardless of the types of streams created by [convert]. In this case, | |
/// some care should be taken: | |
/// - If [convert] returns a single subscription stream it may be listened to | |
/// and never canceled. | |
/// - For any period of time where there are no listeners on the result | |
/// stream, any sub streams from previously emitted events will be ignored, | |
/// regardless of whether they emit further events after a listener is added | |
/// back. | |
/// | |
/// See also: | |
/// - [switchMap], which cancels subscriptions to the previous sub stream | |
/// instead of concurrently emitting events from all sub streams. | |
Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) { | |
final controller = isBroadcast | |
? StreamController<S>.broadcast(sync: true) | |
: StreamController<S>(sync: true); | |
controller.onListen = () { | |
final subscriptions = <StreamSubscription<dynamic>>[]; | |
final outerSubscription = map(convert).listen((inner) { | |
if (isBroadcast && !inner.isBroadcast) { | |
inner = inner.asBroadcastStream(); | |
} | |
final subscription = | |
inner.listen(controller.add, onError: controller.addError); | |
subscription.onDone(() { | |
subscriptions.remove(subscription); | |
if (subscriptions.isEmpty) controller.close(); | |
}); | |
subscriptions.add(subscription); | |
}, onError: controller.addError); | |
outerSubscription.onDone(() { | |
subscriptions.remove(outerSubscription); | |
if (subscriptions.isEmpty) controller.close(); | |
}); | |
subscriptions.add(outerSubscription); | |
if (!isBroadcast) { | |
controller | |
..onPause = () { | |
for (final subscription in subscriptions) { | |
subscription.pause(); | |
} | |
} | |
..onResume = () { | |
for (final subscription in subscriptions) { | |
subscription.resume(); | |
} | |
}; | |
} | |
controller.onCancel = () { | |
if (subscriptions.isEmpty) return null; | |
var cancels = [for (var s in subscriptions) s.cancel()] | |
// Handle opt-out nulls | |
..removeWhere((Object? f) => f == null); | |
return Future.wait(cancels).then((_) => null); | |
}; | |
}; | |
return controller.stream; | |
} | |
} | |
// End of AsyncExpanded | |
/// A utility to take events from the most recent sub stream returned by a | |
/// callback. | |
extension Switch<T> on Stream<T> { | |
/// Maps events to a Stream and emits values from the most recently created | |
/// Stream. | |
/// | |
/// When the source emits a value it will be converted to a [Stream] using | |
/// [convert] and the output will switch to emitting events from that result. | |
/// Like [asyncExpand] but the [Stream] emitted by a previous element | |
/// will be ignored as soon as the source stream emits a new event. | |
/// | |
/// This means that the source stream is not paused until a sub stream | |
/// returned from the [convert] callback is done. Instead, the subscription | |
/// to the sub stream is canceled as soon as the source stream emits a new event. | |
/// | |
/// Errors from [convert], the source stream, or any of the sub streams are | |
/// forwarded to the result stream. | |
/// | |
/// The result stream will not close until the source stream closes and | |
/// the current sub stream have closed. | |
/// | |
/// If the source stream is a broadcast stream, the result will be as well, | |
/// regardless of the types of streams created by [convert]. In this case, | |
/// some care should be taken: | |
/// | |
/// * If [convert] returns a single subscription stream it may be listened to | |
/// and never canceled. | |
/// | |
/// See also: | |
/// - [concurrentAsyncExpand], which emits events from all sub streams | |
/// concurrently instead of cancelling subscriptions to previous subs streams. | |
Stream<S> switchMap<S>(Stream<S> Function(T) convert) { | |
return map(convert).switchLatest(); | |
} | |
} | |
/// A utility to take events from the most recent sub stream. | |
extension SwitchLatest<T> on Stream<Stream<T>> { | |
/// Emits values from the most recently emitted Stream. | |
/// | |
/// When the source emits a stream, the output will switch to emitting events | |
/// from that stream. | |
/// | |
/// Whether the source stream is a single-subscription stream or a | |
/// broadcast stream, the result stream will be the same kind of stream, | |
/// regardless of the types of streams emitted. | |
Stream<T> switchLatest() { | |
var controller = isBroadcast | |
? StreamController<T>.broadcast(sync: true) | |
: StreamController<T>(sync: true); | |
controller.onListen = () { | |
StreamSubscription<T>? innerSubscription; | |
var outerStreamDone = false; | |
void listenToInnerStream(Stream<T> innerStream) { | |
assert(innerSubscription == null); | |
var subscription = innerStream | |
.listen(controller.add, onError: controller.addError, onDone: () { | |
innerSubscription = null; | |
if (outerStreamDone) controller.close(); | |
}); | |
// If a pause happens during an innerSubscription.cancel, | |
// we still listen to the next stream when the cancel is done. | |
// Then we immediately pause it again here. | |
if (controller.isPaused) subscription.pause(); | |
innerSubscription = subscription; | |
} | |
var addError = controller.addError; | |
final outerSubscription = listen(null, onError: addError, onDone: () { | |
outerStreamDone = true; | |
if (innerSubscription == null) controller.close(); | |
}); | |
outerSubscription.onData((innerStream) async { | |
var currentSubscription = innerSubscription; | |
if (currentSubscription == null) { | |
listenToInnerStream(innerStream); | |
return; | |
} | |
innerSubscription = null; | |
outerSubscription.pause(); | |
try { | |
await currentSubscription.cancel(); | |
} catch (error, stack) { | |
controller.addError(error, stack); | |
} finally { | |
if (!isBroadcast && !controller.hasListener) { | |
// Result single-subscription stream subscription was cancelled | |
// while waiting for previous innerStream cancel. | |
// | |
// Ensure that the last received stream is also listened to and | |
// cancelled, then do nothing further. | |
innerStream.listen(null).cancel().ignore(); | |
} else { | |
outerSubscription.resume(); | |
listenToInnerStream(innerStream); | |
} | |
} | |
}); | |
if (!isBroadcast) { | |
controller | |
..onPause = () { | |
innerSubscription?.pause(); | |
outerSubscription.pause(); | |
} | |
..onResume = () { | |
innerSubscription?.resume(); | |
outerSubscription.resume(); | |
}; | |
} | |
controller.onCancel = () { | |
var sub = innerSubscription; | |
var cancels = [ | |
if (!outerStreamDone) outerSubscription.cancel(), | |
if (sub != null) sub.cancel(), | |
] | |
// Handle opt-out nulls | |
..removeWhere((Object? f) => f == null); | |
if (cancels.isEmpty) return null; | |
return Future.wait(cancels).then(_ignore); | |
}; | |
}; | |
return controller.stream; | |
} | |
} | |
/// Helper function to ignore future callback | |
void _ignore(_, [__]) {} | |
// End of switchMap extension. | |
/// Process only one event by cancelling any pending events and | |
/// processing the new event immediately. | |
/// | |
/// Avoid using [restartable] if you expect an event to have | |
/// immediate results -- it should only be used with asynchronous APIs. | |
/// | |
/// **Note**: there is no event handler overlap and any currently running tasks | |
/// will be aborted if a new event is added before a prior one completes. | |
EventTransformer<Event> restartable<Event>() { | |
return (events, mapper) => events.switchMap(mapper); | |
} | |
// End of Restartable. | |
var rng = Random(); | |
void main() { | |
Bloc.observer = AppBlocObserver(); | |
runApp(const App()); | |
} | |
/// Custom [BlocObserver] that observes all bloc and cubit state changes. | |
class AppBlocObserver extends BlocObserver { | |
@override | |
void onChange(BlocBase bloc, Change change) { | |
super.onChange(bloc, change); | |
if (bloc is Cubit) print(change); | |
} | |
@override | |
void onTransition(Bloc bloc, Transition transition) { | |
super.onTransition(bloc, transition); | |
print(transition); | |
} | |
} | |
/// {@template app} | |
/// A [StatelessWidget] that: | |
/// * uses [bloc](https://pub.dev/packages/bloc) and | |
/// [flutter_bloc](https://pub.dev/packages/flutter_bloc) | |
/// to manage the state of a counter and the app theme. | |
/// {@endtemplate} | |
class App extends StatelessWidget { | |
/// {@macro app} | |
const App({Key? key}) : super(key: key); | |
@override | |
Widget build(BuildContext context) { | |
return BlocProvider( | |
create: (_) => ThemeCubit(), | |
child: const AppView(), | |
); | |
} | |
} | |
/// {@template app_view} | |
/// A [StatelessWidget] that: | |
/// * reacts to state changes in the [ThemeCubit] | |
/// and updates the theme of the [MaterialApp]. | |
/// * renders the [CounterPage]. | |
/// {@endtemplate} | |
class AppView extends StatelessWidget { | |
/// {@macro app_view} | |
const AppView({Key? key}) : super(key: key); | |
@override | |
Widget build(BuildContext context) { | |
return BlocBuilder<ThemeCubit, ThemeData>( | |
builder: (_, theme) { | |
return MaterialApp( | |
theme: theme, | |
home: const CounterPage(), | |
); | |
}, | |
); | |
} | |
} | |
/// {@template counter_page} | |
/// A [StatelessWidget] that: | |
/// * provides a [CounterBloc] to the [CounterView]. | |
/// {@endtemplate} | |
class CounterPage extends StatelessWidget { | |
/// {@macro counter_page} | |
const CounterPage({Key? key}) : super(key: key); | |
@override | |
Widget build(BuildContext context) { | |
return BlocProvider( | |
create: (_) => CounterBloc(), | |
child: const CounterView(), | |
); | |
} | |
} | |
/// {@template counter_view} | |
/// A [StatelessWidget] that: | |
/// * demonstrates how to consume and interact with a [CounterBloc]. | |
/// {@endtemplate} | |
class CounterView extends StatelessWidget { | |
/// {@macro counter_view} | |
const CounterView({Key? key}) : super(key: key); | |
@override | |
Widget build(BuildContext context) { | |
return Scaffold( | |
appBar: AppBar(title: const Text('Counter')), | |
body: Center( | |
child: BlocBuilder<CounterBloc, int>( | |
builder: (context, count) { | |
return Text('$count', style: Theme.of(context).textTheme.displayLarge?.merge(TextStyle( color: Colors.primaries[rng.nextInt(Colors.primaries.length)] ))); | |
}, | |
), | |
), | |
floatingActionButton: Column( | |
crossAxisAlignment: CrossAxisAlignment.end, | |
mainAxisAlignment: MainAxisAlignment.end, | |
children: <Widget>[ | |
FloatingActionButton( | |
child: const Icon(Icons.add), | |
onPressed: () => context.read<CounterBloc>().add(Increment()), | |
), | |
const SizedBox(height: 4), | |
FloatingActionButton( | |
child: const Icon(Icons.remove), | |
onPressed: () => context.read<CounterBloc>().add(Decrement()), | |
), | |
const SizedBox(height: 4), | |
FloatingActionButton( | |
child: const Icon(Icons.brightness_6), | |
onPressed: () => context.read<ThemeCubit>().toggleTheme(), | |
), | |
], | |
), | |
); | |
} | |
} | |
/// Event being processed by [CounterBloc]. | |
abstract class CounterEvent {} | |
/// Notifies bloc to increment state. | |
class Increment extends CounterEvent {} | |
/// Notifies bloc to decrement state. | |
class Decrement extends CounterEvent {} | |
/// {@template counter_bloc} | |
/// A simple [Bloc] that manages an `int` as its state. | |
/// {@endtemplate} | |
class CounterBloc extends Bloc<CounterEvent, int> { | |
/// {@macro counter_bloc} | |
CounterBloc() : super(0) { | |
on<Increment>((event, emit) => Future.delayed(Duration(seconds: rng.nextInt(5)), () { | |
print('Time has passed.'); | |
emit(state + 1); | |
}), | |
/// Specify a custom event transformer from `package:bloc_concurrency` | |
/// restartable - process only the latest event and cancel previous event handlers | |
transformer: restartable(), | |
); | |
on<Decrement>((event, emit) => Future.delayed(Duration(seconds: rng.nextInt(5)), () { | |
print('Time has passed.'); | |
emit(state - 1); | |
}), | |
/// Specify a custom event transformer from `package:bloc_concurrency` | |
/// restartable - process only the latest event and cancel previous event handlers | |
transformer: restartable(), | |
); | |
} | |
} | |
/// {@template brightness_cubit} | |
/// A simple [Cubit] that manages the [ThemeData] as its state. | |
/// {@endtemplate} | |
class ThemeCubit extends Cubit<ThemeData> { | |
/// {@macro brightness_cubit} | |
ThemeCubit() : super(_lightTheme); | |
static final _lightTheme = ThemeData( | |
floatingActionButtonTheme: const FloatingActionButtonThemeData( | |
foregroundColor: Colors.white, | |
), | |
brightness: Brightness.light, | |
); | |
static final _darkTheme = ThemeData( | |
floatingActionButtonTheme: const FloatingActionButtonThemeData( | |
foregroundColor: Colors.black, | |
), | |
brightness: Brightness.dark, | |
); | |
/// Toggles the current brightness between light and dark. | |
void toggleTheme() { | |
emit(state.brightness == Brightness.dark ? _lightTheme : _darkTheme); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment