Created
August 20, 2016 09:34
-
-
Save stliu/e20e0b74d5195c2cb7d4f875bb4aa48b to your computer and use it in GitHub Desktop.
Paging with Cassandra
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import com.datastax.driver.core.*; | |
import com.datastax.driver.core.querybuilder.QueryBuilder; | |
import com.datastax.driver.core.schemabuilder.SchemaBuilder; | |
import com.datastax.driver.core.utils.UUIDs; | |
import lombok.Data; | |
import java.util.*; | |
public class TestCassandraQuery { | |
public static final String KEYSPACE = "weichat"; | |
public static final String TABLE = "messages"; | |
public static final int sessionCount = 1; | |
public static final int messagePerSession = 1000; | |
public static void main(String[] args) { | |
// At initialization: | |
Cluster cluster = Cluster.builder() | |
.addContactPoint("127.0.0.1") | |
.build(); | |
Session session = cluster.connect(); | |
createKeyspace(session); | |
createTable(session); | |
List<Message> messages = buildMessages(); | |
messages.stream().map(message -> QueryBuilder.insertInto(KEYSPACE, TABLE) | |
.value("session_id", message.getSessionId()) | |
.value("message_id", message.getMessageId()) | |
.value("from_user", message.getFromUser()) | |
.value("to_user", message.getToUser()) | |
.value("body", message.getBody())) | |
.forEach(insert -> execute(session, insert)); | |
UUID sessionId = messages.get(0).getSessionId(); | |
int pageSize = 10; | |
String nextPage = fetchByPage(session, sessionId, pageSize, null); | |
while (nextPage != null) { | |
nextPage = fetchByPage(session, sessionId, pageSize, nextPage); | |
} | |
execute(session, SchemaBuilder.dropTable(KEYSPACE, TABLE)); | |
execute(session, SchemaBuilder.dropKeyspace(KEYSPACE)); | |
session.close(); | |
cluster.close(); | |
} | |
private static List<Message> buildMessages() { | |
List<Message> messages = new ArrayList<>(sessionCount * messagePerSession); | |
for (int i = 0; i < 100; i++) { | |
UUID sessionId = UUIDs.timeBased(); | |
String from = "f" + i; | |
String to = "t" + i; | |
for (int j = 0; j < 1000; j++) { | |
UUID messageId = UUIDs.timeBased(); | |
String msg = "msg-" + i + "-" + j; | |
Message message = new Message(); | |
message.setSessionId(sessionId); | |
message.setMessageId(messageId); | |
message.setFromUser(from); | |
message.setToUser(to); | |
message.setBody(msg); | |
messages.add(message); | |
} | |
} | |
return messages; | |
} | |
@Data | |
public static class Message { | |
private UUID sessionId; | |
private UUID messageId; | |
private String fromUser; | |
private String toUser; | |
private String body; | |
} | |
private static void createTable(Session session) { | |
Statement statement = SchemaBuilder.createTable(KEYSPACE, TABLE) | |
.addPartitionKey("session_id", DataType.timeuuid()) | |
.addColumn("from_user", DataType.varchar()) | |
.addColumn("to_user", DataType.varchar()) | |
.addColumn("body", DataType.text()) | |
.addClusteringColumn("message_id", DataType.timeuuid()) | |
.ifNotExists() | |
.withOptions().clusteringOrder("message_id", SchemaBuilder.Direction.DESC); | |
execute(session, statement); | |
} | |
private static void createKeyspace(Session session) { | |
Map<String, Object> replication = new HashMap<>(); | |
replication.put("class", "NetworkTopologyStrategy"); | |
replication.put("datacenter1", 1); | |
Statement statement = SchemaBuilder.createKeyspace(KEYSPACE).ifNotExists().with().replication(replication).durableWrites(false); | |
execute(session, statement); | |
} | |
private static String fetchByPage(Session session, UUID sessionId, int pageSize, String pagingStateBase64ed) { | |
Statement statement = QueryBuilder.select().all().from(KEYSPACE, TABLE).where(QueryBuilder.eq("session_id", sessionId)); | |
statement.setFetchSize(pageSize); | |
if (pagingStateBase64ed != null) { | |
byte[] bytes = Base64.getUrlDecoder().decode(pagingStateBase64ed); | |
PagingState pagingState = PagingState.fromBytes(bytes); | |
statement.setPagingState(pagingState); | |
} | |
ResultSet resultSet = execute(session, statement); | |
PagingState pagingState = resultSet.getExecutionInfo().getPagingState(); | |
String base64Url = pagingState != null ? Base64.getUrlEncoder().encodeToString(pagingState.toBytes()) : null; | |
System.out.println("paging state with base64 url " + base64Url); | |
int rowIndex = 0; | |
for (Row row : resultSet) { | |
UUID sid = row.get("session_id", UUID.class); | |
UUID messageId = row.get("message_id", UUID.class); | |
String body = row.getString("body"); | |
System.out.println(rowIndex + " session " + sid + " message " + messageId + " body " + body); | |
if (++rowIndex >= pageSize) { | |
break; | |
} | |
} | |
return base64Url; | |
} | |
private static ResultSet execute(Session session, Statement statement) { | |
System.out.println("Executing:"); | |
System.out.println("\t" + statement); | |
return session.execute(statement); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment