Cascading.Avro_issue_6_Avro 1.4
Created June 8, 2011 21:18
Index: src/com/bixolabs/cascading/avro/AvroScheme.java
===================================================================
--- src/com/bixolabs/cascading/avro/AvroScheme.java (revision 1735)
+++ src/com/bixolabs/cascading/avro/AvroScheme.java (working copy)
@@ -17,6 +17,7 @@
package com.bixolabs.cascading.avro;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -25,25 +26,45 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroKeyComparator;
import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.avro.mapred.AvroRecordReader;
import org.apache.avro.mapred.AvroSerialization;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import cascading.CascadingException;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tuple.Fields;
@@ -54,30 +75,115 @@
* The AvroScheme class is a {@link Scheme} subclass. It supports reading and
* writing of data that has been serialized using Apache Avro.
*/
-@SuppressWarnings("serial")
+@SuppressWarnings({ "serial", "deprecation" })
public class AvroScheme extends Scheme {
- public static final Class<?> ARRAY_CLASS = List.class;
- public static final Class<?> MAP_CLASS = Map.class;
+
+ public static final String DATA_CONVERSION_MESG =
+ "Cannot find data conversion for Avro: ";
- /**
- * Helper class used to save an Enum name in a type that Avro requires for
- * serialization.
- *
- */
- private static class CascadingEnumSymbol implements GenericEnumSymbol {
+ public static final String CLASS_FOR_DATA_TYPE_MESG =
+ "Failed to load type class for Avro data type: ";
+
+ public static final Class<?> ARRAY_CLASS = List.class;
+ public static final Class<?> MAP_CLASS = Map.class;
+
+ private static final Map<String, String> REVERSE_TYPE_MAP =
+ new HashMap<String, String>();
- private String _name;
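+ /**
+ * Reads each Avro file using the writer schema embedded in the file,
+ * rather than a single job-wide AvroJob.INPUT_SCHEMA (which sourceInit()
+ * below no longer sets).
+ */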
+ public static final class CascadingAvroInputFormat<T>
+ extends AvroInputFormat<T>
+ {
+ @Override
+ public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter)
+ throws IOException
+ {
+ reporter.setStatus(split.toString());
+ return new CascadingAvroRecordReader<T>(job, (FileSplit) split);
+ }
+ };
- public CascadingEnumSymbol(String name) {
- _name = name;
- }
+ public static final class CascadingAvroRecordReader<T>
+ implements RecordReader<AvroWrapper<T>, NullWritable>
+ {
- @Override
- public String toString() {
- return _name;
- }
- }
+ private FsInput in;
+ private DataFileReader<T> reader;
+ private long start;
+ private long end;
+ public CascadingAvroRecordReader(JobConf job, FileSplit split)
+ throws IOException
+ {
+ this.in = new FsInput(split.getPath(), job);
+ this.reader = new DataFileReader<T>(in, new GenericDatumReader<T>());
+ reader.sync(split.getStart()); // sync to start
+ this.start = in.tell();
+ this.end = split.getStart() + split.getLength();
+ }
+
+ public AvroWrapper<T> createKey() {
+ return new AvroWrapper<T>(null);
+ }
+ public NullWritable createValue() { return NullWritable.get(); }
+ public boolean next(AvroWrapper<T> wrapper, NullWritable ignore)
+ throws IOException {
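+ // Read until the first sync marker past this split's end; the next
+ // split's reader syncs forward past that marker, so each record is
+ // consumed by exactly one reader.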
+ if (!reader.hasNext() || reader.pastSync(end))
+ return false;
+ wrapper.datum(reader.next(wrapper.datum()));
+ return true;
+ }
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
+ }
+ }
+ public long getPos() throws IOException {
+ return in.tell();
+ }
+ public void close() throws IOException { reader.close(); }
+
+ };
+
+ static {
+ REVERSE_TYPE_MAP.put(Schema.Type.STRING.toString().toLowerCase(), "java.lang.String");
+ REVERSE_TYPE_MAP.put(Schema.Type.BOOLEAN.toString().toLowerCase(), "java.lang.Boolean");
+ REVERSE_TYPE_MAP.put(Schema.Type.LONG.toString().toLowerCase(), "java.lang.Long");
+ REVERSE_TYPE_MAP.put(Schema.Type.INT.toString().toLowerCase(), "java.lang.Integer");
+ REVERSE_TYPE_MAP.put(Schema.Type.FLOAT.toString().toLowerCase(), "java.lang.Float");
+ REVERSE_TYPE_MAP.put(Schema.Type.DOUBLE.toString().toLowerCase(), "java.lang.Double");
+ REVERSE_TYPE_MAP.put(Schema.Type.BYTES.toString().toLowerCase(),
+ "org.apache.hadoop.io.BytesWritable");
+ REVERSE_TYPE_MAP.put(Schema.Type.ENUM.toString().toLowerCase(), "java.lang.Enum");
+ REVERSE_TYPE_MAP.put(Schema.Type.ARRAY.toString().toLowerCase(), ARRAY_CLASS.getName());
+ REVERSE_TYPE_MAP.put(Schema.Type.MAP.toString().toLowerCase(), MAP_CLASS.getName());
+ }
+
+ /**
+ * Helper class used to save an Enum name in a type that Avro requires for serialization.
+ *
+ */
+ private static class CascadingEnumSymbol
+ implements GenericEnumSymbol
+ {
+
+ private String _name;
+
+ public CascadingEnumSymbol(String name)
+ {
+ _name = name;
+ }
+
+ @Override
+ public String toString()
+ {
+ return _name;
+ }
+ }
+
private static final Logger LOGGER = Logger.getLogger(AvroScheme.class);
private static final String RECORD_NAME = "CascadingAvroSchema";
private String _recordName = RECORD_NAME;
@@ -96,11 +202,10 @@
_schemeTypes = schemeTypes;
}
- @SuppressWarnings( { "deprecation" })
@Override
public void sourceInit(Tap tap, JobConf conf) throws IOException {
- conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString());
- conf.setInputFormat(AvroInputFormat.class);
+ // conf.set(AvroJob.INPUT_SCHEMA, getSchema().toString());
+ conf.setInputFormat(CascadingAvroInputFormat.class);
// add AvroSerialization to io.serializations
Collection<String> serializations = conf
@@ -116,10 +221,9 @@
_schemeFields));
}
- @SuppressWarnings( { "deprecation" })
@Override
public void sinkInit(Tap tap, JobConf conf) throws IOException {
- conf.set(AvroJob.OUTPUT_SCHEMA, getSchema().toString());
+ conf.set(AvroJob.OUTPUT_SCHEMA, getSchema().toString());
conf.setOutputFormat(AvroOutputFormat.class);
// Since we're outputting to Avro, we need to set up output values.
@@ -158,32 +262,31 @@
Fields sourceFields = getSourceFields();
- // Unpack this datum into source tuple fields
- AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key;
- GenericData.Record datum = wrapper.datum();
- for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields
- .size(); fieldIndex++, typeIndex++) {
- Class<?> curType = _schemeTypes[typeIndex];
- String fieldName = sourceFields.get(fieldIndex).toString();
- Object inObj = datum.get(fieldName);
- if (curType == ARRAY_CLASS) {
- typeIndex++;
- result
- .add(convertFromAvroArray(inObj,
- _schemeTypes[typeIndex]));
- } else if (curType == MAP_CLASS) {
- typeIndex++;
- result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
- } else {
- result.add(convertFromAvroPrimitive(inObj, curType));
- }
- }
- return result;
- }
+ // Unpack this datum into source tuple fields
+ AvroWrapper<GenericData.Record> wrapper = (AvroWrapper<GenericData.Record>) key;
+ GenericData.Record datum = wrapper.datum();
+ for (int fieldIndex = 0, typeIndex = 0; fieldIndex < sourceFields.size(); fieldIndex++, typeIndex++) {
+ Class<?> curType = _schemeTypes[typeIndex];
+ String fieldName = sourceFields.get(fieldIndex).toString();
+ Object inObj = datum.get(fieldName);
+ if (curType == ARRAY_CLASS) {
+ typeIndex++;
+ result.add(convertFromAvroArray(inObj, _schemeTypes[typeIndex]));
+ } else if (curType == MAP_CLASS) {
+ typeIndex++;
+ result.add(convertFromAvroMap(inObj, _schemeTypes[typeIndex]));
+ } else if (curType.isEnum()) {
+ result.add(convertFromAvroPrimitive(inObj, Enum.class));
+ } else {
+ result.add(convertFromAvroPrimitive(inObj, curType));
+ }
+ }
+ return result;
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
- public void sink(TupleEntry tupleEntry, OutputCollector outputCollector)
+ public void sink(TupleEntry tupleEntry, OutputCollector outputCollector)
throws IOException {
// Create the appropriate AvroWrapper<T> from the result, and pass that
// as the key for the collect
@@ -210,17 +313,19 @@
datum.put(fieldName, convertToAvroPrimitive(result
.get(fieldIndex), _schemeTypes[typeIndex]));
} catch (ClassCastException e) {
- throw new IllegalArgumentException("Type for field name: "
+ throw new CascadingException("Type for field name: "
+ fieldName + "=" + _schemeTypes[typeIndex]
+ " does not match type of value "
+ result.get(fieldIndex) + "="
- + result.get(fieldIndex).getClass());
+ + result.get(fieldIndex).getClass()
+ + ", try using the unionOf factory method "
+ + "to create the AvroScheme", e);
}
}
}
- AvroWrapper<GenericData.Record> wrapper = new AvroWrapper<GenericData.Record>(
- datum);
+ AvroWrapper<GenericData.Record> wrapper =
+ new AvroWrapper<GenericData.Record>(datum);
outputCollector.collect(wrapper, NullWritable.get());
}
@@ -303,65 +408,92 @@
return schema;
}
- @SuppressWarnings( { "static-access", "unchecked" })
- private Schema createAvroSchema(Fields schemeFields, Class<?>[] fieldTypes,
- int depth) {
- Schema.Type avroType = toAvroSchemaType(fieldTypes[0]);
+ private Schema createAvroSchema(Fields schemeFields, Class<?>[] fieldTypes, int depth) {
+ Schema.Type avroType = toAvroSchemaType(fieldTypes[0]);
if (avroType == Schema.Type.ARRAY) {
Class<?> arrayTypes[] = { fieldTypes[1] };
- Schema schema = Schema.createArray(createAvroSchema(schemeFields
+ Schema schema =
+ Schema
+ .createArray(createAvroSchema(Fields
.offsetSelector(schemeFields.size() - 1, 1), arrayTypes,
depth + 1));
return schema;
} else if (avroType == Schema.Type.MAP) {
Class<?> mapTypes[] = { fieldTypes[1] };
- return Schema.createMap(createAvroSchema(schemeFields
+ return Schema
+ .createMap(createAvroSchema(Fields
.offsetSelector(schemeFields.size() - 1, 1), mapTypes,
depth + 1));
} else if (avroType == Schema.Type.RECORD) {
- return generateSchema(schemeFields.offsetSelector(schemeFields
+ return generateSchema(Fields.offsetSelector(schemeFields
.size() - 1, 1), fieldTypes, depth + 1);
} else if (avroType == Schema.Type.ENUM) {
- Class clazz = fieldTypes[0];
+ Class<?> clazz = fieldTypes[0];
Object[] names = clazz.getEnumConstants();
List<String> enumNames = new ArrayList<String>(names.length);
for (Object name : names) {
enumNames.add(name.toString());
}
-
- return Schema.createEnum(fieldTypes[0].getName(), null, null,
+ return Schema.createEnum(fieldTypes[0].getCanonicalName(), null, null,
enumNames);
} else {
return Schema.create(avroType);
}
}
- private Object convertFromAvroPrimitive(Object inObj, Class<?> inType) {
- if (inObj == null) {
- return null;
- } else if (inType == String.class) {
- String convertedObj = ((Utf8) inObj).toString();
- return convertedObj;
- } else if (inType == BytesWritable.class) {
- // TODO KKr - this is very inefficient, since we make a copy of
- // the ByteBuffer array in the call to BytesWritable.set(buffer,
- // pos, length).
- // A potentially small win is to check if buffer.position() == 0,
- // and if
- // so then do result = new BytesWritable(buffer.array()), followed
- // by
- // result.setSize(buffer.limit())
- ByteBuffer buffer = (ByteBuffer) inObj;
- BytesWritable result = new BytesWritable();
- result.set(buffer.array(), buffer.position(), buffer.limit());
- return result;
- } else if (inType.isEnum()) {
- return inObj.toString();
- } else {
- return inObj;
- }
- }
+ private Object convertFromAvroPrimitive(Object inObj, Class<?> targetType)
+ {
+ if (inObj == null) {
+ return null;
+ }
+ Class<?> sourceType = inObj.getClass();
+ if (targetType == String.class)
+ {
+ return inObj.toString();
+ }
+ else if (sourceType.isAssignableFrom(GenericData.EnumSymbol.class))
+ {
+ return inObj.toString();
+ }
+ else if (targetType == BytesWritable.class)
+ {
+ // TODO KKr - this is very inefficient, since we make a copy of
+ // the ByteBuffer array in the call to BytesWritable.set(buffer, pos, length).
+ // A potentially small win is to check if buffer.position() == 0, and if
+ // so then do result = new BytesWritable(buffer.array()), followed by
+ // result.setSize(buffer.limit())
+ ByteBuffer buffer = (ByteBuffer) inObj;
+ BytesWritable result = new BytesWritable();
+ result.set(buffer.array(), buffer.position(), buffer.limit());
+ return result;
+ }
+ else if (targetType != sourceType)
+ {
+ //Data type conversion required due to type promotions
+ if(targetType == Long.class) {
+ if(sourceType == Integer.class) {
+ return new Long(((Integer)inObj).longValue());
+ } else {
+ return Long.valueOf(inObj.toString());
+ }
+ } else if(targetType == Double.class) {
+ if(sourceType == Float.class) {
+ return new Double(((Float)inObj).doubleValue());
+ } else if(sourceType == Integer.class) {
+ return new Double(((Integer)inObj).doubleValue());
+ } else {
+ return Double.valueOf(inObj.toString());
+ }
+ } else {
+ throw new CascadingException("Cannot convert " +
+ sourceType.getName() + " to " +
+ targetType.getName());
+ }
+ } else {
+ return inObj;
+ }
+ }
private Object convertFromAvroArray(Object inObj, Class<?> arrayType) {
// Since Cascading doesn't have support for arrays - we are using a
@@ -372,14 +504,14 @@
return null;
}
- GenericData.Array<?> arr = (GenericData.Array<?>) inObj;
- Tuple arrayTuple = new Tuple();
- Iterator<?> iter = arr.iterator();
- while (iter.hasNext()) {
- arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType));
- }
- return arrayTuple;
- }
+ GenericData.Array<?> arr = (GenericData.Array<?>) inObj;
+ Tuple arrayTuple = new Tuple();
+ Iterator<?> iter = arr.iterator();
+ while (iter.hasNext()) {
+ arrayTuple.add(convertFromAvroPrimitive(iter.next(), arrayType));
+ }
+ return arrayTuple;
+ }
@SuppressWarnings("unchecked")
private Object convertFromAvroMap(Object inObj, Class<?> mapValueClass) {
@@ -389,14 +521,13 @@
Map<Utf8, Object> inMap = (Map<Utf8, Object>) inObj;
- Tuple convertedMapTuple = new Tuple();
- for (Map.Entry<Utf8, Object> e : inMap.entrySet()) {
- convertedMapTuple.add(e.getKey().toString());
- convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(),
- mapValueClass));
- }
- return convertedMapTuple;
- }
+ Tuple convertedMapTuple = new Tuple();
+ for (Map.Entry<Utf8, Object> e : inMap.entrySet()) {
+ convertedMapTuple.add(e.getKey().toString());
+ convertedMapTuple.add(convertFromAvroPrimitive(e.getValue(), mapValueClass));
+ }
+ return convertedMapTuple;
+ }
private Object convertToAvroPrimitive(Object inObj, Class<?> curType) {
if (inObj == null) {
@@ -410,14 +541,13 @@
.getLength());
return convertedObj;
} else if (curType.isEnum()) {
- Object result = new CascadingEnumSymbol((String) inObj);
+ Object result = new CascadingEnumSymbol((String) inObj);
return result;
} else {
return inObj;
}
}
- @SuppressWarnings("unchecked")
private Object convertToAvroArray(Object inObj, Class<?> arrayClass) {
if (inObj == null) {
return null;
@@ -425,13 +555,14 @@
Tuple tuple = (Tuple) inObj;
- GenericData.Array arr = new GenericData.Array(tuple.size(), Schema
+ GenericData.Array<Object> arr =
+ new GenericData.Array<Object>(tuple.size(), Schema
.createArray(Schema.create(toAvroSchemaType(arrayClass))));
for (int i = 0; i < tuple.size(); i++) {
if (tuple.getObject(i).getClass() == arrayClass) {
arr.add(convertToAvroPrimitive(tuple.getObject(i), arrayClass));
} else {
- throw new RuntimeException(
+ throw new CascadingException(
String
.format(
"Found array tuple with class %s instead of expected %",
@@ -453,19 +584,19 @@
int tupleSize = tuple.size();
boolean multipleOfTwo = (tupleSize >= 0) && ((tupleSize % 2) == 0);
if (!multipleOfTwo) {
- throw new RuntimeException(
+ throw new CascadingException(
"Invalid map definition - maps need to be Tuples made up of key,value pairs");
}
for (int i = 0; i < tupleSize; i += 2) {
// the tuple entries are key followed by value
if (tuple.getObject(i).getClass() != String.class) {
- throw new RuntimeException(
"Invalid map definition - the key should be a String - instead of "
+ tuple.getObject(i).getClass());
- }
+ }
if (tuple.getObject(i + 1).getClass() != valClass) {
- throw new RuntimeException(String.format(
+ throw new CascadingException(String.format(
"Found map value with %s instead of expected %s", tuple
.getObject(i + 1).getClass(), valClass));
}
@@ -515,7 +646,8 @@
"There must be at least one field");
}
- if (getSchemeTypesSize(schemeTypes) != schemeFields.size()) {
+ if (getSchemeTypesSize(schemeTypes) != schemeFields.size())
+ {
throw new IllegalArgumentException(
"You must have a schemeType for every field");
}
@@ -561,8 +693,8 @@
t.add(new BytesWritable(bytes));
}
- @SuppressWarnings("unchecked")
- public static void addToTuple(Tuple t, Enum e) {
+ public static void addToTuple(Tuple t, Enum<?> e)
+ {
t.add(e.toString());
}
@@ -584,4 +716,240 @@
t.add(mapTuple);
}
+
+ /**
+ * Assembles the scheme using a union of all schemas within the input path, using the default
+ * JobConf.
+ *
+ * @param path
+ * @throws IOException
+ */
+ public static AvroScheme unionOf(String path)
+ throws IOException
+ {
+ return unionOf(path, new JobConf());
+ }
+
+ /**
+ * Assembles the scheme using a union of all fields within the input path using the specified
+ * JobConf. Extracts all the Avro Schemas from all files to create the scheme.
+ *
+ * It is possible to have multiple Avro Schemas present within the input directory (or
+ * directories). Usually this occurs when the data format changes over time. This method
+ * creates a unified scheme which can read multiple Avro schemas and return fields and data
+ * types as a normalized union of the schemas present in the input files on the path, using
+ * the following rules:
+ * 1. If data types of the fields do not conflict, simply take the union of the fields and data
+ * types of all input Schemas.
+ * 2. If the data types conflict, but can be promoted, then promote them (int -> long,
+ * float -> double, int -> double).
+ * 3. Otherwise, convert conflicting data types to String.
+ *
+ * Fields are returned in field name order. Does not currently support nested array or map
+ * types; these are removed from the union projection.
+ *
+ * @param path
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ public static AvroScheme unionOf(String path,
+ JobConf conf)
+ throws IOException
+ {
+ Path pathObj = new Path(path);
+ DatumReader<GenericRecord> reader =
+ new GenericDatumReader<GenericRecord>();
+ FileSystem fs = pathObj.getFileSystem(conf);
+ Map<String, Class<?>> coerceTo = new TreeMap<String, Class<?>>();
+ FileStatus[] stati = fs.listStatus(pathObj);
+ for(int x = 0; x < stati.length; x++) {
+ if(stati[x].isDir()) {
+ continue;
+ }
+ Schema schema = getSchema(reader, fs, stati[x]);
+ unifyToMap(coerceTo, schema);
+ }
+ return fromMap(coerceTo);
+ }
+
+ /**
+ * Gets an AvroScheme from a Map<String, Class<?>>
+ * @param map
+ * @return
+ */
+ public static AvroScheme fromMap(Map<String, Class<?>> map)
+ {
+ String[] allNames = new String[map.size()];
+ Class<?>[] allTypes = new Class<?>[allNames.length];
+ Iterator<Map.Entry<String, Class<?>>> entries = map.entrySet().iterator();
+ for(int x = 0; x < allNames.length; x++) {
+ Map.Entry<String, Class<?>> entry = entries.next();
+ allNames[x] = entry.getKey();
+ allTypes[x] = entry.getValue();
+ }
+ return new AvroScheme(new Fields(allNames), allTypes);
+ }
+
+ /**
+ * Creates a unified Schema by adding the field names from the passed schema into the Map and
+ * transforming Avro types to Java types usable by Cascading.Avro. Does not currently support
+ * nested array or map types; these are removed from the union projection.
+ *
+ * @param map
+ * @param schema
+ */
+ public static void unifyToMap(Map<String, Class<?>> map, Schema schema)
+ {
+ List<Schema.Field> fields = schema.getFields();
+ String[] names = new String[fields.size()];
+ Class<?>[] types = new Class<?>[fields.size()];
+ for(int y = 0; y < names.length; y++)
+ {
+ Schema.Field field = fields.get(y);
+ names[y] = field.name();
+ Schema fieldSchema = field.schema();
+ types[y] = getJavaType(fieldSchema);
+ // TODO: currently excluding maps and lists, need a different data
+ // structure to support them, Map with a single type won't work
+ if (!Map.class.isAssignableFrom(types[y])
+ && !List.class.isAssignableFrom(types[y]))
+ {
+ if(map.containsKey(names[y])) {
+ Class<?> otherClass = map.get(names[y]);
+ if(!otherClass.equals(types[y])) {
+ map.put(names[y], promotion(types[y], otherClass));
+ }
+ } else {
+ map.put(names[y], types[y]);
+ }
+ }
+ }
+ }
+
+ /**
+ * From a field's Schema, get the Java type associated with it. Note this is NOT a record
+ * schema, which contains a list of all fields. Will return null for Schema.Type.RECORD. If
+ * 'null' type is present, it is assumed to be first in the type list.
+ *
+ * @param fieldSchema
+ */
+ public static Class<?> getJavaType(Schema fieldSchema)
+ {
+ if(Schema.Type.RECORD.equals(fieldSchema.getType())) {
+ return null;
+ }
+ Class<?> result = null;
+ if (fieldSchema != null) {
+ List<Schema> typesList = fieldSchema.getTypes();
+ if (typesList != null && typesList.size() > 0)
+ {
+ //Skip 'null' type, which is assumed first in list
+ String dataTypeName = String.valueOf(typesList
+ .get(typesList.size() - 1));
+ if (dataTypeName != null) {
+ result = getClassForAvroType(dataTypeName);
+ }
+ }
+ }
+ return result;
+ }
+
+ protected static Schema getSchema(DatumReader<GenericRecord> reader,
+ FileSystem fs,
+ FileStatus status)
+ throws IOException
+ {
+ InputStream stream = fs.open(status.getPath());
+ DataFileStream<GenericRecord> in =
+ new DataFileStream<GenericRecord>(stream, reader);
+ Schema schema = in.getSchema();
+ in.close();
+ return schema;
+ }
+
+ protected static Class<?> promotion(Class<?> newType, Class<?> oldType)
+ {
+ if(newType == Long.class
+ && oldType == Integer.class
+ || newType == Double.class
+ && oldType == Float.class
+ || newType == Double.class
+ && oldType == Integer.class)
+ {
+ return newType;
+ } else {
+ return String.class;
+ }
+ }
+
+ protected static Class<?> getClassForAvroType(String dataType)
+ {
+ Class<?> type = null;
+ ObjectMapper m = new ObjectMapper();
+ JsonNode rootNode;
+ try
+ {
+ rootNode = m.readValue(dataType, JsonNode.class);
+ }
+ catch (Exception e)
+ {
+ throw new CascadingException("Unparsable Schema: " + dataType, e);
+ }
+ if (rootNode.isTextual())
+ {
+ type = avroPrimitiveToJavaClass(rootNode.getTextValue());
+ }
+ else if (rootNode.isArray())
+ {
+ Iterator<JsonNode> elements = rootNode.getElements();
+ while (elements.hasNext())
+ {
+ String typeString = elements.next().getTextValue();
+ type = avroPrimitiveToJavaClass(typeString);
+ if (type != null)
+ {
+ break;
+ }
+ }
+ }
+ else if (rootNode.isContainerNode() && rootNode.get("type") != null)
+ {
+ String containerTypeString = rootNode.get("type").getTextValue();
+ if (Schema.Type.ENUM.toString().equalsIgnoreCase(containerTypeString))
+ {
+ type = String.class;
+ }
+ else
+ {
+ type = avroPrimitiveToJavaClass(containerTypeString);
+ }
+ }
+ if (type == null)
+ {
+ throw new CascadingException("Unsupported type: " + dataType);
+ }
+ return type;
+ }
+
+ /**
+ * @param typeString
+ * @return
+ */
+ protected static Class<?> avroPrimitiveToJavaClass(String typeString)
+ {
+ if (REVERSE_TYPE_MAP.containsKey(typeString))
+ {
+ try
+ {
+ return Class.forName(REVERSE_TYPE_MAP.get(typeString));
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new CascadingException(CLASS_FOR_DATA_TYPE_MESG + typeString, e);
+ }
+ }
+ return null;
+ }
}
\ No newline at end of file
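
Editor's note: a minimal usage sketch (not part of the patch) of how the unionOf() factory above might be wired into a Cascading 1.x flow. The paths, class name, and the flow wiring are illustrative assumptions, not code from this gist.

    // Sketch: read a directory that may contain several Avro schema versions,
    // letting the union scheme normalize conflicting field types.
    import org.apache.hadoop.mapred.JobConf;
    import cascading.flow.Flow;
    import cascading.flow.FlowConnector;
    import cascading.pipe.Pipe;
    import cascading.scheme.SequenceFile;
    import cascading.tap.Lfs;
    import cascading.tap.SinkMode;
    import cascading.tap.Tap;
    import com.bixolabs.cascading.avro.AvroScheme;

    public class UnionOfExample {
        public static void main(String[] args) throws Exception {
            String in = "data/avro-input";    // hypothetical directory of .avro files
            String out = "data/tuple-output"; // hypothetical output path

            // Scans the Avro files under 'in' and merges their schemas per the
            // rules documented on unionOf(): int -> long, float -> double,
            // int -> double, any other conflict -> String.
            AvroScheme union = AvroScheme.unionOf(in, new JobConf());

            Tap source = new Lfs(union, in);
            Tap sink = new Lfs(new SequenceFile(union.getSourceFields()), out, SinkMode.REPLACE);
            Flow flow = new FlowConnector().connect(source, sink, new Pipe("avro to tuples"));
            flow.complete();
        }
    }
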
Index: test/com/bixolabs/cascading/avro/AvroSchemeTest.java
===================================================================
--- test/com/bixolabs/cascading/avro/AvroSchemeTest.java (revision 1735)
+++ test/com/bixolabs/cascading/avro/AvroSchemeTest.java (working copy)
@@ -1,8 +1,6 @@
package com.bixolabs.cascading.avro;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
@@ -35,6 +33,7 @@
public class AvroSchemeTest {
+ private static final String UNION_STR = "UNION";
private static final String OUTPUT_DIR = "build/test/AvroSchmeTest/";
private static enum TestEnum {
@@ -91,17 +90,62 @@
}
-
- @SuppressWarnings("serial")
@Test
public void testRoundTrip() throws Exception {
// Create a scheme that tests each of the supported types
- final Fields testFields = new Fields("integerField", "longField", "booleanField", "doubleField", "floatField",
- "stringField", "bytesField", "arrayOfLongsField", "mapOfStringsField", "enumField");
- final Class<?>[] schemeTypes = {Integer.class, Long.class, Boolean.class, Double.class, Float.class,
- String.class, BytesWritable.class, List.class, Long.class, Map.class, String.class, TestEnum.class};
+ final Fields testFields =
+ new Fields("integerField",
+ "longField",
+ "booleanField",
+ "doubleField",
+ "floatField",
+ "stringField",
+ "bytesField",
+ "arrayOfLongsField",
+ "mapOfStringsField",
+ "enumField");
+ final Class<?>[] schemeTypes =
+ { Integer.class,
+ Long.class,
+ Boolean.class,
+ Double.class,
+ Float.class,
+ String.class,
+ BytesWritable.class,
+ List.class,
+ Long.class,
+ Map.class,
+ String.class,
+ TestEnum.class };
+
+ // Create scheme that tests type conversions for normalized unions
+ final Fields testFields2 =
+ new Fields("integerField",
+ "longField",
+ "booleanField",
+ "doubleField",
+ "floatField",
+ "stringField",
+ "bytesField",
+ "arrayOfLongsField",
+ "mapOfStringsField",
+ "enumField");
+ final Class<?>[] schemeTypes2 =
+ { Double.class,
+ Integer.class,
+ Boolean.class,
+ Float.class,
+ String.class,
+ String.class,
+ BytesWritable.class,
+ List.class,
+ Long.class,
+ Map.class,
+ String.class,
+ TestEnum.class };
+
final String in = OUTPUT_DIR+ "testRoundTrip/in";
final String out = OUTPUT_DIR + "testRoundTrip/out";
final String verifyout = OUTPUT_DIR + "testRoundTrip/verifyout";
@@ -111,41 +155,9 @@
// Create a sequence file with the appropriate tuples
Lfs lfsSource = new Lfs(new SequenceFile(testFields), in, SinkMode.REPLACE);
TupleEntryCollector write = lfsSource.openForWrite(new JobConf());
- Tuple t = new Tuple();
- t.add(0);
- t.add(0L);
- t.add(false);
- t.add(0.0d);
- t.add(0.0f);
- t.add("0");
- AvroScheme.addToTuple(t, new byte[] {0});
-
- List<Long> arrayOfLongs = new ArrayList<Long>() {{
- add(0L);
- }};
- AvroScheme.addToTuple(t, arrayOfLongs);
- Map<String, String> mapOfStrings = new HashMap<String, String>() {{
- put("key-0", "value-0");
- }};
- AvroScheme.addToTuple(t, mapOfStrings);
-
- AvroScheme.addToTuple(t, TestEnum.ONE);
- write.add(t);
+ writeTestData(write, false);
- t = new Tuple();
- t.add(1);
- t.add(1L);
- t.add(true);
- t.add(1.0d);
- t.add(1.0f);
- t.add("1");
- AvroScheme.addToTuple(t, new byte[] {0, 1});
- t.add(new Tuple(0L, 1L));
- t.add(new Tuple("key-0", "value-0", "key-1", "value-1"));
- AvroScheme.addToTuple(t, TestEnum.TWO);
- write.add(t);
-
write.close();
// Now read from the results, and write to an Avro file.
@@ -157,31 +169,98 @@
// Now read it back in, and verify that the data/types match up.
Tap avroSource = new Lfs(new AvroScheme(testFields, schemeTypes), out);
+ Tap verifySink = avroToTuples(testFields, verifyout, avroSource);
+ TupleEntryIterator sinkTuples = verifySink.openForRead(new JobConf());
+ verifyOutput(numRecords, sinkTuples, false);
+
+ // Write the test data for union scheme
+ FileUtils.moveFile(new File(out + File.separatorChar + "part-00000.avro"),
+ new File(verifyout + File.separatorChar + "part-00001.avro"));
+ Tap avroSink2 = new Lfs(new AvroScheme(testFields2, schemeTypes2), out, SinkMode.REPLACE);
+ TupleEntryCollector write2 = avroSink2.openForWrite(new JobConf());
+ writeTestData(write2, true);
+ write2.close();
+ FileUtils.moveFile(new File(verifyout + File.separatorChar + "part-00001.avro"),
+ new File(out + File.separatorChar + "part-00001.avro"));
+
+ // Now read union back in, and verify that the data/types match up.
+ AvroScheme unionOf = AvroScheme.unionOf(out);
+ assertEquals("Check union scheme",
+ "{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'booleanField','type':['null','boolean'],'doc':''},{'name':'bytesField','type':['null','bytes'],'doc':''},{'name':'doubleField','type':['null','double'],'doc':''},{'name':'enumField','type':['null','string'],'doc':''},{'name':'floatField','type':['null','string'],'doc':''},{'name':'integerField','type':['null','string'],'doc':''},{'name':'longField','type':['null','long'],'doc':''},{'name':'stringField','type':['null','string'],'doc':''}]}",
+ unionOf.getJsonSchema().replace('"', '\''));
+ avroSource = new Lfs(unionOf, out);
+ TupleEntryIterator sinkTuples2 = avroSource.openForRead(new JobConf());
+ verifyOutput(numRecords * 2, sinkTuples2, true);
+ sinkTuples2.close();
+
+ // Ensure that the Avro file we write out is readable via the standard Avro API
+ File avroFile = new File(out + "/part-00000.avro");
+ DataFileReader<Object> reader =
+ new DataFileReader<Object>(avroFile, new GenericDatumReader<Object>());
+ int i = 0;
+ while (reader.hasNext())
+ {
+ reader.next();
+ i++;
+ }
+ assertEquals(numRecords, i);
+ }
+
+ /**
+ * @param testFields
+ * @param outPath
+ * @param avroSource
+ * @return
+ */
+ protected Tap avroToTuples(final Fields testFields, final String outPath, Tap avroSource)
+ {
Pipe readPipe = new Pipe("avro to tuples");
- Tap verifySink = new Hfs(new SequenceFile(testFields), verifyout, SinkMode.REPLACE);
+ Tap verifySink = new Hfs(new SequenceFile(testFields), outPath, SinkMode.REPLACE);
Flow readFlow = new FlowConnector().connect(avroSource, verifySink, readPipe);
readFlow.complete();
+ return verifySink;
+ }
- TupleEntryIterator sinkTuples = verifySink.openForRead(new JobConf());
+ /**
+ * @param numRecords
+ * @param sinkTuples
+ */
+ protected void verifyOutput(final int numRecords, TupleEntryIterator sinkTuples, boolean unionOf)
+ {
assertTrue(sinkTuples.hasNext());
-
- int i = 0;
+ int count = 0, i = 0;
while (sinkTuples.hasNext()) {
- TupleEntry te = sinkTuples.next();
-
- assertTrue(te.get("integerField") instanceof Integer);
+ TupleEntry te = sinkTuples.next();
+ if (!unionOf)
+ {
+ assertTrue(te.get("integerField") instanceof Integer);
+ }
+ else
+ {
+ assertTrue(te.get("integerField") instanceof String);
+ }
assertTrue(te.get("longField") instanceof Long);
assertTrue(te.get("booleanField") instanceof Boolean);
assertTrue(te.get("doubleField") instanceof Double);
- assertTrue(te.get("floatField") instanceof Float);
+ if (!unionOf)
+ {
+ assertTrue(te.get("floatField") instanceof Float);
+ }
+ else
+ {
+ assertTrue(te.get("floatField") instanceof String);
+ }
assertTrue(te.get("stringField") instanceof String);
assertTrue(te.get("bytesField") instanceof BytesWritable);
- assertTrue(te.get("arrayOfLongsField") instanceof Tuple);
- assertTrue(te.get("mapOfStringsField") instanceof Tuple);
+ if (!unionOf)
+ {
+ assertTrue(te.get("arrayOfLongsField") instanceof Tuple);
+ assertTrue(te.get("mapOfStringsField") instanceof Tuple);
+ }
assertTrue(te.get("enumField") instanceof String);
- assertEquals(i, te.getInteger("integerField"));
+ i = Float.valueOf(te.getString("integerField")).intValue();
assertEquals(i, te.getLong("longField"));
assertEquals(i > 0, te.getBoolean("booleanField"));
assertEquals((double)i, te.getDouble("doubleField"), 0.0001);
@@ -196,48 +275,136 @@
assertEquals(j, bytes[j]);
}
- Tuple longArray = (Tuple)te.get("arrayOfLongsField");
- assertEquals(i + 1, longArray.size());
- for (int j = 0; j < longArray.size(); j++) {
- assertTrue(longArray.get(j) instanceof Long);
- assertEquals(j, longArray.getLong(j));
+ if (!unionOf)
+ {
+ Tuple longArray = (Tuple) te.get("arrayOfLongsField");
+ assertEquals(i + 1, longArray.size());
+ for (int j = 0; j < longArray.size(); j++)
+ {
+ assertTrue(longArray.get(j) instanceof Long);
+ assertEquals(j, longArray.getLong(j));
+ }
+ Tuple stringMap = (Tuple) te.get("mapOfStringsField");
+ int numMapEntries = i + 1;
+ assertEquals(2 * numMapEntries, stringMap.size());
+ // Build a map from the data
+ Map<String, String> testMap = new HashMap<String, String>();
+ for (int j = 0; j < numMapEntries; j++)
+ {
+ assertTrue(stringMap.get(j * 2) instanceof String);
+ String key = stringMap.getString(j * 2);
+ assertTrue(stringMap.get((j * 2) + 1) instanceof String);
+ String value = stringMap.getString((j * 2) + 1);
+ testMap.put(key, value);
+ }
+ // Now make sure it has everything we're expecting.
+ for (int j = 0; j < numMapEntries; j++)
+ {
+ assertEquals("value-" + j, testMap.get("key-" + j));
+ }
}
-
- Tuple stringMap = (Tuple)te.get("mapOfStringsField");
- int numMapEntries = i + 1;
- assertEquals(2 * numMapEntries, stringMap.size());
-
- // Build a map from the data
- Map<String, String> testMap = new HashMap<String, String>();
- for (int j = 0; j < numMapEntries; j++) {
- assertTrue(stringMap.get(j * 2) instanceof String);
- String key = stringMap.getString(j * 2);
- assertTrue(stringMap.get((j * 2) + 1) instanceof String);
- String value = stringMap.getString((j * 2) + 1);
- testMap.put(key, value);
- }
-
- // Now make sure it has everything we're expecting.
- for (int j = 0; j < numMapEntries; j++) {
- assertEquals("value-" + j, testMap.get("key-" + j));
- }
- i++;
+ count++;
}
- assertEquals(numRecords, i);
+ assertEquals(numRecords, count);
+ }
+
+ /**
+ * @param write
+ */
+ protected void writeTestData(TupleEntryCollector write, boolean unionOf)
+ {
+ Tuple t = new Tuple();
+ if (unionOf)
+ {
+ t.add(0d);
+ }
+ else
+ {
+ t.add(0);
+ }
+ if (unionOf)
+ {
+ t.add(0);
+ }
+ else
+ {
+ t.add(0L);
+ }
+ t.add(false);
+ if (unionOf)
+ {
+ t.add(0.0f);
+ }
+ else
+ {
+ t.add(0.0d);
+ }
+ if (unionOf)
+ {
+ t.add("0.0");
+ }
+ else
+ {
+ t.add(0.0f);
+ }
+ t.add("0");
+ AvroScheme.addToTuple(t, new byte[] {0});
- // Ensure that the Avro file we write out is readable via the standard Avro API
- File avroFile = new File(out + "/part-00000.avro");
- DataFileReader<Object> reader =
- new DataFileReader<Object>(avroFile, new GenericDatumReader<Object>());
- i = 0;
- while (reader.hasNext()) {
- reader.next();
- i++;
+ List<Long> arrayOfLongs = new ArrayList<Long>() {{
+ add(0L);
+ }};
+ AvroScheme.addToTuple(t, arrayOfLongs);
+
+ Map<String, String> mapOfStrings = new HashMap<String, String>() {{
+ put("key-0", "value-0");
+ }};
+ AvroScheme.addToTuple(t, mapOfStrings);
+
+ AvroScheme.addToTuple(t, TestEnum.ONE);
+ write.add(t);
+
+ t = new Tuple();
+ if (unionOf)
+ {
+ t.add(1d);
}
- assertEquals(numRecords, i);
-
+ else
+ {
+ t.add(1);
+ }
+ if (unionOf)
+ {
+ t.add(1);
+ }
+ else
+ {
+ t.add(1L);
+ }
+ t.add(true);
+ if (unionOf)
+ {
+ t.add(1.0f);
+ }
+ else
+ {
+ t.add(1.0d);
+ }
+ if (unionOf)
+ {
+ t.add("1.0");
+ }
+ else
+ {
+ t.add(1.0f);
+ }
+ t.add("1");
+ AvroScheme.addToTuple(t, new byte[] {0, 1});
+ t.add(new Tuple(0L, 1L));
+ t.add(new Tuple("key-0", "value-0", "key-1", "value-1"));
+ AvroScheme.addToTuple(t, TestEnum.TWO);
+ write.add(t);
}
@Test
@@ -362,24 +529,22 @@
@Test
public void testSetRecordName() {
AvroScheme avroScheme = new AvroScheme(new Fields("a"), new Class[] { Long.class });
- String expected = "{\"type\":\"record\",\"name\":\"CascadingAvroSchema\",\"namespace\":\"\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"long\"],\"doc\":\"\"}]}";
+ String expected = "{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'a','type':['null','long'],'doc':''}]}";
String jsonSchema = avroScheme.getJsonSchema();
- assertEquals(expected, jsonSchema);
+ assertEquals(expected, jsonSchema.replace('"', '\''));
avroScheme.setRecordName("FooBar");
String jsonSchemaWithRecordName = avroScheme.getJsonSchema();
- String expectedWithName = "{\"type\":\"record\",\"name\":\"FooBar\",\"namespace\":\"\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"long\"],\"doc\":\"\"}]}";
- assertEquals(expectedWithName, jsonSchemaWithRecordName);
+ String expectedWithName = "{'type':'record','name':'FooBar','namespace':'','fields':[{'name':'a','type':['null','long'],'doc':''}]}";
+ assertEquals(expectedWithName, jsonSchemaWithRecordName.replace('"', '\''));
}
@Test
public void testEnumInSchema() throws Exception {
AvroScheme avroScheme = new AvroScheme(new Fields("a"), new Class[] { TestEnum.class });
String jsonSchema = avroScheme.getJsonSchema();
- String enumField = String.format("{\"type\":\"enum\",\"name\":\"%s\",\"namespace\":\"%s\",\"symbols\":[\"ONE\",\"TWO\"]}",
- "AvroSchemeTest$TestEnum", TestEnum.class.getPackage().getName());
- String expected = String.format("{\"type\":\"record\",\"name\":\"CascadingAvroSchema\",\"namespace\":\"\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",%s],\"doc\":\"\"}]}",
- enumField);
- assertEquals(expected, jsonSchema);
+ String expected = String.format("{'type':'record','name':'CascadingAvroSchema','namespace':'','fields':[{'name':'a','type':['null',{'type':'enum','name':'%s','namespace':'%s','symbols':['ONE','TWO']}],'doc':''}]}",
+ "TestEnum", this.getClass().getCanonicalName());
+ assertEquals(expected, jsonSchema.replace('"', '\''));
}
}
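
Editor's note: a second sketch (not part of the patch) showing the schema-merging rules in isolation via the public unifyToMap()/fromMap() helpers added above. The record "R" and its fields are hypothetical; Schema.parse(String) is the Avro 1.4 parsing entry point.

    // Sketch: merge two versions of a record by hand and inspect the result.
    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.avro.Schema;
    import com.bixolabs.cascading.avro.AvroScheme;

    public class PromotionExample {
        public static void main(String[] args) {
            // Version 1 of a record: count is an int, ratio is a float.
            Schema v1 = Schema.parse(
                "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
                + "{\"name\":\"count\",\"type\":[\"null\",\"int\"]},"
                + "{\"name\":\"ratio\",\"type\":[\"null\",\"float\"]}]}");
            // Version 2: count widened to long, ratio changed to string.
            Schema v2 = Schema.parse(
                "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
                + "{\"name\":\"count\",\"type\":[\"null\",\"long\"]},"
                + "{\"name\":\"ratio\",\"type\":[\"null\",\"string\"]}]}");

            Map<String, Class<?>> merged = new TreeMap<String, Class<?>>();
            AvroScheme.unifyToMap(merged, v1);
            AvroScheme.unifyToMap(merged, v2);

            // count: int vs long     -> promoted to java.lang.Long
            // ratio: float vs string -> no promotion path, falls back to java.lang.String
            System.out.println(merged);

            AvroScheme scheme = AvroScheme.fromMap(merged);
            System.out.println(scheme.getJsonSchema());
        }
    }
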