Skip to content

Instantly share code, notes, and snippets.

@gurbuzali
Created February 9, 2018 13:38
Show Gist options
  • Save gurbuzali/897f5cd15d3347e29fb5af6c4ed2e453 to your computer and use it in GitHub Desktop.
Save gurbuzali/897f5cd15d3347e29fb5af6c4ed2e453 to your computer and use it in GitHub Desktop.
/*
* Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jet;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.proxy.ClientMapProxy;
import com.hazelcast.config.Config;
import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.spi.properties.GroupProperty;
import static java.util.concurrent.TimeUnit.SECONDS;
public class EventJournalTest {
private static final int PARTITION_COUNT = 12;
private static final int COUNT_PER_TICKER = 100;
private static final int RING_BUFFER_SIZE = 500_000;
private static final int MAX_POLL = 10_000;
private static final int SLEEP_SECONDS = 20;
private static final String MAP_NAME = "map";
volatile boolean running = true;
volatile long lastIndex = -1;
JetInstance[] instances;
JetInstance client;
Thread thread;
public static void main(String[] args) throws Exception {
EventJournalTest eventJournalTest = new EventJournalTest();
// Create cluster with event-journal config and partition count
eventJournalTest.createCluster();
// Create client
eventJournalTest.createClient();
// start producing
eventJournalTest.startProducing();
SECONDS.sleep(SLEEP_SECONDS);
// terminate first instance
eventJournalTest.terminate(3);
SECONDS.sleep(SLEEP_SECONDS);
// terminate second instance
eventJournalTest.terminate(2);
SECONDS.sleep(SLEEP_SECONDS);
// stop producing
eventJournalTest.stopProducing();
// check size
eventJournalTest.checkEventJournalSize();
Jet.shutdownAll();
}
private void checkEventJournalSize() throws Exception {
ClientMapProxy<Long, Long> proxy = (ClientMapProxy) client.getHazelcastInstance().getMap(MAP_NAME);
long total = 0;
for (int i = 0; i < PARTITION_COUNT; i++) {
long seq = 0;
int readCount = 1;
while (readCount > 0) {
ICompletableFuture<ReadResultSet<EventJournalMapEvent<Long, Long>>> f =
proxy.readFromEventJournal(seq, 0, MAX_POLL, i, null, null);
ReadResultSet<EventJournalMapEvent<Long, Long>> resultSet = f.get();
readCount = resultSet.readCount();
seq += readCount;
}
System.out.println("for partition " + i + ", seq: " + seq);
total += seq;
}
System.out.println("total: " + total + " lastIndex: " + (lastIndex + 1) * COUNT_PER_TICKER);
}
private void createCluster() {
JetConfig jetConfig = getJetConfig(PARTITION_COUNT, MAP_NAME, RING_BUFFER_SIZE);
instances = new JetInstance[]{
Jet.newJetInstance(jetConfig),
Jet.newJetInstance(jetConfig),
Jet.newJetInstance(jetConfig),
Jet.newJetInstance(jetConfig)
};
}
private void createClient() {
ClientConfig clientConfig = new ClientConfig();
clientConfig.getGroupConfig().setName("jet").setPassword("jet-pass");
clientConfig.getNetworkConfig().setRedoOperation(false);
client = Jet.newJetClient(clientConfig);
}
private void startProducing() {
thread = new Thread(() -> {
IStreamMap<Object, Object> map = client.getMap(MAP_NAME);
for (long i = 0; running; i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < COUNT_PER_TICKER; j++) {
map.set(i, i);
}
lastIndex = i;
}
});
thread.start();
}
private void stopProducing() throws InterruptedException {
running = false;
thread.join();
}
private void terminate(int index) {
instances[index].getHazelcastInstance().getLifecycleService().terminate();
}
private static JetConfig getJetConfig(int partitionCount, String mapName, int ringBufferSize) {
JetConfig jetConfig = new JetConfig();
Config config = jetConfig.getHazelcastConfig();
config.setProperty(GroupProperty.PARTITION_COUNT.getName(), String.valueOf(partitionCount));
EventJournalConfig eventJournalConfig = new EventJournalConfig();
eventJournalConfig.setEnabled(true).setMapName(mapName + "*").setCapacity(partitionCount * ringBufferSize);
config.addEventJournalConfig(eventJournalConfig);
return jetConfig;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment