Created
April 7, 2014 20:20
-
-
Save keeganwitt/10044534 to your computer and use it in GitHub Desktop.
A Groovy script to download HDFS files in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Usage: groovy hdfsDownloader.groovy [prod|qa|<dev username>] <file> (<output directory>)
// <file> can be an individual file or a directory of part files.
// If <output directory> is not specified, the current working directory is used
import groovy.json.JsonSlurper | |
import java.util.concurrent.Executors | |
import java.util.concurrent.ExecutorService | |
import java.util.concurrent.TimeUnit | |
/**
 * Command-line entry point that downloads a file, or the "part-" files of a
 * directory, over the WebHDFS REST endpoint configured in the constants below.
 */
class Main {
    static final int THREAD_POOL_SIZE = 20
    static final String DEV_BASE_URL = "http://webhdfs.mycompany.com:14000/webhdfs/v1"
    static final String QA_BASE_URL = "http://webhdfs.mycompany.com:14000/webhdfs/v1"
    static final String PROD_BASE_URL = "http://webhdfs.mycompany.com:14000/webhdfs/v1"
    static final String QA_USERNAME = "qa"
    static final String PROD_USERNAME = "prod"

    /**
     * Lists the requested path with LISTSTATUS and downloads the result.
     *
     * @param args args[0] is "prod", "qa", or a dev username; args[1] is the
     *             path to download (relative paths are resolved under
     *             /user/&lt;username&gt;); args[2], when exactly three arguments
     *             are given, is the output directory (otherwise the current
     *             working directory is used)
     */
    static void main(String[] args) {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.size() < 2) {
            System.err.println("Usage: groovy hdfsDownloader.groovy [prod|qa|<dev username>] <file> (<output directory>)")
            return
        }

        String file = args[1]
        File outputDir = (args.size() == 3) ? new File(args[2]) : new File("").getCanonicalFile()

        // Pick the account and endpoint for the requested environment.
        String username
        String baseUrl
        switch (args[0]) {
            case "prod":
                username = PROD_USERNAME
                baseUrl = PROD_BASE_URL
                break
            case "qa":
                username = QA_USERNAME
                baseUrl = QA_BASE_URL
                break
            default:
                username = args[0]
                baseUrl = DEV_BASE_URL
        }
        // Absolute paths are used as-is; relative ones live under the user's home directory.
        String urlPrefix = file.startsWith("/") ? baseUrl : "${baseUrl}/user/${username}"

        def json = new JsonSlurper().parseText("${urlPrefix}/${file}?op=LISTSTATUS&user.name=${username}".toURL().getText())
        def fileStatuses = json.FileStatuses.FileStatus

        // NOTE(review): a listing with exactly one entry is treated as a single
        // file, so a directory holding only one part file takes the single-file
        // path — confirm against the server's LISTSTATUS response.
        boolean isDirectory = fileStatuses.size() > 1
        ExecutorService executor = isDirectory ?
                Executors.newFixedThreadPool(THREAD_POOL_SIZE) :
                Executors.newSingleThreadExecutor()
        try {
            if (isDirectory) {
                File dir = new File(outputDir, new File(file).getName())
                // mkdirs() also creates a missing output directory; verify the result
                // so the failure is reported once here rather than by every worker.
                dir.mkdirs()
                if (!dir.isDirectory()) {
                    throw new IOException("Could not create output directory [${dir.getCanonicalPath()}]")
                }
                fileStatuses.each { status ->
                    if (status.type?.equals("FILE") && status.pathSuffix?.startsWith("part-")) {
                        executor.execute(new FileWriter(new File(dir, status.pathSuffix), "${urlPrefix}/${file}/${status.pathSuffix}?op=OPEN&user.name=${username}".toURL()))
                    }
                }
            } else {
                executor.execute(new FileWriter(new File(outputDir, new File(file).getName()), "${urlPrefix}/${file}?op=OPEN&user.name=${username}".toURL()))
            }
        } finally {
            // Always release the pool: its threads are non-daemon and would
            // otherwise keep the JVM alive if submitting a task threw.
            executor.shutdown()
        }
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)
    }
}
/**
 * Runnable that copies everything readable from a URL into a local file.
 */
class FileWriter implements Runnable {
    /** Size in bytes of the buffer used for each read from the URL stream. */
    static final int CHUNK_SIZE = 1024
    /** Destination file; overwritten if it already exists. */
    File file
    /** Source to read from. */
    URL url

    /**
     * @param file destination file on the local filesystem
     * @param url  source whose content is copied into {@code file}
     */
    FileWriter(File file, URL url) {
        this.file = file
        this.url = url
    }

    /**
     * Streams the URL content into the destination file, printing a message
     * before and after. Both streams are closed even when the copy fails.
     */
    @Override
    void run() {
        println "Writing file [${file.getCanonicalPath()}]..."
        // Open the source first so a failed request does not leave an empty file behind.
        // withStream closes each stream whether the closure returns or throws.
        url.openStream().withStream { InputStream input ->
            new BufferedOutputStream(new FileOutputStream(file)).withStream { OutputStream output ->
                byte[] chunk = new byte[CHUNK_SIZE]
                int n
                // read() signals end of stream with -1.
                while ((n = input.read(chunk)) != -1) {
                    output.write(chunk, 0, n)
                }
            }
        }
        println "Finished writing file [${file.getCanonicalPath()}]."
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment