diff --git a/docs/api/v2/node.md b/docs/api/v2/node.md index 17d3d2b58..9caa796e3 100644 --- a/docs/api/v2/node.md +++ b/docs/api/v2/node.md @@ -17,7 +17,7 @@ the consumer wants to make. These methods are: ```Nim proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, bindIp: ValidIpAddress, bindPort: Port, - extIp = none[ValidIpAddress](), extPort = none[Port](), topics = newSeq[string]()): T = + extIp = none[ValidIpAddress](), extPort = none[Port]()): T = ## Creates a Waku Node. ## ## Status: Implemented. diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index b91756fe0..d1a1ab785 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -62,18 +62,10 @@ proc parsePeer(address: string): PeerInfo = let parts = address.split("/") result = PeerInfo.init(parts[^1], [multiAddr]) -# NOTE Dialing on WakuRelay specifically -proc dialPeer(c: Chat, address: string) {.async.} = - let peer = parsePeer(address) - echo &"dialing peer: {peer.peerId}" - # XXX Discarding conn, do we want to keep this here? - discard await c.node.switch.dial(peer, WakuRelayCodec) - c.connected = true - -proc connectToNodes(c: Chat, nodes: openArray[string]) = +proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} = echo "Connecting to nodes" - for nodeId in nodes: - discard dialPeer(c, nodeId) + await c.node.connectToNodes(nodes) + c.connected = true proc publish(c: Chat, line: string) = let payload = cast[seq[byte]](line) @@ -117,7 +109,7 @@ proc writeAndPrint(c: Chat) {.async.} = echo "enter address of remote peer" let address = await c.transp.readLine() if address.len > 0: - await c.dialPeer(address) + await c.connectToNodes(@[address]) # elif line.startsWith("/exit"): # if p.connected and p.conn.closed.not: @@ -135,7 +127,7 @@ proc writeAndPrint(c: Chat) {.async.} = else: try: if line.startsWith("/") and "p2p" in line: - await c.dialPeer(line) + await c.connectToNodes(@[line]) except: echo &"unable to dial remote peer {line}" echo getCurrentExceptionMsg() @@ -171,7 +163,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true) if conf.staticnodes.len > 0: - connectToNodes(chat, conf.staticnodes) + await connectToNodes(chat, conf.staticnodes) let peerInfo = node.peerInfo let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId @@ -199,8 +191,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = echo &"{payload}" info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic - # XXX Timing issue with subscribe, need to wait a bit to ensure GRAFT message is sent - await sleepAsync(5.seconds) let topic = cast[Topic](DefaultTopic) await node.subscribe(topic, handler) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 026680168..7f808a0cb 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -198,3 +198,54 @@ procSuite "WakuNode": (await completionFut.withTimeout(5.seconds)) == true await node1.stop() await node2.stop() + + asyncTest "Messages are correctly relayed": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), + Port(60003)) + pubSubTopic = "test" + contentTopic = ContentTopic(1) + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.peerInfo]) + await node3.connectToNodes(@[node2.peerInfo]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + await node3.subscribe(pubSubTopic, relayHandler) + await sleepAsync(2000.millis) + + node1.publish(pubSubTopic, message) + await sleepAsync(2000.millis) + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + await node3.stop() diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index a0532a243..29387c833 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -94,7 +94,7 @@ proc start*(node: WakuNode) {.async.} = ## Status: Implemented. ## node.libp2pTransportLoops = await node.switch.start() - + # TODO Get this from WakuNode obj let peerInfo = node.peerInfo info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs @@ -213,8 +213,6 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async node.wakuRelay = wakuRelay node.switch.mount(wakuRelay) - await sleepAsync(5.seconds) - info "mounting relay" proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.init(data) @@ -248,7 +246,7 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} = # TODO Keep track of conn and connected state somewhere (WakuRelay?) #p.conn = await p.switch.dial(remotePeer, WakuRelayCodec) #p.connected = true - discard n.switch.dial(remotePeer, WakuRelayCodec) + discard await n.switch.dial(remotePeer, WakuRelayCodec) info "Post switch dial" proc setStorePeer*(n: WakuNode, address: string) = @@ -265,11 +263,32 @@ proc setFilterPeer*(n: WakuNode, address: string) = n.wakuFilter.setPeer(remotePeer) -proc connectToNodes*(n: WakuNode, nodes: openArray[string]) = +proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} = for nodeId in nodes: info "connectToNodes", node = nodeId # XXX: This seems...brittle - discard dialPeer(n, nodeId) + await dialPeer(n, nodeId) + + # The issue seems to be around peers not being fully connected when + # trying to subscribe. So what we do is sleep to guarantee nodes are + # fully connected. + # + # This issue was known to Dmitiry on nim-libp2p and may be resolvable + # later. + await sleepAsync(5.seconds) + +proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} = + for peerInfo in nodes: + info "connectToNodes", peer = peerInfo + discard await n.switch.dial(peerInfo, WakuRelayCodec) + + # The issue seems to be around peers not being fully connected when + # trying to subscribe. So what we do is sleep to guarantee nodes are + # fully connected. + # + # This issue was known to Dmitiry on nim-libp2p and may be resolvable + # later. + await sleepAsync(5.seconds) when isMainModule: import @@ -320,7 +339,7 @@ when isMainModule: waitFor mountRelay(node, conf.topics.split(" ")) if conf.staticnodes.len > 0: - connectToNodes(node, conf.staticnodes) + waitFor connectToNodes(node, conf.staticnodes) if conf.storenode != "": setStorePeer(node, conf.storenode) diff --git a/waku/node/wakubridge.nim b/waku/node/wakubridge.nim index ec7db0865..5bd4e2cca 100644 --- a/waku/node/wakubridge.nim +++ b/waku/node/wakubridge.nim @@ -85,7 +85,7 @@ proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} = waitFor mountRelay(node, config.topics.split(" ")) if config.staticnodesv2.len > 0: - connectToNodes(node, config.staticnodesv2) + waitFor connectToNodes(node, config.staticnodesv2) if config.storenode != "": setStorePeer(node, config.storenode)