diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index 33aeb33c5..cb1d80701 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -16,71 +16,10 @@ import [node/waku_node, node/peer_manager, waku_core, waku_node, waku_rln_relay], ../waku_store/store_utils, ../waku_archive/archive_utils, + ../waku_relay/utils, ../testlib/[wakucore, wakunode, testasync, futures], ../resources/payloads -proc setupRln(node: WakuNode, identifier: uint) {.async.} = - await node.mountRlnRelay( - WakuRlnConfig( - rlnRelayDynamic: false, - rlnRelayCredIndex: some(identifier), - rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $identifier), - rlnEpochSizeSec: 1, - ) - ) - -proc setupRelayWithRln( - node: WakuNode, identifier: uint, pubsubTopics: seq[string] -) {.async.} = - await node.mountRelay(pubsubTopics) - await setupRln(node, identifier) - -proc subscribeCompletionHandler(node: WakuNode, pubsubTopic: string): Future[bool] = - var completionFut = newFuture[bool]() - proc relayHandler( - topic: PubsubTopic, msg: WakuMessage - ): Future[void] {.async, gcsafe.} = - if topic == pubsubTopic: - completionFut.complete(true) - - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) - return completionFut - -proc sendRlnMessage( - client: WakuNode, - pubsubTopic: string, - contentTopic: string, - completionFuture: Future[bool], - payload: seq[byte] = "Hello".toBytes(), -): Future[bool] {.async.} = - var message = WakuMessage(payload: payload, contentTopic: contentTopic) - doAssert(client.wakuRlnRelay.appendRLNProof(message, epochTime()).isOk()) - discard await client.publish(some(pubsubTopic), message) - let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT) - return isCompleted - -proc sendRlnMessageWithInvalidProof( - client: WakuNode, - pubsubTopic: string, - contentTopic: string, - completionFuture: Future[bool], - payload: seq[byte] = "Hello".toBytes(), -): Future[bool] {.async.} = - let - extraBytes: seq[byte] = @[byte(1), 2, 3] - rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof( - concat(payload, extraBytes), - # we add extra bytes to invalidate proof verification against original payload - client.wakuRlnRelay.getCurrentEpoch(), - ) - rateLimitProof = rateLimitProofRes.get().encode().buffer - message = - WakuMessage(payload: @payload, contentTopic: contentTopic, proof: rateLimitProof) - - discard await client.publish(some(pubsubTopic), message) - let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT) - return isCompleted - suite "Waku RlnRelay - End to End": var pubsubTopic {.threadvar.}: PubsubTopic @@ -237,23 +176,29 @@ suite "Waku RlnRelay - End to End": doAssert( client.wakuRlnRelay - .appendRLNProof(message1b, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 0) - .isOk() - ) - doAssert( - client.wakuRlnRelay - .appendRLNProof(message1kib, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 1) - .isOk() - ) - doAssert( - client.wakuRlnRelay - .appendRLNProof(message150kib, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 2) + .appendRLNProof( + message1b, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 0) + ) .isOk() ) doAssert( client.wakuRlnRelay .appendRLNProof( - message151kibPlus, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 3 + message1kib, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 1) + ) + .isOk() + ) + doAssert( + client.wakuRlnRelay + .appendRLNProof( + message150kib, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 2) + ) + .isOk() + ) + doAssert( + client.wakuRlnRelay + .appendRLNProof( + message151kibPlus, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3) ) .isOk() ) @@ -317,9 +262,11 @@ suite "Waku RlnRelay - End to End": WakuMessage(payload: @payload150kibPlus, contentTopic: contentTopic) doAssert( - client.wakuRlnRelay.appendRLNProof( - message151kibPlus, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 3 + client.wakuRlnRelay + .appendRLNProof( + message151kibPlus, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3) ) + .isOk() ) # When sending the 150KiB plus message diff --git a/tests/node/test_wakunode_sharding.nim b/tests/node/test_wakunode_sharding.nim new file mode 100644 index 000000000..a92e9cc38 --- /dev/null +++ b/tests/node/test_wakunode_sharding.nim @@ -0,0 +1,1032 @@ +{.used.} + +import + std/[options, sequtils, tempfiles], + testutils/unittests, + chronos, + chronicles, + stew/shims/net as stewNet + +import + std/[sequtils, tempfiles], + stew/byteutils, + stew/shims/net as stewNet, + testutils/unittests, + chronos, + libp2p/switch, + libp2p/protocols/pubsub/pubsub + +import + ../../../waku/[ + waku_core/topics/pubsub_topic, + waku_core/topics/sharding, + node/waku_node, + common/paging, + waku_core, + waku_store/common, + node/peer_manager, + waku_filter_v2/client, + ], + ../waku_relay/utils, + ../waku_archive/archive_utils, + ../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils] + +import ../../../waku/waku_relay/protocol + +const + listenIp = ValidIpAddress.init("0.0.0.0") + listenPort = Port(0) + +suite "Sharding": + var + server {.threadvar.}: WakuNode + client {.threadvar.}: WakuNode + + asyncSetup: + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, listenIp, listenPort) + client = newTestWakuNode(clientKey, listenIp, listenPort) + + await allFutures(server.mountRelay(), client.mountRelay()) + await allFutures(server.start(), client.start()) + + asyncTeardown: + await allFutures(server.stop(), client.stop()) + + suite "Static Sharding Functionality": + asyncTest "Shard Subscription and Peer Dialing": + # Given a connected server and client subscribed to the same pubsub shard + let + topic = "/waku/2/rs/0/1" + serverHandler = server.subscribeCompletionHandler(topic) + clientHandler = client.subscribeCompletionHandler(topic) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message in the subscribed topic + discard await client.publish( + some(topic), + WakuMessage(payload: "message1".toBytes(), contentTopic: "contentTopic"), + ) + + # Then the server receives the message + let + serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + assertResultOk(serverResult1) + assertResultOk(clientResult1) + + # When the server publishes a message in the subscribed topic + serverHandler.reset() + clientHandler.reset() + discard await server.publish( + some(topic), + WakuMessage(payload: "message2".toBytes(), contentTopic: "contentTopic"), + ) + + # Then the client receives the message + let + serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + assertResultOk(serverResult2) + assertResultOk(clientResult2) + + asyncTest "Exclusion of Non-Subscribed Service Nodes": + # When a connected server and client are subscribed to different pubsub shards + let + topic1 = "/waku/2/rs/0/1" + topic2 = "/waku/2/rs/0/2" + contentTopic = "myContentTopic" + + var + serverHandler = server.subscribeCompletionHandler(topic1) + clientHandler = client.subscribeCompletionHandler(topic2) + + # await sleepAsync(FUTURE_TIMEOUT) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When a message is published in the server's subscribed topic + discard await client.publish( + some(topic1), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic), + ) + let + serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the server receives the message but the client does not + assertResultOk(serverResult1) + check clientResult1.isErr() + + # When the server publishes a message in the client's subscribed topic + serverHandler.reset() + clientHandler.reset() + let wakuMessage2 = + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic) + discard await server.publish(some(topic2), wakuMessage2) + let + serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the client receives the message but the server does not + check serverResult2.isErr() + assertResultOk(clientResult2) + + suite "Automatic Sharding Mechanics": + asyncTest "Content Topic-Based Shard Dialing": + # Given a connected server and client subscribed to the same content topic (with two different formats) + let + contentTopicShort = "/toychat/2/huilong/proto" + contentTopicFull = "/0/toychat/2/huilong/proto" + pubsubTopic = "/waku/2/rs/0/58355" + # Automatically generated from the contentTopic above + + let + serverHandler = server.subscribeToContentTopicWithHandler(contentTopicShort) + clientHandler = client.subscribeToContentTopicWithHandler(contentTopicFull) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message + discard await client.publish( + some(pubsubTopic), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort), + ) + let + serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the server and client receive the message + assertResultOk(serverResult1) + assertResultOk(clientResult1) + + # When the server publishes a message + serverHandler.reset() + clientHandler.reset() + discard await server.publish( + some(pubsubTopic), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull), + ) + let + serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the client and server receive the message + assertResultOk(serverResult2) + assertResultOk(clientResult2) + + asyncTest "Exclusion of Irrelevant Autosharded Topics": + # Given a connected server and client subscribed to different content topics + let + contentTopic1 = "/toychat/2/huilong/proto" + pubsubTopic1 = "/waku/2/rs/0/58355" + pubsubTopic12 = NsPubsubTopic.parse(contentTopic1) + # Automatically generated from the contentTopic above + contentTopic2 = "/0/toychat2/2/huilong/proto" + pubsubTopic2 = "/waku/2/rs/0/23286" + # Automatically generated from the contentTopic above + + let + serverHandler = server.subscribeToContentTopicWithHandler(contentTopic1) + clientHandler = client.subscribeToContentTopicWithHandler(contentTopic2) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the server publishes a message in the server's subscribed topic + discard await server.publish( + some(pubsubTopic1), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1), + ) + let + serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the server receives the message but the client does not + assertResultOk(serverResult1) + check clientResult1.isErr() + + # When the client publishes a message in the client's subscribed topic + serverHandler.reset() + clientHandler.reset() + discard await client.publish( + some(pubsubTopic2), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2), + ) + let + serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the client receives the message but the server does not + assertResultOk(clientResult2) + check serverResult2.isErr() + + suite "Application Layer Integration": + suite "App Protocol Compatibility": + asyncTest "relay": + # Given a connected server and client subscribed to the same pubsub topic + let + pubsubTopic = "/waku/2/rs/0/1" + serverHandler = server.subscribeCompletionHandler(pubsubTopic) + clientHandler = client.subscribeCompletionHandler(pubsubTopic) + + await sleepAsync(FUTURE_TIMEOUT) + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message + discard await client.publish( + some(pubsubTopic), + WakuMessage(payload: "message1".toBytes(), contentTopic: "myContentTopic"), + ) + let + serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the server and client receive the message + assertResultOk(serverResult1) + assertResultOk(clientResult1) + + asyncTest "filter": + # Given a connected server and client using the same pubsub topic + await client.mountFilterClient() + await server.mountFilter() + + let pushHandlerFuture = newFuture[(string, WakuMessage)]() + proc messagePushHandler( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, closure, gcsafe.} = + pushHandlerFuture.complete((pubsubTopic, message)) + + client.wakuFilterClient.registerPushHandler(messagePushHandler) + let + pubsubTopic = "/waku/2/rs/0/1" + contentTopic = "myContentTopic" + subscribeResponse = await client.filterSubscribe( + some(pubsubTopic), + @[contentTopic], + server.switch.peerInfo.toRemotePeerInfo(), + ) + + assertResultOk(subscribeResponse) + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When a peer publishes a message (the client, for testing easeness) + let msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic) + await server.filterHandleMessage(pubsubTopic, msg) + + # Then the client receives the message + let pushHandlerResult = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT) + assertResultOk(pushHandlerResult) + + asyncTest "lightpush": + # Given a connected server and client subscribed to the same pubsub topic + client.mountLightPushClient() + await server.mountLightpush() + + let + topic = "/waku/2/rs/0/1" + clientHandler = client.subscribeCompletionHandler(topic) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When a peer publishes a message (the client, for testing easeness) + let + msg = + WakuMessage(payload: "message".toBytes(), contentTopic: "myContentTopic") + lightpublishRespnse = await client.lightpushPublish( + some(topic), msg, server.switch.peerInfo.toRemotePeerInfo() + ) + + # Then the client receives the message + let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT) + assertResultOk(clientResult) + + suite "Content Topic Filtering and Routing": + asyncTest "relay (automatic sharding filtering)": + # Given a connected server and client subscribed to the same content topic (with two different formats) + let + contentTopicShort = "/toychat/2/huilong/proto" + contentTopicFull = "/0/toychat/2/huilong/proto" + pubsubTopic = "/waku/2/rs/0/58355" + serverHandler = server.subscribeToContentTopicWithHandler(contentTopicShort) + clientHandler = client.subscribeToContentTopicWithHandler(contentTopicFull) + + await sleepAsync(FUTURE_TIMEOUT) + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message + discard await client.publish( + some(pubsubTopic), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort), + ) + let + serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the server and client receive the message + assertResultOk(serverResult1) + assertResultOk(clientResult1) + + # When the server publishes a message + serverHandler.reset() + clientHandler.reset() + discard await server.publish( + some(pubsubTopic), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull), + ) + let + serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the server and client receive the message + assertResultOk(serverResult2) + assertResultOk(clientResult2) + + asyncTest "filter (automatic sharding filtering)": + # Given a connected server and client using the same content topic (with two different formats) + await client.mountFilterClient() + await server.mountFilter() + + let pushHandlerFuture = newFuture[(string, WakuMessage)]() + proc messagePushHandler( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, closure, gcsafe.} = + pushHandlerFuture.complete((pubsubTopic, message)) + + client.wakuFilterClient.registerPushHandler(messagePushHandler) + let + contentTopicShort = "/toychat/2/huilong/proto" + contentTopicFull = "/0/toychat/2/huilong/proto" + pubsubTopic = "/waku/2/rs/0/58355" + subscribeResponse1 = await client.filterSubscribe( + some(pubsubTopic), + @[contentTopicShort], + server.switch.peerInfo.toRemotePeerInfo(), + ) + + assertResultOk(subscribeResponse1) + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message + let msg = + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort) + await server.filterHandleMessage(pubsubTopic, msg) + + # Then the client receives the message + let pushHandlerResult = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT) + assertResultOk(pushHandlerResult) + check pushHandlerResult.get() == (pubsubTopic, msg) + + # Given the subscription is cleared and a new subscription is made + let + unsubscribeResponse = + await client.filterUnsubscribeAll(server.switch.peerInfo.toRemotePeerInfo()) + subscribeResponse2 = await client.filterSubscribe( + some(pubsubTopic), + @[contentTopicFull], + server.switch.peerInfo.toRemotePeerInfo(), + ) + + assertResultOk(unsubscribeResponse) + assertResultOk(subscribeResponse2) + + # When the client publishes a message + pushHandlerFuture.reset() + let msg2 = + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull) + await server.filterHandleMessage(pubsubTopic, msg2) + + # Then the client receives the message + let pushHandlerResult2 = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT) + assertResultOk(pushHandlerResult2) + check pushHandlerResult2.get() == (pubsubTopic, msg2) + + asyncTest "lightpush (automatic sharding filtering)": + # Given a connected server and client using the same content topic (with two different formats) + client.mountLightPushClient() + await server.mountLightpush() + + let + contentTopicShort = "/toychat/2/huilong/proto" + contentTopicFull = "/0/toychat/2/huilong/proto" + pubsubTopic = "/waku/2/rs/0/58355" + clientHandler = client.subscribeToContentTopicWithHandler(contentTopicShort) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When a peer publishes a message (the client, for testing easeness) + let + msg = + WakuMessage(payload: "message".toBytes(), contentTopic: contentTopicFull) + lightpublishRespnse = await client.lightpushPublish( + some(pubsubTopic), msg, server.switch.peerInfo.toRemotePeerInfo() + ) + + # Then the client receives the message + let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT) + assertResultOk(clientResult) + + xasyncTest "store (automatic sharding filtering)": + # Given one archive with two sets of messages using the same content topic (with two different formats) + let + timeOrigin = now() + contentTopicShort = "/toychat/2/huilong/proto" + contentTopicFull = "/0/toychat/2/huilong/proto" + pubsubTopic = "/waku/2/rs/0/58355" + archiveMessages1 = + @[ + fakeWakuMessage( + @[byte 00], ts = ts(00, timeOrigin), contentTopic = contentTopicShort + ) + ] + archiveMessages2 = + @[ + fakeWakuMessage( + @[byte 01], ts = ts(10, timeOrigin), contentTopic = contentTopicFull + ) + ] + archiveDriver = newArchiveDriverWithMessages(pubsubTopic, archiveMessages1) + discard archiveDriver.put(pubsubTopic, archiveMessages2) + let mountArchiveResult = server.mountArchive(archiveDriver) + assertResultOk(mountArchiveResult) + + waitFor server.mountStore() + client.mountStoreClient() + + # Given one query for each content topic format + let + historyQuery1 = HistoryQuery( + contentTopics: @[contentTopicShort], + direction: PagingDirection.Forward, + pageSize: 3, + ) + historyQuery2 = HistoryQuery( + contentTopics: @[contentTopicFull], + direction: PagingDirection.Forward, + pageSize: 3, + ) + + # When the client queries the server for the messages + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo) + queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo) + assertResultOk(queryResponse1) + assertResultOk(queryResponse2) + + # Then the responses of both queries should contain all the messages + check: + queryResponse1.get().messages == archiveMessages1 & archiveMessages2 + queryResponse2.get().messages == archiveMessages1 & archiveMessages2 + + asyncTest "relay - exclusion (automatic sharding filtering)": + # Given a connected server and client subscribed to different content topics + let + contentTopic1 = "/toychat/2/huilong/proto" + pubsubTopic1 = "/waku/2/rs/0/58355" + # Automatically generated from the contentTopic above + contentTopic2 = "/0/toychat2/2/huilong/proto" + pubsubTopic2 = "/waku/2/rs/0/23286" + # Automatically generated from the contentTopic above + serverHandler = server.subscribeToContentTopicWithHandler(contentTopic1) + clientHandler = client.subscribeToContentTopicWithHandler(contentTopic2) + + await sleepAsync(FUTURE_TIMEOUT) + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message in the client's subscribed topic + discard await client.publish( + some(pubsubTopic2), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic2), + ) + let + serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the client receives the message but the server does not + check serverResult1.isErr() + assertResultOk(clientResult1) + + # When the server publishes a message in the server's subscribed topic + serverHandler.reset() + clientHandler.reset() + discard await server.publish( + some(pubsubTopic1), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic1), + ) + let + serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT) + clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT) + + # Then the server receives the message but the client does not + assertResultOk(serverResult2) + check clientResult2.isErr() + + asyncTest "filter - exclusion (automatic sharding filtering)": + # Given a connected server and client using different content topics + await client.mountFilterClient() + await server.mountFilter() + + let pushHandlerFuture = newFuture[(string, WakuMessage)]() + proc messagePushHandler( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, closure, gcsafe.} = + pushHandlerFuture.complete((pubsubTopic, message)) + + client.wakuFilterClient.registerPushHandler(messagePushHandler) + let + contentTopic1 = "/toychat/2/huilong/proto" + pubsubTopic1 = "/waku/2/rs/0/58355" + # Automatically generated from the contentTopic above + contentTopic2 = "/0/toychat2/2/huilong/proto" + pubsubTopic2 = "/waku/2/rs/0/23286" + # Automatically generated from the contentTopic above + subscribeResponse1 = await client.filterSubscribe( + some(pubsubTopic1), + @[contentTopic1], + server.switch.peerInfo.toRemotePeerInfo(), + ) + + assertResultOk(subscribeResponse1) + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the server publishes a message in the server's subscribed topic + let msg = + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic2) + await server.filterHandleMessage(pubsubTopic2, msg) + + # Then the client does not receive the message + let pushHandlerResult = await pushHandlerFuture.waitForResult(FUTURE_TIMEOUT) + check pushHandlerResult.isErr() + + asyncTest "lightpush - exclusion (automatic sharding filtering)": + # Given a connected server and client using different content topics + client.mountLightPushClient() + await server.mountLightpush() + + let + contentTopic1 = "/toychat/2/huilong/proto" + pubsubTopic1 = "/waku/2/rs/0/58355" + # Automatically generated from the contentTopic above + contentTopic2 = "/0/toychat2/2/huilong/proto" + pubsubTopic2 = "/waku/2/rs/0/23286" + # Automatically generated from the contentTopic above + clientHandler = client.subscribeToContentTopicWithHandler(contentTopic1) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When a peer publishes a message in the server's subscribed topic (the client, for testing easeness) + let + msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic2) + lightpublishRespnse = await client.lightpushPublish( + some(pubsubTopic2), msg, server.switch.peerInfo.toRemotePeerInfo() + ) + + # Then the client does not receive the message + let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT) + check clientResult.isErr() + + asyncTest "store - exclusion (automatic sharding filtering)": + # Given one archive with two sets of messages using different content topics + let + timeOrigin = now() + contentTopic1 = "/toychat/2/huilong/proto" + pubsubTopic1 = "/waku/2/rs/0/58355" + # Automatically generated from the contentTopic above + contentTopic2 = "/0/toychat2/2/huilong/proto" + pubsubTopic2 = "/waku/2/rs/0/23286" + # Automatically generated from the contentTopic above + archiveMessages1 = + @[ + fakeWakuMessage( + @[byte 00], ts = ts(00, timeOrigin), contentTopic = contentTopic1 + ) + ] + archiveMessages2 = + @[ + fakeWakuMessage( + @[byte 01], ts = ts(10, timeOrigin), contentTopic = contentTopic2 + ) + ] + archiveDriver = newArchiveDriverWithMessages(pubsubTopic1, archiveMessages1) + discard archiveDriver.put(pubsubTopic2, archiveMessages2) + let mountArchiveResult = server.mountArchive(archiveDriver) + assertResultOk(mountArchiveResult) + + waitFor server.mountStore() + client.mountStoreClient() + + # Given one query for each content topic + let + historyQuery1 = HistoryQuery( + contentTopics: @[contentTopic1], + direction: PagingDirection.Forward, + pageSize: 2, + ) + historyQuery2 = HistoryQuery( + contentTopics: @[contentTopic2], + direction: PagingDirection.Forward, + pageSize: 2, + ) + + # When the client queries the server for the messages + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo) + queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo) + assertResultOk(queryResponse1) + assertResultOk(queryResponse2) + + # Then each response should contain only the messages of the corresponding content topic + check: + queryResponse1.get().messages == archiveMessages1 + queryResponse2.get().messages == archiveMessages2 + + suite "Specific Tests": + asyncTest "Configure Node with Multiple PubSub Topics": + # Given a connected server and client subscribed to multiple pubsub topics + let + contentTopic = "myContentTopic" + topic1 = "/waku/2/rs/0/1" + topic2 = "/waku/2/rs/0/2" + serverHandler1 = server.subscribeCompletionHandler(topic1) + serverHandler2 = server.subscribeCompletionHandler(topic2) + clientHandler1 = client.subscribeCompletionHandler(topic1) + clientHandler2 = client.subscribeCompletionHandler(topic2) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message in the topic1 + discard await client.publish( + some(topic1), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic), + ) + + # Then the server and client receive the message in topic1's handlers, but not in topic2's + assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + + # When the client publishes a message in the topic2 + serverHandler1.reset() + serverHandler2.reset() + clientHandler1.reset() + clientHandler2.reset() + discard await client.publish( + some(topic2), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic), + ) + + # Then the server and client receive the message in topic2's handlers, but not in topic1's + assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + + asyncTest "Configure Node with Multiple Content Topics": + # Given a connected server and client subscribed to multiple content topics + let + contentTopic1 = "/toychat/2/huilong/proto" + pubsubTopic1 = "/waku/2/rs/0/58355" + # Automatically generated from the contentTopic above + contentTopic2 = "/0/toychat2/2/huilong/proto" + pubsubTopic2 = "/waku/2/rs/0/23286" + # Automatically generated from the contentTopic above + serverHandler1 = server.subscribeToContentTopicWithHandler(contentTopic1) + serverHandler2 = server.subscribeToContentTopicWithHandler(contentTopic2) + clientHandler1 = client.subscribeToContentTopicWithHandler(contentTopic1) + clientHandler2 = client.subscribeToContentTopicWithHandler(contentTopic2) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message in contentTopic1 + discard await client.publish( + some(pubsubTopic1), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1), + ) + + # Then the server and client receive the message in contentTopic1's handlers, but not in contentTopic2's + assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + + # When the client publishes a message in contentTopic2 + serverHandler1.reset() + serverHandler2.reset() + clientHandler1.reset() + clientHandler2.reset() + discard await client.publish( + some(pubsubTopic2), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2), + ) + + # Then the server and client receive the message in contentTopic2's handlers, but not in contentTopic1's + assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + + asyncTest "Configure Node combining Multiple Pubsub and Content Topics": + # Given a connected server and client subscribed to multiple pubsub topics and content topics + let + contentTopic = "myContentTopic" + pubsubTopic1 = "/waku/2/rs/0/1" + pubsubTopic2 = "/waku/2/rs/0/2" + serverHandler1 = server.subscribeCompletionHandler(pubsubTopic1) + clientHandler1 = client.subscribeCompletionHandler(pubsubTopic1) + serverHandler2 = server.subscribeCompletionHandler(pubsubTopic2) + clientHandler2 = client.subscribeCompletionHandler(pubsubTopic2) + contentTopic3 = "/toychat/2/huilong/proto" + pubsubTopic3 = "/waku/2/rs/0/58355" + # Automatically generated from the contentTopic above + contentTopic4 = "/0/toychat2/2/huilong/proto" + pubsubTopic4 = "/waku/2/rs/0/23286" + # Automatically generated from the contentTopic above + serverHandler3 = server.subscribeToContentTopicWithHandler(contentTopic3) + clientHandler3 = client.subscribeToContentTopicWithHandler(contentTopic3) + serverHandler4 = server.subscribeToContentTopicWithHandler(contentTopic4) + clientHandler4 = client.subscribeToContentTopicWithHandler(contentTopic4) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message in the topic1 + discard await client.publish( + some(pubsubTopic1), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic), + ) + + # Then the server and client receive the message in topic1's handlers, but not in topic234's + assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler3.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler4.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler4.waitForResult(FUTURE_TIMEOUT)).isErr() + + # When the client publishes a message in the topic2 + serverHandler1.reset() + clientHandler1.reset() + serverHandler2.reset() + clientHandler2.reset() + serverHandler3.reset() + clientHandler3.reset() + serverHandler4.reset() + clientHandler4.reset() + discard await client.publish( + some(pubsubTopic2), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic), + ) + + # Then the server and client receive the message in topic2's handlers, but not in topic134's + assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler3.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler4.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler4.waitForResult(FUTURE_TIMEOUT)).isErr() + + # When the client publishes a message in the topic3 + serverHandler1.reset() + clientHandler1.reset() + serverHandler2.reset() + clientHandler2.reset() + serverHandler3.reset() + clientHandler3.reset() + serverHandler4.reset() + clientHandler4.reset() + discard await client.publish( + some(pubsubTopic3), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic3), + ) + + # Then the server and client receive the message in topic3's handlers, but not in topic124's + assertResultOk(await serverHandler3.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler3.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler4.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler4.waitForResult(FUTURE_TIMEOUT)).isErr() + + # When the client publishes a message in the topic4 + serverHandler1.reset() + clientHandler1.reset() + serverHandler2.reset() + clientHandler2.reset() + serverHandler3.reset() + clientHandler3.reset() + serverHandler4.reset() + clientHandler4.reset() + discard await client.publish( + some(pubsubTopic4), + WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic4), + ) + + # Then the server and client receive the message in topic4's handlers, but not in topic123's + assertResultOk(await serverHandler4.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await clientHandler4.waitForResult(FUTURE_TIMEOUT)) + check: + (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler3.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler3.waitForResult(FUTURE_TIMEOUT)).isErr() + + asyncTest "Protocol with Unconfigured PubSub Topic Fails": + # Given a + let + contentTopic = "myContentTopic" + topic = "/waku/2/rs/0/1" + # Using a different topic to simulate "unconfigured" pubsub topic + # but to have a handler (and be able to assert the test) + serverHandler = server.subscribeCompletionHandler("/waku/2/rs/0/0") + clientHandler = client.subscribeCompletionHandler("/waku/2/rs/0/0") + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message in the topic + discard await client.publish( + some(topic), + WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic), + ) + + # Then the server and client don't receive the message + check: + (await serverHandler.waitForResult(FUTURE_TIMEOUT)).isErr() + (await clientHandler.waitForResult(FUTURE_TIMEOUT)).isErr() + + asyncTest "Waku LightPush Sharding (Static Sharding)": + # Given a connected server and client using two different pubsub topics + client.mountLightPushClient() + await server.mountLightpush() + + # Given a connected server and client subscribed to multiple pubsub topics + let + contentTopic = "myContentTopic" + topic1 = "/waku/2/rs/0/1" + topic2 = "/waku/2/rs/0/2" + serverHandler1 = server.subscribeCompletionHandler(topic1) + serverHandler2 = server.subscribeCompletionHandler(topic2) + clientHandler1 = client.subscribeCompletionHandler(topic1) + clientHandler2 = client.subscribeCompletionHandler(topic2) + + await sleepAsync(FUTURE_TIMEOUT) + + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When a peer publishes a message (the client, for testing easeness) in topic1 + let + msg1 = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic) + lightpublishRespnse = await client.lightpushPublish( + some(topic1), msg1, server.switch.peerInfo.toRemotePeerInfo() + ) + + # Then the server and client receive the message in topic1's handlers, but not in topic2's + assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT)) + check: + (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr() + + # When a peer publishes a message (the client, for testing easeness) in topic2 + serverHandler1.reset() + serverHandler2.reset() + clientHandler1.reset() + clientHandler2.reset() + let + msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic) + lightpublishResponse2 = await client.lightpushPublish( + some(topic2), msg2, server.switch.peerInfo.toRemotePeerInfo() + ) + + # Then the server and client receive the message in topic2's handlers, but not in topic1's + assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT)) + assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT)) + check: + (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr() + + asyncTest "Waku Filter Sharding (Static Sharding)": + # Given a connected server and client using two different pubsub topics + await client.mountFilterClient() + await server.mountFilter() + + let + contentTopic = "myContentTopic" + topic1 = "/waku/2/rs/0/1" + topic2 = "/waku/2/rs/0/2" + + let + pushHandlerFuture1 = newFuture[(string, WakuMessage)]() + pushHandlerFuture2 = newFuture[(string, WakuMessage)]() + + proc messagePushHandler1( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, closure, gcsafe.} = + if topic1 == pubsubTopic: + pushHandlerFuture1.complete((pubsubTopic, message)) + + proc messagePushHandler2( + pubsubTopic: PubsubTopic, message: WakuMessage + ): Future[void] {.async, closure, gcsafe.} = + if topic2 == pubsubTopic: + pushHandlerFuture2.complete((pubsubTopic, message)) + + client.wakuFilterClient.registerPushHandler(messagePushHandler1) + client.wakuFilterClient.registerPushHandler(messagePushHandler2) + + let + subscribeResponse1 = await client.filterSubscribe( + some(topic1), @[contentTopic], server.switch.peerInfo.toRemotePeerInfo() + ) + subscribeResponse2 = await client.filterSubscribe( + some(topic2), @[contentTopic], server.switch.peerInfo.toRemotePeerInfo() + ) + + assertResultOk(subscribeResponse1) + assertResultOk(subscribeResponse2) + await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()]) + + # When the client publishes a message in topic1 + let msg = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic) + await server.filterHandleMessage(topic1, msg) + + # Then the client receives the message in topic1's handler, but not in topic2's + let pushHandlerResult = await pushHandlerFuture1.waitForResult(FUTURE_TIMEOUT) + assertResultOk(pushHandlerResult) + check: + pushHandlerResult.get() == (topic1, msg) + (await pushHandlerFuture2.waitForResult(FUTURE_TIMEOUT)).isErr() + + # Given the futures are reset + pushHandlerFuture1.reset() + pushHandlerFuture2.reset() + + # When the client publishes a message in topic2 + let msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic) + await server.filterHandleMessage(topic2, msg2) + + # Then the client receives the message in topic2's handler, but not in topic1's + let pushHandlerResult2 = await pushHandlerFuture2.waitForResult(FUTURE_TIMEOUT) + assertResultOk(pushHandlerResult2) + check: + pushHandlerResult2.get() == (topic2, msg2) + (await pushHandlerFuture1.waitForResult(FUTURE_TIMEOUT)).isErr() + + asyncTest "Waku Store Sharding (Static Sharding)": + # Given one archive with two sets of messages using two different pubsub topics + let + timeOrigin = now() + topic1 = "/waku/2/rs/0/1" + topic2 = "/waku/2/rs/0/2" + archiveMessages1 = @[fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin))] + archiveMessages2 = @[fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin))] + archiveDriver = newArchiveDriverWithMessages(topic1, archiveMessages1) + discard archiveDriver.put(topic2, archiveMessages2) + let mountArchiveResult = server.mountArchive(archiveDriver) + assertResultOk(mountArchiveResult) + + waitFor server.mountStore() + client.mountStoreClient() + + # Given one query for each pubsub topic + let + historyQuery1 = HistoryQuery( + pubsubTopic: some(topic1), direction: PagingDirection.Forward, pageSize: 2 + ) + historyQuery2 = HistoryQuery( + pubsubTopic: some(topic2), direction: PagingDirection.Forward, pageSize: 2 + ) + + # When the client queries the server for the messages + let + serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo) + queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo) + assertResultOk(queryResponse1) + assertResultOk(queryResponse2) + + # Then each response should contain only the messages of the corresponding pubsub topic + check: + queryResponse1.get().messages == archiveMessages1[0 ..< 1] + queryResponse2.get().messages == archiveMessages2[0 ..< 1] diff --git a/tests/testlib/assertions.nim b/tests/testlib/assertions.nim index a2f0f357e..0d32c843a 100644 --- a/tests/testlib/assertions.nim +++ b/tests/testlib/assertions.nim @@ -1,4 +1,4 @@ import chronos template assertResultOk*[T, E](result: Result[T, E]) = - assert result.isOk(), result.error() + assert result.isOk(), $result.error() diff --git a/tests/testlib/futures.nim b/tests/testlib/futures.nim index 7dd5dfaa0..b3f2e935b 100644 --- a/tests/testlib/futures.nim +++ b/tests/testlib/futures.nim @@ -41,3 +41,8 @@ proc waitForResult*[T]( ): Future[Result[T, string]] {.async.} = discard await future.withTimeout(timeout) return future.toResult() + +proc reset*[T](future: Future[T]): void = + # Likely an incomplete reset, but good enough for testing purposes (for now) + future.internalError = nil + future.internalState = FutureState.Pending diff --git a/tests/waku_core/topics/test_pubsub_topic.nim b/tests/waku_core/topics/test_pubsub_topic.nim new file mode 100644 index 000000000..8b93e7b37 --- /dev/null +++ b/tests/waku_core/topics/test_pubsub_topic.nim @@ -0,0 +1,44 @@ +{.used.} + +import std/[options], testutils/unittests, results + +import ../../../../waku/[waku_core/topics/pubsub_topic], ../../testlib/[wakucore] + +suite "Static Sharding Functionality": + test "Shard Cluster Identification": + let topic = NsPubsubTopic.parseStaticSharding("/waku/2/rs/0/1").get() + check: + topic.clusterId == 0 + topic.shardId == 1 + topic == NsPubsubTopic.staticSharding(0, 1) + + test "Pubsub Topic Naming Compliance": + let topic = NsPubsubTopic.staticSharding(0, 1) + check: + topic.clusterId == 0 + topic.shardId == 1 + topic == "/waku/2/rs/0/1" + +suite "Automatic Sharding Mechanics": + test "Shard Selection Algorithm": + let + topic1 = NsPubsubTopic.parseNamedSharding("/waku/2/xxx").get() + topic2 = NsPubsubTopic.parseNamedSharding("/waku/2/123").get() + topic3 = NsPubsubTopic.parseNamedSharding("/waku/2/xxx123").get() + + check: + # topic1.shardId == 1 + # topic1.clusterId == 0 + topic1 == NsPubsubTopic.staticSharding(0, 1) + # topic2.shardId == 1 + # topic2.clusterId == 0 + topic2 == NsPubsubTopic.staticSharding(0, 1) + # topic3.shardId == 1 + # topic3.clusterId == 0 + topic3 == NsPubsubTopic.staticSharding(0, 1) + + test "Shard Selection Algorithm without topicName": + let topicResult = NsPubsubTopic.parseNamedSharding("/waku/2/") + + check: + topicResult.isErr() diff --git a/tests/waku_core/topics/test_sharding.nim b/tests/waku_core/topics/test_sharding.nim index fbab1a4b4..64a12150f 100644 --- a/tests/waku_core/topics/test_sharding.nim +++ b/tests/waku_core/topics/test_sharding.nim @@ -11,16 +11,33 @@ suite "Autosharding": pubsubTopic13 = "/waku/2/rs/1/3" contentTopicShort = "/toychat/2/huilong/proto" contentTopicFull = "/0/toychat/2/huilong/proto" + contentTopicShort2 = "/toychat2/2/huilong/proto" + contentTopicFull2 = "/0/toychat2/2/huilong/proto" + contentTopicShort3 = "/toychat/2/huilong/proto2" + contentTopicFull3 = "/0/toychat/2/huilong/proto2" + contentTopicShort4 = "/toychat/4/huilong/proto2" + contentTopicFull4 = "/0/toychat/4/huilong/proto2" + contentTopicFull5 = "/1/toychat/2/huilong/proto" + contentTopicFull6 = "/1/toychat2/2/huilong/proto" contentTopicInvalid = "/1/toychat/2/huilong/proto" suite "getGenZeroShard": test "Generate Gen0 Shard": let sharding = Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) + # Given two valid topics let nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value() nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value() + nsContentTopic3 = NsContentTopic.parse(contentTopicShort2).value() + nsContentTopic4 = NsContentTopic.parse(contentTopicFull2).value() + nsContentTopic5 = NsContentTopic.parse(contentTopicShort3).value() + nsContentTopic6 = NsContentTopic.parse(contentTopicFull3).value() + nsContentTopic7 = NsContentTopic.parse(contentTopicShort3).value() + nsContentTopic8 = NsContentTopic.parse(contentTopicFull3).value() + nsContentTopic9 = NsContentTopic.parse(contentTopicFull4).value() + nsContentTopic10 = NsContentTopic.parse(contentTopicFull5).value() # When we generate a gen0 shard from them let @@ -28,11 +45,35 @@ suite "Autosharding": sharding.getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount) nsPubsubTopic2 = sharding.getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount) + nsPubsubTopic3 = + sharding.getGenZeroShard(nsContentTopic3, GenerationZeroShardsCount) + nsPubsubTopic4 = + sharding.getGenZeroShard(nsContentTopic4, GenerationZeroShardsCount) + nsPubsubTopic5 = + sharding.getGenZeroShard(nsContentTopic5, GenerationZeroShardsCount) + nsPubsubTopic6 = + sharding.getGenZeroShard(nsContentTopic6, GenerationZeroShardsCount) + nsPubsubTopic7 = + sharding.getGenZeroShard(nsContentTopic7, GenerationZeroShardsCount) + nsPubsubTopic8 = + sharding.getGenZeroShard(nsContentTopic8, GenerationZeroShardsCount) + nsPubsubTopic9 = + sharding.getGenZeroShard(nsContentTopic9, GenerationZeroShardsCount) + nsPubsubTopic10 = + sharding.getGenZeroShard(nsContentTopic10, GenerationZeroShardsCount) # Then the generated shards are valid check: nsPubsubTopic1 == NsPubsubTopic.staticSharding(ClusterId, 3) nsPubsubTopic2 == NsPubsubTopic.staticSharding(ClusterId, 3) + nsPubsubTopic3 == NsPubsubTopic.staticSharding(ClusterId, 6) + nsPubsubTopic4 == NsPubsubTopic.staticSharding(ClusterId, 6) + nsPubsubTopic5 == NsPubsubTopic.staticSharding(ClusterId, 3) + nsPubsubTopic6 == NsPubsubTopic.staticSharding(ClusterId, 3) + nsPubsubTopic7 == NsPubsubTopic.staticSharding(ClusterId, 3) + nsPubsubTopic8 == NsPubsubTopic.staticSharding(ClusterId, 3) + nsPubsubTopic9 == NsPubsubTopic.staticSharding(ClusterId, 7) + nsPubsubTopic10 == NsPubsubTopic.staticSharding(ClusterId, 3) suite "getShard from NsContentTopic": test "Generate Gen0 Shard with topic.generation==none": diff --git a/tests/waku_enr/test_sharding.nim b/tests/waku_enr/test_sharding.nim index 0417da88a..92308885d 100644 --- a/tests/waku_enr/test_sharding.nim +++ b/tests/waku_enr/test_sharding.nim @@ -9,7 +9,7 @@ import eth/keys as eth_keys import - ../../../waku/[waku_enr, discovery/waku_discv5, waku_core], + ../../../waku/[waku_enr, discovery/waku_discv5, waku_core, common/enr], ../testlib/wakucore, ../waku_discv5/utils, ./utils @@ -114,3 +114,55 @@ suite "Sharding": ## Cleanup await node.stop() + +suite "Discovery Mechanisms for Shards": + test "Index List Representation": + # Given a valid index list and its representation + let + indicesList: seq[uint8] = @[0, 73, 2, 0, 1, 0, 10] + clusterId: uint16 = 73 # bitVector's clusterId + shardIds: seq[uint16] = @[1u16, 10u16] # bitVector's shardIds + + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + # When building an ENR with the index list + var builder = EnrBuilder.init(enrPrivKey, enrSeqNum) + builder.addFieldPair(ShardingIndicesListEnrField, indicesList) + let + record = builder.build().tryGet() + relayShards = record.toTyped().tryGet().relayShardingIndicesList().get() + + # Then the ENR should be correctly parsed + check: + relayShards == RelayShards.init(clusterId, shardIds).expect("Valid Shards") + + test "Bit Vector Representation": + # Given a valid bit vector and its representation + let + bitVector: seq[byte] = + @[ + 0, 73, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ] + clusterId: uint16 = 73 # bitVector's clusterId + shardIds: seq[uint16] = @[1u16, 10u16] # bitVector's shardIds + + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + # When building an ENR with the bit vector + var builder = EnrBuilder.init(enrPrivKey, enrSeqNum) + builder.addFieldPair(ShardingBitVectorEnrField, bitVector) + let + record = builder.build().tryGet() + relayShards = record.toTyped().tryGet().relayShardingBitVector().get() + + # Then the ENR should be correctly parsed + check: + relayShards == RelayShards.init(clusterId, shardIds).expect("Valid Shards") diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim index d1fed6437..0826cd2f4 100644 --- a/tests/waku_relay/utils.nim +++ b/tests/waku_relay/utils.nim @@ -1,8 +1,30 @@ {.used.} -import std/[strutils], stew/shims/net as stewNet, chronos +import + std/[strutils, sequtils, tempfiles], + stew/byteutils, + stew/shims/net as stewNet, + testutils/unittests, + chronos, + libp2p/switch, + libp2p/protocols/pubsub/pubsub -import ../../../waku/waku_relay, ../../../waku/waku_core, ../testlib/wakucore +from std/times import epochTime + +import + ../../../waku/ + [ + waku_relay, + node/waku_node, + node/peer_manager, + waku_core, + waku_node, + waku_rln_relay, + ], + ../waku_store/store_utils, + ../waku_archive/archive_utils, + ../testlib/[wakucore, wakunode, testasync, futures], + ../resources/payloads proc noopRawHandler*(): WakuRelayHandler = var handler: WakuRelayHandler @@ -19,3 +41,105 @@ proc newTestWakuRelay*(switch = newTestSwitch()): Future[WakuRelay] {.async.} = switch.mount(proto, protocolMatcher) return proto + +proc setupRln*(node: WakuNode, identifier: uint) {.async.} = + await node.mountRlnRelay( + WakuRlnConfig( + rlnRelayDynamic: false, + rlnRelayCredIndex: some(identifier), + rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $identifier), + rlnEpochSizeSec: 1, + ) + ) + +proc setupRelayWithRln*( + node: WakuNode, identifier: uint, pubsubTopics: seq[string] +) {.async.} = + await node.mountRelay(pubsubTopics) + await setupRln(node, identifier) + +proc subscribeToContentTopicWithHandler*( + node: WakuNode, contentTopic: string +): Future[bool] = + var completionFut = newFuture[bool]() + proc relayHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + if topic == topic: + completionFut.complete(true) + + node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler)) + return completionFut + +proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bool] = + var completionFut = newFuture[bool]() + proc relayHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + if topic == pubsubTopic: + completionFut.complete(true) + + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + return completionFut + +proc sendRlnMessage*( + client: WakuNode, + pubsubTopic: string, + contentTopic: string, + completionFuture: Future[bool], + payload: seq[byte] = "Hello".toBytes(), +): Future[bool] {.async.} = + var message = WakuMessage(payload: payload, contentTopic: contentTopic) + doAssert(client.wakuRlnRelay.appendRLNProof(message, epochTime()).isOk()) + discard await client.publish(some(pubsubTopic), message) + let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT) + return isCompleted + +when defined(rln_v2): + proc sendRlnMessageWithInvalidProof*( + client: WakuNode, + pubsubTopic: string, + contentTopic: string, + completionFuture: Future[bool], + payload: seq[byte] = "Hello".toBytes(), + ): Future[bool] {.async.} = + let + extraBytes: seq[byte] = @[byte(1), 2, 3] + rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof( + concat(payload, extraBytes), + # we add extra bytes to invalidate proof verification against original payload + client.wakuRlnRelay.getCurrentEpoch(), + messageId = MessageId(0), + ) + rateLimitProof = rateLimitProofRes.get().encode().buffer + message = WakuMessage( + payload: @payload, contentTopic: contentTopic, proof: rateLimitProof + ) + + discard await client.publish(some(pubsubTopic), message) + let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT) + return isCompleted + +else: + proc sendRlnMessageWithInvalidProof*( + client: WakuNode, + pubsubTopic: string, + contentTopic: string, + completionFuture: Future[bool], + payload: seq[byte] = "Hello".toBytes(), + ): Future[bool] {.async.} = + let + extraBytes: seq[byte] = @[byte(1), 2, 3] + rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof( + concat(payload, extraBytes), + # we add extra bytes to invalidate proof verification against original payload + client.wakuRlnRelay.getCurrentEpoch(), + ) + rateLimitProof = rateLimitProofRes.get().encode().buffer + message = WakuMessage( + payload: @payload, contentTopic: contentTopic, proof: rateLimitProof + ) + + discard await client.publish(some(pubsubTopic), message) + let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT) + return isCompleted