This commit is contained in:
Raycho Mukelov 2023-09-03 00:16:26 +03:00
parent 1f4af32358
commit 949c594c6a
5 changed files with 28 additions and 81 deletions

View File

@ -48,7 +48,7 @@ type
rncroRequestSmState = 0,
rncroExecSmCommand = 1
RaftNodeClientResponseError = enum
RaftNodeClientResponseError* = enum
rncreSuccess = 0,
rncreFail = 1,
rncreNotLeader = 2

View File

@ -8,7 +8,6 @@
# those terms.
import chronicles
import asyncdispatch
import types
import protocol
@ -24,7 +23,6 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
# persistentStorage: RaftNodePersistentStorage,
msgSendCallback: RaftMessageSendCallback): T =
var
sm: RaftNodeStateMachine[SmCommandType, SmStateType]
peers: RaftNodePeers
for peerId in peersIds:
@ -32,10 +30,10 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
result = T(
id: id, state: rnsFollower, currentTerm: 0, peers: peers, commitIndex: 0, lastApplied: 0,
stateMachine: sm, msgSendCallback: msgSendCallback, votedFor: DefaultUUID, currentLeaderId: DefaultUUID
msgSendCallback: msgSendCallback, votedFor: DefaultUUID, currentLeaderId: DefaultUUID
)
RaftNodeSmInit[SmCommandType, SmStateType](result.stateMachine)
RaftNodeSmInit(result.stateMachine)
initLock(result.raftStateMutex)
proc RaftNodeLoad*[SmCommandType, SmStateType](
@ -47,6 +45,9 @@ proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmS
discard
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
if node.state != rnsFollower:
raiseAssert "Something's wrong - Follower state expected at start!"
debugEcho "StartNode: ", node.id
func RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID
@ -66,11 +67,21 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
# Deliver Raft Message to the Raft Node and dispatch it
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
discard
case raftMessage.type:
of RaftMessageAppendEntries: # Dispatch different Raft Message types
discard
of RaftMessageRequestVote:
discard
# Process RaftNodeClientRequests
proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
discard
case req.op:
of rncroExecSmCommand:
# TODO: implemenmt command handling
discard
of rncroRequestSmState:
if RaftNodeIsLeader(node):
return RaftNodeClientResponse(error: rncreSuccess, state: RaftNodeStateGet(node))
# Abstract State Machine Ops
func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
@ -89,18 +100,6 @@ template RaftTimerCreate(timerInterval: int, oneshot: bool, timerCallback: RaftT
mixin RaftTimerCreateCustomImpl
RaftTimerCreateCustomImpl(timerInterval, oneshot, timerCallback)
template RaftTimerCancel(timer: RaftTimer) =
mixin RaftTimerCancelCustomImpl
RaftTimerCancelCustomImpl(timer)
template RaftTimerStart() =
mixin RaftTimerStartCustomImpl
RaftTimerStartCustomImpl()
template RaftTimerStop() =
mixin RaftTimerStopCustomImpl
RaftTimerStopCustomImpl()
# Private Log Ops
proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
discard

View File

@ -15,8 +15,7 @@ import stew/results
import uuids
import asyncdispatch
export results, options, locks, uuids
export results, options, locks, uuids, asyncdispatch
const
DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000
@ -111,7 +110,7 @@ type
timeout*: int
oneshot*: bool
RaftTimerCallback* = proc (timer: RaftTimer) {.gcsafe.} # Pass any function wrapped in a closure
RaftTimerCallback* = proc () {.gcsafe.} # Pass any function wrapped in a closure
# Raft Node Object type
RaftNode*[SmCommandType, SmStateType] = ref object
@ -120,9 +119,9 @@ type
heartBeatTimeout: int
appendEntriesTimeout: int
requestVotesTimer: RaftTimer
heartBeatTimer: RaftTimer
appendEntriesTimer: RaftTimer
requestVotesTimer: Future[void]
heartBeatTimer: Future[void]
appendEntriesTimer: Future[void]
# Mtx definition(s) go here
raftStateMutex*: Lock

View File

@ -7,66 +7,16 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/asyncdispatch
import ../raft/raft_api
export asyncdispatch, raft_api
export raft_api
var
pollThr: Thread[void]
runningMtx: Lock
running: bool
proc RaftTimerCreateCustomImpl*(timerInterval: int, oneshot: bool, timerCallback: RaftTimerCallback): RaftTimer {.nimcall, gcsafe.} =
var
timer = RaftTimer(mtx: Lock(), canceled: false, expired: false, timeout: timerInterval, oneshot: oneshot)
initLock(timer.mtx)
addTimer(timer.timeout, timer.oneshot, proc (fd: AsyncFD): bool {.closure, gcsafe.} =
withLock(timer.mtx):
if not timer.canceled:
timerCallback(timer)
if timer.oneshot:
timer.expired = true
return true
else:
return false
else:
return true
)
timer
proc RaftTimerCancelCustomImpl*(timer: RaftTimer): bool {.nimcall, gcsafe, discardable.} =
withLock(timer.mtx):
if not timer.expired and not timer.canceled:
timer.canceled = true
else:
return false
proc RaftTimerPollThread() {.thread, nimcall, gcsafe.} =
while running:
try:
poll()
except ValueError as e:
# debugEcho e.msg
# Add a 'dummy' timer if no other handles are present to prevent more
# ValueError exceptions this is a workaround for a asyncdyspatch bug
# see - https://github.com/nim-lang/Nim/issues/14564
addTimer(1, false, proc (fd: AsyncFD): bool {.closure, gcsafe.} = false)
proc RaftTimerJoinPollThread*() {.nimcall, gcsafe.} =
joinThread(pollThr)
proc RaftTimerStartCustomImpl*(joinThread: bool = true) {.nimcall, gcsafe.} =
withLock(runningMtx):
running = true
createThread(pollThr, RaftTimerPollThread)
if joinThread:
RaftTimerJoinPollThread()
proc RaftTimerStopCustomImpl*(joinThread: bool = true) {.nimcall, gcsafe.} =
withLock(runningMtx):
running = false
if joinThread:
RaftTimerJoinPollThread()
proc RaftTimerCreateCustomImpl*(timerInterval: int, oneshot: bool, timerCallback: RaftTimerCallback): Future[void] {.async, nimcall, gcsafe.} =
var fut = sleepAsync(timerInterval)
fut.callback=proc()=timerCallback()
await fut

View File

@ -8,7 +8,6 @@
# those terms.
import unittest2
import ../raft/types
import basic_state_machine
proc basicStateMachineMain*() =