Created
November 10, 2015 21:03
-
-
Save fhueske/4ea5422edb5820915fa4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Builds and runs the "Stream Filter" job.
 *
 * <p>Both the data stream and the filter stream are keyed on tuple field 0 and then
 * connected, and the connected pair is processed by a single {@link StreamFilter}
 * whose output is printed.
 *
 * <p>NOTE: the two {@code ...} initializers below are placeholders carried over from the
 * original snippet; real sources must be substituted before this compiles.
 *
 * @param args command-line arguments (not used)
 * @throws Exception declared because the environment's {@code execute} call is allowed to throw
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder: the stream of records that should be filtered.
    DataStream<Tuple1<String>> stream = ...
    // Placeholder: the stream of filter values (each occurrence toggles a filter on/off).
    DataStream<Tuple1<String>> filters = ...

    stream
        .keyBy(0)
        .connect(filters.keyBy(0)) // partition both streams on same field and connect them
        .flatMap(new StreamFilter()) // apply CoFlatMap function to update filters and to filter data
        .print();

    // execute program
    env.execute("Stream Filter");
}
public static class StreamFilter | |
extends RichCoFlatMapFunction<Tuple1<String>, Tuple1<String>, Tuple1<String>> | |
implements Checkpointed<HashSet<String>> | |
{ | |
private HashSet<String> filters = new HashSet<>(); | |
@Override | |
public void flatMap1(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception { | |
if(!filters.contains(value.f0)) { | |
out.collect(value); | |
} | |
} | |
@Override | |
public void flatMap2(Tuple1<String> filter, Collector<Tuple1<String>> out) throws Exception { | |
if(!filters.contains(filter.f0)) { | |
filters.add(filter.f0); | |
} | |
else { | |
filters.remove(filter.f0); | |
} | |
} | |
@Override | |
public HashSet<String> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { | |
return filters; | |
} | |
@Override | |
public void restoreState(HashSet<String> state) throws Exception { | |
filters = state; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment