diff --git a/Dockerfile b/Dockerfile index eb4579497..ec370a9c8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,7 @@ RUN make -j$(nproc) $MAKE_TARGET NIM_PARAMS="$NIM_PARAMS" FROM alpine:3.12 -ARG MAKE_TARGET=wakunode +ARG MAKE_TARGET=wakunode2 LABEL maintainer="jakub@status.im" LABEL source="https://github.com/status-im/nim-waku" @@ -45,6 +45,6 @@ COPY --from=nim-build /app/build/$MAKE_TARGET /usr/local/bin/ # Symlink the correct wakunode binary RUN ln -sv /usr/local/bin/$MAKE_TARGET /usr/bin/wakunode -ENTRYPOINT ["/usr/bin/wakunode"] +ENTRYPOINT ["/usr/bin/wakunode", "--store:true", "--relay:true", "--filter:true"] # By default just show help if called without arguments CMD ["--help"] diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 6f801ac64..d12d92d98 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -25,6 +25,7 @@ proc runBackground() {.async.} = Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) await node.start() + await node.mountRelay() # Subscribe to a topic let topic = cast[Topic]("foobar") diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 32fbd3930..b456eaabe 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -160,10 +160,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = Port(uint16(conf.tcpPort) + conf.portsShift), Port(uint16(conf.udpPort) + conf.portsShift)) node = WakuNode.init(conf.nodeKey, conf.libp2pAddress, - Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort, conf.topics.split(" ")) + Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) # waitFor vs await await node.start() + await node.mountRelay(conf.topics.split(" ")) var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true) @@ -175,6 +176,8 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = echo &"Listening on\n {listenStr}" if conf.storenode != "": + node.mountStore() + node.wakuStore.setPeer(parsePeer(conf.storenode)) proc storeHandler(response: HistoryResponse) {.gcsafe.} = diff --git a/tests/v2/test_rpc_waku.nim b/tests/v2/test_rpc_waku.nim index 8cae40b52..f055bce01 100644 --- a/tests/v2/test_rpc_waku.nim +++ b/tests/v2/test_rpc_waku.nim @@ -22,10 +22,12 @@ suite "Waku v2 Remote Procedure Calls": bindIp = ValidIpAddress.init("0.0.0.0") extIp = ValidIpAddress.init("127.0.0.1") port = Port(9000) - node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port), @["waku"]) + node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port)) waitFor node.start() + waitFor node.mountRelay(@["waku"]) + # RPC server setup let rpcPort = Port(8545) diff --git a/tests/v2/test_waku.nim b/tests/v2/test_waku.nim index 200c20cef..3e592ad49 100644 --- a/tests/v2/test_waku.nim +++ b/tests/v2/test_waku.nim @@ -78,11 +78,15 @@ procSuite "FloodSub": let nodes = generateNodes(2) + nodesFut = await allFinished( nodes[0].start(), nodes[1].start() ) + for node in nodes: + await node.mountRelay() + await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index dc56b64bf..e1752c562 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -44,12 +44,13 @@ procSuite "WakuNode": await node.start() + await node.mountRelay() + # Subscribe our node to the pubSubTopic where all chat data go onto. await node.subscribe(pubSubTopic, relayHandler) # Subscribe a contentFilter to trigger a specific application handler when # WakuMessages with that content are received - # node2.wakuFilter.setPeer(node1.peerInfo) await node.subscribe(filterRequest, contentHandler) await sleepAsync(2000.millis) @@ -95,6 +96,12 @@ procSuite "WakuNode": await allFutures([node1.start(), node2.start()]) + await node1.mountRelay() + await node2.mountRelay() + + node1.mountFilter() + node2.mountFilter() + # Subscribe our node to the pubSubTopic where all chat data go onto. await node1.subscribe(pubSubTopic, relayHandler) # Subscribe a contentFilter to trigger a specific application handler when @@ -132,7 +139,9 @@ procSuite "WakuNode": var completionFut = newFuture[bool]() await node1.start() + node1.mountStore() await node2.start() + node2.mountStore() await node2.subscriptions.notify("waku", message) @@ -166,7 +175,9 @@ procSuite "WakuNode": var completionFut = newFuture[bool]() await node1.start() + node1.mountFilter() await node2.start() + node2.mountFilter() node1.wakuFilter.setPeer(node2.peerInfo) diff --git a/waku/node/v2/config.nim b/waku/node/v2/config.nim index ef1547744..7cee8fab2 100644 --- a/waku/node/v2/config.nim +++ b/waku/node/v2/config.nim @@ -73,6 +73,21 @@ type defaultValue: "" name: "storenode" }: string + store* {. + desc: "Flag whether to start store protocol", + defaultValue: false + name: "store" }: bool + + filter* {. + desc: "Flag whether to start filter protocol", + defaultValue: false + name: "filter" }: bool + + relay* {. + desc: "Flag whether to start relay protocol", + defaultValue: true + name: "relay" }: bool + filternode* {. desc: "Enode URL to filter.", defaultValue: "" diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 94618e828..431de9832 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -53,7 +53,7 @@ template tcpEndPoint(address, port): auto = proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, bindIp: ValidIpAddress, bindPort: Port, - extIp = none[ValidIpAddress](), extPort = none[Port](), topics = newSeq[string]()): T = + extIp = none[ValidIpAddress](), extPort = none[Port]()): T = ## Creates a Waku Node. ## ## Status: Implemented. @@ -78,35 +78,14 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, # msgIdProvider = msgIdProvider, # triggerSelf = true, sign = false, # verifySignature = false).PubSub - let wakuRelay = WakuRelay.init( - switch = switch, - # Use default - #msgIdProvider = msgIdProvider, - triggerSelf = true, - sign = false, - verifySignature = false) - # This gets messy with: .PubSub - switch.mount(wakuRelay) - result = WakuNode( switch: switch, rng: rng, peerInfo: peerInfo, - wakuRelay: wakuRelay, subscriptions: newTable[string, MessageNotificationSubscription](), filters: initTable[string, Filter]() ) - # TODO This is _not_ safe. Subscribe should happen in start and after things settled down. - # Otherwise GRAFT message isn't sent to a node. - for topic in topics: - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - debug "Hit handler", topic=topic, data=data - - # XXX: Is using discard here fine? Not sure if we want init to be async? - # Can also move this to the start proc, possibly wiser? - discard result.subscribe(topic, handler) - proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node. ## @@ -114,29 +93,6 @@ proc start*(node: WakuNode) {.async.} = ## node.libp2pTransportLoops = await node.switch.start() - # NOTE WakuRelay is being instantiated as part of initing node - - node.wakuStore = WakuStore.init(node.switch, node.rng) - node.switch.mount(node.wakuStore) - node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) - - proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} = - info "push received" - for message in msg.messages: - node.filters.notify(message, requestId) - - node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler) - node.switch.mount(node.wakuFilter) - node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) - - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - node.filters.notify(msg.value(), "") - await node.subscriptions.notify(topic, msg.value()) - - await node.wakuRelay.subscribe("waku", relayHandler) - # TODO Get this from WakuNode obj let peerInfo = node.peerInfo info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs @@ -145,8 +101,8 @@ proc start*(node: WakuNode) {.async.} = info "Listening on", full = listenStr proc stop*(node: WakuNode) {.async.} = - let wakuRelay = node.wakuRelay - await wakuRelay.stop() + if not node.wakuRelay.isNil: + await node.wakuRelay.stop() await node.switch.stop() @@ -168,8 +124,10 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa ## Status: Implemented. info "subscribe content", filter=request - # @TODO: ERROR HANDLING - let id = await node.wakuFilter.subscribe(request) + var id = generateRequestId(node.rng) + if node.wakuFilter.isNil == false: + # @TODO: ERROR HANDLING + id = await node.wakuFilter.subscribe(request) node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler) proc unsubscribe*(w: WakuNode, topic: Topic) = @@ -223,6 +181,56 @@ proc info*(node: WakuNode): WakuInfo = let wakuInfo = WakuInfo(listenStr: listenStr) return wakuInfo +proc mountFilter*(node: WakuNode) = + info "mounting filter" + proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} = + info "push received" + for message in msg.messages: + node.filters.notify(message, requestId) + + node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler) + node.switch.mount(node.wakuFilter) + node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) + +proc mountStore*(node: WakuNode) = + info "mounting store" + node.wakuStore = WakuStore.init(node.switch, node.rng) + node.switch.mount(node.wakuStore) + node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) + +proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string]()) {.async, gcsafe.} = + let wakuRelay = WakuRelay.init( + switch = node.switch, + # Use default + #msgIdProvider = msgIdProvider, + triggerSelf = true, + sign = false, + verifySignature = false + ) + + node.wakuRelay = wakuRelay + node.switch.mount(wakuRelay) + + await sleepAsync(5.seconds) + + info "mounting relay" + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + node.filters.notify(msg.value(), "") + await node.subscriptions.notify(topic, msg.value()) + + await node.wakuRelay.subscribe("waku", relayHandler) + + for topic in topics: + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + debug "Hit handler", topic=topic, data=data + + # XXX: Is using discard here fine? Not sure if we want init to be async? + # Can also move this to the start proc, possibly wiser? + discard node.subscribe(topic, handler) + + when isMainModule: import std/strutils, @@ -300,10 +308,19 @@ when isMainModule: Port(uint16(conf.tcpPort) + conf.portsShift), Port(uint16(conf.udpPort) + conf.portsShift)) node = WakuNode.init(conf.nodeKey, conf.libp2pAddress, - Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort, conf.topics.split(" ")) + Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) waitFor node.start() + if conf.store: + mountStore(node) + + if conf.filter: + mountFilter(node) + + if conf.relay: + waitFor mountRelay(node, conf.topics.split(" ")) + if conf.staticnodes.len > 0: connectToNodes(node, conf.staticnodes)