Created
April 13, 2018 16:33
-
-
Save jmason/015fbd242ec241d49fef76606a26058a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.NullWritable; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.mapreduce.RecordWriter; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.apache.orc.CompressionCodec; | |
import org.apache.orc.OrcFile; | |
import org.apache.orc.OrcProto; | |
import org.apache.orc.Writer; | |
import org.apache.orc.impl.PhysicalFsWriter; | |
import org.apache.orc.impl.StreamName; | |
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter; | |
import org.apache.orc.mapreduce.OrcOutputFormat; | |
import java.io.IOException; | |
/** | |
* A hacky override of OrcOutputFormat which won't generate Presto-incompatible | |
* UTF-8 Bloom Filter structures which Presto doesn't yet support. | |
* See https://github.com/prestodb/presto/issues/7120 . | |
* | |
* Use in place of OrcOutputFormat in your Hadoop mapper or reducer, and you | |
* can then call the usual OrcOutputFormat setter methods to set compressor | |
* classes, output path, etc. as normal. | |
*/ | |
public class PrestoCompatibleOrcOutputFormat<V extends Writable> extends OrcOutputFormat<V> { | |
private static final String EXTENSION = ".orc"; | |
@Override | |
public RecordWriter<NullWritable, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException { | |
Configuration conf = taskAttemptContext.getConfiguration(); | |
Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION); | |
OrcFile.WriterOptions opts = org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf); | |
FileSystem fs = filename.getFileSystem(opts.getConfiguration()); | |
opts.physicalWriter(new PrestoCompatiblePhysicalWriter(fs, filename, opts)); | |
Writer writer = OrcFile.createWriter(filename, opts); | |
return new OrcMapreduceRecordWriter<V>(writer); | |
} | |
public static class PrestoCompatiblePhysicalWriter extends PhysicalFsWriter { | |
PrestoCompatiblePhysicalWriter(FileSystem fs, Path filename, OrcFile.WriterOptions opts) throws IOException { | |
super(fs, filename, opts); | |
} | |
@Override | |
public void writeBloomFilter(StreamName name, | |
OrcProto.BloomFilterIndex.Builder bloom, | |
CompressionCodec codec) throws IOException { | |
if (!name.getKind().equals(OrcProto.Stream.Kind.BLOOM_FILTER_UTF8)) { | |
super.writeBloomFilter(name, bloom, codec); | |
} | |
// else inhibit writing this stream entry entirely | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment