Two Cascading flows: the first reads from S3 into a Kafka queue; the second reads from the queue into a partitioned directory structure.
import java.net.URI;
import java.util.TimeZone;

import cascading.flow.Flow;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Debug;
import cascading.operation.regex.RegexParser;
import cascading.operation.text.DateFormatter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.DirTap;
import cascading.tap.local.PartitionTap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tuple.Fields;
import cascading.tuple.type.DateType;

import static cascading.flow.FlowDef.flowDef;

// S3Tap, KafkaTap, TextKafkaScheme, and S3Logs -- along with the TOPIC_FIELDS,
// OFFSET_FIELDS, and REGEX static imports -- come from the cascading-local
// extension modules; import them from the module versions you build against

public class S3LogsViaKafka
  {
  public static final String DD_MMM_YYYY = "dd-MMM-yyyy";
  public static final TimeZone UTC = TimeZone.getTimeZone( "UTC" );
  public static final DateType DMY = new DateType( DD_MMM_YYYY, UTC );

  public static final Fields KEY = new Fields( "date", DMY );
  public static final Fields LINE = new Fields( "line", String.class );
  public static final Fields KEY_LINE = KEY.append( LINE );
  public static void main( String[] args )
    {
    if( args.length != 3 )
      {
      System.err.println( "usage: S3LogsViaKafka <source s3 uri> <kafka host> <sink file path>" );
      return;
      }

    System.out.println( "source s3 uri = " + args[ 0 ] );
    System.out.println( "kafka host = " + args[ 1 ] );
    System.out.println( "sink file path = " + args[ 2 ] );
    // read from an S3 bucket
    Tap inputTap = new S3Tap( new TextLine(), URI.create( args[ 0 ] ) );

    // write and read from a Kafka queue, on the "logs" topic
    Tap queueTap = new KafkaTap<>( new TextKafkaScheme( TOPIC_FIELDS.append( OFFSET_FIELDS ).append( KEY_LINE ) ), args[ 1 ], "logs" );

    // write to disk, using log data to create the directory structure
    DelimitedPartition partitioner = new DelimitedPartition( KEY.append( S3Logs.operation ), "/", "logs.csv" );
    Tap outputTap = new PartitionTap(
      new DirTap( new TextDelimited( true, ",", "\"" ), args[ 2 ], SinkMode.REPLACE ), partitioner
      );
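    // yields paths like [sink path]/15-Sep-2017/REST.GET.OBJECT/logs.csv, where the
    // operation segment comes from the S3 server access log "operation" field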
    Pipe ingress = new Pipe( "head" );

    // extract the log timestamp and reduce to day/month/year for use as the queue key
    ingress = new Each( ingress, new Fields( "line" ), new RegexParser( S3Logs.time, REGEX, 3 ), new Fields( "time", "line" ) );
    ingress = new Each( ingress, S3Logs.time, new DateFormatter( KEY, DD_MMM_YYYY, UTC ), KEY_LINE );

    // watch the progress on the console
    ingress = new Each( ingress, new Debug( true ) );

    Flow ingressFlow = new LocalFlowConnector().connect( flowDef()
      .setName( "ingress" )
      .addSource( ingress, inputTap )
      .addSink( ingress, queueTap )
      .addTail( ingress )
      );

    // start reading from S3 and writing to a Kafka queue
    ingressFlow.start();
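    // note: start() returns immediately, so the ingress flow fills the queue in the
    // background while the egress flow below is assembled and started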
    Pipe egress = new Pipe( "head" );

    // parse the full log into its fields and primitive values -- S3Logs.FIELDS declares field names and field types
    egress = new Each( egress, new Fields( "line" ), new RegexParser( S3Logs.FIELDS, REGEX ), KEY.append( S3Logs.FIELDS ) );

    // watch the progress on the console
    egress = new Each( egress, new Debug( true ) );

    Flow egressFlow = new LocalFlowConnector().connect( flowDef()
      .setName( "egress" )
      .addSource( egress, queueTap )
      .addSink( egress, outputTap )
      .addTail( egress )
      );
    // start reading from the Kafka queue and writing to the directory as ./[dd-MMM-yyyy]/[S3 operation]/logs.csv
    egressFlow.start();

    egressFlow.complete();
    System.out.println( "completed egress" );

    ingressFlow.complete();
    System.out.println( "completed ingress" );
    }
  }
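Once both flows complete, the partitioned output can be read back with the same local taps. The snippet below is a sketch, not part of the gist: a third flow that merges every partition back into a single flat file. It assumes the partitioner and args values from main are in scope, that FileTap is cascading.tap.local.FileTap, and the all-logs.csv name is purely illustrative.

// as a source, the PartitionTap re-attaches the "date" and "operation" values
// it parses out of each ./[dd-MMM-yyyy]/[operation]/logs.csv path
Tap partitionedSource = new PartitionTap(
  new DirTap( new TextDelimited( true, ",", "\"" ), args[ 2 ] ), partitioner
  );

// collapse all partitions into one flat file (name is illustrative)
Tap mergedTap = new FileTap( new TextDelimited( true, ",", "\"" ), "all-logs.csv", SinkMode.REPLACE );

Pipe merge = new Pipe( "merge" );

Flow mergeFlow = new LocalFlowConnector().connect( flowDef()
  .setName( "merge" )
  .addSource( merge, partitionedSource )
  .addSink( merge, mergedTap )
  .addTail( merge )
  );

// complete() blocks until the merge flow finishes
mergeFlow.complete();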