mirror of https://github.com/waku-org/nwaku.git
Refactor wakunode2.nim (#664)
* Refactor wakunode2.nim * Remove empty raises * Some formatting improvements
This commit is contained in:
parent
c107072d9a
commit
92f2d5a7f5
|
@ -19,7 +19,7 @@ proc runBackground() {.async.} =
|
|||
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
Port(uint16(conf.udpPort) + conf.portsShift))
|
||||
node = WakuNode.init(conf.nodeKey, conf.listenAddress,
|
||||
node = WakuNode.new(conf.nodeKey, conf.listenAddress,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
|
||||
|
||||
await node.start()
|
||||
|
|
|
@ -112,11 +112,14 @@ proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} =
|
|||
|
||||
proc showChatPrompt(c: Chat) =
|
||||
if not c.prompt:
|
||||
stdout.write(">> ")
|
||||
stdout.flushFile()
|
||||
c.prompt = true
|
||||
try:
|
||||
stdout.write(">> ")
|
||||
stdout.flushFile()
|
||||
c.prompt = true
|
||||
except IOError:
|
||||
discard
|
||||
|
||||
proc printReceivedMessage(c: Chat, msg: WakuMessage) =
|
||||
proc printReceivedMessage(c: Chat, msg: WakuMessage) {.raises: [Defect].} =
|
||||
when PayloadV1:
|
||||
# Use Waku v1 payload encoding/encryption
|
||||
let
|
||||
|
@ -142,7 +145,12 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) =
|
|||
pb = Chat2Message.init(msg.payload)
|
||||
chatLine = if pb.isOk: pb[].toString()
|
||||
else: string.fromBytes(msg.payload)
|
||||
echo &"{chatLine}"
|
||||
try:
|
||||
echo &"{chatLine}"
|
||||
except ValueError:
|
||||
# Formatting fail. Print chat line in any case.
|
||||
echo chatLine
|
||||
|
||||
c.prompt = false
|
||||
showChatPrompt(c)
|
||||
trace "Printing message", topic=DefaultTopic, chatLine,
|
||||
|
@ -295,7 +303,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
|||
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
Port(uint16(conf.udpPort) + conf.portsShift))
|
||||
node = WakuNode.init(conf.nodekey, conf.listenAddress,
|
||||
node = WakuNode.new(conf.nodekey, conf.listenAddress,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
|
||||
|
||||
await node.start()
|
||||
|
@ -379,7 +387,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
|||
|
||||
node.wakuFilter.setPeer(parsePeerInfo(conf.filternode))
|
||||
|
||||
proc filterHandler(msg: WakuMessage) {.gcsafe.} =
|
||||
proc filterHandler(msg: WakuMessage) {.gcsafe, raises: [Defect].} =
|
||||
trace "Hit filter handler", contentTopic=msg.contentTopic
|
||||
|
||||
chat.printReceivedMessage(msg)
|
||||
|
|
|
@ -153,7 +153,7 @@ proc new*(T: type Chat2MatterBridge,
|
|||
|
||||
# Setup Waku v2 node
|
||||
let
|
||||
nodev2 = WakuNode.init(nodev2Key,
|
||||
nodev2 = WakuNode.new(nodev2Key,
|
||||
nodev2BindIp, nodev2BindPort,
|
||||
nodev2ExtIp, nodev2ExtPort)
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||
extIp = ValidIpAddress.init("127.0.0.1")
|
||||
port = Port(9000)
|
||||
node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port))
|
||||
node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
|
||||
|
||||
asyncTest "Debug API: get node info":
|
||||
waitFor node.start()
|
||||
|
@ -120,11 +120,11 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
asyncTest "Relay API: get latest messages":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, bindIp, Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, bindIp, Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(60002))
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = defaultContentTopic
|
||||
payload = @[byte 9]
|
||||
|
@ -389,14 +389,14 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
# Create a couple of nodes
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
peerInfo2 = node2.peerInfo
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60004))
|
||||
peerInfo3 = node3.peerInfo
|
||||
|
||||
|
@ -445,14 +445,14 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
# Create a couple of nodes
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
peerInfo2 = node2.peerInfo
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60004))
|
||||
peerInfo3 = node3.peerInfo
|
||||
|
||||
|
@ -496,7 +496,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
asyncTest "Admin API: get unmanaged peer information":
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
|
||||
waitFor node.start()
|
||||
|
@ -552,11 +552,11 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
asyncTest "Private API: generate asymmetric keys and encrypt/decrypt communication":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, bindIp, Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, bindIp, Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(60002))
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = defaultContentTopic
|
||||
payload = @[byte 9]
|
||||
|
@ -642,11 +642,11 @@ procSuite "Waku v2 JSON-RPC API":
|
|||
asyncTest "Private API: generate symmetric keys and encrypt/decrypt communication":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, bindIp, Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, bindIp, Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(60002))
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
|
||||
pubSubTopic = "polling"
|
||||
contentTopic = defaultContentTopic
|
||||
payload = @[byte 9]
|
||||
|
|
|
@ -41,12 +41,20 @@ suite "Message Store":
|
|||
var t1Flag, t2Flag, t3Flag: bool = false
|
||||
# flags for receiver timestamp
|
||||
var rt1Flag, rt2Flag, rt3Flag: bool = false
|
||||
# flags for message/pubsubTopic (default true)
|
||||
var msgFlag, psTopicFlag = true
|
||||
|
||||
var responseCount = 0
|
||||
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
|
||||
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
|
||||
responseCount += 1
|
||||
check msg in msgs
|
||||
check psTopic == pubsubTopic
|
||||
|
||||
# Note: cannot use `check` within `{.raises: [Defect].}` block:
|
||||
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
|
||||
if msg notin msgs:
|
||||
msgFlag = false
|
||||
|
||||
if psTopic != pubsubTopic:
|
||||
psTopicFlag = false
|
||||
|
||||
# check the correct retrieval of versions
|
||||
if msg.version == uint32(0): v0Flag = true
|
||||
|
@ -82,6 +90,9 @@ suite "Message Store":
|
|||
rt1Flag == true
|
||||
rt2Flag == true
|
||||
rt3Flag == true
|
||||
# check messages and pubsubTopic
|
||||
msgFlag == true
|
||||
psTopicFlag == true
|
||||
test "set and get user version":
|
||||
let
|
||||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
|
|
|
@ -24,10 +24,10 @@ procSuite "Peer Manager":
|
|||
asyncTest "Peer dialing works":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
peerInfo2 = node2.peerInfo
|
||||
|
||||
|
@ -57,10 +57,10 @@ procSuite "Peer Manager":
|
|||
asyncTest "Dialing fails gracefully":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
peerInfo2 = node2.peerInfo
|
||||
|
||||
|
@ -82,7 +82,7 @@ procSuite "Peer Manager":
|
|||
asyncTest "Adding, selecting and filtering peers work":
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
# Create filter peer
|
||||
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
|
||||
|
@ -126,10 +126,10 @@ procSuite "Peer Manager":
|
|||
asyncTest "Peer manager keeps track of connections":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
peerInfo2 = node2.peerInfo
|
||||
|
||||
|
@ -171,10 +171,10 @@ procSuite "Peer Manager":
|
|||
database = SqliteDatabase.init("", inMemory = true)[]
|
||||
storage = WakuPeerStorage.new(database)[]
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000), peerStorage = storage)
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
peerInfo2 = node2.peerInfo
|
||||
|
||||
|
@ -194,7 +194,7 @@ procSuite "Peer Manager":
|
|||
# Simulate restart by initialising a new node using the same storage
|
||||
let
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60004), peerStorage = storage)
|
||||
|
||||
await node3.start()
|
||||
|
|
|
@ -30,36 +30,53 @@ suite "Peer Storage":
|
|||
discard storage.put(peer.peerId, stored, conn, disconn)
|
||||
|
||||
var responseCount = 0
|
||||
# flags to check data matches what was stored (default true)
|
||||
var peerIdFlag, storedInfoFlag, connectednessFlag, disconnectFlag: bool
|
||||
|
||||
proc data(peerId: PeerID, storedInfo: StoredInfo,
|
||||
connectedness: Connectedness, disconnectTime: int64) =
|
||||
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
|
||||
responseCount += 1
|
||||
check:
|
||||
peerId == peer.peerId
|
||||
storedInfo == stored
|
||||
connectedness == conn
|
||||
disconnectTime == disconn
|
||||
|
||||
# Note: cannot use `check` within `{.raises: [Defect].}` block
|
||||
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
|
||||
# These flags are checked outside this block.
|
||||
peerIdFlag = peerId == peer.peerId
|
||||
storedInfoFlag = storedInfo == stored
|
||||
connectednessFlag = connectedness == conn
|
||||
disconnectFlag = disconnectTime == disconn
|
||||
|
||||
let res = storage.getAll(data)
|
||||
|
||||
check:
|
||||
res.isErr == false
|
||||
responseCount == 1
|
||||
peerIdFlag
|
||||
storedInfoFlag
|
||||
connectednessFlag
|
||||
disconnectFlag
|
||||
|
||||
# Test replace and retrieve (update an existing entry)
|
||||
discard storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10)
|
||||
|
||||
responseCount = 0
|
||||
proc replacedData(peerId: PeerID, storedInfo: StoredInfo,
|
||||
connectedness: Connectedness, disconnectTime: int64) =
|
||||
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
|
||||
responseCount += 1
|
||||
check:
|
||||
peerId == peer.peerId
|
||||
storedInfo == stored
|
||||
connectedness == CannotConnect
|
||||
disconnectTime == disconn + 10
|
||||
|
||||
|
||||
# Note: cannot use `check` within `{.raises: [Defect].}` block
|
||||
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
|
||||
# These flags are checked outside this block.
|
||||
peerIdFlag = peerId == peer.peerId
|
||||
storedInfoFlag = storedInfo == stored
|
||||
connectednessFlag = connectedness == CannotConnect
|
||||
disconnectFlag = disconnectTime == disconn + 10
|
||||
|
||||
let repRes = storage.getAll(replacedData)
|
||||
|
||||
check:
|
||||
repRes.isErr == false
|
||||
responseCount == 1
|
||||
peerIdFlag
|
||||
storedInfoFlag
|
||||
connectednessFlag
|
||||
disconnectFlag
|
||||
|
|
|
@ -48,7 +48,7 @@ procSuite "WakuBridge":
|
|||
|
||||
# Waku v2 node
|
||||
v2NodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
v2Node = WakuNode.init(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
v2Node = WakuNode.new(v2NodeKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
contentTopic = ContentTopic("/waku/1/1a2b3c4d/rlp")
|
||||
topic = [byte 0x1a, byte 0x2b, byte 0x3c, byte 0x4d]
|
||||
|
|
|
@ -18,9 +18,9 @@ procSuite "Waku Keepalive":
|
|||
asyncTest "handle ping keepalives":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
|
|
|
@ -204,7 +204,7 @@ procSuite "Waku rln relay":
|
|||
asyncTest "mounting waku rln relay":
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
await node.start()
|
||||
|
||||
|
|
|
@ -49,10 +49,10 @@ procSuite "Waku SWAP Accounting":
|
|||
asyncTest "Update accounting state after store operations":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60001))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
@ -96,10 +96,10 @@ procSuite "Waku SWAP Accounting":
|
|||
asyncTest "Update accounting state after sending cheque":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60001))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
|
|
@ -28,7 +28,7 @@ procSuite "WakuNode":
|
|||
asyncTest "Message published with content filter is retrievable":
|
||||
let
|
||||
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -77,10 +77,10 @@ procSuite "WakuNode":
|
|||
asyncTest "Content filtered publishing over network":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -140,9 +140,9 @@ procSuite "WakuNode":
|
|||
asyncTest "Can receive filtered messages published on both default and other topics":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
defaultTopic = "/waku/2/default-waku/proto"
|
||||
otherTopic = "/non/waku/formatted"
|
||||
defaultContentTopic = "defaultCT"
|
||||
|
@ -216,9 +216,9 @@ procSuite "WakuNode":
|
|||
asyncTest "Filter protocol works on node without relay capability":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
defaultTopic = "/waku/2/default-waku/proto"
|
||||
contentTopic = "defaultCT"
|
||||
payload = @[byte 1]
|
||||
|
@ -264,10 +264,10 @@ procSuite "WakuNode":
|
|||
asyncTest "Store protocol returns expected message":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
@ -300,10 +300,10 @@ procSuite "WakuNode":
|
|||
asyncTest "Filter protocol returns expected message":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
@ -338,13 +338,13 @@ procSuite "WakuNode":
|
|||
asyncTest "Messages are correctly relayed":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60003))
|
||||
pubSubTopic = "test"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -389,10 +389,10 @@ procSuite "WakuNode":
|
|||
asyncTest "Protocol matcher works as expected":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
pubSubTopic = "/waku/2/default-waku/proto"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -492,13 +492,13 @@ procSuite "WakuNode":
|
|||
let
|
||||
# publisher node
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
# Relay node
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
# Subscriber
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||
|
||||
pubSubTopic = "test"
|
||||
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -586,13 +586,13 @@ procSuite "WakuNode":
|
|||
let
|
||||
# publisher node
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
# Relay node
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
# Subscriber
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||
|
||||
pubSubTopic = "defaultTopic"
|
||||
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -640,7 +640,7 @@ procSuite "WakuNode":
|
|||
asyncTest "Relay protocol is started correctly":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
|
||||
# Relay protocol starts if mounted after node start
|
||||
|
@ -656,7 +656,7 @@ procSuite "WakuNode":
|
|||
|
||||
let
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
|
||||
node2.mountRelay()
|
||||
|
@ -676,13 +676,13 @@ procSuite "WakuNode":
|
|||
asyncTest "Lightpush message return success":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60010))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60012))
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60013))
|
||||
pubSubTopic = "test"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
@ -747,10 +747,10 @@ procSuite "WakuNode":
|
|||
asyncTest "Resume proc fetches the history":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
@ -778,10 +778,10 @@ procSuite "WakuNode":
|
|||
asyncTest "Resume proc discards duplicate messages":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic, timestamp: 1)
|
||||
|
|
|
@ -163,7 +163,7 @@ proc new*(T: type WakuBridge,
|
|||
|
||||
# Setup Waku v2 node
|
||||
let
|
||||
nodev2 = WakuNode.init(nodev2Key,
|
||||
nodev2 = WakuNode.new(nodev2Key,
|
||||
nodev2BindIp, nodev2BindPort,
|
||||
nodev2ExtIp, nodev2ExtPort)
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ const maxCache* = 30 # Max number of messages cached per topic @TODO make this c
|
|||
|
||||
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) =
|
||||
|
||||
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||
proc filterHandler(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} =
|
||||
# Add message to current cache
|
||||
trace "WakuMessage received", msg=msg
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ proc insertOrReplace(ps: PeerStorage,
|
|||
peerId: PeerID,
|
||||
storedInfo: StoredInfo,
|
||||
connectedness: Connectedness,
|
||||
disconnectTime: int64 = 0) {.raises: [Defect, Exception]} =
|
||||
disconnectTime: int64 = 0) =
|
||||
# Insert peer entry into persistent storage, or replace existing entry with updated info
|
||||
let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
|
||||
if res.isErr:
|
||||
|
@ -77,7 +77,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
|||
|
||||
return none(Connection)
|
||||
|
||||
proc loadFromStorage(pm: PeerManager) {.raises: [Defect, Exception]} =
|
||||
proc loadFromStorage(pm: PeerManager) =
|
||||
# Load peers from storage, if available
|
||||
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
|
||||
if peerId == pm.switch.peerInfo.peerId:
|
||||
|
@ -112,7 +112,7 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
|||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
||||
return
|
||||
|
||||
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager {.raises: [Defect, Exception]} =
|
||||
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
|
||||
let pm = PeerManager(switch: switch,
|
||||
peerStore: WakuPeerStore.new(),
|
||||
storage: storage)
|
||||
|
@ -162,7 +162,7 @@ proc hasPeers*(pm: PeerManager, proto: string): bool =
|
|||
# Returns `true` if manager has any peers for the specified protocol
|
||||
pm.peers.anyIt(it.protos.contains(proto))
|
||||
|
||||
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) {.raises: [Defect, Exception]} =
|
||||
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
|
||||
# Adds peer to manager for the specified protocol
|
||||
|
||||
if peerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
stew/results,
|
||||
../../../protocol/waku_message,
|
||||
|
@ -8,7 +10,7 @@ import
|
|||
## retrieve historical messages
|
||||
|
||||
type
|
||||
DataProc* = proc(receiverTimestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure.}
|
||||
DataProc* = proc(receiverTimestamp: float64, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
|
||||
|
||||
MessageStoreResult*[T] = Result[T, string]
|
||||
|
||||
|
@ -16,5 +18,5 @@ type
|
|||
|
||||
# MessageStore interface
|
||||
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
|
||||
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base, raises: [Defect, Exception].} = discard
|
||||
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
|
|||
|
||||
ok()
|
||||
|
||||
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] {.raises: [Defect, Exception].} =
|
||||
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
|
||||
## Retrieves all messages from the storage.
|
||||
##
|
||||
## **Example:**
|
||||
|
@ -91,7 +91,7 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
|
|||
## if res.isErr:
|
||||
## echo "error"
|
||||
var gotMessages = false
|
||||
proc msg(s: ptr sqlite3_stmt) {.raises: [Defect, Exception].} =
|
||||
proc msg(s: ptr sqlite3_stmt) =
|
||||
gotMessages = true
|
||||
let
|
||||
receiverTimestamp = sqlite3_column_double(s, 0)
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
stew/results,
|
||||
chronos,
|
||||
|
@ -12,7 +14,7 @@ type
|
|||
PeerStorageResult*[T] = Result[T, string]
|
||||
|
||||
DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
|
||||
connectedness: Connectedness, disconnectTime: int64) {.closure.}
|
||||
connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].}
|
||||
|
||||
# PeerStorage interface
|
||||
method put*(db: PeerStorage,
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/sets,
|
||||
sqlite3_abi,
|
||||
|
@ -36,7 +38,7 @@ proc init*(T: type StoredInfo, buffer: seq[byte]): ProtoResult[T] =
|
|||
|
||||
ok(storedInfo)
|
||||
|
||||
proc encode*(storedInfo: StoredInfo): ProtoBuffer =
|
||||
proc encode*(storedInfo: StoredInfo): PeerStorageResult[ProtoBuffer] =
|
||||
var pb = initProtoBuffer()
|
||||
|
||||
pb.write(1, storedInfo.peerId)
|
||||
|
@ -47,9 +49,12 @@ proc encode*(storedInfo: StoredInfo): ProtoBuffer =
|
|||
for proto in storedInfo.protos.items:
|
||||
pb.write(3, proto)
|
||||
|
||||
pb.write(4, storedInfo.publicKey)
|
||||
try:
|
||||
pb.write(4, storedInfo.publicKey)
|
||||
except ResultError[CryptoError] as e:
|
||||
return err("Failed to encode public key")
|
||||
|
||||
return pb
|
||||
ok(pb)
|
||||
|
||||
##########################
|
||||
# Storage implementation #
|
||||
|
@ -97,7 +102,12 @@ method put*(db: WakuPeerStorage,
|
|||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
let res = prepare.value.exec((peerId.data, storedInfo.encode().buffer, int32(ord(connectedness)), disconnectTime))
|
||||
let encoded = storedInfo.encode()
|
||||
|
||||
if encoded.isErr:
|
||||
return err("failed to encode: " & encoded.error())
|
||||
|
||||
let res = prepare.value.exec((peerId.data, encoded.get().buffer, int32(ord(connectedness)), disconnectTime))
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
@ -107,7 +117,7 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
|
|||
## Retrieves all peers from storage
|
||||
var gotPeers = false
|
||||
|
||||
proc peer(s: ptr sqlite3_stmt) =
|
||||
proc peer(s: ptr sqlite3_stmt) {.raises: [Defect, LPError, ResultError[ProtoError]].} =
|
||||
gotPeers = true
|
||||
let
|
||||
# Peer ID
|
||||
|
@ -125,8 +135,13 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
|
|||
|
||||
onData(peerId, storedInfo, connectedness, disconnectTime)
|
||||
|
||||
let res = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer)
|
||||
if res.isErr:
|
||||
var queryResult: DatabaseResult[bool]
|
||||
try:
|
||||
queryResult = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer)
|
||||
except LPError, ResultError[ProtoError]:
|
||||
return err("failed to extract peer from query result")
|
||||
|
||||
if queryResult.isErr:
|
||||
return err("failed")
|
||||
|
||||
ok gotPeers
|
||||
|
|
|
@ -1,18 +1,14 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[options, tables, strutils, sequtils, os],
|
||||
chronos, chronicles, metrics,
|
||||
metrics/chronos_httpserver,
|
||||
stew/shims/net as stewNet,
|
||||
# TODO: Why do we need eth keys?
|
||||
eth/keys,
|
||||
web3,
|
||||
libp2p/multiaddress,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protocols/ping,
|
||||
# NOTE For TopicHandler, solve with exports?
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/builders,
|
||||
../protocol/[waku_relay, waku_message],
|
||||
|
@ -22,10 +18,10 @@ import
|
|||
../protocol/waku_lightpush/waku_lightpush,
|
||||
../protocol/waku_rln_relay/waku_rln_relay_types,
|
||||
../utils/peers,
|
||||
../utils/requests,
|
||||
./storage/message/message_store,
|
||||
./storage/peer/peer_storage,
|
||||
./storage/migration/migration_types,
|
||||
../utils/requests,
|
||||
./peer_manager/peer_manager
|
||||
|
||||
when defined(rln):
|
||||
|
@ -78,14 +74,6 @@ type
|
|||
rng*: ref BrHmacDrbgContext
|
||||
started*: bool # Indicates that node has started listening
|
||||
|
||||
# NOTE Any difference here in Waku vs Eth2?
|
||||
# E.g. Devp2p/Libp2p support, etc.
|
||||
#func asLibp2pKey*(key: keys.PublicKey): PublicKey =
|
||||
# PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
|
||||
|
||||
func asEthKey*(key: PrivateKey): keys.PrivateKey =
|
||||
keys.PrivateKey(key.skkey)
|
||||
|
||||
func protocolMatcher(codec: string): Matcher =
|
||||
## Returns a protocol matcher function for the provided codec
|
||||
|
||||
|
@ -125,10 +113,11 @@ template tcpEndPoint(address, port): auto =
|
|||
## Public API
|
||||
##
|
||||
|
||||
proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
||||
proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
||||
bindIp: ValidIpAddress, bindPort: Port,
|
||||
extIp = none[ValidIpAddress](), extPort = none[Port](),
|
||||
peerStorage: PeerStorage = nil): T =
|
||||
peerStorage: PeerStorage = nil): T
|
||||
{.raises: [Defect, LPError].} =
|
||||
## Creates a Waku Node.
|
||||
##
|
||||
## Status: Implemented.
|
||||
|
@ -155,7 +144,7 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
|||
# msgIdProvider = msgIdProvider,
|
||||
# triggerSelf = true, sign = false,
|
||||
# verifySignature = false).PubSub
|
||||
result = WakuNode(
|
||||
let wakuNode = WakuNode(
|
||||
peerManager: PeerManager.new(switch, peerStorage),
|
||||
switch: switch,
|
||||
rng: rng,
|
||||
|
@ -163,6 +152,8 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
|||
filters: initTable[string, Filter]()
|
||||
)
|
||||
|
||||
return wakuNode
|
||||
|
||||
proc start*(node: WakuNode) {.async.} =
|
||||
## Starts a created Waku Node.
|
||||
##
|
||||
|
@ -393,9 +384,11 @@ proc info*(node: WakuNode): WakuInfo =
|
|||
let wakuInfo = WakuInfo(listenStr: listenStr)
|
||||
return wakuInfo
|
||||
|
||||
proc mountFilter*(node: WakuNode) =
|
||||
proc mountFilter*(node: WakuNode) {.raises: [Defect, KeyError, LPError]} =
|
||||
info "mounting filter"
|
||||
proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} =
|
||||
proc filterHandler(requestId: string, msg: MessagePush)
|
||||
{.gcsafe, raises: [Defect, KeyError].} =
|
||||
|
||||
info "push received"
|
||||
for message in msg.messages:
|
||||
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
||||
|
@ -406,14 +399,14 @@ proc mountFilter*(node: WakuNode) =
|
|||
|
||||
# NOTE: If using the swap protocol, it must be mounted before store. This is
|
||||
# because store is using a reference to the swap protocol.
|
||||
proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) =
|
||||
proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.raises: [Defect, LPError].} =
|
||||
info "mounting swap", mode = $swapConfig.mode
|
||||
node.wakuSwap = WakuSwap.init(node.peerManager, node.rng, swapConfig)
|
||||
node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))
|
||||
# NYI - Do we need this?
|
||||
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
|
||||
|
||||
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false) =
|
||||
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false) {.raises: [Defect, LPError].} =
|
||||
info "mounting store"
|
||||
|
||||
if node.wakuSwap.isNil:
|
||||
|
@ -426,7 +419,10 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: boo
|
|||
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
|
||||
|
||||
when defined(rln):
|
||||
proc mountRlnRelay*(node: WakuNode, ethClientAddress: Option[string] = none(string), ethAccountAddress: Option[Address] = none(Address), membershipContractAddress: Option[Address] = none(Address)) {.async.} =
|
||||
proc mountRlnRelay*(node: WakuNode,
|
||||
ethClientAddress: Option[string] = none(string),
|
||||
ethAccountAddress: Option[Address] = none(Address),
|
||||
membershipContractAddress: Option[Address] = none(Address)) {.async.} =
|
||||
# TODO return a bool value to indicate the success of the call
|
||||
# check whether inputs are provided
|
||||
doAssert(ethClientAddress.isSome())
|
||||
|
@ -469,7 +465,7 @@ when defined(rln):
|
|||
if msg.isOk():
|
||||
# check the proof
|
||||
if proofVrfy(msg.value().payload, msg.value().proof):
|
||||
result = ValidationResult.Accept
|
||||
return ValidationResult.Accept
|
||||
# set a validator for the pubsubTopic
|
||||
let pb = PubSub(node.wakuRelay)
|
||||
pb.addValidator(pubsubTopic, validator)
|
||||
|
@ -478,7 +474,10 @@ proc mountRelay*(node: WakuNode,
|
|||
topics: seq[string] = newSeq[string](),
|
||||
rlnRelayEnabled = false,
|
||||
relayMessages = true,
|
||||
triggerSelf = true) {.gcsafe.} =
|
||||
triggerSelf = true)
|
||||
# @TODO: Better error handling: CatchableError is raised by `waitFor`
|
||||
{.gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
|
||||
|
||||
let wakuRelay = WakuRelay.init(
|
||||
switch = node.switch,
|
||||
# Use default
|
||||
|
@ -529,7 +528,7 @@ proc mountRelay*(node: WakuNode,
|
|||
|
||||
info "relay mounted and started successfully"
|
||||
|
||||
proc mountLightPush*(node: WakuNode) =
|
||||
proc mountLightPush*(node: WakuNode) {.raises: [Defect, LPError].} =
|
||||
info "mounting light push"
|
||||
|
||||
if node.wakuRelay.isNil:
|
||||
|
@ -541,10 +540,15 @@ proc mountLightPush*(node: WakuNode) =
|
|||
|
||||
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
|
||||
|
||||
proc mountLibp2pPing*(node: WakuNode) =
|
||||
proc mountLibp2pPing*(node: WakuNode) {.raises: [Defect, LPError].} =
|
||||
info "mounting libp2p ping protocol"
|
||||
|
||||
node.libp2pPing = Ping.new(rng = node.rng)
|
||||
try:
|
||||
node.libp2pPing = Ping.new(rng = node.rng)
|
||||
except Exception as e:
|
||||
# This is necessary as `Ping.new*` does not have explicit `raises` requirement
|
||||
# @TODO: remove exception handling once explicit `raises` in ping module
|
||||
raise newException(LPError, "Failed to initialise ping protocol")
|
||||
|
||||
node.switch.mount(node.libp2pPing)
|
||||
|
||||
|
@ -590,21 +594,21 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} =
|
|||
discard await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)
|
||||
info "Post peerManager dial"
|
||||
|
||||
proc setStorePeer*(n: WakuNode, address: string) =
|
||||
proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
|
||||
info "Set store peer", address = address
|
||||
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
n.wakuStore.setPeer(remotePeer)
|
||||
|
||||
proc setFilterPeer*(n: WakuNode, address: string) =
|
||||
proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
|
||||
info "Set filter peer", address = address
|
||||
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
||||
n.wakuFilter.setPeer(remotePeer)
|
||||
|
||||
proc setLightPushPeer*(n: WakuNode, address: string) =
|
||||
proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
|
||||
info "Set lightpush peer", address = address
|
||||
|
||||
let remotePeer = parsePeerInfo(address)
|
||||
|
@ -638,6 +642,7 @@ proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} =
|
|||
# later.
|
||||
await sleepAsync(5.seconds)
|
||||
|
||||
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||
when isMainModule:
|
||||
import
|
||||
system/ansi_c,
|
||||
|
@ -653,7 +658,7 @@ when isMainModule:
|
|||
./storage/peer/waku_peer_storage,
|
||||
../../common/utils/nat
|
||||
|
||||
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) =
|
||||
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) {.raises: [Defect, RpcBindError, CatchableError].} =
|
||||
let
|
||||
ta = initTAddress(rpcIp, rpcPort)
|
||||
rpcServer = newRpcHttpServer([ta])
|
||||
|
@ -681,7 +686,7 @@ when isMainModule:
|
|||
rpcServer.start()
|
||||
info "RPC Server started", ta
|
||||
|
||||
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) =
|
||||
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) {.raises: [Defect, Exception].} =
|
||||
info "Starting metrics HTTP server", serverIp, serverPort
|
||||
|
||||
startMetricsHttpServer($serverIp, serverPort)
|
||||
|
@ -755,10 +760,10 @@ when isMainModule:
|
|||
## config, the external port is the same as the bind port.
|
||||
extPort = if extIp.isSome() and extTcpPort.isNone(): some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
||||
else: extTcpPort
|
||||
node = WakuNode.init(conf.nodekey,
|
||||
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
extIp, extPort,
|
||||
pStorage)
|
||||
node = WakuNode.new(conf.nodekey,
|
||||
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
extIp, extPort,
|
||||
pStorage)
|
||||
|
||||
waitFor node.start()
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils, options],
|
||||
bearssl,
|
||||
|
@ -34,7 +36,7 @@ const
|
|||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
|
||||
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
|
||||
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") {.raises: [Defect, KeyError]} =
|
||||
for key in filters.keys:
|
||||
let filter = filters[key]
|
||||
# We do this because the key for the filter is set to the requestId received from the filter protocol.
|
||||
|
|
|
@ -12,7 +12,7 @@ type
|
|||
ContentFilter* = object
|
||||
contentTopic*: ContentTopic
|
||||
|
||||
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
|
||||
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].}
|
||||
|
||||
Filter* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils, options],
|
||||
bearssl,
|
||||
|
|
|
@ -360,7 +360,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
|||
return historyRes
|
||||
|
||||
|
||||
proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
|
||||
proc init*(ws: WakuStore) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
var message = await conn.readLp(64*1024)
|
||||
var res = HistoryRPC.init(message)
|
||||
|
@ -411,14 +411,14 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
|
|||
|
||||
|
||||
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
|
||||
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true): T {.raises: [Defect, Exception]} =
|
||||
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true): T =
|
||||
debug "init"
|
||||
var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages)
|
||||
output.init()
|
||||
return output
|
||||
|
||||
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
|
||||
proc setPeer*(ws: WakuStore, peer: PeerInfo) {.raises: [Defect, Exception]} =
|
||||
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
|
||||
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||
waku_store_peers.inc()
|
||||
|
||||
|
@ -519,7 +519,7 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Fu
|
|||
debug "query is", q=q
|
||||
|
||||
var hasNextPage = true
|
||||
proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
|
||||
proc handler(response: HistoryResponse) {.gcsafe.} =
|
||||
# store messages
|
||||
for m in response.messages.items: messageList.add(m)
|
||||
|
||||
|
@ -590,7 +590,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
|
|||
|
||||
var dismissed: uint = 0
|
||||
var added: uint = 0
|
||||
proc save(msgList: seq[WakuMessage]) {.raises: [Defect, Exception].} =
|
||||
proc save(msgList: seq[WakuMessage]) =
|
||||
debug "save proc is called"
|
||||
# exclude index from the comparison criteria
|
||||
let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg)
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
# Collection of utilities related to Waku peers
|
||||
import
|
||||
std/strutils,
|
||||
libp2p/multiaddress,
|
||||
libp2p/peerinfo
|
||||
|
||||
proc initAddress(T: type MultiAddress, str: string): T =
|
||||
proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueError, LPError].}=
|
||||
# @TODO: Rather than raising exceptions, this should return a Result
|
||||
let address = MultiAddress.init(str).tryGet()
|
||||
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
|
||||
result = address
|
||||
|
@ -14,7 +17,7 @@ proc initAddress(T: type MultiAddress, str: string): T =
|
|||
|
||||
## Parses a fully qualified peer multiaddr, in the
|
||||
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
|
||||
proc parsePeerInfo*(address: string): PeerInfo =
|
||||
proc parsePeerInfo*(address: string): PeerInfo {.raises: [Defect, ValueError, LPError].}=
|
||||
let multiAddr = MultiAddress.initAddress(address)
|
||||
|
||||
var
|
||||
|
|
|
@ -28,7 +28,7 @@ proc echo() {.exportc.} =
|
|||
# (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
|
||||
# Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
# Port(uint16(conf.udpPort) + conf.portsShift))
|
||||
# node = WakuNode.init(conf.nodeKey, conf.listenAddress,
|
||||
# node = WakuNode.new(conf.nodeKey, conf.listenAddress,
|
||||
# Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
|
||||
#
|
||||
# await node.start()
|
||||
|
|
Loading…
Reference in New Issue