Skip to content

Instantly share code, notes, and snippets.

@cmc5788
Last active February 21, 2021 18:07
Show Gist options
  • Save cmc5788/be2a52e9f1fc3e155a276f76fe60ea37 to your computer and use it in GitHub Desktop.
Save cmc5788/be2a52e9f1fc3e155a276f76fe60ea37 to your computer and use it in GitHub Desktop.
updated bloc / reducer_bloc
import 'dart:async';
import 'dart:math';
import 'package:equatable/equatable.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:rxdart/rxdart.dart';
//
// Notes:
//
// An almost identical pattern to what `ReducerBloc` does can be achieved with
// a plain `Bloc` if desired. However, `ReducerBloc` locks this pattern down
// and is less flexible.
//
// If the pattern is worth locking down, less flexibility is good for a project
// with a lot of devs. (But not for the OSS use case, where flexibility is
// better.)
//
// But is the pattern good? Some categories of state mutation errors that are
// possible to demonstrate with `Bloc` in example below are not occurring with
// `ReducerBloc`, so I'm inclined to think so right now.
//
// `ReducerBloc` also makes it easy to write libraries like:
// extension RxBloc<T> on Stream<T> { ... }
// Typical usage patterns in base `Bloc` make these harder to use.
//
// `ReducerBloc`'s internal events cause some extra boilerplate. On the other
// hand, this makes state mutations more traceable since event to state change
// ratio is "1 to 1" instead of "1 to 0...n".
//
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// Some utility (repo) to use in examples
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/// Fake data source used by the example blocs.
///
/// Results are pseudorandom but reproducible: one [Random] is cached for every
/// seed that has been requested.
class MyDataRepo {
  final _rngs = <int, Random>{};

  /// Completes after a pseudorandom delay of 300-999 ms.
  ///
  /// The result is either a value in the range 1-100 or a failure raised by
  /// throwing the string `'Oops!'`. The generator for [seed] is reused across
  /// calls, so repeated calls with the same seed walk through one sequence.
  Future<int> fetchData(int seed) async {
    final rng = _rngs.putIfAbsent(seed, () => Random(seed));
    final latency = Duration(milliseconds: rng.nextInt(700) + 300);
    await Future<void>.delayed(latency);
    // Pseudorandom error: the coin flip picks between a value and a failure.
    if (rng.nextBool()) return rng.nextInt(100) + 1;
    throw 'Oops!';
  }

  /// Emits the value `1` from [n] timers that are all 250 ms long.
  ///
  /// A bit contrived: every timer is created together, which produces the
  /// near-simultaneous events the examples want to demonstrate.
  Stream<int> fetchValues(int n) {
    final timers = <Stream<int>>[];
    for (var remaining = n; remaining > 0; remaining--) {
      timers.add(Rx.timer(1, Duration(milliseconds: 250)));
    }
    return Rx.merge(timers);
  }
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// Some utility (event / state) to use in examples
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/// Base type of every event handled by the example blocs.
///
/// Equality comes from [Equatable]; an event without fields contributes no
/// props.
abstract class MyBlocEvent extends Equatable {
  const MyBlocEvent();

  /// Empty by default; events that carry data override this.
  @override
  List<Object> get props {
    return [];
  }
}
/// Added by the UI when the refresh button is pressed.
class RefreshPressed extends MyBlocEvent {
  const RefreshPressed(this.randomSeedText);

  /// Raw seed text as typed by the user; the blocs turn it into an `int`.
  final String randomSeedText;

  @override
  List<Object> get props {
    return [randomSeedText];
  }
}
/// Added by the UI when the "Add Two" button is pressed. Carries no data.
class AddTwoPressed extends MyBlocEvent {
  const AddTwoPressed();
}
/// Internal event derived from the timer bloc's states. Carries no data.
class _TimerTicked extends MyBlocEvent {
  const _TimerTicked();
}
/// Internal event added once an error message has been put into the state.
class _ErrorMessageDisplayed extends MyBlocEvent {
  const _ErrorMessageDisplayed(this.errorMessage);

  /// The message that is now being shown.
  final String errorMessage;

  @override
  List<Object> get props {
    return [errorMessage];
  }
}
// ReducerBloc only
/// Internal event carrying one value that should be added to the state.
class _ReceivedAddedData extends MyBlocEvent {
  const _ReceivedAddedData(this.data);

  /// Amount to add to the current value.
  final int data;

  @override
  List<Object> get props {
    return [data];
  }
}
// ReducerBloc only
/// Internal event emitted right before a refresh starts. Carries no data.
class _StartedRefreshData extends MyBlocEvent {
  const _StartedRefreshData();
}
// ReducerBloc only
/// Internal event emitted when a refresh produced a new value.
class _DataRefreshed extends MyBlocEvent {
  const _DataRefreshed(this.data);

  /// The freshly fetched value that replaces the current one.
  final int data;

  @override
  List<Object> get props {
    return [data];
  }
}
// ReducerBloc only
/// Internal event emitted when a refresh failed.
class _FailedRefreshData extends MyBlocEvent {
  const _FailedRefreshData(this.errorMessage);

  /// Text describing the failure.
  final String errorMessage;

  @override
  List<Object> get props {
    return [errorMessage];
  }
}
// ReducerBloc only
/// Internal event asking for a displayed error message to be cleared.
class _ErrorMessageDismissed extends MyBlocEvent {
  const _ErrorMessageDismissed(this.errorMessage);

  /// The message whose display time has elapsed.
  final String errorMessage;

  @override
  List<Object> get props {
    return [errorMessage];
  }
}
/// Immutable state shared by both example blocs.
class MyBlocState extends Equatable {
  /// Creates a state; the defaults are value `0`, not loading, no error text.
  const MyBlocState({
    this.val = 0,
    this.loading = false,
    this.errorMessage = '',
  });

  /// The current numeric value.
  final int val;

  /// Whether a refresh is in progress.
  final bool loading;

  /// Error text to display; the empty string is the default.
  final String errorMessage;

  /// Returns a copy in which every non-null argument replaces the matching
  /// field; arguments that are omitted (or null) keep the current value.
  MyBlocState copyWith({int val, bool loading, String errorMessage}) =>
      MyBlocState(
        val: val ?? this.val,
        loading: loading ?? this.loading,
        errorMessage: errorMessage ?? this.errorMessage,
      );

  @override
  List<Object> get props {
    return [val, loading, errorMessage];
  }
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// Simple timer Bloc to validate "Bloc -> Bloc" dependency use case.
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/// Marker mixin restricted to `Bloc<void, int>`; it declares no members.
///
/// Both timer implementations in this file mix it in, so either one can be
/// provided and consumed under the single type `MyTimerBloc`.
mixin MyTimerBloc on Bloc<void, int> {}
/// Timer built on [ReducerBloc]: the state is a counter that starts at 0 and
/// grows by one for every event of a one-second periodic stream.
class MyTimerReducerBloc extends ReducerBloc<void, int> with MyTimerBloc {
  MyTimerReducerBloc() : super(0);

  /// Ignores externally added events; the periodic stream is the only source.
  @override
  List<Stream<void>> processEvents(events) {
    return [Stream.periodic(Duration(seconds: 1))];
  }

  /// Every tick increments the counter; the event itself carries nothing.
  @override
  int reduceState(int lastState, tick) {
    return lastState + 1;
  }
}
// for comparison
/// The same timer written against plain [Bloc], kept for comparison.
class MyTimerNormalBloc extends Bloc<void, int> with MyTimerBloc {
  MyTimerNormalBloc() : super(0);

  /// Replaces the incoming event stream with a one-second periodic stream and
  /// maps each tick through [transitionFn].
  @override
  Stream<Transition<void, int>> transformEvents(events, transitionFn) {
    final ticks = Stream.periodic(Duration(seconds: 1));
    return ticks.asyncExpand(transitionFn);
  }

