diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index 1f62e4e..fc76025 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -7,15 +7,45 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import types, protocol +import types, protocol, log_ops proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + var + votesFuts: seq[Future[void]] + + node.state = rnsCandidate + for p in node.peers: + p.votedFor = DefaultUUID + node.votedFor = node.id + + for peer in node.peers: + votesFuts.add(node.msgSendCallback( + RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntry(RaftNodeLogIndexGet(node)).term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm) + ) + ) + + # Process votes + for voteFut in votesFuts: + await voteFut + if voteFut.finished and not voteFut.failed: + for p in node.peers: + if p.id == voteFut.senderId: + if voteFut.granted: + p.votedFor = node.id + else: + if voteFut.votedFor.initialized: + p.votedFor = voteFut.votedFor + +proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = discard -proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse = - discard +proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): Future[RaftMessageRequestVoteResponse] = + if msg.senderTerm > node.term: + if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(RaftNodeLogIndexGet(node)).term: + # grant vote + discard -proc RaftNodeProcessAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse = +proc RaftNodeProcessAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): Future[RaftMessageAppendEntriesResponse] = discard proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = @@ -24,9 +54,6 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) = discard -proc RaftNodeScheduleElectionTimeOut*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - discard - proc RaftNodeScheduleRequestVotesCleanUpTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = discard diff --git a/raft/log_ops.nim b/raft/log_ops.nim new file mode 100644 index 0000000..fc6c1e6 --- /dev/null +++ b/raft/log_ops.nim @@ -0,0 +1,17 @@ +# nim-raft +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import types + +# Private Log Ops +proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex = + discard + +proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] = + discard \ No newline at end of file diff --git a/raft/protocol.nim b/raft/protocol.nim index 19ca68c..330e349 100644 --- a/raft/protocol.nim +++ b/raft/protocol.nim @@ -29,7 +29,8 @@ type senderTerm*: RaftNodeTerm # Sender Raft Node Term RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase - granted*: bool + granted*: bool # Is vote granted? + votedFor*: Option[RaftNodeId] # Present if vote is not granted RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase prevLogIndex*: RaftLogIndex diff --git a/raft/raft_api.nim b/raft/raft_api.nim index 1d93f06..5c01e74 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -12,8 +12,9 @@ import chronicles import types import protocol import consensus_module +import log_ops -export types, protocol, consensus_module +export types, protocol, consensus_module, log_ops proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) @@ -96,13 +97,6 @@ template RaftTimerCreate(timerInterval: int, oneshot: bool, timerCallback: RaftT mixin RaftTimerCreateCustomImpl RaftTimerCreateCustomImpl(timerInterval, oneshot, timerCallback) -# Private Log Ops -proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex = - discard - -proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] = - discard - # Timers scheduling stuff etc. proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = var fut = sleepAsync(node.heartBeatTimeout) diff --git a/tests/basic_cluster.nim b/tests/basic_cluster.nim index 6c38cee..2ab5ea6 100644 --- a/tests/basic_cluster.nim +++ b/tests/basic_cluster.nim @@ -16,27 +16,22 @@ type BasicRaftNode* = RaftNode[SmCommand, SmState] BasicRaftCluster* = ref object - nodes*: seq[BasicRaftNode] + nodes*: Table[RaftNodeId, BasicRaftNode] proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback = proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = var nodeIdx: int = -1 - for i in 0..cluster.nodes.len: - if RaftNodeIdGet(cluster.nodes[i]) == msg.receiverId: - nodeIdx = i - break - - result = await cluster.nodes[nodeIdx].RaftNodeMessageDeliver(msg) + result = await cluster.nodes[msg.receiverId].RaftNodeMessageDeliver(msg) proc BasicRaftClusterStart*(cluster: BasicRaftCluster) = - for node in cluster.nodes: + for id, node in cluster.nodes: RaftNodeStart(node) proc BasicRaftClusterGetLeader*(cluster: BasicRaftCluster): UUID = result = DefaultUUID - for node in cluster.nodes: + for id, node in cluster.nodes: if RaftNodeIsLeader(node): return RaftNodeIdGet(node) @@ -50,5 +45,5 @@ proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster = peersIds = nodesIds peersIds.del(peersIds.find(nodeId)) - result.nodes.add(BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result))) + result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result))