Distributed cache for objects in a MapReduce job
package pubmatic.matrix.confluent.utils;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.Logger;

import java.io.*;
import java.net.URI;
import java.util.UUID;
/**
 * {@link DistributedObjectCache} is used to store and distribute a Java object across all mappers and reducers.
 * The Hadoop API does not support this directly, so this class is a custom wrapper around the distributed cache.
 * Objects are serialized into HDFS on write and deserialized on read, so any object stored in
 * {@link DistributedObjectCache} must implement the {@link java.io.Serializable} interface.
 *
 * @author Bhavesh Patel
 * @version 1.0
 * @since 2019-05-09
 */
public class DistributedObjectCache {

    private static final Logger LOGGER = Logger.getLogger(DistributedObjectCache.class);

    // Default HDFS directory used for storing serialized objects.
    private static final String DEFAULT_OBJECT_PATH = "/raw/distributed_objects";
    // Default prefix for generated object keys.
    private static final String DEFAULT_OBJECT_PREFIX = "distributed_object";
    // Default file extension for object keys.
    private static final String DEFAULT_OBJECT_EXTENSION = ".ser";
    /**
     * Stores an object into the DistributedObjectCache and returns its object key.
     * The object being stored must implement the {@link java.io.Serializable} interface.
     *
     * @param job    The Hadoop job which is used to store and distribute the object across all mappers and reducers
     * @param object The object to store into the DistributedObjectCache
     * @return The object key mapped to the object, used to retrieve it from the DistributedObjectCache
     */
    public static String addObjectToCache(Job job, Object object) {
        return addObjectInCache(job, writeObjectToHDFS(job, DEFAULT_OBJECT_PATH + "/" + generateObjectKey(), object));
    }
    /**
     * Stores an object into the DistributedObjectCache under the given key and returns the object key.
     * The object being stored must implement the {@link java.io.Serializable} interface.
     *
     * @param job       The Hadoop job which is used to store and distribute the object across all mappers and reducers
     * @param object    The object to store into the DistributedObjectCache
     * @param objectKey The object key with which the specified object is to be associated
     * @return The object key mapped to the object, used to retrieve it from the DistributedObjectCache
     */
    public static String addObjectToCache(Job job, Object object, String objectKey) {
        return addObjectInCache(job, writeObjectToHDFS(job, DEFAULT_OBJECT_PATH + "/" + getObjectKey(objectKey), object));
    }
    /**
     * Stores an object into the DistributedObjectCache under the given key and HDFS path, and returns the object key.
     * The object being stored must implement the {@link java.io.Serializable} interface.
     *
     * @param job       The Hadoop job which is used to store and distribute the object across all mappers and reducers
     * @param object    The object to store into the DistributedObjectCache
     * @param objectKey The object key with which the specified object is to be associated
     * @param hdfsPath  The HDFS directory in which to store the object
     * @return The object key mapped to the object, used to retrieve it from the DistributedObjectCache
     */
    public static String addObjectToCache(Job job, Object object, String objectKey, String hdfsPath) {
        return addObjectInCache(job, writeObjectToHDFS(job, hdfsPath + "/" + getObjectKey(objectKey), object));
    }
    /**
     * Returns the object to which the specified object key is mapped, or null
     * if the {@link DistributedObjectCache} contains no mapping for the object key.
     *
     * @param context     The Hadoop job context, i.e. mapper or reducer context
     * @param objectKey   The object key with which the specified object is associated
     * @param objectClass The class of the object
     * @return the object to which the specified object key is mapped, or null
     * if the {@link DistributedObjectCache} contains no mapping for the object key
     */
    public static <T> T getObjectFromCache(JobContext context, String objectKey, Class<T> objectClass) {
        Object object = loadObjectFromHDFS(context, getObjectPathFromCache(context, objectKey));
        return objectClass.cast(object);
    }
    /**
     * Returns the object path to which the specified object key is mapped from the Hadoop context
     * (mapper or reducer context), or null if the Hadoop job context contains no mapping for the object key.
     *
     * @param context   The Hadoop map-reduce job context
     * @param objectKey The object key with which the specified object path is associated
     * @return the object path to which the specified object key is mapped, or null
     * if the Hadoop map-reduce job context contains no mapping for the object key
     */
    private static String getObjectPathFromCache(JobContext context, String objectKey) {
        try {
            LOGGER.info("Getting object path from cache objects for DistributedObjectCache for Object key: " + objectKey);
            if (context == null || objectKey == null) {
                LOGGER.warn("Job context or Object key is null. " +
                        "Both are mandatory parameters for getting an object from the cache");
                LOGGER.warn("Object key: " + objectKey);
                return null;
            }
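            // getCacheFiles() returns the URIs registered in the driver via
            // job.addCacheFile(), so the key is matched against those paths.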
            URI[] cacheObjects = context.getCacheFiles();
            if (cacheObjects != null && cacheObjects.length > 0) {
                for (URI cacheObjectPath : cacheObjects) {
                    LOGGER.info("Objects from cache: " + cacheObjectPath);
                    if (cacheObjectPath.toString().contains(objectKey)) {
                        LOGGER.info("Successfully found object path from cache objects for DistributedObjectCache. Object path: " + cacheObjectPath);
                        return cacheObjectPath.toString();
                    }
                }
            }
        } catch (IOException ex) {
            LOGGER.error("Exception occurred while getting object from cache. ", ex);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", ex);
        }
        return null;
    }
    /**
     * Returns the object to which the specified object key is mapped, or
     * null if the {@link DistributedObjectCache} contains no mapping for the object key.
     * Unlike {@link #loadObjectFromHDFS}, this reads the localized copy of the object file
     * from the task's working directory rather than from HDFS.
     *
     * @param context   The Hadoop job context, i.e. mapper or reducer context
     * @param objectKey The object key with which the specified object is associated
     * @return the object to which the specified object key is mapped, or
     * null if the {@link DistributedObjectCache} contains no mapping for the object key
     */
    private static Object loadObjectFromCache(JobContext context, String objectKey) {
        LOGGER.info("Loading Object from the local distributed cache for DistributedObjectCache with Object key: " + objectKey);
        if (context == null || objectKey == null) {
            LOGGER.warn("Job context or Object key is null. " +
                    "Both are mandatory parameters for getting an object from the cache");
            LOGGER.warn("Object key: " + objectKey);
            return null;
        }
        ObjectInputStream ois = null;
        FileInputStream fis = null;
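        // The distributed cache localizes registered files onto each task node
        // (on YARN they are typically symlinked into the working directory under
        // their base file name), so the object can be opened via a relative path.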
        try {
            fis = new FileInputStream(FilenameUtils.getName(objectKey));
            ois = new ObjectInputStream(fis);
            Object object = ois.readObject();
            LOGGER.info("Successfully loaded Object from the local distributed cache for DistributedObjectCache with Object key: " + objectKey);
            return object;
        } catch (IOException ex) {
            LOGGER.error("Exception occurred while getting object from cache. ", ex);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", ex);
        } catch (ClassNotFoundException e) {
            LOGGER.error("Exception occurred while getting object from cache. ", e);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", e);
        } finally {
            try {
                // Close the outer stream first; this also closes the wrapped FileInputStream.
                if (ois != null)
                    ois.close();
                else if (fis != null)
                    fis.close();
            } catch (IOException e) {
                LOGGER.error("Exception occurred while getting object from cache", e);
            }
        }
    }
    /**
     * Returns the object to which the specified object key is mapped, or
     * null if the {@link DistributedObjectCache} contains no mapping for the object key.
     * The object is deserialized directly from its HDFS path.
     *
     * @param context   The Hadoop job context, i.e. mapper or reducer context
     * @param objectKey The object key with which the specified object is associated
     * @return the object to which the specified object key is mapped, or
     * null if the {@link DistributedObjectCache} contains no mapping for the object key
     */
    private static Object loadObjectFromHDFS(JobContext context, String objectKey) {
        LOGGER.info("Loading Object from HDFS for DistributedObjectCache with Object key: " + objectKey);
        if (context == null || objectKey == null) {
            LOGGER.warn("Job context or Object key is null. " +
                    "Both are mandatory parameters for getting an object from the cache");
            LOGGER.warn("Object key: " + objectKey);
            return null;
        }
        ObjectInputStream ois = null;
        try {
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            ois = new ObjectInputStream(fileSystem.open(new Path(objectKey)));
            Object object = ois.readObject();
            LOGGER.info("Successfully loaded Object from HDFS for DistributedObjectCache with Object key: " + objectKey);
            return object;
        } catch (IOException ex) {
            LOGGER.error("Exception occurred while getting object from cache. ", ex);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", ex);
        } catch (ClassNotFoundException e) {
            LOGGER.error("Exception occurred while getting object from cache. ", e);
            throw new DistributedObjectCacheNotFound("Unable to find object key in DistributedObjectCache", e);
        } finally {
            try {
                if (ois != null)
                    ois.close();
            } catch (IOException e) {
                LOGGER.error("Exception occurred while getting object from cache", e);
            }
        }
    }
    /**
     * Stores an object into HDFS by serializing it to the given path. The object must support serialization.
     *
     * @param job       Hadoop map-reduce job
     * @param objectKey The object key (HDFS path) with which the specified object is to be associated
     * @param object    The object to store into the {@link DistributedObjectCache}
     * @return The object key with which the specified object is associated
     */
    private static String writeObjectToHDFS(Job job, String objectKey, Object object) {
        LOGGER.info("Writing Object into HDFS for DistributedObjectCache with Object key: " + objectKey);
        if (job == null || objectKey == null || object == null) {
            LOGGER.warn("Job configuration, Object key or Object is null. Please provide valid inputs");
            LOGGER.warn("Object key: " + objectKey + " , Object: " + object);
            return null;
        }
        OutputStream out = null;
        ObjectOutputStream oos = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(objectKey), job.getConfiguration());
            out = fs.create(new Path(objectKey));
            oos = new ObjectOutputStream(out);
            oos.writeObject(object);
            LOGGER.info("Successfully wrote Object: " + objectKey + " into HDFS");
            return objectKey;
        } catch (IOException ioe) {
            LOGGER.error("Unable to add object into DistributedObjectCache.", ioe);
            throw new DistributedObjectCacheNotFound("Unable to add object into DistributedObjectCache", ioe);
        } finally {
            try {
                // Flush and close the ObjectOutputStream first; closing it also
                // flushes and closes the underlying HDFS output stream.
                if (oos != null) {
                    oos.flush();
                    oos.close();
                } else if (out != null) {
                    out.close();
                }
            } catch (IOException ioe) {
                LOGGER.warn("Unable to close output stream while adding DistributedObjectCache", ioe);
            }
        }
    }
    /**
     * Adds the object file path into the Hadoop map-reduce job cache.
     *
     * @param job       The Hadoop map-reduce job
     * @param objectKey Unique object key
     * @return Object key used to retrieve the object from the cache.
     */
    private static String addObjectInCache(Job job, String objectKey) {
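        // addCacheFile registers the HDFS file with the job's distributed cache,
        // so the framework ships it to every task; only the base file name is
        // returned as the object key.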
        if (objectKey != null)
            job.addCacheFile(new Path(objectKey).toUri());
        return FilenameUtils.getName(objectKey);
    }
    /**
     * Checks whether the object key is valid. If the object key is null, a new unique object key is
     * generated; otherwise the default extension is appended when missing.
     *
     * @param objectKey the object key
     * @return Unique object key
     */
    private static String getObjectKey(String objectKey) {
        if (objectKey == null)
            return generateObjectKey();
        return isObjectNameExtension(objectKey) ? objectKey : objectKey + DEFAULT_OBJECT_EXTENSION;
    }
    /**
     * Checks whether the object key has a file extension.
     *
     * @param objectKey The object key to check for an extension.
     * @return true if the object name has an extension, else false.
     */
    private static boolean isObjectNameExtension(String objectKey) {
        String extension = FilenameUtils.getExtension(objectKey);
        return extension != null && !extension.trim().isEmpty();
    }
    /**
     * Generates a unique object key with the default object key prefix and extension.
     *
     * @return Generated unique object key
     */
    private static String generateObjectKey() {
        return DEFAULT_OBJECT_PREFIX + UUID.randomUUID().toString() + DEFAULT_OBJECT_EXTENSION;
    }
}
/**
 * {@link DistributedObjectCacheNotFound} is thrown when an object is not found in, or cannot be read
 * from, the DistributedObjectCache.
 *
 * @author Bhavesh Patel
 * @version 1.0
 * @since 2019-05-09
 */
class DistributedObjectCacheNotFound extends RuntimeException {

    public DistributedObjectCacheNotFound(Exception ex) {
        super(ex);
    }

    public DistributedObjectCacheNotFound(String message, Exception ex) {
        super(message, ex);
    }
}
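For reference, here is a minimal usage sketch (not part of the original file): a hypothetical driver serializes a HashMap lookup table with addObjectToCache and the mapper restores it in setup(). The class name LookupMapper, the job name, and the lookup.object.key configuration property are illustrative assumptions, not part of the gist; input/output configuration is omitted.

package pubmatic.matrix.confluent.utils;

import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper that restores the cached object once per task.
public class LookupMapper extends Mapper<LongWritable, Text, Text, Text> {

    private HashMap<String, String> lookup;

    @Override
    @SuppressWarnings("unchecked") // the cached object is known to be a HashMap<String, String>
    protected void setup(Context context) {
        // The object key was placed into the configuration by the driver below.
        String objectKey = context.getConfiguration().get("lookup.object.key");
        lookup = (HashMap<String, String>) DistributedObjectCache.getObjectFromCache(context, objectKey, HashMap.class);
    }

    // Hypothetical driver entry point: cache the object before submitting the job.
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "lookup-job");
        job.setJarByClass(LookupMapper.class);
        job.setMapperClass(LookupMapper.class);
        // ... input/output formats and paths omitted for brevity ...

        HashMap<String, String> lookup = new HashMap<>();
        lookup.put("US", "United States");

        // Serialize the map to HDFS and register it with the job's cache,
        // then hand the generated key to tasks through the job configuration.
        String objectKey = DistributedObjectCache.addObjectToCache(job, lookup);
        job.getConfiguration().set("lookup.object.key", objectKey);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}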