People always ask for the best way to store data.
Most often they don't disclose their requirements. So let's assume a) we only need to store a few megabytes of data (which easily fit into the main memory of your device), b) we have more reads than writes, c) we need only be faster than 1ms, and d) we don't need complex queries. A simple key/value store will suffice.
Here's a minimal key-value store API:
abstract class KV<T> {
Future<T?> get(String key);
Future<void> set(String key, T value);
Future<void> delete(String key);
...
To make things more interesting, I'll add one additional method to enumerate all keys, though:
...
Stream<String> keys([String? prefix]);
}
Note that you cannot distinguish non-existing values from stored null
s in get
, if T
isn't restricted to Object
. If you want to support null
values, consider changing the API to something like Future<(bool exists, T? value)> get(String key)
.
To test this, here's an in-memory implementation, basically a glorified wrapper around a Map<String, T>
that is asynchronous in nature:
class MemoryKV<T> implements KV<T> {
MemoryKV([Map<String, T>? data]) : _data = {...?data};
final Map<String, T> _data;
@override
Future<T?> get(String key) async => _data[key];
@override
Future<void> set(String key, T value) async => _data[key] = value;
@override
Future<void> delete(String key) async => _data.remove(key);
@override
Stream<String> keys([String? prefix]) async* {
for (final key in _data.keys.toList()) {
if (prefix == null || key.startsWith(prefix)) {
yield key;
}
}
}
}
Sidetrack: I assume that all operations are atomic, but something like incr
, which would need to use both get
and set
, wouldn't be atomic without synchronisation, because of the asynchronous nature of all calls. So don't forget to always think about atomicity of operations.
extension on KV<int> {
Future<int> incr(String key) async {
int next;
set(key, next = (await get(key) ?? 0) + 1);
return next;
}
}
To make data persistent, we can use different approaches. a) We could save each value as a single file (restricting the keys to valid file names). b) We could write a redo-log, still keeping everything in memory for easy read access. c) We could use sqlite3 to store everything in an objects table. d) We could use indexedDB on the web (or ram a nail in your knee which is more pleasent). We could even use the shared preferences on a mobile device.
In all cases, we need to convert T
into some other (more primitive) type, so let's abstract this first. A Codec
can transform data in Dart. A CodecKV
wraps another KV
, converting the values:
class CodecKV<T, U> extends KV<T> {
CodecKV(this.kv, this.codec);
final KV<U> kv;
final Codec<T, U> codec;
@override
Future<T?> get(String key) async {
final value = await kv.get(key);
return value == null ? null : codec.decode(value);
}
@override
Future<void> set(String key, T value) async {
await kv.set(key, codec.encode(value));
}
@override
Future<void> delete(String key) async => await kv.delete(key);
@override
Stream<String> keys([String? prefix]) => kv.keys();
}
A simple file store looks like this (you probably want to make sure that all keys are valid file paths on all supported platforms):
class FileKV extends KV<List<int>> {
FileKV(this.base);
final Directory base;
@override
Future<List<int>?> get(String key) async {
final file = _file(key);
return file.existsSync() ? await file.readAsBytes() : null;
}
@override
Future<void> set(String key, List<int> value) async {
final file = _file(key);
await file.parent.create(recursive: true);
await file.writeAsBytes(value);
}
@override
Future<void> delete(String key) async {
final file = _file(key);
if (file.existsSync()) {
await file.delete();
}
}
@override
Stream<String> keys([String? prefix]) async* {
final skip = base.path.length + 1;
await for (final entry in base.list(recursive: true)) {
if (entry is! File) continue;
final key = entry.path
.substring(skip)
.replaceAll(Platform.pathSeparator, '/');
if (prefix == null || key.startsWith(prefix)) {
yield key;
}
}
}
File _file(String key) =>
File('${base.path}/$key'.replaceAll('/', Platform.pathSeparator));
}
To store JSON instead of raw bytes, we combine this with our codec:
KV<Object?> jsonKV(Directory base) {
return CodecKV(FileKV(base), JsonCodec().fuse(Utf8Codec()));
}
Personally, I really like to use a Redo-log-based KV, though. You provide a File
and it will populate an in-memory KV by replaying all set
and delete
operations from the redo log, continuing to use that log to track future operations. Instead of List<int>
, I'll use String
as the base type, because, why not.
class RedoKV extends KV<String> {
RedoKV(this.file);
final File file;
final kv = MemoryKV<String>();
bool _loaded = false;
@override
Future<String?> get(String key) async {
await _redo();
return kv.get(key);
}
@override
Future<void> set(String key, String value) async {
await _log('$key|$value');
await kv.set(key, value);
}
@override
Future<void> delete(String key) async {
await _log(key);
await kv.delete(key);
}
@override
Stream<String> keys([String? prefix]) async* {
await _redo();
yield* kv.keys(prefix);
}
Future<void> _log(String message) async {
final sink = file.openWrite(mode: FileMode.append);
sink.writeln(message);
await sink.close();
}
Future<void> _redo() async {
if (!_loaded) {
await load();
await save();
_loaded = true;
}
}
Future<void> load() async {
// clear the database
await for (final key in kv.keys()) {
await kv.delete(key);
}
// replay the redo log
await for (final line
in file.openRead().transform(utf8.decoder).transform(LineSplitter())) {
final i = line.indexOf('|');
if (i != -1) {
kv.set(line.substring(0, i), line.substring(i + 1));
} else {
kv.delete(line);
}
}
}
Future<void> save() async {
final newFile = File('${file.path}.new');
final sink = newFile.openWrite();
await for (final key in kv.keys()) {
if (await kv.get(key) case String value) {
sink.writeln('$key|$value');
}
}
await sink.flush();
await sink.close();
await newFile.rename(file.path);
}
}
Again, we can use this to store JSON data:
KV<Object?> jsonKV(File log) {
return CodecKV(RedoKV(log), JsonCodec());
}
Sidetrack: Did you notice that the above code has a racing condition because load
and save
aren't atomic and must not interleave with set
or delete
?
A Mutex
can help to avoid this:
class Mutex {
Completer<void>? _lock;
Future<T> protect<T>(FutureOr<T> Function() execute) async {
while (_lock != null) {
await _lock!.future;
}
_lock = Completer();
try {
return execute();
} finally {
// order is important here
final lock = _lock!;
_lock = null;
lock.complete();
}
}
}
Then wrap all calls in mutex.protect(() { ... })
. End of sidetrack.
But what if you want to store arbitrary types like Person
?
We need some boilerplate code to create a Codec
to map specific types to JSON-encoded strings and back again:
class Mapper<T> extends Codec<T, String> {
Mapper(dynamic Function(T) encode, T Function(dynamic json) decode)
: encoder = MapperEncoder(encode),
decoder = MapperDecoder(decode);
@override
final Converter<T, String> encoder;
@override
final Converter<String, T> decoder;
}
class MapperEncoder<T> extends Converter<T, String> {
MapperEncoder(this.encode);
final dynamic Function(T) encode;
@override
String convert(T input) => json.encode(encode(input));
}
class MapperDecoder<T> extends Converter<String, T> {
MapperDecoder(this.decode);
final T Function(dynamic json) decode;
@override
T convert(String input) => decode(json.decode(input));
}
Given
class Person {
Person(this.name, this.age);
String name;
int age;
}
Setup this mapper:
final personMapper = Mapper<Person>(
(p) => {'name': p.name, 'age': p.age},
(json) => Person(json['name'], json['age']),
);
And then setup a person KV store:
final kv = CodecKV(RedoKV(log), personMapper);
Instead of serializing to JSON, the mapper could also create a list of bytes which might be more suitable for a relational database table. It's pluggable.
Sometimes, we might want to create a sub KV from a given KV:
class SubKV<T> extends KV<T> {
SubKV(this.kv, this.sub);
final KV<T> kv;
final String sub;
@override
Future<T?> get(String key) async {
return kv.get('$sub/$key');
}
@override
Future<void> set(String key, T value) async {
await kv.set('$sub/$key', value);
}
@override
Future<void> delete(String key) async {
await kv.delete('$sub/$key');
}
@override
Stream<String> keys([String? prefix]) {
final skip = sub.length + 1;
return kv.keys('$sub/${prefix ?? ''}').map((key) => key.substring(skip));
}
}
With these helpers:
extension<T> on KV<T> {
KV<T> sub(String sub) => SubKV(this, sub);
KV<U> codec<U>(Codec<U, T> codec) => CodecKV(this, codec);
}
I can create a people store as part of a larger kv store much easier:
final kv = RedoKV(File('kvtest.log'));
final peopleKV = kv.sub('people').codec(personMapper);
But let's switch back to the FileKV
. It doesn't have to keep everything in memory (otherwise, we could have used the RedoKV
), but it might be nice to have at least some caching. So here's that puzzle piece:
class CachedKV<T> extends KV<T> {
CachedKV(this.kv, [this.limit = 1_000, int Function(T)? size])
: _size = size ?? ((_) => 1);
final KV<T> kv;
final int limit;
final int Function(T) _size;
final _cache = <String, T>{};
final _keys = <String>[];
int _cacheSize = 0;
static CachedKV<String> string(KV<String> kv, [int limit = 1_000_000]) =>
CachedKV(kv, limit, (value) => value.length * 2);
static CachedKV<Uint8List> uint8List(
KV<Uint8List> kv, [
int limit = 1_000_000,
]) => CachedKV(kv, limit, (value) => value.length);
@override
Future<T?> get(String key) async {
// if the value is cached, track for LRU and return the value
if (_cache[key] case final value?) {
if (_keys.length > 1) {
_keys.remove(key);
_keys.insert(0, key);
}
return value;
}
// if the value can be retrieved, cache it,
// evicting old values if needed to make space
if (await kv.get(key) case final value?) {
_cacheSize += _size(value);
while (_cacheSize > limit && _keys.isNotEmpty) {
final value = _cache.remove(_keys.removeLast());
if (value != null) _cacheSize -= _size(value);
}
_cache[key] = value;
_keys.insert(0, key);
return value;
}
// value doesn't exist
return null;
}
@override
Future<void> set(String key, T value) async {
// if the old value was cached, remove it
// the new value is then cached as with `get`
if (_cache.remove(key) case final oldValue?) {
_cacheSize -= _size(oldValue);
_keys.remove(key);
_cacheSize += _size(value);
while (_cacheSize > limit && _keys.isNotEmpty) {
final value = _cache.remove(_keys.removeLast());
if (value != null) _cacheSize -= _size(value);
}
_cache[key] = value;
_keys.insert(0, key);
}
await kv.set(key, value);
}
@override
Future<void> delete(String key) async {
if (_cache.remove(key) case final oldValue?) {
_cacheSize -= _size(oldValue);
_keys.remove(key);
}
await kv.delete(key);
}
@override
Stream<String> keys([String? prefix]) => kv.keys(prefix);
}
The implementation uses an LRU list to sort cached objects by the time of the last access. This way, the "least recently used" objects are evicted first. It also limits the number of items and/or the size of the cache, using a size
function to get an per-object size. By default this function returns 1, so the number of objects is counted. But for files, we could use the number of bytes here, limiting the cache to an overall byte size.
We can now use this to store people as individual files, still having a decent read access because of the caching. And we could derive other statically typed values from that base KV (I had to explicitly convert the string to an array of bytes using a Utf8Codec
, but that's just a minor inconvenience):
final kv = CachedKV.uint8List(FileKV(Directory('kvtest')))
.sub('people').codec(personMapper.fuse(Utf8Codec()));
But wait, there's more. Instead of caching a FileKV
, we could acess a RESTful server and cache those replies. The get
maps to a GET
request, the set
maps to a POST
or (more correctly)PUT
request and the delete
to a DELETE
request. For implementing keys
, we need some support from the server, because it has to enumerate all keys that match the provided prefix. I'll send such a query to the server's base URL with a keys
query parameter.
Here's the KV using the http
client library:
class RestKV extends KV<List<int>> {
RestKV(this.baseUrl);
final Uri baseUrl;
final client = http.Client();
@override
Future<List<int>?> get(String key) async {
final response = await client.get(_url(key));
return response.statusCode == 200 ? response.bodyBytes : null;
}
@override
Future<void> set(String key, List<int> value) async {
await client.put(_url(key), body: value);
}
@override
Future<void> delete(String key) async {
await client.delete(_url(key));
}
@override
Stream<String> keys([String? prefix]) async* {
final response = await client.send(
http.Request('GET', baseUrl.replace(queryParameters: {'keys': prefix})),
);
if (response.statusCode != 200) {
throw Exception('keys query failed');
}
yield* response.stream.transform(utf8.decoder).transform(LineSplitter());
}
Uri _url(String key) => baseUrl.replace(path: '${baseUrl.path}$key');
}
To test this, let's also create a server implementation:
Future<void> server(KV<List<int>> kv) async {
const maxContentLength = 10_000_000;
void handle(HttpRequest request) async {
final key = request.uri.path.substring(1);
if (key.contains('..') || key.length > 2048) return;
switch (request.method) {
case 'GET':
if (request.uri.queryParameters['keys'] case final prefix?) {
if (prefix.contains('..') || prefix.length > 2048) return;
request.response.statusCode = HttpStatus.ok;
request.response.headers.contentType = ContentType.text;
await for (final key in kv.keys(prefix.isEmpty ? null : prefix)) {
request.response.writeln(key);
}
await request.response.close();
} else {
final value = await kv.get(key);
if (value != null) {
request.response.statusCode = HttpStatus.ok;
request.response.headers.contentType = ContentType.binary;
request.response.add(value);
} else {
request.response.statusCode = HttpStatus.notFound;
}
await request.response.close();
}
case 'PUT':
final bb = BytesBuilder(copy: false);
await for (final bytes in request) {
bb.add(bytes);
if (bb.length > maxContentLength) return;
}
await kv.set(key, bb.takeBytes());
request.response.statusCode = HttpStatus.noContent;
await request.response.close();
case 'DELETE':
await kv.delete(key);
request.response.statusCode = HttpStatus.noContent;
await request.response.close();
default:
request.response.statusCode = HttpStatus.methodNotAllowed;
await request.response.close();
}
}
final server = await HttpServer.bind('0.0.0.0', 3333);
await for (final request in server) {
handle(request);
}
}
On the server-side, we can use e.g. a FileKV
to implement the KV and then run an HttpServer
to wait for GET
or PUT
or DELETE
requests or for requests to enumerate all keys.
await server(FileKV(Directory('kv')));
On the client-side, we can replace FileKV
with RestKV
, using something like:
final kv = CachedKV.uint8List(
RestKV(Uri.parse('http://localhost:3333/')),
).sub('people').codec(personMapper.bytes);
I didn't bother with HTTP cache control. This could also greatly improve the performance because the built-in HTTP client of your operation system will probably automatically cache resources it is allowed to cache.
Starting from a very basic API with four methods, we created multiple implementations, a data mapper and even a basic client/server implementation. We could also use an isolate to run a dedicated KV and then use ports to send messages from other isolates.