Created
February 25, 2019 16:04
-
-
Save caioaao/f7480cd9fa2e5f32766027c2e65bcef4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns aeron-cluster-test.core | |
(:require [clojure.stacktrace :refer [print-stack-trace]]) | |
(:import [org.agrona CloseHelper] | |
[org.agrona.concurrent AgentTerminationException] | |
[org.agrona ExpandableArrayBuffer] | |
[org.agrona ErrorHandler] | |
[io.aeron.cluster ClusteredMediaDriver ConsensusModule ConsensusModule$Context] | |
[io.aeron.cluster.client EgressListener AeronCluster AeronCluster$Context ] | |
[io.aeron.cluster.service ClusteredService ClusteredServiceContainer ClusteredServiceContainer$Context Cluster ClusteredServiceAgent ClientSession] | |
[io.aeron.driver MediaDriver$Context ThreadingMode] | |
[io.aeron.archive Archive$Context ArchiveThreadingMode] | |
[java.lang Throwable]) | |
(:gen-class)) | |
(def pst-error-handler
  "Agrona ErrorHandler shared by the contexts in this namespace. Its onError
  hands the received throwable to clojure.stacktrace/print-stack-trace."
  (reify ErrorHandler
    (onError [_this ex]
      (print-stack-trace ex))))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L49
(defn make-clustered-media-driver!
  "Builds the media driver, archive and consensus module contexts and passes
  them to ClusteredMediaDriver/launch, returning its result.

  The media driver and consensus module contexts report errors through
  pst-error-handler. All three contexts are configured with their
  delete-on-start flag set to true."
  []
  (let [driver-ctx    (-> (MediaDriver$Context.)
                          (.threadingMode ThreadingMode/SHARED)
                          (.termBufferSparseFile true)
                          (.dirDeleteOnStart true)
                          (.errorHandler pst-error-handler))
        archive-ctx   (-> (Archive$Context.)
                          (.maxCatalogEntries 1024)
                          (.threadingMode ArchiveThreadingMode/SHARED)
                          (.deleteArchiveOnStart true))
        ;; The termination hook throws AgentTerminationException when invoked.
        terminate!    (fn [] (throw (AgentTerminationException.)))
        consensus-ctx (-> (ConsensusModule$Context.)
                          (.errorHandler pst-error-handler)
                          (.deleteDirOnStart true)
                          (.terminationHook terminate!))]
    (ClusteredMediaDriver/launch driver-ctx archive-ctx consensus-ctx)))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L67
(defn stop!
  "Shuts the test cluster down and removes its on-disk state.

  Closes, in order, the cluster client `cluster`, the service `container`
  and the `clustered-media-driver`, each through CloseHelper/close. Then,
  provided a driver was actually supplied, deletes the consensus module
  directory, the archive directory and the Aeron directory through the
  driver's component contexts.

  Returns nil."
  [clustered-media-driver container cluster]
  (CloseHelper/close cluster)
  (CloseHelper/close container)
  (CloseHelper/close clustered-media-driver)
  ;; Guard the directory clean-up: calling the accessors below on a nil
  ;; driver would throw a NullPointerException (e.g. when launching failed
  ;; and stop! is used for teardown).
  (when clustered-media-driver
    (-> clustered-media-driver
        .consensusModule
        .context
        .deleteDirectory)
    (-> clustered-media-driver
        .archive
        .context
        .deleteArchiveDirectory)
    (-> clustered-media-driver
        .mediaDriver
        .context
        .deleteAeronDirectory)))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L100
(defn make-egress-listener!
  "Returns an EgressListener whose onMessage ignores every argument it is
  given and increments `msg-counter` (something `swap!` accepts, e.g. an
  atom holding a number) by one per call."
  [msg-counter]
  (let [count-message! (fn [] (swap! msg-counter inc))]
    (reify EgressListener
      (onMessage [_this _session-id _timestamp-ms
                  _buffer _offset _length _header]
        (count-message!)))))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L163
(defn launch-echo-service!
  "Launches a ClusteredServiceContainer around an echo ClusteredService and
  returns whatever ClusteredServiceContainer/launch returns.

  The service offers each received session message back to the session it
  came from; all other callbacks are empty. Container errors are reported
  through pst-error-handler."
  []
  (let [service (reify ClusteredService
                  (onStart [_this _cluster])
                  (onSessionOpen [_this _session _timestamp-ms])
                  (onSessionClose [_this _session _timestamp-ms _close-reason])
                  (onSessionMessage [_this session _timestamp-ms buffer offset length _header]
                    ;; Retry the offer, yielding the thread in between, for
                    ;; as long as it reports a negative result.
                    (loop []
                      (when (neg? (.offer session buffer offset length))
                        (Thread/yield)
                        (recur))))
                  (onTimerEvent [_this _correlation-id _timestamp-ms])
                  (onTakeSnapshot [_this _snapshot-pub])
                  (onLoadSnapshot [_this _snapshot-image])
                  (onRoleChange [_this _new-role]))
        container-ctx (-> (ClusteredServiceContainer$Context.)
                          (.clusteredService service)
                          (.errorHandler pst-error-handler))]
    (ClusteredServiceContainer/launch container-ctx)))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L235
(defn connect-to-cluster!
  "Connects a cluster client and returns the result of AeronCluster/connect.

  `listener` is installed as the client's egress listener. The ingress
  channel and the three localhost member endpoints are hard-coded."
  [listener]
  (let [client-ctx (-> (AeronCluster$Context.)
                       (.egressListener listener)
                       (.ingressChannel "aeron:udp")
                       (.clusterMemberEndpoints "0=localhost:9010,1=localhost:9011,2=localhost:9012"))]
    (AeronCluster/connect client-ctx)))
(comment
  ;; REPL walkthrough: start a node, send one message through the echo
  ;; service, wait for the echo to arrive, then tear everything down.
  ;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L92
  (def clustered-media-driver (make-clustered-media-driver!))

  ;; Message payload: the 3 ASCII bytes of "sup" written at offset 0.
  (def buff (ExpandableArrayBuffer.))
  (.putStringWithoutLengthAscii buff 0 "sup")

  (def msg-counter (atom 0))
  (def egress-listener (make-egress-listener! msg-counter))
  (def container (launch-echo-service!))
  (def aeron-cluster (connect-to-cluster! egress-listener))

  ;; Keep offering the message until the offer result is non-negative.
  (while (neg? (.offer aeron-cluster buff 0 3))
    (Thread/yield))

  ;; Poll egress until the listener has counted a message, yielding whenever
  ;; a poll returns nothing.
  (while (zero? @msg-counter)
    (when-not (pos? (.pollEgress aeron-cluster))
      (Thread/yield)))

  (stop! clustered-media-driver container aeron-cluster)
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment