Created
February 4, 2020 14:33
-
-
Save alessiosavi/4ea88d73d6853de695843631207b7bc6 to your computer and use it in GitHub Desktop.
Consume/Read data from Kinesis Stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example; | |
import com.amazonaws.services.kinesis.AmazonKinesis; | |
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; | |
import com.amazonaws.services.kinesis.model.*; | |
import java.nio.charset.StandardCharsets; | |
import java.util.List; | |
/** | |
* Hello world! | |
*/ | |
public class App { | |
private static final String streamName = "API_NAME" + "_kineis-notification-stream"; | |
private static final AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient(); | |
public static void main(String[] args) { | |
printKinesisRecords(getRecordsFromKinesis(client)); | |
} | |
private static List<Record> getRecordsFromKinesis(AmazonKinesis kClient) { | |
final ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(streamName).withMaxResults(1); | |
Shard shard = kClient.listShards(listShardsRequest).getShards().get(0); | |
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); | |
getShardIteratorRequest.setStreamName(streamName); | |
getShardIteratorRequest.setShardId(shard.getShardId()); | |
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); | |
final GetShardIteratorResult getShardIteratorResult = kClient.getShardIterator(getShardIteratorRequest); | |
String shardIterator = getShardIteratorResult.getShardIterator(); | |
// Create a new getRecordsRequest with an existing shardIterator | |
// Set the maximum records to return to 1 | |
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); | |
getRecordsRequest.setShardIterator(shardIterator); | |
getRecordsRequest.setLimit(10); | |
final GetRecordsResult result = kClient.getRecords(getRecordsRequest); | |
// Put the result into record list. The result can be empty. | |
return result.getRecords(); | |
} | |
private static void printKinesisRecords(List<Record> records) { | |
for (Record record : records) { | |
System.err.println("RECORD: " + StandardCharsets.UTF_8.decode(record.getData()).toString()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment