Last active: November 17, 2019 08:31
Gist: jrask/ef4a8531b0563f1420ce276e7b0f59ce
Flink HDFS -> HDFS does not "commit" files; they stay as ".inprogress" files
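import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
// JsonObject, Parser and EventTimeDateTimeBuckerAssigner are the author's own classes (not shown).
public class HdfsToHdfsJob { // hypothetical class name; the gist shows only main()
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Checkpoint every minute; StreamingFileSink finalizes part files only when a checkpoint completes.
        env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("hdfs://<server><dir>/checkpoints", true));
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        StreamingFileSink<JsonObject> hdfsSink = StreamingFileSink
                .<JsonObject>forRowFormat(new Path("hdfs://<server>/<dir>"), new SimpleStringEncoder<>("UTF-8"))
                // 'yyyy' is the calendar year; 'YYYY' is the week-based year if the assigner uses java.time.DateTimeFormatter (assumed).
                .withBucketAssigner(new EventTimeDateTimeBuckerAssigner<>("'/year'=yyyy/'month'=MM/'day'=dd/'hour'=HH"))
                .build();
        env.readTextFile("hdfs://<server><dir><file>")
                .map(Parser::parse)
                .addSink(hdfsSink);
        env.execute("some-pipeline");
    }
}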
/*
HDFS -> HDFS results in the following listing: the part files are never committed and keep their hidden ".inprogress" names. However, if I use Kafka as the source, it works properly.
-rw-rw----+ 3 someuser supergroup 87792789 2019-11-16 20:57 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-62.inprogress.8f9c6104-4c6c-4eee-8650-dd5d1d12d668
-rw-rw----+ 3 someuser supergroup 64696413 2019-11-16 20:58 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-63.inprogress.42589a04-601b-496d-ae20-7db1d56089dc
-rw-rw----+ 3 someuser supergroup 71108086 2019-11-16 20:59 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-64.inprogress.97b00857-808d-4c4e-9a50-a42af6c604f5
-rw-rw----+ 3 someuser supergroup 74191577 2019-11-16 21:00 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-65.inprogress.41ff8792-2647-4a50-b2b1-f74cb94aeafe
-rw-rw----+ 3 someuser supergroup 68633473 2019-11-16 21:01 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-66.inprogress.62bf4ee9-f897-4782-823b-23ccc7d58d96
-rw-rw----+ 3 someuser supergroup 71332201 2019-11-16 21:03 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-67.inprogress.d51af4c9-c200-4cee-84e4-979fb6b1e958
-rw-rw----+ 3 someuser supergroup 21657645 2019-11-16 21:04 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-68.inprogress.de9a2497-e99c-48ef-85b8-a73dd9f62643
*/
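The listing is consistent with how StreamingFileSink is documented to work. A rolled part file keeps its hidden ".part-N.inprogress.<uuid>" name and is renamed to "part-N" only when a checkpoint covering it completes. The Flink docs also note that with `FileProcessingMode.PROCESS_ONCE` (what `readTextFile` uses) the file-monitoring source scans the path once and exits, and closing the source means no more checkpoints after that point (for Flink versions before checkpoints-after-tasks-finish support). A likely explanation, assumed here and not confirmed in this gist, is therefore that no checkpoint ever completes to commit the files, while a Kafka source keeps running and keeps checkpoints flowing. The plain-Java toy model below (hypothetical class `PartFileLifecycle`; not Flink code, and it merges the snapshot and checkpoint-complete steps into one) sketches that lifecycle:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Flink code) of a row-format part file's lifecycle:
// open -> rolled (still hidden as ".inprogress") -> renamed when a checkpoint completes.
// Simplifications: no "<uuid>" suffix; snapshot and completion notification are one step.
public class PartFileLifecycle {
    private final List<String> awaitingCommit = new ArrayList<>(); // rolled, still hidden
    private final List<String> committed = new ArrayList<>();      // renamed, visible
    private String open;    // file currently being written, or null
    private int nextIndex;  // part counter, as in part-0-<n>

    // Writing a record opens a new hidden in-progress file if none is open.
    public void write() {
        if (open == null) {
            open = ".part-0-" + nextIndex + ".inprogress";
            nextIndex++;
        }
    }

    // The rolling policy closes the current file, but it keeps its hidden name.
    public void roll() {
        if (open != null) {
            awaitingCommit.add(open);
            open = null;
        }
    }

    // Only a completed checkpoint renames closed files to their final names.
    public void checkpointCompleted() {
        for (String hidden : awaitingCommit) {
            // ".part-0-3.inprogress" -> "part-0-3"
            committed.add(hidden.substring(1, hidden.indexOf(".inprogress")));
        }
        awaitingCommit.clear();
    }

    public List<String> hiddenFiles() { return new ArrayList<>(awaitingCommit); }

    public List<String> committedFiles() { return new ArrayList<>(committed); }

    public static void main(String[] args) {
        // Bounded source: files are written and rolled, but no checkpoint completes afterwards.
        PartFileLifecycle bounded = new PartFileLifecycle();
        for (int i = 0; i < 3; i++) { bounded.write(); bounded.roll(); }
        System.out.println("bounded:   hidden=" + bounded.hiddenFiles() + " committed=" + bounded.committedFiles());

        // Unbounded source (e.g. Kafka): checkpoints keep completing while data flows.
        PartFileLifecycle unbounded = new PartFileLifecycle();
        for (int i = 0; i < 3; i++) { unbounded.write(); unbounded.roll(); unbounded.checkpointCompleted(); }
        System.out.println("unbounded: hidden=" + unbounded.hiddenFiles() + " committed=" + unbounded.committedFiles());
    }
}
```

In the bounded run all three files stay hidden and nothing is committed, which mirrors the listing above. A commonly suggested workaround, not verified here, is `env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, intervalMillis)`, which keeps the monitoring source running so checkpoints can still complete (at the cost of a job that never finishes on its own).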