Last active
August 18, 2020 07:49
-
-
Save fhueske/e7820b47c2f8739931e9fc1a66d2268e to your computer and use it in GitHub Desktop.
Simple test for checking side outputs from onTimer() method.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ververica; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
import org.apache.flink.util.Collector; | |
import org.apache.flink.util.OutputTag; | |
public class UserTest { | |
public static OutTag sideOutput = new OutTag(); | |
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(2); | |
DataStream<Tuple2<Integer, String>> src1 = env.addSource(new GenSrc("first_")); | |
DataStream<Tuple2<Integer, String>> src2 = env.addSource(new GenSrc("second_")); | |
SingleOutputStreamOperator<String> connected = src1.keyBy(x -> x.f0) | |
.connect(src2.keyBy(x -> x.f0)).process(new MyCoFunc()); | |
DataStream<String> sideOut = connected.getSideOutput(sideOutput); | |
connected.print(); | |
sideOut.print(); | |
env.execute(); | |
} | |
public static class OutTag extends OutputTag<String> { | |
public OutTag() { | |
super("outT"); | |
} | |
} | |
public static class MyCoFunc extends KeyedCoProcessFunction<Integer, Tuple2<Integer, String>, Tuple2<Integer, String>, String> { | |
@Override | |
public void processElement1(Tuple2<Integer, String> v, Context ctx, Collector<String> collector) { | |
collector.collect(v.f1); | |
ctx.output(sideOutput, "SideOutput" + v.f1); | |
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 500); | |
} | |
@Override | |
public void processElement2(Tuple2<Integer, String> v, Context ctx, Collector<String> collector) { | |
collector.collect(v.f1); | |
ctx.output(sideOutput, "SideOutput" + v.f1); | |
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 500); | |
} | |
@Override | |
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) { | |
ctx.output(sideOutput, "TimerSideOut @ " + timestamp); | |
} | |
} | |
public static class GenSrc implements SourceFunction<Tuple2<Integer, String>> { | |
private final String val; | |
private long cnt; | |
private int keyCnt; | |
public GenSrc(String val) { | |
this.val = val; | |
this.cnt = 0; | |
this.keyCnt = 0; | |
} | |
@Override | |
public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception { | |
while (true) { | |
int key = keyCnt++; | |
if (key > 4) { | |
key = 0; | |
} | |
sourceContext.collect(Tuple2.of(key, val + (cnt++))); | |
Thread.sleep(1000); | |
} | |
} | |
@Override | |
public void cancel() { } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment