From f5f431382bb0e99f3dae68bec72e91ccf8ada177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Wed, 15 Nov 2023 16:11:36 +0100 Subject: [PATCH] test(waku-relay): Relay (#2101) * Implement message id tests. * Implement relay tests. * Update import paths to use test_all. --- tests/all_tests_waku.nim | 11 +- .../test_data.nim => resources/payloads.nim} | 7 + tests/testlib/futures.nim | 3 + tests/waku_filter_v2/test_all.nim | 3 +- tests/waku_filter_v2/test_waku_client.nim | 2 +- tests/waku_relay/crypto_utils.nim | 47 + tests/waku_relay/test_all.nim | 4 +- tests/waku_relay/test_message_id.nim | 37 + tests/waku_relay/test_protocol.nim | 1301 +++++++++++++++++ tests/waku_relay/test_waku_relay.nim | 22 +- tests/waku_relay/utils.nim | 29 + waku/waku_relay/protocol.nim | 6 +- 12 files changed, 1439 insertions(+), 33 deletions(-) rename tests/{waku_filter_v2/test_data.nim => resources/payloads.nim} (91%) create mode 100644 tests/waku_relay/crypto_utils.nim create mode 100644 tests/waku_relay/test_message_id.nim create mode 100644 tests/waku_relay/test_protocol.nim create mode 100644 tests/waku_relay/utils.nim diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 7bd5ca0df..97039c1ec 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -40,15 +40,10 @@ when defined(waku_exp_store_resume): import ./waku_store/test_resume -# Waku relay test suite -import - ./waku_relay/test_waku_relay, - ./waku_relay/test_wakunode_relay +import + ./waku_relay/test_all + ./waku_filter_v2/test_all -# Waku filter test suite -import - ./waku_filter_v2/test_waku_client, - ./waku_filter_v2/test_waku_filter_protocol import # Waku v2 tests diff --git a/tests/waku_filter_v2/test_data.nim b/tests/resources/payloads.nim similarity index 91% rename from tests/waku_filter_v2/test_data.nim rename to tests/resources/payloads.nim index bc3ccb78e..d2f74e9f1 100644 --- a/tests/waku_filter_v2/test_data.nim +++ b/tests/resources/payloads.nim @@ -68,3 +68,10 @@ proc getSampleJsonList*(): JsonNode = "name": "Oberon" }, ] + + +proc getByteSequence*(bytesNumber: int): seq[byte] = + result = newSeq[byte](bytesNumber) + for i in 0 ..< bytesNumber: + result[i] = cast[byte](i mod 256) + return result diff --git a/tests/testlib/futures.nim b/tests/testlib/futures.nim index 8981516e4..ffed6a5db 100644 --- a/tests/testlib/futures.nim +++ b/tests/testlib/futures.nim @@ -6,3 +6,6 @@ import ../../../waku/waku_core/message proc newPushHandlerFuture*(): Future[(string, WakuMessage)] = newFuture[(string, WakuMessage)]() + +proc newBoolFuture*(): Future[bool] = + newFuture[bool]() diff --git a/tests/waku_filter_v2/test_all.nim b/tests/waku_filter_v2/test_all.nim index b3856d1b7..a91b8ba4b 100644 --- a/tests/waku_filter_v2/test_all.nim +++ b/tests/waku_filter_v2/test_all.nim @@ -2,5 +2,4 @@ import ./test_waku_client, - ./test_waku_filter_protocol, - ./test_waku_filter + ./test_waku_filter_protocol diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 918763892..218764ff3 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -36,7 +36,7 @@ import sequtils ], ./waku_filter_utils.nim, - ./test_data.nim + ../resources/payloads.nim let FUTURE_TIMEOUT = 1.seconds diff --git a/tests/waku_relay/crypto_utils.nim b/tests/waku_relay/crypto_utils.nim new file mode 100644 index 000000000..73347dfb5 --- /dev/null +++ b/tests/waku_relay/crypto_utils.nim @@ -0,0 +1,47 @@ +# Source: nimcrypto/examples/cfb.nim + + +import nimcrypto + + +proc cfbEncode*(key: string, iv: string, data: string): seq[byte] = + var context: CFB[aes256] + var pKey: array[aes256.sizeKey, byte] + var pIv: array[aes256.sizeBlock, byte] + var pData = newSeq[byte](len(data)) + var encodedData = newSeq[byte](len(data)) + + copyMem(addr pData[0], unsafeAddr data[0], len(data)) + # WARNING! Do not use 0 byte padding in applications, this is done as example. + copyMem(addr pKey[0], unsafeAddr key[0], len(key)) + copyMem(addr pIv[0], unsafeAddr iv[0], len(iv)) + + # Initialization of CFB[aes256] context with encryption key + context.init(pKey, pIv) + # Encryption process + context.encrypt(pData, encodedData) + # Clear context of CFB[aes256] + context.clear() + + return encodedData + + +proc cfbDecode*(key: string, iv: string, encodedData: seq[byte]): seq[byte] = + var context: CFB[aes256] + var pKey: array[aes256.sizeKey, byte] + var pIv: array[aes256.sizeBlock, byte] + var decodedData = newSeq[byte](len(encodedData)) + + # copyMem(addr _data[0], addr data[0], len(data)) + # WARNING! Do not use 0 byte padding in applications, this is done as example. + copyMem(addr pKey[0], unsafeAddr key[0], len(key)) + copyMem(addr pIv[0], unsafeAddr iv[0], len(iv)) + + # Initialization of CFB[aes256] context with encryption key + context.init(pKey, pIv) + # Decryption process + context.decrypt(encodedData, decodedData) + # Clear context of CFB[aes256] + context.clear() + + return decodedData diff --git a/tests/waku_relay/test_all.nim b/tests/waku_relay/test_all.nim index 9131aeb5e..a615f0e4e 100644 --- a/tests/waku_relay/test_all.nim +++ b/tests/waku_relay/test_all.nim @@ -2,4 +2,6 @@ import ./test_waku_relay, - ./test_wakunode_relay + ./test_wakunode_relay, + ./test_message_id, + ./test_protocol diff --git a/tests/waku_relay/test_message_id.nim b/tests/waku_relay/test_message_id.nim new file mode 100644 index 000000000..7cf3d4402 --- /dev/null +++ b/tests/waku_relay/test_message_id.nim @@ -0,0 +1,37 @@ +import + unittest, + stew/shims/net, + stew/[results, byteutils] + +import + stew/results, + nimcrypto/sha2, + libp2p/protocols/pubsub/rpc/messages, + ../../../waku/waku_relay/message_id, + ../testlib/sequtils + + +suite "Message ID Provider": + test "Non-empty string": + let message = Message(data: "Hello, world!".toBytes()) + let result = defaultMessageIdProvider(message) + let expected = MDigest[256].fromHex("315f5bdb76d078c43b8ac0064e4a0164612b1fce77c869345bfc94c75894edd3") + check: + result.isOk() + result.get() == expected.data + + test "Empty string": + let message = Message(data: "".toBytes()) + let result = defaultMessageIdProvider(message) + let expected = MDigest[256].fromHex("E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855") + check: + result.isOk() + result.get() == expected.data + + test "Empty array": + let message = Message(data: @[]) + let result = defaultMessageIdProvider(message) + let expected = MDigest[256].fromHex("E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855") + check: + result.isOk() + result.get() == expected.data diff --git a/tests/waku_relay/test_protocol.nim b/tests/waku_relay/test_protocol.nim new file mode 100644 index 000000000..17c8d902c --- /dev/null +++ b/tests/waku_relay/test_protocol.nim @@ -0,0 +1,1301 @@ +{.used.} + +import + std/[options, sequtils, strutils], + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + chronos, + libp2p/protocols/pubsub/[ + pubsub, + gossipsub + ], + libp2p/[ + multihash, + stream/connection, + switch + ], + ./crypto_utils, + std/json + +import + ../../../waku/[ + node/peer_manager, + waku_relay/protocol, + waku_relay, + waku_core + ], + ../testlib/[ + wakucore, + testasync, + testutils, + futures, + sequtils + ], + ./utils, + ../resources/payloads + + +suite "Waku Relay": + var messageSeq {.threadvar.}: seq[(PubsubTopic, WakuMessage)] + var handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] + var simpleFutureHandler {.threadvar.}: WakuRelayHandler + var switch {.threadvar.}: Switch + var peerManager {.threadvar.}: PeerManager + var node {.threadvar.}: WakuRelay + var remotePeerInfo {.threadvar.}: RemotePeerInfo + var peerId {.threadvar.}: PeerId + var contentTopic {.threadvar.}: ContentTopic + var pubsubTopic {.threadvar.}: PubsubTopic + var pubsubTopicSeq {.threadvar.}: seq[PubsubTopic] + var testMessage {.threadvar.}: string + var wakuMessage {.threadvar.}: WakuMessage + + asyncSetup: + messageSeq = @[] + handlerFuture = newPushHandlerFuture() + simpleFutureHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, closure, gcsafe.} = + messageSeq.add((topic, msg)) + handlerFuture.complete((topic, msg)) + + switch = newTestSwitch() + peerManager = PeerManager.new(switch) + node = await newTestWakuRelay(switch) + + testMessage = "test-message" + contentTopic = DefaultContentTopic + pubsubTopic = DefaultPubsubTopic + pubsubTopicSeq = @[pubsubTopic] + wakuMessage = fakeWakuMessage(testMessage, pubsubTopic) + + await allFutures(switch.start(), node.start()) + + remotePeerInfo = switch.peerInfo.toRemotePeerInfo() + peerId = remotePeerInfo.peerId + + asyncTeardown: + await allFutures(switch.stop(), node.stop()) + + suite "Subscribe": + asyncTest "Publish without Subscription": + # When publishing a message without being subscribed + discard await node.publish(pubsubTopic, wakuMessage) + + # Then the message is not published + check: + not await handlerFuture.withTimeout(3.seconds) + + asyncTest "Publish with Subscription (Network Size: 1)": + # When subscribing to a Pubsub Topic + discard node.subscribe(pubsubTopic, simpleFutureHandler) + + # Then the node is subscribed + check: + node.isSubscribed(pubsubTopic) + node.subscribedTopics == pubsubTopicSeq + + # When publishing a message + discard await node.publish(pubsubTopic, wakuMessage) + + # Then the message is published + assert (await handlerFuture.withTimeout(3.seconds)) + let (topic, msg) = handlerFuture.read() + check: + topic == pubsubTopic + msg == wakuMessage + + asyncTest "Pubsub Topic Subscription (Network Size: 2, only one subscribed)": + # Given a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + check await peerManager.connectRelay(otherRemotePeerInfo) + + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + # When subscribing the second node to the Pubsub Topic + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + + # Then the second node is subscribed, but not the first one + check: + not node.isSubscribed(pubsubTopic) + node.subscribedTopics != pubsubTopicSeq + otherNode.isSubscribed(pubsubTopic) + otherNode.subscribedTopics == pubsubTopicSeq + + await sleepAsync(500.millis) + + # When publishing a message in the subscribed node + let fromOtherWakuMessage = fakeWakuMessage("fromOther") + discard await otherNode.publish(pubsubTopic, fromOtherWakuMessage) + + # Then the message is published only in the subscribed node + check: + not await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + + let (otherTopic1, otherMessage1) = otherHandlerFuture.read() + check: + otherTopic1 == pubsubTopic + otherMessage1 == fromOtherWakuMessage + + # When publishing a message in the other node + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + let fromNodeWakuMessage = fakeWakuMessage("fromNode") + discard await node.publish(pubsubTopic, fromNodeWakuMessage) + + # Then the message is published only in the subscribed node + check: + not await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + + let (otherTopic2, otherMessage2) = otherHandlerFuture.read() + check: + otherTopic2 == pubsubTopic + otherMessage2 == fromNodeWakuMessage + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + asyncTest "Pubsub Topic Subscription (Network Size: 2, both subscribed to same pubsub topic)": + # Given a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + check await peerManager.connectRelay(otherRemotePeerInfo) + + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + # When subscribing both nodes to the same Pubsub Topic + discard node.subscribe(pubsubTopic, simpleFutureHandler) + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + + # Then both nodes are subscribed + check: + node.isSubscribed(pubsubTopic) + node.subscribedTopics == pubsubTopicSeq + otherNode.isSubscribed(pubsubTopic) + otherNode.subscribedTopics == pubsubTopicSeq + + await sleepAsync(500.millis) + + # When publishing a message in node + let fromOtherWakuMessage = fakeWakuMessage("fromOther") + discard await node.publish(pubsubTopic, fromOtherWakuMessage) + + # Then the message is published in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + + let + (topic1, message1) = handlerFuture.read() + (otherTopic1, otherMessage1) = otherHandlerFuture.read() + check: + topic1 == pubsubTopic + message1 == fromOtherWakuMessage + otherTopic1 == pubsubTopic + otherMessage1 == fromOtherWakuMessage + + # When publishing a message in the other node + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + let fromNodeWakuMessage = fakeWakuMessage("fromNode") + discard await node.publish(pubsubTopic, fromNodeWakuMessage) + discard await otherNode.publish(pubsubTopic, fromNodeWakuMessage) + + # Then the message is published in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + + let + (topic2, message2) = handlerFuture.read() + (otherTopic2, otherMessage2) = otherHandlerFuture.read() + check: + topic2 == pubsubTopic + message2 == fromNodeWakuMessage + otherTopic2 == pubsubTopic + otherMessage2 == fromNodeWakuMessage + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + asyncTest "Refreshing subscription": + # Given a subscribed node + node.subscribe(pubsubTopic, simpleFutureHandler) + check: + node.isSubscribed(pubsubTopic) + node.subscribedTopics == pubsubTopicSeq + let otherWakuMessage = fakeWakuMessage("fromOther") + discard await node.publish(pubsubTopic, otherWakuMessage) + check: + messageSeq == @[(pubsubTopic, otherWakuMessage)] + + # Given the subscription is refreshed + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + node.subscribe(pubsubTopic, otherSimpleFutureHandler) + check: + node.isSubscribed(pubsubTopic) + node.subscribedTopics == pubsubTopicSeq + messageSeq == @[(pubsubTopic, otherWakuMessage)] + + # When publishing a message with the refreshed subscription + handlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, wakuMessage) + + # Then the message is published + check (await handlerFuture.withTimeout(3.seconds)) + let (topic, msg) = handlerFuture.read() + check: + topic == pubsubTopic + msg == wakuMessage + messageSeq == @[(pubsubTopic, otherWakuMessage), (pubsubTopic, wakuMessage)] + + asyncTest "With additional validator": + # Given a simple validator + var validatorFuture = newBoolFuture() + let len4Validator = proc( + pubsubTopic: string, message: WakuMessage + ): Future[ValidationResult] {.async.} = + if message.payload.len() == 8: + validatorFuture.complete(true) + return ValidationResult.Accept + else: + validatorFuture.complete(false) + return ValidationResult.Reject + + # And a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + check await peerManager.connectRelay(otherRemotePeerInfo) + + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + otherNode.addValidator(pubsubTopic, len4Validator) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + await sleepAsync(500.millis) + check: + otherNode.isSubscribed(pubsubTopic) + + # Given a subscribed node with a validator + node.addValidator(pubsubTopic, len4Validator) + node.subscribe(pubsubTopic, simpleFutureHandler) + await sleepAsync(500.millis) + check: + node.isSubscribed(pubsubTopic) + node.subscribedTopics == pubsubTopicSeq + otherNode.isSubscribed(pubsubTopic) + otherNode.subscribedTopics == pubsubTopicSeq + + # When publishing a message that doesn't match the validator + discard await node.publish(pubsubTopic, wakuMessage) + + # Then the validator is ran in the other node, and fails + # Not run in the self node + check: + await validatorFuture.withTimeout(3.seconds) + validatorFuture.read() == false + + # And the message is published in the self node, but not in the other node, + # because it doesn't pass the validator check. + check: + await handlerFuture.withTimeout(3.seconds) + not await otherHandlerFuture.withTimeout(3.seconds) + let (topic1, msg1) = handlerFuture.read() + # let (otherTopic1, otherMsg1) = otherHandlerFuture.read() + check: + topic1 == pubsubTopic + msg1 == wakuMessage + # otherTopic1 == pubsubTopic + # otherMsg1 == wakuMessage + + # When publishing a message that matches the validator + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + validatorFuture = newBoolFuture() + let wakuMessage2 = fakeWakuMessage("12345678", pubsubTopic) + discard await node.publish(pubsubTopic, wakuMessage2) + + # Then the validator is ran in the other node, and succeeds + # Not run in the self node + check: + await validatorFuture.withTimeout(3.seconds) + validatorFuture.read() == true + + # And the message is published in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + let (topic2, msg2) = handlerFuture.read() + let (otherTopic2, otherMsg2) = otherHandlerFuture.read() + check: + topic2 == pubsubTopic + msg2 == wakuMessage2 + otherTopic2 == pubsubTopic + otherMsg2 == wakuMessage2 + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + asyncTest "Max Topic Size": + # NOT FOUND + discard + + asyncTest "Max subscriptions": + # NOT FOUND + discard + + asyncTest "Message encryption/decryption": + # Given a second node connected to the first one, both subscribed to the same Pubsub Topic + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + check await peerManager.connectRelay(otherRemotePeerInfo) + + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + discard node.subscribe(pubsubTopic, simpleFutureHandler) + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + check: + node.isSubscribed(pubsubTopic) + node.subscribedTopics == pubsubTopicSeq + otherNode.isSubscribed(pubsubTopic) + otherNode.subscribedTopics == pubsubTopicSeq + + await sleepAsync(500.millis) + + # Given some crypto info + var key = "My fancy key" + var data = "Hello, Crypto!" + var iv = "0123456789ABCDEF" + + # When publishing an encrypted message + let encodedText = cfbEncode(key, iv, data) + let encodedWakuMessage = fakeWakuMessage(encodedText, pubsubTopic) + discard await node.publish(pubsubTopic, encodedWakuMessage) + + # Then the message is published in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + let (topic1, msg1) = handlerFuture.read() + let (otherTopic1, otherMsg1) = otherHandlerFuture.read() + check: + topic1 == pubsubTopic + msg1 == encodedWakuMessage + otherTopic1 == pubsubTopic + otherMsg1 == encodedWakuMessage + + # When decoding the message + let + decodedText = cfbDecode(key, iv, msg1.payload) + otherDecodedText = cfbDecode(key, iv, otherMsg1.payload) + + # Then the message is decrypted in both nodes + check: + decodedText.toString() == data + otherDecodedText.toString() == data + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + asyncTest "How multiple interconnected nodes work": + # Given two other pubsub topics + let + pubsubTopicB = "pubsub-topic-b" + pubsubTopicC = "pubsub-topic-c" + + # Given two other nodes connected to the first one + let + otherSwitch = newTestSwitch() + otherPeerManager = PeerManager.new(otherSwitch) + otherNode = await newTestWakuRelay(otherSwitch) + anotherSwitch = newTestSwitch() + anotherPeerManager = PeerManager.new(anotherSwitch) + anotherNode = await newTestWakuRelay(anotherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start(), anotherSwitch.start(), anotherNode.start()) + + let + otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + otherPeerId = otherRemotePeerInfo.peerId + anotherRemotePeerInfo = anotherSwitch.peerInfo.toRemotePeerInfo() + anotherPeerId = anotherRemotePeerInfo.peerId + + check: + await peerManager.connectRelay(otherRemotePeerInfo) + await peerManager.connectRelay(anotherRemotePeerInfo) + + # Given the first node is subscribed to two pubsub topics + var handlerFuture2 = newPushHandlerFuture() + proc simpleFutureHandler2(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + handlerFuture2.complete((topic, message)) + + discard node.subscribe(pubsubTopic, simpleFutureHandler) + discard node.subscribe(pubsubTopicB, simpleFutureHandler2) + + # Given the other nodes are subscribed to two pubsub topics + var otherHandlerFuture1 = newPushHandlerFuture() + proc otherSimpleFutureHandler1(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture1.complete((topic, message)) + + var otherHandlerFuture2 = newPushHandlerFuture() + proc otherSimpleFutureHandler2(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture2.complete((topic, message)) + + var anotherHandlerFuture1 = newPushHandlerFuture() + proc anotherSimpleFutureHandler1(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + anotherHandlerFuture1.complete((topic, message)) + + var anotherHandlerFuture2 = newPushHandlerFuture() + proc anotherSimpleFutureHandler2(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + anotherHandlerFuture2.complete((topic, message)) + + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler1) + discard otherNode.subscribe(pubsubTopicC, otherSimpleFutureHandler2) + discard anotherNode.subscribe(pubsubTopicB, anotherSimpleFutureHandler1) + discard anotherNode.subscribe(pubsubTopicC, anotherSimpleFutureHandler2) + await sleepAsync(500.millis) + + # When publishing a message in node for each of the pubsub topics + let + fromNodeWakuMessage1 = fakeWakuMessage("fromNode1") + fromNodeWakuMessage2 = fakeWakuMessage("fromNode2") + fromNodeWakuMessage3 = fakeWakuMessage("fromNode3") + + discard await node.publish(pubsubTopic, fromNodeWakuMessage1) + discard await node.publish(pubsubTopicB, fromNodeWakuMessage2) + discard await node.publish(pubsubTopicC, fromNodeWakuMessage3) + + # Then the messages are published in all nodes (because it's published in the center node) + # Center meaning that all other nodes are connected to this one + check: + await handlerFuture.withTimeout(3.seconds) + await handlerFuture2.withTimeout(3.seconds) + await otherHandlerFuture1.withTimeout(3.seconds) + await otherHandlerFuture2.withTimeout(3.seconds) + await anotherHandlerFuture1.withTimeout(3.seconds) + await anotherHandlerFuture2.withTimeout(3.seconds) + + let + (topic1, msg1) = handlerFuture.read() + (topic2, msg2) = handlerFuture2.read() + (otherTopic1, otherMsg1) = otherHandlerFuture1.read() + (otherTopic2, otherMsg2) = otherHandlerFuture2.read() + (anotherTopic1, anotherMsg1) = anotherHandlerFuture1.read() + (anotherTopic2, anotherMsg2) = anotherHandlerFuture2.read() + + check: + topic1 == pubsubTopic + msg1 == fromNodeWakuMessage1 + topic2 == pubsubTopicB + msg2 == fromNodeWakuMessage2 + otherTopic1 == pubsubTopic + otherMsg1 == fromNodeWakuMessage1 + otherTopic2 == pubsubTopicC + otherMsg2 == fromNodeWakuMessage3 + anotherTopic1 == pubsubTopicB + anotherMsg1 == fromNodeWakuMessage2 + anotherTopic2 == pubsubTopicC + anotherMsg2 == fromNodeWakuMessage3 + + # Given anotherNode is completely disconnected from the first one + await anotherPeerManager.switch.disconnect(peerId) + await peerManager.switch.disconnect(anotherPeerId) + check: + not anotherPeerManager.switch.isConnected(peerId) + not peerManager.switch.isConnected(anotherPeerId) + + # When publishing a message in node for each of the pubsub topics + handlerFuture = newPushHandlerFuture() + handlerFuture2 = newPushHandlerFuture() + otherHandlerFuture1 = newPushHandlerFuture() + otherHandlerFuture2 = newPushHandlerFuture() + anotherHandlerFuture1 = newPushHandlerFuture() + anotherHandlerFuture2 = newPushHandlerFuture() + + let + fromNodeWakuMessage4 = fakeWakuMessage("fromNode4") + fromNodeWakuMessage5 = fakeWakuMessage("fromNode5") + fromNodeWakuMessage6 = fakeWakuMessage("fromNode6") + + discard await node.publish(pubsubTopic, fromNodeWakuMessage4) + discard await node.publish(pubsubTopicB, fromNodeWakuMessage5) + discard await node.publish(pubsubTopicC, fromNodeWakuMessage6) + + # Then the message is published in node and otherNode, + # but not in anotherNode because it is not connected anymore + check: + await handlerFuture.withTimeout(3.seconds) + await handlerFuture2.withTimeout(3.seconds) + await otherHandlerFuture1.withTimeout(3.seconds) + await otherHandlerFuture2.withTimeout(3.seconds) + not await anotherHandlerFuture1.withTimeout(3.seconds) + not await anotherHandlerFuture2.withTimeout(3.seconds) + + let + (topic3, msg3) = handlerFuture.read() + (topic4, msg4) = handlerFuture2.read() + (otherTopic3, otherMsg3) = otherHandlerFuture1.read() + (otherTopic4, otherMsg4) = otherHandlerFuture2.read() + + check: + topic3 == pubsubTopic + msg3 == fromNodeWakuMessage4 + topic4 == pubsubTopicB + msg4 == fromNodeWakuMessage5 + otherTopic3 == pubsubTopic + otherMsg3 == fromNodeWakuMessage4 + otherTopic4 == pubsubTopicC + otherMsg4 == fromNodeWakuMessage6 + + # When publishing a message in anotherNode for each of the pubsub topics + handlerFuture = newPushHandlerFuture() + handlerFuture2 = newPushHandlerFuture() + otherHandlerFuture1 = newPushHandlerFuture() + otherHandlerFuture2 = newPushHandlerFuture() + anotherHandlerFuture1 = newPushHandlerFuture() + anotherHandlerFuture2 = newPushHandlerFuture() + + let + fromAnotherNodeWakuMessage1 = fakeWakuMessage("fromAnotherNode1") + fromAnotherNodeWakuMessage2 = fakeWakuMessage("fromAnotherNode2") + fromAnotherNodeWakuMessage3 = fakeWakuMessage("fromAnotherNode3") + + discard await anotherNode.publish(pubsubTopic, fromAnotherNodeWakuMessage1) + discard await anotherNode.publish(pubsubTopicB, fromAnotherNodeWakuMessage2) + discard await anotherNode.publish(pubsubTopicC, fromAnotherNodeWakuMessage3) + + # Then the messages are only published in anotherNode because it's disconnected from + # the rest of the network + check: + not await handlerFuture.withTimeout(3.seconds) + not await handlerFuture2.withTimeout(3.seconds) + not await otherHandlerFuture1.withTimeout(3.seconds) + not await otherHandlerFuture2.withTimeout(3.seconds) + await anotherHandlerFuture1.withTimeout(3.seconds) + await anotherHandlerFuture2.withTimeout(3.seconds) + + let + (anotherTopic3, anotherMsg3) = anotherHandlerFuture1.read() + (anotherTopic4, anotherMsg4) = anotherHandlerFuture2.read() + + check: + anotherTopic3 == pubsubTopicB + anotherMsg3 == fromAnotherNodeWakuMessage2 + anotherTopic4 == pubsubTopicC + anotherMsg4 == fromAnotherNodeWakuMessage3 + + # When publishing a message in otherNode for each of the pubsub topics + handlerFuture = newPushHandlerFuture() + handlerFuture2 = newPushHandlerFuture() + otherHandlerFuture1 = newPushHandlerFuture() + otherHandlerFuture2 = newPushHandlerFuture() + anotherHandlerFuture1 = newPushHandlerFuture() + anotherHandlerFuture2 = newPushHandlerFuture() + + let + fromOtherNodeWakuMessage1 = fakeWakuMessage("fromOtherNode1") + fromOtherNodeWakuMessage2 = fakeWakuMessage("fromOtherNode2") + fromOtherNodeWakuMessage3 = fakeWakuMessage("fromOtherNode3") + + discard await otherNode.publish(pubsubTopic, fromOtherNodeWakuMessage1) + discard await otherNode.publish(pubsubTopicB, fromOtherNodeWakuMessage2) + discard await otherNode.publish(pubsubTopicC, fromOtherNodeWakuMessage3) + + # Then the messages are only published in otherNode and node, but not in anotherNode + # because it's disconnected from the rest of the network + check: + await handlerFuture.withTimeout(3.seconds) + await handlerFuture2.withTimeout(3.seconds) + await otherHandlerFuture1.withTimeout(3.seconds) + await otherHandlerFuture2.withTimeout(3.seconds) + not await anotherHandlerFuture1.withTimeout(3.seconds) + not await anotherHandlerFuture2.withTimeout(3.seconds) + + let + (topic5, msg5) = handlerFuture.read() + (topic6, msg6) = handlerFuture2.read() + (otherTopic5, otherMsg5) = otherHandlerFuture1.read() + (otherTopic6, otherMsg6) = otherHandlerFuture2.read() + + check: + topic5 == pubsubTopic + msg5 == fromOtherNodeWakuMessage1 + topic6 == pubsubTopicB + msg6 == fromOtherNodeWakuMessage2 + otherTopic5 == pubsubTopic + otherMsg5 == fromOtherNodeWakuMessage1 + otherTopic6 == pubsubTopicC + otherMsg6 == fromOtherNodeWakuMessage3 + + # Given anotherNode is reconnected, but to otherNode + check await anotherPeerManager.connectRelay(otherRemotePeerInfo) + check: + anotherPeerManager.switch.isConnected(otherPeerId) + otherPeerManager.switch.isConnected(anotherPeerId) + + # When publishing a message in anotherNode for each of the pubsub topics + handlerFuture = newPushHandlerFuture() + handlerFuture2 = newPushHandlerFuture() + otherHandlerFuture1 = newPushHandlerFuture() + otherHandlerFuture2 = newPushHandlerFuture() + anotherHandlerFuture1 = newPushHandlerFuture() + anotherHandlerFuture2 = newPushHandlerFuture() + + let + fromAnotherNodeWakuMessage4 = fakeWakuMessage("fromAnotherNode4") + fromAnotherNodeWakuMessage5 = fakeWakuMessage("fromAnotherNode5") + fromAnotherNodeWakuMessage6 = fakeWakuMessage("fromAnotherNode6") + + discard await anotherNode.publish(pubsubTopic, fromAnotherNodeWakuMessage4) + discard await anotherNode.publish(pubsubTopicB, fromAnotherNodeWakuMessage5) + discard await anotherNode.publish(pubsubTopicC, fromAnotherNodeWakuMessage6) + + # Then the messages are published in all nodes except in node's B topic, because + # even if they're connected like so AnotherNode <-> OtherNode <-> Node, + # otherNode doesn't broadcast B topic messages because it's not subscribed to it + check: + await handlerFuture.withTimeout(3.seconds) + not await handlerFuture2.withTimeout(3.seconds) + await otherHandlerFuture1.withTimeout(3.seconds) + await otherHandlerFuture2.withTimeout(3.seconds) + await anotherHandlerFuture1.withTimeout(3.seconds) + await anotherHandlerFuture2.withTimeout(3.seconds) + + let + (topic7, msg7) = handlerFuture.read() + (otherTopic7, otherMsg7) = otherHandlerFuture1.read() + (otherTopic8, otherMsg8) = otherHandlerFuture2.read() + (anotherTopic7, anotherMsg7) = anotherHandlerFuture1.read() + (anotherTopic8, anotherMsg8) = anotherHandlerFuture2.read() + + check: + topic7 == pubsubTopic + msg7 == fromAnotherNodeWakuMessage4 + otherTopic7 == pubsubTopic + otherMsg7 == fromAnotherNodeWakuMessage4 + otherTopic8 == pubsubTopicC + otherMsg8 == fromAnotherNodeWakuMessage6 + anotherTopic7 == pubsubTopicB + anotherMsg7 == fromAnotherNodeWakuMessage5 + anotherTopic8 == pubsubTopicC + anotherMsg8 == fromAnotherNodeWakuMessage6 + + # Finally stop the other nodes + await allFutures(otherSwitch.stop(), otherNode.stop(), anotherSwitch.stop(), anotherNode.stop()) + + suite "Unsubscribe": + asyncTest "Without Subscription": + # Given an external topic handler + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + await allFutures(otherSwitch.start(), otherNode.start()) + let otherTopicHandler: TopicHandler = otherNode.subscribe(pubsubTopic, simpleFutureHandler) + + # Given a node without a subscription + check: + node.subscribedTopics == [] + + # When unsubscribing from a pubsub topic from an unsubscribed topic handler + node.unsubscribe(pubsubTopic, otherTopicHandler) + + # Then the node is still not subscribed + check: + node.subscribedTopics == [] + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + asyncTest "Single Node with Single Pubsub Topic": + # Given a node subscribed to a pubsub topic + let topicHandler = node.subscribe(pubsubTopic, simpleFutureHandler) + check node.subscribedTopics == pubsubTopicSeq + + # When unsubscribing from the pubsub topic + node.unsubscribe(pubsubTopic, topicHandler) + + # Then the node is not subscribed anymore + check node.subscribedTopics == [] + + asyncTest "Single Node with Multiple Pubsub Topics": + # Given other pubsub topic + let pubsubTopicB = "pubsub-topic-b" + + # Given a node subscribed to multiple pubsub topics + let + topicHandler = node.subscribe(pubsubTopic, simpleFutureHandler) + topicHandlerB = node.subscribe(pubsubTopicB, simpleFutureHandler) + check node.subscribedTopics == @[pubsubTopic, pubsubTopicB] + + # When unsubscribing from one of the pubsub topics + node.unsubscribe(pubsubTopic, topicHandler) + + # Then the node is still subscribed to the other pubsub topic + check node.subscribedTopics == @[pubsubTopicB] + + # When unsubscribing from the other pubsub topic + node.unsubscribe(pubsubTopicB, topicHandlerB) + + # Then the node is not subscribed anymore + check node.subscribedTopics == [] + + suite "Unsubscribe All": + asyncTest "Without subscriptions": + # Given a node without subscriptions + check node.subscribedTopics == [] + + # When unsubscribing from all pubsub topics + node.unsubscribeAll(pubsubTopic) + + # Then the node is still not subscribed + check node.subscribedTopics == [] + + asyncTest "Single Node with Single Pubsub Topic": + # Given a node subscribed to a pubsub topic + node.subscribe(pubsubTopic, simpleFutureHandler) + check node.subscribedTopics == pubsubTopicSeq + + # When unsubscribing from all pubsub topics + node.unsubscribeAll(pubsubTopic) + + # Then the node is not subscribed anymore + check node.subscribedTopics == [] + + asyncTest "Single Node with Multiple Pubsub Topics": + # Given other pubsub topic + let pubsubTopicB = "pubsub-topic-b" + + # Given a node subscribed to multiple pubsub topics + discard node.subscribe(pubsubTopic, simpleFutureHandler) + discard node.subscribe(pubsubTopic, simpleFutureHandler) + discard node.subscribe(pubsubTopicB, simpleFutureHandler) + + check node.subscribedTopics == @[pubsubTopic, pubsubTopicB] + + # When unsubscribing all handlers from pubsubTopic + node.unsubscribeAll(pubsubTopic) + + # Then the node doesn't have pubsubTopic handlers + check node.subscribedTopics == @[pubsubTopicB] + + # When unsubscribing all handlers from pubsubTopicB + node.unsubscribeAll(pubsubTopicB) + + # Then the node is not subscribed to anything + check node.subscribedTopics == [] + + suite "Send & Retrieve Messages": + asyncTest "Valid Payload Types": + # Given a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + check await peerManager.connectRelay(otherRemotePeerInfo) + + # Given both are subscribed to the same pubsub topic + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + discard node.subscribe(pubsubTopic, simpleFutureHandler) + check: + node.subscribedTopics == pubsubTopicSeq + otherNode.subscribedTopics == pubsubTopicSeq + + await sleepAsync(500.millis) + + # Given some payloads + let + JSON_DICTIONARY = getSampleJsonDictionary() + JSON_LIST = getSampleJsonList() + + # Given some valid messages + let + msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=ALPHABETIC) + msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=ALPHANUMERIC) + msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=ALPHANUMERIC_SPECIAL) + msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=EMOJI) + msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=CODE) + msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=QUERY) + msg7 = fakeWakuMessage(contentTopic=contentTopic, payload=($JSON_DICTIONARY)) + msg8 = fakeWakuMessage(contentTopic=contentTopic, payload=($JSON_LIST)) + msg9 = fakeWakuMessage(contentTopic=contentTopic, payload=TEXT_SMALL) + msg10 = fakeWakuMessage(contentTopic=contentTopic, payload=TEXT_LARGE) + + # When sending the alphabetic message + discard await node.publish(pubsubTopic, msg1) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg1) == handlerFuture.read() + (pubsubTopic, msg1) == otherHandlerFuture.read() + + # When sending the alphanumeric message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg2) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg2) == handlerFuture.read() + (pubsubTopic, msg2) == otherHandlerFuture.read() + + # When sending the alphanumeric special message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg3) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg3) == handlerFuture.read() + (pubsubTopic, msg3) == otherHandlerFuture.read() + + # When sending the emoji message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg4) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg4) == handlerFuture.read() + (pubsubTopic, msg4) == otherHandlerFuture.read() + + # When sending the code message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg5) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg5) == handlerFuture.read() + (pubsubTopic, msg5) == otherHandlerFuture.read() + + # When sending the query message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg6) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg6) == handlerFuture.read() + (pubsubTopic, msg6) == otherHandlerFuture.read() + + # When sending the JSON dictionary message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg7) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg7) == handlerFuture.read() + (pubsubTopic, msg7) == otherHandlerFuture.read() + + # When sending the JSON list message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg8) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg8) == handlerFuture.read() + (pubsubTopic, msg8) == otherHandlerFuture.read() + + # When sending the small text message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg9) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg9) == handlerFuture.read() + (pubsubTopic, msg9) == otherHandlerFuture.read() + + # When sending the large text message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg10) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg10) == handlerFuture.read() + (pubsubTopic, msg10) == otherHandlerFuture.read() + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + asyncTest "Valid Payload Sizes": + # Given a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + check await peerManager.connectRelay(otherRemotePeerInfo) + + # Given both are subscribed to the same pubsub topic + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + discard node.subscribe(pubsubTopic, simpleFutureHandler) + check: + node.subscribedTopics == pubsubTopicSeq + otherNode.subscribedTopics == pubsubTopicSeq + + await sleepAsync(500.millis) + + # Given some valid payloads + let + msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024)) # 1KiB + msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(10*1024)) # 10KiB + msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(100*1024)) # 100KiB + msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1023*1024)) # 1MiB - 1B -> Max Size (Inclusive Limit) + msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024)) # 1MiB -> Max Size (Exclusive Limit) + msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1025*1024)) # 1MiB + 1B -> Out of Max Size + + # When sending the 1KiB message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg1) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg1) == handlerFuture.read() + (pubsubTopic, msg1) == otherHandlerFuture.read() + + # When sending the 10KiB message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg2) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg2) == handlerFuture.read() + (pubsubTopic, msg2) == otherHandlerFuture.read() + + # When sending the 100KiB message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg3) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg3) == handlerFuture.read() + (pubsubTopic, msg3) == otherHandlerFuture.read() + + # When sending the 1023KiB message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg4) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg4) == handlerFuture.read() + (pubsubTopic, msg4) == otherHandlerFuture.read() + + # When sending the 1024KiB message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg5) + + # Then the message is received in self, because there's no checking, but not in other node + check: + await handlerFuture.withTimeout(3.seconds) + not await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg5) == handlerFuture.read() + + # When sending the 1025KiB message + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg6) + + # Then the message is received in self, because there's no checking, but not in other node + check: + await handlerFuture.withTimeout(3.seconds) + not await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg6) == handlerFuture.read() + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + asyncTest "Multiple messages at once": + # Given a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + check await peerManager.connectRelay(otherRemotePeerInfo) + + # Given both are subscribed to the same pubsub topic + # Create a different handler than the default to include messages in a seq + var thisHandlerFuture = newPushHandlerFuture() + var thisMessageSeq: seq[(PubsubTopic, WakuMessage)] = @[] + proc thisSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + thisMessageSeq.add((topic, message)) + thisHandlerFuture.complete((topic, message)) + + var otherHandlerFuture = newPushHandlerFuture() + var otherMessageSeq: seq[(PubsubTopic, WakuMessage)] = @[] + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherMessageSeq.add((topic, message)) + otherHandlerFuture.complete((topic, message)) + + discard node.subscribe(pubsubTopic, thisSimpleFutureHandler) + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + check: + node.subscribedTopics == pubsubTopicSeq + otherNode.subscribedTopics == pubsubTopicSeq + await sleepAsync(500.millis) + + # When sending multiple messages from node + let + msg1 = fakeWakuMessage("msg1", pubsubTopic) + msg2 = fakeWakuMessage("msg2", pubsubTopic) + msg3 = fakeWakuMessage("msg3", pubsubTopic) + msg4 = fakeWakuMessage("msg4", pubsubTopic) + + discard await node.publish(pubsubTopic, msg1) + check await thisHandlerFuture.withTimeout(3.seconds) + check await otherHandlerFuture.withTimeout(3.seconds) + thisHandlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg2) + check await thisHandlerFuture.withTimeout(3.seconds) + check await otherHandlerFuture.withTimeout(3.seconds) + thisHandlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg3) + check await thisHandlerFuture.withTimeout(3.seconds) + check await otherHandlerFuture.withTimeout(3.seconds) + thisHandlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + discard await node.publish(pubsubTopic, msg4) + + check: + await thisHandlerFuture.withTimeout(3.seconds) + thisMessageSeq == @[ + (pubsubTopic, msg1), + (pubsubTopic, msg2), + (pubsubTopic, msg3), + (pubsubTopic, msg4) + ] + await otherHandlerFuture.withTimeout(3.seconds) + otherMessageSeq == @[ + (pubsubTopic, msg1), + (pubsubTopic, msg2), + (pubsubTopic, msg3), + (pubsubTopic, msg4) + ] + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + suite "Security and Privacy": + asyncTest "Relay can receive messages after reboot and reconnect": + # Given a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherPeerManager = PeerManager.new(otherSwitch) + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let + otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + otherPeerId = otherRemotePeerInfo.peerId + + check await peerManager.connectRelay(otherRemotePeerInfo) + + # Given both are subscribed to the same pubsub topic + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + discard node.subscribe(pubsubTopic, simpleFutureHandler) + check: + node.subscribedTopics == pubsubTopicSeq + otherNode.subscribedTopics == pubsubTopicSeq + await sleepAsync(500.millis) + + # Given other node is stopped and restarted + await allFutures(otherSwitch.stop(), otherNode.stop()) + await allFutures(otherSwitch.start(), otherNode.start()) + # FIXME: Once stopped and started, nodes are not considered connected, nor do they reconnect after running connectRelay, as below + # check await otherPeerManager.connectRelay(otherRemotePeerInfo) + + # When sending a message from node + let msg1 = fakeWakuMessage(testMessage, pubsubTopic) + discard await node.publish(pubsubTopic, msg1) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg1) == handlerFuture.read() + (pubsubTopic, msg1) == otherHandlerFuture.read() + + # When sending a message from other node + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + let msg2 = fakeWakuMessage(testMessage, pubsubTopic) + discard await otherNode.publish(pubsubTopic, msg2) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg2) == handlerFuture.read() + (pubsubTopic, msg2) == otherHandlerFuture.read() + + # Given node is stopped and restarted + await allFutures(switch.stop(), node.stop()) + await allFutures(switch.start(), node.start()) + + # When sending a message from node + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + let msg3 = fakeWakuMessage(testMessage, pubsubTopic) + discard await node.publish(pubsubTopic, msg3) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg3) == handlerFuture.read() + (pubsubTopic, msg3) == otherHandlerFuture.read() + + # When sending a message from other node + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + let msg4 = fakeWakuMessage(testMessage, pubsubTopic) + discard await otherNode.publish(pubsubTopic, msg4) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg4) == handlerFuture.read() + (pubsubTopic, msg4) == otherHandlerFuture.read() + + # Finally stop the other node + await allFutures(otherSwitch.stop(), otherNode.stop()) + + xasyncTest "Relay can receive messages after subscribing and stopping without unsubscribing": + # Given a second node connected to the first one + let + otherSwitch = newTestSwitch() + otherPeerManager = PeerManager.new(otherSwitch) + otherNode = await newTestWakuRelay(otherSwitch) + + await allFutures(otherSwitch.start(), otherNode.start()) + let + otherRemotePeerInfo = otherSwitch.peerInfo.toRemotePeerInfo() + otherPeerId = otherRemotePeerInfo.peerId + + check await peerManager.connectRelay(otherRemotePeerInfo) + + # Given both are subscribed to the same pubsub topic + var otherHandlerFuture = newPushHandlerFuture() + proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + otherHandlerFuture.complete((topic, message)) + + discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + discard node.subscribe(pubsubTopic, simpleFutureHandler) + check: + node.subscribedTopics == pubsubTopicSeq + otherNode.subscribedTopics == pubsubTopicSeq + + await sleepAsync(500.millis) + + # FIXME: Inconsistent behaviour with Filter protocol. + # Given other node is stopped without unsubscribing + await allFutures(otherSwitch.stop(), otherNode.stop()) + + # When sending a message from node + let msg1 = fakeWakuMessage(testMessage, pubsubTopic) + discard await node.publish(pubsubTopic, msg1) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg1) == handlerFuture.read() + (pubsubTopic, msg1) == otherHandlerFuture.read() + + # When sending a message from other node + handlerFuture = newPushHandlerFuture() + otherHandlerFuture = newPushHandlerFuture() + let msg2 = fakeWakuMessage(testMessage, pubsubTopic) + discard await otherNode.publish(pubsubTopic, msg2) + + # Then the message is received in both nodes + check: + await handlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(3.seconds) + (pubsubTopic, msg2) == handlerFuture.read() + (pubsubTopic, msg2) == otherHandlerFuture.read() diff --git a/tests/waku_relay/test_waku_relay.nim b/tests/waku_relay/test_waku_relay.nim index 62e094bd1..53e3ec803 100644 --- a/tests/waku_relay/test_waku_relay.nim +++ b/tests/waku_relay/test_waku_relay.nim @@ -8,30 +8,14 @@ import chronos, libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/rpc/messages + import ../../../waku/node/peer_manager, ../../../waku/waku_core, ../../../waku/waku_relay, ../testlib/common, - ../testlib/wakucore - - -proc noopRawHandler(): WakuRelayHandler = - var handler: WakuRelayHandler - handler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = discard - handler - - -proc newTestWakuRelay(switch = newTestSwitch()): Future[WakuRelay] {.async.} = - let proto = WakuRelay.new(switch).tryGet() - await proto.start() - - let protocolMatcher = proc(proto: string): bool {.gcsafe.} = - return proto.startsWith(WakuRelayCodec) - - switch.mount(proto, protocolMatcher) - - return proto + ../testlib/wakucore, + ./utils suite "Waku Relay": diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim new file mode 100644 index 000000000..bb394ada2 --- /dev/null +++ b/tests/waku_relay/utils.nim @@ -0,0 +1,29 @@ +{.used.} + +import + std/[strutils], + stew/shims/net as stewNet, + chronos + +import + ../../../waku/waku_relay, + ../../../waku/waku_core, + ../testlib/wakucore + + +proc noopRawHandler*(): WakuRelayHandler = + var handler: WakuRelayHandler + handler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = discard + handler + + +proc newTestWakuRelay*(switch = newTestSwitch()): Future[WakuRelay] {.async.} = + let proto = WakuRelay.new(switch).tryGet() + await proto.start() + + let protocolMatcher = proc(proto: string): bool {.gcsafe.} = + return proto.startsWith(WakuRelayCodec) + + switch.mount(proto, protocolMatcher) + + return proto diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 7709e2aba..c99f87a56 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -217,7 +217,7 @@ proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} = proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler = debug "subscribe", pubsubTopic=pubsubTopic - # we need to wrap the handler since gossipsub doesnt understand WakuMessage + # We need to wrap the handler since gossipsub doesnt understand WakuMessage let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} = let decMsg = WakuMessage.decode(data) @@ -230,7 +230,9 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle else: return handler(pubsubTopic, decMsg.get()) - # add the ordered validator to the topic + # Add the ordered validator to the topic + # This assumes that if `w.validatorInserted.hasKey(pubSubTopic) is true`, it contains the ordered validator. + # Otherwise this might lead to unintended behaviour. if not w.validatorInserted.hasKey(pubSubTopic): procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator()) w.validatorInserted[pubSubTopic] = true