From 6c92519a8771e40042e157d25198261eb16626d7 Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Sun, 17 Sep 2023 03:47:29 +0300 Subject: [PATCH] Refactor the Raft Protocol with variants. Removed python test server. etc. --- raft/async_util.nim | 8 +-- raft/consensus_module.nim | 79 ++++++++++++++---------- raft/protocol.nim | 45 +++++++------- raft/raft_api.nim | 39 +++++++----- raft/types.nim | 16 ++--- tests/basic_cluster.nim | 6 +- tests/raft_test_node_standalone.nim | 51 ++++++++------- tests/tornado_simple_raft_node_server.py | 66 -------------------- 8 files changed, 130 insertions(+), 180 deletions(-) delete mode 100644 tests/tornado_simple_raft_node_server.py diff --git a/raft/async_util.nim b/raft/async_util.nim index 0ed6f26..4ec13fb 100644 --- a/raft/async_util.nim +++ b/raft/async_util.nim @@ -9,7 +9,7 @@ import chronos -template awaitWithTimeout[T](operation: Future[T], +template awaitWithTimeout*[T](operation: Future[T], deadline: Future[void], onTimeout: untyped): T = let f = operation @@ -23,12 +23,12 @@ template awaitWithTimeout[T](operation: Future[T], else: f.read -template awaitWithTimeout[T](operation: Future[T], +template awaitWithTimeout*[T](operation: Future[T], timeout: Duration, onTimeout: untyped): T = awaitWithTimeout(operation, sleepAsync(timeout), onTimeout) -template awaitWithTimeout(operation: Future[void], +template awaitWithTimeout*(operation: Future[void], deadline: Future[void], onTimeout: untyped) = let f = operation @@ -40,7 +40,7 @@ template awaitWithTimeout(operation: Future[void], await cancelAndWait(f) onTimeout -template awaitWithTimeout(operation: Future[void], +template awaitWithTimeout*(operation: Future[void], timeout: Duration, onTimeout: untyped) = awaitWithTimeout(operation, sleepAsync(timeout), onTimeout) \ No newline at end of file diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index 4216d5b..9d6cbd6 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -11,20 +11,21 @@ import types import protocol import log_ops import chronicles +import async_util proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = result = false - withRLock(node.raftStateMutex): - var cnt = 0 - for peer in node.peers: - if peer.hasVoted: - cnt.inc - if cnt >= (node.peers.len div 2 + node.peers.len mod 2): - result = true + var cnt = 0 + for peer in node.peers: + if peer.hasVoted: + cnt.inc + if cnt >= (node.peers.len div 2 + 1): + result = true -proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] = +proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]): + RaftMessageResponse[SmCommandType, SmStateType] = debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm - result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false) + result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false) withRLock(node.raftStateMutex): if msg.senderTerm >= node.currentTerm: RaftNodeCancelAllTimers(node) @@ -36,9 +37,10 @@ proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmComma node.currentLeaderId = msg.senderId RaftNodeScheduleElectionTimeout(node) -proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse = +proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]): + RaftMessageResponse[SmCommandType, SmStateType] = withRLock(node.raftStateMutex): - result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false) + result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoRequestVote, msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false) if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID: if msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term or (msg.lastLogTerm == RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term and msg.lastLogIndex >= RaftNodeLogIndexGet(node)): @@ -50,49 +52,58 @@ proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCom proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = withRLock(node.raftStateMutex): node.state = rnsFollower - for fut in node.votesFuts: - waitFor cancelAndWait(fut) + # for fut in node.votesFuts: + # waitFor cancelAndWait(fut) proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} = + while node.votesFuts.len > 0: + discard node.votesFuts.pop + mixin RaftNodeScheduleElectionTimeout + RaftNodeScheduleElectionTimeout(node) + withRLock(node.raftStateMutex): node.currentTerm.inc node.state = rnsCandidate node.votedFor = node.id debug "Raft Node started election", node_id=node.id, state=node.state, voted_for=node.votedFor - for peer in node.peers: - peer.hasVoted = false - node.votesFuts.add(node.msgSendCallback( - RaftMessageRequestVote( - op: rmoRequestVote, msgId: genUUID(), senderId: node.id, - receiverId: peer.id, lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term, - lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm) - ) + for peer in node.peers: + peer.hasVoted = false + node.votesFuts.add(node.msgSendCallback( + RaftMessage[SmCommandType, SmStateType]( + op: rmoRequestVote, msgId: genUUID(), senderId: node.id, + receiverId: peer.id, lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term, + lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm) ) + ) # Process votes (if any) for voteFut in node.votesFuts: - let r = await voteFut - let respVote = RaftMessageRequestVoteResponse(r) - debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted + try: + await voteFut or sleepAsync(milliseconds(node.votingTimeout)) + if not voteFut.finished: + await cancelAndWait(voteFut) + else: + if not voteFut.cancelled: + let respVote = RaftMessageResponse[SmCommandType, SmStateType](voteFut.read) + debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted - withRLock(node.raftStateMutex): - for p in node.peers: - if p.id == respVote.senderId: - p.hasVoted = respVote.granted + for p in node.peers: + if p.id == respVote.senderId: + p.hasVoted = respVote.granted + except Exception as e: + discard withRLock(node.raftStateMutex): - while node.votesFuts.len > 0: - discard node.votesFuts.pop - if node.state == rnsCandidate: if RaftNodeQuorumMin(node): - asyncSpawn cancelAndWait(node.electionTimeoutTimer) + await cancelAndWait(node.electionTimeoutTimer) debug "Raft Node transition to leader", node_id=node.id node.state = rnsLeader # Transition to leader state and send Heart-Beat to establish this node as the cluster leader - asyncSpawn RaftNodeSendHeartBeat(node) + RaftNodeSendHeartBeat(node) -proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] = +proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]): + RaftMessageResponse[SmCommandType, SmStateType] = discard proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) = diff --git a/raft/protocol.nim b/raft/protocol.nim index 5a093d4..6d0c438 100644 --- a/raft/protocol.nim +++ b/raft/protocol.nim @@ -13,29 +13,30 @@ import types type - RaftMessageRespoonseError* = enum # Raft message response errors - rmreSuccess = 0, - rmreFail = 1 + RaftMessage*[SmCommandType, SmStateType] = ref object of RaftMessageBase[SmCommandType, SmStateType] + senderTerm*: RaftNodeTerm # Sender Raft Node Term + case op*: RaftMessageOps + of rmoRequestVote: + lastLogTerm*: RaftNodeTerm + lastLogIndex*: RaftLogIndex + of rmoAppendLogEntry: + prevLogIndex*: RaftLogIndex + prevLogTerm*: RaftNodeTerm + commitIndex*: RaftLogIndex + logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat + of rmoInstallSnapshot: + discard - RaftMessageRequestVote* = ref object of RaftMessageBase - lastLogTerm*: RaftNodeTerm - lastLogIndex*: RaftLogIndex - senderTerm*: RaftNodeTerm # Sender Raft Node Term - - RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase - granted*: bool # Is vote granted by the Raft node, from we requested vote? - - RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase - prevLogIndex*: RaftLogIndex - prevLogTerm*: RaftNodeTerm - commitIndex*: RaftLogIndex - logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat - senderTerm*: RaftNodeTerm # Sender Raft Node Term - - RaftMessageAppendEntriesResponse*[SmStateType] = ref object of RaftMessageResponseBase - success*: bool - lastLogIndex*: RaftLogIndex - state*: Option[SmStateType] # Optional Raft Abstract State Machine State + RaftMessageResponse*[SmCommandType, SmStateType] = ref object of RaftMessageResponseBase[SmCommandType, SmStateType] + case op*: RaftMessageOps + of rmoRequestVote: + granted*: bool # Is vote granted by the Raft node, from we requested vote? + of rmoAppendLogEntry: + success*: bool + lastLogIndex*: RaftLogIndex + state*: Option[SmStateType] # Optional Raft Abstract State Machine State + of rmoInstallSnapshot: + discard # Raft Node Client Request/Response definitions RaftNodeClientRequestOps* = enum diff --git a/raft/raft_api.nim b/raft/raft_api.nim index 6e3898f..ecae4b0 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -31,8 +31,9 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp # persistentStorage: RaftNodePersistentStorage, msgSendCallback: RaftMessageSendCallback; electionTimeout: int=150; - heartBeatTimeout: int=180; - appendEntriesTimeout: int=150 + heartBeatTimeout: int=150; + appendEntriesTimeout: int=50; + votingTimeout: int=50 ): T = var peers: RaftNodePeers @@ -43,7 +44,8 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp result = T( id: id, state: rnsFollower, currentTerm: 0, peers: peers, commitIndex: 0, lastApplied: 0, msgSendCallback: msgSendCallback, votedFor: DefaultUUID, currentLeaderId: DefaultUUID, - electionTimeout: electionTimeout, heartBeatTimeout: heartBeatTimeout, appendEntriesTimeout: appendEntriesTimeout + electionTimeout: electionTimeout, heartBeatTimeout: heartBeatTimeout, appendEntriesTimeout: appendEntriesTimeout, + votingTimeout: votingTimeout ) RaftNodeSmInit(result.stateMachine) @@ -75,20 +77,24 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, result = node.state == rnsLeader # Deliver Raft Message to the Raft Node and dispatch it -proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = - case raftMessage.op - of rmoRequestVote: # Dispatch different Raft Message types based on the operation code - result = RaftNodeHandleRequestVote(node, RaftMessageRequestVote(raftMessage)) +proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase[SmCommandType, SmStateType]): + Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} = + var + rm = RaftMessage[SmCommandType, SmStateType](raftMessage) + + case rm.op # Dispatch different Raft Message types based on the operation code + of rmoRequestVote: + result = RaftNodeHandleRequestVote(node, rm) of rmoAppendLogEntry: - var appendMsg = RaftMessageAppendEntries[SmCommandType](raftMessage) - if appendMsg.logEntries.isSome: - result = RaftNodeHandleAppendEntries(node, appendMsg) + if rm.logEntries.isSome: + result = RaftNodeHandleAppendEntries(node, rm) else: - result = RaftNodeHandleHeartBeat(node, appendMsg) + result = RaftNodeHandleHeartBeat(node, rm) else: discard # Process Raft Node Client Requests -proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} = +proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): + Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} = case req.op of rncroExecSmCommand: # TODO: implemenmt command handling @@ -122,19 +128,18 @@ template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback): # Timers scheduling stuff etc. proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - node.heartBeatTimer = RaftTimerCreate(node.heartBeatTimeout, proc() = asyncSpawn RaftNodeSendHeartBeat(node)) + node.heartBeatTimer = RaftTimerCreate(node.heartBeatTimeout, proc() = RaftNodeSendHeartBeat(node)) -proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} = +proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = debug "Raft Node sending Heart-Beat to peers", node_id=node.id for raftPeer in node.peers: - let msgHrtBt = RaftMessageAppendEntries[SmCommandType]( + let msgHrtBt = RaftMessage[SmCommandType, SmStateType]( op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id, senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex, prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node) - 1).term else: 0 ) - let r = await node.msgSendCallback(msgHrtBt) + let r = node.msgSendCallback(msgHrtBt) discard r - debug "Sent Heart-Beat", sender=node.id, to=raftPeer.id RaftNodeScheduleHeartBeat(node) proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = diff --git a/raft/types.nim b/raft/types.nim index 6151db3..648cac7 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -84,17 +84,16 @@ type rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes to speed up the new nodes # when they have to catch-up to the currently replicated log. - RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages. - op*: RaftMessageOps # Message op. Used to distinguish between different message types - # and cast the base class to the correct derived class where necessary. + RaftMessageBase*[SmCommandType, SmStateType] = ref object of RootObj # Base Type for Raft Protocol Messages. msgId*: RaftMessageId # Message UUID. senderId*: RaftNodeId # Sender Raft Node ID. receiverId*: RaftNodeId # Receiver Raft Node ID. - RaftMessageResponseBase* = ref object of RaftMessageBase + RaftMessageResponseBase*[SmCommandType, SmStateType] = ref object of RaftMessageBase[SmCommandType, SmStateType] - RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} # Callback for Sending Raft Node Messages - # out of this Raft Node. + # Callback for Sending Raft Node Messages out of this Raft Node. + RaftMessageSendCallback*[SmCommandType, SmStateType] = proc (raftMessage: RaftMessageBase[SmCommandType, SmStateType]): + Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} # For later use when adding/removing new nodes (dynamic configuration chganges) RaftNodeConfiguration* = ref object @@ -123,11 +122,12 @@ type # Raft Node Object type RaftNode*[SmCommandType, SmStateType] = ref object # Timers - votesFuts*: seq[Future[RaftMessageResponseBase]] + votesFuts*: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]] electionTimeout*: int heartBeatTimeout*: int appendEntriesTimeout*: int + votingTimeout*: int heartBeatTimer*: Future[void] electionTimeoutTimer*: Future[void] @@ -137,7 +137,7 @@ type raftStateMutex*: RLock # Misc - msgSendCallback*: RaftMessageSendCallback + msgSendCallback*: RaftMessageSendCallback[SmCommandType, SmStateType] persistentStorage: RaftNodePersistentStorage[SmCommandType, SmStateType] # Persistent state diff --git a/tests/basic_cluster.nim b/tests/basic_cluster.nim index d5165a4..afaafef 100644 --- a/tests/basic_cluster.nim +++ b/tests/basic_cluster.nim @@ -20,8 +20,8 @@ type BasicRaftCluster* = ref object nodes*: Table[RaftNodeId, BasicRaftNode] -proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback = - proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = +proc BasicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] = + proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} = result = await cluster.nodes[msg.receiverId].RaftNodeMessageDeliver(msg) proc BasicRaftClusterStart*(cluster: BasicRaftCluster) = @@ -53,5 +53,5 @@ proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster = peersIds = nodesIds peersIds.del(peersIds.find(nodeId)) - result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result), electionTimeout=150, heartBeatTimeout=150) + result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result), electionTimeout=150, heartBeatTimeout=150) diff --git a/tests/raft_test_node_standalone.nim b/tests/raft_test_node_standalone.nim index f19f91b..68a10b6 100644 --- a/tests/raft_test_node_standalone.nim +++ b/tests/raft_test_node_standalone.nim @@ -28,7 +28,7 @@ proc loadConfig(): RaftPeersConfContainer = conf.add(RaftPeerConf(id: parseUUID(n["id"].getStr), host: n["host"].getStr, port: n["port"].getInt)) result = conf -proc RaftPipesRead(node: BasicRaftNode, port: int) = +proc RaftPipesRead[SmCommandType, SmStateType](node: BasicRaftNode, port: int) = var fifoRead = fmt"RAFTNODERECEIVEMSGPIPE{port}" fifoWrite = fmt"RAFTNODESENDMSGRESPPIPE{port}" @@ -37,47 +37,45 @@ proc RaftPipesRead(node: BasicRaftNode, port: int) = var ss = MsgStream.init(frFD.readAll) - xx: RaftMessageBase + xx: RaftMessage[SmCommandType, SmStateType] ss.unpack(xx) #and here too - debug "reqqqq: ", req=repr(xx) + debug "Received Req: ", req=repr(xx) var - r = waitFor RaftNodeMessageDeliver(node, RaftMessageRequestVote(xx)) + r = waitFor RaftNodeMessageDeliver(node, xx) + resp = RaftMessageResponse[SmCommandType, SmStateType](r) rs = MsgStream.init() - rs.pack(r) + rs.pack(resp) fwFD.write(rs.data) -proc TestRaftMessageSendCallbackCreate(conf: RaftPeersConfContainer): RaftMessageSendCallback = - proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = +proc TestRaftMessageSendCallbackCreate[SmCommandType, SmStateType](conf: RaftPeersConfContainer): RaftMessageSendCallback[SmCommandType, SmStateType] = + proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} = var host: string port: int + resp: Response + xx: RaftMessageResponse[SmCommandType, SmStateType] + client = newHttpClient(timeout=50) + m = RaftMessage[SmCommandType, SmStateType](msg) + s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream + for c in conf: if c.id == msg.receiverId: host = c.host port = c.port - var - client = newHttpClient() - s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream - s.pack(msg) #here the magic happened + s.pack(m) #here the magic happened + debug "Sending Req: ", req=fmt"http://{host}:{port}", data=s.data + resp = client.post(fmt"http://{host}:{port}", s.data) - debug "req: ", req=fmt"http://{host}:{port}", data=s.data - var - resp = client.post(fmt"http://{host}:{port}", s.data) - echo resp.status - - var - ss = MsgStream.init(resp.body) - xx: RaftMessageResponseBase - - ss.unpack(xx) #and here too + s = MsgStream.init(resp.body) + s.unpack(xx) #and here too result = xx -proc main() {.async.} = +proc main() = var conf = loadConfig() var @@ -93,13 +91,14 @@ proc main() {.async.} = peersIds = nodesIds port: int idx = peersIds.find(nodeId) + port = conf[idx].port peersIds.del(idx) + node = BasicRaftNode.new(nodeId, peersIds, TestRaftMessageSendCallbackCreate[SmCommand, SmState](conf)) - node = BasicRaftNode.new(nodeId, peersIds, TestRaftMessageSendCallbackCreate(conf)) RaftNodeStart(node) - spawn RaftPipesRead(node, port) + spawn RaftPipesRead[SmCommand, SmState](node, port) + runForever() if isMainModule: - waitFor main() - runForever() \ No newline at end of file + main() \ No newline at end of file diff --git a/tests/tornado_simple_raft_node_server.py b/tests/tornado_simple_raft_node_server.py deleted file mode 100644 index 2993858..0000000 --- a/tests/tornado_simple_raft_node_server.py +++ /dev/null @@ -1,66 +0,0 @@ -import asyncio -import tornado -import os -import sys -import errno -import msgpack - -writeFIFO = 'RAFTNODERECEIVEMSGPIPE' -readFIFO = 'RAFTNODESENDMSGRESPPIPE' - -def writeFifo(data): - with open(writeFIFO, "ab") as fw: - fw.write(data) - -def readPipe(): - while True: - with open(readFIFO, "rb") as fifo: - data = fifo.read() - # writeFifo(data) - if len(data) == 0: - pass - # print("Writer closed") - else: - print('Read: {0}'.format(data)) - return data - -class MainHandler(tornado.web.RequestHandler): - def post(self): - s = self.request.body - print(s) - writeFifo(s) - r = readPipe() - print(r) - self.write(r) - - def get(self): - self.write("Hello, world") - -def make_app(): - - return tornado.web.Application([ - (r"/", MainHandler), - ]) - -async def main(): - if len(sys.argv) < 2: - print("Usage: tornado_simple_raft_node_server.py ") - return - app = make_app() - app.listen(int(sys.argv[1])) - await asyncio.Event().wait() - -if __name__ == "__main__": - try: - readFIFO = readFIFO + sys.argv[1] - writeFIFO = writeFIFO + sys.argv[1] - os.mkfifo(readFIFO) - os.mkfifo(writeFIFO) - except OSError as oe: - if oe.errno != errno.EEXIST: - raise - - # readPipe() - - asyncio.run(main()) - \ No newline at end of file