mirror of https://github.com/waku-org/nwaku.git
test(waku-filter): Unsubscribe tests (#2085)
* Implement unsubscribe waku filter tests. * test(waku-filter): Unsubscribe all, payloads and security tests (#2095) * Implement waku node filter Security and Privacy tests (#2096)
This commit is contained in:
parent
6d81e3841a
commit
85265e6c61
|
@ -25,12 +25,12 @@ import
|
|||
|
||||
let FUTURE_TIMEOUT = 1.seconds
|
||||
|
||||
suite "Full Node - Waku Filter - End to End":
|
||||
suite "Waku Filter - End to End":
|
||||
var client {.threadvar.}: WakuNode
|
||||
var clientPeerId {.threadvar.}: PeerId
|
||||
var server {.threadvar.}: WakuNode
|
||||
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
|
||||
var pubsubTopic {.threadvar.}: PubsubTopicy
|
||||
var pubsubTopic {.threadvar.}: PubsubTopic
|
||||
var contentTopic {.threadvar.}: ContentTopic
|
||||
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
||||
var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)]
|
||||
|
@ -66,8 +66,8 @@ suite "Full Node - Waku Filter - End to End":
|
|||
asyncTeardown:
|
||||
waitFor allFutures(client.stop(), server.stop())
|
||||
|
||||
asyncTest "Full Client Node to Full Service Node Subscription":
|
||||
# When a full client node subscribes to a full service node
|
||||
asyncTest "Client Node receives Push from Server Node, via Filter":
|
||||
# When a client node subscribes to a filter node
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
|
@ -107,3 +107,105 @@ suite "Full Node - Waku Filter - End to End":
|
|||
# Then the message is not pushed to the client
|
||||
check:
|
||||
not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
asyncTest "Client Node can't receive Push from Server Node, via Relay":
|
||||
# Given the server node has Relay enabled
|
||||
await server.mountRelay()
|
||||
|
||||
# And valid filter subscription
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
require:
|
||||
subscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
|
||||
# When a server node gets a Relay message
|
||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
await server.publish(some(pubsubTopic), msg1)
|
||||
|
||||
# Then the message is not sent to the client's filter push handler
|
||||
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
|
||||
asyncTest "Client Node can't subscribe to Server Node without Filter":
|
||||
# Given a server node with Relay without Filter
|
||||
let
|
||||
serverKey = generateSecp256k1Key()
|
||||
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
waitFor server.start()
|
||||
waitFor server.mountRelay()
|
||||
|
||||
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# When a client node subscribes to the server node
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
# Then the subscription is successful
|
||||
check (not subscribeResponse.isOk())
|
||||
|
||||
xasyncTest "Filter Client Node can receive messages after subscribing and restarting, via Filter":
|
||||
# Given a valid filter subscription
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
require:
|
||||
subscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
|
||||
# And the client node reboots
|
||||
waitFor client.stop()
|
||||
waitFor client.start()
|
||||
client.mountFilterClient()
|
||||
|
||||
# When a message is sent to the subscribed content topic, via Filter; without refreshing the subscription
|
||||
let msg = fakeWakuMessage(contentTopic=contentTopic)
|
||||
await server.filterHandleMessage(pubsubTopic, msg)
|
||||
|
||||
# Then the message is pushed to the client
|
||||
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read()
|
||||
check:
|
||||
pushedMsgPubsubTopic == pubsubTopic
|
||||
pushedMsg == msg
|
||||
|
||||
asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay": # Given the server node has Relay enabled
|
||||
await server.mountRelay()
|
||||
|
||||
# Given a valid filter subscription
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
require:
|
||||
subscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
|
||||
# And the client node reboots
|
||||
waitFor client.stop()
|
||||
waitFor client.start()
|
||||
client.mountFilterClient()
|
||||
|
||||
# When a message is sent to the subscribed content topic, via Relay
|
||||
let msg = fakeWakuMessage(contentTopic=contentTopic)
|
||||
await server.publish(some(pubsubTopic), msg)
|
||||
|
||||
# Then the message is not sent to the client's filter push handler
|
||||
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
|
||||
# Given the client refreshes the subscription
|
||||
let subscribeResponse2 = await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
check:
|
||||
subscribeResponse2.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
|
||||
# When a message is sent to the subscribed content topic, via Relay
|
||||
pushHandlerFuture = newPushHandlerFuture()
|
||||
let msg2 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
await server.publish(some(pubsubTopic), msg2)
|
||||
|
||||
# Then the message is not sent to the client's filter push handler
|
||||
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
proc toString*(bytes: seq[byte]): string =
|
||||
cast[string](bytes)
|
|
@ -0,0 +1,70 @@
|
|||
import
|
||||
std/json
|
||||
|
||||
const
|
||||
ALPHABETIC* = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
ALPHANUMERIC* = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||
ALPHANUMERIC_SPECIAL* = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()_+-=[]{}|;':\\\",./<>?`~"
|
||||
EMOJI* = "😀 😃 😄 😁 😆 😅 🤣 😂 🙂 🙃 😉 😊 😇 🥰 😍 🤩 😘 😗 😚 😙"
|
||||
CODE* = "def main():\n\tprint('Hello, world!')"
|
||||
QUERY* = """
|
||||
SELECT
|
||||
u.id,
|
||||
u.name,
|
||||
u.email,
|
||||
u.created_at,
|
||||
u.updated_at,
|
||||
(
|
||||
SELECT
|
||||
COUNT(*)
|
||||
FROM
|
||||
posts p
|
||||
WHERE
|
||||
p.user_id = u.id
|
||||
) AS post_count
|
||||
FROM
|
||||
users u
|
||||
WHERE
|
||||
u.id = 1
|
||||
"""
|
||||
TEXT_SMALL* = "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
|
||||
TEXT_LARGE* = """
|
||||
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras gravida vulputate semper. Proin
|
||||
eleifend varius cursus. Morbi lacinia posuere quam sit amet pretium. Sed non metus fermentum,
|
||||
venenatis nisl id, vestibulum eros. Quisque non lorem sit amet lectus faucibus elementum eu
|
||||
sit amet odio. Mauris tortor justo, malesuada quis volutpat vitae, tristique at nisl. Proin
|
||||
eleifend eu arcu ac sodales. In efficitur ipsum urna, ut viverra turpis sodales ut. Phasellus
|
||||
nec tortor eu urna suscipit euismod eget vel ligula. Phasellus vestibulum sollicitudin tellus,
|
||||
ac sodales tellus tempor id. Curabitur sed congue velit.
|
||||
"""
|
||||
|
||||
proc getSampleJsonDictionary*(): JsonNode =
|
||||
%*{
|
||||
"shapes": [
|
||||
{
|
||||
"type": "circle",
|
||||
"radius": 10
|
||||
},
|
||||
{
|
||||
"type": "square",
|
||||
"side": 10
|
||||
}
|
||||
],
|
||||
"colours": [
|
||||
"red",
|
||||
"green",
|
||||
"blue"
|
||||
]
|
||||
}
|
||||
|
||||
proc getSampleJsonList*(): JsonNode =
|
||||
%*[
|
||||
{
|
||||
"type": "cat",
|
||||
"name": "Salem"
|
||||
},
|
||||
{
|
||||
"type": "dog",
|
||||
"name": "Oberon"
|
||||
},
|
||||
]
|
File diff suppressed because it is too large
Load Diff
|
@ -86,7 +86,11 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
|
|||
return err(FilterSubscribeError.notFound())
|
||||
|
||||
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
||||
# TODO: consider error response if filter criteria does not exist
|
||||
|
||||
if not peerSubscription.containsAny(filterCriteria):
|
||||
debug "unsubscribing peer is not subscribed to any of the content topics in this pubsub topic", peerId=peerId, pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
|
||||
return err(FilterSubscribeError.notFound())
|
||||
|
||||
peerSubscription.excl(filterCriteria)
|
||||
|
||||
if peerSubscription.len() == 0:
|
||||
|
|
|
@ -43,3 +43,11 @@ proc removePeers*(subscriptions: var FilterSubscriptions, peerIds: seq[PeerID])
|
|||
## Remove all subscriptions for a given list of peers
|
||||
for peerId in peerIds:
|
||||
subscriptions.removePeer(peerId)
|
||||
|
||||
proc containsAny*(criteria: FilterCriteria, otherCriteria: FilterCriteria): bool =
|
||||
## Check if a given pubsub topic is contained in a set of filter criteria
|
||||
## TODO: Better way to achieve this?
|
||||
for criterion in otherCriteria:
|
||||
if criterion in criteria:
|
||||
return true
|
||||
false
|
||||
|
|
Loading…
Reference in New Issue