/**
 * Based on the S3N RDD gist from Jeremy Pierre: https://gist.github.com/j14159/d3cbe172f7b962d74d09
 *
 * Modified to use Jets3t.
 */
package net.tobysullivan.spark.rdd

import java.io.{BufferedInputStream, BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.jets3t.service.impl.rest.httpclient.RestS3Service
import org.jets3t.service.model.S3Bucket
import org.jets3t.service.security.AWSCredentials

import scala.io.Source
private[rdd] case class S3NPartition(idx: Int, bucket: String, path: String) extends Partition {
  override def index: Int = idx
}
/**
 * Construct and use directly; roughly equivalent to a SparkContext.textFile call, but give
 * this a list/sequence of the files you want to load. This currently makes one Partition
 * per file and, once constructed, is used like any other RDD.
 *
 * The example below constructs an RDD from all files starting with "some-files/file-" in
 * the S3 bucket "my-bucket" (see also S3NRDDExample at the bottom of this file):
 *
 * new S3NRDD(awsAccessKey, awsSecretKey, yourSparkContext, "my-bucket",
 *   new S3NListing(awsAccessKey, awsSecretKey, "my-bucket").list("some-files/file-"))
 */
class S3NRDD(awsAccessKeyId: String, awsSecretAccessKey: String, sc: SparkContext, bucket: String, files: Seq[String]) extends RDD[String](sc, Nil) {
  lazy val credentials = new AWSCredentials(awsAccessKeyId, awsSecretAccessKey)

  // One partition per input file.
  override def getPartitions: Array[Partition] =
    files.zipWithIndex.map { case (fn, i) => S3NPartition(i, bucket, fn) }.toArray
  override def compute(split: Partition, context: TaskContext): Iterator[String] = split match {
    case S3NPartition(_, bucket, path) =>
      val service = new RestS3Service(credentials)
      val s3Bucket = new S3Bucket(bucket)
      val obj = service.getObject(s3Bucket, path)
      val content = obj.getDataInputStream()
      // Decompress gzipped files, detected by the ".gz" key suffix.
      if (obj.getKey.endsWith(".gz"))
        Source.fromInputStream(new GZIPInputStream(new BufferedInputStream(content))).getLines()
      else {
        val br = new BufferedReader(new InputStreamReader(content))
        // readLine() returns null at end of stream; close the reader and stop there.
        Iterator.continually(br.readLine()).takeWhile {
          case null =>
            br.close()
            false
          case _ => true
        }
      }
  }
}
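
// The compute method above streams lines lazily rather than materializing the
// whole object in memory, and closes the reader once readLine() signals end of
// stream. A minimal standalone sketch of that same pattern (the object name and
// the sample input are illustrative, not part of the original gist):
private[rdd] object LineIteratorSketch {
  import java.io.ByteArrayInputStream

  def demo(): Unit = {
    val in = new ByteArrayInputStream("first\nsecond\nthird".getBytes("UTF-8"))
    val br = new BufferedReader(new InputStreamReader(in))
    val lines = Iterator.continually(br.readLine()).takeWhile {
      case null => br.close(); false // null marks end of stream
      case _    => true
    }
    lines.foreach(println) // prints: first, second, third
  }
}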
/**
 * Simple helper to find files within the given bucket.
 */
class S3NListing(awsAccessKeyId: String, awsSecretAccessKey: String, bucket: String) {
  lazy val credentials = new AWSCredentials(awsAccessKeyId, awsSecretAccessKey)
  lazy val service = new RestS3Service(credentials)
  lazy val s3Bucket = new S3Bucket(bucket, S3Bucket.LOCATION_US)
  /**
   * List files under a given prefix, e.g. "" for all, "my-folder",
   * "my-folder/files-that-start-like-this", etc. Eagerly fetches
   * all truncated results.
   */
  def list(prefix: String): Seq[String] = {
    val listing = service.listObjects(s3Bucket, prefix, null)
    listing.map(_.getKey).toSeq
  }
}
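
// Minimal end-to-end usage sketch. Assumes valid AWS credentials in the
// environment and an existing bucket; "my-bucket" and "some-files/file-" are
// placeholders, and the local SparkContext is only for illustration.
object S3NRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "s3n-rdd-example")
    val awsAccessKeyId = sys.env("AWS_ACCESS_KEY_ID")
    val awsSecretAccessKey = sys.env("AWS_SECRET_ACCESS_KEY")

    // Find the input files, then build an RDD with one partition per file.
    val files = new S3NListing(awsAccessKeyId, awsSecretAccessKey, "my-bucket").list("some-files/file-")
    val rdd = new S3NRDD(awsAccessKeyId, awsSecretAccessKey, sc, "my-bucket", files)

    println(s"Total lines: ${rdd.count()}")
    sc.stop()
  }
}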