Created
June 21, 2014 05:40
-
-
Save MLnick/5864741781b9340cb211 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import org.apache.avro.Schema.Parser | |
import java.io.{DataInput, DataOutput, File} | |
import org.apache.avro.generic.GenericData.Record | |
import org.apache.avro.generic.{GenericRecord, GenericDatumWriter} | |
import org.apache.avro.file.DataFileWriter | |
import org.apache.spark.SparkContext | |
import org.apache.avro.mapreduce.AvroKeyInputFormat | |
import org.apache.avro.mapred.AvroKey | |
import org.apache.hadoop.io.{Writable, NullWritable} | |
import org.apache.spark.api.python.Converter | |
/**
 * Writes two sample `User` records to the Avro container file
 * `/tmp/avro/users/users.avro`, using the schema stored at
 * `/tmp/avro/schema/user.avsc`.
 */
object WriteAvro extends App {
  /*
  Expected contents of the schema file:

  {"namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [
       {"name": "name", "type": "string"},
       {"name": "favorite_number",  "type": ["int", "null"]},
       {"name": "favorite_color", "type": ["string", "null"]}
   ]
  }
  */
  val path = "/tmp/avro/schema/user.avsc"
  val schema = new Parser().parse(new File(path))

  // First record: "favorite_color" is deliberately left unset.
  val user1 = new Record(schema)
  user1.put("name", "Alyssa")
  user1.put("favorite_number", 256)

  // Second record: every field populated.
  val user2 = new Record(schema)
  user2.put("name", "Ben")
  user2.put("favorite_number", 7)
  user2.put("favorite_color", "red")

  val file = new File("/tmp/avro/users/users.avro")
  // Make sure the output directory exists; otherwise create() fails on a fresh machine.
  Option(file.getParentFile).foreach(_.mkdirs())

  val datumWriter = new GenericDatumWriter[GenericRecord](schema)
  val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
  dataFileWriter.create(schema, file)
  try {
    dataFileWriter.append(user1)
    dataFileWriter.append(user2)
  } finally {
    // Always release the file handle, even when an append fails.
    dataFileWriter.close()
  }
}
/**
 * Converts an [[AvroKey]] wrapping a [[GenericRecord]] into a `java.util.Map`
 * keyed by field name, with one entry per field of the record's schema.
 */
class AvroConverter extends Converter[AvroKey[GenericRecord], java.util.Map[String, Any]] {
  // Explicit asScala/asJava conversions instead of the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  /**
   * @param obj Avro key whose datum is the record to convert
   * @return map from each schema field name to the value stored in the record
   */
  override def convert(obj: AvroKey[GenericRecord]): java.util.Map[String, Any] = {
    val record = obj.datum()
    val fields = record.getSchema.getFields.asScala
    // The `Any` ascription keeps the element type aligned with the (invariant) Java map type.
    fields.map(f => f.name -> (record.get(f.name): Any)).toMap.asJava
  }
}
/**
 * Reads `/tmp/avro/users/users.avro` with a local Spark context, converts every
 * record with [[AvroConverter]] and prints the resulting maps, one per line.
 */
object ReadAvro extends App {
  val sc = new SparkContext("local[2]", "TestAvro")
  val path = "/tmp/avro/users/users.avro"

  // Keys are the Avro records; values are NullWritable placeholders.
  val avroRDD = sc.newAPIHadoopFile(
    path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])

  val userRDD = avroRDD.map { case (key, _) => new AvroConverter().convert(key) }

  try {
    println(userRDD.collect().mkString("\n"))
  } finally {
    // Shut the context down whether or not the job succeeded.
    sc.stop()
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Nicks-MacBook-Pro:incubator-spark-mlnick Nick$ SPARK_CLASSPATH=/Users/Nick/workspace/scala/avro-example/target/avro-0.1.jar IPYTHON=1 ./bin/pyspark
Python 2.7.6 |Anaconda 1.8.0 (x86_64)| (default, Jan 10 2014, 11:23:15) | |
Type "copyright", "credits" or "license" for more information. | |
IPython 2.0.0 -- An enhanced Interactive Python. | |
? -> Introduction and overview of IPython's features. | |
%quickref -> Quick reference. | |
help -> Python's own help system. | |
object? -> Details about 'object', use 'object??' for extra details. | |
Welcome to | |
____ __ | |
/ __/__ ___ _____/ /__ | |
_\ \/ _ \/ _ `/ __/ '_/ | |
/__ / .__/\_,_/_/ /_/\_\ version 1.0.0-SNAPSHOT | |
/_/ | |
Using Python version 2.7.6 (default, Jan 10 2014 11:23:15) | |
SparkContext available as sc. | |
In [1]: path = "/tmp/avro/users/users.avro" | |
In [2]: avro = sc.newAPIHadoopFile(path, "org.apache.avro.mapreduce.AvroKeyInputFormat", "org.apache.avro.mapred.AvroKey", "org.apache.hadoop.io.NullWritable", "example.AvroConverter")
In [3]: avro.collect() | |
Out[3]: | |
[(u'{name=Alyssa, favorite_number=256, favorite_color=null}', None), | |
(u'{name=Ben, favorite_number=7, favorite_color=red}', None)] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <!-- Build for the Avro/Spark example; `package` produces a shaded avro-0.1.jar. -->
  <modelVersion>4.0.0</modelVersion>
  <groupId>example</groupId>
  <artifactId>avro</artifactId>
  <version>0.1</version>
  <inceptionYear>2014</inceptionYear>

  <properties>
    <scala.version>2.10.4</scala.version>
  </properties>

  <!--
    The scala-tools.org repository and plugin repository entries were removed:
    that host is no longer in service, and every artifact and plugin used here
    is resolved from the default repositories.
  -->

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0-SNAPSHOT</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>1.0.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.6</version>
    </dependency>

    <!-- TEST -->
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.10</artifactId>
      <version>2.1.0</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!-- disable surefire -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>

      <!-- enable scalatest -->
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0-RC1</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- compiles main and test Scala sources -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <!-- <args>
            <arg>-target:jvm-1.7</arg>
          </args> -->
          <jvmArgs>
            <jvmArg>-Xmx3g</jvmArg>
          </jvmArgs>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <!-- version pinned so the build does not depend on whatever Maven resolves by default -->
        <version>2.9</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>

      <!-- bundles the non-provided dependencies into the jar at package time -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.1</version>
        <configuration>
          <!-- put your configurations here -->
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <!-- drop signature files copied from signed dependency jars -->
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.1</version>
        <configuration>
          <archive>
            <manifest>
              <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
              <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <!-- same Scala plugin as the build section, replacing the obsolete org.scala-tools plugin -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.6</version>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment