From 1bba18349225c1499695171f2611a08c57950dca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Thu, 12 Oct 2023 20:59:21 +0200 Subject: [PATCH] test(waku-filter): Subscribe tests batch (1/4) (#2034) * Implement waku filter client subscribe tests. * Remove legacy filter tests. * Fix constant for max criteria per subscription limit. --- tests/all_tests_waku.nim | 1 - tests/node/test_wakunode_filter.nim | 109 ++ tests/testlib/futures.nim | 8 + tests/waku_filter_v2/test_waku_client.nim | 951 ++++++++++++++++-- tests/waku_filter_v2/test_waku_filter.nim | 370 ------- ...client_utils.nim => waku_filter_utils.nim} | 11 + waku/waku_filter_v2/protocol.nim | 2 +- 7 files changed, 1024 insertions(+), 428 deletions(-) create mode 100644 tests/node/test_wakunode_filter.nim create mode 100644 tests/testlib/futures.nim delete mode 100644 tests/waku_filter_v2/test_waku_filter.nim rename tests/waku_filter_v2/{client_utils.nim => waku_filter_utils.nim} (64%) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index c527d7f63..7bd5ca0df 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -48,7 +48,6 @@ import # Waku filter test suite import ./waku_filter_v2/test_waku_client, - ./waku_filter_v2/test_waku_filter, ./waku_filter_v2/test_waku_filter_protocol import diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim new file mode 100644 index 000000000..0b918ac0d --- /dev/null +++ b/tests/node/test_wakunode_filter.nim @@ -0,0 +1,109 @@ +{.used.} + +import + std/[options, tables, sequtils], + stew/shims/net as stewNet, + testutils/unittests, + chronos, + chronicles, + os, + libp2p/peerstore, + libp2p/crypto/crypto + +import + ../../../waku/waku_core, + ../../../waku/node/peer_manager, + ../../../waku/node/waku_node, + ../../../waku/waku_filter_v2, + ../../../waku/waku_filter_v2/client, + ../../../waku/waku_filter_v2/subscriptions, + ../testlib/common, + ../testlib/wakucore, + ../testlib/wakunode, + ../testlib/testasync, + ../testlib/futures + +let FUTURE_TIMEOUT = 1.seconds + +suite "Full Node - Waku Filter - End to End": + var client {.threadvar.}: WakuNode + var clientPeerId {.threadvar.}: PeerId + var server {.threadvar.}: WakuNode + var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + var pubsubTopic {.threadvar.}: PubsubTopicy + var contentTopic {.threadvar.}: ContentTopic + var contentTopicSeq {.threadvar.}: seq[ContentTopic] + var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)] + var messagePushHandler {.threadvar.}: FilterPushHandler + + asyncSetup: + pushHandlerFuture = newFuture[(string, WakuMessage)]() + messagePushHandler = proc( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, closure, gcsafe.} = + pushHandlerFuture.complete((pubsubTopic, message)) + + pubsubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + contentTopicSeq = @[DefaultContentTopic] + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + waitFor allFutures(server.start(), client.start()) + + waitFor server.mountFilter() + waitFor client.mountFilterClient() + + client.wakuFilterClient.registerPushHandler(messagePushHandler) + serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + clientPeerId = client.peerInfo.toRemotePeerInfo().peerId + + asyncTeardown: + waitFor allFutures(client.stop(), server.stop()) + + asyncTest "Full Client Node to Full Service Node Subscription": + # When a full client node subscribes to a full service node + let subscribeResponse = await client.filterSubscribe( + some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo + ) + + # Then the subscription is successful + check: + subscribeResponse.isOk() + server.wakuFilter.subscriptions.len == 1 + server.wakuFilter.subscriptions.hasKey(clientPeerId) + + # When sending a message to the subscribed content topic + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await server.filterHandleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # When unsubscribing from the subscription + let unsubscribeResponse = await client.filterUnsubscribe( + some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + server.wakuFilter.subscriptions.len == 0 + + # When sending a message to the previously subscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg2 = fakeWakuMessage(contentTopic=contentTopic) + await server.filterHandleMessage(pubsubTopic, msg2) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) diff --git a/tests/testlib/futures.nim b/tests/testlib/futures.nim new file mode 100644 index 000000000..8981516e4 --- /dev/null +++ b/tests/testlib/futures.nim @@ -0,0 +1,8 @@ +import + chronicles, + chronos + +import ../../../waku/waku_core/message + +proc newPushHandlerFuture*(): Future[(string, WakuMessage)] = + newFuture[(string, WakuMessage)]() diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index f4cae3ed8..0ef88235d 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -1,95 +1,934 @@ {.used.} import - std/[options,tables], + std/[options, tables, sequtils], testutils/unittests, - chronos + chronos, + chronicles, + os, + libp2p/peerstore import - ../../waku/node/peer_manager, - ../../waku/waku_filter_v2, - ../../waku/waku_filter_v2/client, - ../../waku/waku_core, + ../../../waku/node/peer_manager, + ../../../waku/waku_filter_v2, + ../../../waku/waku_filter_v2/client, + ../../../waku/waku_filter_v2/subscriptions, + ../../../waku/waku_core, + ../testlib/common, ../testlib/wakucore, ../testlib/testasync, - ./client_utils.nim + ../testlib/testutils, + ../testlib/futures, + ./waku_filter_utils.nim -suite "Waku Filter": - suite "Subscriber Ping": +let FUTURE_TIMEOUT = 1.seconds + +suite "Waku Filter - End to End": + suite "MessagePushHandler - Void": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch var wakuFilter {.threadvar.}: WakuFilter var wakuFilterClient {.threadvar.}: WakuFilterClient var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo var pubsubTopic {.threadvar.}: PubsubTopic - var contentTopics {.threadvar.}: seq[ContentTopic] + var contentTopic {.threadvar.}: ContentTopic + var contentTopicSeq {.threadvar.}: seq[ContentTopic] + var clientPeerId {.threadvar.}: PeerId + var messagePushHandler {.threadvar.}: FilterPushHandler + var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)] asyncSetup: + pushHandlerFuture = newPushHandlerFuture() + messagePushHandler = proc( + pubsubTopic: PubsubTopic, message: WakuMessage + ) {.async, closure, gcsafe.} = + pushHandlerFuture.complete((pubsubTopic, message)) pubsubTopic = DefaultPubsubTopic - contentTopics = @[DefaultContentTopic] + contentTopic = DefaultContentTopic + contentTopicSeq = @[DefaultContentTopic] serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) wakuFilterClient = await newTestWakuFilterClient(clientSwitch) await allFutures(serverSwitch.start(), clientSwitch.start()) + wakuFilterClient.registerPushHandler(messagePushHandler) serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() - + clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId + asyncTeardown: await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) - asyncTest "Active Subscription Identification": - # Given - let - clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId - subscribeResponse = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, contentTopics + suite "Subscriber Ping": + asyncTest "Active Subscription Identification": + # Given + let + subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + assert subscribeResponse.isOk(), $subscribeResponse.error + require: + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When + let subscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) + + # Then + check: + subscribedPingResponse.isOk() + wakuFilter.subscriptions.hasKey(clientPeerId) + + asyncTest "No Active Subscription Identification": + # When + let unsubscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) + + # Then + check: + unsubscribedPingResponse.isErr() # Not subscribed + unsubscribedPingResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND + + asyncTest "After Unsubscription": + # Given + let + subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() - wakuFilter.subscriptions.hasKey(clientPeerId) + assert unsubscribeResponse.isOk(), $unsubscribeResponse.error + require: + unsubscribeResponse.isOk() + not wakuFilter.subscriptions.hasKey(clientPeerId) - # When - let subscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) + let unsubscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) - # Then - check: - subscribedPingResponse.isOk() - wakuFilter.subscriptions.hasKey(clientSwitch.peerInfo.toRemotePeerInfo().peerId) + # Then + check: + unsubscribedPingResponse.isErr() # Not subscribed + unsubscribedPingResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND - asyncTest "No Active Subscription Identification": - # When - let unsubscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) + suite "Subscribe": + asyncTest "Server remote peer info doesn't match an online server": + # Given an offline service node + let offlineServerSwitch = newStandardSwitch() + let offlineWakuFilter = await newTestWakuFilter(offlineServerSwitch) + let offlineServerRemotePeerInfo = offlineServerSwitch.peerInfo.toRemotePeerInfo() - # Then - check: - unsubscribedPingResponse.isErr() # Not subscribed - unsubscribedPingResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND - - asyncTest "After Unsubscription": - # Given - let - clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId - subscribeResponse = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, contentTopics + # When subscribing to the offline service node + let subscribeResponse = await wakuFilterClient.subscribe( + offlineServerRemotePeerInfo, pubsubTopic, contentTopicSeq ) - require: - subscribeResponse.isOk() - wakuFilter.subscriptions.hasKey(clientPeerId) + # Then the subscription is not successful + check: + subscribeResponse.isErr() # Not subscribed + subscribeResponse.error().kind == FilterSubscribeErrorKind.PEER_DIAL_FAILURE - # When - let unsubscribeResponse = await wakuFilterClient.unsubscribe( - serverRemotePeerInfo, pubsubTopic, contentTopics - ) - require: - unsubscribeResponse.isOk() - not wakuFilter.subscriptions.hasKey(clientPeerId) + asyncTest "PubSub Topic with Single Content Topic": + # Given + let nonExistentContentTopic = "non-existent-content-topic" - let unsubscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) + # When subscribing to a content topic + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) - # Then - check: - unsubscribedPingResponse.isErr() # Not subscribed - unsubscribedPingResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND + # Then the subscription is successful + check: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When sending a message to the subscribed content topic + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == pubsubTopic + pushedMsg == msg1 + + # When sending a message to a non-subscribed content topic (before unsubscription) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg2 = fakeWakuMessage(contentTopic=nonExistentContentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg2) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # Given a valid unsubscription to an existing subscription + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + # When sending a message to the previously unsubscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg3 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg3) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # When sending a message to a non-subscribed content topic (after unsubscription) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg4 = fakeWakuMessage(contentTopic=nonExistentContentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg4) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + asyncTest "PubSub Topic with Multiple Content Topics": + # Given + let nonExistentContentTopic = "non-existent-content-topic" + let otherContentTopic = "other-content-topic" + let contentTopicsSeq = @[contentTopic, otherContentTopic] + + # Given a valid subscription to multiple content topics + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicsSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq + + # When sending a message to the one of the subscribed content topics + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # When sending a message to the other subscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg2 = fakeWakuMessage(contentTopic=otherContentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg2) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == pubsubTopic + pushedMsg2 == msg2 + + # When sending a message to a non-subscribed content topic (before unsubscription) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg3 = fakeWakuMessage(contentTopic=nonExistentContentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg3) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # Given a valid unsubscription to an existing subscription + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicsSeq + ) + require: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + # When sending a message to the previously unsubscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg4 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg4) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # When sending a message to the other previously unsubscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg5 = fakeWakuMessage(contentTopic=otherContentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg5) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # When sending a message to a non-subscribed content topic (after unsubscription) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg6 = fakeWakuMessage(contentTopic=nonExistentContentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg6) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + asyncTest "Different PubSub Topics with Different Content Topics, Unsubscribe One By One": + # Given + let otherPubsubTopic = "other-pubsub-topic" + let otherContentTopic = "other-content-topic" + let otherContentTopicSeq = @[otherContentTopic] + + # When subscribing to a pubsub topic + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the subscription is successful + check: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When subscribing to a different pubsub topic + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, otherPubsubTopic, otherContentTopicSeq + ) + + # Then the subscription is successful + check: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq & otherContentTopicSeq + + # When sending a message to one of the subscribed content topics + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # When sending a message to the other subscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg2 = fakeWakuMessage(contentTopic=otherContentTopic) + await wakuFilter.handleMessage(otherPubsubTopic, msg2) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == otherPubsubTopic + pushedMsg2 == msg2 + + # When sending a message to a non-subscribed content topic (before unsubscription) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg3 = fakeWakuMessage(contentTopic="non-existent-content-topic") + await wakuFilter.handleMessage(pubsubTopic, msg3) + + # Then + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == otherContentTopicSeq + + # When sending a message to the previously subscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg4 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg4) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # When sending a message to the still subscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg5 = fakeWakuMessage(contentTopic=otherContentTopic) + await wakuFilter.handleMessage(otherPubsubTopic, msg5) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic3 == otherPubsubTopic + pushedMsg3 == msg5 + + # When unsubscribing from the other subscription + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, otherPubsubTopic, otherContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 0 + + # When sending a message to the previously unsubscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg6 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg6) + + # Then the message is not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + asyncTest "Different PubSub Topics with Different Content Topics, Unsubscribe All": + # Given + let otherPubsubTopic = "other-pubsub-topic" + let otherContentTopic = "other-content-topic" + let otherContentTopicSeq = @[otherContentTopic] + + # When subscribing to a content topic + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then + check: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When subscribing to a different content topic + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, otherPubsubTopic, otherContentTopicSeq + ) + + # Then + check: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq & otherContentTopicSeq + + # When sending a message to one of the subscribed content topics + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # When sending a message to the other subscribed content topic + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg2 = fakeWakuMessage(contentTopic=otherContentTopic) + await wakuFilter.handleMessage(otherPubsubTopic, msg2) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == otherPubsubTopic + pushedMsg2 == msg2 + + # When sending a message to a non-subscribed content topic (before unsubscription) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg3 = fakeWakuMessage(contentTopic="non-existent-content-topic") + await wakuFilter.handleMessage(pubsubTopic, msg3) + + # Then + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse = await wakuFilterClient.unsubscribeAll(serverRemotePeerInfo) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + # When sending a message the previously subscribed content topics + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg4 = fakeWakuMessage(contentTopic=contentTopic) + let msg5 = fakeWakuMessage(contentTopic=otherContentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg4) + await wakuFilter.handleMessage(otherPubsubTopic, msg5) + + # Then the messages are not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + asyncTest "Different PubSub Topics with Same Content Topics, Unsubscribe Selectively": + # Given + let otherPubsubTopic = "other-pubsub-topic" + let otherContentTopic1 = "other-content-topic1" + let otherContentTopic2 = "other-content-topic2" + let contentTopicsSeq1 = @[contentTopic, otherContentTopic1] + let contentTopicsSeq2 = @[contentTopic, otherContentTopic2] + + # When subscribing to a pubsub topic + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicsSeq1 + ) + + # Then the subscription is successful + check: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq1 + + # When subscribing to a different pubsub topic + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, otherPubsubTopic, contentTopicsSeq2 + ) + + # Then the subscription is successful + check: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq1 & contentTopicsSeq2 + + # When sending a message to (pubsubTopic, contentTopic) + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + + # When sending a message to (pubsubTopic, otherContentTopic1) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg2 = fakeWakuMessage(contentTopic=otherContentTopic1) + await wakuFilter.handleMessage(pubsubTopic, msg2) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == pubsubTopic + pushedMsg2 == msg2 + + # When sending a message to (otherPubsubTopic, contentTopic) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg3 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(otherPubsubTopic, msg3) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic3 == otherPubsubTopic + pushedMsg3 == msg3 + + # When sending a message to (otherPubsubTopic, otherContentTopic2) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg4 = fakeWakuMessage(contentTopic=otherContentTopic2) + await wakuFilter.handleMessage(otherPubsubTopic, msg4) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic4, pushedMsg4) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic4 == otherPubsubTopic + pushedMsg4 == msg4 + + # When selectively unsubscribing from (pubsubTopic, otherContentTopic1) and (otherPubsubTopic, contentTopic) + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[otherContentTopic1] + ) + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, otherPubsubTopic, @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == @[contentTopic, otherContentTopic2] + + # When sending a message to (pubsubTopic, contentTopic) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg5 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg5) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic5, pushedMsg5) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic5 == pubsubTopic + pushedMsg5 == msg5 + + # When sending a message to (otherPubsubTopic, otherContentTopic2) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg6 = fakeWakuMessage(contentTopic=otherContentTopic2) + await wakuFilter.handleMessage(otherPubsubTopic, msg6) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic6, pushedMsg6) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic6 == otherPubsubTopic + pushedMsg6 == msg6 + + # When sending a message to (pubsubTopic, otherContentTopic1) and (otherPubsubTopic, contentTopic) + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg7 = fakeWakuMessage(contentTopic=otherContentTopic1) + await wakuFilter.handleMessage(pubsubTopic, msg7) + let msg8 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(otherPubsubTopic, msg8) + + # Then the messages are not pushed to the client + check: + not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + asyncTest "Max Topic Size": + # Given a topic list of 30 topics + var topicSeq: seq[string] = toSeq(0.. 0: + let takeNumber = min(topicSeq.len, MaxContentTopicsPerRequest) + let topicSeqBatch = topicSeq[0..= MaxCriteriaPerSubscription: + if peerSubscription.len() + filterCriteria.len() > MaxCriteriaPerSubscription: return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria")) peerSubscription.incl(filterCriteria)