nwaku/tests/node/test_wakunode_sharding.nim

1033 lines
42 KiB
Nim
Raw Normal View History

{.used.}
import
std/[options, sequtils, tempfiles],
testutils/unittests,
chronos,
chronicles,
stew/shims/net as stewNet
import
std/[sequtils, tempfiles],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
chronos,
libp2p/switch,
libp2p/protocols/pubsub/pubsub
import
waku/[
waku_core/topics/pubsub_topic,
waku_core/topics/sharding,
node/waku_node,
common/paging,
waku_core,
waku_store/common,
node/peer_manager,
waku_filter_v2/client,
],
../waku_relay/utils,
../waku_archive/archive_utils,
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils]
import waku_relay/protocol
# Address and port handed to every test node created in this module.
# NOTE(review): port 0 presumably requests an OS-assigned port — confirm.
const listenIp = ValidIpAddress.init("0.0.0.0")
const listenPort = Port(0)
suite "Sharding":
  # The two nodes used by the tests in this suite. They are assigned in
  # asyncSetup and stopped in asyncTeardown.
  var server {.threadvar.}: WakuNode
  var client {.threadvar.}: WakuNode

  asyncSetup:
    # Each node gets its own freshly generated key.
    let serverKey = generateSecp256k1Key()
    let clientKey = generateSecp256k1Key()

    server = newTestWakuNode(serverKey, listenIp, listenPort)
    client = newTestWakuNode(clientKey, listenIp, listenPort)

    # Relay is mounted on both nodes before either node is started.
    await allFutures(server.mountRelay(), client.mountRelay())
    await allFutures(server.start(), client.start())

  asyncTeardown:
    # Stop both nodes together.
    await allFutures(server.stop(), client.stop())
suite "Static Sharding Functionality":
asyncTest "Shard Subscription and Peer Dialing":
  # Given: both nodes subscribe to the same static shard, then the client
  # connects to the server.
  let shard = "/waku/2/rs/0/1"
  let serverHandler = server.subscribeCompletionHandler(shard)
  let clientHandler = client.subscribeCompletionHandler(shard)
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes on the shared shard.
  let fromClient =
    WakuMessage(payload: "message1".toBytes(), contentTopic: "contentTopic")
  discard await client.publish(some(shard), fromClient)

  # Then: waiting on both handlers yields an ok result (server first).
  let serverSawFirst = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientSawFirst = await clientHandler.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(serverSawFirst)
  assertResultOk(clientSawFirst)

  # When: the handlers are reset and the server publishes on the same shard.
  serverHandler.reset()
  clientHandler.reset()
  let fromServer =
    WakuMessage(payload: "message2".toBytes(), contentTopic: "contentTopic")
  discard await server.publish(some(shard), fromServer)

  # Then: both handlers yield an ok result again.
  let serverSawSecond = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientSawSecond = await clientHandler.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(serverSawSecond)
  assertResultOk(clientSawSecond)
asyncTest "Exclusion of Non-Subscribed Service Nodes":
  # Given a connected server and client subscribed to different pubsub shards:
  # the server listens on topic1, the client on topic2.
  let
    topic1 = "/waku/2/rs/0/1"
    topic2 = "/waku/2/rs/0/2"
    contentTopic = "myContentTopic"
    # `let` is enough here: the handlers are only reset() in place, never
    # rebound, which matches how the other tests in this file declare them.
    serverHandler = server.subscribeCompletionHandler(topic1)
    clientHandler = client.subscribeCompletionHandler(topic2)

  # await sleepAsync(FUTURE_TIMEOUT)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When a message is published in the server's subscribed topic
  discard await client.publish(
    some(topic1),
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic),
  )
  let
    serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
    clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then the server receives the message but the client does not
  assertResultOk(serverResult1)
  check clientResult1.isErr()

  # When the server publishes a message in the client's subscribed topic
  serverHandler.reset()
  clientHandler.reset()
  let wakuMessage2 =
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
  discard await server.publish(some(topic2), wakuMessage2)
  let
    serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
    clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then the client receives the message but the server does not
  check serverResult2.isErr()
  assertResultOk(clientResult2)
suite "Automatic Sharding Mechanics":
asyncTest "Content Topic-Based Shard Dialing":
  # Given: server and client subscribe to the same content topic, written in
  # its short form on the server and its full ("/0/..." prefixed) form on the
  # client.
  let contentTopicShort = "/toychat/2/huilong/proto"
  let contentTopicFull = "/0/toychat/2/huilong/proto"
  # Shard derived from the content topic above (autosharding).
  let pubsubTopic = "/waku/2/rs/0/58355"

  let serverHandler = server.subscribeToContentTopicWithHandler(contentTopicShort)
  let clientHandler = client.subscribeToContentTopicWithHandler(contentTopicFull)
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes using the short content topic.
  let shortFormMsg =
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort)
  discard await client.publish(some(pubsubTopic), shortFormMsg)
  let serverSawFirst = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientSawFirst = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then: both handlers yield an ok result.
  assertResultOk(serverSawFirst)
  assertResultOk(clientSawFirst)

  # When: the handlers are reset and the server publishes using the full form.
  serverHandler.reset()
  clientHandler.reset()
  let fullFormMsg =
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull)
  discard await server.publish(some(pubsubTopic), fullFormMsg)
  let serverSawSecond = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientSawSecond = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then: both handlers yield an ok result again.
  assertResultOk(serverSawSecond)
  assertResultOk(clientSawSecond)
asyncTest "Exclusion of Irrelevant Autosharded Topics":
  # Given a connected server and client subscribed to different content topics
  # (the unused `pubsubTopic12` binding, which parsed a content topic as a
  # pubsub topic and ignored the outcome, has been dropped).
  let
    contentTopic1 = "/toychat/2/huilong/proto"
    pubsubTopic1 = "/waku/2/rs/0/58355"
      # Automatically generated from the contentTopic above
    contentTopic2 = "/0/toychat2/2/huilong/proto"
    pubsubTopic2 = "/waku/2/rs/0/23286"
      # Automatically generated from the contentTopic above
  let
    serverHandler = server.subscribeToContentTopicWithHandler(contentTopic1)
    clientHandler = client.subscribeToContentTopicWithHandler(contentTopic2)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When the server publishes a message in the server's subscribed topic
  discard await server.publish(
    some(pubsubTopic1),
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1),
  )
  let
    serverResult1 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
    clientResult1 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then the server receives the message but the client does not
  assertResultOk(serverResult1)
  check clientResult1.isErr()

  # When the client publishes a message in the client's subscribed topic
  serverHandler.reset()
  clientHandler.reset()
  discard await client.publish(
    some(pubsubTopic2),
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2),
  )
  let
    serverResult2 = await serverHandler.waitForResult(FUTURE_TIMEOUT)
    clientResult2 = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then the client receives the message but the server does not
  assertResultOk(clientResult2)
  check serverResult2.isErr()
suite "Application Layer Integration":
suite "App Protocol Compatibility":
asyncTest "relay":
  # Given: both nodes subscribe to the same static shard; the test then waits
  # FUTURE_TIMEOUT before the client connects to the server.
  let shard = "/waku/2/rs/0/1"
  let serverHandler = server.subscribeCompletionHandler(shard)
  let clientHandler = client.subscribeCompletionHandler(shard)
  await sleepAsync(FUTURE_TIMEOUT)
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes on that shard.
  let relayed =
    WakuMessage(payload: "message1".toBytes(), contentTopic: "myContentTopic")
  discard await client.publish(some(shard), relayed)
  let serverOutcome = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientOutcome = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then: both handlers yield an ok result.
  assertResultOk(serverOutcome)
  assertResultOk(clientOutcome)
asyncTest "filter":
  # Given: the filter client is mounted on the client node and the filter
  # service on the server node.
  await client.mountFilterClient()
  await server.mountFilter()

  # Completed by the push handler with the (pubsub topic, message) it receives.
  let pushed = newFuture[(string, WakuMessage)]()
  proc onPush(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, closure, gcsafe.} =
    pushed.complete((topic, msg))

  client.wakuFilterClient.registerPushHandler(onPush)

  # The client subscribes on the server for one content topic on one shard.
  let shard = "/waku/2/rs/0/1"
  let contentTopic = "myContentTopic"
  let subscribed = await client.filterSubscribe(
    some(shard), @[contentTopic], server.switch.peerInfo.toRemotePeerInfo()
  )
  assertResultOk(subscribed)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When: the server's filter service is handed a matching message.
  let outgoing = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic)
  await server.filterHandleMessage(shard, outgoing)

  # Then: the client's push handler fires.
  let pushOutcome = await pushed.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(pushOutcome)
asyncTest "lightpush":
  # Given a connected server and client, with the lightpush client mounted on
  # the client node, the lightpush service mounted on the server node, and the
  # client subscribed to the pubsub topic.
  client.mountLightPushClient()
  await server.mountLightpush()
  let
    topic = "/waku/2/rs/0/1"
    clientHandler = client.subscribeCompletionHandler(topic)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When the client light-pushes a message through the server.
  # The response was previously bound to a misspelled local that was never
  # read; it is now discarded explicitly so the intent is visible. The test
  # only asserts on relay delivery below.
  let msg =
    WakuMessage(payload: "message".toBytes(), contentTopic: "myContentTopic")
  discard await client.lightpushPublish(
    some(topic), msg, server.switch.peerInfo.toRemotePeerInfo()
  )

  # Then the client receives the message
  let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(clientResult)
