Refactor some stuff. Add Key-Value DB support (MDBX)

This commit is contained in:
Raycho Mukelov 2023-09-03 20:52:35 +03:00
parent 2f4095915d
commit f1d558cae5
8 changed files with 58 additions and 57 deletions

View File

@ -11,13 +11,13 @@ export Database, CRUD, Collection, Transaction, Cursor, Error, Index, Collatable
type
MDBXStoreRef* = ref object of RootObj
database* {.requiresInit.}: Database
chains* {.requiresInit.}: Collection
raftNodesData* {.requiresInit.}: Collection
MDBXTransaction* = ref object of RootObj
transaction: CollectionTransaction
const
MaxFileSize = 1024 * 1024 * 1024 * 1024 # 1 TB (MDBX default is 400 MB)
MaxFileSize = 1024 * 1024 * 400 #r: 100MB (MDBX default is 400 MB)
# ----------------------------------------------------------------------------------------
# MDBX exception handling helper templates
@ -62,11 +62,11 @@ proc rollback*(t: MDBXTransaction): ADbTResult[void] =
ok()
proc beginDbTransaction*(db: MDBXStoreRef): ADbTResult[MDBXTransaction] =
if db.chains != nil:
if db.raftNodesData != nil:
handleEx():
ok(MDBXTransaction(transaction: db.chains.beginTransaction()))
ok(MDBXTransaction(transaction: db.raftNodesData.beginTransaction()))
else:
err("MDBXStoreRef.chains is nil")
err("MDBXStoreRef.raftNodesData is nil")
proc put*(t: MDBXTransaction, key, value: openArray[byte]): ADbTResult[void] =
handleExEx():
@ -83,13 +83,13 @@ proc del*(t: MDBXTransaction, key: openArray[byte]): ADbTResult[void] =
# ----------------------------------------------------------------------------------------
template checkDbChainsNotNil(db: MDBXStoreRef, body: untyped) =
## Check if db.chains is not nil and execute the body
## Check if db.raftNodesData is not nil and execute the body
## if it is not nil. Otherwise, raise an assert.
##
if db.chains != nil:
if db.raftNodesData != nil:
body
else:
raiseAssert "MDBXStoreRef.chains is nil"
raiseAssert "MDBXStoreRef.raftNodesData is nil"
template withDbSnapshot*(db: MDBXStoreRef, body: untyped) =
## Create an MDBX snapshot and execute the body providing
@ -99,7 +99,7 @@ template withDbSnapshot*(db: MDBXStoreRef, body: untyped) =
##
checkDbChainsNotNil(db):
handleEx():
let cs {.inject.} = db.chains.beginSnapshot()
let cs {.inject.} = db.raftNodesData.beginSnapshot()
defer: cs.finish()
body
@ -110,7 +110,7 @@ template withDbTransaction*(db: MDBXStoreRef, body: untyped) =
##
checkDbChainsNotNil(db):
handleEx():
var dbTransaction {.inject.} = db.chains.beginTransaction()
var dbTransaction {.inject.} = db.raftNodesData.beginTransaction()
defer: dbTransaction.commit()
try:
body
@ -127,8 +127,11 @@ template withDbTransaction*(db: MDBXStoreRef, body: untyped) =
# ------------------------------------------------------------------------------------------
# MDBX KvStore interface implementation
# ------------------------------------------------------------------------------------------
type
DataProc = proc(data: seq[byte])
FindProc = proc(data: seq[byte])
proc get*(db: MDBXStoreRef, key: openArray[byte], onData: kvstore.DataProc): Result[bool] =
proc get*(db: MDBXStoreRef, key: openArray[byte], onData: DataProc): Result[bool, string] =
if key.len <= 0:
return err("mdbx: key cannot be empty on get")
@ -140,10 +143,10 @@ proc get*(db: MDBXStoreRef, key: openArray[byte], onData: kvstore.DataProc): Res
else:
return ok(false)
proc find*(db: MDBXStoreRef, prefix: openArray[byte], onFind: kvstore.KeyValueProc): Result[int] =
proc find*(db: MDBXStoreRef, prefix: openArray[byte], onFind: FindProc): Result[int, string] =
raiseAssert "Unimplemented"
proc put*(db: MDBXStoreRef, key, value: openArray[byte]): Result[void] =
proc put*(db: MDBXStoreRef, key, value: openArray[byte]): Result[void, string] =
if key.len <= 0:
return err("mdbx: key cannot be empty on get")
@ -151,7 +154,7 @@ proc put*(db: MDBXStoreRef, key, value: openArray[byte]): Result[void] =
dbTransaction.put(asData(key), asData(value))
ok()
proc contains*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] =
proc contains*(db: MDBXStoreRef, key: openArray[byte]): Result[bool, string] =
if key.len <= 0:
return err("mdbx: key cannot be empty on get")
@ -162,7 +165,7 @@ proc contains*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] =
else:
return ok(false)
proc del*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] =
proc del*(db: MDBXStoreRef, key: openArray[byte]): Result[bool, string] =
if key.len <= 0:
return err("mdbx: key cannot be empty on del")
@ -174,7 +177,7 @@ proc del*(db: MDBXStoreRef, key: openArray[byte]): Result[bool] =
else:
return ok(false)
proc clear*(db: MDBXStoreRef): Result[bool] =
proc clear*(db: MDBXStoreRef): Result[bool, string] =
raiseAssert "Unimplemented"
proc close*(db: MDBXStoreRef) =
@ -189,21 +192,21 @@ proc close*(db: MDBXStoreRef) =
# .End. MDBX KvStore interface implementation
# ------------------------------------------------------------------------------------------
proc bulkPutSortedData*[KT: ByteArray32 | ByteArray33](db: MDBXStoreRef, keys: openArray[KT], vals: openArray[seq[byte]]): KvResult[int64] =
if keys.len <= 0:
return err("mdbx: keys cannot be empty on bulkPutSortedData")
# proc bulkPutSortedData*[KT: ByteArray32 | ByteArray33](db: MDBXStoreRef, keys: openArray[KT], vals: openArray[seq[byte]]): Result[int64] =
# if keys.len <= 0:
# return err("mdbx: keys cannot be empty on bulkPutSortedData")
if keys.len != vals.len:
return err("mdbx: keys and vals must have the same length")
# if keys.len != vals.len:
# return err("mdbx: keys and vals must have the same length")
withDbTransaction(db):
for i in 0 ..< keys.len:
dbTransaction.put(asData(keys[i]), asData(vals[i]))
return ok(0)
# withDbTransaction(db):
# for i in 0 ..< keys.len:
# dbTransaction.put(asData(keys[i]), asData(vals[i]))
# return ok(0)
proc init*(
T: type MDBXStoreRef, basePath: string, name: string,
readOnly = false): KvResult[T] =
readOnly = false): Result[T, string] =
let
dataDir = basePath / name / "data"
backupsDir = basePath / name / "backups" # Do we need that in case of MDBX? Should discuss this with @zah
@ -224,5 +227,5 @@ proc init*(
handleEx():
let
db = openDatabase(dataDir, flags=mdbxFlags, maxFileSize=MaxFileSize)
chains = createCollection(db, "chains", StringKeys, BlobValues)
ok(T(database: db, chains: chains))
raftNodesData = createCollection(db, "raftNodesData", StringKeys, BlobValues)
ok(T(database: db, raftNodesData: raftNodesData))

View File

@ -6,8 +6,9 @@
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import asyncdispatch
import std/time
import std/times
template awaitWithTimeout[T](operation: Future[T],
deadline: Future[void],

View File

@ -9,34 +9,31 @@
import types, protocol, log_ops
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
var
votesFuts: seq[Future[void]]
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
node.state = rnsCandidate
for p in node.peers:
p.votedFor = DefaultUUID
node.votedFor = node.id
for peer in node.peers:
votesFuts.add(node.msgSendCallback(
RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntry(RaftNodeLogIndexGet(node)).term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
peer.hasVoted = false
node.votesFuts.add(node.msgSendCallback(
RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).value.term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
)
)
# Process votes
for voteFut in votesFuts:
await voteFut
# Process votes (if any)
for voteFut in node.votesFuts:
discard await voteFut
if voteFut.finished and not voteFut.failed:
for p in node.peers:
if p.id == voteFut.senderId:
if voteFut.granted:
p.votedFor = node.id
else:
if voteFut.votedFor.initialized:
p.votedFor = voteFut.votedFor
debugEcho repr(voteFut)
# if p.id == voteFut.senderId:
# p.hasVoted = voteFut.granted
# node.votesFuts.clear
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
for fut in node.voteFuts:
cancel(fut)
discard
proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): Future[RaftMessageRequestVoteResponse] =

View File

@ -29,8 +29,7 @@ type
senderTerm*: RaftNodeTerm # Sender Raft Node Term
RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase
granted*: bool # Is vote granted?
votedFor*: Option[RaftNodeId] # Present if vote is not granted
granted*: bool # Is vote granted by the Raft node, from we requested vote?
RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase
prevLogIndex*: RaftLogIndex

View File

@ -13,6 +13,7 @@ import types
import protocol
import consensus_module
import log_ops
import ../db/kvstore_mdbx
export types, protocol, consensus_module, log_ops
@ -106,7 +107,8 @@ proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNod
node.heartBeatTimeoutTimer = sleepAsync(node.heartBeatTimeout)
await node.heartBeatTimeoutTimer
node.state = rnsCandidate # Transition to candidate state and initiate new Election
RaftNodeStartElection(node)
var f = RaftNodeStartElection(node)
cancel(f)
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
for raftPeer in node.peers:
@ -129,9 +131,9 @@ proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmS
# Try to stop gracefully
node.state = rnsStopped
# Cancel pending timers (if any)
RaftNodeCancelAllTimers(node)
var f = RaftNodeCancelAllTimers(node)
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
node.state = rnsFollower
asyncSpawn RaftNodeScheduleHeartBeatTimeout(node)
debugEcho "StartNode: ", node.id
debug "Start Raft Node with ID: ", nodeid=node.id

View File

@ -32,7 +32,7 @@ type
RaftNodeTerm* = uint64 # Raft Node Term Type
RaftLogIndex* = uint64 # Raft Node Log Index Type
RaftNodePeer* = object # Raft Node Peer object
RaftNodePeer* = ref object # Raft Node Peer object
id*: RaftNodeId
nextIndex*: RaftLogIndex # For each peer Raft Node, index of the next log entry to send to that Node
# (initialized to leader last log index + 1)
@ -116,6 +116,8 @@ type
# Raft Node Object type
RaftNode*[SmCommandType, SmStateType] = ref object
# Timers
votesFuts*: seq[Future[RaftMessageResponseBase]]
requestVotesTimeout*: int
heartBeatTimeout*: int
appendEntriesTimeout*: int

View File

@ -9,7 +9,7 @@
import basic_timers
import basic_state_machine
import std/tables
export raft_api
type
@ -20,9 +20,6 @@ type
proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback =
proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
var
nodeIdx: int = -1
result = await cluster.nodes[msg.receiverId].RaftNodeMessageDeliver(msg)
proc BasicRaftClusterStart*(cluster: BasicRaftCluster) =

View File

@ -22,7 +22,7 @@ proc basicClusterMain*() =
nodesIds[i] = genUUID()
cluster = BasicRaftClusterInit(nodesIds)
check cluster.nodes.len == 5
# check size(cluster.nodes) == 5
test "Generate Random Client SmCommands Queue":
discard