From f95937107ae1755ea7eb214e99b760a2aaf41667 Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Fri, 1 Sep 2023 05:55:55 +0300 Subject: [PATCH] Introduce async messaging --- raft.nimble | 3 ++- raft/consensus_module.nim | 9 ++++++--- raft/protocol.nim | 2 ++ raft/raft_api.nim | 6 ++++-- raft/types.nim | 16 ++++++---------- tests/basic_cluster.nim | 4 ++-- 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/raft.nimble b/raft.nimble index 0700dc4..f28daac 100644 --- a/raft.nimble +++ b/raft.nimble @@ -20,6 +20,7 @@ requires "nim >= 1.6.14" requires "stew >= 0.1.0" requires "unittest2 >= 0.0.4" requires "uuids >= 0.1.11" +requires "chronicles >= 0.10.3" proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = if not dirExists "build": @@ -47,7 +48,7 @@ proc test(path: string, name: string, params = "", lang = "c") = exec runPrefix & "build/" & name task test, "Run tests": - test "tests", "all_tests", "-d:chronicles_log_level=ERROR -d:unittest2DisableParamFiltering" + test "tests", "all_tests", "-d:chronicles_sinks=textlines -d:chronicles_log_level=ERROR -d:unittest2DisableParamFiltering" # Helper functions \ No newline at end of file diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index 063ee45..9b664c0 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -10,11 +10,14 @@ import protocol import types -proc RaftConsensusProcessRequestVote*(consensus: RaftConsensusModule, msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse = +proc RaftNodeConsensusStartElection*(consensus: RaftConsensusModule) = discard -proc RaftConsensusProcessAppendEntries*(consensus: RaftConsensusModule, msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse = +proc RaftNodeConsensusProcessRequestVote*(consensus: RaftConsensusModule, msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse = discard -proc RaftConsensusQuorumMin(consensus: RaftConsensusModule): bool = +proc RaftNodeConsensusProcessAppendEntries*(consensus: RaftConsensusModule, msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse = + discard + +proc RaftNodeConsensusQuorumMin(consensus: RaftConsensusModule): bool = discard \ No newline at end of file diff --git a/raft/protocol.nim b/raft/protocol.nim index c288ae9..d442784 100644 --- a/raft/protocol.nim +++ b/raft/protocol.nim @@ -26,6 +26,7 @@ type RaftMessageRequestVote* = ref object of RaftMessageBase lastLogTerm*: RaftNodeTerm lastLogIndex*: RaftLogIndex + senderTerm*: RaftNodeTerm # Sender Raft Node Term RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase granted*: bool @@ -35,6 +36,7 @@ type prevLogTerm*: RaftNodeTerm commitIndex*: RaftLogIndex logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat + senderTerm*: RaftNodeTerm # Sender Raft Node Term RaftMessageAppendEntriesResponse*[SmStateType] = ref object of RaftMessageResponseBase success*: bool diff --git a/raft/raft_api.nim b/raft/raft_api.nim index 1ffffaf..13e0200 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -10,6 +10,8 @@ import types import protocol import consensus_module +import chronicles +import asyncdispatch export types, protocol, consensus_module @@ -62,11 +64,11 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, node.state == rnsLeader # Deliver Raft Message to the Raft Node and dispatch it -proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): RaftMessageResponseBase = +proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = discard # Process RaftNodeClientRequests -proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): RaftNodeClientResponse[SmStateType] = +proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} = discard # Abstract State Machine Ops diff --git a/raft/types.nim b/raft/types.nim index c758f59..eee6b6d 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -13,6 +13,7 @@ import std/locks import options import stew/results import uuids +import asyncdispatch export results, options, locks, uuids @@ -73,17 +74,12 @@ type RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages msgId*: RaftMessageId # Message UUID senderId*: RaftNodeId # Sender Raft Node ID - receiverId*: RaftNodeId # Receiver Raft Node ID - senderTerm*: RaftNodeTerm # Sender Raft Node Term + receiverId*: RaftNodeId # Receiver Raft Node ID - RaftMessageResponseBase* = ref object of RootObj - msgId*: RaftMessageId # Original Message ID - senderId*: RaftNodeId # Sender Raft Node ID - respondentId: RaftNodeId # Responding RaftNodeId - senderTerm*: RaftNodeTerm # Sender Raft Node Term + RaftMessageResponseBase* = ref object of RaftMessageBase - RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase): RaftMessageResponseBase {.gcsafe.} # Callback for Sending Raft Node Messages - # out of this Raft Node. + RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} # Callback for Sending Raft Node Messages + # out of this Raft Node. # For later use when adding/removing new nodes (dynamic configuration chganges) RaftNodeConfiguration* = object @@ -115,7 +111,7 @@ type timeout*: int oneshot*: bool - RaftTimerCallback* = proc (timer: RaftTimer) {.nimcall, gcsafe.} # Pass any function wrapped in a closure + RaftTimerCallback* = proc (timer: RaftTimer) {.gcsafe.} # Pass any function wrapped in a closure # Raft Node Object type RaftNode*[SmCommandType, SmStateType] = ref object diff --git a/tests/basic_cluster.nim b/tests/basic_cluster.nim index a591942..6c38cee 100644 --- a/tests/basic_cluster.nim +++ b/tests/basic_cluster.nim @@ -19,7 +19,7 @@ type nodes*: seq[BasicRaftNode] proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback = - proc (msg: RaftMessageBase): RaftMessageResponseBase {.closure.} = + proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = var nodeIdx: int = -1 @@ -28,7 +28,7 @@ proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): R nodeIdx = i break - cluster.nodes[nodeIdx].RaftNodeMessageDeliver(msg) + result = await cluster.nodes[nodeIdx].RaftNodeMessageDeliver(msg) proc BasicRaftClusterStart*(cluster: BasicRaftCluster) = for node in cluster.nodes: