Add more consensus code etc.

This commit is contained in:
Raycho Mukelov 2023-09-03 03:53:48 +03:00
parent 31a3ae1078
commit 66a6d7bc33
7 changed files with 107 additions and 49 deletions

View File

@ -21,6 +21,7 @@ requires "stew >= 0.1.0"
requires "unittest2 >= 0.0.4"
requires "uuids >= 0.1.11"
requires "chronicles >= 0.10.3"
requires "chronos >= 3.0.11"
proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
if not dirExists "build":

46
raft/async_util.nim Normal file
View File

@ -0,0 +1,46 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import asyncdispatch
import std/time
template awaitWithTimeout[T](operation: Future[T],
deadline: Future[void],
onTimeout: untyped): T =
let f = operation
await f or deadline
if not f.finished:
# If we don't wait for for the cancellation here, it's possible that
# the "next" operation will run concurrently to this one, messing up
# the order of operations (since await/async is not fair)
await cancelAndWait(f)
onTimeout
else:
f.read
template awaitWithTimeout[T](operation: Future[T],
timeout: Duration,
onTimeout: untyped): T =
awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)
template awaitWithTimeout(operation: Future[void],
deadline: Future[void],
onTimeout: untyped) =
let f = operation
await f or deadline
if not f.finished:
# If we don't wait for for the cancellation here, it's possible that
# the "next" operation will run concurrently to this one, messing up
# the order of operations (since await/async is not fair)
await cancelAndWait(f)
onTimeout
template awaitWithTimeout(operation: Future[void],
timeout: Duration,
onTimeout: untyped) =
awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)

View File

@ -28,4 +28,10 @@ proc RaftNodeScheduleElectionTimeOut*[SmCommandType, SmStateType](node: RaftNode
discard
proc RaftNodeScheduleRequestVotesCleanUpTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
discard
proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
discard

View File

@ -17,7 +17,7 @@ export types, protocol, consensus_module
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.}
# Raft Node Public API procedures / functions
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node
@ -60,21 +60,24 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
# Deliver Raft Message to the Raft Node and dispatch it
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
case raftMessage.type:
of RaftMessageAppendEntries: # Dispatch different Raft Message types
discard
of RaftMessageRequestVote:
discard
# case raftMessage.type
# of RaftMessageAppendEntries: # Dispatch different Raft Message types
# discard
# of RaftMessageRequestVote:
# discard
# else: discard
discard
# Process RaftNodeClientRequests
proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
case req.op:
case req.op
of rncroExecSmCommand:
# TODO: implemenmt command handling
discard
of rncroRequestSmState:
if RaftNodeIsLeader(node):
return RaftNodeClientResponse(error: rncreSuccess, state: RaftNodeStateGet(node))
else: discard
# Abstract State Machine Ops
func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
@ -94,12 +97,6 @@ template RaftTimerCreate(timerInterval: int, oneshot: bool, timerCallback: RaftT
RaftTimerCreateCustomImpl(timerInterval, oneshot, timerCallback)
# Private Log Ops
proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
discard
proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
discard
proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
discard
@ -111,11 +108,11 @@ proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCom
var fut = sleepAsync(node.heartBeatTimeout)
fut.callback = proc () = RaftNodeSendHeartBeat(node)
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.} =
node.heartBeatTimeoutTimer = sleepAsync(node.heartBeatTimeout)
node.heartBeatTimeoutTimer.callback = proc() =
node.state = rnsCandidate # Transition to candidate state and initiate new Election
RaftNodeStartElection(node)
await node.heartBeatTimeoutTimer
node.state = rnsCandidate # Transition to candidate state and initiate new Election
RaftNodeStartElection(node)
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
for raftPeer in node.peers:
@ -128,11 +125,19 @@ proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommand
RaftNodeScheduleHeartBeat(node)
# Raft Node Control
proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
node.requestVotesTimer.fail(newException(Exception, "fail"))
node.heartBeatTimer.fail(newException(Exception, "fail"))
node.heartBeatTimeoutTimer.fail(newException(Exception, "fail"))
node.appendEntriesTimer.fail(newException(Exception, "fail"))
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
# Try to stop gracefully
node.state = rnsStopped
# Cancel pending timers (if any)
RaftNodeCancelAllTimers(node)
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
if node.state != rnsFollower:
raiseAssert "Something's wrong - Follower state expected at start!"
RaftNodeScheduleHeartBeatTimeout(node)
node.state = rnsFollower
asyncSpawn RaftNodeScheduleHeartBeatTimeout(node)
debugEcho "StartNode: ", node.id

View File

@ -13,9 +13,9 @@ import std/locks
import options
import stew/results
import uuids
import asyncdispatch
import chronos
export results, options, locks, uuids, asyncdispatch
export results, options, locks, uuids, chronos
const
DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000
@ -25,7 +25,8 @@ type
rnsUnknown = 0,
rnsFollower = 1,
rnsCandidate = 2
rnsLeader = 3
rnsLeader = 3,
rnsStopped = 4
RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
RaftNodeTerm* = uint64 # Raft Node Term Type
@ -115,14 +116,14 @@ type
# Raft Node Object type
RaftNode*[SmCommandType, SmStateType] = ref object
# Timers
requestVotesTimeout: int
heartBeatTimeout: int
appendEntriesTimeout: int
requestVotesTimeout*: int
heartBeatTimeout*: int
appendEntriesTimeout*: int
requestVotesTimer: Future[void]
heartBeatTimer: Future[void]
heartBeatTimeoutTimer: Future[void]
appendEntriesTimer: Future[void]
requestVotesTimer*: Future[void]
heartBeatTimer*: Future[void]
heartBeatTimeoutTimer*: Future[void]
appendEntriesTimer*: Future[void]
# Mtx definition(s) go here
raftStateMutex*: Lock

View File

@ -11,12 +11,6 @@ import ../raft/raft_api
export raft_api
var
pollThr: Thread[void]
runningMtx: Lock
running: bool
proc RaftTimerCreateCustomImpl*(timerInterval: int, oneshot: bool, timerCallback: RaftTimerCallback): Future[void] {.async, nimcall, gcsafe.} =
var fut = sleepAsync(timerInterval)
fut.callback=proc()=timerCallback()
await fut
await sleepAsync(timerInterval)
timerCallback()

View File

@ -22,18 +22,23 @@ const
proc basicTimersMain*() =
var
slowTimers: array[0..MAX_TIMERS, RaftTimer]
fastTimers: array[0..MAX_TIMERS, RaftTimer]
slowTimers: array[0..MAX_TIMERS, Future[void]]
fastTimers: array[0..MAX_TIMERS, Future[void]]
var
RaftDummyTimerCallback = proc (timer: RaftTimer) {.nimcall, gcsafe.} =
discard
slowCnt: ref int
RaftDummyTimerCallback = proc () {.nimcall, gcsafe.} = discard
RaftTimerCallbackCnt = proc (cnt: ref int): RaftTimerCallback =
proc () {.gcsafe.} = cnt[].inc
slowCnt = new(int)
slowCnt[] = 0
suite "Create and test basic timers":
test "Create 'slow' and 'fast' timers":
for i in 0..MAX_TIMERS:
slowTimers[i] = RaftTimerCreateCustomImpl(max(SLOW_TIMERS_MIN, rand(SLOW_TIMERS_MAX)), true, RaftDummyTimerCallback)
slowTimers[i] = RaftTimerCreateCustomImpl(max(SLOW_TIMERS_MIN, rand(SLOW_TIMERS_MAX)), true, RaftTimerCallbackCnt(slowCnt))
for i in 0..MAX_TIMERS:
fastTimers[i] = RaftTimerCreateCustomImpl(max(FAST_TIMERS_MIN, rand(FAST_TIMERS_MAX)), true, RaftDummyTimerCallback)
@ -41,7 +46,8 @@ proc basicTimersMain*() =
test "Wait for and cancel 'slow' timers":
waitFor sleepAsync(WAIT_FOR_SLOW_TIMERS)
for i in 0..MAX_TIMERS:
RaftTimerCancelCustomImpl(slowTimers[i])
if not slowTimers[i].finished:
cancel(slowTimers[i])
test "Final wait timers":
waitFor sleepAsync(FINAL_WAIT)
@ -51,14 +57,13 @@ proc basicTimersMain*() =
pass = true
for i in 0..MAX_TIMERS:
if not fastTimers[i].expired:
pass = false
break
if not slowTimers[i].canceled:
if not fastTimers[i].finished:
debugEcho repr(fastTimers[i])
pass = false
break
check pass
check slowCnt[] == 0
if isMainModule:
basicTimersMain()