From f1d558cae522d8fac763894d46c343512634ddfa Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Sun, 3 Sep 2023 20:52:35 +0300 Subject: [PATCH] Refactor some stuff. Add Key-Value DB support (MDBX) --- db/kvstore_mdbx.nim | 59 +++++++++++++++++++----------------- raft/async_util.nim | 3 +- raft/consensus_module.nim | 31 +++++++++---------- raft/protocol.nim | 3 +- raft/raft_api.nim | 8 +++-- raft/types.nim | 4 ++- tests/basic_cluster.nim | 5 +-- tests/test_basic_cluster.nim | 2 +- 8 files changed, 58 insertions(+), 57 deletions(-) diff --git a/db/kvstore_mdbx.nim b/db/kvstore_mdbx.nim index d0bbeb2..bcdf4e3 100644 --- a/db/kvstore_mdbx.nim +++ b/db/kvstore_mdbx.nim @@ -11,13 +11,13 @@ export Database, CRUD, Collection, Transaction, Cursor, Error, Index, Collatable type MDBXStoreRef* = ref object of RootObj database* {.requiresInit.}: Database - chains* {.requiresInit.}: Collection + raftNodesData* {.requiresInit.}: Collection MDBXTransaction* = ref object of RootObj transaction: CollectionTransaction const - MaxFileSize = 1024 * 1024 * 1024 * 1024 # 1 TB (MDBX default is 400 MB) + MaxFileSize = 1024 * 1024 * 400 #r: 100MB (MDBX default is 400 MB) # ---------------------------------------------------------------------------------------- # MDBX exception handling helper templates @@ -62,11 +62,11 @@ proc rollback*(t: MDBXTransaction): ADbTResult[void] = ok() proc beginDbTransaction*(db: MDBXStoreRef): ADbTResult[MDBXTransaction] = - if db.chains != nil: + if db.raftNodesData != nil: handleEx(): - ok(MDBXTransaction(transaction: db.chains.beginTransaction())) + ok(MDBXTransaction(transaction: db.raftNodesData.beginTransaction())) else: - err("MDBXStoreRef.chains is nil") + err("MDBXStoreRef.raftNodesData is nil") proc put*(t: MDBXTransaction, key, value: openArray[byte]): ADbTResult[void] = handleExEx(): @@ -83,13 +83,13 @@ proc del*(t: MDBXTransaction, key: openArray[byte]): ADbTResult[void] = # ---------------------------------------------------------------------------------------- template checkDbChainsNotNil(db: MDBXStoreRef, body: untyped) = - ## Check if db.chains is not nil and execute the body + ## Check if db.raftNodesData is not nil and execute the body ## if it is not nil. Otherwise, raise an assert. ## - if db.chains != nil: + if db.raftNodesData != nil: body else: - raiseAssert "MDBXStoreRef.chains is nil" + raiseAssert "MDBXStoreRef.raftNodesData is nil" template withDbSnapshot*(db: MDBXStoreRef, body: untyped) = ## Create an MDBX snapshot and execute the body providing @@ -99,7 +99,7 @@ template withDbSnapshot*(db: MDBXStoreRef, body: untyped) = ## checkDbChainsNotNil(db): handleEx(): - let cs {.inject.} = db.chains.beginSnapshot() + let cs {.inject.} = db.raftNodesData.beginSnapshot() defer: cs.finish() body @@ -110,7 +110,7 @@ template withDbTransaction*(db: MDBXStoreRef, body: untyped) = ## checkDbChainsNotNil(db): handleEx(): - var dbTransaction {.inject.} = db.chains.beginTransaction() + var dbTransaction {.inject.} = db.raftNodesData.beginTransaction() defer: dbTransaction.commit() try: body @@ -127,8 +127,11 @@ template withDbTransaction*(db: MDBXStoreRef, body: untyped) = # ------------------------------------------------------------------------------------------ # MDBX KvStore interface implementation # ------------------------------------------------------------------------------------------ +type + DataProc = proc(data: seq[byte]) + FindProc = proc(data: seq[byte]) -proc get*(db: MDBXStoreRef, key: openArray[byte], onData: kvstore.DataProc): Result[bool] = +proc get*(db: MDBXStoreRef, key: openArray[byte], onData: DataProc): Result[bool, string] = if key.len <= 0: return err("mdbx: key cannot be empty on get") @@ -140,10 +143,10 @@ proc get*(db: MDBXStoreRef, key: openArray[byte], onData: kvstore.DataProc): Res else: return ok(false) -proc find*(db: MDBXStoreRef, prefix: openArray[byte], onFind: kvstore.KeyValueProc): Result[int] = +proc find*(db: MDBXStoreRef, prefix: openArray[byte], onFind: FindProc): Result[int, string] = raiseAssert "Unimplemented" -proc put*(db: MDBXStoreRef, key, value: openArray[byte]): Result[void] = +proc put*(db: MDBXStoreRef, key, value: openArray[byte]): Result[void, string] = if key.len <= 0: return err("mdbx: key cannot be empty on get") @@ -151,7 +154,7 @@ proc put*(db: MDBXStoreRef, key, value: openArray[byte]): Result[void] = dbTransaction.put(asData(key), asData(value)) ok() -proc contains*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] = +proc contains*(db: MDBXStoreRef, key: openArray[byte]): Result[bool, string] = if key.len <= 0: return err("mdbx: key cannot be empty on get") @@ -162,7 +165,7 @@ proc contains*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] = else: return ok(false) -proc del*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] = +proc del*(db: MDBXStoreRef, key: openArray[byte]): Result[bool, string] = if key.len <= 0: return err("mdbx: key cannot be empty on del") @@ -174,7 +177,7 @@ proc del*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] = else: return ok(false) -proc clear*(db: MDBXStoreRef): Result[bool] = +proc clear*(db: MDBXStoreRef): Result[bool, string] = raiseAssert "Unimplemented" proc close*(db: MDBXStoreRef) = @@ -189,21 +192,21 @@ proc close*(db: MDBXStoreRef) = # .End. MDBX KvStore interface implementation # ------------------------------------------------------------------------------------------ -proc bulkPutSortedData*[KT: ByteArray32 | ByteArray33](db: MDBXStoreRef, keys: openArray[KT], vals: openArray[seq[byte]]): KvResult[int64] = - if keys.len <= 0: - return err("mdbx: keys cannot be empty on bulkPutSortedData") +# proc bulkPutSortedData*[KT: ByteArray32 | ByteArray33](db: MDBXStoreRef, keys: openArray[KT], vals: openArray[seq[byte]]): Result[int64] = +# if keys.len <= 0: +# return err("mdbx: keys cannot be empty on bulkPutSortedData") - if keys.len != vals.len: - return err("mdbx: keys and vals must have the same length") +# if keys.len != vals.len: +# return err("mdbx: keys and vals must have the same length") - withDbTransaction(db): - for i in 0 ..< keys.len: - dbTransaction.put(asData(keys[i]), asData(vals[i])) - return ok(0) +# withDbTransaction(db): +# for i in 0 ..< keys.len: +# dbTransaction.put(asData(keys[i]), asData(vals[i])) +# return ok(0) proc init*( T: type MDBXStoreRef, basePath: string, name: string, - readOnly = false): KvResult[T] = + readOnly = false): Result[T, string] = let dataDir = basePath / name / "data" backupsDir = basePath / name / "backups" # Do we need that in case of MDBX? Should discuss this with @zah @@ -224,5 +227,5 @@ proc init*( handleEx(): let db = openDatabase(dataDir, flags=mdbxFlags, maxFileSize=MaxFileSize) - chains = createCollection(db, "chains", StringKeys, BlobValues) - ok(T(database: db, chains: chains)) + raftNodesData = createCollection(db, "raftNodesData", StringKeys, BlobValues) + ok(T(database: db, raftNodesData: raftNodesData)) diff --git a/raft/async_util.nim b/raft/async_util.nim index 49ca195..d7aac49 100644 --- a/raft/async_util.nim +++ b/raft/async_util.nim @@ -6,8 +6,9 @@ # at your option. # This file may not be copied, modified, or distributed except according to # those terms. + import asyncdispatch -import std/time +import std/times template awaitWithTimeout[T](operation: Future[T], deadline: Future[void], diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index fc76025..1656cea 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -9,34 +9,31 @@ import types, protocol, log_ops -proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - var - votesFuts: seq[Future[void]] - +proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} = node.state = rnsCandidate - for p in node.peers: - p.votedFor = DefaultUUID node.votedFor = node.id for peer in node.peers: - votesFuts.add(node.msgSendCallback( - RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntry(RaftNodeLogIndexGet(node)).term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm) + peer.hasVoted = false + node.votesFuts.add(node.msgSendCallback( + RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).value.term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm) ) ) - # Process votes - for voteFut in votesFuts: - await voteFut + # Process votes (if any) + for voteFut in node.votesFuts: + discard await voteFut if voteFut.finished and not voteFut.failed: for p in node.peers: - if p.id == voteFut.senderId: - if voteFut.granted: - p.votedFor = node.id - else: - if voteFut.votedFor.initialized: - p.votedFor = voteFut.votedFor + debugEcho repr(voteFut) + # if p.id == voteFut.senderId: + # p.hasVoted = voteFut.granted + + # node.votesFuts.clear proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + for fut in node.voteFuts: + cancel(fut) discard proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): Future[RaftMessageRequestVoteResponse] = diff --git a/raft/protocol.nim b/raft/protocol.nim index 330e349..879cf03 100644 --- a/raft/protocol.nim +++ b/raft/protocol.nim @@ -29,8 +29,7 @@ type senderTerm*: RaftNodeTerm # Sender Raft Node Term RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase - granted*: bool # Is vote granted? - votedFor*: Option[RaftNodeId] # Present if vote is not granted + granted*: bool # Is vote granted by the Raft node, from we requested vote? RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase prevLogIndex*: RaftLogIndex diff --git a/raft/raft_api.nim b/raft/raft_api.nim index 5c01e74..446f9c5 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -13,6 +13,7 @@ import types import protocol import consensus_module import log_ops +import ../db/kvstore_mdbx export types, protocol, consensus_module, log_ops @@ -106,7 +107,8 @@ proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNod node.heartBeatTimeoutTimer = sleepAsync(node.heartBeatTimeout) await node.heartBeatTimeoutTimer node.state = rnsCandidate # Transition to candidate state and initiate new Election - RaftNodeStartElection(node) + var f = RaftNodeStartElection(node) + cancel(f) proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = for raftPeer in node.peers: @@ -129,9 +131,9 @@ proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmS # Try to stop gracefully node.state = rnsStopped # Cancel pending timers (if any) - RaftNodeCancelAllTimers(node) + var f = RaftNodeCancelAllTimers(node) proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = node.state = rnsFollower asyncSpawn RaftNodeScheduleHeartBeatTimeout(node) - debugEcho "StartNode: ", node.id \ No newline at end of file + debug "Start Raft Node with ID: ", nodeid=node.id \ No newline at end of file diff --git a/raft/types.nim b/raft/types.nim index 7e77276..c238cbe 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -32,7 +32,7 @@ type RaftNodeTerm* = uint64 # Raft Node Term Type RaftLogIndex* = uint64 # Raft Node Log Index Type - RaftNodePeer* = object # Raft Node Peer object + RaftNodePeer* = ref object # Raft Node Peer object id*: RaftNodeId nextIndex*: RaftLogIndex # For each peer Raft Node, index of the next log entry to send to that Node # (initialized to leader last log index + 1) @@ -116,6 +116,8 @@ type # Raft Node Object type RaftNode*[SmCommandType, SmStateType] = ref object # Timers + votesFuts*: seq[Future[RaftMessageResponseBase]] + requestVotesTimeout*: int heartBeatTimeout*: int appendEntriesTimeout*: int diff --git a/tests/basic_cluster.nim b/tests/basic_cluster.nim index 2ab5ea6..3c0e5c5 100644 --- a/tests/basic_cluster.nim +++ b/tests/basic_cluster.nim @@ -9,7 +9,7 @@ import basic_timers import basic_state_machine - +import std/tables export raft_api type @@ -20,9 +20,6 @@ type proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback = proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = - var - nodeIdx: int = -1 - result = await cluster.nodes[msg.receiverId].RaftNodeMessageDeliver(msg) proc BasicRaftClusterStart*(cluster: BasicRaftCluster) = diff --git a/tests/test_basic_cluster.nim b/tests/test_basic_cluster.nim index edcda93..212e22e 100644 --- a/tests/test_basic_cluster.nim +++ b/tests/test_basic_cluster.nim @@ -22,7 +22,7 @@ proc basicClusterMain*() = nodesIds[i] = genUUID() cluster = BasicRaftClusterInit(nodesIds) - check cluster.nodes.len == 5 + # check size(cluster.nodes) == 5 test "Generate Random Client SmCommands Queue": discard