Created May 25, 2011 20:52
Cascading.Avro issue 6 diff
Index: AvroScheme.java
===================================================================
--- AvroScheme.java (revision 1644)
+++ AvroScheme.java (working copy)
@@ -17,6 +17,7 @@
package com.bixolabs.cascading.avro;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -25,10 +26,15 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
@@ -38,12 +44,18 @@
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.log4j.Logger;
+import com.bixolabs.cascading.avro.UnionAssembly.FSType;
+
+import cascading.CascadingException;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
@@ -56,14 +68,42 @@
*/
@SuppressWarnings("serial")
public class AvroScheme extends Scheme {
+ private static final String NESTED_DIRECTORY_MESG = "Can only process Avro files, not directory ";
+
+ public static final String DATA_CONVERSION_MESG =
+ "Cannot find data conversion for Avro: ";
+
+ public static final String CLASS_FOR_DATA_TYPE_MESG =
+ "Failed to load type class for Avro data type: ";
+
public static final Class<?> ARRAY_CLASS = List.class;
public static final Class<?> MAP_CLASS = Map.class;
+ public static enum FSType {
+ HFS, LFS
+ }
+
+ private static final Map<String, String> REVERSE_TYPE_MAP =
+ new HashMap<String, String>();
+
+ static {
+ REVERSE_TYPE_MAP.put(Schema.Type.STRING.toString(), "java.lang.String");
+ REVERSE_TYPE_MAP.put(Schema.Type.BOOLEAN.toString(), "java.lang.Boolean");
+ REVERSE_TYPE_MAP.put(Schema.Type.LONG.toString(), "java.lang.Long");
+ REVERSE_TYPE_MAP.put(Schema.Type.INT.toString(), "java.lang.Integer");
+ REVERSE_TYPE_MAP.put(Schema.Type.FLOAT.toString(), "java.lang.Float");
+ REVERSE_TYPE_MAP.put(Schema.Type.DOUBLE.toString(), "java.lang.Double");
+ REVERSE_TYPE_MAP.put(Schema.Type.BYTES.toString(), "org.apache.hadoop.io.BytesWritable");
+ REVERSE_TYPE_MAP.put(Schema.Type.ENUM.toString(), "java.lang.Enum");
+ REVERSE_TYPE_MAP.put(Schema.Type.ARRAY.toString(), ARRAY_CLASS.getName());
+ REVERSE_TYPE_MAP.put(Schema.Type.MAP.toString(), MAP_CLASS.getName());
+ }
+
/**
-* Helper class used to save an Enum name in a type that Avro requires for
-* serialization.
-*
-*/
+ * Helper class used to save an Enum name in a type that Avro requires for
+ * serialization.
+ *
+ */
private static class CascadingEnumSymbol implements GenericEnumSymbol {
private String _name;
@@ -153,6 +193,7 @@
// Unpack this datum into source tuple fields
AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key;
GenericData.Record datum = wrapper.datum();
+ Schema schema = datum.getSchema();
for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields.size(); fieldIndex++, typeIndex++) {
Class<?> curType = _schemeTypes[typeIndex];
String fieldName = sourceFields.get(fieldIndex).toString();
@@ -163,8 +204,14 @@
} else if (curType == MAP_CLASS) {
typeIndex++;
result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
+ } else if (curType.isEnum()) {
+ result.add(convertFromAvroPrimitive(inObj,
+ curType,
+ Enum.class));
} else {
- result.add(convertFromAvroPrimitive(inObj, curType));
+ result.add(convertFromAvroPrimitive(inObj,
+ curType,
+ getJavaType(schema.getField(fieldName).schema())));
}
}
return result;
@@ -304,20 +351,42 @@
for (Object name : names) {
enumNames.add(name.toString());
}
-
return Schema.createEnum(fieldTypes[0].getName(), null, null, enumNames);
} else {
return Schema.create(avroType);
}
}
- private Object convertFromAvroPrimitive(Object inObj, Class<?> inType) {
+ private Object convertFromAvroPrimitive(Object inObj,
+ Class<?> targetType,
+ Class<?> sourceType) {
if (inObj == null) {
return null;
- } else if (inType == String.class) {
- String convertedObj = ((Utf8) inObj).toString();
- return convertedObj;
- } else if (inType == BytesWritable.class) {
+ } else if (targetType == String.class
+ || targetType.isEnum()) {
+ return inObj.toString();
+ } else if (sourceType != null && targetType != sourceType) {
+ //Data type conversion required due to type promotions
+ if(targetType == Long.class) {
+ if(sourceType == Integer.class) {
+ return new Long(((Integer)inObj).longValue());
+ } else {
+ return Long.valueOf(inObj.toString());
+ }
+ } else if(targetType == Double.class) {
+ if(sourceType == Float.class) {
+ return new Double(((Float)inObj).doubleValue());
+ } else if(sourceType == Integer.class) {
+ return new Double(((Integer)inObj).doubleValue());
+ } else {
+ return Double.valueOf(inObj.toString());
+ }
+ } else {
+ throw new CascadingException("Cannot convert " +
+ sourceType.getName() + " to " +
+ targetType.getName());
+ }
+ } else if (targetType == BytesWritable.class) {
// TODO KKr - this is very inefficient, since we make a copy of
// the ByteBuffer array in the call to BytesWritable.set(buffer, pos, length).
// A potentially small win is to check if buffer.position() == 0, and if
@@ -327,8 +396,6 @@
BytesWritable result = new BytesWritable();
result.set(buffer.array(), buffer.position(), buffer.limit());
return result;
- } else if (inType.isEnum()) {
- return inObj.toString();
} else {
return inObj;
}
@@ -347,7 +414,7 @@
Tuple arrayTuple = new Tuple();
Iterator<?> iter = arr.iterator();
while (iter.hasNext()) {
- arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType));
+ arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType, null));
}
return arrayTuple;
}
@@ -363,7 +430,7 @@
Tuple convertedMapTuple = new Tuple();
for (Map.Entry<Utf8, Object> e : inMap.entrySet()) {
convertedMapTuple.add(e.getKey().toString());
- convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(), mapValueClass));
+ convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(), mapValueClass, null));
}
return convertedMapTuple;
}
@@ -535,4 +602,195 @@
t.add(mapTuple);
}
+
+ /**
+ * Assembles the scheme assuming Hfs input path and default configuration.
+ * @param path
+ * @throws IOException
+ */
+ public static AvroScheme unionOf(String path)
+ throws IOException
+ {
+ return unionOf(path, FSType.HFS);
+ }
+
+ /**
+ * Assembles the scheme using a union of all fields within the input path
+ * using the default JobConf.
+ * @param fsType
+ * @param path
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ public static AvroScheme unionOf(String path, FSType fsType)
+ throws IOException
+ {
+ return unionOf(path, fsType, new JobConf());
+ }
+
+ /**
+ * Assembles the scheme using a union of all fields within the input path
+ * using the specified JobConf.
+ *
+ * It is possible to have multiple Avro schemas present within the input
+ * directories, usually because the data format has changed over time.
+ * This method creates a unified scheme which can read multiple Avro schemas
+ * and returns fields and data types as a normalized union of the schemas
+ * present in the input files on the path, using the following rules:
+ * 1. If data types of the fields do not conflict, simply take the union of
+ * the fields and data types of all input Schemas.
+ * 2. If the data types conflict, but can be promoted, then promote them
+ * (int -> long, float -> double, int -> double).
+ * 3. Otherwise, convert conflicting data types to String.
+ * Fields are returned in field name order.
+ * @param fsType
+ * @param conf
+ * @param path
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ public static AvroScheme unionOf(String path,
+ FSType fsType,
+ JobConf conf)
+ throws IOException
+ {
+ DatumReader<GenericRecord> reader =
+ new GenericDatumReader<GenericRecord>();
+ FileSystem fs = (fsType == FSType.HFS ?
+ FileSystem.get(conf) :
+ FileSystem.getLocal(conf));
+ Map<String, Class<?>> coerceTo = new HashMap<String, Class<?>>();
+ FileStatus[] stati = fs.listStatus(new Path(path));
+ for(int x = 0; x < stati.length; x++) {
+ if(stati[x].isDir()) {
+ throw new IOException(NESTED_DIRECTORY_MESG +
+ stati[x].getPath().toString());
+ }
+ Schema schema = getSchema(reader, fs, stati[x]);
+ unifyToMap(coerceTo, schema);
+ }
+ return fromMap(coerceTo);
+ }
+
+ /**
+ * Gets an AvroScheme from a Map<String, Class<?>>
+ * @param map
+ * @return
+ */
+ public static AvroScheme fromMap(Map<String, Class<?>> map)
+ {
+ String[] allNames = new String[map.size()];
+ Class<?>[] allTypes = new Class<?>[allNames.length];
+ Iterator<Map.Entry<String, Class<?>>> entries = map.entrySet().iterator();
+ for(int x = 0; x < allNames.length; x++) {
+ Map.Entry<String, Class<?>> entry = entries.next();
+ allNames[x] = entry.getKey();
+ allTypes[x] = entry.getValue();
+ }
+ return new AvroScheme(new Fields(allNames), allTypes);
+ }
+
+ /**
+ * Creates a unified Schema by adding the field names from the passed
+ * schema into the Map and transforming Avro types to Java types usable
+ * by Cascading.Avro.
+ * @param map
+ * @param schema
+ */
+ public static void unifyToMap(Map<String, Class<?>> map, Schema schema)
+ {
+ List<Schema.Field> fields = schema.getFields();
+ String[] names = new String[fields.size()];
+ Class<?>[] types = new Class<?>[fields.size()];
+ for(int y = 0; y < names.length; y++)
+ {
+ Schema.Field field = fields.get(y);
+ names[y] = field.name();
+ Schema fieldSchema = field.schema();
+ types[y] = getJavaType(fieldSchema);
+ if(map.containsKey(names[y])) {
+ Class<?> otherClass = map.get(names[y]);
+ if(!otherClass.equals(types[y])) {
+ map.put(names[y], promotion(types[y], otherClass));
+ }
+ } else {
+ map.put(names[y], types[y]);
+ }
+ }
+ }
+
+ /**
+ * From a field's Schema, get the Java type associated with it. Note this is
+ * NOT a record schema, which contains a list of all fields.
+ * @param fieldSchema
+ */
+ public static Class<?> getJavaType(Schema fieldSchema)
+ {
+ if(Schema.Type.RECORD.equals(fieldSchema.getType())) {
+ return null;
+ }
+ Class<?> result = null;
+ if (fieldSchema != null) {
+ List<Schema> typesList = fieldSchema.getTypes();
+ if (typesList != null) {
+ //Skip 'null' type, which is assumed first in list
+ String dataTypeName = String.valueOf(typesList
+ .get(typesList.size() - 1));
+ if (dataTypeName != null) {
+ result = getClassForAvroType(dataTypeName);
+ }
+ }
+ }
+ return result;
+ }
+
+ protected static Schema getSchema(DatumReader<GenericRecord> reader,
+ FileSystem fs,
+ FileStatus status)
+ throws IOException
+ {
+ InputStream stream = fs.open(status.getPath());
+ DataFileStream<GenericRecord> in =
+ new DataFileStream<GenericRecord>(stream, reader);
+ Schema schema = in.getSchema();
+ in.close();
+ return schema;
+ }
+
+ protected static Class<?> promotion(Class<?> newType, Class<?> oldType)
+ {
+ if(newType == Long.class
+ && oldType == Integer.class
+ || newType == Double.class
+ && oldType == Float.class
+ || newType == Double.class
+ && oldType == Integer.class)
+ {
+ return newType;
+ } else {
+ return String.class;
+ }
+ }
+
+ protected static Class<?> getClassForAvroType(String dataType)
+ {
+ Class<?> type = null;
+ /*
+ * TODO: the next line works around an Avro bug where the data types are
+ * surrounded by quote characters when 'null' is present in the types list;
+ * the Avro bug still needs to be logged.
+ */
+ String dataTypeStripped = dataType.replaceAll("\\W+", "").toUpperCase();
+ if (REVERSE_TYPE_MAP.containsKey(dataTypeStripped)) {
+ try {
+ type = Class.forName(REVERSE_TYPE_MAP.get(dataTypeStripped));
+ } catch (ClassNotFoundException e) {
+ throw new CascadingException(CLASS_FOR_DATA_TYPE_MESG
+ + dataTypeStripped, e);
+ }
+ } else {
+ throw new CascadingException(DATA_CONVERSION_MESG + dataTypeStripped);
+ }
+ return type;
+ }
}
\ No newline at end of file
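
For context, a minimal sketch of how the unionOf factory added by this patch might be called from a Cascading 1.x flow. The input path "/data/events" and the Hfs tap wiring are illustrative assumptions, not part of the patch:

import org.apache.hadoop.mapred.JobConf;
import cascading.tap.Hfs;
import cascading.tap.Tap;
import com.bixolabs.cascading.avro.AvroScheme;

public class UnionOfExample {
    public static void main(String[] args) throws Exception {
        // Build one unified scheme across every Avro file directly under
        // the directory; per the patch, nested directories raise an IOException.
        // "/data/events" is a hypothetical example path.
        JobConf conf = new JobConf();
        AvroScheme scheme =
            AvroScheme.unionOf("/data/events", AvroScheme.FSType.HFS, conf);

        // Per the patch javadoc, conflicting field types across files are
        // promoted (int -> long, float -> double, int -> double) or fall
        // back to String, and fields come back in field-name order.
        Tap source = new Hfs(scheme, "/data/events");
    }
}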