Add (basic) timer implementation to use for testing

This commit is contained in:
Raycho Mukelov 2023-08-22 04:04:47 +03:00
parent e1df04eb53
commit 833cc236a3
6 changed files with 167 additions and 32 deletions

View File

@ -8,7 +8,7 @@
# those terms.
import
raft/api
raft/raft_api
export
api, types, protocol
raft_api

View File

@ -53,9 +53,9 @@ type
rncroExecSmCommand = 1
RaftNodeClientResponseError = enum
rncrSuccess = 0,
rncrFail = 1,
rncrNotLeader = 2
rncreSuccess = 0,
rncreFail = 1,
rncreNotLeader = 2
RaftNodeClientRequest*[SmCommandType] = ref object
op*: RaftNodeClientRequestOps

View File

@ -11,7 +11,7 @@ import types
import protocol
import consensus_module
export types, protocol
export types, protocol, consensus_module
# Raft Node Public API procedures / functions
proc RaftNodeCreateNew*[SmCommandType, SmStateType]( # Create New Raft Node
@ -73,17 +73,21 @@ proc RaftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMach
RaftSmApply(stateMachine, command)
# Private Abstract Timer manipulation Ops
proc RaftTimerCreate[TimerDurationType](timerInterval: TimerDurationType, timer_callback: RaftTimerCallback): TimerId = # I guess Duration should be monotonic
template RaftTimerCreate(timerInterval: int, repeat: bool, timerCallback: RaftTimerCallback): RaftTimer =
mixin RaftTimerCreateCustomImpl
RaftTimerCreateCustomImpl(timerInterval, timer_callback)
RaftTimerCreateCustomImpl(timerInterval, repeat, timerCallback)
template RaftTimerCancel(TimerId) =
template RaftTimerCancel(timer: RaftTimer) =
mixin RaftTimerCancelCustomImpl
RaftTimerCancelCustomImpl(TimerId)
RaftTimerCancelCustomImpl(timer)
template RaftTimerIsExpired(TimerId): bool =
mixin RaftTimerIsExpiredImpl
RaftTimerIsExpiredImpl(TimerId)
template RaftTimerStart() =
mixin RaftTimerStartCustomImpl
RaftTimerStartCustomImpl()
template RaftTimerStop() =
mixin RaftTimerStopCustomImpl
RaftTimerStopCustomImpl()
# Private Log Ops
proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
@ -96,17 +100,14 @@ proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandTyp
discard
# Private Timers Create Ops
proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId =
proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId =
proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc scheduleElectionTimeOut[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId =
proc RaftNodeScheduleElectionTimeOut[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc scheduleRequestVoteTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId =
discard
proc RaftNodeHeartBeatTimeout[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc RaftNodeScheduleRequestVoteTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) =
discard

View File

@ -10,10 +10,8 @@
# Raft Node Public Types
import std/locks
import std/sets
import options
import stew/results
import uuid4
export results, options
@ -25,7 +23,7 @@ type
rnsCandidate = 2
rnsLeader = 3
RaftNodeId* = Uuid # uuid4 uniquely identifying every Raft Node
RaftNodeId* = uint64 # uuid4 uniquely identifying every Raft Node
RaftNodeTerm* = uint64 # Raft Node Term Type
RaftLogIndex* = uint64 # Raft Node Log Index Type
@ -55,6 +53,7 @@ type
RaftConsensusModule*[SmCommandType, SmStateType] = object of RootObj
stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim
gatheredVotesCount: int
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
RaftLogCompactionModule*[SmCommandType, SmStateType] = object of RootObj
@ -64,7 +63,7 @@ type
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
# Callback for sending messages out of this Raft Node
RaftMessageId* = Uuid # UUID assigned to every Raft Node Message,
RaftMessageId* = uint64 # UUID assigned to every Raft Node Message,
# so it can be matched with it's corresponding response etc.
RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages
@ -88,27 +87,35 @@ type
RaftNodeLogEntry*[SmCommandType] = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
index*: RaftLogIndex
clusterTime*: object
entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc.
configuration: Option[RaftNodeConfiguration] # Node configuration
data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration
# depending on entryType field
RaftNodeLog*[SmCommandType] = object # Needs more elaborate definition.
RaftNodeLog*[SmCommandType] = object # Needs more elaborate definition.
# Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc.
logData*: seq[RaftNodeLogEntry[SmCommandType]] # Raft Node Log Data
# Timer types
TimerId* = Uuid
RaftTimerCallback* = proc (timerId: TimerId) {.nimcall, gcsafe.} # Pass any function wrapped in a closure
RaftTimer* = object
mtx*: Lock
canceled*: bool
expired*: bool
timeout*: int
repeat*: bool
RaftTimerCallback* = proc (timer: var RaftTimer) {.nimcall, gcsafe.} # Pass any function wrapped in a closure
# Raft Node Object type
RaftNode*[SmCommandType, SmStateType] = object
# Timers
activeTimersSet: HashSet[TimerId]
requestVoteTimeout: uint64
heartBeatTimeOut: uint64
appendEntriesTimeOut: uint64
requestVotesTimeout: int
heartBeatTimeout: int
appendEntriesTimeout: int
requestVotesTimer: RaftTimer
heartBeatTimer: RaftTimer
appendEntriesTimer: RaftTimer
# Mtx definition(s) go here
raftStateMutex: Lock

70
tests/basic_timers.nim Normal file
View File

@ -0,0 +1,70 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/asyncdispatch
import std/locks
import ../raft/types
export asyncdispatch
var
pollThr: Thread[void]
runningMtx: Lock
running: bool
proc RaftTimerCreateCustomImpl*(timerInterval: int, repeat: bool, timerCallback: RaftTimerCallback): RaftTimer =
var
timer = RaftTimer(canceled: false, expired: false, timeout: timerInterval, repeat: repeat)
initLock(timer.mtx)
proc CallbackClosureProc(): Callback =
result = proc (fd: AsyncFD): bool {.closure, gcsafe.} =
withLock(timer.mtx):
if not timer.canceled:
timerCallback(timer)
if not timer.repeat:
timer.expired = true
return true
else:
return false
debugEcho repr(CallbackClosureProc())
addTimer(timer.timeout, timer.repeat, CallbackClosureProc())
timer
proc RaftTimerCancelCustomImpl*(timer: var RaftTimer): bool {.discardable.} =
withLock(timer.mtx):
if not timer.expired and not timer.canceled:
timer.canceled = true
return true
else:
return false
proc RaftTimerPollThread() {.thread, nimcall, gcsafe.} =
while running:
try:
poll()
except ValueError as e:
debugEcho e.msg
proc RaftTimerJoinPollThread*() =
joinThread(pollThr)
proc RaftTimerStartCustomImpl*(joinThread: bool = true) =
withLock(runningMtx):
running = true
createThread(pollThr, RaftTimerPollThread)
if joinThread:
RaftTimerJoinPollThread()
proc RaftTimerStopCustomImpl*(joinThread: bool = true) =
withLock(runningMtx):
running = false
if joinThread:
RaftTimerJoinPollThread()

View File

@ -0,0 +1,57 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import ../raft/types
import std/locks
import basic_timers
var
cancelCond: Cond
cancelLock: Lock
initLock(cancelLock)
initCond(cancelCond)
proc timersRunner() =
const
MAX_TIMERS = 50
var
slowTimers: array[0..MAX_TIMERS, RaftTimer]
fastTimers: array[0..MAX_TIMERS, RaftTimer]
cancelTimer: RaftTimer
proc CancelTimerCallbackClosure(
slowTimers: var array[0..MAX_TIMERS, RaftTimer],
fastTimers: var array[0..MAX_TIMERS, RaftTimer]
): RaftTimerCallback =
result = proc (timer: var RaftTimer) {.nimcall, gcsafe.} =
debugEcho "Aahjsbdghajsdhjgshgjd"
signal(cancelCond)
suite "Create and test basic timers":
test "Create 50 slow timers (100-150 ms)":
check true
test "Create 50 fast timers (20-50 ms)":
check true
test "Create cancel timer":
check true
test "Start timers":
cancelTimer = RaftTimerCreateCustomImpl(250, false, CancelTimerCallbackClosure(slowTimers, fastTimers))
RaftTimerStartCustomImpl(joinThread=false)
debugEcho repr(cancelTimer)
check true
test "Wait cancel timer 250 ms and stop timers":
wait(cancelCond, cancelLock)
check true
test "Check timers consistency":
check true
if isMainModule:
timersRunner()