Introduce async messaging

This commit is contained in:
Raycho Mukelov 2023-09-01 05:55:55 +03:00
parent ad5dd67560
commit f95937107a
6 changed files with 22 additions and 18 deletions

View File

@ -20,6 +20,7 @@ requires "nim >= 1.6.14"
requires "stew >= 0.1.0"
requires "unittest2 >= 0.0.4"
requires "uuids >= 0.1.11"
requires "chronicles >= 0.10.3"
proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
if not dirExists "build":
@ -47,7 +48,7 @@ proc test(path: string, name: string, params = "", lang = "c") =
exec runPrefix & "build/" & name
task test, "Run tests":
test "tests", "all_tests", "-d:chronicles_log_level=ERROR -d:unittest2DisableParamFiltering"
test "tests", "all_tests", "-d:chronicles_sinks=textlines -d:chronicles_log_level=ERROR -d:unittest2DisableParamFiltering"
# Helper functions

View File

@ -10,11 +10,14 @@
import protocol
import types
proc RaftConsensusProcessRequestVote*(consensus: RaftConsensusModule, msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
proc RaftNodeConsensusStartElection*(consensus: RaftConsensusModule) =
discard
proc RaftConsensusProcessAppendEntries*(consensus: RaftConsensusModule, msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse =
proc RaftNodeConsensusProcessRequestVote*(consensus: RaftConsensusModule, msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
discard
proc RaftConsensusQuorumMin(consensus: RaftConsensusModule): bool =
proc RaftNodeConsensusProcessAppendEntries*(consensus: RaftConsensusModule, msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse =
discard
proc RaftNodeConsensusQuorumMin(consensus: RaftConsensusModule): bool =
discard

View File

@ -26,6 +26,7 @@ type
RaftMessageRequestVote* = ref object of RaftMessageBase
lastLogTerm*: RaftNodeTerm
lastLogIndex*: RaftLogIndex
senderTerm*: RaftNodeTerm # Sender Raft Node Term
RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase
granted*: bool
@ -35,6 +36,7 @@ type
prevLogTerm*: RaftNodeTerm
commitIndex*: RaftLogIndex
logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat
senderTerm*: RaftNodeTerm # Sender Raft Node Term
RaftMessageAppendEntriesResponse*[SmStateType] = ref object of RaftMessageResponseBase
success*: bool

View File

@ -10,6 +10,8 @@
import types
import protocol
import consensus_module
import chronicles
import asyncdispatch
export types, protocol, consensus_module
@ -62,11 +64,11 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
node.state == rnsLeader
# Deliver Raft Message to the Raft Node and dispatch it
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): RaftMessageResponseBase =
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
discard
# Process RaftNodeClientRequests
proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): RaftNodeClientResponse[SmStateType] =
proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
discard
# Abstract State Machine Ops

View File

@ -13,6 +13,7 @@ import std/locks
import options
import stew/results
import uuids
import asyncdispatch
export results, options, locks, uuids
@ -74,15 +75,10 @@ type
msgId*: RaftMessageId # Message UUID
senderId*: RaftNodeId # Sender Raft Node ID
receiverId*: RaftNodeId # Receiver Raft Node ID
senderTerm*: RaftNodeTerm # Sender Raft Node Term
RaftMessageResponseBase* = ref object of RootObj
msgId*: RaftMessageId # Original Message ID
senderId*: RaftNodeId # Sender Raft Node ID
respondentId: RaftNodeId # Responding RaftNodeId
senderTerm*: RaftNodeTerm # Sender Raft Node Term
RaftMessageResponseBase* = ref object of RaftMessageBase
RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase): RaftMessageResponseBase {.gcsafe.} # Callback for Sending Raft Node Messages
RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} # Callback for Sending Raft Node Messages
# out of this Raft Node.
# For later use when adding/removing new nodes (dynamic configuration chganges)
@ -115,7 +111,7 @@ type
timeout*: int
oneshot*: bool
RaftTimerCallback* = proc (timer: RaftTimer) {.nimcall, gcsafe.} # Pass any function wrapped in a closure
RaftTimerCallback* = proc (timer: RaftTimer) {.gcsafe.} # Pass any function wrapped in a closure
# Raft Node Object type
RaftNode*[SmCommandType, SmStateType] = ref object

View File

@ -19,7 +19,7 @@ type
nodes*: seq[BasicRaftNode]
proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback =
proc (msg: RaftMessageBase): RaftMessageResponseBase {.closure.} =
proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
var
nodeIdx: int = -1
@ -28,7 +28,7 @@ proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): R
nodeIdx = i
break
cluster.nodes[nodeIdx].RaftNodeMessageDeliver(msg)
result = await cluster.nodes[nodeIdx].RaftNodeMessageDeliver(msg)
proc BasicRaftClusterStart*(cluster: BasicRaftCluster) =
for node in cluster.nodes: