Created February 4, 2014 22:27
Example Hadoop Job that reads a cache file loaded from S3
HadoopMain.java

// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopMain {

    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        String outputPath = args[1];
        String s3File = args[2];

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "twasink");
        job.setJarByClass(HadoopMain.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // s3File should be a URI with the s3: or s3n: scheme. It will be accessible
        // inside each task as a local file called 'theFile'.
        job.addCacheFile(new URI(s3File + "#theFile"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
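HadoopMain wires in a MyReducer class that is not included in the gist. A minimal sketch of what it might look like (an assumption, not one of the original files) is an identity reducer that writes every key/value pair straight through, matching the LongWritable/Text output types configured above.

// MyReducer.java - hypothetical identity reducer; the gist does not include this file.
package net.twasink.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Pass every value through unchanged, keyed by the original key.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}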
MyMapper.java

// Based on http://pragmaticintegrator.wordpress.com/2013/08/16/writing-a-hadoop-mapreduce-task-in-java/
package net.twasink.hadoop;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    // Default implementation - pass the input straight through to the output.
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        super.map(key, value, context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
            URI mappingFileUri = context.getCacheFiles()[0];
            if (mappingFileUri != null) {
                // It would be a good idea to inspect the URI here: the part after the '#'
                // is the local file name.
                System.out.println("Mapping File: " + FileUtils.readFileToString(new File("./theFile")));
            } else {
                System.out.println(">>>>>> NO MAPPING FILE");
            }
        } else {
            System.out.println(">>>>>> NO CACHE FILES AT ALL");
        }
    }
}
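The comment in setup() suggests inspecting the cache-file URI rather than hard-coding the name. A sketch of that variation (the class name FragmentAwareMapper and the fallback logic are assumptions, not part of the gist): it reads the symlink name from the URI fragment, so the mapper does not need to know that the driver chose 'theFile'.

// Sketch only: a variant of MyMapper that derives the local cache-file name from the
// '#fragment' on the cache URI instead of hard-coding "theFile".
package net.twasink.hadoop;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FragmentAwareMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0 || cacheFiles[0] == null) {
            System.out.println(">>>>>> NO CACHE FILES AT ALL");
            return;
        }
        URI mappingFileUri = cacheFiles[0];
        // The part after '#' is the symlink name Hadoop creates in the working directory;
        // fall back to the last path segment if no fragment was given.
        String localName = mappingFileUri.getFragment() != null
                ? mappingFileUri.getFragment()
                : new Path(mappingFileUri.getPath()).getName();
        System.out.println("Mapping File: " + FileUtils.readFileToString(new File("./" + localName)));
    }
}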
Super helpful. Thanks!
Sir, I am getting an error for the getCacheFiles() method.
The error is: The method getCacheFiles() is undefined for the type Mapper<HipiImageHeader,FloatImage,IntWritable,IntWritable>.Context
May I get a solution for this as soon as possible?
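getCacheFiles() was only added to the mapreduce Context in Hadoop 2; on Hadoop 1.x the equivalent calls live on the older org.apache.hadoop.filecache.DistributedCache class (still present, though deprecated, in Hadoop 2). A rough sketch of that older style, assuming the same 'theFile' setup as above:

// Sketch for Hadoop 1.x, where Mapper.Context has no getCacheFiles().
// Uses the older org.apache.hadoop.filecache.DistributedCache API.

// Driver side, before job submission:
DistributedCache.createSymlink(job.getConfiguration());   // honour the '#theFile' symlink name
DistributedCache.addCacheFile(new URI(s3File + "#theFile"), job.getConfiguration());

// Mapper side, e.g. in setup(Context context):
Path[] localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
if (localFiles != null && localFiles.length > 0) {
    System.out.println("Mapping File: "
            + FileUtils.readFileToString(new File(localFiles[0].toString())));
} else {
    System.out.println(">>>>>> NO CACHE FILES AT ALL");
}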
I had all sorts of problems getting my head around how cache files work with Hadoop. Finally, I stumbled across the answer: when you add a cache file (the job.addCacheFile call in HadoopMain), it's available to read as a local file inside the mapper (the setup() method in MyMapper). The part after the '#' in the cache-file URI becomes the name of a symlink in the task's working directory, which is why the mapper can simply open ./theFile.
When running on Elastic MapReduce, the cache-file URI can point at S3, using either s3://bucket/path or s3n://bucket/path. That may or may not work on other Hadoop distributions, but the general approach of adding a cache file and reading it locally in the mapper works the same way.