diff --git a/CHANGELOG.md b/CHANGELOG.md index accdce6c2..c06b043f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ - `relay`, `filter`, `store` and `swap` peers are now stored in a common, shared peer store and no longer in separate sets. - Admin API now provides a `post` method to connect to peers on an ad-hoc basis - Added persistent peer storage. A node will now attempt to reconnect to `relay` peers after a restart. +- Changed `contentTopic` back to a string +- Fixed: content filtering now works on any PubSub topic and not just the `waku` default. ## 2021-01-05 v0.2 diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index b89167090..40ed3bc4e 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -132,6 +132,82 @@ procSuite "WakuNode": (await completionFut.withTimeout(5.seconds)) == true await node1.stop() await node2.stop() + + asyncTest "Can receive filtered messages published on both default and other topics": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + defaultTopic = "/waku/2/default-waku/proto" + otherTopic = "/non/waku/formatted" + defaultContentTopic = "defaultCT" + otherContentTopic = "otherCT" + defaultPayload = @[byte 1] + otherPayload = @[byte 9] + defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic) + otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic) + defaultFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[defaultContentTopic])], subscribe: true) + otherFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[otherContentTopic])], subscribe: true) + + await node1.start() + node1.mountRelay() + node1.mountFilter() + + await node2.start() + node2.mountRelay() + node2.mountFilter() + node2.wakuFilter.setPeer(node1.peerInfo) + + var defaultComplete = newFuture[bool]() + var otherComplete = newFuture[bool]() + + # Subscribe nodes 1 and 2 to otherTopic + proc emptyHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # Do not notify filters or subscriptions here. This should be default behaviour for all topics + discard + + node1.subscribe(otherTopic, emptyHandler) + node2.subscribe(otherTopic, emptyHandler) + + await sleepAsync(2000.millis) + + proc defaultHandler(msg: WakuMessage) {.gcsafe, closure.} = + check: + msg.payload == defaultPayload + msg.contentTopic == defaultContentTopic + defaultComplete.complete(true) + + proc otherHandler(msg: WakuMessage) {.gcsafe, closure.} = + check: + msg.payload == otherPayload + msg.contentTopic == otherContentTopic + otherComplete.complete(true) + + # Subscribe a contentFilter to trigger a specific application handler when + # WakuMessages with that content are received + await node2.subscribe(defaultFR, defaultHandler) + + await sleepAsync(2000.millis) + + # Let's check that content filtering works on the default topic + await node1.publish(defaultTopic, defaultMessage) + + check: + (await defaultComplete.withTimeout(5.seconds)) == true + + # Now check that content filtering works on other topics + await node2.subscribe(otherFR, otherHandler) + + await sleepAsync(2000.millis) + + await node1.publish(otherTopic,otherMessage) + + check: + (await otherComplete.withTimeout(5.seconds)) == true + + await node1.stop() + await node2.stop() asyncTest "Store protocol returns expected message": let diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 8e676c91c..5b1fbb971 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az182-10: +# Libtool was configured on host fv-az269-370: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 0211d3bcb..7da1cb847 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -176,16 +176,37 @@ proc stop*(node: WakuNode) {.async.} = node.started = false +proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = + info "subscribe", topic=topic + + proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # A default handler should be registered for all topics + trace "Hit default handler", topic=topic, data=data + + let msg = WakuMessage.init(data) + if msg.isOk(): + node.filters.notify(msg.value(), "") # Trigger filter handlers on a light node + await node.subscriptions.notify(topic, msg.value()) # Trigger subscription handlers on a store/filter node + waku_node_messages.inc(labelValues = ["relay"]) + + let wakuRelay = node.wakuRelay + + if topic notin PubSub(wakuRelay).topics: + # Add default handler only for new topics + debug "Registering default handler", topic=topic + wakuRelay.subscribe(topic, defaultHandler) + + if handler.isSome: + debug "Registering handler", topic=topic + wakuRelay.subscribe(topic, handler.get()) + proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## this topic. TopicHandler is a method that takes a topic and some data. ## ## NOTE The data field SHOULD be decoded as a WakuMessage. ## Status: Implemented. - info "subscribe", topic=topic - - let wakuRelay = node.wakuRelay - wakuRelay.subscribe(topic, handler) + node.subscribe(topic, some(handler)) proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. @@ -369,7 +390,6 @@ proc mountRlnRelay*(node: WakuNode, ethClientAddress: Option[string] = none(stri node.wakuRlnRelay = rlnPeer - proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) = ## this procedure is a thin wrapper for the pubsub addValidator method ## it sets message validator on the given pubsubTopic, the validator will check that @@ -401,20 +421,11 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela waitFor node.peerManager.reconnectPeers(WakuRelayCodec) info "mounting relay" - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - node.filters.notify(msg.value(), "") - await node.subscriptions.notify(topic, msg.value()) - waku_node_messages.inc(labelValues = ["relay"]) - node.wakuRelay.subscribe(defaultTopic, relayHandler) + node.subscribe(defaultTopic, none(TopicHandler)) for topic in topics: - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - debug "Hit handler", topic=topic, data=data - - node.subscribe(topic, handler) + node.subscribe(topic, none(TopicHandler)) if rlnRelayEnabled: # TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc @@ -431,7 +442,6 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela info "relay mounted and started successfully" - ## Helpers proc dialPeer*(n: WakuNode, address: string) {.async.} = info "dialPeer", address = address