Skip to content

Instantly share code, notes, and snippets.

@erraggy
Created December 3, 2013 01:22
Show Gist options
  • Select an option

  • Save erraggy/7762285 to your computer and use it in GitHub Desktop.

Select an option

Save erraggy/7762285 to your computer and use it in GitHub Desktop.
/**
 * Job titled "Get articles by site": a single map/reduce task that reads rows from
 * `WebCrawlingSchema.WebTable` and writes text lines of the form `domain<TAB>title`
 * under the output path `/reports/wordcount`.
 *
 * The mapper keys every row by its domain and ships the whole row as the value; the
 * reducer reads the rows back and emits one line per row, using "No Title" when the
 * title column is absent.
 */
class WebTablePagesBySiteJob extends HJob[NoSettings](
  "Get articles by site",
  HMapReduceTask(
    HTaskID("Articles by Site"),
    HTaskConfigs(),
    // NOTE(review): the output path is named "wordcount" although this job lists pages per
    // site -- confirm the path is intentional.
    HIO(
      HTableInput(WebCrawlingSchema.WebTable),
      HPathOutput("/reports/wordcount")
    ),
    // Map side: key = the row's domain, value = the serialized row.
    new FromTableBinaryMapperFx(WebCrawlingSchema.WebTable) {
      // The explicit type annotation is purely illustrative; it is not required.
      val webPage: WebPageRow = row
      // `domain` is a convenience accessor provided by WebPageRow.
      val domain = webPage.domain
      write(
        _.writeUTF(domain),                              // key: the domain
        _.writeRow(WebCrawlingSchema.WebTable, webPage)  // value: the entire row
      )
    },
    // Reduce side: turn each row received for a domain into one line of text.
    new BinaryToTextReducerFx {
      // Reads the key (the domain written by the mapper) back out.
      val domain = readKey(_.readUTF())
      perValue { rowStream =>
        // Rebuild the full WebPageRow from the value stream.
        val page: WebPageRow = rowStream.readRow(WebCrawlingSchema.WebTable)
        // One counter per domain; presumably incremented once per row -- TODO confirm `ctr` semantics.
        ctr("Pages for domain " + domain)
        val title = page.column(_.title).getOrElse("No Title")
        // `writeln` is a convenience function that writes a line to the text output.
        writeln(domain + "\t" + title)
      }
    }
  )
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment