diff --git a/pipe.sh b/pipe.sh index c807807..3850351 100755 --- a/pipe.sh +++ b/pipe.sh @@ -3,6 +3,6 @@ x='' while [ t ]; do - $(`x < RAFTNODESENDMSGRESPPIPE`); + $(`$x < RAFTNODESENDMSGRESPPIPE`); echo "$($x)" > RAFTNODERECEIVEMSGPIPE; done diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index acd9b36..c834e5e 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -19,7 +19,7 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, for peer in node.peers: if peer.hasVoted: cnt.inc - if cnt >= (node.peers.len div 2 + 1): + if cnt >= (node.peers.len div 2 + node.peers.len mod 2): result = true proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] = @@ -40,11 +40,11 @@ proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCom withRLock(node.raftStateMutex): result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false) if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID: - # if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term: - asyncSpawn cancelAndWait(node.electionTimeoutTimer) - node.votedFor = msg.senderId - result.granted = true - RaftNodeScheduleElectionTimeout(node) + if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term: + asyncSpawn cancelAndWait(node.electionTimeoutTimer) + node.votedFor = msg.senderId + result.granted = true + RaftNodeScheduleElectionTimeout(node) proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = withRLock(node.raftStateMutex): diff --git a/raft/log_ops.nim b/raft/log_ops.nim index 933a369..5a42e00 100644 --- a/raft/log_ops.nim +++ b/raft/log_ops.nim @@ -11,10 +11,11 @@ import types # Private Log Ops proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex = - discard + len(node.log.logData) proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): RaftNodeLogEntry[SmCommandType] = - discard + if logIndex > 0: + result = node.log.logData[logIndex] proc RaftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = discard diff --git a/raft/raft_api.nim b/raft/raft_api.nim index 905e31a..6e3898f 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -26,14 +26,14 @@ export proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) # Raft Node Public API -proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node +proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; id: RaftNodeId; peersIds: seq[RaftNodeId]; # persistentStorage: RaftNodePersistentStorage, - msgSendCallback: RaftMessageSendCallback, - electionTimeout: int=150, - heartBeatTimeout: int=180, - appendEntriesTimeout: int=150) -): T = + msgSendCallback: RaftMessageSendCallback; + electionTimeout: int=150; + heartBeatTimeout: int=180; + appendEntriesTimeout: int=150 + ): T = var peers: RaftNodePeers diff --git a/raft/types.nim b/raft/types.nim index 769fce8..6151db3 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -34,8 +34,8 @@ type rnsStopped = 4 RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node - RaftNodeTerm* = uint64 # Raft Node Term Type - RaftLogIndex* = uint64 # Raft Node Log Index Type + RaftNodeTerm* = int # Raft Node Term Type + RaftLogIndex* = int # Raft Node Log Index Type RaftNodePeer* = ref object # Raft Node Peer object id*: RaftNodeId @@ -109,7 +109,6 @@ type RaftNodeLogEntry*[SmCommandType] = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.) term*: RaftNodeTerm - index*: RaftLogIndex entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc. data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration # depending on entryType field @@ -126,7 +125,7 @@ type # Timers votesFuts*: seq[Future[RaftMessageResponseBase]] - requestVotesTimeout*: int + electionTimeout*: int heartBeatTimeout*: int appendEntriesTimeout*: int @@ -147,7 +146,7 @@ type currentTerm*: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically) votedFor*: RaftNodeId # Candidate RaftNodeId that received vote in current term (or DefaultUUID if none), # also used to redirect Client Requests in case this Raft Node is not the leader - log: RaftNodeLog[SmCommandType] # This Raft Node Log + log*: RaftNodeLog[SmCommandType] # This Raft Node Log stateMachine*: RaftNodeStateMachine[SmCommandType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's # state is enough to consider it 'persisted' peers*: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent diff --git a/tests/basic_cluster.nim b/tests/basic_cluster.nim index 83410c2..029fbe0 100644 --- a/tests/basic_cluster.nim +++ b/tests/basic_cluster.nim @@ -53,5 +53,5 @@ proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster = peersIds = nodesIds peersIds.del(peersIds.find(nodeId)) - result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result)) + result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result), electionTimeout=50, heartBeatTimeout=50) diff --git a/tests/raft_test_node_standalone.nim b/tests/raft_test_node_standalone.nim index 87d314a..e1f6371 100644 --- a/tests/raft_test_node_standalone.nim +++ b/tests/raft_test_node_standalone.nim @@ -1,17 +1,19 @@ -import ../raft import basic_state_machine +import basic_cluster + import std/json -import uuids -import chronicles import msgpack4nim +import strutils +import std/strformat +import httpclient type - RaftPeerConf = object - id: UUID - host: string - port: int + RaftPeerConf* = object + id*: UUID + host*: string + port*: int - RaftPeersConfContainer = seq[RaftPeerConf] + RaftPeersConfContainer* = seq[RaftPeerConf] var conf: RaftPeersConfContainer @@ -21,9 +23,49 @@ proc loadConfig() = # read and parse file let jsConf = parseFile(jsonFile) for n in jsConf["raftPeers"]: - conf.add(RaftPeerConf(id: parseUUID(n["id"].astToStr), host: n["host"].astToStr, port: int(n["port"].astToStr))) + conf.add(RaftPeerConf(id: parseUUID(n["id"].astToStr), host: n["host"].astToStr, port: parseInt(n["port"].astToStr))) + debug "Conf", conf=conf info "Conf", config=repr(conf) +proc TestRaftMessageSendCallbackCreate(): RaftMessageSendCallback = + proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = + var + host: string + port: int + for c in conf: + if c.id == msg.receiverId: + host = c.host + port = c.port + var + client = newAsyncHttpClient() + s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream + + s.pack(msg) #here the magic happened + + var + resp = await client.post(fmt"http://host:port", s.data) + + echo resp.status + var + ss = MsgStream.init(resp.body) + xx: RaftMessageResponseBase + ss.unpack(xx) #and here too + result = xx + if isMainModule: loadConfig() - var node = RaftNode[SmCommand, SmState].new() \ No newline at end of file + var + nodesIds: seq[UUID] + node: BasicRaftNode + + for c in conf: + debug "single conf", single_conf=c + nodesIds.add(c.id) + + var + nodeId = parseUUID("f9695ea4-4f37-11ee-8e75-8ff5a48faa42") + peersIds = nodesIds + + peersIds.del(peersIds.find(nodeId)) + + node = BasicRaftNode.new(nodeId, peersIds, TestRaftMessageSendCallbackCreate()) \ No newline at end of file diff --git a/tests/tornado_simple_raft_node_server.py b/tests/tornado_simple_raft_node_server.py index 10068b4..6a7d195 100644 --- a/tests/tornado_simple_raft_node_server.py +++ b/tests/tornado_simple_raft_node_server.py @@ -15,6 +15,7 @@ def writeFifo(data): def readPipe(): with open(readFIFO, "r") as fifo: data = fifo.read() + # writeFifo(data) if len(data) == 0: pass # print("Writer closed") @@ -55,5 +56,7 @@ if __name__ == "__main__": if oe.errno != errno.EEXIST: raise + readPipe() + asyncio.run(main()) \ No newline at end of file