From 833cc236a3be4f2e344050ded157926eebe5c1c1 Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Tue, 22 Aug 2023 04:04:47 +0300 Subject: [PATCH] Add (basic) timer implementation to use for testing --- raft.nim | 4 +-- raft/protocol.nim | 6 ++-- raft/raft_api.nim | 31 ++++++++-------- raft/types.nim | 31 +++++++++------- tests/basic_timers.nim | 70 +++++++++++++++++++++++++++++++++++++ tests/basic_timers_test.nim | 57 ++++++++++++++++++++++++++++++ 6 files changed, 167 insertions(+), 32 deletions(-) create mode 100644 tests/basic_timers.nim create mode 100644 tests/basic_timers_test.nim diff --git a/raft.nim b/raft.nim index 90960d6..a606cf4 100644 --- a/raft.nim +++ b/raft.nim @@ -8,7 +8,7 @@ # those terms. import - raft/api + raft/raft_api export - api, types, protocol + raft_api diff --git a/raft/protocol.nim b/raft/protocol.nim index 4851b9f..595beca 100644 --- a/raft/protocol.nim +++ b/raft/protocol.nim @@ -53,9 +53,9 @@ type rncroExecSmCommand = 1 RaftNodeClientResponseError = enum - rncrSuccess = 0, - rncrFail = 1, - rncrNotLeader = 2 + rncreSuccess = 0, + rncreFail = 1, + rncreNotLeader = 2 RaftNodeClientRequest*[SmCommandType] = ref object op*: RaftNodeClientRequestOps diff --git a/raft/raft_api.nim b/raft/raft_api.nim index e0d2055..a69355f 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -11,7 +11,7 @@ import types import protocol import consensus_module -export types, protocol +export types, protocol, consensus_module # Raft Node Public API procedures / functions proc RaftNodeCreateNew*[SmCommandType, SmStateType]( # Create New Raft Node @@ -73,17 +73,21 @@ proc RaftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMach RaftSmApply(stateMachine, command) # Private Abstract Timer manipulation Ops -proc RaftTimerCreate[TimerDurationType](timerInterval: TimerDurationType, timer_callback: RaftTimerCallback): TimerId = # I guess Duration should be monotonic +template RaftTimerCreate(timerInterval: int, repeat: bool, timerCallback: RaftTimerCallback): RaftTimer = mixin RaftTimerCreateCustomImpl - RaftTimerCreateCustomImpl(timerInterval, timer_callback) + RaftTimerCreateCustomImpl(timerInterval, repeat, timerCallback) -template RaftTimerCancel(TimerId) = +template RaftTimerCancel(timer: RaftTimer) = mixin RaftTimerCancelCustomImpl - RaftTimerCancelCustomImpl(TimerId) + RaftTimerCancelCustomImpl(timer) -template RaftTimerIsExpired(TimerId): bool = - mixin RaftTimerIsExpiredImpl - RaftTimerIsExpiredImpl(TimerId) +template RaftTimerStart() = + mixin RaftTimerStartCustomImpl + RaftTimerStartCustomImpl() + +template RaftTimerStop() = + mixin RaftTimerStopCustomImpl + RaftTimerStopCustomImpl() # Private Log Ops proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = @@ -96,17 +100,14 @@ proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandTyp discard # Private Timers Create Ops -proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = +proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) = discard -proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = +proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) = discard -proc scheduleElectionTimeOut[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = +proc RaftNodeScheduleElectionTimeOut[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) = discard -proc scheduleRequestVoteTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = - discard - -proc RaftNodeHeartBeatTimeout[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = +proc RaftNodeScheduleRequestVoteTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]) = discard diff --git a/raft/types.nim b/raft/types.nim index e113c1b..8a360b7 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -10,10 +10,8 @@ # Raft Node Public Types import std/locks -import std/sets import options import stew/results -import uuid4 export results, options @@ -25,7 +23,7 @@ type rnsCandidate = 2 rnsLeader = 3 - RaftNodeId* = Uuid # uuid4 uniquely identifying every Raft Node + RaftNodeId* = uint64 # uuid4 uniquely identifying every Raft Node RaftNodeTerm* = uint64 # Raft Node Term Type RaftLogIndex* = uint64 # Raft Node Log Index Type @@ -55,6 +53,7 @@ type RaftConsensusModule*[SmCommandType, SmStateType] = object of RootObj stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim + gatheredVotesCount: int raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType] RaftLogCompactionModule*[SmCommandType, SmStateType] = object of RootObj @@ -64,7 +63,7 @@ type raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType] # Callback for sending messages out of this Raft Node - RaftMessageId* = Uuid # UUID assigned to every Raft Node Message, + RaftMessageId* = uint64 # UUID assigned to every Raft Node Message, # so it can be matched with it's corresponding response etc. RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages @@ -88,27 +87,35 @@ type RaftNodeLogEntry*[SmCommandType] = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.) term*: RaftNodeTerm index*: RaftLogIndex - clusterTime*: object entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc. configuration: Option[RaftNodeConfiguration] # Node configuration data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration # depending on entryType field - RaftNodeLog*[SmCommandType] = object # Needs more elaborate definition. + RaftNodeLog*[SmCommandType] = object # Needs more elaborate definition. # Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc. logData*: seq[RaftNodeLogEntry[SmCommandType]] # Raft Node Log Data # Timer types - TimerId* = Uuid - RaftTimerCallback* = proc (timerId: TimerId) {.nimcall, gcsafe.} # Pass any function wrapped in a closure + RaftTimer* = object + mtx*: Lock + canceled*: bool + expired*: bool + timeout*: int + repeat*: bool + + RaftTimerCallback* = proc (timer: var RaftTimer) {.nimcall, gcsafe.} # Pass any function wrapped in a closure # Raft Node Object type RaftNode*[SmCommandType, SmStateType] = object # Timers - activeTimersSet: HashSet[TimerId] - requestVoteTimeout: uint64 - heartBeatTimeOut: uint64 - appendEntriesTimeOut: uint64 + requestVotesTimeout: int + heartBeatTimeout: int + appendEntriesTimeout: int + + requestVotesTimer: RaftTimer + heartBeatTimer: RaftTimer + appendEntriesTimer: RaftTimer # Mtx definition(s) go here raftStateMutex: Lock diff --git a/tests/basic_timers.nim b/tests/basic_timers.nim new file mode 100644 index 0000000..df90026 --- /dev/null +++ b/tests/basic_timers.nim @@ -0,0 +1,70 @@ +# nim-raft +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import std/asyncdispatch +import std/locks +import ../raft/types + +export asyncdispatch + +var + pollThr: Thread[void] + runningMtx: Lock + running: bool + +proc RaftTimerCreateCustomImpl*(timerInterval: int, repeat: bool, timerCallback: RaftTimerCallback): RaftTimer = + var + timer = RaftTimer(canceled: false, expired: false, timeout: timerInterval, repeat: repeat) + initLock(timer.mtx) + + proc CallbackClosureProc(): Callback = + result = proc (fd: AsyncFD): bool {.closure, gcsafe.} = + withLock(timer.mtx): + if not timer.canceled: + timerCallback(timer) + if not timer.repeat: + timer.expired = true + return true + else: + return false + + debugEcho repr(CallbackClosureProc()) + addTimer(timer.timeout, timer.repeat, CallbackClosureProc()) + timer + +proc RaftTimerCancelCustomImpl*(timer: var RaftTimer): bool {.discardable.} = + withLock(timer.mtx): + if not timer.expired and not timer.canceled: + timer.canceled = true + return true + else: + return false + +proc RaftTimerPollThread() {.thread, nimcall, gcsafe.} = + while running: + try: + poll() + except ValueError as e: + debugEcho e.msg + +proc RaftTimerJoinPollThread*() = + joinThread(pollThr) + +proc RaftTimerStartCustomImpl*(joinThread: bool = true) = + withLock(runningMtx): + running = true + createThread(pollThr, RaftTimerPollThread) + if joinThread: + RaftTimerJoinPollThread() + +proc RaftTimerStopCustomImpl*(joinThread: bool = true) = + withLock(runningMtx): + running = false + if joinThread: + RaftTimerJoinPollThread() \ No newline at end of file diff --git a/tests/basic_timers_test.nim b/tests/basic_timers_test.nim new file mode 100644 index 0000000..d66d4ca --- /dev/null +++ b/tests/basic_timers_test.nim @@ -0,0 +1,57 @@ +# nim-raft +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import unittest2 +import ../raft/types +import std/locks +import basic_timers + +var + cancelCond: Cond + cancelLock: Lock + +initLock(cancelLock) +initCond(cancelCond) + +proc timersRunner() = + const + MAX_TIMERS = 50 + var + slowTimers: array[0..MAX_TIMERS, RaftTimer] + fastTimers: array[0..MAX_TIMERS, RaftTimer] + cancelTimer: RaftTimer + + proc CancelTimerCallbackClosure( + slowTimers: var array[0..MAX_TIMERS, RaftTimer], + fastTimers: var array[0..MAX_TIMERS, RaftTimer] + ): RaftTimerCallback = + result = proc (timer: var RaftTimer) {.nimcall, gcsafe.} = + debugEcho "Aahjsbdghajsdhjgshgjd" + signal(cancelCond) + + suite "Create and test basic timers": + test "Create 50 slow timers (100-150 ms)": + check true + test "Create 50 fast timers (20-50 ms)": + check true + test "Create cancel timer": + check true + test "Start timers": + cancelTimer = RaftTimerCreateCustomImpl(250, false, CancelTimerCallbackClosure(slowTimers, fastTimers)) + RaftTimerStartCustomImpl(joinThread=false) + debugEcho repr(cancelTimer) + check true + test "Wait cancel timer 250 ms and stop timers": + wait(cancelCond, cancelLock) + check true + test "Check timers consistency": + check true + +if isMainModule: + timersRunner() \ No newline at end of file