

@tobyjsullivan
Forked from j14159/gist:d3cbe172f7b962d74d09
Last active August 29, 2015 14:05

Revisions

  1. tobyjsullivan revised this gist Sep 17, 2014. 1 changed file with 57 additions and 67 deletions.
    gistfile1.scala — 57 additions, 67 deletions. The file after this revision:

    /**
     * Based on the S3N RDD gist from Jeremy Pierre https://gist.github.com/j14159/d3cbe172f7b962d74d09
     *
     * Modified to use Jets3t
     */
    package net.tobysullivan.spark.rdd

    import java.io.{BufferedInputStream, BufferedReader, InputStreamReader}
    import java.util.zip.GZIPInputStream

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.jets3t.service.impl.rest.httpclient.RestS3Service
    import org.jets3t.service.model.S3Bucket
    import org.jets3t.service.security.AWSCredentials

    import scala.io.Source

    private[rdd] case class S3NPartition(idx: Int, bucket: String, path: String) extends Partition {
      def index = idx
    }

    /**
     * Directly construct and use, roughly equivalent to SparkContext.textFile calls, but give this
     * a list/sequence of files you want to load. This currently makes one Partition per file and,
     * once constructed, you use it like any other RDD.
     *
     * The example below constructs an RDD from all files starting with "some-files/file-" in the
     * S3 bucket "my-bucket":
     *
     * new S3NRDD(awsAccessKey, awsSecretKey, yourSparkContext, "my-bucket",
     *   new S3NListing(awsAccessKey, awsSecretKey, "my-bucket").list("some-files/file-"))
     */
    class S3NRDD(awsAccessKeyId: String, awsSecretAccessKey: String, sc: SparkContext, bucket: String, files: Seq[String]) extends RDD[String](sc, Nil) {
      lazy val credentials = new AWSCredentials(awsAccessKeyId, awsSecretAccessKey)

      override def getPartitions: Array[Partition] =
        files.zipWithIndex.map { case (fn, i) => S3NPartition(i, bucket, fn) }.toArray

      override def compute(split: Partition, context: TaskContext): Iterator[String] = split match {
        case S3NPartition(_, bucket, path) =>
          val service = new RestS3Service(credentials)
          val s3Bucket = new S3Bucket(bucket)
          val obj = service.getObject(s3Bucket, path)
          val content = obj.getDataInputStream()

          // Uncompress any gzipped files
          if (obj.getKey.endsWith(".gz"))
            Source.fromInputStream(new GZIPInputStream(new BufferedInputStream(content))).getLines()
          else {
            val br = new BufferedReader(new InputStreamReader(content))
            Iterator.continually(br.readLine()).takeWhile {
              case null =>
                br.close()
                false
              case _ => true
            }
          }
      }
    }

    /**
     * Simple helper to find files within the given bucket.
     */
    class S3NListing(awsAccessKeyId: String, awsSecretAccessKey: String, bucket: String) {
      lazy val credentials = new AWSCredentials(awsAccessKeyId, awsSecretAccessKey)
      lazy val service = new RestS3Service(credentials)
      lazy val s3Bucket = new S3Bucket(bucket, S3Bucket.LOCATION_US)

      /**
       * List files behind a given prefix, e.g. "" for all, "my-folder",
       * "my-folder/files-that-start-like-this", etc. Will eagerly fetch
       * all truncated results.
       */
      def list(folder: String): Seq[String] = {
        val listing = service.listObjects(s3Bucket, folder, null)
        listing.map(_.getKey).toSeq
      }
    }
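
    A minimal usage sketch for the Jets3t-based classes in this revision. The bucket name, key prefix, and credential lookup below are hypothetical placeholders; it assumes Spark 1.0.x and a Jets3t dependency (e.g. "net.java.dev.jets3t" % "jets3t") on the classpath:

    import org.apache.spark.SparkContext

    import net.tobysullivan.spark.rdd.{S3NListing, S3NRDD}

    object S3NRDDExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical credentials and bucket; substitute real values.
        val accessKey = sys.env("AWS_ACCESS_KEY_ID")
        val secretKey = sys.env("AWS_SECRET_ACCESS_KEY")
        val bucket = "my-bucket"

        val sc = new SparkContext("local[2]", "s3n-rdd-example")

        // One partition is created per key found under the prefix.
        val files = new S3NListing(accessKey, secretKey, bucket).list("some-files/file-")
        val lines = new S3NRDD(accessKey, secretKey, sc, bucket, files)

        // Use it like any other RDD[String].
        println(lines.count())
        sc.stop()
      }
    }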
  2. @j14159 j14159 created this gist Jul 18, 2014.
    gistfile1.scala — 92 additions, 0 deletions. The initial file:

    /**
     * Started to rough this naive S3-native filesystem RDD out because I need to use IAM
     * profiles for S3 access and also https://issues.apache.org/jira/browse/HADOOP-3733.
     *
     * Use at your own risk, and bear in mind this is maybe 30 - 45 minutes of work and testing;
     * expect it to behave as such.
     *
     * Feedback/criticism/discussion welcome via Github/Twitter.
     *
     * In addition to Spark 1.0.x, this depends on Amazon's S3 SDK; the dependency is as follows:
     * "com.amazonaws" % "aws-java-sdk" % "1.7.4"
     */
    package com.askuity.rdd

    import com.amazonaws.auth.InstanceProfileCredentialsProvider
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.{ GetObjectRequest, ObjectListing }

    import java.io.{ BufferedReader, InputStreamReader }

    import org.apache.spark.{ Partition, SparkContext, TaskContext }
    import org.apache.spark.rdd.RDD

    import scala.collection.JavaConverters._

    private[rdd] case class S3NPartition(idx: Int, bucket: String, path: String) extends Partition {
      def index = idx
    }

    /**
     * Directly construct and use, roughly equivalent to SparkContext.textFile calls, but give this
     * a list/sequence of files you want to load. This currently makes one Partition per file and,
     * once constructed, you use it like any other RDD.
     *
     * The example below constructs an RDD from all files starting with "some-files/file-" in the
     * S3 bucket "my-bucket":
     *
     * new S3NRDD(yourSparkContext, "my-bucket", new S3NListing("my-bucket").list("some-files/file-"))
     */
    class S3NRDD(sc: SparkContext, bucket: String, files: Seq[String]) extends RDD[String](sc, Nil) {
      private def instanceCreds() = new InstanceProfileCredentialsProvider().getCredentials

      override def getPartitions: Array[Partition] =
        files.zipWithIndex.map { case (fn, i) => S3NPartition(i, bucket, fn) }.toArray

      override def compute(split: Partition, context: TaskContext): Iterator[String] = split match {
        case S3NPartition(_, bucket, path) =>
          val client = new AmazonS3Client(instanceCreds())
          val obj = client.getObject(new GetObjectRequest(bucket, path))
          val br = new BufferedReader(new InputStreamReader(obj.getObjectContent()))

          Iterator.continually(br.readLine()).takeWhile {
            case null =>
              br.close()
              false
            case _ => true
          }
      }
    }

    /**
     * Simple helper to find files within the given bucket.
     */
    class S3NListing(bucket: String) {
      private def instanceCreds() = new InstanceProfileCredentialsProvider().getCredentials
      lazy val client = new AmazonS3Client(instanceCreds())

      /**
       * List files behind a given prefix, e.g. "" for all, "my-folder",
       * "my-folder/files-that-start-like-this", etc. Will eagerly fetch
       * all truncated results.
       */
      def list(folder: String) = recursiveListing(folder, None, Nil)

      @scala.annotation.tailrec
      private def recursiveListing(folder: String, prev: Option[ObjectListing], memo: List[Seq[String]]): List[String] = prev match {
        case None =>
          val listing = client.listObjects(bucket, folder)
          val keys = listing.getObjectSummaries.asScala.map(_.getKey)
          if (listing.isTruncated)
            recursiveListing(folder, Some(listing), keys :: memo)
          else
            keys.toList
        case Some(lastListing) =>
          val listing = client.listNextBatchOfObjects(lastListing)
          val keys = listing.getObjectSummaries.asScala.map(_.getKey)
          if (listing.isTruncated)
            recursiveListing(folder, Some(listing), keys :: memo)
          else
            (keys :: memo).flatten
      }
    }
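
    A minimal usage sketch for the original IAM-profile-based classes, assuming the code runs on an EC2 instance whose instance profile grants S3 read access; the bucket name and prefix are hypothetical placeholders:

    import org.apache.spark.SparkContext

    import com.askuity.rdd.{S3NListing, S3NRDD}

    object S3NRDDExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "s3n-rdd-example")

        // Credentials come from the EC2 instance profile, so only the bucket name is needed.
        val bucket = "my-bucket"
        val files = new S3NListing(bucket).list("some-files/file-")
        val lines = new S3NRDD(sc, bucket, files)

        lines.take(5).foreach(println)
        sc.stop()
      }
    }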