suite "Content Topic Filtering and Routing":
asyncTest "relay (automatic sharding filtering)":
  # Given: server and client subscribe to the same content topic in its short
  # and full forms respectively; the test waits FUTURE_TIMEOUT before the
  # client connects to the server.
  let contentTopicShort = "/toychat/2/huilong/proto"
  let contentTopicFull = "/0/toychat/2/huilong/proto"
  let pubsubTopic = "/waku/2/rs/0/58355"
  let serverHandler = server.subscribeToContentTopicWithHandler(contentTopicShort)
  let clientHandler = client.subscribeToContentTopicWithHandler(contentTopicFull)
  await sleepAsync(FUTURE_TIMEOUT)
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes using the short content topic.
  let shortFormMsg =
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort)
  discard await client.publish(some(pubsubTopic), shortFormMsg)
  let serverSawFirst = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientSawFirst = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then: both handlers yield an ok result.
  assertResultOk(serverSawFirst)
  assertResultOk(clientSawFirst)

  # When: the handlers are reset and the server publishes using the full form.
  serverHandler.reset()
  clientHandler.reset()
  let fullFormMsg =
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull)
  discard await server.publish(some(pubsubTopic), fullFormMsg)
  let serverSawSecond = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientSawSecond = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then: both handlers yield an ok result again.
  assertResultOk(serverSawSecond)
  assertResultOk(clientSawSecond)
asyncTest "filter (automatic sharding filtering)":
  # Given: filter client mounted on the client node, filter service on the
  # server node.
  await client.mountFilterClient()
  await server.mountFilter()

  # Completed by the push handler with the (pubsub topic, message) it receives.
  let pushed = newFuture[(string, WakuMessage)]()
  proc onPush(
      topic: PubsubTopic, incoming: WakuMessage
  ): Future[void] {.async, closure, gcsafe.} =
    pushed.complete((topic, incoming))

  client.wakuFilterClient.registerPushHandler(onPush)

  # First subscription uses the short form of the content topic.
  let contentTopicShort = "/toychat/2/huilong/proto"
  let contentTopicFull = "/0/toychat/2/huilong/proto"
  let pubsubTopic = "/waku/2/rs/0/58355"
  let firstSubscribe = await client.filterSubscribe(
    some(pubsubTopic),
    @[contentTopicShort],
    server.switch.peerInfo.toRemotePeerInfo(),
  )
  assertResultOk(firstSubscribe)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When: the server's filter service is handed a short-form message.
  let msg =
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopicShort)
  await server.filterHandleMessage(pubsubTopic, msg)

  # Then: the push handler receives exactly that topic and message.
  let firstPush = await pushed.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(firstPush)
  check firstPush.get() == (pubsubTopic, msg)

  # Given: all subscriptions are dropped and a new one is made with the full
  # form. Both requests are issued before either response is asserted.
  let unsubscribed =
    await client.filterUnsubscribeAll(server.switch.peerInfo.toRemotePeerInfo())
  let secondSubscribe = await client.filterSubscribe(
    some(pubsubTopic),
    @[contentTopicFull],
    server.switch.peerInfo.toRemotePeerInfo(),
  )
  assertResultOk(unsubscribed)
  assertResultOk(secondSubscribe)

  # When: the future is reset and a full-form message is handled.
  pushed.reset()
  let msg2 =
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopicFull)
  await server.filterHandleMessage(pubsubTopic, msg2)

  # Then: the push handler receives the second message.
  let secondPush = await pushed.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(secondPush)
  check secondPush.get() == (pubsubTopic, msg2)
asyncTest "lightpush (automatic sharding filtering)":
  # Given a connected server and client using the same content topic (with two
  # different formats): the client subscribes with the short form and later
  # publishes with the full form.
  client.mountLightPushClient()
  await server.mountLightpush()
  let
    contentTopicShort = "/toychat/2/huilong/proto"
    contentTopicFull = "/0/toychat/2/huilong/proto"
    pubsubTopic = "/waku/2/rs/0/58355"
    clientHandler = client.subscribeToContentTopicWithHandler(contentTopicShort)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When the client light-pushes a full-form message through the server.
  # The response used to be stored in a misspelled, never-read local; it is
  # now discarded explicitly. Delivery is asserted through the relay handler.
  let msg =
    WakuMessage(payload: "message".toBytes(), contentTopic: contentTopicFull)
  discard await client.lightpushPublish(
    some(pubsubTopic), msg, server.switch.peerInfo.toRemotePeerInfo()
  )

  # Then the client receives the message
  let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(clientResult)
