Created
March 16, 2016 22:33
-
-
Save jonhoo/ae65c28575b05da1b58e to your computer and use it in GitHub Desktop.
Raft pseudocode
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This file gives pseudocode for the complete operation of a Raft peer,
// including the fast backtracking optimization. The implementation here is
// 0-indexed, as this simplifies the implementation in many cases; as a
// consequence, -1 is used throughout to mean "no such index" / "no such term".
// This implementation also does not discuss locks at all, which will be vital
// in any real implementation.
//
// ============================================================================
// The following data needs to be persisted
// ============================================================================
//
// This is the term this Raft server is currently in.
currentTerm = 0
// This is the Raft peer that this server has voted for in *this* term.
// None is the one and only "has not voted" sentinel; it must be reset to None
// (not to a numeric value) whenever currentTerm changes.
votedFor = None
// The log is a list of {term, command} tuples, where the command is an opaque
// value which only holds meaning to the replicated state machine running on
// top of Raft.
log = []
//
// ============================================================================
// The following data is ephemeral
// ============================================================================
//
// The state this server is currently in; can be FOLLOWER, CANDIDATE, or LEADER.
state = FOLLOWER
// The Raft entries up to and including this index are considered committed by
// Raft, meaning they will not change, and can safely be applied to the state
// machine. -1 means nothing has been committed yet.
commitIndex = -1
// The index of the last log entry that has been applied to the state machine.
// -1 means nothing has been applied yet.
lastApplied = -1
// nextIndex is a guess as to how much of our log (as leader) matches that of
// each other peer. This is used to determine what entries to send to each peer
// next.
nextIndex = map [..] -> 0
// matchIndex is a measurement of how much of our log (as leader) we know to be
// replicated at each other server. This is used to determine when some prefix
// of entries in our log from the current term has been replicated to a
// majority of servers, and is thus safe to apply.
matchIndex = map [..] -> -1
// This function updates the state machine as a result of the command we pass
// it. In order to build a replicated state machine, we need to call
// stateMachine with the same commands, in the same order, on all servers.
stateMachine func(command)
//
// ============================================================================
// Raft RPC handlers
// ============================================================================
//
// RequestVote is the RPC a candidate sends to ask this server for its vote.
//
// Arguments:
//   term         - the candidate's term
//   candidateID  - identifier of the candidate asking for the vote
//   lastLogIndex - index of the candidate's last log entry (-1 if log is empty)
//   lastLogTerm  - term of the candidate's last log entry (-1 if log is empty)
// Returns:
//   term         - our currentTerm, so a stale candidate can update itself
//   voteGranted  - true iff we granted our vote to candidateID
RequestVote(term, candidateID, lastLogIndex, lastLogTerm)
    -> (term, voteGranted)
{
    // step down before handling RPC if need be
    if term > currentTerm {
        currentTerm = term
        state = FOLLOWER
        // New term, so we have not voted yet. This must be None (the same
        // sentinel the double-vote check below tests for); any other value
        // would make us refuse every candidate in the new term.
        votedFor = None
        nextIndex = map [..] -> length(log)
        matchIndex = map [..] -> -1
        // TODO: persist Raft state (currentTerm and votedFor changed)
    }

    // don't vote for out-of-date candidates
    if term < currentTerm {
        return (currentTerm, false)
    }

    // don't double vote (re-granting to the same candidate is fine, since the
    // candidate may simply be retrying a lost reply)
    if votedFor != None and votedFor != candidateID {
        return (currentTerm, false)
    }

    // check how up-to-date our log is
    ourLastLogIndex = length(log) - 1
    ourLastLogTerm = -1
    if length(log) != 0 {
        ourLastLogTerm = log[ourLastLogIndex].term
    }

    // reject candidates with old logs
    if lastLogTerm < ourLastLogTerm {
        return (currentTerm, false)
    }

    // reject candidates with short logs
    if lastLogTerm == ourLastLogTerm and lastLogIndex < ourLastLogIndex {
        return (currentTerm, false)
    }

    // The candidate's log is at least as up-to-date as ours: grant the vote.
    votedFor = candidateID
    // TODO: reset election timer
    // TODO: persist Raft state
    return (currentTerm, true)
}
// AppendEntries is the RPC a leader sends to replicate log entries; with an
// empty entries[] it doubles as a heartbeat.
//
// Arguments:
//   term         - the leader's term
//   leaderID     - identifier of the leader
//   prevLogIndex - index of the entry immediately preceding entries[]
//                  (-1 if entries[] starts at the beginning of the log)
//   prevLogTerm  - term of the entry at prevLogIndex (-1 if prevLogIndex is -1)
//   entries[]    - entries to store, in log order (may be empty)
//   leaderCommit - the leader's commitIndex
// Returns:
//   term          - our currentTerm, so a stale leader can update itself
//   conflictIndex - on failure, where the leader should back up to
//                   (-1 if not applicable)
//   conflictTerm  - on failure due to a term mismatch, the term of our
//                   conflicting entry (-1 otherwise)
//   success       - true iff our log matched at prevLogIndex/prevLogTerm and
//                   entries[] is now part of our log
AppendEntries(term, leaderID, prevLogIndex, prevLogTerm, entries[], leaderCommit)
    -> (term, conflictIndex, conflictTerm, success)
{
    // step down before handling RPC if need be
    if term > currentTerm {
        currentTerm = term
        state = FOLLOWER
        // None is the "has not voted" sentinel used by RequestVote.
        votedFor = None
        nextIndex = map [..] -> length(log)
        matchIndex = map [..] -> -1
        // TODO: persist Raft state (currentTerm and votedFor changed)
    }

    // ignore stale leaders
    if term < currentTerm {
        return (currentTerm, -1, -1, false)
    }

    // From here on term == currentTerm, so the sender is the legitimate leader
    // for this term. A candidate that hears from it has lost the election and
    // must return to follower state.
    if state == CANDIDATE {
        state = FOLLOWER
    }

    // TODO: reset election timer

    // our log is too short to contain prevLogIndex
    if prevLogIndex >= length(log) {
        return (currentTerm, length(log), -1, false)
    }

    // Only look at log[prevLogIndex] when there is such an entry;
    // prevLogIndex == -1 means "entries[] starts at index 0" and always matches.
    if prevLogIndex >= 0 {
        ourPrevLogTerm = log[prevLogIndex].term
        if ourPrevLogTerm != prevLogTerm {
            // fast backtracking: report the first index of the conflicting
            // term so the leader can skip the whole term in one round trip
            firstOfTerm = prevLogIndex
            for i from prevLogIndex down to 0 (inclusive) {
                if log[i].term != ourPrevLogTerm {
                    break
                }
                firstOfTerm = i
            }
            return (currentTerm, firstOfTerm, ourPrevLogTerm, false)
        }
    }

    // Append any entries we do not already have. We only truncate at the first
    // real conflict, so a delayed RPC carrying a prefix of what we already
    // hold cannot erase newer entries.
    for i from 0 to length(entries) (exclusive) {
        index = prevLogIndex + i + 1
        if index >= length(log) or log[index].term != entries[i].term {
            log = log[:index] ++ entries[i:]
            break
        }
    }
    // TODO: persist Raft state

    // Only entries up to the last one covered by this RPC are known to match
    // the leader's log; anything after it in our log is unverified and must
    // not be committed on the strength of this RPC.
    lastNewIndex = prevLogIndex + length(entries)
    if leaderCommit > commitIndex {
        newCommitIndex = leaderCommit
        if newCommitIndex > lastNewIndex {
            newCommitIndex = lastNewIndex
        }
        // never move commitIndex backwards (e.g. on a delayed RPC)
        if newCommitIndex > commitIndex {
            commitIndex = newCommitIndex
        }
    }

    // apply everything that is newly committed, in log order
    if commitIndex > lastApplied {
        for i from lastApplied+1 to commitIndex (inclusive) {
            stateMachine(log[i].command)
            lastApplied = i
        }
    }

    return (currentTerm, -1, -1, true)
}
//
// ============================================================================
// Raft event handlers
// ============================================================================
//
// OnElectionTimer fires when we have not heard from a leader (or granted a
// vote) for a full election timeout. It starts a new election: we move to a
// new term, become a candidate, and ask every peer (ourselves included) for a
// vote. If a majority grants it, we become leader for that term.
OnElectionTimer() {
    // leaders do not run elections
    if state == LEADER {
        return
    }

    currentTerm += 1
    electionTerm = currentTerm
    // Clear our vote for the new term. This must be None (the sentinel that
    // RequestVote tests for); the RequestVote we send to ourselves below then
    // records votedFor = me.
    votedFor = None
    state = CANDIDATE
    votes = 0
    // TODO: persist Raft state
    // TODO: reset election timer (so a stalled election is retried)

    // Describe our log to the voters; an empty log is (-1, -1), matching what
    // RequestVote computes for its own empty log.
    lastLogIndex = length(log) - 1
    lastLogTerm = -1
    if length(log) != 0 {
        lastLogTerm = log[lastLogIndex].term
    }

    for each peer p {
        // NOTE: also request a vote from ourself
        // NOTE: me here is this server's identifier
        // NOTE: if the RPC fails, it counts as granted = false
        // NOTE: these RPCs should be made in parallel
        // NOTE: we send electionTerm rather than currentTerm, since
        //       currentTerm may have moved on while RPCs are in flight and we
        //       are only a candidate for electionTerm.
        term, granted = p.RequestVote(electionTerm, me, lastLogIndex, lastLogTerm)
        if term > currentTerm {
            currentTerm = term
            state = FOLLOWER
            votedFor = None
            nextIndex = map [..] -> length(log)
            matchIndex = map [..] -> -1
            // TODO: persist Raft state
        }
        if granted {
            votes += 1
        }
    }

    // The world moved on while we were collecting votes: either someone has a
    // newer term, or a leader for our term contacted us and we stepped down.
    if currentTerm != electionTerm or state != CANDIDATE {
        return
    }

    // no majority; go back to waiting for a leader or the next timeout
    if votes <= #peers/2 {
        state = FOLLOWER
        return
    }

    state = LEADER
    nextIndex = map [..] -> length(log)
    matchIndex = map [..] -> -1
    // TODO: reset election timer
    // TODO: trigger sending of AppendEntries
}
// OnHeartbeatTimerOrSendTrigger runs on the leader, either periodically (as a
// heartbeat) or whenever there is something new to send. For every peer it
// sends the suffix of our log that the peer is believed to be missing, uses
// the reply to adjust nextIndex/matchIndex, advances commitIndex once a
// majority holds an entry from the current term, and applies newly committed
// entries to the state machine.
OnHeartbeatTimerOrSendTrigger() {
    // NOTE: it may be useful to have separate timers for each peer, so
    //       that you can retry AppendEntries to one peer without sending to all
    //       peers.
    if state != LEADER {
        return
    }

    for each peer (including self) {
        // NOTE: do this in parallel for each peer; a `return` below ends the
        //       processing for this one peer only.
        if nextIndex[peer] > length(log) {
            nextIndex[peer] = length(log)
        }

        entries = log[nextIndex[peer]:]
        prevLogIndex = nextIndex[peer] - 1
        prevLogTerm = -1
        if prevLogIndex >= 0 {
            prevLogTerm = log[prevLogIndex].term
        }
        sendTerm = currentTerm

        // NOTE: if length(entries) == 0, you may want to check that we
        //       haven't sent this peer an AppendEntries recently. If we
        //       have, just return.
        // NOTE: if the RPC fails, stop processing for this peer, but
        //       trigger sending AppendEntries again immediately.
        term, conflictIndex, conflictTerm, success
            = peer.AppendEntries(sendTerm, me, prevLogIndex, prevLogTerm, entries, commitIndex)

        if term > currentTerm {
            currentTerm = term
            state = FOLLOWER
            // None is the "has not voted" sentinel used by RequestVote.
            votedFor = None
            nextIndex = map [..] -> length(log)
            matchIndex = map [..] -> -1
            // TODO: persist Raft state
        }

        // Ignore replies to RPCs from an earlier term, or that arrive after we
        // stopped being leader.
        if currentTerm != sendTerm or state != LEADER {
            return
        }

        if !success {
            // fast backtracking: default to where the follower told us to go
            nextIndex[peer] = conflictIndex
            if conflictTerm != -1 {
                // If we also have entries from conflictTerm, resume right
                // after our last one; the follower may share them with us.
                ourLastInConflictTerm = -1
                for i from prevLogIndex down to 0 (inclusive) {
                    if log[i].term == conflictTerm {
                        ourLastInConflictTerm = i
                        break
                    } else if log[i].term < conflictTerm {
                        // terms only decrease going backwards, so we have
                        // no entry from conflictTerm at all
                        break
                    }
                }
                if ourLastInConflictTerm != -1 {
                    nextIndex[peer] = ourLastInConflictTerm + 1
                }
            }
            // TODO: Trigger sending AppendEntries again immediately
            return
        }

        // Replies can be reordered when RPCs run in parallel, so never let a
        // late reply to an older, shorter RPC move matchIndex backwards.
        if prevLogIndex + length(entries) > matchIndex[peer] {
            matchIndex[peer] = prevLogIndex + length(entries)
        }
        nextIndex[peer] = matchIndex[peer] + 1

        // Find the highest index above commitIndex that a majority holds.
        // Scan from the end of the log down to, but NOT including, commitIndex
        // (which is already committed, and is -1 when nothing is).
        for n from length(log)-1 down to commitIndex+1 (inclusive) {
            // Only entries from our own term may be committed by counting
            // replicas; older entries are committed implicitly along with them.
            if log[n].term != currentTerm {
                break
            }
            replicas = 0
            for each peer q {
                if matchIndex[q] >= n {
                    replicas += 1
                }
            }
            if replicas > #peers/2 {
                commitIndex = n
                break
            }
        }

        // The leader runs a replica of the state machine too: apply whatever
        // has just become committed, in log order.
        if commitIndex > lastApplied {
            for i from lastApplied+1 to commitIndex (inclusive) {
                stateMachine(log[i].command)
                lastApplied = i
            }
        }
    }
}
// OnStart is called by the service running on top of Raft to propose a new
// command for the replicated log.
//
// Arguments:
//   command      - opaque command to append to the log
// Returns:
//   accepted     - false if this server is not the leader (nothing is appended)
//   willCommitAt - index the command was appended at, or -1 if not accepted
OnStart(command)
    -> (accepted, willCommitAt)
{
    if state == LEADER {
        // append the command as a new entry tagged with our current term
        log = log ++ [(currentTerm, command)]
        newIndex = length(log) - 1

        // we trivially hold our own entry, so count ourselves as a replica
        matchIndex[me] = newIndex
        nextIndex[me] = newIndex + 1

        // TODO: persist Raft state
        // TODO: trigger sending of AppendEntries
        return (true, newIndex)
    }

    // only the leader may accept commands
    return (false, -1)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This implementation is incorrect. Quoting paragraph 3 from page 6 of the Raft paper
You never have this check in the AppendEntries handler.