  /// Yields the current state plus one for every tick.
  @override
  Stream<int> mapEventToState(tick) async* {
    final next = state + 1;
    yield next;
  }
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// A ReducerBloc example
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/// Example bloc built on [ReducerBloc].
///
/// UI events and timer ticks are turned into internal events by
/// [processEvents]; every internal event is then folded into the state by
/// [reduceState].
class MyReducerBloc extends ReducerBloc<MyBlocEvent, MyBlocState> {
  MyReducerBloc(MyBlocState state, this.repo, this.timerBloc) : super(state);

  final MyDataRepo repo;
  final MyTimerBloc timerBloc;

  /// Returns one stream per event source. A list is used so the author of a
  /// bloc does not have to think about merging.
  @override
  List<Stream<MyBlocEvent>> processEvents(Stream<MyBlocEvent> events) {
    return [
      _timerTickEvents(),
      _addTwoPressedEvents(events),
      _refreshPressedEvents(events),
      _errorDisplayedEvents(events),
    ];
  }

  /// A dependency on another bloc is just one more event source, so there is
  /// no subscription to juggle. Only timer states divisible by 5 pass.
  Stream<MyBlocEvent> _timerTickEvents() {
    return timerBloc
        .where((tick) => tick % 5 == 0)
        .mapTo(const _TimerTicked());
  }

  // The sources below need no direct Rx usage: the concurrency policy is
  // picked through the `processEventsOfType` extension instead of calling
  // `exhaustMap` / `switchMap` by hand.

  /// While one "add two" request is running, further presses are ignored.
  Stream<MyBlocEvent> _addTwoPressedEvents(Stream<MyBlocEvent> events) {
    return events.processEventsOfType<AddTwoPressed>(
      concurrency: EventConcurrency.ignoreUntilComplete,
      processor: (_) {
        return repo.fetchValues(2).map((value) => _ReceivedAddedData(value));
      },
    );
  }

  /// A new refresh cancels the previous one. Emits a "started" event, then
  /// the fetched data, or a "failed" event when an error was raised.
  Stream<MyBlocEvent> _refreshPressedEvents(Stream<MyBlocEvent> events) {
    return events.processEventsOfType<RefreshPressed>(
      concurrency: EventConcurrency.cancelPrevious,
      pre: (_) async* {
        yield const _StartedRefreshData();
      },
      processor: (request) async* {
        final data = await repo.fetchData(_seedFor(request.randomSeedText));
        yield _DataRefreshed(data);
      },
      post: (_, failure) async* {
        if (failure == null) return;
        yield _FailedRefreshData('$failure');
      },
    );
  }

  /// Uses [text] as the seed when it parses as an integer, and its hash code
  /// otherwise.
  static int _seedFor(String text) => int.tryParse(text) ?? text.hashCode;

  /// Two seconds after an error is displayed, asks for it to be dismissed.
  /// A newer displayed error cancels the pending dismissal.
  Stream<MyBlocEvent> _errorDisplayedEvents(Stream<MyBlocEvent> events) {
    return events.processEventsOfType<_ErrorMessageDisplayed>(
      concurrency: EventConcurrency.cancelPrevious,
      processor: (shown) async* {
        await Future<void>.delayed(Duration(seconds: 2));
        yield _ErrorMessageDismissed(shown.errorMessage);
      },
    );
  }

  /// Dispatches [event] to the first matching case below.
  ///
  /// `switchEvent` keeps this from growing into one large type-checking
  /// `if / else` chain; sealed classes or pattern matching in Dart could make
  /// the utility unnecessary.
  @override
  MyBlocState reduceState(MyBlocState lastState, MyBlocEvent event) {
    return switchEvent(lastState, event, [
      _timerTickedCase(),
      _receivedAddedDataCase(),
      _startedRefreshDataCase(),
      _dataRefreshedCase(),
      _failedRefreshDataCase(),
      _errorDismissedCase(),
    ]);
  }

  /// Timer ticks add one to the value, except while loading.
  EventCase<MyBlocState, MyBlocEvent, _TimerTicked> _timerTickedCase() {
    return whenIs<_TimerTicked>(
      (state, event) => state.loading == true
          ? state
          : state.copyWith(val: state.val + 1),
    );
  }

  /// Adds the received amount to the value.
  EventCase<MyBlocState, MyBlocEvent, _ReceivedAddedData>
      _receivedAddedDataCase() {
    return whenIs<_ReceivedAddedData>(
      (state, event) => state.copyWith(val: state.val + event.data),
    );
  }

  /// Marks the state as loading and clears any error text.
  EventCase<MyBlocState, MyBlocEvent, _StartedRefreshData>
      _startedRefreshDataCase() {
    return whenIs<_StartedRefreshData>(
      (state, event) => state.copyWith(loading: true, errorMessage: ''),
    );
  }

  /// Stores the refreshed value and leaves the loading state.
  EventCase<MyBlocState, MyBlocEvent, _DataRefreshed> _dataRefreshedCase() {
    return whenIs<_DataRefreshed>(
      (state, event) => state.copyWith(
        loading: false,
        val: event.data,
        errorMessage: '',
      ),
    );
  }

  /// Stores the failure text and leaves the loading state.
  EventCase<MyBlocState, MyBlocEvent, _FailedRefreshData>
      _failedRefreshDataCase() {
    return whenIs<_FailedRefreshData>((state, event) {
      // NOTE(review): this reducer case has a side effect - it enqueues a
      // follow-up event on the bloc before returning the new state.
      add(_ErrorMessageDisplayed(event.errorMessage));
      return state.copyWith(loading: false, errorMessage: event.errorMessage);
    });
  }