xasyncTest "store (automatic sharding filtering)":
  # NOTE(review): declared with `xasyncTest`, which presumably marks the test
  # as skipped — confirm against the test helper library.
  #
  # Given one archive with two sets of messages using the same content topic
  # (with two different formats), both stored under the same pubsub topic.
  let
    timeOrigin = now()
    contentTopicShort = "/toychat/2/huilong/proto"
    contentTopicFull = "/0/toychat/2/huilong/proto"
    pubsubTopic = "/waku/2/rs/0/58355"
    archiveMessages1 =
      @[
        fakeWakuMessage(
          @[byte 00], ts = ts(00, timeOrigin), contentTopic = contentTopicShort
        )
      ]
    archiveMessages2 =
      @[
        fakeWakuMessage(
          @[byte 01], ts = ts(10, timeOrigin), contentTopic = contentTopicFull
        )
      ]
    archiveDriver = newArchiveDriverWithMessages(pubsubTopic, archiveMessages1)
  discard archiveDriver.put(pubsubTopic, archiveMessages2)
  let mountArchiveResult = server.mountArchive(archiveDriver)
  assertResultOk(mountArchiveResult)

  # The test body already runs as an async procedure, so the mount future is
  # awaited instead of being driven by a nested `waitFor`.
  await server.mountStore()
  client.mountStoreClient()

  # Given one query for each content topic format
  let
    historyQuery1 = HistoryQuery(
      contentTopics: @[contentTopicShort],
      direction: PagingDirection.Forward,
      pageSize: 3,
    )
    historyQuery2 = HistoryQuery(
      contentTopics: @[contentTopicFull],
      direction: PagingDirection.Forward,
      pageSize: 3,
    )

  # When the client queries the server for the messages
  let
    serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
    queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo)
    queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
  assertResultOk(queryResponse1)
  assertResultOk(queryResponse2)

  # Then the responses of both queries should contain all the messages
  check:
    queryResponse1.get().messages == archiveMessages1 & archiveMessages2
    queryResponse2.get().messages == archiveMessages1 & archiveMessages2
asyncTest "relay - exclusion (automatic sharding filtering)":
  # Given: the server subscribes to contentTopic1 and the client to
  # contentTopic2; the test waits FUTURE_TIMEOUT before the client connects.
  let contentTopic1 = "/toychat/2/huilong/proto"
  # Shard derived from contentTopic1 (autosharding).
  let pubsubTopic1 = "/waku/2/rs/0/58355"
  let contentTopic2 = "/0/toychat2/2/huilong/proto"
  # Shard derived from contentTopic2 (autosharding).
  let pubsubTopic2 = "/waku/2/rs/0/23286"
  let serverHandler = server.subscribeToContentTopicWithHandler(contentTopic1)
  let clientHandler = client.subscribeToContentTopicWithHandler(contentTopic2)
  await sleepAsync(FUTURE_TIMEOUT)
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes on its own content topic and shard.
  let clientSideMsg =
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic2)
  discard await client.publish(some(pubsubTopic2), clientSideMsg)
  let serverAfterFirst = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientAfterFirst = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then: only the client's handler fires.
  check serverAfterFirst.isErr()
  assertResultOk(clientAfterFirst)

  # When: the handlers are reset and the server publishes on its own content
  # topic and shard.
  serverHandler.reset()
  clientHandler.reset()
  let serverSideMsg =
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic1)
  discard await server.publish(some(pubsubTopic1), serverSideMsg)
  let serverAfterSecond = await serverHandler.waitForResult(FUTURE_TIMEOUT)
  let clientAfterSecond = await clientHandler.waitForResult(FUTURE_TIMEOUT)

  # Then: only the server's handler fires.
  assertResultOk(serverAfterSecond)
  check clientAfterSecond.isErr()
asyncTest "filter - exclusion (automatic sharding filtering)":
  # Given: filter client mounted on the client node, filter service on the
  # server node.
  await client.mountFilterClient()
  await server.mountFilter()

  # Completed by the push handler if any message is pushed to the client.
  let pushed = newFuture[(string, WakuMessage)]()
  proc onPush(
      topic: PubsubTopic, incoming: WakuMessage
  ): Future[void] {.async, closure, gcsafe.} =
    pushed.complete((topic, incoming))

  client.wakuFilterClient.registerPushHandler(onPush)

  let contentTopic1 = "/toychat/2/huilong/proto"
  # Shard derived from contentTopic1 (autosharding).
  let pubsubTopic1 = "/waku/2/rs/0/58355"
  let contentTopic2 = "/0/toychat2/2/huilong/proto"
  # Shard derived from contentTopic2 (autosharding).
  let pubsubTopic2 = "/waku/2/rs/0/23286"

  # The client subscribes only to contentTopic1 on pubsubTopic1.
  let subscribed = await client.filterSubscribe(
    some(pubsubTopic1), @[contentTopic1], server.switch.peerInfo.toRemotePeerInfo()
  )
  assertResultOk(subscribed)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When: the server's filter service is handed a message for the other
  # content topic and shard.
  let unrelated =
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic2)
  await server.filterHandleMessage(pubsubTopic2, unrelated)

  # Then: waiting on the push future yields an error (nothing was pushed).
  let pushOutcome = await pushed.waitForResult(FUTURE_TIMEOUT)
  check pushOutcome.isErr()
