diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim index 0b918ac0d..5890ab208 100644 --- a/tests/node/test_wakunode_filter.nim +++ b/tests/node/test_wakunode_filter.nim @@ -25,12 +25,12 @@ import let FUTURE_TIMEOUT = 1.seconds -suite "Full Node - Waku Filter - End to End": +suite "Waku Filter - End to End": var client {.threadvar.}: WakuNode var clientPeerId {.threadvar.}: PeerId var server {.threadvar.}: WakuNode var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo - var pubsubTopic {.threadvar.}: PubsubTopicy + var pubsubTopic {.threadvar.}: PubsubTopic var contentTopic {.threadvar.}: ContentTopic var contentTopicSeq {.threadvar.}: seq[ContentTopic] var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)] @@ -66,8 +66,8 @@ suite "Full Node - Waku Filter - End to End": asyncTeardown: waitFor allFutures(client.stop(), server.stop()) - asyncTest "Full Client Node to Full Service Node Subscription": - # When a full client node subscribes to a full service node + asyncTest "Client Node receives Push from Server Node, via Filter": + # When a client node subscribes to a filter node let subscribeResponse = await client.filterSubscribe( some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo ) @@ -107,3 +107,105 @@ suite "Full Node - Waku Filter - End to End": # Then the message is not pushed to the client check: not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + + asyncTest "Client Node can't receive Push from Server Node, via Relay": + # Given the server node has Relay enabled + await server.mountRelay() + + # And valid filter subscription + let subscribeResponse = await client.filterSubscribe( + some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo + ) + require: + subscribeResponse.isOk() + server.wakuFilter.subscriptions.len == 1 + + # When a server node gets a Relay message + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await server.publish(some(pubsubTopic), msg1) + + # Then the message is not sent to the client's filter push handler + check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)) + + asyncTest "Client Node can't subscribe to Server Node without Filter": + # Given a server node with Relay without Filter + let + serverKey = generateSecp256k1Key() + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + waitFor server.start() + waitFor server.mountRelay() + + let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + + # When a client node subscribes to the server node + let subscribeResponse = await client.filterSubscribe( + some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo + ) + + # Then the subscription is successful + check (not subscribeResponse.isOk()) + + xasyncTest "Filter Client Node can receive messages after subscribing and restarting, via Filter": + # Given a valid filter subscription + let subscribeResponse = await client.filterSubscribe( + some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo + ) + require: + subscribeResponse.isOk() + server.wakuFilter.subscriptions.len == 1 + + # And the client node reboots + waitFor client.stop() + waitFor client.start() + client.mountFilterClient() + + # When a message is sent to the subscribed content topic, via Filter; without refreshing the subscription + let msg = fakeWakuMessage(contentTopic=contentTopic) + await server.filterHandleMessage(pubsubTopic, msg) + + # Then the message is pushed to the client + check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == pubsubTopic + pushedMsg == msg + + asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay": # Given the server node has Relay enabled + await server.mountRelay() + + # Given a valid filter subscription + let subscribeResponse = await client.filterSubscribe( + some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo + ) + require: + subscribeResponse.isOk() + server.wakuFilter.subscriptions.len == 1 + + # And the client node reboots + waitFor client.stop() + waitFor client.start() + client.mountFilterClient() + + # When a message is sent to the subscribed content topic, via Relay + let msg = fakeWakuMessage(contentTopic=contentTopic) + await server.publish(some(pubsubTopic), msg) + + # Then the message is not sent to the client's filter push handler + check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)) + + # Given the client refreshes the subscription + let subscribeResponse2 = await client.filterSubscribe( + some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo + ) + check: + subscribeResponse2.isOk() + server.wakuFilter.subscriptions.len == 1 + + # When a message is sent to the subscribed content topic, via Relay + pushHandlerFuture = newPushHandlerFuture() + let msg2 = fakeWakuMessage(contentTopic=contentTopic) + await server.publish(some(pubsubTopic), msg2) + + # Then the message is not sent to the client's filter push handler + check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)) diff --git a/tests/testlib/sequtils.nim b/tests/testlib/sequtils.nim new file mode 100644 index 000000000..5fd3d414f --- /dev/null +++ b/tests/testlib/sequtils.nim @@ -0,0 +1,2 @@ +proc toString*(bytes: seq[byte]): string = + cast[string](bytes) diff --git a/tests/waku_filter_v2/test_data.nim b/tests/waku_filter_v2/test_data.nim new file mode 100644 index 000000000..bc3ccb78e --- /dev/null +++ b/tests/waku_filter_v2/test_data.nim @@ -0,0 +1,70 @@ +import + std/json + +const + ALPHABETIC* = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + ALPHANUMERIC* = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + ALPHANUMERIC_SPECIAL* = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()_+-=[]{}|;':\\\",./<>?`~" + EMOJI* = "😀 😃 😄 😁 😆 😅 🤣 😂 🙂 🙃 😉 😊 😇 🥰 😍 🤩 😘 😗 😚 😙" + CODE* = "def main():\n\tprint('Hello, world!')" + QUERY* = """ + SELECT + u.id, + u.name, + u.email, + u.created_at, + u.updated_at, + ( + SELECT + COUNT(*) + FROM + posts p + WHERE + p.user_id = u.id + ) AS post_count + FROM + users u + WHERE + u.id = 1 + """ + TEXT_SMALL* = "Lorem ipsum dolor sit amet, consectetur adipiscing elit." + TEXT_LARGE* = """ + Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras gravida vulputate semper. Proin + eleifend varius cursus. Morbi lacinia posuere quam sit amet pretium. Sed non metus fermentum, + venenatis nisl id, vestibulum eros. Quisque non lorem sit amet lectus faucibus elementum eu + sit amet odio. Mauris tortor justo, malesuada quis volutpat vitae, tristique at nisl. Proin + eleifend eu arcu ac sodales. In efficitur ipsum urna, ut viverra turpis sodales ut. Phasellus + nec tortor eu urna suscipit euismod eget vel ligula. Phasellus vestibulum sollicitudin tellus, + ac sodales tellus tempor id. Curabitur sed congue velit. + """ + +proc getSampleJsonDictionary*(): JsonNode = + %*{ + "shapes": [ + { + "type": "circle", + "radius": 10 + }, + { + "type": "square", + "side": 10 + } + ], + "colours": [ + "red", + "green", + "blue" + ] + } + +proc getSampleJsonList*(): JsonNode = + %*[ + { + "type": "cat", + "name": "Salem" + }, + { + "type": "dog", + "name": "Oberon" + }, + ] diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 0ef88235d..918763892 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -1,25 +1,42 @@ {.used.} import - std/[options, tables, sequtils], + std/[ + options, + tables, + sequtils, + strutils, + json + ], testutils/unittests, + stew/[ + results, + byteutils + ], chronos, chronicles, os, libp2p/peerstore import - ../../../waku/node/peer_manager, - ../../../waku/waku_filter_v2, - ../../../waku/waku_filter_v2/client, - ../../../waku/waku_filter_v2/subscriptions, - ../../../waku/waku_core, - ../testlib/common, - ../testlib/wakucore, - ../testlib/testasync, - ../testlib/testutils, - ../testlib/futures, - ./waku_filter_utils.nim + ../../../waku/[ + node/peer_manager, + waku_core + ], + ../../../waku/waku_filter_v2/[ + client, + subscriptions + ], + ../testlib/[ + common, + wakucore, + testasync, + testutils, + futures, + sequtils + ], + ./waku_filter_utils.nim, + ./test_data.nim let FUTURE_TIMEOUT = 1.seconds @@ -46,7 +63,7 @@ suite "Waku Filter - End to End": pubsubTopic = DefaultPubsubTopic contentTopic = DefaultContentTopic - contentTopicSeq = @[DefaultContentTopic] + contentTopicSeq = @[contentTopic] serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) @@ -132,6 +149,17 @@ suite "Waku Filter - End to End": subscribeResponse.isErr() # Not subscribed subscribeResponse.error().kind == FilterSubscribeErrorKind.PEER_DIAL_FAILURE + asyncTest "Subscribing to an empty content topic": + # When subscribing to an empty content topic + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, @[] + ) + + # Then the subscription is not successful + check: + subscribeResponse.isErr() # Not subscribed + subscribeResponse.error().kind == FilterSubscribeErrorKind.BAD_REQUEST + asyncTest "PubSub Topic with Single Content Topic": # Given let nonExistentContentTopic = "non-existent-content-topic" @@ -777,6 +805,1177 @@ suite "Waku Filter - End to End": pushedMsgPubsubTopic2 == pubsubTopic pushedMsg2 == msg2 + suite "Unsubscribe": + + ### + # One PubSub Topic + ### + + asyncTest "PubSub Topic with Single Content Topic": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When unsubscribing from the subscription + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a subscription with Single Content Topic": + # Given a valid subscription + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When refreshing the subscription + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the subscription is successful + check: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + + # When unsubscribing from the subscription + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "PubSub Topic with Multiple Content Topics, One By One": + # Given a valid subscription + let multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # When unsubscribing from one of the content topics + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] + + # When unsubscribing from the other content topic + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "PubSub Topic with Multiple Content Topics, All At Once": + # Given a valid subscription + let multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # When unsubscribing from all content topics + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a complete subscription with Multiple Content Topics, One By One": + # Given a valid subscription + let multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # And a successful complete refresh of the subscription + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + require: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # When unsubscribing from one of the content topics + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] + + # When unsubscribing from the other content topic + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a complete subscription with Multiple Content Topics, All At Once": + # Given a valid subscription + let multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # And a successful complete refresh of the subscription + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + require: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # When unsubscribing from all content topics + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a partial subscription with Multiple Content Topics, One By One": + # Given a valid subscription + let multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # Unsubscribing from one content topic + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + require: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] + + # And a successful refresh of the partial subscription + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # When unsubscribing from one of the content topics + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] + + # When unsubscribing from the other content topic + let unsubscribeResponse3 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse3.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a partial subscription with Multiple Content Topics, All At Once": + # Given a valid subscription + let multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # Unsubscribing from one content topic + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + require: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == @["other-content-topic"] + + # And a successful refresh of the partial subscription + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + require: + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId) == multipleContentTopicSeq + + # When unsubscribing from all content topics + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 0 + + ### + # Multiple PubSub Topics + ### + + asyncTest "Different PubSub Topics with Single (Same) Content Topic": + # Given two valid subscriptions with the same content topic + let + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq + ) + + # TODO: CHECK IF THIS MAKES SENSE + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 + + # When unsubscribing from the other subscription + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "Different PubSub Topics with Multiple (Same) Content Topics, One By One": + # Given two valid subscriptions with the same content topics + let + multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 3 + + # When unsubscribing from another of the subscriptions + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from another of the subscriptions + let unsubscribeResponse3 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 + + # When unsubscribing from the last subscription + let unsubscribeResponse4 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "Different PubSub Topics with Multiple (Same) Content Topics, All At Once": + # Given two valid subscriptions with the same content topics + let + multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # TODO: CHECK IF THIS MAKES SENSE + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from the other subscription + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a complete subscription with different PubSub Topics and Single (Same) Content Topic": + # Given two valid subscriptions with the same content topic + let + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq + ) + + # TODO: CHECK IF THIS MAKES SENSE + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # And a successful complete refresh of the subscription + let + subscribeResponse3 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + subscribeResponse4 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq + ) + + require: + subscribeResponse3.isOk() + subscribeResponse4.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 + + # When unsubscribing from the other subscription + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a complete subscription with different PubSub Topics and Multiple (Same) Content Topics, One By One": + # Given two valid subscriptions with the same content topics + let + multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # TODO: CHECK IF THIS MAKES SENSE + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # And a successful complete refresh of the subscription + let + subscribeResponse3 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse4 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + require: + subscribeResponse3.isOk() + subscribeResponse4.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 3 + + # When unsubscribing from another of the subscriptions + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from another of the subscriptions + let unsubscribeResponse3 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 + + # When unsubscribing from the last subscription + let unsubscribeResponse4 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a complete subscription with different PubSub Topics and Multiple (Same) Content Topics, All At Once": + # Given two valid subscriptions with the same content topics + let + multipleContentTopicSeq = @[contentTopic, "other-content-topic"] + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # TODO: CHECK IF THIS MAKES SENSE + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # And a successful complete refresh of the subscription + let + subscribeResponse3 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse4 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + require: + subscribeResponse3.isOk() + subscribeResponse4.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse1.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from the other subscription + let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a partial subscription with different PubSub Topics and Multiple (Same) Content Topics, One By One": + # Given two valid subscriptions with the same content topics + let + multipleContentTopicSeq = contentTopicSeq & @["other-content-topic"] + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # TODO: CHECK IF THIS MAKES SENSE + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # Unsubscribing from one of the content topics of each subscription + let + unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @["other-content-topic"] + ) + + require: + unsubscribeResponse1.isOk() + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # And a successful refresh of the partial subscription + let + refreshSubscriptionResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + refreshSubscriptionResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + require: + refreshSubscriptionResponse1.isOk() + refreshSubscriptionResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse3 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse3.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 3 + + # When unsubscribing from another of the subscriptions + let unsubscribeResponse4 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse4.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from another of the subscriptions + let unsubscribeResponse5 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @["other-content-topic"] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse5.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 1 + + # When unsubscribing from the last subscription + let unsubscribeResponse6 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @[contentTopic] + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse6.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "After refreshing a partial subscription with different PubSub Topics and Multiple (Same) Content Topics, All At Once": + # Given two valid subscriptions with the same content topics + let + multipleContentTopicSeq = contentTopicSeq & @["other-content-topic"] + subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # TODO: CHECK IF THIS MAKES SENSE + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # Unsubscribing from one of the content topics of each subscription + let + unsubscribeResponse1 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[contentTopic] + ) + unsubscribeResponse2 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", @["other-content-topic"] + ) + + require: + unsubscribeResponse1.isOk() + unsubscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # And a successful refresh of the partial subscription + let + refreshSubscriptionResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + refreshSubscriptionResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + require: + refreshSubscriptionResponse1.isOk() + refreshSubscriptionResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 4 + + # When unsubscribing from one of the subscriptions + let unsubscribeResponse3 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse3.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.getSubscribedContentTopics(clientPeerId).len == 2 + + # When unsubscribing from the other subscription + let unsubscribeResponse4 = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "other-pubsub-topic", multipleContentTopicSeq + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse4.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "Without existing subscription": + # When unsubscribing from a non-existent subscription + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the unsubscription is not successful + check: + unsubscribeResponse.isErr() # Not subscribed + unsubscribeResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND + + asyncTest "With non existent pubsub topic": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "pubsub-topic", contentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When unsubscribing from a pubsub topic that does not exist + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, "non-existent-pubsub-topic", contentTopicSeq + ) + + # Then the unsubscription is not successful + check: + unsubscribeResponse.isErr() # Not subscribed + unsubscribeResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND + + asyncTest "With non existent content topic": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When unsubscribing from a content topic that does not exist + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @["non-existent-content-topic"] + ) + + # Then the unsubscription is not successful + check: + unsubscribeResponse.isErr() # Not subscribed + unsubscribeResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND + + asyncTest "Empty content topic": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When unsubscribing from an empty content topic + let unsubscribeResponse = await wakuFilterClient.unsubscribe( + serverRemotePeerInfo, pubsubTopic, @[] + ) + + # Then the unsubscription is not successful + check: + unsubscribeResponse.isErr() # Not subscribed + unsubscribeResponse.error().kind == FilterSubscribeErrorKind.BAD_REQUEST + + suite "Unsubscribe All": + asyncTest "Unsubscribe from All Topics, One PubSub Topic": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When unsubscribing from all topics + let unsubscribeResponse = await wakuFilterClient.unsubscribeAll( + serverRemotePeerInfo + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "Unsubscribe from All Topics, Multiple PubSub Topics": + # Given a valid subscription + let subscribeResponse1 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + let subscribeResponse2 = await wakuFilterClient.subscribe( + serverRemotePeerInfo, "other-pubsub-topic", contentTopicSeq + ) + require: + subscribeResponse1.isOk() + subscribeResponse2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When unsubscribing from all topics + let unsubscribeResponse = await wakuFilterClient.unsubscribeAll( + serverRemotePeerInfo + ) + + # Then the unsubscription is successful + check: + unsubscribeResponse.isOk() + wakuFilter.subscriptions.len == 0 + + asyncTest "Unsubscribe from All Topics from a non-subscribed Service": + # Given the client is not subscribed to a service + require: + wakuFilter.subscriptions.len == 0 + + # When unsubscribing from all topics for that client + let unsubscribeResponse = await wakuFilterClient.unsubscribeAll( + serverRemotePeerInfo + ) + + # Then the unsubscription is not successful + check: + unsubscribeResponse.isErr() # Not subscribed + unsubscribeResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND + + suite "Filter-Push": + asyncTest "Valid Payloads": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # And some extra payloads + let + JSON_DICTIONARY = getSampleJsonDictionary() + JSON_LIST = getSampleJsonList() + + # And some valid messages + let + msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=ALPHABETIC) + msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=ALPHANUMERIC) + msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=ALPHANUMERIC_SPECIAL) + msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=EMOJI) + msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=CODE) + msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=QUERY) + msg7 = fakeWakuMessage(contentTopic=contentTopic, payload= $JSON_DICTIONARY) + msg8 = fakeWakuMessage(contentTopic=contentTopic, payload= $JSON_LIST) + msg9 = fakeWakuMessage(contentTopic=contentTopic, payload=TEXT_SMALL) + msg10 = fakeWakuMessage(contentTopic=contentTopic, payload=TEXT_LARGE) + + # When sending the alphabetic message + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read() + + check: + pushedMsgPubsubTopic1 == pubsubTopic + pushedMsg1 == msg1 + msg1.payload.toString() == ALPHABETIC + + # When sending the alphanumeric message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg2) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == pubsubTopic + pushedMsg2 == msg2 + msg2.payload.toString() == ALPHANUMERIC + + # When sending the alphanumeric special message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg3) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic3 == pubsubTopic + pushedMsg3 == msg3 + msg3.payload.toString() == ALPHANUMERIC_SPECIAL + + # When sending the emoji message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg4) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic4, pushedMsg4) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic4 == pubsubTopic + pushedMsg4 == msg4 + msg4.payload.toString() == EMOJI + + # When sending the code message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg5) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic5, pushedMsg5) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic5 == pubsubTopic + pushedMsg5 == msg5 + msg5.payload.toString() == CODE + + # When sending the query message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg6) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic6, pushedMsg6) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic6 == pubsubTopic + pushedMsg6 == msg6 + msg6.payload.toString() == QUERY + + # When sending the table message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg7) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic7, pushedMsg7) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic7 == pubsubTopic + pushedMsg7 == msg7 + msg7.payload.toString() == $JSON_DICTIONARY + + # When sending the list message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg8) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic8, pushedMsg8) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic8 == pubsubTopic + pushedMsg8 == msg8 + msg8.payload.toString() == $JSON_LIST + + # When sending the small text message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg9) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic9, pushedMsg9) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic9 == pubsubTopic + pushedMsg9 == msg9 + msg9.payload.toString() == TEXT_SMALL + + # When sending the large text message + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic, msg10) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic10, pushedMsg10) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic10 == pubsubTopic + pushedMsg10 == msg10 + msg10.payload.toString() == TEXT_LARGE + + suite "Security and Privacy": + asyncTest "Filter Client can receive messages after Client and Server reboot": + # Given a clean client and server + require: + wakuFilter.subscriptions.len == 0 + + # When subscribing to a topic + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the subscription is successful + check: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When both are stopped and started + waitFor allFutures(wakuFilter.stop(), wakuFilterClient.stop()) + waitFor allFutures(wakuFilter.start(), wakuFilterClient.start()) + + # Then the suscription is maintained + check: + wakuFilter.subscriptions.len == 1 + + # When sending a message to the subscription + let msg1 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg1) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == pubsubTopic + pushedMsg == msg1 + + # When refreshing the subscription after reboot + let refreshSubscriptionResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + + # Then the refreshment is successful + check: + refreshSubscriptionResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When sending a message to the refreshed subscription + pushHandlerFuture = newPushHandlerFuture() # Clear previous future + let msg2 = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg2) + + # Then the message is pushed to the client + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == pubsubTopic + pushedMsg2 == msg2 + + asyncTest "Filter Client can receive messages after subscribing and stopping without unsubscribing": + # Given a valid subscription + let subscribeResponse = await wakuFilterClient.subscribe( + serverRemotePeerInfo, pubsubTopic, contentTopicSeq + ) + require: + subscribeResponse.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When the client is stopped + await wakuFilterClient.stop() + + # Then the subscription is not removed + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When the server receives a message + let msg = fakeWakuMessage(contentTopic=contentTopic) + await wakuFilter.handleMessage(pubsubTopic, msg) + + # Then the client receives the message + require await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == pubsubTopic + pushedMsg == msg + suite "MessagePushHandler - Msg List": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 3b9e607b5..5a505960b 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -86,7 +86,11 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic return err(FilterSubscribeError.notFound()) var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) - # TODO: consider error response if filter criteria does not exist + + if not peerSubscription.containsAny(filterCriteria): + debug "unsubscribing peer is not subscribed to any of the content topics in this pubsub topic", peerId=peerId, pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics + return err(FilterSubscribeError.notFound()) + peerSubscription.excl(filterCriteria) if peerSubscription.len() == 0: diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index 68ae9e4d4..bd248f54c 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -43,3 +43,11 @@ proc removePeers*(subscriptions: var FilterSubscriptions, peerIds: seq[PeerID]) ## Remove all subscriptions for a given list of peers for peerId in peerIds: subscriptions.removePeer(peerId) + +proc containsAny*(criteria: FilterCriteria, otherCriteria: FilterCriteria): bool = + ## Check if a given pubsub topic is contained in a set of filter criteria + ## TODO: Better way to achieve this? + for criterion in otherCriteria: + if criterion in criteria: + return true + false