Skip to content

Instantly share code, notes, and snippets.

@gurbuzali
Created February 1, 2018 12:07
Show Gist options
  • Save gurbuzali/c54a10aa1a576dafa4275828fef7be34 to your computer and use it in GitHub Desktop.
Save gurbuzali/c54a10aa1a576dafa4275828fef7be34 to your computer and use it in GitHub Desktop.
/*
* Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jet;
import com.hazelcast.client.proxy.ClientMapProxy;
import com.hazelcast.config.Config;
import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WindowDefinition;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.spi.properties.GroupProperty;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.hazelcast.core.EntryEventType.ADDED;
import static com.hazelcast.core.EntryEventType.UPDATED;
import static com.hazelcast.jet.JournalInitialPosition.START_FROM_OLDEST;
import static com.hazelcast.jet.core.Edge.between;
import static com.hazelcast.jet.core.Partitioner.HASH_CODE;
import static com.hazelcast.jet.core.WatermarkEmissionPolicy.emitByFrame;
import static com.hazelcast.jet.core.WatermarkGenerationParams.wmGenParams;
import static com.hazelcast.jet.core.WatermarkPolicies.withFixedLag;
import static com.hazelcast.jet.core.WindowDefinition.slidingWindowDef;
import static com.hazelcast.jet.core.processor.Processors.accumulateByFrameP;
import static com.hazelcast.jet.core.processor.Processors.combineToSlidingWindowP;
import static com.hazelcast.jet.core.processor.SourceProcessors.streamMapP;
import static com.hazelcast.jet.function.DistributedFunctions.entryKey;
import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;
import static java.util.concurrent.TimeUnit.MINUTES;
public class EventJournalTest {
private static final int PARTITION_COUNT = 271;
private static final int RING_BUFFER_SIZE = 100_000;
private static final int SLEEPY_MILLIS = 1;
private static final int POLL_COUNT = 20;
private final String mapName = "event-journal-map";
private final String resultsMapName = mapName + "-results";
private final int countPerTicker = 100;
private final int snapshotIntervalMs = 500;
private final int lagMs = 2000;
private final int windowSize = 20;
private final int slideBy = 10;
private final long durationInMillis = MINUTES.toMillis(10);
private final ExecutorService producerExecutor;
private final long[] offsets = new long[PARTITION_COUNT];
private final JetInstance jetClient;
private final JetInstance jet1;
private final JetInstance jet2;
private final JetInstance jet3;
private EventJournalTest() {
JetConfig jetConfig = getJetConfig();
jet1 = Jet.newJetInstance(jetConfig);
jet2 = Jet.newJetInstance(jetConfig);
jet3 = Jet.newJetInstance(jetConfig);
producerExecutor = Executors.newSingleThreadExecutor();
jetClient = Jet.newJetClient();
}
public static void main(String[] args) throws Exception {
EventJournalTest eventJournalTest = new EventJournalTest();
eventJournalTest.test();
}
public void test() throws Exception {
System.out.println("asd starting test");
JobConfig jobConfig = new JobConfig().addClass(EventJournalTest.class);
jobConfig.setSnapshotIntervalMillis(snapshotIntervalMs);
jobConfig.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
jetClient.newJob(testDAG(), jobConfig);
startProducer();
new Thread(() -> {
System.out.println("asd starting terminator");
try {
MINUTES.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("fatal terminating now");
jet2.getHazelcastInstance().getLifecycleService().terminate();
}).start();
long begin = System.currentTimeMillis();
while (System.currentTimeMillis() - begin < durationInMillis) {
List<EventJournalMapEvent<Long, Long>> list = poll(resultsMapName);
list.forEach(e -> {
if (countPerTicker != e.getNewValue()) {
System.out.print("asd -------------> ");
}
System.out.println(e.getKey() + " " + e.getNewValue());
// assertEquals("EXACTLY_ONCE -> Unexpected count for " + e.getKey(),
// countPerTicker, (long) e.getNewValue());
});
}
}
private DAG testDAG() {
WindowDefinition windowDef = slidingWindowDef(windowSize, slideBy);
AggregateOperation1<Object, LongAccumulator, Long> counting = AggregateOperations.counting();
DAG dag = new DAG();
WatermarkGenerationParams<Long> wmGenParams = wmGenParams((Long t) -> t, withFixedLag(lagMs),
emitByFrame(windowDef), 10_000);
Vertex journal = dag.newVertex("journal", streamMapP(mapName, e -> e.getType() == ADDED || e.getType() == UPDATED,
EventJournalMapEvent<Long, Long>::getNewValue, START_FROM_OLDEST, wmGenParams));
Vertex accumulateByF = dag.newVertex("accumulate-by-frame", accumulateByFrameP(
wholeItem(), (Long t) -> t, TimestampKind.EVENT, windowDef, counting)
);
Vertex slidingW = dag.newVertex("sliding-window", combineToSlidingWindowP(windowDef, counting));
Vertex writeMap = dag.newVertex("writeMap", SinkProcessors.writeMapP(resultsMapName));
dag
.edge(between(journal, accumulateByF).partitioned(wholeItem(), HASH_CODE))
.edge(between(accumulateByF, slidingW).partitioned(entryKey())
.distributed())
.edge(between(slidingW, writeMap));
return dag;
}
private JetConfig getJetConfig() {
JetConfig jetConfig = new JetConfig();
Config config = jetConfig.getHazelcastConfig();
config.setProperty(GroupProperty.PARTITION_COUNT.getName(), String.valueOf(PARTITION_COUNT));
EventJournalConfig eventJournalConfig = new EventJournalConfig();
eventJournalConfig.setEnabled(true).setMapName(mapName + "*").setCapacity(PARTITION_COUNT * RING_BUFFER_SIZE);
config.addEventJournalConfig(eventJournalConfig);
return jetConfig;
}
private void startProducer() {
System.out.println("asd starting producer");
producerExecutor.submit(() -> {
IStreamMap<Long, Long> map = jetClient.getMap(mapName);
for (long i = 0; true; i++) {
try {
Thread.sleep(SLEEPY_MILLIS);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < countPerTicker; j++) {
map.set(i, i);
}
}
});
}
public <K, V> List<EventJournalMapEvent<K, V>> poll(String mapName) throws Exception {
ClientMapProxy<K, V> proxy = (ClientMapProxy<K, V>) jetClient.getHazelcastInstance().getMap(mapName);
List<EventJournalMapEvent<K, V>> resultList = new ArrayList<>();
List<ICompletableFuture<ReadResultSet<EventJournalMapEvent<K, V>>>> futureList = new ArrayList<>();
for (int i = 0; i < PARTITION_COUNT; i++) {
ICompletableFuture<ReadResultSet<EventJournalMapEvent<K, V>>> f = proxy.readFromEventJournal(
offsets[i], 0, POLL_COUNT, i, null, null);
futureList.add(f);
}
for (int i = 0; i < PARTITION_COUNT; i++) {
ICompletableFuture<ReadResultSet<EventJournalMapEvent<K, V>>> future = futureList.get(i);
ReadResultSet<EventJournalMapEvent<K, V>> resultSet = future.get();
for (EventJournalMapEvent<K, V> event : resultSet) {
resultList.add(event);
}
offsets[i] = offsets[i] + resultSet.readCount();
}
return resultList;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment