From 3e669e2a1be9239ac6575184856bf605de558ce5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Wed, 15 Nov 2023 16:15:38 +0100 Subject: [PATCH] test(relay-filter): cleanup (#2138) * Fix some tests. * Clean legacy tests. * Fix imports. --- tests/all_tests_waku.nim | 2 +- tests/testlib/futures.nim | 4 +- tests/waku_filter_v2/test_waku_client.nim | 766 +++++++++++---------- tests/waku_filter_v2/waku_filter_utils.nim | 27 +- tests/waku_relay/test_all.nim | 5 +- tests/waku_relay/test_message_id.nim | 12 +- tests/waku_relay/test_protocol.nim | 232 +++---- tests/waku_relay/test_waku_relay.nim | 211 ------ 8 files changed, 541 insertions(+), 718 deletions(-) delete mode 100644 tests/waku_relay/test_waku_relay.nim diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 97039c1ec..35e9f41a6 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -41,7 +41,7 @@ when defined(waku_exp_store_resume): import - ./waku_relay/test_all + ./waku_relay/test_all, ./waku_filter_v2/test_all diff --git a/tests/testlib/futures.nim b/tests/testlib/futures.nim index ffed6a5db..f55114ea7 100644 --- a/tests/testlib/futures.nim +++ b/tests/testlib/futures.nim @@ -1,9 +1,11 @@ import - chronicles, chronos import ../../../waku/waku_core/message + +let FUTURE_TIMEOUT* = 1.seconds + proc newPushHandlerFuture*(): Future[(string, WakuMessage)] = newFuture[(string, WakuMessage)]() diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 218764ff3..ed02b65c6 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -24,21 +24,21 @@ import waku_core ], ../../../waku/waku_filter_v2/[ + common, client, - subscriptions + subscriptions, + protocol ], ../testlib/[ - common, - wakucore, - testasync, - testutils, - futures, + wakucore, + testasync, + testutils, + futures, sequtils ], - ./waku_filter_utils.nim, - ../resources/payloads.nim + ./waku_filter_utils, + ../resources/payloads -let FUTURE_TIMEOUT = 1.seconds suite "Waku Filter - End to End": suite "MessagePushHandler - Void": @@ -52,13 +52,16 @@ suite "Waku Filter - End to End": var contentTopicSeq {.threadvar.}: seq[ContentTopic] var clientPeerId {.threadvar.}: PeerId var messagePushHandler {.threadvar.}: FilterPushHandler - var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)] + var msgSeq {.threadvar.}: seq[(PubsubTopic, WakuMessage)] + var pushHandlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] asyncSetup: + msgSeq = @[] pushHandlerFuture = newPushHandlerFuture() messagePushHandler = proc( pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, closure, gcsafe.} = + ): Future[void] {.async, closure, gcsafe.} = + msgSeq.add((pubsubTopic, message)) pushHandlerFuture.complete((pubsubTopic, message)) pubsubTopic = DefaultPubsubTopic @@ -85,15 +88,14 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) assert subscribeResponse.isOk(), $subscribeResponse.error - require: - wakuFilter.subscriptions.hasKey(clientPeerId) + check wakuFilter.subscriptions.hasKey(clientPeerId) # When let subscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) # Then + assert subscribedPingResponse.isOk(), $subscribedPingResponse.error check: - subscribedPingResponse.isOk() wakuFilter.subscriptions.hasKey(clientPeerId) asyncTest "No Active Subscription Identification": @@ -112,18 +114,15 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() - wakuFilter.subscriptions.hasKey(clientPeerId) + assert subscribeResponse.isOk(), $subscribeResponse.error + check wakuFilter.subscriptions.hasKey(clientPeerId) # When let unsubscribeResponse = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) assert unsubscribeResponse.isOk(), $unsubscribeResponse.error - require: - unsubscribeResponse.isOk() - not wakuFilter.subscriptions.hasKey(clientPeerId) + check not wakuFilter.subscriptions.hasKey(clientPeerId) let unsubscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) @@ -136,7 +135,6 @@ suite "Waku Filter - End to End": asyncTest "Server remote peer info doesn't match an online server": # Given an offline service node let offlineServerSwitch = newStandardSwitch() - let offlineWakuFilter = await newTestWakuFilter(offlineServerSwitch) let offlineServerRemotePeerInfo = offlineServerSwitch.peerInfo.toRemotePeerInfo() # When subscribing to the offline service node @@ -170,8 +168,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse.isOk(), $subscribeResponse.error check: - subscribeResponse.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -181,7 +179,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() check: pushedMsgPubsubTopic == pubsubTopic @@ -200,8 +198,8 @@ suite "Waku Filter - End to End": let unsubscribeResponse = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - unsubscribeResponse.isOk() + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error + check: wakuFilter.subscriptions.len == 0 # When sending a message to the previously unsubscribed content topic @@ -232,8 +230,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicsSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq @@ -243,7 +241,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() check: pushedMsgPubsubTopic1 == pubsubTopic @@ -255,7 +253,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg2) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() check: pushedMsgPubsubTopic2 == pubsubTopic @@ -274,9 +272,8 @@ suite "Waku Filter - End to End": let unsubscribeResponse = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, contentTopicsSeq ) - require: - unsubscribeResponse.isOk() - wakuFilter.subscriptions.len == 0 + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error + check wakuFilter.subscriptions.len == 0 # When sending a message to the previously unsubscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future @@ -317,8 +314,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse1.isOk(), $subscribeResponse1.error check: - subscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -329,8 +326,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - subscribeResponse2.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq & otherContentTopicSeq @@ -340,7 +337,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() check: pushedMsgPubsubTopic1 == pubsubTopic @@ -352,7 +349,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(otherPubsubTopic, msg2) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() check: pushedMsgPubsubTopic2 == otherPubsubTopic @@ -373,8 +370,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == otherContentTopicSeq @@ -394,7 +391,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(otherPubsubTopic, msg5) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() check: pushedMsgPubsubTopic3 == otherPubsubTopic @@ -406,8 +403,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 0 # When sending a message to the previously unsubscribed content topic @@ -431,8 +428,8 @@ suite "Waku Filter - End to End": ) # Then + assert subscribeResponse1.isOk(), $subscribeResponse1.error check: - subscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -443,8 +440,8 @@ suite "Waku Filter - End to End": ) # Then + assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - subscribeResponse2.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq & otherContentTopicSeq @@ -454,7 +451,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() check: pushedMsgPubsubTopic1 == pubsubTopic @@ -466,7 +463,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(otherPubsubTopic, msg2) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() check: pushedMsgPubsubTopic2 == otherPubsubTopic @@ -485,8 +482,8 @@ suite "Waku Filter - End to End": let unsubscribeResponse = await wakuFilterClient.unsubscribeAll(serverRemotePeerInfo) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 # When sending a message the previously subscribed content topics @@ -514,8 +511,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse1.isOk(), $subscribeResponse1.error check: - subscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq1 @@ -526,8 +523,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - subscribeResponse2.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq1 & contentTopicsSeq2 @@ -537,7 +534,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() check: pushedMsgPubsubTopic1 == pubsubTopic @@ -549,7 +546,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg2) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() check: pushedMsgPubsubTopic2 == pubsubTopic @@ -561,7 +558,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(otherPubsubTopic, msg3) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() check: pushedMsgPubsubTopic3 == otherPubsubTopic @@ -573,7 +570,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(otherPubsubTopic, msg4) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic4, pushedMsg4) = pushHandlerFuture.read() check: pushedMsgPubsubTopic4 == otherPubsubTopic @@ -588,9 +585,9 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse1.isOk() - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == @[contentTopic, otherContentTopic2] @@ -601,7 +598,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg5) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic5, pushedMsg5) = pushHandlerFuture.read() check: pushedMsgPubsubTopic5 == pubsubTopic @@ -613,7 +610,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(otherPubsubTopic, msg6) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic6, pushedMsg6) = pushHandlerFuture.read() check: pushedMsgPubsubTopic6 == otherPubsubTopic @@ -640,8 +637,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful - require: - subscribeResponse1.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 30 @@ -652,8 +649,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - subscribeResponse2.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 30 @@ -726,9 +723,8 @@ suite "Waku Filter - End to End": await standardSwitch.start() let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq - ) - require: - subscribeResponse.isOk() + ) + assert subscribeResponse.isOk(), $subscribeResponse.error # Then the service node should have MaxTotalSubscriptions subscriptions check: @@ -758,8 +754,8 @@ suite "Waku Filter - End to End": let subscribeResponse1 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse1.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -770,8 +766,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscriptionResponse2.isOk(), $subscriptionResponse2.error check: - subscriptionResponse2.isOk() wakuFilter2.subscriptions.len == 1 wakuFilter2.subscriptions.hasKey(clientPeerId) wakuFilter2.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -787,7 +783,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() check: pushedMsgPubsubTopic1 == pubsubTopic @@ -799,12 +795,129 @@ suite "Waku Filter - End to End": await wakuFilter2.handleMessage(pubsubTopic, msg2) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() check: pushedMsgPubsubTopic2 == pubsubTopic pushedMsg2 == msg2 + asyncTest "Refreshing Subscription": + # Given a valid subscription + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When refreshing the subscription + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the subscription is successful + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When sending a message to the refreshed subscription + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # And the message is not duplicated + check: + msgSeq.len == 1 + msgSeq[0][0] == pubsubTopic + msgSeq[0][1] == msg1 + + asyncTest "Overlapping Topic Subscription": + # Given a set of overlapping subscriptions + let + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] + ) + subscribeResponse3 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq + ) + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + assert subscribeResponse3.isOk(), $subscribeResponse3.error + check: + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When sending a message to the overlapping subscription 1 + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # And the message is not duplicated + check: + msgSeq.len == 1 + msgSeq[0][0] == pubsubTopic + msgSeq[0][1] == msg1 + + # When sending a message to the overlapping subscription 2 + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)) # Check there're no duplicate messages + pushHandlerFuture = newPushHandlerFuture() # Reset future due to timeout + + let msg2 = fakeWakuMessage(contentTopic="other-content-topic") + await wakuFilter.handleMessage(pubsubTopic, msg2) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == pubsubTopic + pushedMsg2 == msg2 + + # And the message is not duplicated + check: + msgSeq.len == 2 + msgSeq[1][0] == pubsubTopic + msgSeq[1][1] == msg2 + + # When sending a message to the overlapping subscription 3 + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)) # Check there're no duplicate messages + pushHandlerFuture = newPushHandlerFuture() # Reset future due to timeout + + let msg3 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage("other-pubsub-topic", msg3) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic3 == "other-pubsub-topic" + pushedMsg3 == msg3 + + # And the message is not duplicated + check: + msgSeq.len == 3 + msgSeq[2][0] == "other-pubsub-topic" + msgSeq[2][1] == msg3 + suite "Unsubscribe": ### @@ -816,8 +929,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -828,8 +941,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a subscription with Single Content Topic": @@ -837,8 +950,8 @@ suite "Waku Filter - End to End": let subscribeResponse1 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse1.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -849,8 +962,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - subscribeResponse2.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq @@ -861,8 +974,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 asyncTest "PubSub Topic with Multiple Content Topics, One By One": @@ -871,8 +984,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -883,8 +996,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] @@ -895,8 +1008,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 asyncTest "PubSub Topic with Multiple Content Topics, All At Once": @@ -905,8 +1018,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -917,8 +1030,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a complete subscription with Multiple Content Topics, One By One": @@ -927,8 +1040,8 @@ suite "Waku Filter - End to End": let subscribeResponse1 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse1.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -938,8 +1051,8 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse2.isOk() + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -950,8 +1063,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] @@ -962,8 +1075,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a complete subscription with Multiple Content Topics, All At Once": @@ -972,8 +1085,8 @@ suite "Waku Filter - End to End": let subscribeResponse1 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse1.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -983,8 +1096,8 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse2.isOk() + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -995,8 +1108,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a partial subscription with Multiple Content Topics, One By One": @@ -1005,8 +1118,8 @@ suite "Waku Filter - End to End": let subscribeResponse1 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse1.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -1015,8 +1128,8 @@ suite "Waku Filter - End to End": let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, @[contentTopic] ) - require: - unsubscribeResponse1.isOk() + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] @@ -1025,8 +1138,8 @@ suite "Waku Filter - End to End": let subscribeResponse2 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse2.isOk() + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -1037,8 +1150,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] @@ -1049,8 +1162,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse3.isOk(), $unsubscribeResponse3.error check: - unsubscribeResponse3.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a partial subscription with Multiple Content Topics, All At Once": @@ -1059,8 +1172,8 @@ suite "Waku Filter - End to End": let subscribeResponse1 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse1.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -1069,8 +1182,8 @@ suite "Waku Filter - End to End": let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, @[contentTopic] ) - require: - unsubscribeResponse1.isOk() + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] @@ -1079,8 +1192,8 @@ suite "Waku Filter - End to End": let subscribeResponse2 = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq ) - require: - subscribeResponse2.isOk() + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq @@ -1091,8 +1204,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 0 ### @@ -1109,10 +1222,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq ) - # TODO: CHECK IF THIS MAKES SENSE - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1123,8 +1235,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 @@ -1135,8 +1247,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 0 asyncTest "Different PubSub Topics with Multiple (Same) Content Topics, One By One": @@ -1150,9 +1262,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1163,8 +1275,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 3 @@ -1175,8 +1287,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1187,8 +1299,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse3.isOk(), $unsubscribeResponse3.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 @@ -1199,8 +1311,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse4.isOk(), $unsubscribeResponse4.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 0 asyncTest "Different PubSub Topics with Multiple (Same) Content Topics, All At Once": @@ -1214,10 +1326,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - # TODO: CHECK IF THIS MAKES SENSE - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1228,8 +1339,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1240,8 +1351,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a complete subscription with different PubSub Topics and Single (Same) Content Topic": @@ -1254,10 +1365,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq ) - # TODO: CHECK IF THIS MAKES SENSE - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1271,9 +1381,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq ) - require: - subscribeResponse3.isOk() - subscribeResponse4.isOk() + assert subscribeResponse3.isOk(), $subscribeResponse3.error + assert subscribeResponse4.isOk(), $subscribeResponse4.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1284,8 +1394,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 @@ -1296,8 +1406,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a complete subscription with different PubSub Topics and Multiple (Same) Content Topics, One By One": @@ -1311,10 +1421,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - # TODO: CHECK IF THIS MAKES SENSE - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1328,9 +1437,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - require: - subscribeResponse3.isOk() - subscribeResponse4.isOk() + assert subscribeResponse3.isOk(), $subscribeResponse3.error + assert subscribeResponse4.isOk(), $subscribeResponse4.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1341,8 +1450,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 3 @@ -1353,8 +1462,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1365,8 +1474,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse3.isOk(), $unsubscribeResponse3.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 @@ -1377,8 +1486,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse4.isOk(), $unsubscribeResponse4.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a complete subscription with different PubSub Topics and Multiple (Same) Content Topics, All At Once": @@ -1392,10 +1501,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - # TODO: CHECK IF THIS MAKES SENSE - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1409,9 +1517,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - require: - subscribeResponse3.isOk() - subscribeResponse4.isOk() + assert subscribeResponse3.isOk(), $subscribeResponse3.error + assert subscribeResponse4.isOk(), $subscribeResponse4.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1422,8 +1530,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - unsubscribeResponse1.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1434,8 +1542,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - unsubscribeResponse2.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a partial subscription with different PubSub Topics and Multiple (Same) Content Topics, One By One": @@ -1449,10 +1557,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - # TODO: CHECK IF THIS MAKES SENSE - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1466,9 +1573,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", @["other-content-topic"] ) - require: - unsubscribeResponse1.isOk() - unsubscribeResponse2.isOk() + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1482,9 +1589,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - require: - refreshSubscriptionResponse1.isOk() - refreshSubscriptionResponse2.isOk() + assert refreshSubscriptionResponse1.isOk(), $refreshSubscriptionResponse1.error + assert refreshSubscriptionResponse2.isOk(), $refreshSubscriptionResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1495,8 +1602,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse3.isOk(), $unsubscribeResponse3.error check: - unsubscribeResponse3.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 3 @@ -1507,8 +1614,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse4.isOk(), $unsubscribeResponse4.error check: - unsubscribeResponse4.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1519,8 +1626,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse5.isOk(), $unsubscribeResponse5.error check: - unsubscribeResponse5.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 @@ -1531,8 +1638,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse6.isOk(), $unsubscribeResponse6.error check: - unsubscribeResponse6.isOk() wakuFilter.subscriptions.len == 0 asyncTest "After refreshing a partial subscription with different PubSub Topics and Multiple (Same) Content Topics, All At Once": @@ -1546,10 +1653,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - # TODO: CHECK IF THIS MAKES SENSE - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1563,9 +1669,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", @["other-content-topic"] ) - require: - unsubscribeResponse1.isOk() - unsubscribeResponse2.isOk() + assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error + assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1579,9 +1685,9 @@ suite "Waku Filter - End to End": serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq ) - require: - refreshSubscriptionResponse1.isOk() - refreshSubscriptionResponse2.isOk() + assert refreshSubscriptionResponse1.isOk(), $refreshSubscriptionResponse1.error + assert refreshSubscriptionResponse2.isOk(), $refreshSubscriptionResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 @@ -1592,8 +1698,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse3.isOk(), $unsubscribeResponse3.error check: - unsubscribeResponse3.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 @@ -1604,8 +1710,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse4.isOk(), $unsubscribeResponse4.error check: - unsubscribeResponse4.isOk() wakuFilter.subscriptions.len == 0 asyncTest "Without existing subscription": @@ -1624,8 +1730,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, "pubsub-topic", contentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1644,8 +1750,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1664,8 +1770,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1685,8 +1791,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1696,8 +1802,8 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 asyncTest "Unsubscribe from All Topics, Multiple PubSub Topics": @@ -1708,9 +1814,9 @@ suite "Waku Filter - End to End": let subscribeResponse2 = await wakuFilterClient.subscribe( serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq ) - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() + assert subscribeResponse1.isOk(), $subscribeResponse1.error + assert subscribeResponse2.isOk(), $subscribeResponse2.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1720,13 +1826,13 @@ suite "Waku Filter - End to End": ) # Then the unsubscription is successful + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - unsubscribeResponse.isOk() wakuFilter.subscriptions.len == 0 asyncTest "Unsubscribe from All Topics from a non-subscribed Service": # Given the client is not subscribed to a service - require: + check: wakuFilter.subscriptions.len == 0 # When unsubscribing from all topics for that client @@ -1740,13 +1846,13 @@ suite "Waku Filter - End to End": unsubscribeResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND suite "Filter-Push": - asyncTest "Valid Payloads": + asyncTest "Valid Payload Types": # Given a valid subscription let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1772,7 +1878,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() check: @@ -1785,7 +1891,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg2) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() check: pushedMsgPubsubTopic2 == pubsubTopic @@ -1797,7 +1903,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg3) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() check: pushedMsgPubsubTopic3 == pubsubTopic @@ -1809,7 +1915,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg4) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic4, pushedMsg4) = pushHandlerFuture.read() check: pushedMsgPubsubTopic4 == pubsubTopic @@ -1821,7 +1927,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg5) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic5, pushedMsg5) = pushHandlerFuture.read() check: pushedMsgPubsubTopic5 == pubsubTopic @@ -1833,7 +1939,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg6) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic6, pushedMsg6) = pushHandlerFuture.read() check: pushedMsgPubsubTopic6 == pubsubTopic @@ -1845,7 +1951,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg7) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic7, pushedMsg7) = pushHandlerFuture.read() check: pushedMsgPubsubTopic7 == pubsubTopic @@ -1857,7 +1963,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg8) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic8, pushedMsg8) = pushHandlerFuture.read() check: pushedMsgPubsubTopic8 == pubsubTopic @@ -1869,7 +1975,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg9) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic9, pushedMsg9) = pushHandlerFuture.read() check: pushedMsgPubsubTopic9 == pubsubTopic @@ -1881,17 +1987,93 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg10) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic10, pushedMsg10) = pushHandlerFuture.read() check: pushedMsgPubsubTopic10 == pubsubTopic pushedMsg10 == msg10 msg10.payload.toString() == TEXT_LARGE + asyncTest "Valid Payload Sizes": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + assert subscribeResponse.isOk(), $subscribeResponse.error + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # Given some valid payloads + let + msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024)) # 1KiB + msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(10*1024)) # 10KiB + msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(100*1024)) # 100KiB + msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(3*1024*1024 + 1023*1024 + 968)) # 4MiB - 56B -> Max Size (Inclusive Limit) + msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(3*1024*1024 + 1023*1024 + 969)) # 4MiB - 55B -> Max Size (Exclusive Limit) + msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(3*1024*1024 + 1023*1024 + 970)) # 4MiB - 54B -> Out of Max Size + + # When sending the 1KiB message + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # When sending the 10KiB message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg2) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == pubsubTopic + pushedMsg2 == msg2 + + # When sending the 100KiB message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg3) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic3 == pubsubTopic + pushedMsg3 == msg3 + + # When sending the 4MiB - 56B message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg4) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic4, pushedMsg4) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic4 == pubsubTopic + pushedMsg4 == msg4 + + # When sending the 4MiB - 55B message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg5) + + # Then the message is not pushed to the client + check not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # When sending the 4MiB - 54B message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg6) + + # Then the message is not pushed to the client + check not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + suite "Security and Privacy": asyncTest "Filter Client can receive messages after Client and Server reboot": # Given a clean client and server - require: + check: wakuFilter.subscriptions.len == 0 # When subscribing to a topic @@ -1900,8 +2082,8 @@ suite "Waku Filter - End to End": ) # Then the subscription is successful + assert subscribeResponse.isOk(), $subscribeResponse.error check: - subscribeResponse.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1918,7 +2100,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg1) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() check: pushedMsgPubsubTopic == pubsubTopic @@ -1930,8 +2112,8 @@ suite "Waku Filter - End to End": ) # Then the refreshment is successful + assert refreshSubscriptionResponse.isOk(), $refreshSubscriptionResponse.error check: - refreshSubscriptionResponse.isOk() wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1941,7 +2123,7 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg2) # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() check: pushedMsgPubsubTopic2 == pubsubTopic @@ -1952,8 +2134,8 @@ suite "Waku Filter - End to End": let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() + assert subscribeResponse.isOk(), $subscribeResponse.error + check: wakuFilter.subscriptions.len == 1 wakuFilter.subscriptions.hasKey(clientPeerId) @@ -1970,164 +2152,8 @@ suite "Waku Filter - End to End": await wakuFilter.handleMessage(pubsubTopic, msg) # Then the client receives the message - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() check: pushedMsgPubsubTopic == pubsubTopic pushedMsg == msg - - suite "MessagePushHandler - Msg List": - var serverSwitch {.threadvar.}: Switch - var clientSwitch {.threadvar.}: Switch - var wakuFilter {.threadvar.}: WakuFilter - var wakuFilterClient {.threadvar.}: WakuFilterClient - var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo - var pubsubTopic {.threadvar.}: PubsubTopic - var contentTopic {.threadvar.}: ContentTopic - var contentTopicSeq {.threadvar.}: seq[ContentTopic] - var clientPeerId {.threadvar.}: PeerId - var msgList {.threadvar.}: seq[(PubsubTopic, WakuMessage)] - var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)] - - asyncSetup: - pushHandlerFuture = newPushHandlerFuture() - msgList = @[] - let messagePushHandler: FilterPushHandler = proc( - pubsubTopic: PubsubTopic, message: WakuMessage - ): Future[void] {.async, closure, gcsafe.} = - msgList.add((pubsubTopic, message)) - pushHandlerFuture.complete((pubsubTopic, message)) - - pubsubTopic = DefaultPubsubTopic - contentTopic = DefaultContentTopic - contentTopicSeq = @[DefaultContentTopic] - serverSwitch = newStandardSwitch() - clientSwitch = newStandardSwitch() - wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch) - wakuFilterClient.registerPushHandler(messagePushHandler) - - await allFutures(serverSwitch.start(), clientSwitch.start()) - serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() - clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId - - asyncTeardown: - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) - - suite "Subscribe": - asyncTest "Refreshing Subscription": - # Given a valid subscription - let subscribeResponse1 = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, contentTopicSeq - ) - require: - subscribeResponse1.isOk() - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq - - # When refreshing the subscription - let subscribeResponse2 = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, contentTopicSeq - ) - - # Then the subscription is successful - check: - subscribeResponse2.isOk() - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq - - # When sending a message to the refreshed subscription - let msg1 = fakeWakuMessage(contentTopic=contentTopic) - await wakuFilter.handleMessage(pubsubTopic, msg1) - - # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() - check: - pushedMsgPubsubTopic1 == pubsubTopic - pushedMsg1 == msg1 - - # And the message is not duplicated - check: - msgList.len == 1 - msgList[0][0] == pubsubTopic - msgList[0][1] == msg1 - - asyncTest "Overlapping Topic Subscription": - # Given a set of overlapping subscriptions - let - subscribeResponse1 = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, contentTopicSeq - ) - subscribeResponse2 = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] - ) - subscribeResponse3 = await wakuFilterClient.subscribe( - serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq - ) - require: - subscribeResponse1.isOk() - subscribeResponse2.isOk() - subscribeResponse3.isOk() - wakuFilter.subscriptions.hasKey(clientPeerId) - - # When sending a message to the overlapping subscription 1 - let msg1 = fakeWakuMessage(contentTopic=contentTopic) - await wakuFilter.handleMessage(pubsubTopic, msg1) - - # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() - check: - pushedMsgPubsubTopic1 == pubsubTopic - pushedMsg1 == msg1 - - # And the message is not duplicated - check: - msgList.len == 1 - msgList[0][0] == pubsubTopic - msgList[0][1] == msg1 - - # When sending a message to the overlapping subscription 2 - pushHandlerFuture = newPushHandlerFuture() # Clear previous future - require (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)) # Check there're no duplicate messages - pushHandlerFuture = newPushHandlerFuture() # Reset future due to timeout - - let msg2 = fakeWakuMessage(contentTopic="other-content-topic") - await wakuFilter.handleMessage(pubsubTopic, msg2) - - # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() - check: - pushedMsgPubsubTopic2 == pubsubTopic - pushedMsg2 == msg2 - - # And the message is not duplicated - check: - msgList.len == 2 - msgList[1][0] == pubsubTopic - msgList[1][1] == msg2 - - # When sending a message to the overlapping subscription 3 - pushHandlerFuture = newPushHandlerFuture() # Clear previous future - require (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)) # Check there're no duplicate messages - pushHandlerFuture = newPushHandlerFuture() # Reset future due to timeout - - let msg3 = fakeWakuMessage(contentTopic=contentTopic) - await wakuFilter.handleMessage("other-pubsub-topic", msg3) - - # Then the message is pushed to the client - require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() - check: - pushedMsgPubsubTopic3 == "other-pubsub-topic" - pushedMsg3 == msg3 - - # And the message is not duplicated - check: - msgList.len == 3 - msgList[2][0] == "other-pubsub-topic" - msgList[2][1] == msg3 diff --git a/tests/waku_filter_v2/waku_filter_utils.nim b/tests/waku_filter_v2/waku_filter_utils.nim index f578435cb..70d2368bc 100644 --- a/tests/waku_filter_v2/waku_filter_utils.nim +++ b/tests/waku_filter_v2/waku_filter_utils.nim @@ -1,19 +1,24 @@ import - std/[options,tables], - std/[sequtils,sets,strutils], - testutils/unittests, + std/[ + options, + tables, + sets + ], chronos, chronicles import - ../../../waku/node/peer_manager, - ../../../waku/waku_filter_v2, - ../../../waku/waku_filter_v2/client, - ../../../waku/waku_filter_v2/subscriptions, - ../../../waku/waku_filter_v2/rpc, - ../../../waku/waku_core, - ../testlib/common, - ../testlib/wakucore + ../../../waku/[ + node/peer_manager, + waku_filter_v2, + waku_filter_v2/client, + waku_core + ], + ../testlib/[ + common, + wakucore + ] + proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} = let diff --git a/tests/waku_relay/test_all.nim b/tests/waku_relay/test_all.nim index a615f0e4e..4957a7352 100644 --- a/tests/waku_relay/test_all.nim +++ b/tests/waku_relay/test_all.nim @@ -1,7 +1,6 @@ {.used.} import - ./test_waku_relay, - ./test_wakunode_relay, ./test_message_id, - ./test_protocol + ./test_protocol, + ./test_wakunode_relay diff --git a/tests/waku_relay/test_message_id.nim b/tests/waku_relay/test_message_id.nim index 7cf3d4402..2c2b69026 100644 --- a/tests/waku_relay/test_message_id.nim +++ b/tests/waku_relay/test_message_id.nim @@ -1,12 +1,14 @@ import unittest, - stew/shims/net, - stew/[results, byteutils] + stew/[ + shims/net, + results, + byteutils + ], + nimcrypto/sha2, + libp2p/protocols/pubsub/rpc/messages import - stew/results, - nimcrypto/sha2, - libp2p/protocols/pubsub/rpc/messages, ../../../waku/waku_relay/message_id, ../testlib/sequtils diff --git a/tests/waku_relay/test_protocol.nim b/tests/waku_relay/test_protocol.nim index 17c8d902c..3d226a947 100644 --- a/tests/waku_relay/test_protocol.nim +++ b/tests/waku_relay/test_protocol.nim @@ -40,11 +40,14 @@ suite "Waku Relay": var messageSeq {.threadvar.}: seq[(PubsubTopic, WakuMessage)] var handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)] var simpleFutureHandler {.threadvar.}: WakuRelayHandler + var switch {.threadvar.}: Switch var peerManager {.threadvar.}: PeerManager var node {.threadvar.}: WakuRelay + var remotePeerInfo {.threadvar.}: RemotePeerInfo var peerId {.threadvar.}: PeerId + var contentTopic {.threadvar.}: ContentTopic var pubsubTopic {.threadvar.}: PubsubTopic var pubsubTopicSeq {.threadvar.}: seq[PubsubTopic] @@ -85,7 +88,7 @@ suite "Waku Relay": # Then the message is not published check: - not await handlerFuture.withTimeout(3.seconds) + not await handlerFuture.withTimeout(FUTURE_TIMEOUT) asyncTest "Publish with Subscription (Network Size: 1)": # When subscribing to a Pubsub Topic @@ -100,7 +103,7 @@ suite "Waku Relay": discard await node.publish(pubsubTopic, wakuMessage) # Then the message is published - assert (await handlerFuture.withTimeout(3.seconds)) + assert (await handlerFuture.withTimeout(FUTURE_TIMEOUT)) let (topic, msg) = handlerFuture.read() check: topic == pubsubTopic @@ -138,8 +141,8 @@ suite "Waku Relay": # Then the message is published only in the subscribed node check: - not await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + not await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (otherTopic1, otherMessage1) = otherHandlerFuture.read() check: @@ -154,8 +157,8 @@ suite "Waku Relay": # Then the message is published only in the subscribed node check: - not await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + not await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (otherTopic2, otherMessage2) = otherHandlerFuture.read() check: @@ -198,8 +201,8 @@ suite "Waku Relay": # Then the message is published in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (topic1, message1) = handlerFuture.read() @@ -219,8 +222,8 @@ suite "Waku Relay": # Then the message is published in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (topic2, message2) = handlerFuture.read() @@ -260,7 +263,7 @@ suite "Waku Relay": discard await node.publish(pubsubTopic, wakuMessage) # Then the message is published - check (await handlerFuture.withTimeout(3.seconds)) + check (await handlerFuture.withTimeout(FUTURE_TIMEOUT)) let (topic, msg) = handlerFuture.read() check: topic == pubsubTopic @@ -315,14 +318,14 @@ suite "Waku Relay": # Then the validator is ran in the other node, and fails # Not run in the self node check: - await validatorFuture.withTimeout(3.seconds) + await validatorFuture.withTimeout(FUTURE_TIMEOUT) validatorFuture.read() == false # And the message is published in the self node, but not in the other node, # because it doesn't pass the validator check. check: - await handlerFuture.withTimeout(3.seconds) - not await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (topic1, msg1) = handlerFuture.read() # let (otherTopic1, otherMsg1) = otherHandlerFuture.read() check: @@ -341,13 +344,13 @@ suite "Waku Relay": # Then the validator is ran in the other node, and succeeds # Not run in the self node check: - await validatorFuture.withTimeout(3.seconds) + await validatorFuture.withTimeout(FUTURE_TIMEOUT) validatorFuture.read() == true # And the message is published in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (topic2, msg2) = handlerFuture.read() let (otherTopic2, otherMsg2) = otherHandlerFuture.read() check: @@ -403,8 +406,8 @@ suite "Waku Relay": # Then the message is published in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) let (topic1, msg1) = handlerFuture.read() let (otherTopic1, otherMsg1) = otherHandlerFuture.read() check: @@ -497,12 +500,12 @@ suite "Waku Relay": # Then the messages are published in all nodes (because it's published in the center node) # Center meaning that all other nodes are connected to this one check: - await handlerFuture.withTimeout(3.seconds) - await handlerFuture2.withTimeout(3.seconds) - await otherHandlerFuture1.withTimeout(3.seconds) - await otherHandlerFuture2.withTimeout(3.seconds) - await anotherHandlerFuture1.withTimeout(3.seconds) - await anotherHandlerFuture2.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await handlerFuture2.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) + await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) let (topic1, msg1) = handlerFuture.read() @@ -553,12 +556,12 @@ suite "Waku Relay": # Then the message is published in node and otherNode, # but not in anotherNode because it is not connected anymore check: - await handlerFuture.withTimeout(3.seconds) - await handlerFuture2.withTimeout(3.seconds) - await otherHandlerFuture1.withTimeout(3.seconds) - await otherHandlerFuture2.withTimeout(3.seconds) - not await anotherHandlerFuture1.withTimeout(3.seconds) - not await anotherHandlerFuture2.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await handlerFuture2.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) + not await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + not await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) let (topic3, msg3) = handlerFuture.read() @@ -596,12 +599,12 @@ suite "Waku Relay": # Then the messages are only published in anotherNode because it's disconnected from # the rest of the network check: - not await handlerFuture.withTimeout(3.seconds) - not await handlerFuture2.withTimeout(3.seconds) - not await otherHandlerFuture1.withTimeout(3.seconds) - not await otherHandlerFuture2.withTimeout(3.seconds) - await anotherHandlerFuture1.withTimeout(3.seconds) - await anotherHandlerFuture2.withTimeout(3.seconds) + not await handlerFuture.withTimeout(FUTURE_TIMEOUT) + not await handlerFuture2.withTimeout(FUTURE_TIMEOUT) + not await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + not await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) + await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) let (anotherTopic3, anotherMsg3) = anotherHandlerFuture1.read() @@ -633,12 +636,12 @@ suite "Waku Relay": # Then the messages are only published in otherNode and node, but not in anotherNode # because it's disconnected from the rest of the network check: - await handlerFuture.withTimeout(3.seconds) - await handlerFuture2.withTimeout(3.seconds) - await otherHandlerFuture1.withTimeout(3.seconds) - await otherHandlerFuture2.withTimeout(3.seconds) - not await anotherHandlerFuture1.withTimeout(3.seconds) - not await anotherHandlerFuture2.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await handlerFuture2.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) + not await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + not await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) let (topic5, msg5) = handlerFuture.read() @@ -683,12 +686,12 @@ suite "Waku Relay": # even if they're connected like so AnotherNode <-> OtherNode <-> Node, # otherNode doesn't broadcast B topic messages because it's not subscribed to it check: - await handlerFuture.withTimeout(3.seconds) - not await handlerFuture2.withTimeout(3.seconds) - await otherHandlerFuture1.withTimeout(3.seconds) - await otherHandlerFuture2.withTimeout(3.seconds) - await anotherHandlerFuture1.withTimeout(3.seconds) - await anotherHandlerFuture2.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + not await handlerFuture2.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) + await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT) + await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT) let (topic7, msg7) = handlerFuture.read() @@ -860,8 +863,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg1) == handlerFuture.read() (pubsubTopic, msg1) == otherHandlerFuture.read() @@ -872,8 +875,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg2) == handlerFuture.read() (pubsubTopic, msg2) == otherHandlerFuture.read() @@ -884,8 +887,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg3) == handlerFuture.read() (pubsubTopic, msg3) == otherHandlerFuture.read() @@ -896,8 +899,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg4) == handlerFuture.read() (pubsubTopic, msg4) == otherHandlerFuture.read() @@ -908,8 +911,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg5) == handlerFuture.read() (pubsubTopic, msg5) == otherHandlerFuture.read() @@ -920,8 +923,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg6) == handlerFuture.read() (pubsubTopic, msg6) == otherHandlerFuture.read() @@ -932,8 +935,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg7) == handlerFuture.read() (pubsubTopic, msg7) == otherHandlerFuture.read() @@ -944,8 +947,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg8) == handlerFuture.read() (pubsubTopic, msg8) == otherHandlerFuture.read() @@ -956,8 +959,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg9) == handlerFuture.read() (pubsubTopic, msg9) == otherHandlerFuture.read() @@ -968,8 +971,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg10) == handlerFuture.read() (pubsubTopic, msg10) == otherHandlerFuture.read() @@ -1004,10 +1007,10 @@ suite "Waku Relay": msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024)) # 1KiB msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(10*1024)) # 10KiB msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(100*1024)) # 100KiB - msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1023*1024)) # 1MiB - 1B -> Max Size (Inclusive Limit) - msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024)) # 1MiB -> Max Size (Exclusive Limit) - msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1025*1024)) # 1MiB + 1B -> Out of Max Size - + msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024 - 1)) # 1MiB - 1B -> Max Size (Inclusive Limit) + msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024)) # 1MiB -> Max Size (Exclusive Limit) + msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024 + 1)) # 1MiB + 1B -> Out of Max Size + # When sending the 1KiB message handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() @@ -1015,11 +1018,11 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg1) == handlerFuture.read() (pubsubTopic, msg1) == otherHandlerFuture.read() - + # When sending the 10KiB message handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() @@ -1027,23 +1030,23 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg2) == handlerFuture.read() (pubsubTopic, msg2) == otherHandlerFuture.read() - + # When sending the 100KiB message handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() discard await node.publish(pubsubTopic, msg3) - + # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg3) == handlerFuture.read() (pubsubTopic, msg3) == otherHandlerFuture.read() - + # When sending the 1023KiB message handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() @@ -1051,8 +1054,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg4) == handlerFuture.read() (pubsubTopic, msg4) == otherHandlerFuture.read() @@ -1063,8 +1066,8 @@ suite "Waku Relay": # Then the message is received in self, because there's no checking, but not in other node check: - await handlerFuture.withTimeout(3.seconds) - not await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg5) == handlerFuture.read() # When sending the 1025KiB message @@ -1074,8 +1077,8 @@ suite "Waku Relay": # Then the message is received in self, because there's no checking, but not in other node check: - await handlerFuture.withTimeout(3.seconds) - not await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg6) == handlerFuture.read() # Finally stop the other node @@ -1120,31 +1123,31 @@ suite "Waku Relay": msg4 = fakeWakuMessage("msg4", pubsubTopic) discard await node.publish(pubsubTopic, msg1) - check await thisHandlerFuture.withTimeout(3.seconds) - check await otherHandlerFuture.withTimeout(3.seconds) + check await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) thisHandlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() discard await node.publish(pubsubTopic, msg2) - check await thisHandlerFuture.withTimeout(3.seconds) - check await otherHandlerFuture.withTimeout(3.seconds) + check await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) thisHandlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() discard await node.publish(pubsubTopic, msg3) - check await thisHandlerFuture.withTimeout(3.seconds) - check await otherHandlerFuture.withTimeout(3.seconds) + check await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT) + check await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) thisHandlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() discard await node.publish(pubsubTopic, msg4) check: - await thisHandlerFuture.withTimeout(3.seconds) + await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT) thisMessageSeq == @[ (pubsubTopic, msg1), (pubsubTopic, msg2), (pubsubTopic, msg3), (pubsubTopic, msg4) ] - await otherHandlerFuture.withTimeout(3.seconds) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) otherMessageSeq == @[ (pubsubTopic, msg1), (pubsubTopic, msg2), @@ -1194,8 +1197,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg1) == handlerFuture.read() (pubsubTopic, msg1) == otherHandlerFuture.read() @@ -1207,8 +1210,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg2) == handlerFuture.read() (pubsubTopic, msg2) == otherHandlerFuture.read() @@ -1224,8 +1227,8 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg3) == handlerFuture.read() (pubsubTopic, msg3) == otherHandlerFuture.read() @@ -1237,15 +1240,15 @@ suite "Waku Relay": # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg4) == handlerFuture.read() (pubsubTopic, msg4) == otherHandlerFuture.read() # Finally stop the other node await allFutures(otherSwitch.stop(), otherNode.stop()) - xasyncTest "Relay can receive messages after subscribing and stopping without unsubscribing": + asyncTest "Relay can't receive messages after subscribing and stopping without unsubscribing": # Given a second node connected to the first one let otherSwitch = newTestSwitch() @@ -1272,7 +1275,6 @@ suite "Waku Relay": await sleepAsync(500.millis) - # FIXME: Inconsistent behaviour with Filter protocol. # Given other node is stopped without unsubscribing await allFutures(otherSwitch.stop(), otherNode.stop()) @@ -1280,22 +1282,20 @@ suite "Waku Relay": let msg1 = fakeWakuMessage(testMessage, pubsubTopic) discard await node.publish(pubsubTopic, msg1) - # Then the message is received in both nodes + # Then the message is not received in any node check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) + await handlerFuture.withTimeout(FUTURE_TIMEOUT) + not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg1) == handlerFuture.read() - (pubsubTopic, msg1) == otherHandlerFuture.read() - + # When sending a message from other node handlerFuture = newPushHandlerFuture() otherHandlerFuture = newPushHandlerFuture() let msg2 = fakeWakuMessage(testMessage, pubsubTopic) discard await otherNode.publish(pubsubTopic, msg2) - + # Then the message is received in both nodes check: - await handlerFuture.withTimeout(3.seconds) - await otherHandlerFuture.withTimeout(3.seconds) - (pubsubTopic, msg2) == handlerFuture.read() + not await handlerFuture.withTimeout(FUTURE_TIMEOUT) + await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT) (pubsubTopic, msg2) == otherHandlerFuture.read() diff --git a/tests/waku_relay/test_waku_relay.nim b/tests/waku_relay/test_waku_relay.nim deleted file mode 100644 index 53e3ec803..000000000 --- a/tests/waku_relay/test_waku_relay.nim +++ /dev/null @@ -1,211 +0,0 @@ -{.used.} - -import - std/[options, sequtils, strutils], - stew/shims/net as stewNet, - testutils/unittests, - chronicles, - chronos, - libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/rpc/messages - -import - ../../../waku/node/peer_manager, - ../../../waku/waku_core, - ../../../waku/waku_relay, - ../testlib/common, - ../testlib/wakucore, - ./utils - - -suite "Waku Relay": - - asyncTest "subscribe and add handler to topics": - ## Setup - let nodeA = await newTestWakuRelay() - - ## Given - let - networkA = "test-network1" - networkB = "test-network2" - - ## when - discard nodeA.subscribe(networkA, noopRawHandler()) - discard nodeA.subscribe(networkB, noopRawHandler()) - - ## Then - check: - nodeA.isSubscribed(networkA) - nodeA.isSubscribed(networkB) - - let subscribedTopics = toSeq(nodeA.subscribedTopics) - check: - subscribedTopics.len == 2 - subscribedTopics.contains(networkA) - subscribedTopics.contains(networkB) - - ## Cleanup - await nodeA.stop() - - asyncTest "unsubscribe all handlers from topic": - ## Setup - let nodeA = await newTestWakuRelay() - - ## Given - let - networkA = "test-network1" - networkB = "test-network2" - networkC = "test-network3" - - discard nodeA.subscribe(networkA, noopRawHandler()) - discard nodeA.subscribe(networkB, noopRawHandler()) - discard nodeA.subscribe(networkC, noopRawHandler()) - - let topics = toSeq(nodeA.subscribedTopics) - require: - topics.len == 3 - topics.contains(networkA) - topics.contains(networkB) - topics.contains(networkC) - - ## When - nodeA.unsubscribeAll(networkA) - - ## Then - check: - nodeA.isSubscribed(networkB) - nodeA.isSubscribed(networkC) - not nodeA.isSubscribed(networkA) - - let subscribedTopics = toSeq(nodeA.subscribedTopics) - check: - subscribedTopics.len == 2 - subscribedTopics.contains(networkB) - subscribedTopics.contains(networkC) - not subscribedTopics.contains(networkA) - - ## Cleanup - await nodeA.stop() - - asyncTest "publish a message into a topic": - ## Setup - let - srcSwitch = newTestSwitch() - srcPeerManager = PeerManager.new(srcSwitch) - srcNode = await newTestWakuRelay(srcSwitch) - dstSwitch = newTestSwitch() - dstPeerManager = PeerManager.new(dstSwitch) - dstNode = await newTestWakuRelay(dstSwitch) - - await allFutures(srcSwitch.start(), dstSwitch.start()) - - let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo() - let connOk = await srcPeerManager.connectRelay(dstPeerInfo) - require: - connOk == true - - ## Given - let networkTopic = "test-network1" - let message = fakeWakuMessage() - - # Self subscription (triggerSelf = true) - let srcSubsFut = newFuture[(PubsubTopic, WakuMessage)]() - proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = - srcSubsFut.complete((topic, message)) - - discard srcNode.subscribe(networkTopic, srcSubsHandler) - - # Subscription - let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]() - proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = - dstSubsFut.complete((topic, message)) - - discard dstNode.subscribe(networkTopic, dstSubsHandler) - - await sleepAsync(500.millis) - - ## When - discard await srcNode.publish(networkTopic, message) - - ## Then - require: - await srcSubsFut.withTimeout(5.seconds) - await dstSubsFut.withTimeout(5.seconds) - - let (srcTopic, srcMsg) = srcSubsFut.read() - check: - srcTopic == networkTopic - srcMsg == message - - let (dstTopic, dstMsg) = dstSubsFut.read() - check: - dstTopic == networkTopic - dstMsg == message - - ## Cleanup - await allFutures(srcSwitch.stop(), dstSwitch.stop()) - - asyncTest "content topic validator as a message subscription filter": - ## Setup - let - srcSwitch = newTestSwitch() - srcPeerManager = PeerManager.new(srcSwitch) - srcNode = await newTestWakuRelay(srcSwitch) - dstSwitch = newTestSwitch() - dstPeerManager = PeerManager.new(dstSwitch) - dstNode = await newTestWakuRelay(dstSwitch) - - await allFutures(srcSwitch.start(), dstSwitch.start()) - - let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo() - let connOk = await srcPeerManager.connectRelay(dstPeerInfo) - require: - connOk == true - - ## Given - let networkTopic = "test-network1" - let contentTopic = "test-content1" - - let message = fakeWakuMessage(contentTopic=contentTopic) - let messages = @[ - fakeWakuMessage(contentTopic="any"), - fakeWakuMessage(contentTopic="any"), - fakeWakuMessage(contentTopic="any"), - message, - fakeWakuMessage(contentTopic="any"), - ] - - # Subscription - let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]() - proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = - dstSubsFut.complete((topic, message)) - - discard dstNode.subscribe(networkTopic, dstSubsHandler) - - await sleepAsync(500.millis) - - # Validator - proc validator(topic: PubsubTopic, msg: WakuMessage): Future[ValidationResult] {.async.} = - # only relay messages with contentTopic1 - if msg.contentTopic != contentTopic: - return ValidationResult.Reject - - return ValidationResult.Accept - - dstNode.addValidator(networkTopic, validator) - - ## When - for msg in messages: - discard await srcNode.publish(networkTopic, msg) - - ## Then - require: - await dstSubsFut.withTimeout(5.seconds) - - let (dstTopic, dstMsg) = dstSubsFut.read() - check: - dstTopic == networkTopic - dstMsg == message - - ## Cleanup - await allFutures(srcSwitch.stop(), dstSwitch.stop())