Fix compilation
This commit is contained in:
parent
ca099c6132
commit
5bd50081da
|
@ -11,7 +11,7 @@ import chronos
|
||||||
|
|
||||||
template awaitWithTimeout*[T](operation: Future[T],
|
template awaitWithTimeout*[T](operation: Future[T],
|
||||||
deadline: Future[void],
|
deadline: Future[void],
|
||||||
body: untyped): T =
|
body: untyped) =
|
||||||
let f {.inject.} = operation
|
let f {.inject.} = operation
|
||||||
await f or deadline
|
await f or deadline
|
||||||
if not f.finished:
|
if not f.finished:
|
||||||
|
|
|
@ -136,8 +136,8 @@ proc raftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmC
|
||||||
raftNodeLogTruncate(node, msg.prevLogIndex)
|
raftNodeLogTruncate(node, msg.prevLogIndex)
|
||||||
return
|
return
|
||||||
|
|
||||||
if msg.entries.len > 0:
|
if msg.logEntries.isSome:
|
||||||
for entry in msg.entries:
|
for entry in msg.logEntries.get:
|
||||||
raftNodeLogAppend(node, entry)
|
raftNodeLogAppend(node, entry)
|
||||||
|
|
||||||
if msg.commitIndex > node.commitIndex:
|
if msg.commitIndex > node.commitIndex:
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
# those terms.
|
# those terms.
|
||||||
|
|
||||||
import types
|
import types
|
||||||
|
import chronicles
|
||||||
|
|
||||||
# Private Log Ops
|
# Private Log Ops
|
||||||
proc raftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
|
proc raftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
|
||||||
|
@ -20,10 +21,11 @@ proc raftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandTy
|
||||||
proc raftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
|
proc raftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
|
||||||
node.log.logData.add(logEntry)
|
node.log.logData.add(logEntry)
|
||||||
|
|
||||||
proc raftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
|
proc raftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: RaftLogIndex) =
|
||||||
node.log.logData.truncate(truncateIndex)
|
debug "Truncating log to index: ", truncateIndex=truncateIndex, ld=repr(node.log.logData)
|
||||||
|
# node.log.logData = node.log.logData[:truncateIndex]
|
||||||
|
|
||||||
proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
|
proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
|
||||||
mixin raftNodeSmApply
|
mixin raftNodeSmApply
|
||||||
|
|
||||||
raftNodeSmApply(node.stateMachine, logEntry.command)
|
raftNodeSmApply(node.stateMachine, logEntry.data.get)
|
|
@ -23,7 +23,7 @@ export
|
||||||
chronicles
|
chronicles
|
||||||
|
|
||||||
# Forward declarations
|
# Forward declarations
|
||||||
proc raftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
proc raftNodeSmInit*[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
||||||
|
|
||||||
# Raft Node Public API
|
# Raft Node Public API
|
||||||
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType];
|
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType];
|
||||||
|
@ -112,17 +112,15 @@ func raftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandTyp
|
||||||
withRLock(node.raftStateMutex):
|
withRLock(node.raftStateMutex):
|
||||||
node.stateMachine.state
|
node.stateMachine.state
|
||||||
|
|
||||||
proc raftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
|
proc raftNodeSmInit*[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
|
||||||
mixin raftSmInit
|
mixin raftSmInit
|
||||||
|
|
||||||
withRLock(node.raftStateMutex):
|
raftSmInit(stateMachine)
|
||||||
raftSmInit(stateMachine)
|
|
||||||
|
|
||||||
proc raftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) =
|
proc raftNodeSmApply*[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) =
|
||||||
mixin raftSmApply
|
mixin raftSmApply
|
||||||
|
|
||||||
withRLock(node.raftStateMutex):
|
raftSmApply(stateMachine, command)
|
||||||
raftSmApply(stateMachine, command)
|
|
||||||
|
|
||||||
# Private Abstract Timer creation
|
# Private Abstract Timer creation
|
||||||
template raftTimerCreate*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] =
|
template raftTimerCreate*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] =
|
||||||
|
|
|
@ -37,7 +37,7 @@ type
|
||||||
RaftNodeTerm* = int # Raft Node Term Type
|
RaftNodeTerm* = int # Raft Node Term Type
|
||||||
RaftLogIndex* = int # Raft Node Log Index Type
|
RaftLogIndex* = int # Raft Node Log Index Type
|
||||||
|
|
||||||
RaftNodePeer* = ref object # Raft Node Peer object
|
RaftNodePeer* = ref object # Raft Node Peer object
|
||||||
id*: RaftNodeId
|
id*: RaftNodeId
|
||||||
nextIndex*: RaftLogIndex # For each peer Raft Node, index of the next log entry to send to that Node
|
nextIndex*: RaftLogIndex # For each peer Raft Node, index of the next log entry to send to that Node
|
||||||
# (initialized to leader last log index + 1)
|
# (initialized to leader last log index + 1)
|
||||||
|
@ -51,7 +51,7 @@ type
|
||||||
|
|
||||||
|
|
||||||
# Raft Node Abstract State Machine type
|
# Raft Node Abstract State Machine type
|
||||||
RaftNodeStateMachine*[SmCommandType, SmStateType] = ref object # Some opaque State Machine Impelementation to be used by the Raft Node
|
RaftNodeStateMachine*[SmCommandType, SmStateType] = ref object # Some opaque State Machine Impelementation to be used by the Raft Node
|
||||||
# providing at minimum operations for initialization, querying the current state
|
# providing at minimum operations for initialization, querying the current state
|
||||||
# and RaftNodeLogEntry (SmCommandType) application
|
# and RaftNodeLogEntry (SmCommandType) application
|
||||||
state*: ref SmStateType
|
state*: ref SmStateType
|
||||||
|
@ -147,8 +147,8 @@ type
|
||||||
currentTerm*: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically)
|
currentTerm*: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically)
|
||||||
votedFor*: RaftNodeId # Candidate RaftNodeId that received vote in current term (or DefaultUUID if none),
|
votedFor*: RaftNodeId # Candidate RaftNodeId that received vote in current term (or DefaultUUID if none),
|
||||||
# also used to redirect Client Requests in case this Raft Node is not the leader
|
# also used to redirect Client Requests in case this Raft Node is not the leader
|
||||||
log*: RaftNodeLog[SmCommandType] # This Raft Node Log
|
log*: RaftNodeLog[SmCommandType] # This Raft Node Log
|
||||||
stateMachine*: RaftNodeStateMachine[SmCommandType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
|
stateMachine*: RaftNodeStateMachine[SmCommandType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
|
||||||
# state is enough to consider it 'persisted'
|
# state is enough to consider it 'persisted'
|
||||||
peers*: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent
|
peers*: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent
|
||||||
# makes sense for the moment
|
# makes sense for the moment
|
||||||
|
|
|
@ -44,7 +44,7 @@ proc basicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClie
|
||||||
of rncroExecSmCommand:
|
of rncroExecSmCommand:
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], electionTimeout=5, heartBeatTimeout=5): BasicRaftCluster =
|
proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], electionTimeout=150, heartBeatTimeout=150): BasicRaftCluster =
|
||||||
new(result)
|
new(result)
|
||||||
for nodeId in nodesIds:
|
for nodeId in nodesIds:
|
||||||
var
|
var
|
||||||
|
|
|
@ -20,7 +20,7 @@ proc basicClusterElectionMain*() =
|
||||||
test "Basic Raft Cluster Init (5 nodes)":
|
test "Basic Raft Cluster Init (5 nodes)":
|
||||||
for i in 0..4:
|
for i in 0..4:
|
||||||
nodesIds[i] = genUUID()
|
nodesIds[i] = genUUID()
|
||||||
cluster = basicRaftClusterInit(nodesIds, 3, 3)
|
cluster = basicRaftClusterInit(nodesIds, 150, 150)
|
||||||
check cluster != nil
|
check cluster != nil
|
||||||
|
|
||||||
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":
|
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":
|
||||||
|
|
Loading…
Reference in New Issue