asyncTest "lightpush - exclusion (automatic sharding filtering)":
  # Given a connected server and client using different content topics: the
  # client subscribes to contentTopic1 and later publishes on contentTopic2.
  client.mountLightPushClient()
  await server.mountLightpush()
  let
    contentTopic1 = "/toychat/2/huilong/proto"
    pubsubTopic1 = "/waku/2/rs/0/58355"
      # Automatically generated from the contentTopic above
    contentTopic2 = "/0/toychat2/2/huilong/proto"
    pubsubTopic2 = "/waku/2/rs/0/23286"
      # Automatically generated from the contentTopic above
    clientHandler = client.subscribeToContentTopicWithHandler(contentTopic1)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When the client light-pushes a message for contentTopic2 through the server.
  # The response used to be stored in a misspelled, never-read local; it is
  # now discarded explicitly since the test only inspects the relay handler.
  let msg = WakuMessage(payload: "message".toBytes(), contentTopic: contentTopic2)
  discard await client.lightpushPublish(
    some(pubsubTopic2), msg, server.switch.peerInfo.toRemotePeerInfo()
  )

  # Then the client does not receive the message
  let clientResult = await clientHandler.waitForResult(FUTURE_TIMEOUT)
  check clientResult.isErr()
asyncTest "store - exclusion (automatic sharding filtering)":
  # Given one archive with two sets of messages using different content
  # topics, each stored under its own pubsub topic.
  let
    timeOrigin = now()
    contentTopic1 = "/toychat/2/huilong/proto"
    pubsubTopic1 = "/waku/2/rs/0/58355"
      # Automatically generated from the contentTopic above
    contentTopic2 = "/0/toychat2/2/huilong/proto"
    pubsubTopic2 = "/waku/2/rs/0/23286"
      # Automatically generated from the contentTopic above
    archiveMessages1 =
      @[
        fakeWakuMessage(
          @[byte 00], ts = ts(00, timeOrigin), contentTopic = contentTopic1
        )
      ]
    archiveMessages2 =
      @[
        fakeWakuMessage(
          @[byte 01], ts = ts(10, timeOrigin), contentTopic = contentTopic2
        )
      ]
    archiveDriver = newArchiveDriverWithMessages(pubsubTopic1, archiveMessages1)
  discard archiveDriver.put(pubsubTopic2, archiveMessages2)
  let mountArchiveResult = server.mountArchive(archiveDriver)
  assertResultOk(mountArchiveResult)

  # The test body already runs as an async procedure, so the mount future is
  # awaited instead of being driven by a nested `waitFor`.
  await server.mountStore()
  client.mountStoreClient()

  # Given one query for each content topic
  let
    historyQuery1 = HistoryQuery(
      contentTopics: @[contentTopic1],
      direction: PagingDirection.Forward,
      pageSize: 2,
    )
    historyQuery2 = HistoryQuery(
      contentTopics: @[contentTopic2],
      direction: PagingDirection.Forward,
      pageSize: 2,
    )

  # When the client queries the server for the messages
  let
    serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
    queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo)
    queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
  assertResultOk(queryResponse1)
  assertResultOk(queryResponse2)

  # Then each response should contain only the messages of the corresponding content topic
  check:
    queryResponse1.get().messages == archiveMessages1
    queryResponse2.get().messages == archiveMessages2
