Created
June 10, 2011 03:24
-
-
Save mpouttuclarke/1018180 to your computer and use it in GitHub Desktop.
Cascading.Avro_issue_6_Avro 1.4_2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Index: src/com/bixolabs/cascading/avro/AvroScheme.java | |
=================================================================== | |
--- src/com/bixolabs/cascading/avro/AvroScheme.java (revision 1744) | |
+++ src/com/bixolabs/cascading/avro/AvroScheme.java (working copy) | |
@@ -41,14 +41,12 @@ | |
import org.apache.avro.mapred.AvroKey; | |
import org.apache.avro.mapred.AvroKeyComparator; | |
import org.apache.avro.mapred.AvroOutputFormat; | |
-import org.apache.avro.mapred.AvroRecordReader; | |
import org.apache.avro.mapred.AvroSerialization; | |
import org.apache.avro.mapred.AvroValue; | |
import org.apache.avro.mapred.AvroWrapper; | |
import org.apache.avro.mapred.FsInput; | |
-import org.apache.avro.reflect.ReflectDatumReader; | |
-import org.apache.avro.specific.SpecificDatumReader; | |
import org.apache.avro.util.Utf8; | |
+import org.apache.commons.collections.iterators.ArrayListIterator; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
@@ -192,6 +190,7 @@ | |
private HashMap<Class<?>, Schema.Type> _typeMap = createTypeMap(); | |
private transient Schema _schema; | |
+ private boolean convertingTypes; | |
public AvroScheme(Fields schemeFields, Class<?>[] schemeTypes) { | |
super(schemeFields, schemeFields); | |
@@ -202,10 +201,33 @@ | |
_schemeTypes = schemeTypes; | |
} | |
- @Override | |
+ /** | |
+ * @return the convertingTypes | |
+ */ | |
+ public boolean isConvertingTypes() | |
+ { | |
+ return convertingTypes; | |
+ } | |
+ | |
+ /** | |
+ * @param convertingTypes the convertingTypes to set | |
+ */ | |
+ public void setConvertingTypes(boolean convertingTypes) | |
+ { | |
+ this.convertingTypes = convertingTypes; | |
+ } | |
+ | |
+ @Override | |
public void sourceInit(Tap tap, JobConf conf) throws IOException { | |
- // conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString()); | |
- conf.setInputFormat(CascadingAvroInputFormat.class); | |
+ if (convertingTypes) | |
+ { // Many schemas supported | |
+ conf.setInputFormat(CascadingAvroInputFormat.class); | |
+ } | |
+ else | |
+ { // We can only support 1 fixed schema with this option | |
+ conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString()); | |
+ conf.setInputFormat(AvroInputFormat.class); | |
+ } | |
// add AvroSerialization to io.serializations | |
Collection<String> serializations = conf | |
@@ -266,20 +288,39 @@ | |
AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key; | |
GenericData.Record datum = wrapper.datum(); | |
for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields.size(); fieldIndex++, typeIndex++) { | |
- Class<?> curType = _schemeTypes[typeIndex]; | |
String fieldName = sourceFields.get(fieldIndex).toString(); | |
- Object inObj = datum.get(fieldName); | |
- if (curType == ARRAY_CLASS) { | |
- typeIndex++; | |
- result.add(convertFromAvroArray(inObj, _schemeTypes[typeIndex])); | |
- } else if (curType == MAP_CLASS) { | |
- typeIndex++; | |
- result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex])); | |
- } else if (curType.isEnum()) { | |
- result.add(convertFromAvroPrimitive(inObj, Enum.class)); | |
- } else { | |
- result.add(convertFromAvroPrimitive(inObj, curType)); | |
+ try | |
+ { | |
+ Class<?> curType = _schemeTypes[typeIndex]; | |
+ Object inObj = datum.get(fieldName); | |
+ if (curType == ARRAY_CLASS) | |
+ { | |
+ typeIndex++; | |
+ result.add(convertFromAvroArray(inObj, _schemeTypes[typeIndex])); | |
+ } | |
+ else if (curType == MAP_CLASS) | |
+ { | |
+ typeIndex++; | |
+ result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex])); | |
+ } | |
+ else if (curType.isEnum()) | |
+ { | |
+ result.add(convertFromAvroPrimitive(inObj, Enum.class)); | |
+ } | |
+ else | |
+ { | |
+ result.add(convertFromAvroPrimitive(inObj, curType)); | |
+ } | |
} | |
+ catch (CascadingException e) | |
+ { | |
+ throw e; | |
+ } | |
+ catch (Exception e) | |
+ { | |
+ throw new CascadingException(datum.getSchema().getFullName() + " failed to load: " | |
+ + fieldName + "=" + _schemeTypes[typeIndex], e); | |
+ } | |
} | |
return result; | |
} | |
@@ -299,28 +340,37 @@ | |
for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sinkFields.size(); fieldIndex++, typeIndex++) { | |
String fieldName = sinkFields.get(fieldIndex).toString(); | |
- Class<?> curType = _schemeTypes[typeIndex]; | |
- if (curType == ARRAY_CLASS) { | |
- typeIndex++; | |
- datum.put(fieldName, convertToAvroArray(result.get(fieldIndex), | |
- _schemeTypes[typeIndex])); | |
- } else if (curType == MAP_CLASS) { | |
- typeIndex++; | |
- datum.put(fieldName, convertToAvroMap(result.get(fieldIndex), | |
- _schemeTypes[typeIndex])); | |
- } else { | |
- try { | |
- datum.put(fieldName, convertToAvroPrimitive(result | |
- .get(fieldIndex), _schemeTypes[typeIndex])); | |
- } catch (ClassCastException e) { | |
- throw new CascadingException("Type for field name: " | |
- + fieldName + "=" + _schemeTypes[typeIndex] | |
- + " does not match type of value " | |
- + result.get(fieldIndex) + "=" | |
- + result.get(fieldIndex).getClass() | |
- + ", try using the unionOf factory method " | |
- + "to create the AvroScheme", e); | |
- } | |
+ try | |
+ { | |
+ Class<?> curType = _schemeTypes[typeIndex]; | |
+ if (curType == ARRAY_CLASS) | |
+ { | |
+ typeIndex++; | |
+ datum.put(fieldName, | |
+ convertToAvroArray(result.get(fieldIndex), _schemeTypes[typeIndex])); | |
+ } | |
+ else if (curType == MAP_CLASS) | |
+ { | |
+ typeIndex++; | |
+ datum.put(fieldName, | |
+ convertToAvroMap(result.get(fieldIndex), _schemeTypes[typeIndex])); | |
+ } | |
+ else | |
+ { | |
+ datum.put(fieldName, | |
+ convertToAvroPrimitive(result.get(fieldIndex), | |
+ _schemeTypes[typeIndex])); | |
+ } | |
+ } | |
+ catch (CascadingException e) | |
+ { | |
+ throw e; | |
+ } | |
+ catch (Exception e) | |
+ { | |
+ throw new CascadingException(datum.getSchema().getFullName() + " failed to save: " | |
+ + fieldName + ", " + _schemeTypes[typeIndex] + "=" | |
+ + result.get(fieldIndex), e); | |
} | |
} | |
@@ -473,15 +523,15 @@ | |
//Data type conversion required due to type promotions | |
if(targetType == Long.class) { | |
if(sourceType == Integer.class) { | |
- return new Long(((Integer)inObj).longValue()); | |
+ return ((Integer) inObj).longValue(); | |
} else { | |
return Long.valueOf(inObj.toString()); | |
} | |
} else if(targetType == Double.class) { | |
if(sourceType == Float.class) { | |
- return new Double(((Float)inObj).doubleValue()); | |
+ return ((Float) inObj).doubleValue(); | |
} else if(sourceType == Integer.class) { | |
- return new Double(((Integer)inObj).doubleValue()); | |
+ return ((Integer) inObj).doubleValue(); | |
} else { | |
return Double.valueOf(inObj.toString()); | |
} | |
@@ -722,55 +772,87 @@ | |
* JobConf. | |
* | |
* @param fsType | |
- * @param path | |
+ * @param paths | |
* @throws IOException | |
*/ | |
- public static AvroScheme unionOf(String path) | |
+ public static AvroScheme unionOf(String... paths) | |
throws IOException | |
{ | |
- return unionOf(path, new JobConf()); | |
+ return unionOf(new JobConf(), null, paths); | |
} | |
/** | |
- * Assembles the scheme using a union of all fields within the input path using the specified | |
- * JobConf. Extracts all the Avro Schemas from all files to create the scheme. | |
+ * Assembles the scheme using a union of all schemas within the input path using the default | |
+ * JobConf and the specified projection. | |
* | |
+ * @param projection | |
+ * @param paths | |
+ * @return the assembled union AvroScheme (with type conversion enabled) | |
+ * @throws IOException | |
+ */ | |
+ public static AvroScheme unionOf(Fields projection, String... paths) | |
+ throws IOException | |
+ { | |
+ return unionOf(new JobConf(), projection, paths); | |
+ } | |
+ | |
+ /** | |
+ * Assembles the scheme using a union of all fields within the input paths using the specified | |
+ * JobConf. Extracts all the Avro Schemas from all files to create the scheme. Paths can refer | |
+ * to any combination of direct paths to Avro files or paths to directories containing Avro | |
+ * files. This method does not recurse into directories. | |
+ * | |
* It is possible to have multiple Avro Schemas present within the input directory(s). Usually | |
* this occurs when the data format changes over time. This method create a unified scheme which | |
* can read multiple Avro schemas and return fields and data types as a normalized union of the | |
- * schemas present in the input files one the path, using the following rules: | |
- * 1. If data types of the fields do not conflict, simply take the union of the fields and data | |
- * types of all input Schemas. | |
- * 2. If the data types conflict, but can be promoted, then promote them (int -> long, | |
- * float -> double, int -> double). | |
- * 3. Otherwise, convert conflicting data types to String. | |
+ * schemas present in the input files on the path, using the following rules: 1. If data types | |
+ * of the fields do not conflict, simply take the union of the fields and data types of all | |
+ * input Schemas. 2. If the data types conflict, but can be promoted, then promote them (int -> | |
+ * long, float -> double, int -> double). 3. Otherwise, convert conflicting data types to | |
+ * String. | |
* | |
* Fields are returned in field name order. Does not currently support nested array or map | |
- * types, these are removed from the union projection. | |
+ * types, these are removed from the union projection. If a projection is provided, then only | |
+ * fields in the projection will be returned. | |
* | |
- * @param path | |
* @param conf | |
+ * @param projection | |
+ * @param paths | |
* @return | |
* @throws IOException | |
*/ | |
- public static AvroScheme unionOf(String path, | |
- JobConf conf) | |
+ public static AvroScheme unionOf(JobConf conf, Fields projection, String... paths) | |
throws IOException | |
{ | |
- Path pathObj = new Path(path); | |
DatumReader<GenericRecord> reader = | |
- new GenericDatumReader<GenericRecord>(); | |
- FileSystem fs = pathObj.getFileSystem(conf); | |
+ new GenericDatumReader<GenericRecord>(); | |
Map<String, Class<?>> coerceTo = new TreeMap<String, Class<?>>(); | |
- FileStatus[] stati = fs.listStatus(pathObj); | |
- for(int x = 0; x < stati.length; x++) { | |
- if(stati[x].isDir()) { | |
- continue; | |
+ for (String path : paths) | |
+ { | |
+ Path pathObj = new Path(path); | |
+ FileSystem fs = pathObj.getFileSystem(conf); | |
+ if(fs.isFile(pathObj)) | |
+ { | |
+ Schema schema = getSchema(reader, fs, pathObj); | |
+ unifyToMap(coerceTo, schema); | |
} | |
- Schema schema = getSchema(reader, fs, stati[x]); | |
- unifyToMap(coerceTo, schema); | |
+ else | |
+ { | |
+ FileStatus[] stati = fs.listStatus(pathObj); | |
+ for (int x = 0; x < stati.length; x++) | |
+ { | |
+ if (stati[x].isDir()) | |
+ { | |
+ continue; | |
+ } | |
+ Schema schema = getSchema(reader, fs, stati[x].getPath()); | |
+ unifyToMap(coerceTo, projection, schema); | |
+ } | |
+ } | |
} | |
- return fromMap(coerceTo); | |
+ AvroScheme result = fromMap(coerceTo); | |
+ result.setConvertingTypes(true); | |
+ return result; | |
} | |
/** | |
@@ -798,30 +880,59 @@ | |
* | |
* @param map | |
* @param schema | |
+ * | |
+ * Delegates to the projection-aware overload with a null projection. | |
*/ | |
public static void unifyToMap(Map<String, Class<?>> map, Schema schema) | |
{ | |
+ unifyToMap(map, null, schema); | |
+ } | |
+ | |
+ /** | |
+ * Creates a unified Schema by adding the field names from the passed schema into the Map and | |
+ * transforming Avro types to Java types usable to Cascading.Avro. Does not currently support | |
+ * nested array or map types, these are removed from the union projection. | |
+ * | |
+ * Fields not in the projection set are ignored. | |
+ * | |
+ * @param map | |
+ * @param projection | |
+ * @param schema | |
+ */ | |
+ public static void unifyToMap(Map<String, Class<?>> map, Fields projection, Schema schema) | |
+ { | |
List<Schema.Field> fields = schema.getFields(); | |
- String[] names = new String[fields.size()]; | |
- Class<?>[] types = new Class<?>[fields.size()]; | |
- for(int y = 0; y < names.length; y++) | |
+ List<String> projectionList = new ArrayList<String>(); | |
+ if (projection != null) | |
{ | |
+ for (int x = 0; x < projection.size(); x++) | |
+ { | |
+ projectionList.add(projection.get(x).toString()); | |
+ } | |
+ } | |
+ for (int y = 0; y < fields.size(); y++) | |
+ { | |
Schema.Field field = fields.get(y); | |
- names[y] = field.name(); | |
+ String name = field.name(); | |
+ if (projection != null && !projectionList.contains(name)) | |
+ { | |
+ continue; | |
+ } | |
Schema fieldSchema = field.schema(); | |
- types[y] = getJavaType(fieldSchema); | |
+ Class<?> type = getJavaType(fieldSchema); | |
// TODO: currently excluding maps and lists, need a different data | |
// structure to support them, Map with a single type won't work | |
- if (!Map.class.isAssignableFrom(types[y]) | |
- && !List.class.isAssignableFrom(types[y])) | |
+ if (!MAP_CLASS.isAssignableFrom(type) && !ARRAY_CLASS.isAssignableFrom(type)) | |
{ | |
- if(map.containsKey(names[y])) { | |
- Class<?> otherClass = map.get(names[y]); | |
- if(!otherClass.equals(types[y])) { | |
- map.put(names[y], promotion(types[y], otherClass)); | |
+ if (map.containsKey(name)) | |
+ { | |
+ Class<?> otherClass = map.get(name); | |
+ if (!otherClass.equals(type)) | |
+ { | |
+ map.put(name, promotion(type, otherClass)); | |
} | |
} else { | |
- map.put(names[y], types[y]); | |
+ map.put(name, type); | |
} | |
} | |
} | |
@@ -857,10 +968,10 @@ | |
protected static Schema getSchema(DatumReader<GenericRecord> reader, | |
FileSystem fs, | |
- FileStatus status) | |
+ Path path) | |
throws IOException | |
{ | |
- InputStream stream = fs.open(status.getPath()); | |
+ InputStream stream = fs.open(path); | |
DataFileStream<GenericRecord> in = | |
new DataFileStream<GenericRecord>(stream, reader); | |
Schema schema = in.getSchema(); | |
Index: test/com/bixolabs/cascading/avro/AvroSchemeTest.java | |
=================================================================== | |
--- test/com/bixolabs/cascading/avro/AvroSchemeTest.java (revision 1744) | |
+++ test/com/bixolabs/cascading/avro/AvroSchemeTest.java (working copy) | |
@@ -33,7 +33,6 @@ | |
public class AvroSchemeTest { | |
- private static final String UNION_STR = "UNION"; | |
private static final String OUTPUT_DIR = "build/test/AvroSchmeTest/"; | |
private static enum TestEnum { | |
@@ -131,7 +130,8 @@ | |
"bytesField", | |
"arrayOfLongsField", | |
"mapOfStringsField", | |
- "enumField"); | |
+ "enumField", | |
+ "dummyField"); | |
final Class<?>[] schemeTypes2 = | |
{ Double.class, | |
Integer.class, | |
@@ -144,7 +144,8 @@ | |
Long.class, | |
Map.class, | |
String.class, | |
- TestEnum.class }; | |
+ TestEnum.class, | |
+ String.class }; | |
final String in = OUTPUT_DIR+ "testRoundTrip/in"; | |
final String out = OUTPUT_DIR + "testRoundTrip/out"; | |
@@ -163,12 +164,14 @@ | |
// Now read from the results, and write to an Avro file. | |
Pipe writePipe = new Pipe("tuples to avro"); | |
- Tap avroSink = new Lfs(new AvroScheme(testFields, schemeTypes), out); | |
+ AvroScheme avroScheme = new AvroScheme(testFields, schemeTypes); | |
+ assertFalse("Default converting types?", avroScheme.isConvertingTypes()); | |
+ Tap avroSink = new Lfs(avroScheme, out); | |
Flow flow = new FlowConnector().connect(lfsSource, avroSink, writePipe); | |
flow.complete(); | |
// Now read it back in, and verify that the data/types match up. | |
- Tap avroSource = new Lfs(new AvroScheme(testFields, schemeTypes), out); | |
+ Tap avroSource = new Lfs(avroScheme, out); | |
Tap verifySink = avroToTuples(testFields, verifyout, avroSource); | |
TupleEntryIterator sinkTuples = verifySink.openForRead(new JobConf()); | |
verifyOutput(numRecords, sinkTuples, false); | |
@@ -185,10 +188,23 @@ | |
// Now read union back in, and verify that the data/types match up. | |
AvroScheme unionOf = AvroScheme.unionOf(out); | |
+ assertTrue("Union converting types?", unionOf.isConvertingTypes()); | |
assertEquals("Check union scheme", | |
+ "{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'booleanField','type':['null','boolean'],'doc':''},{'name':'bytesField','type':['null','bytes'],'doc':''},{'name':'doubleField','type':['null','double'],'doc':''},{'name':'dummyField','type':['null','string'],'doc':''},{'name':'enumField','type':['null','string'],'doc':''},{'name':'floatField','type':['null','string'],'doc':''},{'name':'integerField','type':['null','string'],'doc':''},{'name':'longField','type':['null','long'],'doc':''},{'name':'stringField','type':['null','string'],'doc':''}]}", | |
+ unionOf.getJsonSchema().replace('"', '\'')); | |
+ AvroScheme unionOfFiles = | |
+ AvroScheme.unionOf(new String[] { | |
+ out + File.separatorChar + "part-00000.avro", | |
+ out + File.separatorChar + "part-00001.avro"}); | |
+ assertEquals("Check union of files scheme identical to union of path scheme", | |
+ unionOf.getJsonSchema(), unionOfFiles.getJsonSchema()); | |
+ AvroScheme unionProjection = AvroScheme.unionOf(testFields, out); | |
+ assertEquals("Check union projection scheme", | |
"{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'booleanField','type':['null','boolean'],'doc':''},{'name':'bytesField','type':['null','bytes'],'doc':''},{'name':'doubleField','type':['null','double'],'doc':''},{'name':'enumField','type':['null','string'],'doc':''},{'name':'floatField','type':['null','string'],'doc':''},{'name':'integerField','type':['null','string'],'doc':''},{'name':'longField','type':['null','long'],'doc':''},{'name':'stringField','type':['null','string'],'doc':''}]}", | |
- unionOf.getJsonSchema().replace('"', '\'')); | |
+ unionProjection.getJsonSchema().replace('"', '\'')); | |
+ | |
avroSource = new Lfs(unionOf, out); | |
+ | |
TupleEntryIterator sinkTuples2 = avroSource.openForRead(new JobConf()); | |
verifyOutput(numRecords * 2, sinkTuples2, true); | |
sinkTuples2.close(); | |
@@ -363,6 +379,12 @@ | |
AvroScheme.addToTuple(t, mapOfStrings); | |
AvroScheme.addToTuple(t, TestEnum.ONE); | |
+ | |
+ if (unionOf) | |
+ { | |
+ t.add(null); | |
+ } | |
+ | |
write.add(t); | |
t = new Tuple(); | |
@@ -404,6 +426,10 @@ | |
t.add(new Tuple(0L, 1L)); | |
t.add(new Tuple("key-0", "value-0", "key-1", "value-1")); | |
AvroScheme.addToTuple(t, TestEnum.TWO); | |
+ if (unionOf) | |
+ { | |
+ t.add(null); | |
+ } | |
write.add(t); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment