nim-raft/tests/raft_test_node_standalone.nim

104 lines
2.7 KiB
Nim
Raw Normal View History

2023-09-08 18:45:37 +03:00
import basic_state_machine
2023-09-11 19:55:30 +03:00
import basic_cluster
import std/json
2023-09-09 19:56:54 +03:00
import msgpack4nim
2023-09-11 19:55:30 +03:00
import strutils
import std/strformat
import httpclient
2023-09-12 16:30:17 +03:00
import os
import std/threadpool
2023-09-08 18:45:37 +03:00
type
  RaftPeerConf* = object
    ## Address record for a single Raft peer: the peer's node id together
    ## with the host and port it is reachable on.
    id*: UUID       ## Node identifier of the peer.
    host*: string   ## Host name or address of the peer.
    port*: int      ## Port of the peer.

  RaftPeersConfContainer* = seq[RaftPeerConf]
    ## Ordered collection of peer address records.
2023-09-12 14:49:14 +03:00
proc loadConfig(jsonFile = "raft_node_config.json"): RaftPeersConfContainer =
  ## Reads the peers configuration from the JSON file `jsonFile` and returns
  ## one `RaftPeerConf` per element of its top-level `"raftPeers"` array.
  ##
  ## Each array element is expected to provide an `"id"` (UUID string),
  ## a `"host"` (string) and a `"port"` (integer).
  ##
  ## `jsonFile` defaults to the path that used to be hard-coded, so existing
  ## `loadConfig()` calls behave exactly as before.
  ##
  ## Errors raised by `parseFile` (unreadable or malformed file) and by the
  ## `[]` lookups on missing keys are propagated to the caller.
  let jsConf = parseFile(jsonFile)
  # Accumulate straight into the implicit `result`; no temporary seq needed.
  for n in jsConf["raftPeers"]:
    result.add(RaftPeerConf(
      id: parseUUID(n["id"].getStr),
      host: n["host"].getStr,
      port: n["port"].getInt))
2023-10-13 07:24:35 +03:00
proc raftPipesRead[SmCommandType, SmStateType](node: BasicRaftNode, port: int) =
  ## Handles a single request/response exchange over two named pipes whose
  ## file names are derived from `port`:
  ##
  ## * the whole content of `RAFTNODERECEIVEMSGPIPE{port}` is read and
  ##   unpacked into a `RaftMessage`,
  ## * the message is handed to `raftNodeMessageDeliver` for `node`,
  ## * the resulting response is packed and written to
  ##   `RAFTNODESENDMSGRESPPIPE{port}`.
  ##
  ## Both pipe handles are closed when the proc returns, including on error.
  let
    fifoRead = fmt"RAFTNODERECEIVEMSGPIPE{port}"
    fifoWrite = fmt"RAFTNODESENDMSGRESPPIPE{port}"

  # Open in the same order as before (read end first, then write end) and
  # register the close right after each successful open so that a failure
  # of the second open does not leak the first handle.
  let frFD = open(fifoRead, fmRead)
  defer: frFD.close()
  let fwFD = open(fifoWrite, fmAppend)
  # Closing the write end also flushes the buffered response to the pipe.
  defer: fwFD.close()

  var
    ss = MsgStream.init(frFD.readAll)
    xx: RaftMessage[SmCommandType, SmStateType]
  ss.unpack(xx)
  debug "Received Req: ", req=repr(xx)

  var
    r = waitFor raftNodeMessageDeliver(node, xx)
    resp = RaftMessageResponse[SmCommandType, SmStateType](r)
    rs = MsgStream.init()
  rs.pack(resp)
  fwFD.write(rs.data)
2023-10-13 07:24:35 +03:00
proc testRaftMessageSendCallbackCreate[SmCommandType, SmStateType](conf: RaftPeersConfContainer): RaftMessageSendCallback[SmCommandType, SmStateType] =
  ## Builds the message-send callback used by the test node.
  ##
  ## The returned closure looks up the peer in `conf` whose `id` equals
  ## `msg.receiverId`, packs the message with msgpack, POSTs it to
  ## `http://{host}:{port}` and unpacks the HTTP response body into a
  ## `RaftMessageResponse`, which becomes the future's value.
  ##
  ## If no peer matches, `host` and `port` keep their zero values and the
  ## request is attempted against `http://:0`, as it was before.
  proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
    var
      host: string
      port: int
      resp: Response
      xx: RaftMessageResponse[SmCommandType, SmStateType]
      client = newHttpClient(timeout=50)
      m = RaftMessage[SmCommandType, SmStateType](msg)
      s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream

    # A fresh client is created for every message; release its socket on
    # every exit path instead of leaking one connection per call.
    try:
      for c in conf:
        if c.id == msg.receiverId:
          host = c.host
          port = c.port

      s.pack(m) # serialize the outgoing request
      debug "Sending Req: ", req=fmt"http://{host}:{port}", data=s.data
      resp = client.post(fmt"http://{host}:{port}", s.data)

      s = MsgStream.init(resp.body)
      s.unpack(xx) # deserialize the peer's reply
      result = xx
    finally:
      client.close()
proc main() =
  ## Entry point of the standalone test node.
  ##
  ## Loads the peers configuration, takes this node's UUID from the first
  ## command-line argument, creates a `BasicRaftNode` whose peers are all
  ## configured ids except its own, starts it, spawns the pipe reader for
  ## the node's configured port and then runs the event loop forever.
  ##
  ## Exits with a failure code and a message when the UUID argument is
  ## missing or is not listed in the configuration. Both cases previously
  ## ended in an index error.
  let conf = loadConfig()

  # `peersIds[i]` corresponds to `conf[i]` until our own id is removed below.
  var peersIds: seq[UUID]
  for c in conf:
    debug "single conf", single_conf=c
    peersIds.add(c.id)

  if paramCount() < 1:
    quit("missing argument: expected the UUID of this node", QuitFailure)

  let
    nodeId = parseUUID(paramStr(1))
    idx = peersIds.find(nodeId)
  if idx < 0:
    quit("node id " & paramStr(1) & " is not present in the peers configuration",
         QuitFailure)

  let port = conf[idx].port
  # Drop our own id so only the other nodes remain; `del` does not keep order.
  peersIds.del(idx)

  var node = BasicRaftNode.new(nodeId, peersIds, testRaftMessageSendCallbackCreate[SmCommand, SmState](conf))

  raftNodeStart(node)
  spawn raftPipesRead[SmCommand, SmState](node, port)
  runForever()
2023-09-11 19:55:30 +03:00
2023-09-12 14:49:14 +03:00
# Run the node only when this file is compiled as the main module.
# `isMainModule` is a compile-time constant, so `when` is the idiomatic guard.
when isMainModule:
  main()