Last active
August 20, 2016 09:39
-
-
Save stliu/4d75b4e1cfbd4c7da4691df4edd853b3 to your computer and use it in GitHub Desktop.
paging with cassandra
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import com.datastax.driver.core.*; | |
import com.datastax.driver.core.querybuilder.QueryBuilder; | |
import com.datastax.driver.core.schemabuilder.SchemaBuilder; | |
import com.datastax.driver.core.utils.UUIDs; | |
import lombok.Data; | |
import java.util.*; | |
/** | |
* 演示了如何使用cassandra来做分页查询 | |
* | |
* 基本思路是: | |
* | |
* 1. 通过fetch size来指定单页面查询的limit | |
* 2. 通过查询返回的{@link PagingState}对象来保存当前的分页信息 | |
* | |
* 这个对象可以被序列化(然后再使用base64编码)就可以返回给前端了, 这样前端需要下一页的数据的时候, 带上这个信息就可以了 | |
* 3. 查询下一页的时候, 给出这个{@link PagingState}即可 | |
* | |
* 还可以通过保存这个, 来在返回的response中直接给出"上一页"和"下一页"的link | |
* | |
* @see <a href="https://datastax.github.io/java-driver/2.1.8/features/paging/">Cassandra Paging</a> | |
*/ | |
public class TestCassandraQuery { | |
public static final String KEYSPACE = "weichat"; | |
public static final String TABLE = "messages"; | |
public static final int sessionCount = 1; | |
public static final int messagePerSession = 1000; | |
public static void main(String[] args) { | |
// At initialization: | |
Cluster cluster = Cluster.builder() | |
.addContactPoint("127.0.0.1") | |
.build(); | |
Session session = cluster.connect(); | |
createKeyspace(session); | |
createTable(session); | |
List<Message> messages = buildMessages(); | |
messages.stream().map(message -> QueryBuilder.insertInto(KEYSPACE, TABLE) | |
.value("session_id", message.getSessionId()) | |
.value("message_id", message.getMessageId()) | |
.value("from_user", message.getFromUser()) | |
.value("to_user", message.getToUser()) | |
.value("body", message.getBody())) | |
.forEach(insert -> execute(session, insert)); | |
UUID sessionId = messages.get(0).getSessionId(); | |
int pageSize = 10; | |
String nextPage = fetchByPage(session, sessionId, pageSize, null); | |
while (nextPage != null) { | |
nextPage = fetchByPage(session, sessionId, pageSize, nextPage); | |
} | |
execute(session, SchemaBuilder.dropTable(KEYSPACE, TABLE)); | |
execute(session, SchemaBuilder.dropKeyspace(KEYSPACE)); | |
session.close(); | |
cluster.close(); | |
} | |
private static List<Message> buildMessages() { | |
List<Message> messages = new ArrayList<>(sessionCount * messagePerSession); | |
for (int i = 0; i < 100; i++) { | |
UUID sessionId = UUIDs.timeBased(); | |
String from = "f" + i; | |
String to = "t" + i; | |
for (int j = 0; j < 1000; j++) { | |
UUID messageId = UUIDs.timeBased(); | |
String msg = "msg-" + i + "-" + j; | |
Message message = new Message(); | |
message.setSessionId(sessionId); | |
message.setMessageId(messageId); | |
message.setFromUser(from); | |
message.setToUser(to); | |
message.setBody(msg); | |
messages.add(message); | |
} | |
} | |
return messages; | |
} | |
@Data | |
public static class Message { | |
private UUID sessionId; | |
private UUID messageId; | |
private String fromUser; | |
private String toUser; | |
private String body; | |
} | |
private static void createTable(Session session) { | |
Statement statement = SchemaBuilder.createTable(KEYSPACE, TABLE) | |
.addPartitionKey("session_id", DataType.timeuuid()) | |
.addColumn("from_user", DataType.varchar()) | |
.addColumn("to_user", DataType.varchar()) | |
.addColumn("body", DataType.text()) | |
.addClusteringColumn("message_id", DataType.timeuuid()) | |
.ifNotExists() | |
.withOptions().clusteringOrder("message_id", SchemaBuilder.Direction.DESC); | |
execute(session, statement); | |
} | |
private static void createKeyspace(Session session) { | |
Map<String, Object> replication = new HashMap<>(); | |
replication.put("class", "NetworkTopologyStrategy"); | |
replication.put("datacenter1", 1); | |
Statement statement = SchemaBuilder.createKeyspace(KEYSPACE).ifNotExists().with().replication(replication).durableWrites(false); | |
execute(session, statement); | |
} | |
private static String fetchByPage(Session session, UUID sessionId, int pageSize, String pagingStateBase64ed) { | |
Statement statement = QueryBuilder.select().all().from(KEYSPACE, TABLE).where(QueryBuilder.eq("session_id", sessionId)); | |
statement.setFetchSize(pageSize); | |
if (pagingStateBase64ed != null) { | |
byte[] bytes = Base64.getUrlDecoder().decode(pagingStateBase64ed); | |
PagingState pagingState = PagingState.fromBytes(bytes); | |
statement.setPagingState(pagingState); | |
} | |
ResultSet resultSet = execute(session, statement); | |
PagingState pagingState = resultSet.getExecutionInfo().getPagingState(); | |
String base64Url = pagingState != null ? Base64.getUrlEncoder().encodeToString(pagingState.toBytes()) : null; | |
System.out.println("paging state with base64 url " + base64Url); | |
int rowIndex = 0; | |
for (Row row : resultSet) { | |
UUID sid = row.get("session_id", UUID.class); | |
UUID messageId = row.get("message_id", UUID.class); | |
String body = row.getString("body"); | |
System.out.println(rowIndex + " session " + sid + " message " + messageId + " body " + body); | |
if (++rowIndex >= pageSize) { | |
break; | |
} | |
} | |
return base64Url; | |
} | |
private static ResultSet execute(Session session, Statement statement) { | |
System.out.println("Executing:"); | |
System.out.println("\t" + statement); | |
return session.execute(statement); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment