mirror of https://github.com/waku-org/nwaku.git
test(relay-filter): cleanup (#2138)
* Fix some tests. * Clean legacy tests. * Fix imports.
This commit is contained in:
parent
f5f431382b
commit
bd25191a74
|
@ -41,7 +41,7 @@ when defined(waku_exp_store_resume):
|
|||
|
||||
|
||||
import
|
||||
./waku_relay/test_all
|
||||
./waku_relay/test_all,
|
||||
./waku_filter_v2/test_all
|
||||
|
||||
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
import
|
||||
chronicles,
|
||||
chronos
|
||||
|
||||
import ../../../waku/waku_core/message
|
||||
|
||||
|
||||
let FUTURE_TIMEOUT* = 1.seconds
|
||||
|
||||
proc newPushHandlerFuture*(): Future[(string, WakuMessage)] =
|
||||
newFuture[(string, WakuMessage)]()
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,19 +1,24 @@
|
|||
import
|
||||
std/[options,tables],
|
||||
std/[sequtils,sets,strutils],
|
||||
testutils/unittests,
|
||||
std/[
|
||||
options,
|
||||
tables,
|
||||
sets
|
||||
],
|
||||
chronos,
|
||||
chronicles
|
||||
|
||||
import
|
||||
../../../waku/node/peer_manager,
|
||||
../../../waku/waku_filter_v2,
|
||||
../../../waku/waku_filter_v2/client,
|
||||
../../../waku/waku_filter_v2/subscriptions,
|
||||
../../../waku/waku_filter_v2/rpc,
|
||||
../../../waku/waku_core,
|
||||
../testlib/common,
|
||||
../testlib/wakucore
|
||||
../../../waku/[
|
||||
node/peer_manager,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client,
|
||||
waku_core
|
||||
],
|
||||
../testlib/[
|
||||
common,
|
||||
wakucore
|
||||
]
|
||||
|
||||
|
||||
proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =
|
||||
let
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
./test_waku_relay,
|
||||
./test_wakunode_relay,
|
||||
./test_message_id,
|
||||
./test_protocol
|
||||
./test_protocol,
|
||||
./test_wakunode_relay
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
import
|
||||
unittest,
|
||||
stew/shims/net,
|
||||
stew/[results, byteutils]
|
||||
stew/[
|
||||
shims/net,
|
||||
results,
|
||||
byteutils
|
||||
],
|
||||
nimcrypto/sha2,
|
||||
libp2p/protocols/pubsub/rpc/messages
|
||||
|
||||
import
|
||||
stew/results,
|
||||
nimcrypto/sha2,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
../../../waku/waku_relay/message_id,
|
||||
../testlib/sequtils
|
||||
|
||||
|
|
|
@ -40,11 +40,14 @@ suite "Waku Relay":
|
|||
var messageSeq {.threadvar.}: seq[(PubsubTopic, WakuMessage)]
|
||||
var handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
|
||||
var simpleFutureHandler {.threadvar.}: WakuRelayHandler
|
||||
|
||||
var switch {.threadvar.}: Switch
|
||||
var peerManager {.threadvar.}: PeerManager
|
||||
var node {.threadvar.}: WakuRelay
|
||||
|
||||
var remotePeerInfo {.threadvar.}: RemotePeerInfo
|
||||
var peerId {.threadvar.}: PeerId
|
||||
|
||||
var contentTopic {.threadvar.}: ContentTopic
|
||||
var pubsubTopic {.threadvar.}: PubsubTopic
|
||||
var pubsubTopicSeq {.threadvar.}: seq[PubsubTopic]
|
||||
|
@ -85,7 +88,7 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is not published
|
||||
check:
|
||||
not await handlerFuture.withTimeout(3.seconds)
|
||||
not await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
asyncTest "Publish with Subscription (Network Size: 1)":
|
||||
# When subscribing to a Pubsub Topic
|
||||
|
@ -100,7 +103,7 @@ suite "Waku Relay":
|
|||
discard await node.publish(pubsubTopic, wakuMessage)
|
||||
|
||||
# Then the message is published
|
||||
assert (await handlerFuture.withTimeout(3.seconds))
|
||||
assert (await handlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
let (topic, msg) = handlerFuture.read()
|
||||
check:
|
||||
topic == pubsubTopic
|
||||
|
@ -138,8 +141,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is published only in the subscribed node
|
||||
check:
|
||||
not await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
not await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let (otherTopic1, otherMessage1) = otherHandlerFuture.read()
|
||||
check:
|
||||
|
@ -154,8 +157,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is published only in the subscribed node
|
||||
check:
|
||||
not await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
not await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let (otherTopic2, otherMessage2) = otherHandlerFuture.read()
|
||||
check:
|
||||
|
@ -198,8 +201,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is published in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let
|
||||
(topic1, message1) = handlerFuture.read()
|
||||
|
@ -219,8 +222,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is published in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let
|
||||
(topic2, message2) = handlerFuture.read()
|
||||
|
@ -260,7 +263,7 @@ suite "Waku Relay":
|
|||
discard await node.publish(pubsubTopic, wakuMessage)
|
||||
|
||||
# Then the message is published
|
||||
check (await handlerFuture.withTimeout(3.seconds))
|
||||
check (await handlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
let (topic, msg) = handlerFuture.read()
|
||||
check:
|
||||
topic == pubsubTopic
|
||||
|
@ -315,14 +318,14 @@ suite "Waku Relay":
|
|||
# Then the validator is ran in the other node, and fails
|
||||
# Not run in the self node
|
||||
check:
|
||||
await validatorFuture.withTimeout(3.seconds)
|
||||
await validatorFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
validatorFuture.read() == false
|
||||
|
||||
# And the message is published in the self node, but not in the other node,
|
||||
# because it doesn't pass the validator check.
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
not await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
let (topic1, msg1) = handlerFuture.read()
|
||||
# let (otherTopic1, otherMsg1) = otherHandlerFuture.read()
|
||||
check:
|
||||
|
@ -341,13 +344,13 @@ suite "Waku Relay":
|
|||
# Then the validator is ran in the other node, and succeeds
|
||||
# Not run in the self node
|
||||
check:
|
||||
await validatorFuture.withTimeout(3.seconds)
|
||||
await validatorFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
validatorFuture.read() == true
|
||||
|
||||
# And the message is published in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
let (topic2, msg2) = handlerFuture.read()
|
||||
let (otherTopic2, otherMsg2) = otherHandlerFuture.read()
|
||||
check:
|
||||
|
@ -403,8 +406,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is published in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
let (topic1, msg1) = handlerFuture.read()
|
||||
let (otherTopic1, otherMsg1) = otherHandlerFuture.read()
|
||||
check:
|
||||
|
@ -497,12 +500,12 @@ suite "Waku Relay":
|
|||
# Then the messages are published in all nodes (because it's published in the center node)
|
||||
# Center meaning that all other nodes are connected to this one
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture2.withTimeout(3.seconds)
|
||||
await otherHandlerFuture1.withTimeout(3.seconds)
|
||||
await otherHandlerFuture2.withTimeout(3.seconds)
|
||||
await anotherHandlerFuture1.withTimeout(3.seconds)
|
||||
await anotherHandlerFuture2.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await handlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let
|
||||
(topic1, msg1) = handlerFuture.read()
|
||||
|
@ -553,12 +556,12 @@ suite "Waku Relay":
|
|||
# Then the message is published in node and otherNode,
|
||||
# but not in anotherNode because it is not connected anymore
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture2.withTimeout(3.seconds)
|
||||
await otherHandlerFuture1.withTimeout(3.seconds)
|
||||
await otherHandlerFuture2.withTimeout(3.seconds)
|
||||
not await anotherHandlerFuture1.withTimeout(3.seconds)
|
||||
not await anotherHandlerFuture2.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await handlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
not await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
not await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let
|
||||
(topic3, msg3) = handlerFuture.read()
|
||||
|
@ -596,12 +599,12 @@ suite "Waku Relay":
|
|||
# Then the messages are only published in anotherNode because it's disconnected from
|
||||
# the rest of the network
|
||||
check:
|
||||
not await handlerFuture.withTimeout(3.seconds)
|
||||
not await handlerFuture2.withTimeout(3.seconds)
|
||||
not await otherHandlerFuture1.withTimeout(3.seconds)
|
||||
not await otherHandlerFuture2.withTimeout(3.seconds)
|
||||
await anotherHandlerFuture1.withTimeout(3.seconds)
|
||||
await anotherHandlerFuture2.withTimeout(3.seconds)
|
||||
not await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
not await handlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
not await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
not await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let
|
||||
(anotherTopic3, anotherMsg3) = anotherHandlerFuture1.read()
|
||||
|
@ -633,12 +636,12 @@ suite "Waku Relay":
|
|||
# Then the messages are only published in otherNode and node, but not in anotherNode
|
||||
# because it's disconnected from the rest of the network
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture2.withTimeout(3.seconds)
|
||||
await otherHandlerFuture1.withTimeout(3.seconds)
|
||||
await otherHandlerFuture2.withTimeout(3.seconds)
|
||||
not await anotherHandlerFuture1.withTimeout(3.seconds)
|
||||
not await anotherHandlerFuture2.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await handlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
not await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
not await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let
|
||||
(topic5, msg5) = handlerFuture.read()
|
||||
|
@ -683,12 +686,12 @@ suite "Waku Relay":
|
|||
# even if they're connected like so AnotherNode <-> OtherNode <-> Node,
|
||||
# otherNode doesn't broadcast B topic messages because it's not subscribed to it
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
not await handlerFuture2.withTimeout(3.seconds)
|
||||
await otherHandlerFuture1.withTimeout(3.seconds)
|
||||
await otherHandlerFuture2.withTimeout(3.seconds)
|
||||
await anotherHandlerFuture1.withTimeout(3.seconds)
|
||||
await anotherHandlerFuture2.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
not await handlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
await anotherHandlerFuture1.withTimeout(FUTURE_TIMEOUT)
|
||||
await anotherHandlerFuture2.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
let
|
||||
(topic7, msg7) = handlerFuture.read()
|
||||
|
@ -860,8 +863,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg1) == handlerFuture.read()
|
||||
(pubsubTopic, msg1) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -872,8 +875,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg2) == handlerFuture.read()
|
||||
(pubsubTopic, msg2) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -884,8 +887,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg3) == handlerFuture.read()
|
||||
(pubsubTopic, msg3) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -896,8 +899,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg4) == handlerFuture.read()
|
||||
(pubsubTopic, msg4) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -908,8 +911,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg5) == handlerFuture.read()
|
||||
(pubsubTopic, msg5) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -920,8 +923,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg6) == handlerFuture.read()
|
||||
(pubsubTopic, msg6) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -932,8 +935,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg7) == handlerFuture.read()
|
||||
(pubsubTopic, msg7) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -944,8 +947,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg8) == handlerFuture.read()
|
||||
(pubsubTopic, msg8) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -956,8 +959,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg9) == handlerFuture.read()
|
||||
(pubsubTopic, msg9) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -968,8 +971,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg10) == handlerFuture.read()
|
||||
(pubsubTopic, msg10) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -1004,10 +1007,10 @@ suite "Waku Relay":
|
|||
msg1 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024)) # 1KiB
|
||||
msg2 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(10*1024)) # 10KiB
|
||||
msg3 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(100*1024)) # 100KiB
|
||||
msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1023*1024)) # 1MiB - 1B -> Max Size (Inclusive Limit)
|
||||
msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024)) # 1MiB -> Max Size (Exclusive Limit)
|
||||
msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1025*1024)) # 1MiB + 1B -> Out of Max Size
|
||||
|
||||
msg4 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024 - 1)) # 1MiB - 1B -> Max Size (Inclusive Limit)
|
||||
msg5 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024)) # 1MiB -> Max Size (Exclusive Limit)
|
||||
msg6 = fakeWakuMessage(contentTopic=contentTopic, payload=getByteSequence(1024*1024 + 1)) # 1MiB + 1B -> Out of Max Size
|
||||
|
||||
# When sending the 1KiB message
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
|
@ -1015,11 +1018,11 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg1) == handlerFuture.read()
|
||||
(pubsubTopic, msg1) == otherHandlerFuture.read()
|
||||
|
||||
|
||||
# When sending the 10KiB message
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
|
@ -1027,23 +1030,23 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg2) == handlerFuture.read()
|
||||
(pubsubTopic, msg2) == otherHandlerFuture.read()
|
||||
|
||||
|
||||
# When sending the 100KiB message
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
discard await node.publish(pubsubTopic, msg3)
|
||||
|
||||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg3) == handlerFuture.read()
|
||||
(pubsubTopic, msg3) == otherHandlerFuture.read()
|
||||
|
||||
|
||||
# When sending the 1023KiB message
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
|
@ -1051,8 +1054,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg4) == handlerFuture.read()
|
||||
(pubsubTopic, msg4) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -1063,8 +1066,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in self, because there's no checking, but not in other node
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
not await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg5) == handlerFuture.read()
|
||||
|
||||
# When sending the 1025KiB message
|
||||
|
@ -1074,8 +1077,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in self, because there's no checking, but not in other node
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
not await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg6) == handlerFuture.read()
|
||||
|
||||
# Finally stop the other node
|
||||
|
@ -1120,31 +1123,31 @@ suite "Waku Relay":
|
|||
msg4 = fakeWakuMessage("msg4", pubsubTopic)
|
||||
|
||||
discard await node.publish(pubsubTopic, msg1)
|
||||
check await thisHandlerFuture.withTimeout(3.seconds)
|
||||
check await otherHandlerFuture.withTimeout(3.seconds)
|
||||
check await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
check await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
thisHandlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
discard await node.publish(pubsubTopic, msg2)
|
||||
check await thisHandlerFuture.withTimeout(3.seconds)
|
||||
check await otherHandlerFuture.withTimeout(3.seconds)
|
||||
check await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
check await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
thisHandlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
discard await node.publish(pubsubTopic, msg3)
|
||||
check await thisHandlerFuture.withTimeout(3.seconds)
|
||||
check await otherHandlerFuture.withTimeout(3.seconds)
|
||||
check await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
check await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
thisHandlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
discard await node.publish(pubsubTopic, msg4)
|
||||
|
||||
check:
|
||||
await thisHandlerFuture.withTimeout(3.seconds)
|
||||
await thisHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
thisMessageSeq == @[
|
||||
(pubsubTopic, msg1),
|
||||
(pubsubTopic, msg2),
|
||||
(pubsubTopic, msg3),
|
||||
(pubsubTopic, msg4)
|
||||
]
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
otherMessageSeq == @[
|
||||
(pubsubTopic, msg1),
|
||||
(pubsubTopic, msg2),
|
||||
|
@ -1194,8 +1197,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg1) == handlerFuture.read()
|
||||
(pubsubTopic, msg1) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -1207,8 +1210,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg2) == handlerFuture.read()
|
||||
(pubsubTopic, msg2) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -1224,8 +1227,8 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg3) == handlerFuture.read()
|
||||
(pubsubTopic, msg3) == otherHandlerFuture.read()
|
||||
|
||||
|
@ -1237,15 +1240,15 @@ suite "Waku Relay":
|
|||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg4) == handlerFuture.read()
|
||||
(pubsubTopic, msg4) == otherHandlerFuture.read()
|
||||
|
||||
# Finally stop the other node
|
||||
await allFutures(otherSwitch.stop(), otherNode.stop())
|
||||
|
||||
xasyncTest "Relay can receive messages after subscribing and stopping without unsubscribing":
|
||||
asyncTest "Relay can't receive messages after subscribing and stopping without unsubscribing":
|
||||
# Given a second node connected to the first one
|
||||
let
|
||||
otherSwitch = newTestSwitch()
|
||||
|
@ -1272,7 +1275,6 @@ suite "Waku Relay":
|
|||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# FIXME: Inconsistent behaviour with Filter protocol.
|
||||
# Given other node is stopped without unsubscribing
|
||||
await allFutures(otherSwitch.stop(), otherNode.stop())
|
||||
|
||||
|
@ -1280,22 +1282,20 @@ suite "Waku Relay":
|
|||
let msg1 = fakeWakuMessage(testMessage, pubsubTopic)
|
||||
discard await node.publish(pubsubTopic, msg1)
|
||||
|
||||
# Then the message is received in both nodes
|
||||
# Then the message is not received in any node
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
not await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg1) == handlerFuture.read()
|
||||
(pubsubTopic, msg1) == otherHandlerFuture.read()
|
||||
|
||||
|
||||
# When sending a message from other node
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
otherHandlerFuture = newPushHandlerFuture()
|
||||
let msg2 = fakeWakuMessage(testMessage, pubsubTopic)
|
||||
discard await otherNode.publish(pubsubTopic, msg2)
|
||||
|
||||
|
||||
# Then the message is received in both nodes
|
||||
check:
|
||||
await handlerFuture.withTimeout(3.seconds)
|
||||
await otherHandlerFuture.withTimeout(3.seconds)
|
||||
(pubsubTopic, msg2) == handlerFuture.read()
|
||||
not await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
await otherHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
(pubsubTopic, msg2) == otherHandlerFuture.read()
|
||||
|
|
|
@ -1,211 +0,0 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, sequtils, strutils],
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/rpc/messages
|
||||
|
||||
import
|
||||
../../../waku/node/peer_manager,
|
||||
../../../waku/waku_core,
|
||||
../../../waku/waku_relay,
|
||||
../testlib/common,
|
||||
../testlib/wakucore,
|
||||
./utils
|
||||
|
||||
|
||||
suite "Waku Relay":
|
||||
|
||||
asyncTest "subscribe and add handler to topics":
|
||||
## Setup
|
||||
let nodeA = await newTestWakuRelay()
|
||||
|
||||
## Given
|
||||
let
|
||||
networkA = "test-network1"
|
||||
networkB = "test-network2"
|
||||
|
||||
## when
|
||||
discard nodeA.subscribe(networkA, noopRawHandler())
|
||||
discard nodeA.subscribe(networkB, noopRawHandler())
|
||||
|
||||
## Then
|
||||
check:
|
||||
nodeA.isSubscribed(networkA)
|
||||
nodeA.isSubscribed(networkB)
|
||||
|
||||
let subscribedTopics = toSeq(nodeA.subscribedTopics)
|
||||
check:
|
||||
subscribedTopics.len == 2
|
||||
subscribedTopics.contains(networkA)
|
||||
subscribedTopics.contains(networkB)
|
||||
|
||||
## Cleanup
|
||||
await nodeA.stop()
|
||||
|
||||
asyncTest "unsubscribe all handlers from topic":
|
||||
## Setup
|
||||
let nodeA = await newTestWakuRelay()
|
||||
|
||||
## Given
|
||||
let
|
||||
networkA = "test-network1"
|
||||
networkB = "test-network2"
|
||||
networkC = "test-network3"
|
||||
|
||||
discard nodeA.subscribe(networkA, noopRawHandler())
|
||||
discard nodeA.subscribe(networkB, noopRawHandler())
|
||||
discard nodeA.subscribe(networkC, noopRawHandler())
|
||||
|
||||
let topics = toSeq(nodeA.subscribedTopics)
|
||||
require:
|
||||
topics.len == 3
|
||||
topics.contains(networkA)
|
||||
topics.contains(networkB)
|
||||
topics.contains(networkC)
|
||||
|
||||
## When
|
||||
nodeA.unsubscribeAll(networkA)
|
||||
|
||||
## Then
|
||||
check:
|
||||
nodeA.isSubscribed(networkB)
|
||||
nodeA.isSubscribed(networkC)
|
||||
not nodeA.isSubscribed(networkA)
|
||||
|
||||
let subscribedTopics = toSeq(nodeA.subscribedTopics)
|
||||
check:
|
||||
subscribedTopics.len == 2
|
||||
subscribedTopics.contains(networkB)
|
||||
subscribedTopics.contains(networkC)
|
||||
not subscribedTopics.contains(networkA)
|
||||
|
||||
## Cleanup
|
||||
await nodeA.stop()
|
||||
|
||||
asyncTest "publish a message into a topic":
|
||||
## Setup
|
||||
let
|
||||
srcSwitch = newTestSwitch()
|
||||
srcPeerManager = PeerManager.new(srcSwitch)
|
||||
srcNode = await newTestWakuRelay(srcSwitch)
|
||||
dstSwitch = newTestSwitch()
|
||||
dstPeerManager = PeerManager.new(dstSwitch)
|
||||
dstNode = await newTestWakuRelay(dstSwitch)
|
||||
|
||||
await allFutures(srcSwitch.start(), dstSwitch.start())
|
||||
|
||||
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
|
||||
let connOk = await srcPeerManager.connectRelay(dstPeerInfo)
|
||||
require:
|
||||
connOk == true
|
||||
|
||||
## Given
|
||||
let networkTopic = "test-network1"
|
||||
let message = fakeWakuMessage()
|
||||
|
||||
# Self subscription (triggerSelf = true)
|
||||
let srcSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
|
||||
proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
|
||||
srcSubsFut.complete((topic, message))
|
||||
|
||||
discard srcNode.subscribe(networkTopic, srcSubsHandler)
|
||||
|
||||
# Subscription
|
||||
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
|
||||
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
|
||||
dstSubsFut.complete((topic, message))
|
||||
|
||||
discard dstNode.subscribe(networkTopic, dstSubsHandler)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
## When
|
||||
discard await srcNode.publish(networkTopic, message)
|
||||
|
||||
## Then
|
||||
require:
|
||||
await srcSubsFut.withTimeout(5.seconds)
|
||||
await dstSubsFut.withTimeout(5.seconds)
|
||||
|
||||
let (srcTopic, srcMsg) = srcSubsFut.read()
|
||||
check:
|
||||
srcTopic == networkTopic
|
||||
srcMsg == message
|
||||
|
||||
let (dstTopic, dstMsg) = dstSubsFut.read()
|
||||
check:
|
||||
dstTopic == networkTopic
|
||||
dstMsg == message
|
||||
|
||||
## Cleanup
|
||||
await allFutures(srcSwitch.stop(), dstSwitch.stop())
|
||||
|
||||
asyncTest "content topic validator as a message subscription filter":
|
||||
## Setup
|
||||
let
|
||||
srcSwitch = newTestSwitch()
|
||||
srcPeerManager = PeerManager.new(srcSwitch)
|
||||
srcNode = await newTestWakuRelay(srcSwitch)
|
||||
dstSwitch = newTestSwitch()
|
||||
dstPeerManager = PeerManager.new(dstSwitch)
|
||||
dstNode = await newTestWakuRelay(dstSwitch)
|
||||
|
||||
await allFutures(srcSwitch.start(), dstSwitch.start())
|
||||
|
||||
let dstPeerInfo = dstPeerManager.switch.peerInfo.toRemotePeerInfo()
|
||||
let connOk = await srcPeerManager.connectRelay(dstPeerInfo)
|
||||
require:
|
||||
connOk == true
|
||||
|
||||
## Given
|
||||
let networkTopic = "test-network1"
|
||||
let contentTopic = "test-content1"
|
||||
|
||||
let message = fakeWakuMessage(contentTopic=contentTopic)
|
||||
let messages = @[
|
||||
fakeWakuMessage(contentTopic="any"),
|
||||
fakeWakuMessage(contentTopic="any"),
|
||||
fakeWakuMessage(contentTopic="any"),
|
||||
message,
|
||||
fakeWakuMessage(contentTopic="any"),
|
||||
]
|
||||
|
||||
# Subscription
|
||||
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
|
||||
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
|
||||
dstSubsFut.complete((topic, message))
|
||||
|
||||
discard dstNode.subscribe(networkTopic, dstSubsHandler)
|
||||
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# Validator
|
||||
proc validator(topic: PubsubTopic, msg: WakuMessage): Future[ValidationResult] {.async.} =
|
||||
# only relay messages with contentTopic1
|
||||
if msg.contentTopic != contentTopic:
|
||||
return ValidationResult.Reject
|
||||
|
||||
return ValidationResult.Accept
|
||||
|
||||
dstNode.addValidator(networkTopic, validator)
|
||||
|
||||
## When
|
||||
for msg in messages:
|
||||
discard await srcNode.publish(networkTopic, msg)
|
||||
|
||||
## Then
|
||||
require:
|
||||
await dstSubsFut.withTimeout(5.seconds)
|
||||
|
||||
let (dstTopic, dstMsg) = dstSubsFut.read()
|
||||
check:
|
||||
dstTopic == networkTopic
|
||||
dstMsg == message
|
||||
|
||||
## Cleanup
|
||||
await allFutures(srcSwitch.stop(), dstSwitch.stop())
|
Loading…
Reference in New Issue