Last active
September 17, 2015 14:27
-
-
Save marmbrus/fff0b058f134fa7752fe to your computer and use it in GitHub Desktop.
Spark Hadoop Filesystem Textfile Iterator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
/**
 * Returns an iterator over the lines in the file using the hadoop file system.
 *
 * Comparison to sparkContext.textFile:
 *  - Handles large numbers of S3 files without blocking the driver forever while
 *    retrieving metadata.
 *  - Gives access to whole files, which is useful when a single record can span more
 *    than a line.
 *  - Doesn't split large text files.
 *  - Probably missing all kinds of retry logic.
 *
 * Behaviour:
 *  - A location ending in "gz" is decompressed with `GZIPInputStream`.
 *  - Lines are split on '\n' only; the terminator is not part of the returned line.
 *    NUL characters are dropped.
 *  - A line longer than `maxLineLength` characters is cut at that length and returned
 *    with the suffix "[TRUNCATED]".
 *  - Best effort: if the file cannot be opened the iterator is simply empty, and a
 *    failing read prints "FAILURE" and is treated as end of file.
 *  - Bytes are decoded by an `InputStreamReader` created without an explicit charset,
 *    i.e. with the JVM default charset.
 *
 * NOTE(review): the class is declared `Serializable`, yet it holds a file system, a
 * stream and a reader as fields. Presumably instances are meant to be created on the
 * executor (inside a closure) rather than shipped there -- confirm with callers.
 *
 * @param fileLocation  location of the file to read, as understood by hadoop's `Path`
 * @param maxLineLength maximum number of characters kept per line; must be >= 0
 * @throws IllegalArgumentException if `maxLineLength` is negative
 */
class ReadLinesSafe(fileLocation: String, val maxLineLength: Int = 10000000)
    extends Iterator[String] with Serializable {

  require(maxLineLength >= 0, s"maxLineLength must be non-negative, got $maxLineLength")

  /** Line buffered by `hasNext` and not yet returned by `next()`; null when none is pending. */
  var currentLine: StringBuilder = null

  /** True once the end of the input was reached or the file could not be opened. */
  var eof = false

  // Set when at least one character of the pending line was dropped by the length cap.
  // Tracking this explicitly keeps a line of exactly maxLineLength characters from
  // being reported as truncated.
  private var truncated = false

  // Ensures the underlying stream is closed once instead of on every hasNext after EOF.
  private var closed = false

  val path = new Path(fileLocation)

  val fs: FileSystem = {
    val conf = new Configuration()
    // Resolve the file system from the path's own scheme so that locations outside the
    // default file system can be opened. If that lookup fails, fall back to the default
    // file system; the open below then decides whether the file is readable.
    try path.getFileSystem(conf)
    catch { case NonFatal(_) => FileSystem.get(conf) }
  }

  /** The (possibly gzip-wrapped) input stream, or null if the file could not be opened. */
  val inputStream: java.io.InputStream =
    try {
      val raw = fs.open(path)
      if (fileLocation.endsWith("gz")) {
        try new GZIPInputStream(raw)
        catch {
          case NonFatal(e) =>
            // Do not leak the already opened stream when the gzip header is invalid.
            try raw.close() catch { case NonFatal(_) => () }
            throw e
        }
      } else {
        raw
      }
    } catch {
      case NonFatal(_) =>
        // Best effort: an unreadable file yields an empty iterator.
        eof = true
        null
    }

  /** Character reader over `inputStream`, or null if the file could not be opened. */
  val reader: BufferedReader =
    if (eof) null else new BufferedReader(new InputStreamReader(inputStream))

  /** Most recently read character as an Int, or -1 at end of input / after a read failure. */
  var nextChar: Int = _

  /** Reads one character into `nextChar`; a failing read is reported and treated as EOF. */
  def readNext(): Unit = {
    nextChar =
      try reader.read()
      catch {
        case NonFatal(_) =>
          println("FAILURE")
          -1
      }
  }

  // Closes the reader (and with it the wrapped input stream) at most once.
  private def closeStream(): Unit =
    if (!closed) {
      closed = true
      if (reader != null) reader.close()
    }

  /**
   * Buffers the next line if none is pending.
   *
   * @return true if a line is available for `next()`, false at end of input
   */
  def hasNext: Boolean =
    if (eof) {
      closeStream()
      false
    } else if (currentLine != null) {
      true
    } else {
      readNext()
      if (nextChar == -1) {
        // Nothing left to read: the previous line was the last one.
        eof = true
        closeStream()
        false
      } else {
        val line = new StringBuilder()
        truncated = false
        while (nextChar != -1 && nextChar != '\n') {
          if (nextChar != 0) {
            if (line.length < maxLineLength) line.append(nextChar.toChar)
            else truncated = true
          }
          readNext()
        }
        currentLine = line
        true
      }
    }

  /**
   * Returns the pending line, reading it first if necessary.
   *
   * @return the next line, suffixed with "[TRUNCATED]" if characters were dropped
   * @throws NoSuchElementException if there are no more lines
   */
  def next(): String = {
    // hasNext does the actual reading, so it must run unconditionally and not inside
    // an assert, which can be compiled out.
    if (!hasNext) throw new NoSuchElementException(s"no more lines in $path")
    val text = currentLine.toString
    val ret = if (truncated) text + "[TRUNCATED]" else text
    currentLine = null
    truncated = false
    ret
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment