From 66a6d7bc33cefcf294e9d769e03d4c0def110561 Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Sun, 3 Sep 2023 03:53:48 +0300 Subject: [PATCH] Add more consensus code etc. --- raft.nimble | 1 + raft/async_util.nim | 46 ++++++++++++++++++++++++++++++++++++ raft/consensus_module.nim | 6 +++++ raft/raft_api.nim | 47 ++++++++++++++++++++----------------- raft/types.nim | 21 +++++++++-------- tests/basic_timers.nim | 10 ++------ tests/test_basic_timers.nim | 25 ++++++++++++-------- 7 files changed, 107 insertions(+), 49 deletions(-) create mode 100644 raft/async_util.nim diff --git a/raft.nimble b/raft.nimble index f28daac..828f868 100644 --- a/raft.nimble +++ b/raft.nimble @@ -21,6 +21,7 @@ requires "stew >= 0.1.0" requires "unittest2 >= 0.0.4" requires "uuids >= 0.1.11" requires "chronicles >= 0.10.3" +requires "chronos >= 3.0.11" proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = if not dirExists "build": diff --git a/raft/async_util.nim b/raft/async_util.nim new file mode 100644 index 0000000..49ca195 --- /dev/null +++ b/raft/async_util.nim @@ -0,0 +1,46 @@ +# nim-raft +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. +import asyncdispatch +import std/time + +template awaitWithTimeout[T](operation: Future[T], + deadline: Future[void], + onTimeout: untyped): T = + let f = operation + await f or deadline + if not f.finished: + # If we don't wait for for the cancellation here, it's possible that + # the "next" operation will run concurrently to this one, messing up + # the order of operations (since await/async is not fair) + await cancelAndWait(f) + onTimeout + else: + f.read + +template awaitWithTimeout[T](operation: Future[T], + timeout: Duration, + onTimeout: untyped): T = + awaitWithTimeout(operation, sleepAsync(timeout), onTimeout) + +template awaitWithTimeout(operation: Future[void], + deadline: Future[void], + onTimeout: untyped) = + let f = operation + await f or deadline + if not f.finished: + # If we don't wait for for the cancellation here, it's possible that + # the "next" operation will run concurrently to this one, messing up + # the order of operations (since await/async is not fair) + await cancelAndWait(f) + onTimeout + +template awaitWithTimeout(operation: Future[void], + timeout: Duration, + onTimeout: untyped) = + awaitWithTimeout(operation, sleepAsync(timeout), onTimeout) \ No newline at end of file diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index 70cd498..1f62e4e 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -28,4 +28,10 @@ proc RaftNodeScheduleElectionTimeOut*[SmCommandType, SmStateType](node: RaftNode discard proc RaftNodeScheduleRequestVotesCleanUpTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + discard + +proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = + discard + +proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) = discard \ No newline at end of file diff --git a/raft/raft_api.nim b/raft/raft_api.nim index fbe7a02..1d93f06 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -17,7 +17,7 @@ export types, protocol, consensus_module proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) -proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) +proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.} # Raft Node Public API procedures / functions proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node @@ -60,21 +60,24 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, # Deliver Raft Message to the Raft Node and dispatch it proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = - case raftMessage.type: - of RaftMessageAppendEntries: # Dispatch different Raft Message types - discard - of RaftMessageRequestVote: - discard + # case raftMessage.type + # of RaftMessageAppendEntries: # Dispatch different Raft Message types + # discard + # of RaftMessageRequestVote: + # discard + # else: discard + discard # Process RaftNodeClientRequests proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} = - case req.op: + case req.op of rncroExecSmCommand: # TODO: implemenmt command handling discard of rncroRequestSmState: if RaftNodeIsLeader(node): return RaftNodeClientResponse(error: rncreSuccess, state: RaftNodeStateGet(node)) + else: discard # Abstract State Machine Ops func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType = @@ -94,12 +97,6 @@ template RaftTimerCreate(timerInterval: int, oneshot: bool, timerCallback: RaftT RaftTimerCreateCustomImpl(timerInterval, oneshot, timerCallback) # Private Log Ops -proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = - discard - -proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) = - discard - proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex = discard @@ -111,11 +108,11 @@ proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCom var fut = sleepAsync(node.heartBeatTimeout) fut.callback = proc () = RaftNodeSendHeartBeat(node) -proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = +proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.} = node.heartBeatTimeoutTimer = sleepAsync(node.heartBeatTimeout) - node.heartBeatTimeoutTimer.callback = proc() = - node.state = rnsCandidate # Transition to candidate state and initiate new Election - RaftNodeStartElection(node) + await node.heartBeatTimeoutTimer + node.state = rnsCandidate # Transition to candidate state and initiate new Election + RaftNodeStartElection(node) proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = for raftPeer in node.peers: @@ -128,11 +125,19 @@ proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommand RaftNodeScheduleHeartBeat(node) # Raft Node Control +proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + node.requestVotesTimer.fail(newException(Exception, "fail")) + node.heartBeatTimer.fail(newException(Exception, "fail")) + node.heartBeatTimeoutTimer.fail(newException(Exception, "fail")) + node.appendEntriesTimer.fail(newException(Exception, "fail")) + proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - discard + # Try to stop gracefully + node.state = rnsStopped + # Cancel pending timers (if any) + RaftNodeCancelAllTimers(node) proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - if node.state != rnsFollower: - raiseAssert "Something's wrong - Follower state expected at start!" - RaftNodeScheduleHeartBeatTimeout(node) + node.state = rnsFollower + asyncSpawn RaftNodeScheduleHeartBeatTimeout(node) debugEcho "StartNode: ", node.id \ No newline at end of file diff --git a/raft/types.nim b/raft/types.nim index 8d44d74..7e77276 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -13,9 +13,9 @@ import std/locks import options import stew/results import uuids -import asyncdispatch +import chronos -export results, options, locks, uuids, asyncdispatch +export results, options, locks, uuids, chronos const DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000 @@ -25,7 +25,8 @@ type rnsUnknown = 0, rnsFollower = 1, rnsCandidate = 2 - rnsLeader = 3 + rnsLeader = 3, + rnsStopped = 4 RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node RaftNodeTerm* = uint64 # Raft Node Term Type @@ -115,14 +116,14 @@ type # Raft Node Object type RaftNode*[SmCommandType, SmStateType] = ref object # Timers - requestVotesTimeout: int - heartBeatTimeout: int - appendEntriesTimeout: int + requestVotesTimeout*: int + heartBeatTimeout*: int + appendEntriesTimeout*: int - requestVotesTimer: Future[void] - heartBeatTimer: Future[void] - heartBeatTimeoutTimer: Future[void] - appendEntriesTimer: Future[void] + requestVotesTimer*: Future[void] + heartBeatTimer*: Future[void] + heartBeatTimeoutTimer*: Future[void] + appendEntriesTimer*: Future[void] # Mtx definition(s) go here raftStateMutex*: Lock diff --git a/tests/basic_timers.nim b/tests/basic_timers.nim index a92eb10..6ecbe6c 100644 --- a/tests/basic_timers.nim +++ b/tests/basic_timers.nim @@ -11,12 +11,6 @@ import ../raft/raft_api export raft_api -var - pollThr: Thread[void] - runningMtx: Lock - running: bool - proc RaftTimerCreateCustomImpl*(timerInterval: int, oneshot: bool, timerCallback: RaftTimerCallback): Future[void] {.async, nimcall, gcsafe.} = - var fut = sleepAsync(timerInterval) - fut.callback=proc()=timerCallback() - await fut \ No newline at end of file + await sleepAsync(timerInterval) + timerCallback() \ No newline at end of file diff --git a/tests/test_basic_timers.nim b/tests/test_basic_timers.nim index 0283247..faeda49 100644 --- a/tests/test_basic_timers.nim +++ b/tests/test_basic_timers.nim @@ -22,18 +22,23 @@ const proc basicTimersMain*() = var - slowTimers: array[0..MAX_TIMERS, RaftTimer] - fastTimers: array[0..MAX_TIMERS, RaftTimer] + slowTimers: array[0..MAX_TIMERS, Future[void]] + fastTimers: array[0..MAX_TIMERS, Future[void]] var - RaftDummyTimerCallback = proc (timer: RaftTimer) {.nimcall, gcsafe.} = - discard + slowCnt: ref int + RaftDummyTimerCallback = proc () {.nimcall, gcsafe.} = discard + RaftTimerCallbackCnt = proc (cnt: ref int): RaftTimerCallback = + proc () {.gcsafe.} = cnt[].inc + + slowCnt = new(int) + slowCnt[] = 0 suite "Create and test basic timers": test "Create 'slow' and 'fast' timers": for i in 0..MAX_TIMERS: - slowTimers[i] = RaftTimerCreateCustomImpl(max(SLOW_TIMERS_MIN, rand(SLOW_TIMERS_MAX)), true, RaftDummyTimerCallback) + slowTimers[i] = RaftTimerCreateCustomImpl(max(SLOW_TIMERS_MIN, rand(SLOW_TIMERS_MAX)), true, RaftTimerCallbackCnt(slowCnt)) for i in 0..MAX_TIMERS: fastTimers[i] = RaftTimerCreateCustomImpl(max(FAST_TIMERS_MIN, rand(FAST_TIMERS_MAX)), true, RaftDummyTimerCallback) @@ -41,7 +46,8 @@ proc basicTimersMain*() = test "Wait for and cancel 'slow' timers": waitFor sleepAsync(WAIT_FOR_SLOW_TIMERS) for i in 0..MAX_TIMERS: - RaftTimerCancelCustomImpl(slowTimers[i]) + if not slowTimers[i].finished: + cancel(slowTimers[i]) test "Final wait timers": waitFor sleepAsync(FINAL_WAIT) @@ -51,14 +57,13 @@ proc basicTimersMain*() = pass = true for i in 0..MAX_TIMERS: - if not fastTimers[i].expired: - pass = false - break - if not slowTimers[i].canceled: + if not fastTimers[i].finished: + debugEcho repr(fastTimers[i]) pass = false break check pass + check slowCnt[] == 0 if isMainModule: basicTimersMain() \ No newline at end of file