suite "Specific Tests":
asyncTest "Configure Node with Multiple PubSub Topics":
  # Given: both nodes subscribe to two static shards, one handler per
  # (node, shard) pair, and the client connects to the server.
  let contentTopic = "myContentTopic"
  let topic1 = "/waku/2/rs/0/1"
  let topic2 = "/waku/2/rs/0/2"
  let serverHandler1 = server.subscribeCompletionHandler(topic1)
  let serverHandler2 = server.subscribeCompletionHandler(topic2)
  let clientHandler1 = client.subscribeCompletionHandler(topic1)
  let clientHandler2 = client.subscribeCompletionHandler(topic2)
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes on topic1.
  let onTopic1 =
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
  discard await client.publish(some(topic1), onTopic1)

  # Then: topic1's handlers fire on both nodes; topic2's handlers do not.
  assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
  check (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
  check (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()

  # When: every handler is reset and the client publishes on topic2.
  serverHandler1.reset()
  serverHandler2.reset()
  clientHandler1.reset()
  clientHandler2.reset()
  let onTopic2 =
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
  discard await client.publish(some(topic2), onTopic2)

  # Then: topic2's handlers fire on both nodes; topic1's handlers do not.
  assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
  check (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
  check (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
asyncTest "Configure Node with Multiple Content Topics":
  # Given: both nodes subscribe to two content topics, one handler per
  # (node, content topic) pair, and the client connects to the server.
  let contentTopic1 = "/toychat/2/huilong/proto"
  # Shard derived from contentTopic1 (autosharding).
  let pubsubTopic1 = "/waku/2/rs/0/58355"
  let contentTopic2 = "/0/toychat2/2/huilong/proto"
  # Shard derived from contentTopic2 (autosharding).
  let pubsubTopic2 = "/waku/2/rs/0/23286"
  let serverHandler1 = server.subscribeToContentTopicWithHandler(contentTopic1)
  let serverHandler2 = server.subscribeToContentTopicWithHandler(contentTopic2)
  let clientHandler1 = client.subscribeToContentTopicWithHandler(contentTopic1)
  let clientHandler2 = client.subscribeToContentTopicWithHandler(contentTopic2)
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes a contentTopic1 message on its shard.
  let forTopic1 =
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1)
  discard await client.publish(some(pubsubTopic1), forTopic1)

  # Then: contentTopic1's handlers fire on both nodes; contentTopic2's do not.
  assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
  check (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
  check (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()

  # When: every handler is reset and the client publishes a contentTopic2
  # message on its shard.
  serverHandler1.reset()
  serverHandler2.reset()
  clientHandler1.reset()
  clientHandler2.reset()
  let forTopic2 =
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2)
  discard await client.publish(some(pubsubTopic2), forTopic2)

  # Then: contentTopic2's handlers fire on both nodes; contentTopic1's do not.
  assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
  check (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
  check (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
asyncTest "Configure Node combining Multiple Pubsub and Content Topics":
  # Given: both nodes subscribe to two static shards (handlers 1 and 2) and to
  # two content topics (handlers 3 and 4); the client then connects to the
  # server. Subscriptions are made in the same order as listed below.
  let
    contentTopic = "myContentTopic"
    pubsubTopic1 = "/waku/2/rs/0/1"
    pubsubTopic2 = "/waku/2/rs/0/2"
  let
    serverHandler1 = server.subscribeCompletionHandler(pubsubTopic1)
    clientHandler1 = client.subscribeCompletionHandler(pubsubTopic1)
    serverHandler2 = server.subscribeCompletionHandler(pubsubTopic2)
    clientHandler2 = client.subscribeCompletionHandler(pubsubTopic2)
  let
    contentTopic3 = "/toychat/2/huilong/proto"
    # Shard derived from contentTopic3 (autosharding).
    pubsubTopic3 = "/waku/2/rs/0/58355"
    contentTopic4 = "/0/toychat2/2/huilong/proto"
    # Shard derived from contentTopic4 (autosharding).
    pubsubTopic4 = "/waku/2/rs/0/23286"
  let
    serverHandler3 = server.subscribeToContentTopicWithHandler(contentTopic3)
    clientHandler3 = client.subscribeToContentTopicWithHandler(contentTopic3)
    serverHandler4 = server.subscribeToContentTopicWithHandler(contentTopic4)
    clientHandler4 = client.subscribeToContentTopicWithHandler(contentTopic4)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # Handlers grouped by how they were created, so each stage below can reset
  # all of them with two loops (pubsub-topic handlers first, then
  # content-topic handlers — the same order as before).
  let pubsubHandlers =
    [serverHandler1, clientHandler1, serverHandler2, clientHandler2]
  let contentHandlers =
    [serverHandler3, clientHandler3, serverHandler4, clientHandler4]

  # Stage 1 — When: the client publishes on pubsubTopic1.
  discard await client.publish(
    some(pubsubTopic1),
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic),
  )

  # Then: only the topic-1 handlers fire; handlers 2, 3 and 4 stay silent.
  assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
  for silent in [serverHandler2, clientHandler2]:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()
  for silent in contentHandlers:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()

  # Stage 2 — When: all handlers are reset and the client publishes on
  # pubsubTopic2.
  for handler in pubsubHandlers:
    handler.reset()
  for handler in contentHandlers:
    handler.reset()
  discard await client.publish(
    some(pubsubTopic2),
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic),
  )

  # Then: only the topic-2 handlers fire; handlers 1, 3 and 4 stay silent.
  assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
  for silent in [serverHandler1, clientHandler1]:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()
  for silent in contentHandlers:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()

  # Stage 3 — When: all handlers are reset and the client publishes a
  # contentTopic3 message on pubsubTopic3.
  for handler in pubsubHandlers:
    handler.reset()
  for handler in contentHandlers:
    handler.reset()
  discard await client.publish(
    some(pubsubTopic3),
    WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic3),
  )

  # Then: only the topic-3 handlers fire; handlers 1, 2 and 4 stay silent.
  assertResultOk(await serverHandler3.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler3.waitForResult(FUTURE_TIMEOUT))
  for silent in pubsubHandlers:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()
  for silent in [serverHandler4, clientHandler4]:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()

  # Stage 4 — When: all handlers are reset and the client publishes a
  # contentTopic4 message on pubsubTopic4.
  for handler in pubsubHandlers:
    handler.reset()
  for handler in contentHandlers:
    handler.reset()
  discard await client.publish(
    some(pubsubTopic4),
    WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic4),
  )

  # Then: only the topic-4 handlers fire; handlers 1, 2 and 3 stay silent.
  assertResultOk(await serverHandler4.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await clientHandler4.waitForResult(FUTURE_TIMEOUT))
  for silent in pubsubHandlers:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()
  for silent in [serverHandler3, clientHandler3]:
    check (await silent.waitForResult(FUTURE_TIMEOUT)).isErr()
asyncTest "Protocol with Unconfigured PubSub Topic Fails":
  # Given: a connected server and client that are NOT subscribed to the topic
  # the message will be published on. Both nodes subscribe to a different
  # topic instead, which stands in for the "unconfigured" case while still
  # providing handlers the test can assert on.
  let contentTopic = "myContentTopic"
  let publishTopic = "/waku/2/rs/0/1"
  let serverHandler = server.subscribeCompletionHandler("/waku/2/rs/0/0")
  let clientHandler = client.subscribeCompletionHandler("/waku/2/rs/0/0")
  let serverPeer = server.switch.peerInfo.toRemotePeerInfo()
  await client.connectToNodes(@[serverPeer])

  # When: the client publishes on the topic nobody subscribed to.
  let stray = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
  discard await client.publish(some(publishTopic), stray)

  # Then: neither handler fires.
  check (await serverHandler.waitForResult(FUTURE_TIMEOUT)).isErr()
  check (await clientHandler.waitForResult(FUTURE_TIMEOUT)).isErr()
asyncTest "Waku LightPush Sharding (Static Sharding)":
  # Given a connected server and client using two different pubsub topics,
  # with the lightpush client mounted on the client node and the lightpush
  # service mounted on the server node.
  client.mountLightPushClient()
  await server.mountLightpush()

  # Given a connected server and client subscribed to multiple pubsub topics
  let
    contentTopic = "myContentTopic"
    topic1 = "/waku/2/rs/0/1"
    topic2 = "/waku/2/rs/0/2"
    serverHandler1 = server.subscribeCompletionHandler(topic1)
    serverHandler2 = server.subscribeCompletionHandler(topic2)
    clientHandler1 = client.subscribeCompletionHandler(topic1)
    clientHandler2 = client.subscribeCompletionHandler(topic2)
  await sleepAsync(FUTURE_TIMEOUT)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When the client light-pushes a message on topic1 through the server.
  # The two lightpush responses used to be bound to never-read locals with
  # inconsistent, misspelled names; they are now discarded explicitly, since
  # the test asserts on the relay handlers only.
  let msg1 = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
  discard await client.lightpushPublish(
    some(topic1), msg1, server.switch.peerInfo.toRemotePeerInfo()
  )

  # Then the server and client receive the message in topic1's handlers, but not in topic2's
  assertResultOk(await clientHandler1.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await serverHandler1.waitForResult(FUTURE_TIMEOUT))
  check:
    (await clientHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()
    (await serverHandler2.waitForResult(FUTURE_TIMEOUT)).isErr()

  # When the handlers are reset and the client light-pushes a message on topic2
  serverHandler1.reset()
  serverHandler2.reset()
  clientHandler1.reset()
  clientHandler2.reset()
  let msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
  discard await client.lightpushPublish(
    some(topic2), msg2, server.switch.peerInfo.toRemotePeerInfo()
  )

  # Then the server and client receive the message in topic2's handlers, but not in topic1's
  assertResultOk(await clientHandler2.waitForResult(FUTURE_TIMEOUT))
  assertResultOk(await serverHandler2.waitForResult(FUTURE_TIMEOUT))
  check:
    (await clientHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
    (await serverHandler1.waitForResult(FUTURE_TIMEOUT)).isErr()
asyncTest "Waku Filter Sharding (Static Sharding)":
  # Given: filter client mounted on the client node, filter service on the
  # server node, and two static shards sharing one content topic.
  await client.mountFilterClient()
  await server.mountFilter()
  let contentTopic = "myContentTopic"
  let topic1 = "/waku/2/rs/0/1"
  let topic2 = "/waku/2/rs/0/2"

  # One future per shard; each push handler completes its future only when
  # the pushed message arrives on the matching pubsub topic.
  let pushedOnTopic1 = newFuture[(string, WakuMessage)]()
  let pushedOnTopic2 = newFuture[(string, WakuMessage)]()

  proc onPushTopic1(
      topic: PubsubTopic, incoming: WakuMessage
  ): Future[void] {.async, closure, gcsafe.} =
    if topic1 == topic:
      pushedOnTopic1.complete((topic, incoming))

  proc onPushTopic2(
      topic: PubsubTopic, incoming: WakuMessage
  ): Future[void] {.async, closure, gcsafe.} =
    if topic2 == topic:
      pushedOnTopic2.complete((topic, incoming))

  client.wakuFilterClient.registerPushHandler(onPushTopic1)
  client.wakuFilterClient.registerPushHandler(onPushTopic2)

  # The client subscribes on the server for the content topic on both shards;
  # both requests are sent before either response is asserted.
  let subscribedTopic1 = await client.filterSubscribe(
    some(topic1), @[contentTopic], server.switch.peerInfo.toRemotePeerInfo()
  )
  let subscribedTopic2 = await client.filterSubscribe(
    some(topic2), @[contentTopic], server.switch.peerInfo.toRemotePeerInfo()
  )
  assertResultOk(subscribedTopic1)
  assertResultOk(subscribedTopic2)
  await client.connectToNodes(@[server.switch.peerInfo.toRemotePeerInfo()])

  # When: the server's filter service is handed a message on topic1.
  let msg = WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic)
  await server.filterHandleMessage(topic1, msg)

  # Then: topic1's future completes with that message; topic2's does not.
  let firstPush = await pushedOnTopic1.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(firstPush)
  check firstPush.get() == (topic1, msg)
  check (await pushedOnTopic2.waitForResult(FUTURE_TIMEOUT)).isErr()

  # Given: both futures are reset.
  pushedOnTopic1.reset()
  pushedOnTopic2.reset()

  # When: the server's filter service is handed a message on topic2.
  let msg2 = WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic)
  await server.filterHandleMessage(topic2, msg2)

  # Then: topic2's future completes with that message; topic1's does not.
  let secondPush = await pushedOnTopic2.waitForResult(FUTURE_TIMEOUT)
  assertResultOk(secondPush)
  check secondPush.get() == (topic2, msg2)
  check (await pushedOnTopic1.waitForResult(FUTURE_TIMEOUT)).isErr()
asyncTest "Waku Store Sharding (Static Sharding)":
  # Verifies that a store query scoped to one pubsub topic only returns the
  # messages archived under that topic.

  # Given one archive with two sets of messages using two different pubsub topics
  let
    timeOrigin = now()
    topic1 = "/waku/2/rs/0/1"
    topic2 = "/waku/2/rs/0/2"
    archiveMessages1 = @[fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin))]
    archiveMessages2 = @[fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin))]
    archiveDriver = newArchiveDriverWithMessages(topic1, archiveMessages1)
  # The driver is created with topic1's messages; topic2's are added on top.
  discard archiveDriver.put(topic2, archiveMessages2)

  let mountArchiveResult = server.mountArchive(archiveDriver)
  assertResultOk(mountArchiveResult)

  # `await`, not `waitFor`: this body runs inside an async test, and `waitFor`
  # must not be called from within an async procedure.
  await server.mountStore()
  client.mountStoreClient()

  # Given one query for each pubsub topic
  let
    historyQuery1 = HistoryQuery(
      pubsubTopic: some(topic1), direction: PagingDirection.Forward, pageSize: 2
    )
    historyQuery2 = HistoryQuery(
      pubsubTopic: some(topic2), direction: PagingDirection.Forward, pageSize: 2
    )

  # When the client queries the server for the messages
  let
    serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
    queryResponse1 = await client.query(historyQuery1, serverRemotePeerInfo)
    queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
  assertResultOk(queryResponse1)
  assertResultOk(queryResponse2)

  # Then each response should contain only the messages of the corresponding pubsub topic
  check:
    queryResponse1.get().messages == archiveMessages1[0 ..< 1]
    queryResponse2.get().messages == archiveMessages2[0 ..< 1]