-
-
Save dmazzer/502df7cc2fd1b07605adf0f139f9696a to your computer and use it in GitHub Desktop.
Simple Flink job streaming data from Kafka to local files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.restartstrategy.RestartStrategies; | |
import org.apache.flink.api.java.utils.ParameterTool; | |
import org.apache.flink.streaming.api.CheckpointingMode; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.CheckpointConfig; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; | |
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; | |
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; | |
import java.util.Properties; | |
public class DataTransfer { | |
public static void main(String[] args) throws Exception { | |
// Properties for Kafka | |
Properties kafkaProps = new Properties(); | |
kafkaProps.setProperty("topic", "flink-test"); | |
kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); | |
kafkaProps.setProperty("zookeeper.connect", "localhost:2181"); | |
kafkaProps.setProperty("group.id", "flink-test"); | |
// Flink environment setup | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.getConfig().disableSysoutLogging(); | |
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); | |
// Flink check/save point setting | |
env.enableCheckpointing(30000); | |
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); | |
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000); | |
env.getCheckpointConfig().setCheckpointTimeout(10000); | |
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); | |
env.getCheckpointConfig().enableExternalizedCheckpoints( | |
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION | |
); | |
// Init the stream | |
DataStream<String> stream = env | |
.addSource(new FlinkKafkaConsumer08<String>( | |
"flink-test", | |
new SimpleStringSchema(), | |
kafkaProps)); | |
// Path of the output | |
String basePath = "<some-place-in-your-machine>"; // Here is you output path | |
BucketingSink<String> hdfsSink = new BucketingSink<>(basePath); | |
hdfsSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH-mm")); | |
stream.print(); | |
stream.addSink(hdfsSink); | |
env.execute(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment