mirror of https://github.com/waku-org/nwaku.git
adapt filter tests
This commit is contained in:
parent
9fc4b32d81
commit
3bbc6dc654
|
@ -1,7 +1,7 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, tables, sequtils],
|
||||
std/[options, tables, sequtils, strutils, sets],
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
|
@ -17,8 +17,10 @@ import
|
|||
waku_filter_v2,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/subscriptions,
|
||||
waku_filter_v2/rpc,
|
||||
],
|
||||
../testlib/[common, wakucore, wakunode, testasync, futures, testutils]
|
||||
../testlib/[common, wakucore, wakunode, testasync, futures, testutils],
|
||||
../waku_filter_v2/[test_waku_filter_protocol, waku_filter_utils]
|
||||
|
||||
suite "Waku Filter - End to End":
|
||||
var client {.threadvar.}: WakuNode
|
||||
|
@ -47,7 +49,9 @@ suite "Waku Filter - End to End":
|
|||
serverKey = generateSecp256k1Key()
|
||||
clientKey = generateSecp256k1Key()
|
||||
|
||||
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(23450))
|
||||
server = newTestWakuNode(
|
||||
serverKey, parseIpAddress("0.0.0.0"), Port(23450), maxConnections = 300
|
||||
)
|
||||
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
|
||||
clientClone = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
|
||||
# Used for testing client restarts
|
||||
|
@ -209,3 +213,374 @@ suite "Waku Filter - End to End":
|
|||
|
||||
# Then the message is not sent to the client's filter push handler
|
||||
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
|
||||
asyncTest "ping subscriber":
|
||||
# Given
|
||||
let
|
||||
wakuFilter = server.wakuFilter
|
||||
clientPeerId = client.switch.peerInfo.peerId
|
||||
serverPeerId = server.switch.peerInfo.peerId
|
||||
pingRequest =
|
||||
createRequest(filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING)
|
||||
filterSubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
|
||||
## connect both switches
|
||||
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
|
||||
|
||||
# When
|
||||
let response1 = await wakuFilter.handleSubscribeRequest(clientPeerId, pingRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response1.requestId == pingRequest.requestId
|
||||
response1.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
|
||||
response1.statusDesc.get().contains("peer has no subscriptions")
|
||||
|
||||
# When
|
||||
let
|
||||
response2 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest)
|
||||
response3 = await wakuFilter.handleSubscribeRequest(clientPeerId, pingRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response2.requestId == filterSubscribeRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
response3.requestId == pingRequest.requestId
|
||||
response3.statusCode == 200
|
||||
response3.statusDesc.get() == "OK"
|
||||
|
||||
asyncTest "simple subscribe and unsubscribe request":
|
||||
# Given
|
||||
let
|
||||
wakuFilter = server.wakuFilter
|
||||
clientPeerId = client.switch.peerInfo.peerId
|
||||
serverPeerId = server.switch.peerInfo.peerId
|
||||
filterSubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
filterUnsubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest.pubsubTopic,
|
||||
contentTopics = filterSubscribeRequest.contentTopics,
|
||||
)
|
||||
|
||||
## connect both switches
|
||||
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
|
||||
|
||||
# When
|
||||
let response =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 1
|
||||
response.requestId == filterSubscribeRequest.requestId
|
||||
response.statusCode == 200
|
||||
response.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response2 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
# peerId is removed from subscriptions
|
||||
response2.requestId == filterUnsubscribeRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
|
||||
asyncTest "simple subscribe and unsubscribe all for multiple content topics":
|
||||
# Given
|
||||
let
|
||||
wakuFilter = server.wakuFilter
|
||||
clientPeerId = client.switch.peerInfo.peerId
|
||||
serverPeerId = server.switch.peerInfo.peerId
|
||||
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
|
||||
filterSubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic, nonDefaultContentTopic],
|
||||
)
|
||||
filterUnsubscribeAllRequest =
|
||||
createRequest(filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL)
|
||||
|
||||
## connect both switches
|
||||
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
|
||||
|
||||
# When
|
||||
let response =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 2
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(clientPeerId),
|
||||
filterSubscribeRequest.contentTopics,
|
||||
)
|
||||
response.requestId == filterSubscribeRequest.requestId
|
||||
response.statusCode == 200
|
||||
response.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response2 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeAllRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
# peerId is removed from subscriptions
|
||||
response2.requestId == filterUnsubscribeAllRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
|
||||
asyncTest "subscribe and unsubscribe to multiple content topics":
|
||||
# Given
|
||||
let
|
||||
wakuFilter = server.wakuFilter
|
||||
clientPeerId = client.switch.peerInfo.peerId
|
||||
serverPeerId = server.switch.peerInfo.peerId
|
||||
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
|
||||
filterSubscribeRequest1 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
filterSubscribeRequest2 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
|
||||
contentTopics = @[nonDefaultContentTopic],
|
||||
)
|
||||
filterUnsubscribeRequest1 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
|
||||
contentTopics = filterSubscribeRequest1.contentTopics,
|
||||
)
|
||||
filterUnsubscribeRequest2 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest2.pubsubTopic,
|
||||
contentTopics = filterSubscribeRequest2.contentTopics,
|
||||
)
|
||||
|
||||
## connect both switches
|
||||
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
|
||||
|
||||
# When
|
||||
let response1 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest1)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 1
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(clientPeerId),
|
||||
filterSubscribeRequest1.contentTopics,
|
||||
)
|
||||
response1.requestId == filterSubscribeRequest1.requestId
|
||||
response1.statusCode == 200
|
||||
response1.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response2 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest2)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 2
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(clientPeerId),
|
||||
filterSubscribeRequest1.contentTopics & filterSubscribeRequest2.contentTopics,
|
||||
)
|
||||
response2.requestId == filterSubscribeRequest2.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response3 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeRequest1)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 1
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(clientPeerId),
|
||||
filterSubscribeRequest2.contentTopics,
|
||||
)
|
||||
response3.requestId == filterUnsubscribeRequest1.requestId
|
||||
response3.statusCode == 200
|
||||
response3.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response4 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeRequest2)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
# peerId is removed from subscriptions
|
||||
response4.requestId == filterUnsubscribeRequest2.requestId
|
||||
response4.statusCode == 200
|
||||
response4.statusDesc.get() == "OK"
|
||||
|
||||
asyncTest "subscribe errors":
|
||||
## Tests most common error paths while subscribing
|
||||
|
||||
# Given
|
||||
let
|
||||
wakuFilter = server.wakuFilter
|
||||
clientPeerId = client.switch.peerInfo.peerId
|
||||
serverPeerId = server.switch.peerInfo.peerId
|
||||
peerManager = server.peerManager
|
||||
|
||||
## connect both switches
|
||||
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
|
||||
|
||||
## Incomplete filter criteria
|
||||
|
||||
# When
|
||||
let
|
||||
reqNoPubsubTopic = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = none(PubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
reqNoContentTopics = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[],
|
||||
)
|
||||
response1 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, reqNoPubsubTopic)
|
||||
response2 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, reqNoContentTopics)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response1.requestId == reqNoPubsubTopic.requestId
|
||||
response2.requestId == reqNoContentTopics.requestId
|
||||
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
|
||||
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
|
||||
response1.statusDesc.get().contains(
|
||||
"pubsubTopic and contentTopics must be specified"
|
||||
)
|
||||
response2.statusDesc.get().contains(
|
||||
"pubsubTopic and contentTopics must be specified"
|
||||
)
|
||||
|
||||
## Max content topics per request exceeded
|
||||
|
||||
# When
|
||||
let
|
||||
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(
|
||||
ContentTopic("/waku/2/content-$#/proto" % [$it])
|
||||
)
|
||||
reqTooManyContentTopics = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = contentTopics,
|
||||
)
|
||||
response3 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, reqTooManyContentTopics)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response3.requestId == reqTooManyContentTopics.requestId
|
||||
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
|
||||
response3.statusDesc.get().contains("exceeds maximum content topics")
|
||||
|
||||
## Max filter criteria exceeded
|
||||
|
||||
# When
|
||||
let filterCriteria = toSeq(1 .. MaxFilterCriteriaPerPeer).mapIt(
|
||||
(DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it]))
|
||||
)
|
||||
|
||||
discard await wakuFilter.subscriptions.addSubscription(
|
||||
clientPeerId, filterCriteria.toHashSet(), peerManager
|
||||
)
|
||||
|
||||
let
|
||||
reqTooManyFilterCriteria = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
response4 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, reqTooManyFilterCriteria)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response4.requestId == reqTooManyFilterCriteria.requestId
|
||||
response4.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
|
||||
response4.statusDesc.get().contains(
|
||||
"peer has reached maximum number of filter criteria"
|
||||
)
|
||||
|
||||
## Max subscriptions exceeded
|
||||
|
||||
# When
|
||||
await wakuFilter.subscriptions.removePeer(clientPeerId)
|
||||
wakuFilter.subscriptions.cleanUp()
|
||||
|
||||
var peers = newSeq[WakuNode](MaxFilterPeers)
|
||||
|
||||
for index in 0 ..< MaxFilterPeers:
|
||||
peers[index] = newTestWakuNode(
|
||||
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23551 + index)
|
||||
)
|
||||
|
||||
await peers[index].start()
|
||||
await peers[index].mountFilterClient()
|
||||
|
||||
## connect switches
|
||||
debug "establish connection", peerId = peers[index].peerInfo.peerId
|
||||
|
||||
await server.switch.connect(
|
||||
peers[index].switch.peerInfo.peerId, peers[index].switch.peerInfo.listenAddrs
|
||||
)
|
||||
|
||||
debug "adding subscription"
|
||||
|
||||
(
|
||||
await wakuFilter.subscriptions.addSubscription(
|
||||
peers[index].switch.peerInfo.peerId,
|
||||
@[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet(),
|
||||
peerManager,
|
||||
)
|
||||
).isOkOr:
|
||||
assert false, $error
|
||||
|
||||
let
|
||||
reqTooManySubscriptions = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
response5 =
|
||||
await wakuFilter.handleSubscribeRequest(clientPeerId, reqTooManySubscriptions)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response5.requestId == reqTooManySubscriptions.requestId
|
||||
response5.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
|
||||
response5.statusDesc.get().contains(
|
||||
"node has reached maximum number of subscriptions"
|
||||
)
|
||||
|
||||
## stop the peers
|
||||
for index in 0 ..< MaxFilterPeers:
|
||||
await peers[index].stop()
|
||||
|
|
|
@ -2542,7 +2542,7 @@ suite "Waku Filter - End to End":
|
|||
|
||||
await sleepAsync(2200)
|
||||
|
||||
wakuFilter.maintainSubscriptions()
|
||||
await wakuFilter.maintainSubscriptions()
|
||||
|
||||
check not wakuFilter.subscriptions.isSubscribed(clientPeerId)
|
||||
check not wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
|
||||
|
|
|
@ -30,7 +30,7 @@ proc generateRequestId(rng: ref HmacDrbgContext): string =
|
|||
hmacDrbgGenerate(rng[], bytes)
|
||||
return toHex(bytes)
|
||||
|
||||
proc createRequest(
|
||||
proc createRequest*(
|
||||
filterSubscribeType: FilterSubscribeType,
|
||||
pubsubTopic = none(PubsubTopic),
|
||||
contentTopics = newSeq[ContentTopic](),
|
||||
|
@ -56,282 +56,6 @@ proc getSubscribedContentTopics(
|
|||
return contentTopics
|
||||
|
||||
suite "Waku Filter - handling subscribe requests":
|
||||
asyncTest "simple subscribe and unsubscribe request":
|
||||
# Given
|
||||
let
|
||||
switch = newStandardSwitch()
|
||||
wakuFilter = newTestWakuFilter(switch)
|
||||
peerId = PeerId.random().get()
|
||||
filterSubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
filterUnsubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest.pubsubTopic,
|
||||
contentTopics = filterSubscribeRequest.contentTopics,
|
||||
)
|
||||
|
||||
# When
|
||||
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||
response.requestId == filterSubscribeRequest.requestId
|
||||
response.statusCode == 200
|
||||
response.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
# peerId is removed from subscriptions
|
||||
response2.requestId == filterUnsubscribeRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
|
||||
asyncTest "simple subscribe and unsubscribe all for multiple content topics":
|
||||
# Given
|
||||
let
|
||||
switch = newStandardSwitch()
|
||||
wakuFilter = newTestWakuFilter(switch)
|
||||
peerId = PeerId.random().get()
|
||||
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
|
||||
filterSubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic, nonDefaultContentTopic],
|
||||
)
|
||||
filterUnsubscribeAllRequest =
|
||||
createRequest(filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL)
|
||||
|
||||
# When
|
||||
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(peerId),
|
||||
filterSubscribeRequest.contentTopics,
|
||||
)
|
||||
response.requestId == filterSubscribeRequest.requestId
|
||||
response.statusCode == 200
|
||||
response.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response2 =
|
||||
wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeAllRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
# peerId is removed from subscriptions
|
||||
response2.requestId == filterUnsubscribeAllRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
|
||||
asyncTest "subscribe and unsubscribe to multiple content topics":
|
||||
# Given
|
||||
let
|
||||
switch = newStandardSwitch()
|
||||
wakuFilter = newTestWakuFilter(switch)
|
||||
peerId = PeerId.random().get()
|
||||
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
|
||||
filterSubscribeRequest1 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
filterSubscribeRequest2 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
|
||||
contentTopics = @[nonDefaultContentTopic],
|
||||
)
|
||||
filterUnsubscribeRequest1 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
|
||||
contentTopics = filterSubscribeRequest1.contentTopics,
|
||||
)
|
||||
filterUnsubscribeRequest2 = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
|
||||
pubsubTopic = filterSubscribeRequest2.pubsubTopic,
|
||||
contentTopics = filterSubscribeRequest2.contentTopics,
|
||||
)
|
||||
|
||||
# When
|
||||
let response1 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest1)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(peerId),
|
||||
filterSubscribeRequest1.contentTopics,
|
||||
)
|
||||
response1.requestId == filterSubscribeRequest1.requestId
|
||||
response1.statusCode == 200
|
||||
response1.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest2)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(peerId),
|
||||
filterSubscribeRequest1.contentTopics & filterSubscribeRequest2.contentTopics,
|
||||
)
|
||||
response2.requestId == filterSubscribeRequest2.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response3 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest1)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||
unorderedCompare(
|
||||
wakuFilter.getSubscribedContentTopics(peerId),
|
||||
filterSubscribeRequest2.contentTopics,
|
||||
)
|
||||
response3.requestId == filterUnsubscribeRequest1.requestId
|
||||
response3.statusCode == 200
|
||||
response3.statusDesc.get() == "OK"
|
||||
|
||||
# When
|
||||
let response4 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest2)
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
# peerId is removed from subscriptions
|
||||
response4.requestId == filterUnsubscribeRequest2.requestId
|
||||
response4.statusCode == 200
|
||||
response4.statusDesc.get() == "OK"
|
||||
|
||||
asyncTest "subscribe errors":
|
||||
## Tests most common error paths while subscribing
|
||||
|
||||
# Given
|
||||
let
|
||||
switch = newStandardSwitch()
|
||||
wakuFilter = newTestWakuFilter(switch)
|
||||
peerId = PeerId.random().get()
|
||||
|
||||
## Incomplete filter criteria
|
||||
|
||||
# When
|
||||
let
|
||||
reqNoPubsubTopic = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = none(PubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
reqNoContentTopics = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[],
|
||||
)
|
||||
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
|
||||
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response1.requestId == reqNoPubsubTopic.requestId
|
||||
response2.requestId == reqNoContentTopics.requestId
|
||||
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
|
||||
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
|
||||
response1.statusDesc.get().contains(
|
||||
"pubsubTopic and contentTopics must be specified"
|
||||
)
|
||||
response2.statusDesc.get().contains(
|
||||
"pubsubTopic and contentTopics must be specified"
|
||||
)
|
||||
|
||||
## Max content topics per request exceeded
|
||||
|
||||
# When
|
||||
let
|
||||
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(
|
||||
ContentTopic("/waku/2/content-$#/proto" % [$it])
|
||||
)
|
||||
reqTooManyContentTopics = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = contentTopics,
|
||||
)
|
||||
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response3.requestId == reqTooManyContentTopics.requestId
|
||||
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
|
||||
response3.statusDesc.get().contains("exceeds maximum content topics")
|
||||
|
||||
## Max filter criteria exceeded
|
||||
|
||||
# When
|
||||
let filterCriteria = toSeq(1 .. MaxFilterCriteriaPerPeer).mapIt(
|
||||
(DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it]))
|
||||
)
|
||||
|
||||
discard wakuFilter.subscriptions.addSubscription(peerId, filterCriteria.toHashSet())
|
||||
|
||||
let
|
||||
reqTooManyFilterCriteria = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
response4 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyFilterCriteria)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response4.requestId == reqTooManyFilterCriteria.requestId
|
||||
response4.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
|
||||
response4.statusDesc.get().contains(
|
||||
"peer has reached maximum number of filter criteria"
|
||||
)
|
||||
|
||||
## Max subscriptions exceeded
|
||||
|
||||
# When
|
||||
wakuFilter.subscriptions.removePeer(peerId)
|
||||
wakuFilter.subscriptions.cleanUp()
|
||||
|
||||
for _ in 1 .. MaxFilterPeers:
|
||||
discard wakuFilter.subscriptions.addSubscription(
|
||||
PeerId.random().get(), @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet()
|
||||
)
|
||||
|
||||
let
|
||||
reqTooManySubscriptions = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
response5 = wakuFilter.handleSubscribeRequest(peerId, reqTooManySubscriptions)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response5.requestId == reqTooManySubscriptions.requestId
|
||||
response5.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
|
||||
response5.statusDesc.get().contains(
|
||||
"node has reached maximum number of subscriptions"
|
||||
)
|
||||
|
||||
asyncTest "unsubscribe errors":
|
||||
## Tests most common error paths while unsubscribing
|
||||
|
||||
|
@ -355,8 +79,8 @@ suite "Waku Filter - handling subscribe requests":
|
|||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[],
|
||||
)
|
||||
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
|
||||
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)
|
||||
response1 = await wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
|
||||
response2 = await wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)
|
||||
|
||||
# Then
|
||||
check:
|
||||
|
@ -383,7 +107,8 @@ suite "Waku Filter - handling subscribe requests":
|
|||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = contentTopics,
|
||||
)
|
||||
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)
|
||||
response3 =
|
||||
await wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)
|
||||
|
||||
# Then
|
||||
check:
|
||||
|
@ -400,7 +125,8 @@ suite "Waku Filter - handling subscribe requests":
|
|||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
response4 = wakuFilter.handleSubscribeRequest(peerId, reqSubscriptionNotFound)
|
||||
response4 =
|
||||
await wakuFilter.handleSubscribeRequest(peerId, reqSubscriptionNotFound)
|
||||
|
||||
# Then
|
||||
check:
|
||||
|
@ -414,7 +140,7 @@ suite "Waku Filter - handling subscribe requests":
|
|||
let
|
||||
reqUnsubscribeAll =
|
||||
createRequest(filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL)
|
||||
response5 = wakuFilter.handleSubscribeRequest(peerId, reqUnsubscribeAll)
|
||||
response5 = await wakuFilter.handleSubscribeRequest(peerId, reqUnsubscribeAll)
|
||||
|
||||
# Then
|
||||
check:
|
||||
|
@ -422,43 +148,6 @@ suite "Waku Filter - handling subscribe requests":
|
|||
response5.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
|
||||
response5.statusDesc.get().contains("peer has no subscriptions")
|
||||
|
||||
asyncTest "ping subscriber":
|
||||
# Given
|
||||
let
|
||||
switch = newStandardSwitch()
|
||||
wakuFilter = newTestWakuFilter(switch)
|
||||
peerId = PeerId.random().get()
|
||||
pingRequest =
|
||||
createRequest(filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING)
|
||||
filterSubscribeRequest = createRequest(
|
||||
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
|
||||
pubsubTopic = some(DefaultPubsubTopic),
|
||||
contentTopics = @[DefaultContentTopic],
|
||||
)
|
||||
|
||||
# When
|
||||
let response1 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response1.requestId == pingRequest.requestId
|
||||
response1.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
|
||||
response1.statusDesc.get().contains("peer has no subscriptions")
|
||||
|
||||
# When
|
||||
let
|
||||
response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
|
||||
response3 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response2.requestId == filterSubscribeRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
response3.requestId == pingRequest.requestId
|
||||
response3.statusCode == 200
|
||||
response3.statusDesc.get() == "OK"
|
||||
|
||||
suite "Waku Filter - subscription maintenance":
|
||||
asyncTest "simple maintenance":
|
||||
# Given
|
||||
|
@ -478,11 +167,11 @@ suite "Waku Filter - subscription maintenance":
|
|||
switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec]
|
||||
switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec]
|
||||
switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec]
|
||||
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).statusCode ==
|
||||
require (await wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest)).statusCode ==
|
||||
200
|
||||
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).statusCode ==
|
||||
require (await wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest)).statusCode ==
|
||||
200
|
||||
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).statusCode ==
|
||||
require (await wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest)).statusCode ==
|
||||
200
|
||||
|
||||
# Then
|
||||
|
@ -494,7 +183,7 @@ suite "Waku Filter - subscription maintenance":
|
|||
|
||||
# When
|
||||
# Maintenance loop should leave all peers in peer store intact
|
||||
wakuFilter.maintainSubscriptions()
|
||||
await wakuFilter.maintainSubscriptions()
|
||||
|
||||
# Then
|
||||
check:
|
||||
|
@ -507,7 +196,7 @@ suite "Waku Filter - subscription maintenance":
|
|||
# Remove peerId1 and peerId3 from peer store
|
||||
switch.peerStore.del(peerId1)
|
||||
switch.peerStore.del(peerId3)
|
||||
wakuFilter.maintainSubscriptions()
|
||||
await wakuFilter.maintainSubscriptions()
|
||||
|
||||
# Then
|
||||
check:
|
||||
|
@ -517,7 +206,7 @@ suite "Waku Filter - subscription maintenance":
|
|||
# When
|
||||
# Remove peerId2 from peer store
|
||||
switch.peerStore.del(peerId2)
|
||||
wakuFilter.maintainSubscriptions()
|
||||
await wakuFilter.maintainSubscriptions()
|
||||
|
||||
# Then
|
||||
check:
|
||||
|
|
Loading…
Reference in New Issue