Fix typos
This commit is contained in:
parent
c96adcfa10
commit
d5a83ae19e
|
@ -18,9 +18,11 @@ import std/random
|
|||
type
|
||||
RaftRpcMessageType* = enum
|
||||
VoteRequest = 0,
|
||||
VoteReplay = 1,
|
||||
VoteReply = 1,
|
||||
AppendRequest = 2,
|
||||
AppendReplay = 3
|
||||
AppendReply = 3,
|
||||
InstallSnapshot = 4,
|
||||
SnapshotReply = 5
|
||||
|
||||
RaftRpcCode* = enum
|
||||
Rejected = 0,
|
||||
|
@ -49,19 +51,19 @@ type
|
|||
commitIndex*: RaftLogIndex
|
||||
entries*: seq[LogEntry]
|
||||
|
||||
RaftRpcAppendReplayRejected* = object
|
||||
RaftRpcAppendReplyRejected* = object
|
||||
nonMatchingIndex: RaftLogIndex
|
||||
lastIdx: RaftLogIndex
|
||||
|
||||
RaftRpcAppendReplayAccepted* = object
|
||||
RaftRpcAppendReplyAccepted* = object
|
||||
lastNewIndex: RaftLogIndex
|
||||
|
||||
RaftRpcAppendReplay* = object
|
||||
RaftRpcAppendReply* = object
|
||||
commitIndex: RaftLogIndex
|
||||
term: RaftNodeTerm
|
||||
case result: RaftRpcCode:
|
||||
of Accepted: accepted: RaftRpcAppendReplayAccepted
|
||||
of Rejected: rejected: RaftRpcAppendReplayRejected
|
||||
of Accepted: accepted: RaftRpcAppendReplyAccepted
|
||||
of Rejected: rejected: RaftRpcAppendReplyRejected
|
||||
|
||||
RaftRpcVoteRequest* = object
|
||||
currentTerm*: RaftNodeTerm
|
||||
|
@ -69,19 +71,35 @@ type
|
|||
lastLogTerm*: RaftNodeTerm
|
||||
force*: bool
|
||||
|
||||
RaftRpcVoteReplay* = object
|
||||
RaftRpcVoteReply* = object
|
||||
currentTerm*: RaftNodeTerm
|
||||
voteGranted*: bool
|
||||
|
||||
RaftSnapshot* = object
|
||||
index: RaftLogIndex
|
||||
term: RaftNodeTerm
|
||||
config: RaftConfig
|
||||
snapshotId: RaftSnapshotId
|
||||
|
||||
RaftInstallSnapshot* = object
|
||||
term: RaftNodeTerm
|
||||
snapshot: RaftSnapshot
|
||||
|
||||
RaftSnapshotReply* = object
|
||||
term: RaftNodeTerm
|
||||
success: bool
|
||||
|
||||
RaftRpcMessage* = object
|
||||
currentTerm*: RaftNodeTerm
|
||||
sender*: RaftNodeId
|
||||
receiver*: RaftNodeId
|
||||
case kind*: RaftRpcMessageType
|
||||
of VoteRequest: voteRequest*: RaftRpcVoteRequest
|
||||
of VoteReplay: voteReplay*: RaftRpcVoteReplay
|
||||
of VoteReply: voteReply*: RaftRpcVoteReply
|
||||
of AppendRequest: appendRequest*: RaftRpcAppendRequest
|
||||
of AppendReplay: appendReplay*: RaftRpcAppendReplay
|
||||
of AppendReply: appendReply*: RaftRpcAppendReply
|
||||
of InstallSnapshot: installSnapshot*: RaftInstallSnapshot
|
||||
of SnapshotReply: snapshotReply*: RaftSnapshotReply
|
||||
|
||||
RaftStateMachineOutput* = object
|
||||
logEntries*: seq[LogEntry]
|
||||
|
@ -170,17 +188,24 @@ func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option
|
|||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendRequest, appendRequest: request))
|
||||
|
||||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReplay) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReplay, appendReplay: request))
|
||||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReply) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReply, appendReply: request))
|
||||
|
||||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteRequest, voteRequest: request))
|
||||
|
||||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReplay) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request))
|
||||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReply) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReply, voteReply: request))
|
||||
|
||||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftSnapshotReply) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.SnapshotReply, snapshotReply: request))
|
||||
|
||||
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftInstallSnapshot) =
|
||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.InstallSnapshot, installSnapshot: request))
|
||||
|
||||
|
||||
func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType) =
|
||||
sm.debug "Sent to" & $id & $request
|
||||
sm.debug "Send to" & $id & $request
|
||||
if sm.state.isLeader:
|
||||
var follower = sm.findFollowerProggressById(id)
|
||||
if follower.isSome:
|
||||
|
@ -351,18 +376,18 @@ func commit*(sm: var RaftStateMachine) =
|
|||
else:
|
||||
break
|
||||
|
||||
func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: RaftRpcAppendReplay) =
|
||||
func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: RaftRpcAppendReply) =
|
||||
if not sm.state.isLeader:
|
||||
sm.debug "You can't append append replay to the follower"
|
||||
sm.debug "You can't append append reply to the follower"
|
||||
return
|
||||
var follower = sm.findFollowerProggressById(fromId)
|
||||
if not follower.isSome:
|
||||
sm.debug "Can't find the follower"
|
||||
return
|
||||
follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex)
|
||||
case replay.result:
|
||||
follower.get().commitIndex = max(follower.get().commitIndex, reply.commitIndex)
|
||||
case reply.result:
|
||||
of RaftRpcCode.Accepted:
|
||||
let lastIndex = replay.accepted.lastNewIndex
|
||||
let lastIndex = reply.accepted.lastNewIndex
|
||||
sm.debug "Accpeted" & $fromId & " " & $lastIndex
|
||||
follower.get().accepted(lastIndex)
|
||||
# TODO: add leader stepping down logic here
|
||||
|
@ -370,9 +395,9 @@ func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: Ra
|
|||
if not sm.state.isLeader:
|
||||
return
|
||||
of RaftRpcCode.Rejected:
|
||||
if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0:
|
||||
if reply.rejected.nonMatchingIndex == 0 and reply.rejected.lastIdx == 0:
|
||||
sm.replicateTo(follower.get())
|
||||
follower.get().nextIndex = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1)
|
||||
follower.get().nextIndex = min(reply.rejected.nonMatchingIndex, reply.rejected.lastIdx + 1)
|
||||
# if commit apply configuration that removes current follower
|
||||
# we should take it again
|
||||
var follower2 = sm.findFollowerProggressById(fromId)
|
||||
|
@ -391,27 +416,27 @@ func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpc
|
|||
return
|
||||
let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm)
|
||||
if not match:
|
||||
let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex)
|
||||
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
|
||||
let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex)
|
||||
let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
|
||||
sm.sendTo(fromId, responce)
|
||||
sm.debug "Reject to apply the entry"
|
||||
for entry in request.entries:
|
||||
sm.log.appendAsFollower(entry)
|
||||
sm.advanceCommitIdx(request.commitIndex)
|
||||
let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex)
|
||||
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted)
|
||||
let accepted = RaftRpcAppendReplyAccepted(lastNewIndex: sm.log.lastIndex)
|
||||
let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted)
|
||||
sm.sendTo(fromId, responce)
|
||||
|
||||
func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) =
|
||||
let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId())
|
||||
if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm):
|
||||
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true)
|
||||
let responce = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: true)
|
||||
sm.sendTo(fromId, responce)
|
||||
else:
|
||||
let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false)
|
||||
let responce: RaftRpcVoteReply = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: false)
|
||||
sm.sendTo(fromId, responce)
|
||||
|
||||
func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReplay) =
|
||||
func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReply) =
|
||||
if not sm.state.isCandidate:
|
||||
sm.debug "Non candidate can't handle votes"
|
||||
return
|
||||
|
@ -441,8 +466,8 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
|
|||
elif msg.currentTerm < sm.term:
|
||||
if msg.kind == RaftRpcMessageType.AppendRequest:
|
||||
# Instruct leader to step down
|
||||
let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex)
|
||||
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
|
||||
let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex)
|
||||
let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
|
||||
sm.sendTo(msg.sender, responce)
|
||||
|
||||
sm.warning "Ignore message with lower term"
|
||||
|
@ -457,9 +482,9 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
|
|||
if sm.state.isCandidate:
|
||||
if msg.kind == RaftRpcMessageType.VoteRequest:
|
||||
sm.requestVote(msg.sender, msg.voteRequest)
|
||||
elif msg.kind == RaftRpcMessageType.VoteReplay:
|
||||
elif msg.kind == RaftRpcMessageType.VoteReply:
|
||||
sm.debug "Apply vote"
|
||||
sm.requestVoteReply(msg.sender, msg.voteReplay)
|
||||
sm.requestVoteReply(msg.sender, msg.voteReply)
|
||||
else:
|
||||
sm.warning "Candidate ignore message"
|
||||
elif sm.state.isFollower:
|
||||
|
@ -475,8 +500,8 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
|
|||
elif sm.state.isLeader:
|
||||
if msg.kind == RaftRpcMessageType.AppendRequest:
|
||||
sm.warning "Ignore message leader append his entries directly"
|
||||
elif msg.kind == RaftRpcMessageType.AppendReplay:
|
||||
sm.appendEntryReplay(msg.sender, msg.appendReplay)
|
||||
elif msg.kind == RaftRpcMessageType.AppendReply:
|
||||
sm.appendEntryReply(msg.sender, msg.appendReply)
|
||||
elif msg.kind == RaftRpcMessageType.VoteRequest:
|
||||
sm.requestVote(msg.sender, msg.voteRequest)
|
||||
else:
|
||||
|
|
|
@ -29,5 +29,6 @@ type
|
|||
RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
|
||||
RaftNodeTerm* = int # Raft Node Term Type
|
||||
RaftLogIndex* = int # Raft Node Log Index Type
|
||||
RaftSnapshotId* = int
|
||||
RaftConfig* = object
|
||||
currentSet*: seq[RaftNodeId]
|
|
@ -90,6 +90,8 @@ proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel =
|
|||
else:
|
||||
if DebugLogLevel.Debug <= logLevel:
|
||||
echo "[" & now.format("HH:mm:ss:fff") & "] rpc message is blocked: " & $msg & $tc.blockedMsgRoutingSet
|
||||
for commit in output.committed:
|
||||
echo $commit
|
||||
debugLogs.sort(cmpLogs)
|
||||
for msg in debugLogs:
|
||||
if msg.level <= logLevel:
|
||||
|
@ -97,6 +99,8 @@ proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel =
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
func getLeader(tc: TestCluster): Option[RaftStateMachine] =
|
||||
var leader = none(RaftStateMachine)
|
||||
for id, node in tc.nodes:
|
||||
|
@ -275,8 +279,8 @@ proc consensusstatemachineMain*() =
|
|||
|
||||
timeNow += 1.milliseconds
|
||||
block:
|
||||
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
|
||||
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: true)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
|
||||
check sm.state.isCandidate
|
||||
sm.advance(msg, timeNow)
|
||||
output = sm.poll()
|
||||
|
@ -287,8 +291,8 @@ proc consensusstatemachineMain*() =
|
|||
|
||||
# Older messages should be ignored
|
||||
block:
|
||||
let voteRaplay = RaftRpcVoteReplay(currentTerm: (output.term - 1), voteGranted: true)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
|
||||
let voteRaplay = RaftRpcVoteReply(currentTerm: (output.term - 1), voteGranted: true)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
|
||||
sm.advance(msg, timeNow)
|
||||
output = sm.poll()
|
||||
check output.stateChange == false
|
||||
|
@ -329,8 +333,8 @@ proc consensusstatemachineMain*() =
|
|||
check output.votedFor.get() == mainNodeId
|
||||
timeNow += 1.milliseconds
|
||||
block:
|
||||
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
|
||||
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
|
||||
check sm.state.isCandidate
|
||||
sm.advance(msg, timeNow)
|
||||
output = sm.poll()
|
||||
|
@ -339,8 +343,8 @@ proc consensusstatemachineMain*() =
|
|||
|
||||
timeNow += 1.milliseconds
|
||||
block:
|
||||
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
|
||||
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
|
||||
check sm.state.isCandidate
|
||||
sm.advance(msg, timeNow)
|
||||
output = sm.poll()
|
||||
|
@ -366,8 +370,8 @@ proc consensusstatemachineMain*() =
|
|||
check output.votedFor.get() == mainNodeId
|
||||
timeNow += 1.milliseconds
|
||||
block:
|
||||
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
|
||||
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
|
||||
check sm.state.isCandidate
|
||||
sm.advance(msg, timeNow)
|
||||
output = sm.poll()
|
||||
|
@ -376,8 +380,8 @@ proc consensusstatemachineMain*() =
|
|||
|
||||
timeNow += 1.milliseconds
|
||||
block:
|
||||
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
|
||||
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: true)
|
||||
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
|
||||
check sm.state.isCandidate
|
||||
sm.advance(msg, timeNow)
|
||||
output = sm.poll()
|
||||
|
|
Loading…
Reference in New Issue