Last active
October 28, 2019 13:51
-
-
Save haarts/657885fa7c047cbcab7e3bb3e1b320db to your computer and use it in GitHub Desktop.
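A stream transformer that only lets through entries it has not recently seen. With a small `memory` it behaves similarly to `distinct()`, but it is not identical: it remembers a short window of past values (up to `memory` plus a small batch margin), not just the previous event.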
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// A stream transformer that suppresses events already held in a bounded
/// set of previously forwarded values.
///
/// The filtering itself is done by [UniqueSink]; this class only wires a
/// [UniqueSink] into the stream via [Stream.eventTransformed].
class Unique<T> extends StreamTransformerBase<T, T> {
  /// Size hint for the set of remembered values; forwarded unchanged to
  /// the [UniqueSink] constructor. Defaults to 10.
  final int memory;

  const Unique({this.memory = 10});

  /// Returns a stream that wraps the sink handed over by
  /// [Stream.eventTransformed] in a [UniqueSink] configured with [memory].
  @override
  Stream<T> bind(Stream<T> stream) => Stream<T>.eventTransformed(
      // The sink parameter is typed as EventSink<T> so that no implicit
      // downcast from EventSink<dynamic> is needed when constructing the
      // UniqueSink<T>.
      stream,
      (EventSink<T> sink) => UniqueSink<T>(sink, memory: memory));
}
/// An [EventSink] that forwards a data event to the wrapped sink only when
/// the value is not currently in its set of remembered values.
///
/// The remembered set is trimmed in batches: once it holds more than
/// `memory + head` values, the first `head` values in the set's iteration
/// order are dropped. Values that arrive again while still remembered are
/// not moved or refreshed, so this is insertion-order eviction rather than
/// a true LRU.
///
/// Note: with `memory: 0`, [head] is also 0, so nothing is ever evicted and
/// every distinct value is remembered.
class UniqueSink<T> implements EventSink<T> {
  /// Downstream sink that receives the filtered events.
  final EventSink<T> _output;

  /// Values that have been forwarded and not yet evicted.
  final Set<T> _seen = {};

  /// Base number of values to remember before trimming kicks in.
  final int memory;

  /// Number of values dropped per trim: `ceil(memory / 5)`.
  final int head;

  UniqueSink(this._output, {this.memory = 10}) : head = (memory / 5).ceil();

  /// Forwards [data] if it is not currently remembered, then trims the
  /// remembered set if it has grown past `memory + head`.
  @override
  void add(T data) {
    // Set.add returns true only when the value was not already present,
    // so this both records the value and tells us whether to forward it.
    if (_seen.add(data)) {
      _output.add(data);
    }

    // Drop the first `head` remembered values in one batch. The values are
    // copied to a list first so the set is not modified while it is being
    // iterated.
    if (_seen.length > memory + head) {
      _seen.removeAll(_seen.take(head).toList());
    }
  }

  /// Passes errors straight through to the wrapped sink.
  @override
  void addError(e, [st]) => _output.addError(e, st);

  /// Closes the wrapped sink.
  @override
  void close() => _output.close();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment