Sample ParDo transformation for shredding CSV files into a map per row
package com.howarddierking.demo;

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.FilenameUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CsvShredder extends DoFn<String, Map<String, String>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
        String filePath = context.element();

        // Per https://stackoverflow.com/questions/33052578/temp-files-in-google-cloud-dataflow,
        // writing local temp files as shown here is appropriate for Cloud Dataflow workers.
        try {
            ResourceId inputResourceId = FileSystems.matchNewResource(filePath, false);
            File outputFile = File.createTempFile(UUID.randomUUID().toString(), ".tmp");
            outputFile.deleteOnExit();

            // Copy the (possibly remote) input file to the local temp file.
            // Note: a single transferFrom call caps the copy at Integer.MAX_VALUE bytes (~2 GB).
            try (ReadableByteChannel inputChannel = FileSystems.open(inputResourceId);
                 FileOutputStream outStream = new FileOutputStream(outputFile);
                 FileChannel outChannel = outStream.getChannel()) {
                outChannel.transferFrom(inputChannel, 0, Integer.MAX_VALUE);
            }

            // Parse the CSV, treating the first row as the header.
            try (CSVParser parser = CSVParser.parse(outputFile, StandardCharsets.UTF_8, CSVFormat.EXCEL.withHeader())) {
                Map<String, Integer> headerMap = parser.getHeaderMap();
                for (CSVRecord record : parser) {
                    // Emit one map per row, keyed by column header, plus the source file name.
                    Map<String, String> values = new HashMap<>();
                    values.put("InputFile", FilenameUtils.removeExtension(FilenameUtils.getName(filePath)));
                    for (String headerValue : headerMap.keySet()) {
                        values.put(headerValue, record.get(headerValue));
                    }
                    context.output(values);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
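A minimal sketch of how this DoFn might be wired into a Beam pipeline via ParDo. The bucket path and class name `CsvShredderExample` are hypothetical, and the explicit `MapCoder` is set defensively in case coder inference for `Map<String, String>` is not picked up automatically in your Beam version.

package com.howarddierking.demo;

import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class CsvShredderExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // The input PCollection holds file paths (local paths or gs:// URIs);
        // CsvShredder expands each path into one output map per CSV row.
        PCollection<Map<String, String>> rows = pipeline
            .apply("FilePaths", Create.of("gs://example-bucket/input/customers.csv")) // hypothetical path
            .apply("ShredCsv", ParDo.of(new CsvShredder()))
            .setCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

        pipeline.run().waitUntilFinish();
    }
}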