  /// Clears the error text, but only if it is still the dismissed message.
  EventCase<MyBlocState, MyBlocEvent, _ErrorMessageDismissed>
      _errorDismissedCase() {
    return whenIs<_ErrorMessageDismissed>(
      (state, event) => state.errorMessage != event.errorMessage
          ? state
          : state.copyWith(errorMessage: ''),
    );
  }
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// A (normal) Bloc example
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/// The same feature set written against plain `Bloc`, kept for comparison.
///
/// Some handlers below intentionally keep patterns that read `this.state`
/// from inside a stream; the in-code comments explain what goes wrong. They
/// are demonstrations and are left as they are on purpose.
class MyNormalBloc extends Bloc<MyBlocEvent, MyBlocState> {
MyNormalBloc(MyBlocState state, this.repo, this.timerBloc) : super(state);
final MyDataRepo repo;
final MyTimerBloc timerBloc;
// Almost equivalent function to `ReducerBloc`, but definitely more obscure
// for a typical dev between the `Rx.merge` usage and `transitionFn`.
//
// Each helper below selects one kind of event and maps it through
// `transitionFn`; the four resulting transition streams are merged.
@override
Stream<Transition<MyBlocEvent, MyBlocState>> transformEvents(
events,
transitionFn,
) {
return Rx.merge([
_timerTickEvents(transitionFn),
_addTwoPressedEvents(events, transitionFn),
_refreshPressedEvents(events, transitionFn),
_errorDisplayedEvents(events, transitionFn),
]);
}
// Timer states divisible by 5 become `_TimerTicked` events; this source does
// not read the bloc's own event stream.
Stream<Transition<MyBlocEvent, MyBlocState>> _timerTickEvents(transitionFn) =>
timerBloc
.where((x) => x % 5 == 0)
.mapTo(const _TimerTicked())
.asyncExpand(transitionFn);
// `AddTwoPressed` goes through `exhaustMap`.
Stream<Transition<MyBlocEvent, MyBlocState>> _addTwoPressedEvents(
Stream<MyBlocEvent> events, transitionFn) =>
events.whereType<AddTwoPressed>().exhaustMap(transitionFn);
// `RefreshPressed` goes through `switchMap`.
Stream<Transition<MyBlocEvent, MyBlocState>> _refreshPressedEvents(
Stream<MyBlocEvent> events, transitionFn) =>
events.whereType<RefreshPressed>().switchMap(transitionFn);
// `_ErrorMessageDisplayed` goes through `switchMap`.
Stream<Transition<MyBlocEvent, MyBlocState>> _errorDisplayedEvents(
Stream<MyBlocEvent> events, transitionFn) =>
events.whereType<_ErrorMessageDisplayed>().switchMap(transitionFn);
// Patterns that rely on looking up external state from "inside" streams tend
// to result in problems like comments below; usually prefer stream operators
// to be pure functions, where all input is param & output is `return/yield`.
//
// Dispatches on the event type; any event without a branch here throws
// `UnimplementedError`.
@override
Stream<MyBlocState> mapEventToState(MyBlocEvent event) async* {
if (event is _TimerTicked) {
// Ticks are skipped while loading.
if (state.loading == true) return;
yield state.copyWith(val: state.val + 1);
} else if (event is AddTwoPressed) {
yield* _mapAddTwoPressed();
} else if (event is RefreshPressed) {
yield* _mapRefreshPressed(event);
} else if (event is _ErrorMessageDisplayed) {
yield* _mapDismissErrors(event);
} else {
throw UnimplementedError('Handling of $event is not implemented.');
}
}
// Yields a loading state, then either the fetched value or the error text.
// On failure it also adds `_ErrorMessageDisplayed` for the same text.
Stream<MyBlocState> _mapRefreshPressed(RefreshPressed event) async* {
yield state.copyWith(loading: true, errorMessage: '');
try {
// The seed is the parsed integer, or the text's hash code if parsing fails.
final seed =
int.tryParse(event.randomSeedText) ?? event.randomSeedText.hashCode;
yield state.copyWith(
loading: false,
val: await repo.fetchData(seed),
errorMessage: '',
);
// Below are two examples that don't work because `this.state` does not
// update between the calls.
// -- 1 --
// yield state.copyWith(val: await repo.fetchData(seed));
// yield state.copyWith(loading: false, errorMessage: '');
// -- 2 --
// yield await repo
// .fetchData(seed)
// .then((data) => state.copyWith(val: data))
// .then((_) => print('loaded!'))
// .then((_) => state.copyWith(loading: false, errorMessage: ''));
} catch (error) {
yield state.copyWith(loading: false, errorMessage: '$error');
add(_ErrorMessageDisplayed('$error'));
}
}
Stream<MyBlocState> _mapAddTwoPressed() async* {
// Below doesn't work, same reason as comments in `_mapRefreshPressed`.
// Could be fixed with `await for`, but purpose is to demo problem typical
// dev might encounter. I anticipate a lot of hacky workarounds like
// `await Future.delayed(Duration.zero)`.
yield* repo
.fetchValues(2)
.map((val) => state.copyWith(val: state.val + val));
}
// Waits two seconds, then clears the error text, but only if the state still
// shows the message this event was created for.
Stream<MyBlocState> _mapDismissErrors(_ErrorMessageDisplayed event) async* {
await Future.delayed(Duration(seconds: 2));
if (event.errorMessage != state.errorMessage) return;
yield state.copyWith(errorMessage: '');
}
}
/// Observer that prints every transition, prefixed with the bloc's runtime
/// type.
class MyBlocObserver extends BlocObserver {
  @override
  void onTransition(Bloc bloc, Transition transition) {
    super.onTransition(bloc, transition);
    final blocName = bloc.runtimeType;
    print('$blocName $transition');
  }
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// An example app ...
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/// Entry point: configures Equatable and bloc logging, then starts the app.
void main() {
  // `stringify` is switched on in debug mode only.
  EquatableConfig.stringify = kDebugMode;
  // Print every transition of every bloc.
  Bloc.observer = MyBlocObserver();
  final app = MyApp();
  runApp(app);
}
/// Root widget: provides the timer bloc plus both example blocs, and shows
/// [MyHomePage] inside a [MaterialApp].
class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    // Swap the implementation here to compare the two timer blocs.
    final timerProvider = BlocProvider<MyTimerBloc>(
      create: (_) => MyTimerReducerBloc(),
      // create: (_) => MyTimerNormalBloc(),
    );

