Refactor the Raft Protocol with variants. Removed python test server. etc.
This commit is contained in:
parent
ae298b0af2
commit
6c92519a87
|
@ -9,7 +9,7 @@
|
|||
|
||||
import chronos
|
||||
|
||||
template awaitWithTimeout[T](operation: Future[T],
|
||||
template awaitWithTimeout*[T](operation: Future[T],
|
||||
deadline: Future[void],
|
||||
onTimeout: untyped): T =
|
||||
let f = operation
|
||||
|
@ -23,12 +23,12 @@ template awaitWithTimeout[T](operation: Future[T],
|
|||
else:
|
||||
f.read
|
||||
|
||||
template awaitWithTimeout[T](operation: Future[T],
|
||||
template awaitWithTimeout*[T](operation: Future[T],
|
||||
timeout: Duration,
|
||||
onTimeout: untyped): T =
|
||||
awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)
|
||||
|
||||
template awaitWithTimeout(operation: Future[void],
|
||||
template awaitWithTimeout*(operation: Future[void],
|
||||
deadline: Future[void],
|
||||
onTimeout: untyped) =
|
||||
let f = operation
|
||||
|
@ -40,7 +40,7 @@ template awaitWithTimeout(operation: Future[void],
|
|||
await cancelAndWait(f)
|
||||
onTimeout
|
||||
|
||||
template awaitWithTimeout(operation: Future[void],
|
||||
template awaitWithTimeout*(operation: Future[void],
|
||||
timeout: Duration,
|
||||
onTimeout: untyped) =
|
||||
awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)
|
|
@ -11,20 +11,21 @@ import types
|
|||
import protocol
|
||||
import log_ops
|
||||
import chronicles
|
||||
import async_util
|
||||
|
||||
proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool =
|
||||
result = false
|
||||
withRLock(node.raftStateMutex):
|
||||
var cnt = 0
|
||||
for peer in node.peers:
|
||||
if peer.hasVoted:
|
||||
cnt.inc
|
||||
if cnt >= (node.peers.len div 2 + node.peers.len mod 2):
|
||||
result = true
|
||||
var cnt = 0
|
||||
for peer in node.peers:
|
||||
if peer.hasVoted:
|
||||
cnt.inc
|
||||
if cnt >= (node.peers.len div 2 + 1):
|
||||
result = true
|
||||
|
||||
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
|
||||
RaftMessageResponse[SmCommandType, SmStateType] =
|
||||
debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm
|
||||
result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
|
||||
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
|
||||
withRLock(node.raftStateMutex):
|
||||
if msg.senderTerm >= node.currentTerm:
|
||||
RaftNodeCancelAllTimers(node)
|
||||
|
@ -36,9 +37,10 @@ proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmComma
|
|||
node.currentLeaderId = msg.senderId
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
|
||||
proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
|
||||
RaftMessageResponse[SmCommandType, SmStateType] =
|
||||
withRLock(node.raftStateMutex):
|
||||
result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
|
||||
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoRequestVote, msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
|
||||
if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
|
||||
if msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term or
|
||||
(msg.lastLogTerm == RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term and msg.lastLogIndex >= RaftNodeLogIndexGet(node)):
|
||||
|
@ -50,49 +52,58 @@ proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCom
|
|||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsFollower
|
||||
for fut in node.votesFuts:
|
||||
waitFor cancelAndWait(fut)
|
||||
# for fut in node.votesFuts:
|
||||
# waitFor cancelAndWait(fut)
|
||||
|
||||
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
while node.votesFuts.len > 0:
|
||||
discard node.votesFuts.pop
|
||||
mixin RaftNodeScheduleElectionTimeout
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
withRLock(node.raftStateMutex):
|
||||
node.currentTerm.inc
|
||||
node.state = rnsCandidate
|
||||
node.votedFor = node.id
|
||||
debug "Raft Node started election", node_id=node.id, state=node.state, voted_for=node.votedFor
|
||||
|
||||
for peer in node.peers:
|
||||
peer.hasVoted = false
|
||||
node.votesFuts.add(node.msgSendCallback(
|
||||
RaftMessageRequestVote(
|
||||
op: rmoRequestVote, msgId: genUUID(), senderId: node.id,
|
||||
receiverId: peer.id, lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term,
|
||||
lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
|
||||
)
|
||||
for peer in node.peers:
|
||||
peer.hasVoted = false
|
||||
node.votesFuts.add(node.msgSendCallback(
|
||||
RaftMessage[SmCommandType, SmStateType](
|
||||
op: rmoRequestVote, msgId: genUUID(), senderId: node.id,
|
||||
receiverId: peer.id, lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term,
|
||||
lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
|
||||
)
|
||||
)
|
||||
|
||||
# Process votes (if any)
|
||||
for voteFut in node.votesFuts:
|
||||
let r = await voteFut
|
||||
let respVote = RaftMessageRequestVoteResponse(r)
|
||||
debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted
|
||||
try:
|
||||
await voteFut or sleepAsync(milliseconds(node.votingTimeout))
|
||||
if not voteFut.finished:
|
||||
await cancelAndWait(voteFut)
|
||||
else:
|
||||
if not voteFut.cancelled:
|
||||
let respVote = RaftMessageResponse[SmCommandType, SmStateType](voteFut.read)
|
||||
debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted
|
||||
|
||||
withRLock(node.raftStateMutex):
|
||||
for p in node.peers:
|
||||
if p.id == respVote.senderId:
|
||||
p.hasVoted = respVote.granted
|
||||
for p in node.peers:
|
||||
if p.id == respVote.senderId:
|
||||
p.hasVoted = respVote.granted
|
||||
except Exception as e:
|
||||
discard
|
||||
|
||||
withRLock(node.raftStateMutex):
|
||||
while node.votesFuts.len > 0:
|
||||
discard node.votesFuts.pop
|
||||
|
||||
if node.state == rnsCandidate:
|
||||
if RaftNodeQuorumMin(node):
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
|
||||
await cancelAndWait(node.electionTimeoutTimer)
|
||||
debug "Raft Node transition to leader", node_id=node.id
|
||||
node.state = rnsLeader # Transition to leader state and send Heart-Beat to establish this node as the cluster leader
|
||||
asyncSpawn RaftNodeSendHeartBeat(node)
|
||||
RaftNodeSendHeartBeat(node)
|
||||
|
||||
proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
|
||||
RaftMessageResponse[SmCommandType, SmStateType] =
|
||||
discard
|
||||
|
||||
proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) =
|
||||
|
|
|
@ -13,29 +13,30 @@
|
|||
import types
|
||||
|
||||
type
|
||||
RaftMessageRespoonseError* = enum # Raft message response errors
|
||||
rmreSuccess = 0,
|
||||
rmreFail = 1
|
||||
RaftMessage*[SmCommandType, SmStateType] = ref object of RaftMessageBase[SmCommandType, SmStateType]
|
||||
senderTerm*: RaftNodeTerm # Sender Raft Node Term
|
||||
case op*: RaftMessageOps
|
||||
of rmoRequestVote:
|
||||
lastLogTerm*: RaftNodeTerm
|
||||
lastLogIndex*: RaftLogIndex
|
||||
of rmoAppendLogEntry:
|
||||
prevLogIndex*: RaftLogIndex
|
||||
prevLogTerm*: RaftNodeTerm
|
||||
commitIndex*: RaftLogIndex
|
||||
logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat
|
||||
of rmoInstallSnapshot:
|
||||
discard
|
||||
|
||||
RaftMessageRequestVote* = ref object of RaftMessageBase
|
||||
lastLogTerm*: RaftNodeTerm
|
||||
lastLogIndex*: RaftLogIndex
|
||||
senderTerm*: RaftNodeTerm # Sender Raft Node Term
|
||||
|
||||
RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase
|
||||
granted*: bool # Is vote granted by the Raft node, from we requested vote?
|
||||
|
||||
RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase
|
||||
prevLogIndex*: RaftLogIndex
|
||||
prevLogTerm*: RaftNodeTerm
|
||||
commitIndex*: RaftLogIndex
|
||||
logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat
|
||||
senderTerm*: RaftNodeTerm # Sender Raft Node Term
|
||||
|
||||
RaftMessageAppendEntriesResponse*[SmStateType] = ref object of RaftMessageResponseBase
|
||||
success*: bool
|
||||
lastLogIndex*: RaftLogIndex
|
||||
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
|
||||
RaftMessageResponse*[SmCommandType, SmStateType] = ref object of RaftMessageResponseBase[SmCommandType, SmStateType]
|
||||
case op*: RaftMessageOps
|
||||
of rmoRequestVote:
|
||||
granted*: bool # Is vote granted by the Raft node, from we requested vote?
|
||||
of rmoAppendLogEntry:
|
||||
success*: bool
|
||||
lastLogIndex*: RaftLogIndex
|
||||
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
|
||||
of rmoInstallSnapshot:
|
||||
discard
|
||||
|
||||
# Raft Node Client Request/Response definitions
|
||||
RaftNodeClientRequestOps* = enum
|
||||
|
|
|
@ -31,8 +31,9 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
|
|||
# persistentStorage: RaftNodePersistentStorage,
|
||||
msgSendCallback: RaftMessageSendCallback;
|
||||
electionTimeout: int=150;
|
||||
heartBeatTimeout: int=180;
|
||||
appendEntriesTimeout: int=150
|
||||
heartBeatTimeout: int=150;
|
||||
appendEntriesTimeout: int=50;
|
||||
votingTimeout: int=50
|
||||
): T =
|
||||
var
|
||||
peers: RaftNodePeers
|
||||
|
@ -43,7 +44,8 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
|
|||
result = T(
|
||||
id: id, state: rnsFollower, currentTerm: 0, peers: peers, commitIndex: 0, lastApplied: 0,
|
||||
msgSendCallback: msgSendCallback, votedFor: DefaultUUID, currentLeaderId: DefaultUUID,
|
||||
electionTimeout: electionTimeout, heartBeatTimeout: heartBeatTimeout, appendEntriesTimeout: appendEntriesTimeout
|
||||
electionTimeout: electionTimeout, heartBeatTimeout: heartBeatTimeout, appendEntriesTimeout: appendEntriesTimeout,
|
||||
votingTimeout: votingTimeout
|
||||
)
|
||||
|
||||
RaftNodeSmInit(result.stateMachine)
|
||||
|
@ -75,20 +77,24 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
|
|||
result = node.state == rnsLeader
|
||||
|
||||
# Deliver Raft Message to the Raft Node and dispatch it
|
||||
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
|
||||
case raftMessage.op
|
||||
of rmoRequestVote: # Dispatch different Raft Message types based on the operation code
|
||||
result = RaftNodeHandleRequestVote(node, RaftMessageRequestVote(raftMessage))
|
||||
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase[SmCommandType, SmStateType]):
|
||||
Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
|
||||
var
|
||||
rm = RaftMessage[SmCommandType, SmStateType](raftMessage)
|
||||
|
||||
case rm.op # Dispatch different Raft Message types based on the operation code
|
||||
of rmoRequestVote:
|
||||
result = RaftNodeHandleRequestVote(node, rm)
|
||||
of rmoAppendLogEntry:
|
||||
var appendMsg = RaftMessageAppendEntries[SmCommandType](raftMessage)
|
||||
if appendMsg.logEntries.isSome:
|
||||
result = RaftNodeHandleAppendEntries(node, appendMsg)
|
||||
if rm.logEntries.isSome:
|
||||
result = RaftNodeHandleAppendEntries(node, rm)
|
||||
else:
|
||||
result = RaftNodeHandleHeartBeat(node, appendMsg)
|
||||
result = RaftNodeHandleHeartBeat(node, rm)
|
||||
else: discard
|
||||
|
||||
# Process Raft Node Client Requests
|
||||
proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
|
||||
proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]):
|
||||
Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
|
||||
case req.op
|
||||
of rncroExecSmCommand:
|
||||
# TODO: implemenmt command handling
|
||||
|
@ -122,19 +128,18 @@ template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback):
|
|||
|
||||
# Timers scheduling stuff etc.
|
||||
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.heartBeatTimer = RaftTimerCreate(node.heartBeatTimeout, proc() = asyncSpawn RaftNodeSendHeartBeat(node))
|
||||
node.heartBeatTimer = RaftTimerCreate(node.heartBeatTimeout, proc() = RaftNodeSendHeartBeat(node))
|
||||
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
debug "Raft Node sending Heart-Beat to peers", node_id=node.id
|
||||
for raftPeer in node.peers:
|
||||
let msgHrtBt = RaftMessageAppendEntries[SmCommandType](
|
||||
let msgHrtBt = RaftMessage[SmCommandType, SmStateType](
|
||||
op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id,
|
||||
senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex,
|
||||
prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node) - 1).term else: 0
|
||||
)
|
||||
let r = await node.msgSendCallback(msgHrtBt)
|
||||
let r = node.msgSendCallback(msgHrtBt)
|
||||
discard r
|
||||
debug "Sent Heart-Beat", sender=node.id, to=raftPeer.id
|
||||
RaftNodeScheduleHeartBeat(node)
|
||||
|
||||
proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
|
|
|
@ -84,17 +84,16 @@ type
|
|||
rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes to speed up the new nodes
|
||||
# when they have to catch-up to the currently replicated log.
|
||||
|
||||
RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages.
|
||||
op*: RaftMessageOps # Message op. Used to distinguish between different message types
|
||||
# and cast the base class to the correct derived class where necessary.
|
||||
RaftMessageBase*[SmCommandType, SmStateType] = ref object of RootObj # Base Type for Raft Protocol Messages.
|
||||
msgId*: RaftMessageId # Message UUID.
|
||||
senderId*: RaftNodeId # Sender Raft Node ID.
|
||||
receiverId*: RaftNodeId # Receiver Raft Node ID.
|
||||
|
||||
RaftMessageResponseBase* = ref object of RaftMessageBase
|
||||
RaftMessageResponseBase*[SmCommandType, SmStateType] = ref object of RaftMessageBase[SmCommandType, SmStateType]
|
||||
|
||||
RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} # Callback for Sending Raft Node Messages
|
||||
# out of this Raft Node.
|
||||
# Callback for Sending Raft Node Messages out of this Raft Node.
|
||||
RaftMessageSendCallback*[SmCommandType, SmStateType] = proc (raftMessage: RaftMessageBase[SmCommandType, SmStateType]):
|
||||
Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.}
|
||||
|
||||
# For later use when adding/removing new nodes (dynamic configuration chganges)
|
||||
RaftNodeConfiguration* = ref object
|
||||
|
@ -123,11 +122,12 @@ type
|
|||
# Raft Node Object type
|
||||
RaftNode*[SmCommandType, SmStateType] = ref object
|
||||
# Timers
|
||||
votesFuts*: seq[Future[RaftMessageResponseBase]]
|
||||
votesFuts*: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]]
|
||||
|
||||
electionTimeout*: int
|
||||
heartBeatTimeout*: int
|
||||
appendEntriesTimeout*: int
|
||||
votingTimeout*: int
|
||||
|
||||
heartBeatTimer*: Future[void]
|
||||
electionTimeoutTimer*: Future[void]
|
||||
|
@ -137,7 +137,7 @@ type
|
|||
raftStateMutex*: RLock
|
||||
|
||||
# Misc
|
||||
msgSendCallback*: RaftMessageSendCallback
|
||||
msgSendCallback*: RaftMessageSendCallback[SmCommandType, SmStateType]
|
||||
persistentStorage: RaftNodePersistentStorage[SmCommandType, SmStateType]
|
||||
|
||||
# Persistent state
|
||||
|
|
|
@ -20,8 +20,8 @@ type
|
|||
BasicRaftCluster* = ref object
|
||||
nodes*: Table[RaftNodeId, BasicRaftNode]
|
||||
|
||||
proc BasicRaftClusterRaftMessageSendCallbackCreate(cluster: BasicRaftCluster): RaftMessageSendCallback =
|
||||
proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
|
||||
proc BasicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
|
||||
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
|
||||
result = await cluster.nodes[msg.receiverId].RaftNodeMessageDeliver(msg)
|
||||
|
||||
proc BasicRaftClusterStart*(cluster: BasicRaftCluster) =
|
||||
|
@ -53,5 +53,5 @@ proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster =
|
|||
peersIds = nodesIds
|
||||
|
||||
peersIds.del(peersIds.find(nodeId))
|
||||
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result), electionTimeout=150, heartBeatTimeout=150)
|
||||
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result), electionTimeout=150, heartBeatTimeout=150)
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ proc loadConfig(): RaftPeersConfContainer =
|
|||
conf.add(RaftPeerConf(id: parseUUID(n["id"].getStr), host: n["host"].getStr, port: n["port"].getInt))
|
||||
result = conf
|
||||
|
||||
proc RaftPipesRead(node: BasicRaftNode, port: int) =
|
||||
proc RaftPipesRead[SmCommandType, SmStateType](node: BasicRaftNode, port: int) =
|
||||
var
|
||||
fifoRead = fmt"RAFTNODERECEIVEMSGPIPE{port}"
|
||||
fifoWrite = fmt"RAFTNODESENDMSGRESPPIPE{port}"
|
||||
|
@ -37,47 +37,45 @@ proc RaftPipesRead(node: BasicRaftNode, port: int) =
|
|||
|
||||
var
|
||||
ss = MsgStream.init(frFD.readAll)
|
||||
xx: RaftMessageBase
|
||||
xx: RaftMessage[SmCommandType, SmStateType]
|
||||
|
||||
ss.unpack(xx) #and here too
|
||||
|
||||
debug "reqqqq: ", req=repr(xx)
|
||||
debug "Received Req: ", req=repr(xx)
|
||||
|
||||
var
|
||||
r = waitFor RaftNodeMessageDeliver(node, RaftMessageRequestVote(xx))
|
||||
r = waitFor RaftNodeMessageDeliver(node, xx)
|
||||
resp = RaftMessageResponse[SmCommandType, SmStateType](r)
|
||||
rs = MsgStream.init()
|
||||
|
||||
rs.pack(r)
|
||||
rs.pack(resp)
|
||||
fwFD.write(rs.data)
|
||||
|
||||
proc TestRaftMessageSendCallbackCreate(conf: RaftPeersConfContainer): RaftMessageSendCallback =
|
||||
proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
|
||||
proc TestRaftMessageSendCallbackCreate[SmCommandType, SmStateType](conf: RaftPeersConfContainer): RaftMessageSendCallback[SmCommandType, SmStateType] =
|
||||
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
|
||||
var
|
||||
host: string
|
||||
port: int
|
||||
resp: Response
|
||||
xx: RaftMessageResponse[SmCommandType, SmStateType]
|
||||
client = newHttpClient(timeout=50)
|
||||
m = RaftMessage[SmCommandType, SmStateType](msg)
|
||||
s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream
|
||||
|
||||
for c in conf:
|
||||
if c.id == msg.receiverId:
|
||||
host = c.host
|
||||
port = c.port
|
||||
var
|
||||
client = newHttpClient()
|
||||
s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream
|
||||
|
||||
s.pack(msg) #here the magic happened
|
||||
s.pack(m) #here the magic happened
|
||||
debug "Sending Req: ", req=fmt"http://{host}:{port}", data=s.data
|
||||
resp = client.post(fmt"http://{host}:{port}", s.data)
|
||||
|
||||
debug "req: ", req=fmt"http://{host}:{port}", data=s.data
|
||||
var
|
||||
resp = client.post(fmt"http://{host}:{port}", s.data)
|
||||
echo resp.status
|
||||
|
||||
var
|
||||
ss = MsgStream.init(resp.body)
|
||||
xx: RaftMessageResponseBase
|
||||
|
||||
ss.unpack(xx) #and here too
|
||||
s = MsgStream.init(resp.body)
|
||||
s.unpack(xx) #and here too
|
||||
result = xx
|
||||
|
||||
proc main() {.async.} =
|
||||
proc main() =
|
||||
var conf = loadConfig()
|
||||
|
||||
var
|
||||
|
@ -93,13 +91,14 @@ proc main() {.async.} =
|
|||
peersIds = nodesIds
|
||||
port: int
|
||||
idx = peersIds.find(nodeId)
|
||||
|
||||
port = conf[idx].port
|
||||
peersIds.del(idx)
|
||||
node = BasicRaftNode.new(nodeId, peersIds, TestRaftMessageSendCallbackCreate[SmCommand, SmState](conf))
|
||||
|
||||
node = BasicRaftNode.new(nodeId, peersIds, TestRaftMessageSendCallbackCreate(conf))
|
||||
RaftNodeStart(node)
|
||||
spawn RaftPipesRead(node, port)
|
||||
spawn RaftPipesRead[SmCommand, SmState](node, port)
|
||||
runForever()
|
||||
|
||||
if isMainModule:
|
||||
waitFor main()
|
||||
runForever()
|
||||
main()
|
|
@ -1,66 +0,0 @@
|
|||
import asyncio
|
||||
import tornado
|
||||
import os
|
||||
import sys
|
||||
import errno
|
||||
import msgpack
|
||||
|
||||
writeFIFO = 'RAFTNODERECEIVEMSGPIPE'
|
||||
readFIFO = 'RAFTNODESENDMSGRESPPIPE'
|
||||
|
||||
def writeFifo(data):
|
||||
with open(writeFIFO, "ab") as fw:
|
||||
fw.write(data)
|
||||
|
||||
def readPipe():
|
||||
while True:
|
||||
with open(readFIFO, "rb") as fifo:
|
||||
data = fifo.read()
|
||||
# writeFifo(data)
|
||||
if len(data) == 0:
|
||||
pass
|
||||
# print("Writer closed")
|
||||
else:
|
||||
print('Read: {0}'.format(data))
|
||||
return data
|
||||
|
||||
class MainHandler(tornado.web.RequestHandler):
|
||||
def post(self):
|
||||
s = self.request.body
|
||||
print(s)
|
||||
writeFifo(s)
|
||||
r = readPipe()
|
||||
print(r)
|
||||
self.write(r)
|
||||
|
||||
def get(self):
|
||||
self.write("Hello, world")
|
||||
|
||||
def make_app():
|
||||
|
||||
return tornado.web.Application([
|
||||
(r"/", MainHandler),
|
||||
])
|
||||
|
||||
async def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: tornado_simple_raft_node_server.py <port>")
|
||||
return
|
||||
app = make_app()
|
||||
app.listen(int(sys.argv[1]))
|
||||
await asyncio.Event().wait()
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
readFIFO = readFIFO + sys.argv[1]
|
||||
writeFIFO = writeFIFO + sys.argv[1]
|
||||
os.mkfifo(readFIFO)
|
||||
os.mkfifo(writeFIFO)
|
||||
except OSError as oe:
|
||||
if oe.errno != errno.EEXIST:
|
||||
raise
|
||||
|
||||
# readPipe()
|
||||
|
||||
asyncio.run(main())
|
||||
|
Loading…
Reference in New Issue