Created
August 20, 2016 10:33
-
-
Save stliu/12becc2e229d73f24a3cd8bf1b5e7c6f to your computer and use it in GitHub Desktop.
counter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import com.datastax.driver.core.*; | |
import com.datastax.driver.core.querybuilder.Assignment; | |
import com.datastax.driver.core.querybuilder.QueryBuilder; | |
import com.datastax.driver.core.querybuilder.Update; | |
import com.datastax.driver.core.schemabuilder.SchemaBuilder; | |
import com.datastax.driver.core.utils.UUIDs; | |
import lombok.Data; | |
import java.util.*; | |
/** | |
* 使用cassandra的counter表来记录一个坐席的未读消息数 | |
*/ | |
public class UnreadMessageCount { | |
public static final String KEYSPACE = "weichat"; | |
public static final String TABLE = "unread_message_count"; | |
public static final String UNREAD_MESSAGE_COUNT = "unread_message_count"; | |
public static final String AGENT_ID = "agent_id"; | |
public static final String SESSION_ID = "session_id"; | |
public static void main(String[] args) { | |
// At initialization: | |
Cluster cluster = Cluster.builder() | |
.addContactPoint("127.0.0.1") | |
.build(); | |
Session session = cluster.connect(); | |
createKeyspace(session); | |
createTable(session); | |
UUID agentId = UUIDs.timeBased(); | |
UUID sessionId = UUIDs.timeBased(); | |
for (int i = 0; i < 100; i++) { | |
addNewMessage(session, sessionId, agentId); | |
} | |
long count = getUnreadMessageCount(session, sessionId, agentId); | |
System.out.println("current unread message count is " + count + " which should be " + 100); | |
for (int i = 0; i < 50; i++) { | |
markReadMessage(session, sessionId, agentId); | |
} | |
count = getUnreadMessageCount(session, sessionId, agentId); | |
System.out.println("current unread message count is " + count + " which should be " + 50); | |
execute(session, SchemaBuilder.dropTable(KEYSPACE, TABLE)); | |
execute(session, SchemaBuilder.dropKeyspace(KEYSPACE)); | |
session.close(); | |
cluster.close(); | |
} | |
private static void addNewMessage(Session session, UUID sessionId, UUID agentId) { | |
changeCounterValue(session, sessionId, agentId, QueryBuilder.incr(UNREAD_MESSAGE_COUNT)); | |
} | |
private static void markReadMessage(Session session, UUID sessionId, UUID agentId) { | |
changeCounterValue(session, sessionId, agentId, QueryBuilder.decr(UNREAD_MESSAGE_COUNT)); | |
} | |
private static long getUnreadMessageCount(Session session, UUID sessionId, UUID agentId) { | |
Statement update = QueryBuilder.select(UNREAD_MESSAGE_COUNT) | |
.from(KEYSPACE, TABLE) | |
.where(QueryBuilder.eq(AGENT_ID, agentId)) | |
.and(QueryBuilder.eq(SESSION_ID, sessionId)); | |
ResultSet resultSet = execute(session, update); | |
Row counterRow = resultSet.one(); | |
return counterRow == null ? 0 : counterRow.getLong(UNREAD_MESSAGE_COUNT); | |
} | |
private static void changeCounterValue(Session session, UUID sessionId, UUID agentId, Assignment assignment) { | |
Statement update = QueryBuilder.update(KEYSPACE, TABLE) | |
.with(assignment) | |
.where(QueryBuilder.eq(AGENT_ID, agentId)) | |
.and(QueryBuilder.eq(SESSION_ID, sessionId)); | |
execute(session, update); | |
} | |
private static void createTable(Session session) { | |
Statement statement = SchemaBuilder.createTable(KEYSPACE, TABLE) | |
.addPartitionKey(AGENT_ID, DataType.timeuuid()) | |
.addClusteringColumn(SESSION_ID, DataType.timeuuid()) | |
.addColumn(UNREAD_MESSAGE_COUNT, DataType.counter()) | |
.ifNotExists(); | |
execute(session, statement); | |
} | |
private static void createKeyspace(Session session) { | |
Map<String, Object> replication = new HashMap<>(); | |
replication.put("class", "NetworkTopologyStrategy"); | |
replication.put("datacenter1", 1); | |
Statement statement = SchemaBuilder.createKeyspace(KEYSPACE).ifNotExists().with().replication(replication).durableWrites(false); | |
execute(session, statement); | |
} | |
private static ResultSet execute(Session session, Statement statement) { | |
// System.out.println("Executing:"); | |
// System.out.println("\t" + statement); | |
return session.execute(statement); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment