Various fixes

This commit is contained in:
Raycho Mukelov 2023-09-04 12:47:27 +03:00
parent f1d558cae5
commit e333d6e7c0
7 changed files with 120 additions and 63 deletions

View File

@ -11,7 +11,7 @@ export Database, CRUD, Collection, Transaction, Cursor, Error, Index, Collatable
type
MDBXStoreRef* = ref object of RootObj
database* {.requiresInit.}: Database
raftNodesData* {.requiresInit.}: Collection
raftNodeData* {.requiresInit.}: Collection
MDBXTransaction* = ref object of RootObj
transaction: CollectionTransaction
@ -62,9 +62,9 @@ proc rollback*(t: MDBXTransaction): ADbTResult[void] =
ok()
proc beginDbTransaction*(db: MDBXStoreRef): ADbTResult[MDBXTransaction] =
if db.raftNodesData != nil:
if db.raftNodeData != nil:
handleEx():
ok(MDBXTransaction(transaction: db.raftNodesData.beginTransaction()))
ok(MDBXTransaction(transaction: db.raftNodeData.beginTransaction()))
else:
err("MDBXStoreRef.raftNodesData is nil")
@ -86,7 +86,7 @@ template checkDbChainsNotNil(db: MDBXStoreRef, body: untyped) =
## Check if db.raftNodesData is not nil and execute the body
## if it is not nil. Otherwise, raise an assert.
##
if db.raftNodesData != nil:
if db.raftNodeData != nil:
body
else:
raiseAssert "MDBXStoreRef.raftNodesData is nil"
@ -99,7 +99,7 @@ template withDbSnapshot*(db: MDBXStoreRef, body: untyped) =
##
checkDbChainsNotNil(db):
handleEx():
let cs {.inject.} = db.raftNodesData.beginSnapshot()
let cs {.inject.} = db.raftNodeData.beginSnapshot()
defer: cs.finish()
body
@ -110,7 +110,7 @@ template withDbTransaction*(db: MDBXStoreRef, body: untyped) =
##
checkDbChainsNotNil(db):
handleEx():
var dbTransaction {.inject.} = db.raftNodesData.beginTransaction()
var dbTransaction {.inject.} = db.raftNodeData.beginTransaction()
defer: dbTransaction.commit()
try:
body
@ -227,5 +227,5 @@ proc init*(
handleEx():
let
db = openDatabase(dataDir, flags=mdbxFlags, maxFileSize=MaxFileSize)
raftNodesData = createCollection(db, "raftNodesData", StringKeys, BlobValues)
ok(T(database: db, raftNodesData: raftNodesData))
raftNodeData = createCollection(db, "raftNodeData", StringKeys, BlobValues)
ok(T(database: db, raftNodeData: raftNodeData))

View File

@ -7,42 +7,64 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import types, protocol, log_ops
{.hint[XDeclaredButNotUsed]: off.}
import types
import protocol
import log_ops
import chronicles
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
node.state = rnsCandidate
node.votedFor = node.id
withLock(node.raftStateMutex):
debug "Raft Node started election. Node ID: ", node_id=node.id
node.currentTerm.inc
node.state = rnsCandidate
node.votedFor = node.id
for peer in node.peers:
peer.hasVoted = false
node.votesFuts.add(node.msgSendCallback(
RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).value.term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
for peer in node.peers:
peer.hasVoted = false
node.votesFuts.add(node.msgSendCallback(
RaftMessageRequestVote(lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).value.term, lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
)
)
)
# Process votes (if any)
for voteFut in node.votesFuts:
discard await voteFut
if voteFut.finished and not voteFut.failed:
for p in node.peers:
debugEcho repr(voteFut)
# if p.id == voteFut.senderId:
# p.hasVoted = voteFut.granted
var
r: RaftMessageRequestVoteResponse
# node.votesFuts.clear
r = RaftMessageRequestVoteResponse(waitFor voteFut)
debug "voteFut.finished", voteFut_finished=voteFut.finished
withLock(node.raftStateMutex):
for p in node.peers:
debug "voteFut: ", Response=repr(r)
debug "senderId: ", sender_id=r.senderId
debug "granted: ", granted=r.granted
if p.id == r.senderId:
p.hasVoted = r.granted
withLock(node.raftStateMutex):
node.votesFuts.clear
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
for fut in node.voteFuts:
cancel(fut)
withLock(node.raftStateMutex):
for fut in node.voteFuts:
if not fut.finished and not fut.failed:
cancel(fut)
proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
withLock(node.raftStateMutex):
result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: msg.senderId, receiverId: msg.reciverId, granted: false)
if msg.senderTerm > node.term:
if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(RaftNodeLogIndexGet(node)).term:
result.granted = true
proc RaftNodeProcessAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse =
discard
proc RaftNodeProcessRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): Future[RaftMessageRequestVoteResponse] =
if msg.senderTerm > node.term:
if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(RaftNodeLogIndexGet(node)).term:
# grant vote
discard
proc RaftNodeProcessAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): Future[RaftMessageAppendEntriesResponse] =
proc RaftNodeProcessHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse =
discard
proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool =

View File

@ -7,18 +7,12 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
# #
# **************************** #
# Raft Protocol definition #
# #
# **************************** #
import types
type
# Raft Node Messages OPs
RaftMessageOps* = enum
rmoRequestVote = 0,
rmoAppendLogEntry = 1,
rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes
RaftMessageRespoonseError* = enum # Raft message response errors
rmreSuccess = 0,
rmreFail = 1
@ -59,6 +53,7 @@ type
payload*: Option[SmCommandType] # Optional RaftMessagePayload carrying a Log Entry
RaftNodeClientResponse*[SmStateType] = ref object
nodeId*: RaftNodeId
error*: RaftNodeClientResponseError
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
raftNodeRedirectId*: Option[RaftNodeId] # Optional Raft Node ID to redirect the request to in case of failure

