This commit is contained in:
Raycho Mukelov 2023-09-11 19:55:30 +03:00
parent 1de2422cde
commit fe5c069ac7
8 changed files with 76 additions and 31 deletions

View File

@ -3,6 +3,6 @@
x=''
while [ t ];
do
$(`x < RAFTNODESENDMSGRESPPIPE`);
$(`$x < RAFTNODESENDMSGRESPPIPE`);
echo "$($x)" > RAFTNODERECEIVEMSGPIPE;
done

View File

@ -19,7 +19,7 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
for peer in node.peers:
if peer.hasVoted:
cnt.inc
if cnt >= (node.peers.len div 2 + 1):
if cnt >= (node.peers.len div 2 + node.peers.len mod 2):
result = true
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
@ -40,11 +40,11 @@ proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCom
withRLock(node.raftStateMutex):
result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
# if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term:
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
node.votedFor = msg.senderId
result.granted = true
RaftNodeScheduleElectionTimeout(node)
if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term:
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
node.votedFor = msg.senderId
result.granted = true
RaftNodeScheduleElectionTimeout(node)
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):

View File

@ -11,10 +11,11 @@ import types
# Private Log Ops
proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
discard
len(node.log.logData)
proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): RaftNodeLogEntry[SmCommandType] =
discard
if logIndex > 0:
result = node.log.logData[logIndex]
proc RaftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
discard

View File

@ -26,14 +26,14 @@ export
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
# Raft Node Public API
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType];
id: RaftNodeId; peersIds: seq[RaftNodeId];
# persistentStorage: RaftNodePersistentStorage,
msgSendCallback: RaftMessageSendCallback,
electionTimeout: int=150,
heartBeatTimeout: int=180,
appendEntriesTimeout: int=150)
): T =
msgSendCallback: RaftMessageSendCallback;
electionTimeout: int=150;
heartBeatTimeout: int=180;
appendEntriesTimeout: int=150
): T =
var
peers: RaftNodePeers

View File

@ -34,8 +34,8 @@ type
rnsStopped = 4
RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
RaftNodeTerm* = uint64 # Raft Node Term Type
RaftLogIndex* = uint64 # Raft Node Log Index Type
RaftNodeTerm* = int # Raft Node Term Type
RaftLogIndex* = int # Raft Node Log Index Type
RaftNodePeer* = ref object # Raft Node Peer object
id*: RaftNodeId
@ -109,7 +109,6 @@ type
RaftNodeLogEntry*[SmCommandType] = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
index*: RaftLogIndex
entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc.
data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration
# depending on entryType field
@ -126,7 +125,7 @@ type
# Timers
votesFuts*: seq[Future[RaftMessageResponseBase]]
requestVotesTimeout*: int
electionTimeout*: int
heartBeatTimeout*: int
appendEntriesTimeout*: int
@ -147,7 +146,7 @@ type
currentTerm*: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically)
votedFor*: RaftNodeId # Candidate RaftNodeId that received vote in current term (or DefaultUUID if none),
# also used to redirect Client Requests in case this Raft Node is not the leader
log: RaftNodeLog[SmCommandType] # This Raft Node Log
log*: RaftNodeLog[SmCommandType] # This Raft Node Log
stateMachine*: RaftNodeStateMachine[SmCommandType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
# state is enough to consider it 'persisted'
peers*: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent

View File

@ -53,5 +53,5 @@ proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster =
peersIds = nodesIds
peersIds.del(peersIds.find(nodeId))
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result))
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate(result), electionTimeout=50, heartBeatTimeout=50)

View File

@ -1,17 +1,19 @@
import ../raft
import basic_state_machine
import basic_cluster
import std/json
import uuids
import chronicles
import msgpack4nim
import strutils
import std/strformat
import httpclient
type
RaftPeerConf = object
id: UUID
host: string
port: int
RaftPeerConf* = object
id*: UUID
host*: string
port*: int
RaftPeersConfContainer = seq[RaftPeerConf]
RaftPeersConfContainer* = seq[RaftPeerConf]
var
conf: RaftPeersConfContainer
@ -21,9 +23,49 @@ proc loadConfig() =
# read and parse file
let jsConf = parseFile(jsonFile)
for n in jsConf["raftPeers"]:
conf.add(RaftPeerConf(id: parseUUID(n["id"].astToStr), host: n["host"].astToStr, port: int(n["port"].astToStr)))
conf.add(RaftPeerConf(id: parseUUID(n["id"].astToStr), host: n["host"].astToStr, port: parseInt(n["port"].astToStr)))
debug "Conf", conf=conf
info "Conf", config=repr(conf)
proc TestRaftMessageSendCallbackCreate(): RaftMessageSendCallback =
proc (msg: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
var
host: string
port: int
for c in conf:
if c.id == msg.receiverId:
host = c.host
port = c.port
var
client = newAsyncHttpClient()
s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream
s.pack(msg) #here the magic happened
var
resp = await client.post(fmt"http://host:port", s.data)
echo resp.status
var
ss = MsgStream.init(resp.body)
xx: RaftMessageResponseBase
ss.unpack(xx) #and here too
result = xx
if isMainModule:
loadConfig()
var node = RaftNode[SmCommand, SmState].new()
var
nodesIds: seq[UUID]
node: BasicRaftNode
for c in conf:
debug "single conf", single_conf=c
nodesIds.add(c.id)
var
nodeId = parseUUID("f9695ea4-4f37-11ee-8e75-8ff5a48faa42")
peersIds = nodesIds
peersIds.del(peersIds.find(nodeId))
node = BasicRaftNode.new(nodeId, peersIds, TestRaftMessageSendCallbackCreate())

View File

@ -15,6 +15,7 @@ def writeFifo(data):
def readPipe():
with open(readFIFO, "r") as fifo:
data = fifo.read()
# writeFifo(data)
if len(data) == 0:
pass
# print("Writer closed")
@ -55,5 +56,7 @@ if __name__ == "__main__":
if oe.errno != errno.EEXIST:
raise
readPipe()
asyncio.run(main())