Created
June 19, 2020 06:08
-
-
Save TheNilesh/81fdba0db00861ba3eeca0c4589fb10f to your computer and use it in GitHub Desktop.
Apache Flink example of using CoGroupFunction on a shoe stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.quickheal.correlation;
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class CoGroupExampleKafkaConsumer { | |
public static void main(String[] args) throws Exception { | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
DataStream<String> shoeStream = env.addSource(new FlinkKafkaConsumer010<>("test", | |
new SimpleStringSchema(), createConsumerProperties())); | |
DataStream<Tuple2<String, String>> tupleShoeStream = shoeStream | |
.map(new MapFunction<String, Tuple2<String, String>>() { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Tuple2<String, String> map(String value) throws Exception { | |
String[] items = value.split(","); | |
String color = items[0]; | |
String leg = items[1]; | |
return Tuple2.of(color, leg); | |
} | |
}) | |
.assignTimestampsAndWatermarks(new TimestampExtractor(Time.seconds(1))); | |
final OutputTag<String> outputTag = | |
new OutputTag<String>("side-output"){ | |
private static final long serialVersionUID = -1L; | |
}; | |
SingleOutputStreamOperator<String> leftShoeStream = tupleShoeStream | |
.process(new ProcessFunction<Tuple2<String, String>, String>() { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public void processElement( | |
Tuple2<String, String> value, | |
Context ctx, | |
Collector<String> out) throws Exception { | |
if (value.f1.startsWith("L") || value.f1.startsWith("l")) { | |
out.collect(value.f0); | |
} else { | |
ctx.output(outputTag, value.f0); | |
} | |
} | |
}); | |
DataStream<String> rightShoeStream = leftShoeStream.getSideOutput(outputTag); | |
DataStream<String> cogroupResult = leftShoeStream.coGroup(rightShoeStream) | |
.where(new FirstFieldSelector()) //f0 | |
.equalTo(new FirstFieldSelector()) | |
.window(TumblingEventTimeWindows.of(Time.seconds(5))) | |
.apply(new CoGroupFunction<String, String, String>() { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public void coGroup(Iterable<String> leftShoes, Iterable<String> rightShoes, | |
Collector<String> out) throws Exception { | |
System.out.println(System.currentTimeMillis()); | |
for (String shoeColor : leftShoes) { | |
System.out.println(shoeColor + ",left"); | |
} | |
for (String shoeColor : rightShoes) { | |
System.out.println(shoeColor + ",right"); | |
} | |
System.out.println("------------------"); | |
} | |
}); | |
cogroupResult.print(); | |
env.execute(); | |
} | |
protected static Properties createConsumerProperties() { | |
Properties properties = new Properties(); | |
properties.setProperty("bootstrap.servers", "localhost:9092"); | |
properties.setProperty("group.id", "connection"); | |
return properties; | |
} | |
public static class FirstFieldSelector implements KeySelector<String, String>{ | |
private static final long serialVersionUID = 1L; | |
@Override | |
public String getKey(String value) throws Exception { | |
return value; | |
} | |
} | |
public static class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, String>> { | |
private static final long serialVersionUID = 1L; | |
public TimestampExtractor(Time maxOutOfOrderness) { | |
super(maxOutOfOrderness); | |
} | |
@Override | |
public long extractTimestamp(Tuple2<String, String> event) { | |
return new Date().getTime(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output:
Where is the last shoe?