mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-26 14:51:49 +00:00
fixes/wait-for-start (#228)
* fixes subscribe to wait for start * moved around * added test * fix * eol * rm * removed as waku is default * updated docs * fix * Update node.md * Update node.md * Update wakunode2.nim * Update wakunode2.nim * fix test * rm * fix * fixes * fixes * fix * fix * using connect * fix * fix * Update wakubridge.nim * Update wakunode2.nim
This commit is contained in:
parent
dcbefbf4d6
commit
bfb29338a1
@ -17,7 +17,7 @@ the consumer wants to make. These methods are:
|
||||
```Nim
|
||||
proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
||||
bindIp: ValidIpAddress, bindPort: Port,
|
||||
extIp = none[ValidIpAddress](), extPort = none[Port](), topics = newSeq[string]()): T =
|
||||
extIp = none[ValidIpAddress](), extPort = none[Port]()): T =
|
||||
## Creates a Waku Node.
|
||||
##
|
||||
## Status: Implemented.
|
||||
|
@ -62,18 +62,10 @@ proc parsePeer(address: string): PeerInfo =
|
||||
let parts = address.split("/")
|
||||
result = PeerInfo.init(parts[^1], [multiAddr])
|
||||
|
||||
# NOTE Dialing on WakuRelay specifically
|
||||
proc dialPeer(c: Chat, address: string) {.async.} =
|
||||
let peer = parsePeer(address)
|
||||
echo &"dialing peer: {peer.peerId}"
|
||||
# XXX Discarding conn, do we want to keep this here?
|
||||
discard await c.node.switch.dial(peer, WakuRelayCodec)
|
||||
c.connected = true
|
||||
|
||||
proc connectToNodes(c: Chat, nodes: openArray[string]) =
|
||||
proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} =
|
||||
echo "Connecting to nodes"
|
||||
for nodeId in nodes:
|
||||
discard dialPeer(c, nodeId)
|
||||
await c.node.connectToNodes(nodes)
|
||||
c.connected = true
|
||||
|
||||
proc publish(c: Chat, line: string) =
|
||||
let payload = cast[seq[byte]](line)
|
||||
@ -117,7 +109,7 @@ proc writeAndPrint(c: Chat) {.async.} =
|
||||
echo "enter address of remote peer"
|
||||
let address = await c.transp.readLine()
|
||||
if address.len > 0:
|
||||
await c.dialPeer(address)
|
||||
await c.connectToNodes(@[address])
|
||||
|
||||
# elif line.startsWith("/exit"):
|
||||
# if p.connected and p.conn.closed.not:
|
||||
@ -135,7 +127,7 @@ proc writeAndPrint(c: Chat) {.async.} =
|
||||
else:
|
||||
try:
|
||||
if line.startsWith("/") and "p2p" in line:
|
||||
await c.dialPeer(line)
|
||||
await c.connectToNodes(@[line])
|
||||
except:
|
||||
echo &"unable to dial remote peer {line}"
|
||||
echo getCurrentExceptionMsg()
|
||||
@ -171,7 +163,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true)
|
||||
|
||||
if conf.staticnodes.len > 0:
|
||||
connectToNodes(chat, conf.staticnodes)
|
||||
await connectToNodes(chat, conf.staticnodes)
|
||||
|
||||
let peerInfo = node.peerInfo
|
||||
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
|
||||
@ -199,8 +191,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||
echo &"{payload}"
|
||||
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
|
||||
|
||||
# XXX Timing issue with subscribe, need to wait a bit to ensure GRAFT message is sent
|
||||
await sleepAsync(5.seconds)
|
||||
let topic = cast[Topic](DefaultTopic)
|
||||
await node.subscribe(topic, handler)
|
||||
|
||||
|
@ -198,3 +198,54 @@ procSuite "WakuNode":
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Messages are correctly relayed":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60003))
|
||||
pubSubTopic = "test"
|
||||
contentTopic = ContentTopic(1)
|
||||
payload = "hello world".toBytes()
|
||||
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
||||
|
||||
await node1.start()
|
||||
await node1.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node2.start()
|
||||
await node2.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node3.start()
|
||||
await node3.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node1.connectToNodes(@[node2.peerInfo])
|
||||
await node3.connectToNodes(@[node2.peerInfo])
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isOk():
|
||||
let val = msg.value()
|
||||
check:
|
||||
topic == pubSubTopic
|
||||
val.contentTopic == contentTopic
|
||||
val.payload == payload
|
||||
completionFut.complete(true)
|
||||
|
||||
await node3.subscribe(pubSubTopic, relayHandler)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
node1.publish(pubSubTopic, message)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
|
@ -94,7 +94,7 @@ proc start*(node: WakuNode) {.async.} =
|
||||
## Status: Implemented.
|
||||
##
|
||||
node.libp2pTransportLoops = await node.switch.start()
|
||||
|
||||
|
||||
# TODO Get this from WakuNode obj
|
||||
let peerInfo = node.peerInfo
|
||||
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
|
||||
@ -213,8 +213,6 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async
|
||||
node.wakuRelay = wakuRelay
|
||||
node.switch.mount(wakuRelay)
|
||||
|
||||
await sleepAsync(5.seconds)
|
||||
|
||||
info "mounting relay"
|
||||
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
let msg = WakuMessage.init(data)
|
||||
@ -248,7 +246,7 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} =
|
||||
# TODO Keep track of conn and connected state somewhere (WakuRelay?)
|
||||
#p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
|
||||
#p.connected = true
|
||||
discard n.switch.dial(remotePeer, WakuRelayCodec)
|
||||
discard await n.switch.dial(remotePeer, WakuRelayCodec)
|
||||
info "Post switch dial"
|
||||
|
||||
proc setStorePeer*(n: WakuNode, address: string) =
|
||||
@ -265,11 +263,32 @@ proc setFilterPeer*(n: WakuNode, address: string) =
|
||||
|
||||
n.wakuFilter.setPeer(remotePeer)
|
||||
|
||||
proc connectToNodes*(n: WakuNode, nodes: openArray[string]) =
|
||||
proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
|
||||
for nodeId in nodes:
|
||||
info "connectToNodes", node = nodeId
|
||||
# XXX: This seems...brittle
|
||||
discard dialPeer(n, nodeId)
|
||||
await dialPeer(n, nodeId)
|
||||
|
||||
# The issue seems to be around peers not being fully connected when
|
||||
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
||||
# fully connected.
|
||||
#
|
||||
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
|
||||
# later.
|
||||
await sleepAsync(5.seconds)
|
||||
|
||||
proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} =
|
||||
for peerInfo in nodes:
|
||||
info "connectToNodes", peer = peerInfo
|
||||
discard await n.switch.dial(peerInfo, WakuRelayCodec)
|
||||
|
||||
# The issue seems to be around peers not being fully connected when
|
||||
# trying to subscribe. So what we do is sleep to guarantee nodes are
|
||||
# fully connected.
|
||||
#
|
||||
# This issue was known to Dmitiry on nim-libp2p and may be resolvable
|
||||
# later.
|
||||
await sleepAsync(5.seconds)
|
||||
|
||||
when isMainModule:
|
||||
import
|
||||
@ -320,7 +339,7 @@ when isMainModule:
|
||||
waitFor mountRelay(node, conf.topics.split(" "))
|
||||
|
||||
if conf.staticnodes.len > 0:
|
||||
connectToNodes(node, conf.staticnodes)
|
||||
waitFor connectToNodes(node, conf.staticnodes)
|
||||
|
||||
if conf.storenode != "":
|
||||
setStorePeer(node, conf.storenode)
|
||||
|
@ -85,7 +85,7 @@ proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} =
|
||||
waitFor mountRelay(node, config.topics.split(" "))
|
||||
|
||||
if config.staticnodesv2.len > 0:
|
||||
connectToNodes(node, config.staticnodesv2)
|
||||
waitFor connectToNodes(node, config.staticnodesv2)
|
||||
|
||||
if config.storenode != "":
|
||||
setStorePeer(node, config.storenode)
|
||||
|
Loading…
x
Reference in New Issue
Block a user