    // Both example blocs get their own state and repo and share the timer.
    final reducerProvider = BlocProvider<MyReducerBloc>(
      create: (context) => MyReducerBloc(
        MyBlocState(),
        MyDataRepo(),
        context.read<MyTimerBloc>(),
      ),
    );
    final normalProvider = BlocProvider<MyNormalBloc>(
      create: (context) => MyNormalBloc(
        MyBlocState(),
        MyDataRepo(),
        context.read<MyTimerBloc>(),
      ),
    );

    return MultiBlocProvider(
      providers: [timerProvider, reducerProvider, normalProvider],
      child: MaterialApp(
        home: MyHomePage(title: 'Bloc Comparison'),
      ),
    );
  }
}
/// Home screen that shows both blocs' states side by side.
class MyHomePage extends StatefulWidget {
  MyHomePage({Key key, this.title}) : super(key: key);

  /// Text shown in the app bar.
  final String title;

  @override
  State<StatefulWidget> createState() {
    return _MyHomePageState();
  }
}
/// State of [MyHomePage]: renders both blocs' states, a seed text field, and
/// two buttons that send the same event to both blocs.
class _MyHomePageState extends State<MyHomePage> {
  final TextEditingController _textController = TextEditingController();

  @override
  void dispose() {
    // The controller is created by this State, so it is released here.
    _textController.dispose();
    super.dispose();
  }

  /// Builds the three-line summary shown for one bloc.
  ///
  /// The value is replaced by `-` while [state] is loading.
  Widget _buildSummary(String label, MyBlocState state, TextStyle style) {
    final value = state.loading == true ? '-' : '${state.val}';
    return Text(
      '$label\n'
      'Value = $value\n'
      'Error = ${state.errorMessage}',
      style: style,
    );
  }

  @override
  Widget build(BuildContext context) {
    final textStyle = Theme.of(context).textTheme.headline5;
    return Scaffold(
      appBar: AppBar(title: Text(widget.title)),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            BlocBuilder<MyReducerBloc, MyBlocState>(
              builder: (_, state) =>
                  _buildSummary('Reducer Bloc', state, textStyle),
            ),
            SizedBox(height: 16),
            BlocBuilder<MyNormalBloc, MyBlocState>(
              builder: (_, state) =>
                  _buildSummary('Normal Bloc', state, textStyle),
            ),
            SizedBox(height: 16),
            SizedBox(
              width: 200,
              child: TextField(
                controller: _textController,
                decoration: InputDecoration(
                  border: OutlineInputBorder(),
                  labelText: 'Enter random seed',
                ),
              ),
            ),
          ],
        ),
      ),
      floatingActionButton: Row(
        mainAxisAlignment: MainAxisAlignment.end,
        children: [
          FloatingActionButton(
            onPressed: () {
              context.read<MyReducerBloc>().add(const AddTwoPressed());
              // note that `MyNormalBloc` doesn't actually add two.
              // problem explained in comments inside class.
              context.read<MyNormalBloc>().add(const AddTwoPressed());
            },
            tooltip: 'Add Two',
            child: Icon(Icons.exposure_plus_2),
          ),
          SizedBox(width: 16),
          FloatingActionButton(
            onPressed: () {
              // Both blocs receive the same seed text so they can be compared.
              final seedText = _textController.text;
              context.read<MyReducerBloc>().add(RefreshPressed(seedText));
              context.read<MyNormalBloc>().add(RefreshPressed(seedText));
            },
            tooltip: 'Calc New Value',
            child: Icon(Icons.refresh),
          ),
        ],
      ),
    );
  }
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// ReducerBloc
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
/// A [Bloc] in which every event produces its next state through the
/// synchronous [reduceState] function instead of `mapEventToState`.
///
/// Subclasses may override [processEvents] to turn the raw event stream into
/// the streams of events that should reach [reduceState].
abstract class ReducerBloc<Event, State> extends Bloc<Event, State> {
  ReducerBloc(State state) : super(state);

