{.used.}

import
  std/[options, sequtils, tempfiles],
  testutils/unittests,
  chronos,
  chronicles,
  stew/shims/net as stewNet

import
  std/[sequtils, tempfiles],
  stew/byteutils,
  stew/shims/net as stewNet,
  testutils/unittests,
  chronos,
  libp2p/switch,
  libp2p/protocols/pubsub/pubsub

import
  ../../../waku/[
    waku_core/topics/pubsub_topic,
    waku_core/topics/sharding,
    node/waku_node,
    common/paging,
    waku_core,
    waku_store/common,
    node/peer_manager,
    waku_filter_v2/client,
  ],
  ../waku_relay/utils,
  ../waku_archive/archive_utils,
  ../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils]

import ../../../waku/waku_relay/protocol

const
  listenIp = ValidIpAddress.init("0.0.0.0")
  listenPort = Port(0)

suite "Sharding":
  var
    server {.threadvar.}: WakuNode
    client {.threadvar.}: WakuNode

  asyncSetup:
    let
      serverKey = generateSecp256k1Key()
      clientKey = generateSecp256k1Key()

    server = newTestWakuNode(serverKey, listenIp, listenPort)
    client = newTestWakuNode(clientKey, listenIp, listenPort)

    await allFutures(server.mountRelay(), client.mountRelay())
    await allFutures(server.start(), client.start())

  asyncTeardown:
    await allFutures(server.stop(), client.stop())

  suite "Static Sharding Functionality":
    asyncTest "Shard Subscription and Peer Dialing":
      # Given a connected server and client subscribed to the same pubsub shard
      let
        topic = "/waku/2/rs/0/1"
        serverHandler = server.subscribeCompletionHandler(topic)
        clientHandler = client.subscribeCompletionHandler(topic)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the client publishes a message in the subscribed topic
      discard await client.publish(
        some(topic),
        WakuMessage(payload: "message1".toBytes(), contentTopic: "contentTopic"),
      )

      # Then the server receives the message
      let
        serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      assertResultOk(serverResult1)
      assertResultOk(clientResult1)

      # When the server publishes a message in the subscribed topic
      serverHandler.reset()
      clientHandler.reset()
      discard await server.publish(
        some(topic),
        WakuMessage(payload: "message2".toBytes(), contentTopic: "contentTopic"),
      )

      # Then the client receives the message
      let
        serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      assertResultOk(serverResult2)
      assertResultOk(clientResult2)

    asyncTest "Exclusion of Non-Subscribed Service Nodes":
      # When a connected server and client are subscribed to different pubsub shards
      let
        topic1 = "/waku/2/rs/0/1"
        topic2 = "/waku/2/rs/0/2"
        contentTopic = "myContentTopic"

      var
        serverHandler = server.subscribeCompletionHandler(topic1)
        clientHandler = client.subscribeCompletionHandler(topic2)

      # await sleepAsync(FUTURE_TIMEOUT)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When a message is published in the server's subscribed topic
      discard await client.publish(
        some(topic1),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic),
      )
      let
        serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      # Then the server receives the message but the client does not
      assertResultOk(serverResult1)
      check clientResult1.isErr()

      # When the server publishes a message in the client's subscribed topic
      serverHandler.reset()
      clientHandler.reset()
      let wakuMessage2 =
        WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
      discard await server.publish(some(topic2), wakuMessage2)
      let
        serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      # Then the client receives the message but the server does not
      check serverResult2.isErr()
      assertResultOk(clientResult2)

  suite "Automatic Sharding Mechanics":
    asyncTest "Content Topic-Based Shard Dialing":
      # Given a connected server and client subscribed to the same content topic (with two different formats)
      let
        contentTopicShort = "/toychat/2/huilong/proto"
        contentTopicFull = "/0/toychat/2/huilong/proto"
        pubsubTopic = "/waku/2/rs/0/58355"
          # Automatically generated from the contentTopic above

      let
        serverHandler = server.subscribeToContentTopicWithHandler(contentTopicShort)
        clientHandler = client.subscribeToContentTopicWithHandler(contentTopicFull)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the client publishes a message
      discard await client.publish(
        some(pubsubTopic),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort),
      )
      let
        serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      # Then the server and client receive the message
      assertResultOk(serverResult1)
      assertResultOk(clientResult1)

      # When the server publishes a message
      serverHandler.reset()
      clientHandler.reset()
      discard await server.publish(
        some(pubsubTopic),
        WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull),
      )
      let
        serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      # Then the client and server receive the message
      assertResultOk(serverResult2)
      assertResultOk(clientResult2)

    asyncTest "Exclusion of Irrelevant Autosharded Topics":
      # Given a connected server and client subscribed to different content topics
      let
        contentTopic1 = "/toychat/2/huilong/proto"
        pubsubTopic1 = "/waku/2/rs/0/58355"
        pubsubTopic12 = NsPubsubTopic.parse(contentTopic1)
          # Automatically generated from the contentTopic above
        contentTopic2 = "/0/toychat2/2/huilong/proto"
        pubsubTopic2 = "/waku/2/rs/0/23286"
          # Automatically generated from the contentTopic above

      let
        serverHandler = server.subscribeToContentTopicWithHandler(contentTopic1)
        clientHandler = client.subscribeToContentTopicWithHandler(contentTopic2)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the server publishes a message in the server's subscribed topic
      discard await server.publish(
        some(pubsubTopic1),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1),
      )
      let
        serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      # Then the server receives the message but the client does not
      assertResultOk(serverResult1)
      check clientResult1.isErr()

      # When the client publishes a message in the client's subscribed topic
      serverHandler.reset()
      clientHandler.reset()
      discard await client.publish(
        some(pubsubTopic2),
        WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2),
      )
      let
        serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
        clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

      # Then the client receives the message but the server does not
      assertResultOk(clientResult2)
      check serverResult2.isErr()

  suite "Application Layer Integration":
    suite "App Protocol Compatibility":
      asyncTest "relay":
        # Given a connected server and client subscribed to the same pubsub topic
        let
          pubsubTopic = "/waku/2/rs/0/1"
          serverHandler = server.subscribeCompletionHandler(pubsubTopic)
          clientHandler = client.subscribeCompletionHandler(pubsubTopic)

        await sleepAsync(FUTURE_TIMEOUT)
        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When the client publishes a message
        discard await client.publish(
          some(pubsubTopic),
          WakuMessage(payload: "message1".toBytes(), contentTopic: "myContentTopic"),
        )
        let
          serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
          clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

        # Then the server and client receive the message
        assertResultOk(serverResult1)
        assertResultOk(clientResult1)

      asyncTest "filter":
        # Given a connected server and client using the same pubsub topic
        await client.mountFilterClient()
        await server.mountFilter()

        let pushHandlerFuture = newFuture[(string, WakuMessage)]()
        proc messagePushHandler(
            pubsubTopic: PubsubTopic, message: WakuMessage
        ): Future[void] {.async, closure, gcsafe.} =
          pushHandlerFuture.complete((pubsubTopic, message))

        client.wakuFilterClient.registerPushHandler(messagePushHandler)
        let
          pubsubTopic = "/waku/2/rs/0/1"
          contentTopic = "myContentTopic"
          subscribeResponse = await client.filterSubscribe(
            some(pubsubTopic),
            @[contentTopic],
            server.switch.peerInfo.toRemotePeerInfo(),
          )

        assertResultOk(subscribeResponse)
        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When a peer publishes a message (the client, for testing easeness)
        let msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic)
        await server.filterHandleMessage(pubsubTopic, msg)

        # Then the client receives the message
        let pushHandlerResult = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT)
        assertResultOk(pushHandlerResult)

      asyncTest "lightpush":
        # Given a connected server and client subscribed to the same pubsub topic
        client.mountLightPushClient()
        await server.mountLightpush()

        let
          topic = "/waku/2/rs/0/1"
          clientHandler = client.subscribeCompletionHandler(topic)

        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When a peer publishes a message (the client, for testing easeness)
        let
          msg =
            WakuMessage(payload: "message".toBytes(), contentTopic: "myContentTopic")
          lightpublishRespnse = await client.lightpushPublish(
            some(topic), msg, server.switch.peerInfo.toRemotePeerInfo()
          )

        # Then the client receives the message
        let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT)
        assertResultOk(clientResult)

    suite "Content Topic Filtering and Routing":
      asyncTest "relay (automatic sharding filtering)":
        # Given a connected server and client subscribed to the same content topic (with two different formats)
        let
          contentTopicShort = "/toychat/2/huilong/proto"
          contentTopicFull = "/0/toychat/2/huilong/proto"
          pubsubTopic = "/waku/2/rs/0/58355"
          serverHandler = server.subscribeToContentTopicWithHandler(contentTopicShort)
          clientHandler = client.subscribeToContentTopicWithHandler(contentTopicFull)

        await sleepAsync(FUTURE_TIMEOUT)
        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When the client publishes a message
        discard await client.publish(
          some(pubsubTopic),
          WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort),
        )
        let
          serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
          clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

        # Then the server and client receive the message
        assertResultOk(serverResult1)
        assertResultOk(clientResult1)

        # When the server publishes a message
        serverHandler.reset()
        clientHandler.reset()
        discard await server.publish(
          some(pubsubTopic),
          WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull),
        )
        let
          serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
          clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

        # Then the server and client receive the message
        assertResultOk(serverResult2)
        assertResultOk(clientResult2)

      asyncTest "filter (automatic sharding filtering)":
        # Given a connected server and client using the same content topic (with two different formats)
        await client.mountFilterClient()
        await server.mountFilter()

        let pushHandlerFuture = newFuture[(string, WakuMessage)]()
        proc messagePushHandler(
            pubsubTopic: PubsubTopic, message: WakuMessage
        ): Future[void] {.async, closure, gcsafe.} =
          pushHandlerFuture.complete((pubsubTopic, message))

        client.wakuFilterClient.registerPushHandler(messagePushHandler)
        let
          contentTopicShort = "/toychat/2/huilong/proto"
          contentTopicFull = "/0/toychat/2/huilong/proto"
          pubsubTopic = "/waku/2/rs/0/58355"
          subscribeResponse1 = await client.filterSubscribe(
            some(pubsubTopic),
            @[contentTopicShort],
            server.switch.peerInfo.toRemotePeerInfo(),
          )

        assertResultOk(subscribeResponse1)
        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When the client publishes a message
        let msg =
          WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort)
        await server.filterHandleMessage(pubsubTopic, msg)

        # Then the client receives the message
        let pushHandlerResult = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT)
        assertResultOk(pushHandlerResult)
        check pushHandlerResult.get() == (pubsubTopic, msg)

        # Given the subscription is cleared and a new subscription is made
        let
          unsubscribeResponse =
            await client.filterUnsubscribeAll(server.switch.peerInfo.toRemotePeerInfo())
          subscribeResponse2 = await client.filterSubscribe(
            some(pubsubTopic),
            @[contentTopicFull],
            server.switch.peerInfo.toRemotePeerInfo(),
          )

        assertResultOk(unsubscribeResponse)
        assertResultOk(subscribeResponse2)

        # When the client publishes a message
        pushHandlerFuture.reset()
        let msg2 =
          WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull)
        await server.filterHandleMessage(pubsubTopic, msg2)

        # Then the client receives the message
        let pushHandlerResult2 = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT)
        assertResultOk(pushHandlerResult2)
        check pushHandlerResult2.get() == (pubsubTopic, msg2)

      asyncTest "lightpush (automatic sharding filtering)":
        # Given a connected server and client using the same content topic (with two different formats)
        client.mountLightPushClient()
        await server.mountLightpush()

        let
          contentTopicShort = "/toychat/2/huilong/proto"
          contentTopicFull = "/0/toychat/2/huilong/proto"
          pubsubTopic = "/waku/2/rs/0/58355"
          clientHandler = client.subscribeToContentTopicWithHandler(contentTopicShort)

        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When a peer publishes a message (the client, for testing easeness)
        let
          msg =
            WakuMessage(payload: "message".toBytes(), contentTopic: contentTopicFull)
          lightpublishRespnse = await client.lightpushPublish(
            some(pubsubTopic), msg, server.switch.peerInfo.toRemotePeerInfo()
          )

        # Then the client receives the message
        let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT)
        assertResultOk(clientResult)

      xasyncTest "store (automatic sharding filtering)":
        # Given one archive with two sets of messages using the same content topic (with two different formats)
        let
          timeOrigin = now()
          contentTopicShort = "/toychat/2/huilong/proto"
          contentTopicFull = "/0/toychat/2/huilong/proto"
          pubsubTopic = "/waku/2/rs/0/58355"
          archiveMessages1 =
            @[
              fakeWakuMessage(
                @[byte 00], ts = ts(00, timeOrigin), contentTopic = contentTopicShort
              )
            ]
          archiveMessages2 =
            @[
              fakeWakuMessage(
                @[byte 01], ts = ts(10, timeOrigin), contentTopic = contentTopicFull
              )
            ]
          archiveDriver = newArchiveDriverWithMessages(pubsubTopic, archiveMessages1)
        discard archiveDriver.put(pubsubTopic, archiveMessages2)
        let mountArchiveResult = server.mountArchive(archiveDriver)
        assertResultOk(mountArchiveResult)

        waitFor server.mountStore()
        client.mountStoreClient()

        # Given one query for each content topic format
        let
          historyQuery1 = HistoryQuery(
            contentTopics: @[contentTopicShort],
            direction: PagingDirection.Forward,
            pageSize: 3,
          )
          historyQuery2 = HistoryQuery(
            contentTopics: @[contentTopicFull],
            direction: PagingDirection.Forward,
            pageSize: 3,
          )

        # When the client queries the server for the messages
        let
          serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
          queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo)
          queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
        assertResultOk(queryResponse1)
        assertResultOk(queryResponse2)

        # Then the responses of both queries should contain all the messages
        check:
          queryResponse1.get().messages == archiveMessages1 & archiveMessages2
          queryResponse2.get().messages == archiveMessages1 & archiveMessages2

      asyncTest "relay - exclusion (automatic sharding filtering)":
        # Given a connected server and client subscribed to different content topics
        let
          contentTopic1 = "/toychat/2/huilong/proto"
          pubsubTopic1 = "/waku/2/rs/0/58355"
            # Automatically generated from the contentTopic above
          contentTopic2 = "/0/toychat2/2/huilong/proto"
          pubsubTopic2 = "/waku/2/rs/0/23286"
            # Automatically generated from the contentTopic above
          serverHandler = server.subscribeToContentTopicWithHandler(contentTopic1)
          clientHandler = client.subscribeToContentTopicWithHandler(contentTopic2)

        await sleepAsync(FUTURE_TIMEOUT)
        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When the client publishes a message in the client's subscribed topic
        discard await client.publish(
          some(pubsubTopic2),
          WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic2),
        )
        let
          serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
          clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

        # Then the client receives the message but the server does not
        check serverResult1.isErr()
        assertResultOk(clientResult1)

        # When the server publishes a message in the server's subscribed topic
        serverHandler.reset()
        clientHandler.reset()
        discard await server.publish(
          some(pubsubTopic1),
          WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic1),
        )
        let
          serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
          clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

        # Then the server receives the message but the client does not
        assertResultOk(serverResult2)
        check clientResult2.isErr()

      asyncTest "filter - exclusion (automatic sharding filtering)":
        # Given a connected server and client using different content topics
        await client.mountFilterClient()
        await server.mountFilter()

        let pushHandlerFuture = newFuture[(string, WakuMessage)]()
        proc messagePushHandler(
            pubsubTopic: PubsubTopic, message: WakuMessage
        ): Future[void] {.async, closure, gcsafe.} =
          pushHandlerFuture.complete((pubsubTopic, message))

        client.wakuFilterClient.registerPushHandler(messagePushHandler)
        let
          contentTopic1 = "/toychat/2/huilong/proto"
          pubsubTopic1 = "/waku/2/rs/0/58355"
            # Automatically generated from the contentTopic above
          contentTopic2 = "/0/toychat2/2/huilong/proto"
          pubsubTopic2 = "/waku/2/rs/0/23286"
            # Automatically generated from the contentTopic above
          subscribeResponse1 = await client.filterSubscribe(
            some(pubsubTopic1),
            @[contentTopic1],
            server.switch.peerInfo.toRemotePeerInfo(),
          )

        assertResultOk(subscribeResponse1)
        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When the server publishes a message in the server's subscribed topic
        let msg =
          WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic2)
        await server.filterHandleMessage(pubsubTopic2, msg)

        # Then the client does not receive the message
        let pushHandlerResult = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT)
        check pushHandlerResult.isErr()

      asyncTest "lightpush - exclusion (automatic sharding filtering)":
        # Given a connected server and client using different content topics
        client.mountLightPushClient()
        await server.mountLightpush()

        let
          contentTopic1 = "/toychat/2/huilong/proto"
          pubsubTopic1 = "/waku/2/rs/0/58355"
            # Automatically generated from the contentTopic above
          contentTopic2 = "/0/toychat2/2/huilong/proto"
          pubsubTopic2 = "/waku/2/rs/0/23286"
            # Automatically generated from the contentTopic above
          clientHandler = client.subscribeToContentTopicWithHandler(contentTopic1)

        await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

        # When a peer publishes a message in the server's subscribed topic (the client, for testing easeness)
        let
          msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic2)
          lightpublishRespnse = await client.lightpushPublish(
            some(pubsubTopic2), msg, server.switch.peerInfo.toRemotePeerInfo()
          )

        # Then the client does not receive the message
        let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT)
        check clientResult.isErr()

      asyncTest "store - exclusion (automatic sharding filtering)":
        # Given one archive with two sets of messages using different content topics
        let
          timeOrigin = now()
          contentTopic1 = "/toychat/2/huilong/proto"
          pubsubTopic1 = "/waku/2/rs/0/58355"
            # Automatically generated from the contentTopic above
          contentTopic2 = "/0/toychat2/2/huilong/proto"
          pubsubTopic2 = "/waku/2/rs/0/23286"
            # Automatically generated from the contentTopic above
          archiveMessages1 =
            @[
              fakeWakuMessage(
                @[byte 00], ts = ts(00, timeOrigin), contentTopic = contentTopic1
              )
            ]
          archiveMessages2 =
            @[
              fakeWakuMessage(
                @[byte 01], ts = ts(10, timeOrigin), contentTopic = contentTopic2
              )
            ]
          archiveDriver = newArchiveDriverWithMessages(pubsubTopic1, archiveMessages1)
        discard archiveDriver.put(pubsubTopic2, archiveMessages2)
        let mountArchiveResult = server.mountArchive(archiveDriver)
        assertResultOk(mountArchiveResult)

        waitFor server.mountStore()
        client.mountStoreClient()

        # Given one query for each content topic
        let
          historyQuery1 = HistoryQuery(
            contentTopics: @[contentTopic1],
            direction: PagingDirection.Forward,
            pageSize: 2,
          )
          historyQuery2 = HistoryQuery(
            contentTopics: @[contentTopic2],
            direction: PagingDirection.Forward,
            pageSize: 2,
          )

        # When the client queries the server for the messages
        let
          serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
          queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo)
          queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
        assertResultOk(queryResponse1)
        assertResultOk(queryResponse2)

        # Then each response should contain only the messages of the corresponding content topic
        check:
          queryResponse1.get().messages == archiveMessages1
          queryResponse2.get().messages == archiveMessages2

  suite "Specific Tests":
    asyncTest "Configure Node with Multiple PubSub Topics":
      # Given a connected server and client subscribed to multiple pubsub topics
      let
        contentTopic = "myContentTopic"
        topic1 = "/waku/2/rs/0/1"
        topic2 = "/waku/2/rs/0/2"
        serverHandler1 = server.subscribeCompletionHandler(topic1)
        serverHandler2 = server.subscribeCompletionHandler(topic2)
        clientHandler1 = client.subscribeCompletionHandler(topic1)
        clientHandler2 = client.subscribeCompletionHandler(topic2)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the client publishes a message in the topic1
      discard await client.publish(
        some(topic1),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic),
      )

      # Then the server and client receive the message in topic1's handlers, but not in topic2's
      assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()

      # When the client publishes a message in the topic2
      serverHandler1.reset()
      serverHandler2.reset()
      clientHandler1.reset()
      clientHandler2.reset()
      discard await client.publish(
        some(topic2),
        WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic),
      )

      # Then the server and client receive the message in topic2's handlers, but not in topic1's
      assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()

    asyncTest "Configure Node with Multiple Content Topics":
      # Given a connected server and client subscribed to multiple content topics
      let
        contentTopic1 = "/toychat/2/huilong/proto"
        pubsubTopic1 = "/waku/2/rs/0/58355"
          # Automatically generated from the contentTopic above
        contentTopic2 = "/0/toychat2/2/huilong/proto"
        pubsubTopic2 = "/waku/2/rs/0/23286"
          # Automatically generated from the contentTopic above
        serverHandler1 = server.subscribeToContentTopicWithHandler(contentTopic1)
        serverHandler2 = server.subscribeToContentTopicWithHandler(contentTopic2)
        clientHandler1 = client.subscribeToContentTopicWithHandler(contentTopic1)
        clientHandler2 = client.subscribeToContentTopicWithHandler(contentTopic2)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the client publishes a message in contentTopic1
      discard await client.publish(
        some(pubsubTopic1),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1),
      )

      # Then the server and client receive the message in contentTopic1's handlers, but not in contentTopic2's
      assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()

      # When the client publishes a message in contentTopic2
      serverHandler1.reset()
      serverHandler2.reset()
      clientHandler1.reset()
      clientHandler2.reset()
      discard await client.publish(
        some(pubsubTopic2),
        WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2),
      )

      # Then the server and client receive the message in contentTopic2's handlers, but not in contentTopic1's
      assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()

    asyncTest "Configure Node combining Multiple Pubsub and Content Topics":
      # Given a connected server and client subscribed to multiple pubsub topics and content topics
      let
        contentTopic = "myContentTopic"
        pubsubTopic1 = "/waku/2/rs/0/1"
        pubsubTopic2 = "/waku/2/rs/0/2"
        serverHandler1 = server.subscribeCompletionHandler(pubsubTopic1)
        clientHandler1 = client.subscribeCompletionHandler(pubsubTopic1)
        serverHandler2 = server.subscribeCompletionHandler(pubsubTopic2)
        clientHandler2 = client.subscribeCompletionHandler(pubsubTopic2)
        contentTopic3 = "/toychat/2/huilong/proto"
        pubsubTopic3 = "/waku/2/rs/0/58355"
          # Automatically generated from the contentTopic above
        contentTopic4 = "/0/toychat2/2/huilong/proto"
        pubsubTopic4 = "/waku/2/rs/0/23286"
          # Automatically generated from the contentTopic above
        serverHandler3 = server.subscribeToContentTopicWithHandler(contentTopic3)
        clientHandler3 = client.subscribeToContentTopicWithHandler(contentTopic3)
        serverHandler4 = server.subscribeToContentTopicWithHandler(contentTopic4)
        clientHandler4 = client.subscribeToContentTopicWithHandler(contentTopic4)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the client publishes a message in the topic1
      discard await client.publish(
        some(pubsubTopic1),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic),
      )

      # Then the server and client receive the message in topic1's handlers, but not in topic234's
      assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler4.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler4.waitForResult(FUTURE_TIMEOUT)).isErr()

      # When the client publishes a message in the topic2
      serverHandler1.reset()
      clientHandler1.reset()
      serverHandler2.reset()
      clientHandler2.reset()
      serverHandler3.reset()
      clientHandler3.reset()
      serverHandler4.reset()
      clientHandler4.reset()
      discard await client.publish(
        some(pubsubTopic2),
        WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic),
      )

      # Then the server and client receive the message in topic2's handlers, but not in topic134's
      assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler4.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler4.waitForResult(FUTURE_TIMEOUT)).isErr()

      # When the client publishes a message in the topic3
      serverHandler1.reset()
      clientHandler1.reset()
      serverHandler2.reset()
      clientHandler2.reset()
      serverHandler3.reset()
      clientHandler3.reset()
      serverHandler4.reset()
      clientHandler4.reset()
      discard await client.publish(
        some(pubsubTopic3),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic3),
      )

      # Then the server and client receive the message in topic3's handlers, but not in topic124's
      assertResultOk(await serverHandler3.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler3.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler4.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler4.waitForResult(FUTURE_TIMEOUT)).isErr()

      # When the client publishes a message in the topic4
      serverHandler1.reset()
      clientHandler1.reset()
      serverHandler2.reset()
      clientHandler2.reset()
      serverHandler3.reset()
      clientHandler3.reset()
      serverHandler4.reset()
      clientHandler4.reset()
      discard await client.publish(
        some(pubsubTopic4),
        WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic4),
      )

      # Then the server and client receive the message in topic4's handlers, but not in topic123's
      assertResultOk(await serverHandler4.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await clientHandler4.waitForResult(FUTURE_TIMEOUT))
      check:
        (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr()

    asyncTest "Protocol with Unconfigured PubSub Topic Fails":
      # Given a 
      let
        contentTopic = "myContentTopic"
        topic = "/waku/2/rs/0/1"
        # Using a different topic to simulate "unconfigured" pubsub topic
        # but to have a handler (and be able to assert the test) 
        serverHandler = server.subscribeCompletionHandler("/waku/2/rs/0/0")
        clientHandler = client.subscribeCompletionHandler("/waku/2/rs/0/0")

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the client publishes a message in the topic
      discard await client.publish(
        some(topic),
        WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic),
      )

      # Then the server and client don't receive the message
      check:
        (await serverHandler.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await clientHandler.waitForResult(FUTURE_TIMEOUT)).isErr()

    asyncTest "Waku LightPush Sharding (Static Sharding)":
      # Given a connected server and client using two different pubsub topics
      client.mountLightPushClient()
      await server.mountLightpush()

      # Given a connected server and client subscribed to multiple pubsub topics
      let
        contentTopic = "myContentTopic"
        topic1 = "/waku/2/rs/0/1"
        topic2 = "/waku/2/rs/0/2"
        serverHandler1 = server.subscribeCompletionHandler(topic1)
        serverHandler2 = server.subscribeCompletionHandler(topic2)
        clientHandler1 = client.subscribeCompletionHandler(topic1)
        clientHandler2 = client.subscribeCompletionHandler(topic2)

      await sleepAsync(FUTURE_TIMEOUT)

      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When a peer publishes a message (the client, for testing easeness) in topic1
      let
        msg1 = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
        lightpublishRespnse = await client.lightpushPublish(
          some(topic1), msg1, server.switch.peerInfo.toRemotePeerInfo()
        )

      # Then the server and client receive the message in topic1's handlers, but not in topic2's
      assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
      check:
        (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()

      # When a peer publishes a message (the client, for testing easeness) in topic2
      serverHandler1.reset()
      serverHandler2.reset()
      clientHandler1.reset()
      clientHandler2.reset()
      let
        msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
        lightpublishResponse2 = await client.lightpushPublish(
          some(topic2), msg2, server.switch.peerInfo.toRemotePeerInfo()
        )

      # Then the server and client receive the message in topic2's handlers, but not in topic1's
      assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
      assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
      check:
        (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
        (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()

    asyncTest "Waku Filter Sharding (Static Sharding)":
      # Given a connected server and client using two different pubsub topics
      await client.mountFilterClient()
      await server.mountFilter()

      let
        contentTopic = "myContentTopic"
        topic1 = "/waku/2/rs/0/1"
        topic2 = "/waku/2/rs/0/2"

      let
        pushHandlerFuture1 = newFuture[(string, WakuMessage)]()
        pushHandlerFuture2 = newFuture[(string, WakuMessage)]()

      proc messagePushHandler1(
          pubsubTopic: PubsubTopic, message: WakuMessage
      ): Future[void] {.async, closure, gcsafe.} =
        if topic1 == pubsubTopic:
          pushHandlerFuture1.complete((pubsubTopic, message))

      proc messagePushHandler2(
          pubsubTopic: PubsubTopic, message: WakuMessage
      ): Future[void] {.async, closure, gcsafe.} =
        if topic2 == pubsubTopic:
          pushHandlerFuture2.complete((pubsubTopic, message))

      client.wakuFilterClient.registerPushHandler(messagePushHandler1)
      client.wakuFilterClient.registerPushHandler(messagePushHandler2)

      let
        subscribeResponse1 = await client.filterSubscribe(
          some(topic1), @[contentTopic], server.switch.peerInfo.toRemotePeerInfo()
        )
        subscribeResponse2 = await client.filterSubscribe(
          some(topic2), @[contentTopic], server.switch.peerInfo.toRemotePeerInfo()
        )

      assertResultOk(subscribeResponse1)
      assertResultOk(subscribeResponse2)
      await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

      # When the client publishes a message in topic1
      let msg = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
      await server.filterHandleMessage(topic1, msg)

      # Then the client receives the message in topic1's handler, but not in topic2's
      let pushHandlerResult = await pushHandlerFuture1.waitForResult(FUTURE_TIMEOUT)
      assertResultOk(pushHandlerResult)
      check:
        pushHandlerResult.get() == (topic1, msg)
        (await pushHandlerFuture2.waitForResult(FUTURE_TIMEOUT)).isErr()

      # Given the futures are reset
      pushHandlerFuture1.reset()
      pushHandlerFuture2.reset()

      # When the client publishes a message in topic2
      let msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
      await server.filterHandleMessage(topic2, msg2)

      # Then the client receives the message in topic2's handler, but not in topic1's
      let pushHandlerResult2 = await pushHandlerFuture2.waitForResult(FUTURE_TIMEOUT)
      assertResultOk(pushHandlerResult2)
      check:
        pushHandlerResult2.get() == (topic2, msg2)
        (await pushHandlerFuture1.waitForResult(FUTURE_TIMEOUT)).isErr()

    asyncTest "Waku Store Sharding (Static Sharding)":
      # Given one archive with two sets of messages using two different pubsub topics
      let
        timeOrigin = now()
        topic1 = "/waku/2/rs/0/1"
        topic2 = "/waku/2/rs/0/2"
        archiveMessages1 = @[fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin))]
        archiveMessages2 = @[fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin))]
        archiveDriver = newArchiveDriverWithMessages(topic1, archiveMessages1)
      discard archiveDriver.put(topic2, archiveMessages2)
      let mountArchiveResult = server.mountArchive(archiveDriver)
      assertResultOk(mountArchiveResult)

      waitFor server.mountStore()
      client.mountStoreClient()

      # Given one query for each pubsub topic
      let
        historyQuery1 = HistoryQuery(
          pubsubTopic: some(topic1), direction: PagingDirection.Forward, pageSize: 2
        )
        historyQuery2 = HistoryQuery(
          pubsubTopic: some(topic2), direction: PagingDirection.Forward, pageSize: 2
        )

      # When the client queries the server for the messages
      let
        serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
        queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo)
        queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
      assertResultOk(queryResponse1)
      assertResultOk(queryResponse2)

      # Then each response should contain only the messages of the corresponding pubsub topic
      check:
        queryResponse1.get().messages == archiveMessages1[0 ..< 1]
        queryResponse2.get().messages == archiveMessages2[0 ..< 1]