View File

@ -7,21 +7,28 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import chronicles
{.hint[XDeclaredButNotUsed]: off.}
import types
import protocol
import consensus_module
import log_ops
import ../db/kvstore_mdbx
import chronicles
export types, protocol, consensus_module, log_ops
export
types,
protocol,
consensus_module,
log_ops,
chronicles
# Forward declarations
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.}
# Raft Node Public API procedures / functions
# Raft Node Public API
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node
id: RaftNodeId; peersIds: seq[RaftNodeId];
# persistentStorage: RaftNodePersistentStorage,
@ -62,24 +69,27 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
# Deliver Raft Message to the Raft Node and dispatch it
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
# case raftMessage.type
# of RaftMessageAppendEntries: # Dispatch different Raft Message types
# discard
# of RaftMessageRequestVote:
# discard
# else: discard
case raftMessage.op
of rmoRequestVote: # Dispatch different Raft Message types based on the operation code
discard
of rmoAppendLogEntry:
discard
else: discard
discard
# Process RaftNodeClientRequests
proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
case req.op
of rncroExecSmCommand:
# TODO: implemenmt command handling
discard
of rncroRequestSmState:
if RaftNodeIsLeader(node):
return RaftNodeClientResponse(error: rncreSuccess, state: RaftNodeStateGet(node))
else: discard
return RaftNodeClientResponse(nodeId: node.id, error: rncreSuccess, state: RaftNodeStateGet(node))
else:
return RaftNodeClientResponse(nodeId: node.id, error: rncreNotLeader, currentLeaderId: node.currentLeaderId)
else:
raiseAssert "Unknown client request operation."
# Abstract State Machine Ops
func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
@ -117,7 +127,7 @@ proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommand
senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex,
prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntry(node, RaftNodeLogIndexGet(node) - 1).term else: 0
)
asyncCheck node.msgSendCallback(msgHrtBt)
asyncSpawn node.msgSendCallback(msgHrtBt)
RaftNodeScheduleHeartBeat(node)
# Raft Node Control

View File

@ -15,7 +15,12 @@ import stew/results
import uuids
import chronos
export results, options, locks, uuids, chronos
export
results,
options,
locks,
uuids,
chronos
const
DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000
@ -71,10 +76,20 @@ type
RaftMessageId* = UUID # UUID assigned to every Raft Node Message,
# so it can be matched with it's corresponding response etc.
RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages
msgId*: RaftMessageId # Message UUID
senderId*: RaftNodeId # Sender Raft Node ID
receiverId*: RaftNodeId # Receiver Raft Node ID
# Raft Node Messages OPs
RaftMessageOps* = enum
rmoRequestVote = 0, # Request Raft Node vote during election.
rmoAppendLogEntry = 1, # Append log entry (when replicating) or represent a Heart-Beat
# if log entries are missing.
rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes to speed up the new nodes
# when they have to catch-up to the currently replicated log.
RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages.
op*: RaftMessageOps # Message op. Used to distinguish between different message types
# and cast the base class to the correct derived class where necessary.
msgId*: RaftMessageId # Message UUID.
senderId*: RaftNodeId # Sender Raft Node ID.
receiverId*: RaftNodeId # Receiver Raft Node ID.
RaftMessageResponseBase* = ref object of RaftMessageBase
@ -82,7 +97,8 @@ type
# out of this Raft Node.
# For later use when adding/removing new nodes (dynamic configuration chganges)
RaftNodeConfiguration* = object
RaftNodeConfiguration* = ref object
peers*: RaftNodePeers
# Raft Node Log definition
LogEntryType* = enum
@ -91,13 +107,13 @@ type
etData = 2,
etNoOp = 3
RaftNodeLogEntry*[SmCommandType] = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
RaftNodeLogEntry*[SmCommandType] = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
index*: RaftLogIndex
entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc.
data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration
# depending on entryType field
configuration*: Option[RaftNodeConfiguration] # Node configuration
configuration*: Option[RaftNodeConfiguration] # Node configuration
RaftNodeLog*[SmCommandType] = object # Needs more elaborate definition.
# Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc.

View File

@ -10,6 +10,8 @@
import basic_timers
import basic_state_machine
import std/tables
import std/random
export raft_api
type
@ -32,10 +34,17 @@ proc BasicRaftClusterGetLeader*(cluster: BasicRaftCluster): UUID =
if RaftNodeIsLeader(node):
return RaftNodeIdGet(node)
proc BasicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClientRequest): RaftNodeClientResponse =
discard
proc BasicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClientRequest): Future[RaftNodeClientResponse] {.async.} =
case req.op:
of rncroRequestSmState:
var
nodeId = cluster.nodesIds[random(cluster.nodesIds.len)]
result =
of rncroExecSmCommand:
discard
proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster =
randomize()
new(result)
for nodeId in nodesIds:
var

View File

@ -9,6 +9,7 @@
import unittest2
import basic_cluster
import std/times
proc basicClusterMain*() =
var
@ -29,6 +30,10 @@ proc basicClusterMain*() =
test "Start Basic Raft Cluster And wait it to converge (Elect a Leader)":
BasicRaftClusterStart(cluster)
var
dur: times.Duration
dur = initDuration(seconds = 5, milliseconds = 100)
waitFor sleepAsync(5000)
test "Simulate Basic Raft Cluster Client SmCommands Execution / Log Replication":
discard