  /// Computes the state that follows [lastState] once [event] is applied.
  @protected
  State reduceState(State lastState, Event event);

  /// Returns the event streams whose events are fed into [reduceState].
  ///
  /// The default forwards [events] unchanged. Several streams are merged; an
  /// empty list results in no transitions at all.
  @protected
  List<Stream<Event>> processEvents(Stream<Event> events) {
    return [events];
  }

  /// Wraps every processed event in a [Transition].
  ///
  /// The states stored here are placeholders taken from the current state;
  /// [transformTransitions] replaces them with the reduced ones.
  @nonVirtual
  @protected
  @override
  Stream<Transition<Event, State>> transformEvents(Stream<Event> events, _) {
    final sources = processEvents(events);
    if (sources.isEmpty) return Stream.empty();
    // A single source is used directly; only two or more are merged.
    final merged = sources.length > 1 ? Rx.merge(sources) : sources.first;
    return merged.map((event) {
      return Transition(
        currentState: state,
        event: event,
        nextState: state,
      );
    });
  }

  /// Always throws: state changes go through [reduceState].
  @nonVirtual
  @protected
  @override
  Stream<State> mapEventToState(Event event) {
    throw UnimplementedError(
      'ReducerBloc does not support mapEventToState. '
      'Use reduceState instead.',
    );
  }

  /// Folds the transitions so that each one starts from the state produced by
  /// the previous one and ends in the result of [reduceState].
  @nonVirtual
  @protected
  @override
  Stream<Transition<Event, State>> transformTransitions(
    Stream<Transition<Event, State>> transitions,
  ) {
    // The fold starts from the state the bloc has when this is called.
    final seed = Transition<Event, State>(
      nextState: state,
      event: null,
      currentState: state,
    );
    return transitions.scan<Transition<Event, State>>(
      (previous, incoming, _) {
        final from = previous.nextState;
        return Transition(
          currentState: from,
          event: incoming.event,
          nextState: reduceState(from, incoming.event),
        );
      },
      seed,
    );
  }

  /// Runs the handler of the first entry in [cases] that matches [event].
  ///
  /// Returns [lastState] when that entry has no handler or its handler
  /// returns null. Throws [EventNotHandledError] when nothing matches.
  @nonVirtual
  @protected
  State switchEvent(
    State lastState,
    Event event,
    List<EventCase<State, Event, dynamic>> cases,
  ) {
    for (final candidate in cases) {
      if (!candidate.matches(event)) continue;
      return candidate.handler?.call(lastState, event) ?? lastState;
    }
    throw EventNotHandledError(
      'Handling of $event by $runtimeType is not implemented.',
    );
  }

  /// Creates a case that matches events equal (`==`) to [event]; the matched
  /// event is not passed on to [handler].
  @nonVirtual
  @protected
  EventCase<State, Event, Event> whenEquals(
    Event event,
    State Function(State state) handler,
  ) {
    return EventCase<State, Event, Event>._(
      (candidate) => candidate == event,
      // The event parameter is declared `dynamic` on purpose; keep it so.
      (State s, dynamic _) => handler(s),
    );
  }

