From 274101af4345e42786ec2e816ff540ac7ca78506 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Fri, 10 Feb 2023 15:17:50 +0100 Subject: [PATCH] refactor(relay): improve waku_relay apis and add tests --- tests/all_tests_v2.nim | 7 +- tests/v2/test_rest_relay_api.nim | 39 +-- tests/v2/testlib/switch.nim | 7 +- .../{ => waku_relay}/resources/test_cert.pem | 0 .../{ => waku_relay}/resources/test_key.pem | 0 tests/v2/waku_relay/test_waku_relay.nim | 232 ++++++++++++++++++ .../{ => waku_relay}/test_wakunode_relay.nim | 56 +++-- waku/v2/node/rest/relay/api_types.nim | 20 +- waku/v2/node/waku_node.nim | 12 +- waku/v2/protocol/waku_relay.nim | 37 ++- waku/v2/protocol/waku_rln_relay/utils.nim | 15 +- 11 files changed, 331 insertions(+), 94 deletions(-) rename tests/v2/{ => waku_relay}/resources/test_cert.pem (100%) rename tests/v2/{ => waku_relay}/resources/test_key.pem (100%) create mode 100644 tests/v2/waku_relay/test_waku_relay.nim rename tests/v2/{ => waku_relay}/test_wakunode_relay.nim (93%) diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 58f864144..077af6d36 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -28,10 +28,15 @@ when defined(waku_exp_store_resume): import ./v2/waku_store/test_resume +# Waku relay test suite +import + ./v2/waku_relay/test_waku_relay, + ./v2/waku_relay/test_wakunode_relay + + import # Waku v2 tests ./v2/test_wakunode, - ./v2/test_wakunode_relay, # Waku LightPush ./v2/test_waku_lightpush, ./v2/test_wakunode_lightpush, diff --git a/tests/v2/test_rest_relay_api.nim b/tests/v2/test_rest_relay_api.nim index fd1b460bd..039eeafe4 100644 --- a/tests/v2/test_rest_relay_api.nim +++ b/tests/v2/test_rest_relay_api.nim @@ -6,19 +6,19 @@ import stew/shims/net, testutils/unittests, presto, presto/client as presto_client, - libp2p/crypto/crypto, - libp2p/protocols/pubsub/pubsub + libp2p/crypto/crypto import ../../waku/v2/protocol/waku_message, ../../waku/v2/node/waku_node, ../../waku/v2/node/rest/[server, client, base64, utils], ../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache], + ../../waku/v2/protocol/waku_relay, ../../waku/v2/utils/time, ./testlib/common -proc testWakuNode(): WakuNode = - let +proc testWakuNode(): WakuNode = + let rng = crypto.newRng() privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() bindIp = ValidIpAddress.init("0.0.0.0") @@ -29,7 +29,7 @@ proc testWakuNode(): WakuNode = suite "REST API - Relay": - asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions": + asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions": # Given let node = testWakuNode() await node.start() @@ -68,13 +68,13 @@ suite "REST API - Relay": check: # Node should be subscribed to default + new topics - PubSub(node.wakuRelay).topics.len == 1 + pubSubTopics.len - + toSeq(node.wakuRelay.subscribedTopics).len == 1 + pubSubTopics.len + await restServer.stop() await restServer.closeWait() await node.stop() - asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions": + asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions": # Given let node = testWakuNode() await node.start() @@ -94,7 +94,7 @@ suite "REST API - Relay": restServer.start() let pubSubTopics = @[ - PubSubTopic("pubsub-topic-1"), + PubSubTopic("pubsub-topic-1"), PubSubTopic("pubsub-topic-2"), PubSubTopic("pubsub-topic-3"), PubSubTopic("pubsub-topic-y") @@ -122,7 +122,7 @@ suite "REST API - Relay": await node.stop() - asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}": + asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}": # Given let node = testWakuNode() await node.start() @@ -157,7 +157,7 @@ suite "REST API - Relay": response.status == 200 $response.contentType == $MIMETYPE_JSON response.data.len == 3 - response.data.all do (msg: RelayWakuMessage) -> bool: + response.data.all do (msg: RelayWakuMessage) -> bool: msg.payload == Base64String.encode("TEST-1") and msg.contentTopic.get().string == "content-topic-x" and msg.version.get() == 2 and @@ -172,8 +172,8 @@ suite "REST API - Relay": await restServer.closeWait() await node.stop() - asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}": - ## "Relay API: publish and subscribe/unsubscribe": + asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}": + ## "Relay API: publish and subscribe/unsubscribe": # Given let node = testWakuNode() await node.start() @@ -190,10 +190,11 @@ suite "REST API - Relay": restServer.start() let client = newRestHttpClient(initTAddress(restAddress, restPort)) - + # At this stage the node is only subscribed to the default topic - require(PubSub(node.wakuRelay).topics.len == 1) - + require: + toSeq(node.wakuRelay.subscribedTopics).len == 1 + # When let newTopics = @[ @@ -202,10 +203,10 @@ suite "REST API - Relay": PubSubTopic("pubsub-topic-3") ] discard await client.relayPostSubscriptionsV1(newTopics) - + let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage( - payload: Base64String.encode("TEST-PAYLOAD"), - contentTopic: some(DefaultContentTopic), + payload: Base64String.encode("TEST-PAYLOAD"), + contentTopic: some(DefaultContentTopic), timestamp: some(int64(2022)) )) diff --git a/tests/v2/testlib/switch.nim b/tests/v2/testlib/switch.nim index f7742200f..e87f26290 100644 --- a/tests/v2/testlib/switch.nim +++ b/tests/v2/testlib/switch.nim @@ -5,7 +5,10 @@ import import ../../test_helpers +export switch + + proc newTestSwitch*(key=none(PrivateKey), address=none(MultiAddress)): Switch = let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get()) - let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) - return newStandardSwitch(some(peerKey), addrs=peerAddr) \ No newline at end of file + let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) + return newStandardSwitch(some(peerKey), addrs=peerAddr) diff --git a/tests/v2/resources/test_cert.pem b/tests/v2/waku_relay/resources/test_cert.pem similarity index 100% rename from tests/v2/resources/test_cert.pem rename to tests/v2/waku_relay/resources/test_cert.pem diff --git a/tests/v2/resources/test_key.pem b/tests/v2/waku_relay/resources/test_key.pem similarity index 100% rename from tests/v2/resources/test_key.pem rename to tests/v2/waku_relay/resources/test_key.pem diff --git a/tests/v2/waku_relay/test_waku_relay.nim b/tests/v2/waku_relay/test_waku_relay.nim new file mode 100644 index 000000000..463cfa226 --- /dev/null +++ b/tests/v2/waku_relay/test_waku_relay.nim @@ -0,0 +1,232 @@ +{.used.} + +import + std/[options, sequtils, strutils], + stew/byteutils, + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + chronos, + libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/rpc/messages +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_relay, + ../testlib/switch, + ../testlib/common + + +proc noopRawHandler(): PubsubRawHandler = + var handler: PubsubRawHandler + handler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, noSideEffect.} = discard + handler + + +proc newTestWakuRelay(switch = newTestSwitch(), self = true): Future[WakuRelay] {.async.} = + let proto = WakuRelay.new(switch, triggerSelf = self).tryGet() + await proto.start() + + let protocolMatcher = proc(proto: string): bool {.gcsafe.} = + return proto.startsWith(WakuRelayCodec) + + switch.mount(proto, protocolMatcher) + + return proto + + +suite "Waku Relay": + + asyncTest "subscribe and add handler to topics": + ## Setup + let nodeA = await newTestWakuRelay() + + ## Given + let + networkA = "test-network1" + networkB = "test-network2" + + ## when + nodeA.subscribe(networkA, noopRawHandler()) + nodeA.subscribe(networkB, noopRawHandler()) + + ## Then + check: + nodeA.isSubscribed(networkA) + nodeA.isSubscribed(networkB) + + let subscribedTopics = toSeq(nodeA.subscribedTopics) + check: + subscribedTopics.len == 2 + subscribedTopics.contains(networkA) + subscribedTopics.contains(networkB) + + ## Cleanup + await nodeA.stop() + + asyncTest "unsubscribe all handlers from topic": + ## Setup + let nodeA = await newTestWakuRelay() + + ## Given + let + networkA = "test-network1" + networkB = "test-network2" + networkC = "test-network3" + + nodeA.subscribe(networkA, noopRawHandler()) + nodeA.subscribe(networkB, noopRawHandler()) + nodeA.subscribe(networkC, noopRawHandler()) + + let topics = toSeq(nodeA.subscribedTopics) + require: + topics.len == 3 + topics.contains(networkA) + topics.contains(networkB) + topics.contains(networkC) + + ## When + nodeA.unsubscribeAll(networkA) + + ## Then + check: + nodeA.isSubscribed(networkB) + nodeA.isSubscribed(networkC) + not nodeA.isSubscribed(networkA) + + let subscribedTopics = toSeq(nodeA.subscribedTopics) + check: + subscribedTopics.len == 2 + subscribedTopics.contains(networkB) + subscribedTopics.contains(networkC) + not subscribedTopics.contains(networkA) + + ## Cleanup + await nodeA.stop() + + asyncTest "publish a message into a topic": + ## Setup + let + srcSwitch = newTestSwitch() + srcPeerManager = PeerManager.new(srcSwitch) + srcNode = await newTestWakuRelay(srcSwitch) + dstSwitch = newTestSwitch() + dstPeerManager = PeerManager.new(dstSwitch) + dstNode = await newTestWakuRelay(dstSwitch) + + await allFutures(srcSwitch.start(), dstSwitch.start()) + + let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo() + let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec) + require: + conn.isSome() + + ## Given + let networkTopic = "test-network1" + let message = fakeWakuMessage() + + # Self subscription (triggerSelf = true) + let srcSubsFut = newFuture[(PubsubTopic, WakuMessage)]() + proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + srcSubsFut.complete((topic, message)) + + srcNode.subscribe(networkTopic, srcSubsHandler) + + # Subscription + let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]() + proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + dstSubsFut.complete((topic, message)) + + dstNode.subscribe(networkTopic, dstSubsHandler) + + await sleepAsync(500.millis) + + ## When + discard await srcNode.publish(networkTopic, message) + + ## Then + require: + await srcSubsFut.withTimeout(5.seconds) + await dstSubsFut.withTimeout(5.seconds) + + let (srcTopic, srcMsg) = srcSubsFut.read() + check: + srcTopic == networkTopic + srcMsg == message + + let (dstTopic, dstMsg) = dstSubsFut.read() + check: + dstTopic == networkTopic + dstMsg == message + + ## Cleanup + await allFutures(srcSwitch.stop(), dstSwitch.stop()) + + asyncTest "content topic validator as a message subscription filter": + ## Setup + let + srcSwitch = newTestSwitch() + srcPeerManager = PeerManager.new(srcSwitch) + srcNode = await newTestWakuRelay(srcSwitch) + dstSwitch = newTestSwitch() + dstPeerManager = PeerManager.new(dstSwitch) + dstNode = await newTestWakuRelay(dstSwitch) + + await allFutures(srcSwitch.start(), dstSwitch.start()) + + let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo() + let conn = await srcPeerManager.dialPeer(dstPeerInfo, WakuRelayCodec) + require: + conn.isSome() + + ## Given + let networkTopic = "test-network1" + let contentTopic = "test-content1" + + let message = fakeWakuMessage(contentTopic=contentTopic) + let messages = @[ + fakeWakuMessage(contentTopic="any"), + fakeWakuMessage(contentTopic="any"), + fakeWakuMessage(contentTopic="any"), + message, + fakeWakuMessage(contentTopic="any"), + ] + + # Subscription + let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]() + proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + dstSubsFut.complete((topic, message)) + + dstNode.subscribe(networkTopic, dstSubsHandler) + + await sleepAsync(500.millis) + + # Validator + proc validator(topic: PubsubTopic, msg: Message): Future[ValidationResult] {.async.} = + let msg = WakuMessage.decode(msg.data) + if msg.isErr(): + return ValidationResult.Ignore + + # only relay messages with contentTopic1 + if msg.value.contentTopic != contentTopic: + return ValidationResult.Reject + + return ValidationResult.Accept + + dstNode.addValidator(networkTopic, validator) + + ## When + for msg in messages: + discard await srcNode.publish(networkTopic, msg) + + ## Then + require: + await dstSubsFut.withTimeout(5.seconds) + + let (dstTopic, dstMsg) = dstSubsFut.read() + check: + dstTopic == networkTopic + dstMsg == message + + ## Cleanup + await allFutures(srcSwitch.stop(), dstSwitch.stop()) diff --git a/tests/v2/test_wakunode_relay.nim b/tests/v2/waku_relay/test_wakunode_relay.nim similarity index 93% rename from tests/v2/test_wakunode_relay.nim rename to tests/v2/waku_relay/test_wakunode_relay.nim index 80f855ed8..9e110ecbc 100644 --- a/tests/v2/test_wakunode_relay.nim +++ b/tests/v2/waku_relay/test_wakunode_relay.nim @@ -21,9 +21,9 @@ import ../../waku/v2/utils/peers, ../../waku/v2/node/waku_node, ../../waku/v2/protocol/waku_relay, - ../test_helpers, - ./testlib/common - #./testlib/testutils + ../../test_helpers, + ../testlib/common, + ../testlib/testutils template sourceDir: string = currentSourcePath.parentDir() const KEY_PATH = sourceDir / "resources/test_key.pem" @@ -43,7 +43,7 @@ procSuite "WakuNode - Relay": await node1.mountRelay() check: - GossipSub(node1.wakuRelay).heartbeatFut.isNil == false + GossipSub(node1.wakuRelay).heartbeatFut.isNil() == false # Relay protocol starts if mounted before node start @@ -55,13 +55,13 @@ procSuite "WakuNode - Relay": check: # Relay has not yet started as node has not yet started - GossipSub(node2.wakuRelay).heartbeatFut.isNil + GossipSub(node2.wakuRelay).heartbeatFut.isNil() await node2.start() check: # Relay started on node start - GossipSub(node2.wakuRelay).heartbeatFut.isNil == false + GossipSub(node2.wakuRelay).heartbeatFut.isNil() == false await allFutures([node1.stop(), node2.stop()]) @@ -87,8 +87,10 @@ procSuite "WakuNode - Relay": await node3.start() await node3.mountRelay(@[pubSubTopic]) - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + await allFutures( + node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]), + node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + ) var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -105,13 +107,13 @@ procSuite "WakuNode - Relay": await sleepAsync(500.millis) await node1.publish(pubSubTopic, message) - await sleepAsync(500.millis) + ## Then check: (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - await node3.stop() + + ## Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop()) asyncTest "filtering relayed messages using topic validators": ## test scenario: @@ -157,23 +159,26 @@ procSuite "WakuNode - Relay": var completionFutValidatorAcc = newFuture[bool]() var completionFutValidatorRej = newFuture[bool]() + # set a topic validator for pubSubTopic proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = ## the validator that only allows messages with contentTopic1 to be relayed check: topic == pubSubTopic - let msg = WakuMessage.decode(message.data) - if msg.isOk(): - # only relay messages with contentTopic1 - if msg.value().contentTopic == contentTopic1: - result = ValidationResult.Accept - completionFutValidatorAcc.complete(true) - else: - result = ValidationResult.Reject - completionFutValidatorRej.complete(true) - # set a topic validator for pubSubTopic - let pb = PubSub(node2.wakuRelay) - pb.addValidator(pubSubTopic, validator) + let msg = WakuMessage.decode(message.data) + if msg.isErr(): + completionFutValidatorAcc.complete(false) + return ValidationResult.Reject + + # only relay messages with contentTopic1 + if msg.value.contentTopic != contentTopic1: + completionFutValidatorRej.complete(true) + return ValidationResult.Reject + + completionFutValidatorAcc.complete(true) + return ValidationResult.Accept + + node2.wakuRelay.addValidator(pubSubTopic, validator) var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -208,7 +213,8 @@ procSuite "WakuNode - Relay": await allFutures(node1.stop(), node2.stop(), node3.stop()) - asyncTest "Stats of peer sending wrong WakuMessages are updated": + # TODO: Add a function to validate the WakuMessage integrity + xasyncTest "Stats of peer sending wrong WakuMessages are updated": # Create 2 nodes let nodes = toSeq(0..1).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) diff --git a/waku/v2/node/rest/relay/api_types.nim b/waku/v2/node/rest/relay/api_types.nim index a57c9ab83..f16421e14 100644 --- a/waku/v2/node/rest/relay/api_types.nim +++ b/waku/v2/node/rest/relay/api_types.nim @@ -9,7 +9,7 @@ import json_serialization, json_serialization/std/options, presto/[route, client, common] -import +import ../../../protocol/waku_message, ../serdes, ../base64 @@ -24,7 +24,7 @@ type RelayWakuMessage* = object timestamp*: Option[int64] -type +type RelayGetMessagesResponse* = seq[RelayWakuMessage] RelayPostMessagesRequest* = RelayWakuMessage @@ -44,7 +44,7 @@ proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage = ) proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cstring] = - let + let payload = ?msg.payload.decode() contentTopic = msg.contentTopic.get(DefaultContentTopic) version = uint32(msg.version.get(version)) @@ -59,10 +59,6 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String) {.raises: [IOError, Defect].} = writer.writeValue(string(value)) -proc writeValue*(writer: var JsonWriter[RestJson], topic: PubSubTopic|ContentTopic) - {.raises: [IOError, Defect].} = - writer.writeValue(string(topic)) - proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage) {.raises: [IOError, Defect].} = writer.beginRecord() @@ -79,14 +75,6 @@ proc readValue*(reader: var JsonReader[RestJson], value: var Base64String) {.raises: [SerializationError, IOError, Defect].} = value = Base64String(reader.readValue(string)) -proc readValue*(reader: var JsonReader[RestJson], pubsubTopic: var PubSubTopic) - {.raises: [SerializationError, IOError, Defect].} = - pubsubTopic = PubSubTopic(reader.readValue(string)) - -proc readValue*(reader: var JsonReader[RestJson], contentTopic: var ContentTopic) - {.raises: [SerializationError, IOError, Defect].} = - contentTopic = ContentTopic(reader.readValue(string)) - proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) {.raises: [SerializationError, IOError, Defect].} = var @@ -122,5 +110,5 @@ proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) payload: payload.get(), contentTopic: contentTopic, version: version, - timestamp: timestamp + timestamp: timestamp ) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index ec4f9af84..5e45d036d 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -503,10 +503,6 @@ proc startRelay*(node: WakuNode) {.async.} = ## Setup relay protocol - # Subscribe to the default PubSub topics - for topic in node.wakuRelay.defaultPubsubTopics: - node.subscribe(topic) - # Resume previous relay connections if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)): info "Found previous WakuRelay peers. Reconnecting." @@ -532,7 +528,6 @@ proc mountRelay*(node: WakuNode, let initRes = WakuRelay.new( node.switch, - defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics), triggerSelf = triggerSelf ) if initRes.isErr(): @@ -553,6 +548,13 @@ proc mountRelay*(node: WakuNode, info "relay mounted successfully" + # TODO: As part of #1545, remove this and update the tests cases + node.subscribe(DefaultPubsubTopic) + + # Subscribe to topics + for topic in topics: + node.subscribe(topic) + ## Waku filter diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index 15cd2670c..6a5ee6005 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -8,7 +8,7 @@ else: {.push raises: [].} import - std/[sequtils, tables], + std/[tables, sequtils, hashes], stew/results, chronos, chronicles, @@ -37,7 +37,6 @@ type type WakuRelay* = ref object of GossipSub - defaultPubsubTopics*: seq[PubsubTopic] # Default configured PubSub topics WakuRelayHandler* = PubsubRawHandler|SubscriptionHandler @@ -78,15 +77,12 @@ method initPubSub(w: WakuRelay) {.raises: [InitializationError].} = w.initProtocolHandler() -proc new*(T: type WakuRelay, - switch: Switch, - defaultPubsubTopics: seq[PubsubTopic] = @[], - triggerSelf: bool = true): WakuRelayResult[T] = +proc new*(T: type WakuRelay, switch: Switch, triggerSelf: bool = true): WakuRelayResult[T] = proc msgIdProvider(msg: messages.Message): Result[MessageID, ValidationResult] = let hash = MultiHash.digest("sha2-256", msg.data) if hash.isErr(): - ok(($msg.data.hash).toBytes()) + ok(toBytes($hashes.hash(msg.data))) else: ok(hash.value.data.buffer) @@ -100,25 +96,28 @@ proc new*(T: type WakuRelay, verifySignature = false, maxMessageSize = MaxWakuMessageSize ) - - # Rejects messages that are not WakuMessage - proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = - let msg = WakuMessage.decode(message.data) - if msg.isOk(): - return ValidationResult.Accept - return ValidationResult.Reject - - # Add validator to all default pubsub topics - for pubSubTopic in defaultPubsubTopics: - wr.addValidator(pubSubTopic, validator) except InitializationError: return err("initialization error: " & getCurrentExceptionMsg()) - wr.defaultPubsubTopics = defaultPubsubTopics + # TODO: Add a function to validate the WakuMessage integrity + # # Rejects messages that are not WakuMessage + # proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = + # let msg = WakuMessage.decode(message.data) + # if msg.isOk(): + # return ValidationResult.Accept + # return ValidationResult.Reject + + # # Add validator to all default pubsub topics + # for pubSubTopic in defaultPubsubTopics: + # wr.addValidator(pubSubTopic, validator) ok(wr) +method addValidator*(w: WakuRelay, topic: varargs[string], handler: ValidatorHandler) {.gcsafe.} = + procCall GossipSub(w).addValidator(topic, handler) + + method start*(w: WakuRelay) {.async.} = debug "start" await procCall GossipSub(w).start() diff --git a/waku/v2/protocol/waku_rln_relay/utils.nim b/waku/v2/protocol/waku_rln_relay/utils.nim index 1ecebda8d..66a788ee0 100644 --- a/waku/v2/protocol/waku_rln_relay/utils.nim +++ b/waku/v2/protocol/waku_rln_relay/utils.nim @@ -715,8 +715,7 @@ proc addRLNRelayValidator*(wakuRlnRelay: WakuRLNRelay, handler(wakumessage) return pubsub.ValidationResult.Reject # set a validator for the supplied pubsubTopic - let pb = PubSub(wakuRelay) - pb.addValidator(pubsubTopic, validator) + wakuRelay.addValidator(pubsubTopic, validator) proc mountRlnRelayStatic*(wakuRelay: WakuRelay, group: seq[IDCommitment], @@ -943,7 +942,7 @@ proc mount(wakuRelay: WakuRelay, # getMembershipCredentials returns all credentials in keystore as sequence matching the filter let allMatchingCredentials = readCredentialsRes.get() - # if any is found, we return the first credential, otherwise credentials is none + # if any is found, we return the first credential, otherwise credentials is none if allMatchingCredentials.len() > 0: credentials = some(allMatchingCredentials[0]) else: @@ -999,7 +998,7 @@ proc mount(wakuRelay: WakuRelay, return err("dynamic rln-relay could not be mounted: " & rlnRelayRes.error()) let wakuRlnRelay = rlnRelayRes.get() if persistCredentials: - + credentials = some(MembershipCredentials(identityCredential: wakuRlnRelay.identityCredential, membershipGroups: @[MembershipGroup(membershipContract: rlnMembershipContract, treeIndex: wakuRlnRelay.membershipIndex)] )) @@ -1026,9 +1025,11 @@ proc new*(T: type WakuRlnRelay, # relay protocol is the prerequisite of rln-relay if wakuRelay.isNil(): return err("WakuRelay protocol is not mounted") - # check whether the pubsub topic is supported at the relay level - if conf.rlnRelayPubsubTopic notin wakuRelay.defaultPubsubTopics: - return err("The relay protocol does not support the configured pubsub topic") + + # TODO: Review this. The Waku Relay instance is no longer keeping track of the default pubsub topics + # # check whether the pubsub topic is supported at the relay level + # if conf.rlnRelayPubsubTopic notin wakuRelay.defaultPubsubTopics: + # return err("The relay protocol does not support the configured pubsub topic") debug "rln-relay input validation passed" waku_rln_relay_mounting_duration_seconds.nanosecondTime: