mirror of
https://github.com/status-im/nim-raft.git
synced 2025-02-26 11:55:27 +00:00
Started writing election/voting
This commit is contained in:
parent
66a6d7bc33
commit
07d87c896f
@ -7,15 +7,45 @@
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import types, protocol
|
||||
import types, protocol, log_ops
|
||||
|
||||
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
var
|
||||
votesFuts: seq[Future[void]]
|
||||
|
||||
node.state = rnsCandidate
|
||||
for p in node.peers:
|
||||
p.votedFor = DefaultUUID
|
||||
node.votedFor = node.id
|
||||
|
||||
for peer in node.peers:
|
||||
votesFuts.add(node.msgSendCallback(
|
||||
RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntry(RaftNodeLogIndexGet(node)).term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
|
||||
)
|
||||
)
|
||||
|
||||
# Process votes
|
||||
for voteFut in votesFuts:
|
||||
await voteFut
|
||||
if voteFut.finished and not voteFut.failed:
|
||||
for p in node.peers:
|
||||
if p.id == voteFut.senderId:
|
||||
if voteFut.granted:
|
||||
p.votedFor = node.id
|
||||
else:
|
||||
if voteFut.votedFor.initialized:
|
||||
p.votedFor = voteFut.votedFor
|
||||
|
||||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
discard
|
||||
|
||||
proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
|
||||
discard
|
||||
proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): Future[RaftMessageRequestVoteResponse] =
|
||||
if msg.senderTerm > node.term:
|
||||
if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(RaftNodeLogIndexGet(node)).term:
|
||||
# grant vote
|
||||
discard
|
||||
|
||||
proc RaftNodeProcessAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse =
|
||||
proc RaftNodeProcessAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): Future[RaftMessageAppendEntriesResponse] =
|
||||
discard
|
||||
|
||||
proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool =
|
||||
@ -24,9 +54,6 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
|
||||
proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) =
|
||||
discard
|
||||
|
||||
proc RaftNodeScheduleElectionTimeOut*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
discard
|
||||
|
||||
proc RaftNodeScheduleRequestVotesCleanUpTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
discard
|
||||
|
||||
|
17
raft/log_ops.nim
Normal file
17
raft/log_ops.nim
Normal file
@ -0,0 +1,17 @@
|
||||
# nim-raft
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import types
|
||||
|
||||
# Private Log Ops
|
||||
proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
|
||||
discard
|
||||
|
||||
proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] =
|
||||
discard
|
@ -29,7 +29,8 @@ type
|
||||
senderTerm*: RaftNodeTerm # Sender Raft Node Term
|
||||
|
||||
RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase
|
||||
granted*: bool
|
||||
granted*: bool # Is vote granted?
|
||||
votedFor*: Option[RaftNodeId] # Present if vote is not granted
|
||||
|
||||
RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase
|
||||
prevLogIndex*: RaftLogIndex
|
||||
|
@ -12,8 +12,9 @@ import chronicles
|
||||
import types
|
||||
import protocol
|
||||
import consensus_module
|
||||
import log_ops
|
||||
|
||||
export types, protocol, consensus_module
|
||||
export types, protocol, consensus_module, log_ops
|
||||
|
||||
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
@ -96,13 +97,6 @@ template RaftTimerCreate(timerInterval: int, oneshot: bool, timerCallback: RaftT
|
||||
mixin RaftTimerCreateCustomImpl
|
||||
RaftTimerCreateCustomImpl(timerInterval, oneshot, timerCallback)
|
||||
|
||||
# Private Log Ops
|
||||
proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
|
||||
discard
|
||||
|
||||
proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] =
|
||||
discard
|
||||
|
||||
# Timers scheduling stuff etc.
|
||||
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
var fut = sleepAsync(node.heartBeatTimeout)
|
||||
|
@ -16,27 +16,22 @@ type
|
||||
BasicRaftNode* = RaftNode[SmCommand, SmState]
|
||||
|
||||
BasicRaftCluster* = ref object
|
||||
nodes*: seq[BasicRaftNode]
|
||||
nodes*: Table[RaftNodeId, BasicRaftNode]
|
||||
|
||||
proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback =
|
||||
proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
|
||||
var
|
||||
nodeIdx: int = -1
|
||||
|
||||
for i in 0..cluster.nodes.len:
|
||||
if RaftNodeIdGet(cluster.nodes[i]) == msg.receiverId:
|
||||
nodeIdx = i
|
||||
break
|
||||
|
||||
result = await cluster.nodes[nodeIdx].RaftNodeMessageDeliver(msg)
|
||||
result = await cluster.nodes[msg.receiverId].RaftNodeMessageDeliver(msg)
|
||||
|
||||
proc BasicRaftClusterStart*(cluster: BasicRaftCluster) =
|
||||
for node in cluster.nodes:
|
||||
for id, node in cluster.nodes:
|
||||
RaftNodeStart(node)
|
||||
|
||||
proc BasicRaftClusterGetLeader*(cluster: BasicRaftCluster): UUID =
|
||||
result = DefaultUUID
|
||||
for node in cluster.nodes:
|
||||
for id, node in cluster.nodes:
|
||||
if RaftNodeIsLeader(node):
|
||||
return RaftNodeIdGet(node)
|
||||
|
||||
@ -50,5 +45,5 @@ proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster =
|
||||
peersIds = nodesIds
|
||||
|
||||
peersIds.del(peersIds.find(nodeId))
|
||||
result.nodes.add(BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result)))
|
||||
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result))
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user