  /// Creates a case that matches every event of type [E].
  @nonVirtual
  @protected
  EventCase<State, Event, E> whenIs<E extends Event>(
    State Function(State state, E event) handler,
  ) {
    return EventCase<State, Event, E>._(
      // No extra matcher: the type test in `EventCase.matches` is enough.
      null,
      // The event parameter is declared `dynamic` on purpose; keep it so.
      (State s, dynamic e) => handler(s, e),
    );
  }
}
/// Thrown by `ReducerBloc.switchEvent` when no case matches an event.
///
/// Being an [UnimplementedError], it signals a programming mistake rather
/// than a recoverable condition.
class EventNotHandledError extends UnimplementedError {
  /// Creates the error with a [message] describing the unhandled event.
  EventNotHandledError(String message) : super(message);
}
/// One branch of `ReducerBloc.switchEvent`: a test for events of type [Event]
/// plus the handler to run when the test passes.
@protected
class EventCase<State, SuperEvent, Event extends SuperEvent> {
  EventCase._(this.matcher, this.handler);

  /// Optional extra test applied after the type check; may be null.
  final bool Function(SuperEvent event) matcher;

  /// Produces the next state from the current state and the matched event.
  final State Function(State state, Event event) handler;

  /// Whether [event] is an [Event] and passes [matcher].
  ///
  /// A missing matcher, or one that returns null, counts as a match.
  bool matches(SuperEvent event) {
    if (event is! Event) return false;
    if (matcher == null) return true;
    return matcher(event) ?? true;
  }
}
// Wrap difficult Rx concepts in extensions that are easy to understand
/// How `processEventsOfType` treats an event that arrives while an earlier
/// one is still being processed.
enum EventConcurrency {
  /// Handled with `flatMap`.
  runConcurrently,

  /// Handled with `asyncExpand`.
  addToQueue,

  /// Handled with `exhaustMap`.
  ignoreUntilComplete,

  /// Handled with `switchMap`; also the fallback policy.
  cancelPrevious,
}
/// Event-processing helpers that keep the underlying Rx operators out of
/// bloc code.
extension RxEventProcessing<T> on Stream<T> {
  /// Processes every event of type [Event] in this stream and emits whatever
  /// the callbacks produce; events of any other type are dropped.
  ///
  /// For each matching event the streams returned by [pre] and [processor]
  /// are concatenated in that order. [pre] and [processor] are both optional;
  /// a missing callback contributes nothing.
  ///
  /// When [post] is given it runs last with a null error if everything before
  /// it completed normally. If an error is raised instead, the output
  /// switches to `post(event, error)`, so the error itself is not forwarded.
  /// Without [post], errors are forwarded unchanged.
  ///
  /// [concurrency] selects what happens when a new event arrives while an
  /// earlier one is still being processed; a null value behaves like
  /// [EventConcurrency.cancelPrevious].
  Stream<T> processEventsOfType<Event extends T>({
    EventConcurrency concurrency = EventConcurrency.cancelPrevious,
    Stream<T> Function(Event event) pre,
    Stream<T> Function(Event event) processor,
    Stream<T> Function(Event event, dynamic error) post,
  }) {
    Stream<T> handler(Event event) {
      final operations = <Stream<T>>[
        if (pre != null) pre(event),
        // `processor` is an optional named parameter and may be null. An empty
        // stream stands in for it, which also keeps this list non-empty.
        processor != null ? processor(event) : Stream<T>.empty(),
      ];
      if (post == null) return Rx.concat(operations);
      return Rx.concat([
        ...operations,
        // Deferred so that `post` is only called once it is reached.
        Rx.defer(() => post(event, null)),
      ]).onErrorResume((error) => post(event, error));
    }

    switch (concurrency) {
      case EventConcurrency.runConcurrently:
        return this.whereType<Event>().flatMap(handler);
      case EventConcurrency.addToQueue:
        return this.whereType<Event>().asyncExpand(handler);
      case EventConcurrency.ignoreUntilComplete:
        return this.whereType<Event>().exhaustMap(handler);
      case EventConcurrency.cancelPrevious:
      default:
        return this.whereType<Event>().switchMap(handler);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment