Last active
September 12, 2023 16:12
-
-
Save jarohen/24f6c4dbc505b65748f7b7a50de8d527 to your computer and use it in GitHub Desktop.
A quick hack at actually implementing the Raft algorithm to aid my learning - thanks @bbengfort and @krisajenkins for the podcast!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns rafting | |
(:require [clojure.tools.logging :as log]) | |
(:import java.lang.AutoCloseable | |
(java.util.concurrent CompletableFuture Executors TimeUnit))) | |
(defn new-timeout-ms [] | |
(+ (System/currentTimeMillis) | |
200 | |
(rand-int 150))) | |
(defn timeout? [{:keys [timeout-at-ms]}] | |
(< timeout-at-ms (System/currentTimeMillis))) | |
(defn quorum [ag-count] | |
(inc (quot ag-count 2))) | |
(defn other-agents [{:keys [id]} agents] | |
(for [!ag agents | |
:let [{other-id :id} @!ag] | |
:when (not= other-id id)] | |
!ag)) | |
(defn append-events-resp [{:keys [current-term] :as ag} {resp-term :term, :keys [follower-id success? last-log-idx]} agents] | |
(cond | |
success? | |
(cond-> ag | |
last-log-idx (-> (assoc-in [:next-indices follower-id] (inc last-log-idx)) | |
(assoc-in [:match-indices follower-id] last-log-idx))) | |
(not= resp-term current-term) | |
(-> ag | |
(assoc :state :follower, | |
:current-term resp-term | |
:timeout-at-ms (new-timeout-ms)) | |
(dissoc :next-indices :match-indices)) | |
:else | |
(-> ag | |
(update-in [:next-indices follower-id] dec)))) | |
(defn append-events-req [{:keys [id current-term log commit-idx] :as ag} | |
{req-term :term, :keys [leader-id events prev-log leader-commit-idx]} | |
agents] | |
(if (< req-term current-term) | |
(do | |
(send (nth agents leader-id) append-events-resp | |
{:success? false, :follower-id id, :term current-term} | |
agents) | |
ag) | |
(let [new-commit-idx (if (>= commit-idx leader-commit-idx) | |
commit-idx | |
(cond-> leader-commit-idx | |
(seq events) (min (:log-idx (last events))))) | |
{:keys [current-term] :as ag} (-> ag | |
(assoc :state :follower, | |
:current-term req-term | |
:leader-id leader-id | |
:timeout-at-ms (new-timeout-ms) | |
:commit-idx new-commit-idx) | |
(dissoc :voted-for :votes-recd | |
:next-indices :match-indices))] | |
(cond | |
(when-let [{prev-log-idx :log-idx, prev-log-term :term} prev-log] | |
(or (>= prev-log-idx (count log)) | |
(not= (:term (nth log prev-log-idx)) prev-log-term))) | |
(do | |
(send (nth agents leader-id) append-events-resp | |
{:success? false, :follower-id id, :term current-term} | |
agents) | |
(-> ag | |
(update :log subvec 0 (:log-idx prev-log)))) | |
:else | |
(do | |
(dotimes [n (- new-commit-idx commit-idx)] | |
(log/infof "Agent %d: apply log-idx %d" id (+ commit-idx n 1))) | |
(send (nth agents leader-id) append-events-resp | |
{:success? true, :term current-term, :follower-id id, :last-log-idx (:log-idx (last events))} | |
agents) | |
(-> ag | |
(update :log into events))))))) | |
(defn request-vote-resp [{:keys [id state current-term votes-recd log commit-idx] :as ag} {resp-term :term, :keys [voter-id vote-granted?]} agents] | |
(log/infof "Agent %d received %s from %d" id (if vote-granted? "vote" "rejection") voter-id) | |
(if (and (= state :candidate) vote-granted? (= current-term resp-term)) | |
(let [votes-recd (conj votes-recd voter-id)] | |
(if (>= (count votes-recd) (quorum (count agents))) | |
(do | |
(log/infof "Agent %d is now leader!" id) | |
(doseq [!ag (other-agents ag agents)] | |
(send !ag append-events-req | |
{:term current-term, :leader-id id, :leader-commit-idx commit-idx} | |
agents)) | |
(-> ag | |
(assoc :state :leader, | |
:leader-id id | |
:next-indices (vec (repeat (count agents) (count log))) | |
:match-indices (vec (repeat (count agents) -1))) | |
(dissoc :votes-recd :voted-for))) | |
(assoc ag :votes-recd votes-recd))) | |
ag)) | |
(defn request-vote-req [{:keys [id current-term voted-for] :as ag} {candidate-term :term, :keys [candidate-id]} agents] | |
(log/infof "Agent %d received vote request from %d" id candidate-id) | |
(if (and (< current-term candidate-term) | |
(nil? voted-for)) | |
(do | |
(send (nth agents candidate-id) request-vote-resp | |
{:term candidate-term, | |
:voter-id id | |
:vote-granted? true} | |
agents) | |
(-> ag | |
(assoc :term candidate-term, :voted-for candidate-id))) | |
(do | |
(send (nth agents candidate-id) request-vote-resp | |
{:term current-term, | |
:voter-id id | |
:vote-granted? false} | |
agents) | |
ag))) | |
(defn tick [{:keys [id state current-term] :as ag} agents] | |
(case state | |
(:follower :candidate) | |
(cond | |
(timeout? ag) | |
(let [term (inc current-term)] | |
(doseq [!ag (other-agents ag agents)] | |
(send !ag request-vote-req | |
{:term term, :candidate-id id} | |
agents)) | |
(assoc ag | |
:state :candidate | |
:current-term term | |
:voted-for id | |
:votes-recd #{id} | |
:timeout-at-ms (new-timeout-ms))) | |
:else ag) | |
:leader | |
(let [{:keys [log next-indices match-indices commit-idx]} ag | |
new-commit-idx (nth (sort match-indices) (quorum (count agents)))] | |
(dotimes [n (- new-commit-idx commit-idx)] | |
(log/infof "Agent %d: apply log-idx %d" id (+ commit-idx n 1))) | |
(doseq [!ag (other-agents ag agents) | |
:let [{ag-id :id} @!ag | |
next-idx (nth next-indices ag-id)]] | |
(send !ag append-events-req | |
{:term current-term, | |
:leader-id id | |
:events (when (< next-idx (count log)) | |
(subvec log next-idx (min (- (count log) next-idx) | |
10))) | |
:leader-commit-idx new-commit-idx | |
:prev-log (let [prev-log-idx (dec next-idx)] | |
(when-not (neg? prev-log-idx) | |
{:log-idx prev-log-idx | |
:term (:term (nth log prev-log-idx))}))} | |
agents)) | |
(-> ag | |
(assoc :commit-idx new-commit-idx))))) | |
(defn open-ticker ^AutoCloseable [!ag agents] | |
(let [tp (Executors/newScheduledThreadPool 1)] | |
(.scheduleAtFixedRate tp #(send-off !ag tick agents) 0 20 TimeUnit/MILLISECONDS) | |
(reify AutoCloseable | |
(close [_] | |
(.shutdownNow tp) | |
(.awaitTermination tp 1 TimeUnit/SECONDS))))) | |
(defn submit-tx [{:keys [id state leader-id current-term log] :as ag} tx ^CompletableFuture !fut agents] | |
(cond | |
(= state :leader) (let [log-idx (count log)] | |
(.complete !fut {:log-idx log-idx}) | |
(-> ag | |
(update :log conj {:term current-term | |
:log-idx log-idx | |
:tx tx}) | |
(update-in [:next-indices id] inc) | |
(update-in [:match-indices id] inc))) | |
(nil? leader-id) (do | |
(.completeExceptionally !fut (ex-info "no leader" {})) | |
ag) | |
:else (do | |
(send (nth agents leader-id) submit-tx tx !fut agents) | |
ag))) | |
(defn submit-tx&! [tx agents] | |
(let [!fut (CompletableFuture.)] | |
(send (rand-nth agents) submit-tx tx !fut agents) | |
!fut)) | |
(do | |
(def !ag0 (agent {:id 0, :state :follower, :log [], :current-term -1, :commit-idx -1, :timeout-at-ms (new-timeout-ms)})) | |
(def !ag1 (agent {:id 1, :state :follower, :log [], :current-term -1, :commit-idx -1, :timeout-at-ms (new-timeout-ms)})) | |
(def !ag2 (agent {:id 2, :state :follower, :log [], :current-term -1, :commit-idx -1, :timeout-at-ms (new-timeout-ms)})) | |
(def agents [!ag0 !ag1 !ag2]) | |
(with-open [t0 (open-ticker !ag0 agents) | |
t1 (open-ticker !ag1 agents) | |
t2 (open-ticker !ag2 agents)] | |
(Thread/sleep 500) | |
[@(submit-tx&! :my-tx0 agents) | |
@(submit-tx&! :my-tx1 agents) | |
(Thread/sleep 100) | |
agents] | |
)) | |
#_(agent-error !ag0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment