Skip to content

Instantly share code, notes, and snippets.

@cmc5788
Created June 29, 2021 18:11
Show Gist options
  • Save cmc5788/91c1e2dd711fa74f45440a7ffbd0159f to your computer and use it in GitHub Desktop.
Save cmc5788/91c1e2dd711fa74f45440a7ffbd0159f to your computer and use it in GitHub Desktop.
AsyncChangeNotifier
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter/widgets.dart';
import 'package:rxdart/rxdart.dart';
typedef TaskNotifier = void Function(VoidCallback notify);
typedef TaskWorker = Stream<void> Function(TaskNotifier notifier);
typedef TaskFilter = bool Function(Object task);
typedef TaskMapper = Stream<void> Function(Object task);
typedef TaskConcurrency = Stream<void> Function(
Stream<Object> stream,
TaskMapper mapper,
);
typedef TaskDefinition = void Function({
required TaskFilter where,
TaskConcurrency? concurrency,
});
// Note: all of these operators could be reimplemented easily without Rx
Stream<void> runConcurrently(Stream<Object> stream, TaskMapper mapper) =>
stream.flatMap(mapper);
Stream<void> drop(Stream<Object> stream, TaskMapper mapper) =>
stream.exhaustMap(mapper);
Stream<void> restartable(Stream<Object> stream, TaskMapper mapper) =>
stream.switchMap(mapper);
abstract class AsyncChangeNotifier extends ChangeNotifier {
AsyncChangeNotifier() {
final taskStreams = <Stream<void>>[];
final taskFilters = <TaskFilter>[];
onDefineTasks(({required where, concurrency}) {
taskFilters.add(where);
taskStreams.add(
(concurrency ?? defaultTaskConcurrency).call(
_tasks.stream.where((t) => where(t.task)).cast(),
_taskMapper,
),
);
});
taskStreams.add(
defaultTaskConcurrency(
_tasks.stream.where((t) {
for (final filter in taskFilters) {
if (filter(t.task)) return false;
}
return true;
}).cast(),
_taskMapper,
),
);
// Note: Rx.merge could be reimplemnted easily without the Rx dependency
_sub = Rx.merge(taskStreams).listen(null);
}
@protected
TaskConcurrency get defaultTaskConcurrency => runConcurrently;
@protected
void onDefineTasks(TaskDefinition define) {}
@protected
void runTask(Object task, TaskWorker worker) {
_tasks.add(_TaskData(task, worker));
}
bool _allowNotify = false;
@override
@protected
@visibleForTesting
void notifyListeners() {
assert(
_allowNotify,
'Within AsyncChangeNotifier, notifyListeners '
"must only be called from a task's notifier.",
);
super.notifyListeners();
}
// ignore: cancel_subscriptions
StreamSubscription<void>? _sub;
final _tasks = StreamController<_TaskData>.broadcast(sync: true);
bool _disposed = false;
@override
void dispose() {
_disposed = true;
_sub!.cancel();
_sub = null;
_tasks.close();
super.dispose();
}
Stream<void> _taskMapper(Object t) async* {
// When the parent stream subscription is canceled, this inner stream is
// implicitly canceled. We can use an operator to grab the cancellation
// notification and guard against effects happening after cancellation.
var canceled = false;
yield* (t as _TaskData)
.worker((fn) {
if (!canceled && !_disposed) {
_allowNotify = true;
try {
fn();
notifyListeners();
} finally {
_allowNotify = false;
}
}
})
// Note: `doOnCancel` could be implemented easily without Rx.
// Also, the cast to null is only needed to work around a bug in Rx's
// implementation of `DoStreamTransformer`.
.cast<Null>()
.doOnCancel(() {
canceled = true;
});
}
}
@immutable
class _TaskData {
const _TaskData(this.task, this.worker);
final Object task;
final TaskWorker worker;
}
import 'package:flutter/material.dart';
import 'package:provider/provider.dart';
import 'async_noti.dart';
import 'build_scope.dart';
import 'main.dart';
class LoginBinder extends AsyncChangeNotifier {
LoginBinder(this.repo);
final SessionRepo repo;
bool _loginLoading = false;
bool get loginLoading => _loginLoading;
String? _loginError;
String? get loginError => _loginError;
String? _session;
String? get session => _session;
final _onLogin = ChangeNotifier();
Listenable get onLogin => _onLogin;
final _onLogout = ChangeNotifier();
Listenable get onLogout => _onLogout;
@override
void onDefineTasks(TaskDefinition define) {
define(
where: (task) => task == login || task == logout,
concurrency: drop,
);
}
void login(String username, String pass) => runTask(login, (notify) async* {
notify(() {
_loginError = null;
_loginLoading = true;
});
try {
final session = await repo.login(username, pass);
notify(() {
_session = session;
_loginLoading = false;
});
_onLogin.notifyListeners();
} on Object catch (error) {
notify(() {
_loginError = '$error';
_loginLoading = false;
});
}
});
void logout() => runTask(logout, (notify) async* {
if (_session == null) return;
notify(() {
_loginLoading = true;
});
try {
await repo.logout();
notify(() {
_session = null;
_loginLoading = false;
});
_onLogout.notifyListeners();
} on Object catch (_) {
notify(() {
_loginLoading = false;
});
}
});
}
class LoginPage extends StatelessWidget {
const LoginPage({Key? key}) : super(key: key);
static void _onLogin(BuildContext context) {
ScaffoldMessenger.of(context).showSnackBar(
const SnackBar(content: Text('Signed in')),
);
context.read<AppBinder>().onLogin();
}
@override
Widget build(BuildContext context) => ScopedListener(
listenableFinder: (ctx) =>
ctx.select<LoginBinder, Listenable>((b) => b.onLogin),
listener: _onLogin,
child: Scaffold(
appBar: AppBar(title: const Text('Login')),
body: Container(
alignment: Alignment.center,
child: const Text('Click button to log in.'),
),
floatingActionButton: FloatingActionButton(
onPressed: () {
context.read<LoginBinder>().login('aaaa', 'bbbb');
},
tooltip: 'Login',
child: Selector<LoginBinder, bool>(
selector: (context, binder) =>
binder.loginLoading || binder.session != null,
builder: (context, loading, __) => loading
? SizedBox(
width: 16,
height: 16,
child: CircularProgressIndicator(
strokeWidth: 2,
color: Theme.of(context).colorScheme.surface,
value: null,
),
)
: const Icon(Icons.check),
),
),
),
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment