From 36ff537612b8a43f896114b60dd6b474063a3c8e Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Tue, 4 May 2021 14:11:41 +0200 Subject: [PATCH] Fix: light-mode relay for all light protocols (#529) * Fix: light-mode relay for all light protocols * Clear up confusing use of overloaded concepts --- CHANGELOG.md | 2 ++ tests/v2/test_wakunode.nim | 48 +++++++++++++++++++++++++++++++++++++ waku/v2/node/wakunode2.nim | 49 +++++++++++++++++++------------------- 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 671fae6d9..af6abfa6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ - Fixed: content filtering now works on any PubSub topic and not just the `waku` default. - Added the `pubsubTopic` field to the `HistoryQuery`. Now, the message history can be filtered and queried based on the `pubsubTopic`. - Added a new table of `Message` to the message store db. The new table has an additional column of `pubsubTopic` and will be used instead of the old table `messages`. The message history in the old table `messages` will not be accessed and have to be removed. +- Fix: allow mounting light protocols without `relay` +- Add `keep-alive` option to maintain stable connection to `relay` peers on idle topics ## 2021-01-05 v0.2 diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 02d5fbc7c..1df4fe1c2 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -210,6 +210,54 @@ procSuite "WakuNode": await node1.stop() await node2.stop() + + asyncTest "Filter protocol works on node without relay capability": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + defaultTopic = "/waku/2/default-waku/proto" + contentTopic = "defaultCT" + payload = @[byte 1] + message = WakuMessage(payload: payload, contentTopic: contentTopic) + filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) + + await node1.start() + node1.mountRelay() + node1.mountFilter() + + await node2.start() + node2.mountRelay(relayMessages=false) # Do not start WakuRelay or subscribe to any topics + node2.mountFilter() + node2.wakuFilter.setPeer(node1.peerInfo) + + check: + node1.wakuRelay.isNil == false # Node1 is a full node + node2.wakuRelay.isNil == true # Node 2 is a light node + + var completeFut = newFuture[bool]() + + proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = + check: + msg.payload == payload + msg.contentTopic == contentTopic + completeFut.complete(true) + + # Subscribe a contentFilter to trigger a specific application handler when + # WakuMessages with that content are received + await node2.subscribe(filterRequest, filterHandler) + + await sleepAsync(2000.millis) + + # Let's check that content filtering works on the default topic + await node1.publish(defaultTopic, message) + + check: + (await completeFut.withTimeout(5.seconds)) == true + + await node1.stop() + await node2.stop() asyncTest "Store protocol returns expected message": let diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 2591f8a4b..44ddf9198 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -367,20 +367,6 @@ proc mountFilter*(node: WakuNode) = node.filters.notify(message, requestId) # Trigger filter handlers on a light node waku_node_messages.inc(labelValues = ["filter"]) - if node.wakuRelay.isNil: - debug "light node: mounting relay without starting" - ## WakuFilter currently requires WakuRelay to be mounted in order to work. - ## This is to allow protocol stream negotation with full nodes to succeed. - ## Here we mount relay on the switch only, but do not subscribe to any pubsub - ## topics. We also never start the relay protocol. - ## @TODO: remove WakuRelay dependency - node.switch.mount(WakuRelay.init( - switch = node.switch, - triggerSelf = true, - sign = false, - verifySignature = false - )) - node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) node.switch.mount(node.wakuFilter) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) @@ -455,7 +441,11 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) = let pb = PubSub(node.wakuRelay) pb.addValidator(pubsubTopic, validator) -proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled = false, keepAlive = false) {.gcsafe.} = +proc mountRelay*(node: WakuNode, + topics: seq[string] = newSeq[string](), + rlnRelayEnabled = false, + keepAlive = false, + relayMessages = true) {.gcsafe.} = let wakuRelay = WakuRelay.init( switch = node.switch, # Use default @@ -464,13 +454,22 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela sign = false, verifySignature = false ) + + info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, keepAlive=keepAlive, relayMessages=relayMessages - wakuRelay.keepAlive = keepAlive - - node.wakuRelay = wakuRelay node.switch.mount(wakuRelay) - info "mounting relay" + if not relayMessages: + ## Some nodes may choose not to have the capability to relay messages (e.g. "light" nodes). + ## All nodes, however, currently require WakuRelay, regardless of desired capabilities. + ## This is to allow protocol stream negotation with relay-capable nodes to succeed. + ## Here we mount relay on the switch only, but do not proceed to subscribe to any pubsub + ## topics. We also never start the relay protocol. node.wakuRelay remains nil. + ## @TODO: in future, this WakuRelay dependency will be removed completely + return + + node.wakuRelay = wakuRelay + wakuRelay.keepAlive = keepAlive node.subscribe(defaultTopic, none(TopicHandler)) @@ -510,7 +509,6 @@ proc mountLightPush*(node: WakuNode) = node.switch.mount(node.wakuLightPush) - ## Helpers proc dialPeer*(n: WakuNode, address: string) {.async.} = info "dialPeer", address = address @@ -693,11 +691,14 @@ when isMainModule: setStorePeer(node, conf.storenode) # Relay setup - if conf.relay: # True by default - mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay, keepAlive = conf.keepAlive) + mountRelay(node, + conf.topics.split(" "), + rlnRelayEnabled = conf.rlnrelay, + keepAlive = conf.keepAlive, + relayMessages = conf.relay) # Indicates if node is capable to relay messages - if conf.staticnodes.len > 0: - waitFor connectToNodes(node, conf.staticnodes) + if conf.staticnodes.len > 0: + waitFor connectToNodes(node, conf.staticnodes) # NOTE Must be mounted after relay if conf.lightpush: