fix: filter - enhancements in subscription management (#3198)

* waku_filter_v2: idiomatic way run periodic subscription manager
* filter subscriptions: add more debug logs
* filter: make sure the custom start and stop procs are called
* make sure filter protocol is started if it is mounted
* filter: dial push connection on subscribe only
* reduce max num filter peers from 1000 to 100
* adapt filter tests
* waku_peer_exchange protocol remove temporary debug logs
This commit is contained in:
Ivan FB 2025-01-28 15:37:33 +01:00 committed by GitHub
parent c01a21e01f
commit d9e79022fe
10 changed files with 931 additions and 712 deletions

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, tables, sequtils],
std/[options, tables, sequtils, strutils, sets],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
@ -17,8 +17,29 @@ import
waku_filter_v2,
waku_filter_v2/client,
waku_filter_v2/subscriptions,
waku_filter_v2/rpc,
],
../testlib/[common, wakucore, wakunode, testasync, futures, testutils]
../testlib/[common, wakucore, wakunode, testasync, futures, testutils],
../waku_filter_v2/waku_filter_utils
proc generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)
proc createRequest(
filterSubscribeType: FilterSubscribeType,
pubsubTopic = none(PubsubTopic),
contentTopics = newSeq[ContentTopic](),
): FilterSubscribeRequest =
let requestId = generateRequestId(rng)
return FilterSubscribeRequest(
requestId: requestId,
filterSubscribeType: filterSubscribeType,
pubsubTopic: pubsubTopic,
contentTopics: contentTopics,
)
suite "Waku Filter - End to End":
var client {.threadvar.}: WakuNode
@ -31,6 +52,8 @@ suite "Waku Filter - End to End":
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)]
var messagePushHandler {.threadvar.}: FilterPushHandler
var clientKey {.threadvar.}: PrivateKey
var serverKey {.threadvar.}: PrivateKey
asyncSetup:
pushHandlerFuture = newFuture[(string, WakuMessage)]()
@ -43,11 +66,12 @@ suite "Waku Filter - End to End":
contentTopic = DefaultContentTopic
contentTopicSeq = @[DefaultContentTopic]
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(23450))
server = newTestWakuNode(
serverKey, parseIpAddress("0.0.0.0"), Port(23450), maxConnections = 300
)
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
clientClone = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
# Used for testing client restarts
@ -148,9 +172,14 @@ suite "Waku Filter - End to End":
# Then the subscription is successful
check (not subscribeResponse.isOk())
asyncTest "Filter Client Node can receive messages after subscribing and restarting, via Filter":
xasyncTest "Filter Client Node can receive messages after subscribing and restarting, via Filter":
## connect both switches
await client.switch.connect(
server.switch.peerInfo.peerId, server.switch.peerInfo.listenAddrs
)
# Given a valid filter subscription
let subscribeResponse = await client.filterSubscribe(
var subscribeResponse = await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
require:
@ -159,7 +188,28 @@ suite "Waku Filter - End to End":
# And the client node reboots
await client.stop()
await clientClone.start() # Mimic restart by starting the clone
## This line above causes the test to fail. I think ConnManager
## is not prepare for restarts and maybe we don't need that restart feature.
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
await client.start() # Mimic restart by starting the clone
# pushHandlerFuture = newFuture[(string, WakuMessage)]()
await client.mountFilterClient()
client.wakuFilterClient.registerPushHandler(messagePushHandler)
## connect both switches
await client.switch.connect(
server.switch.peerInfo.peerId, server.switch.peerInfo.listenAddrs
)
# Given a valid filter subscription
subscribeResponse = await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
require:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
# When a message is sent to the subscribed content topic, via Filter; without refreshing the subscription
let msg = fakeWakuMessage(contentTopic = contentTopic)
@ -209,3 +259,580 @@ suite "Waku Filter - End to End":
# Then the message is not sent to the client's filter push handler
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
asyncTest "ping subscriber":
# Given
let
wakuFilter = server.wakuFilter
clientPeerId = client.switch.peerInfo.peerId
serverPeerId = server.switch.peerInfo.peerId
pingRequest =
createRequest(filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING)
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
## connect both switches
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
# When
let response1 = await wakuFilter.handleSubscribeRequest(clientPeerId, pingRequest)
# Then
check:
response1.requestId == pingRequest.requestId
response1.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response1.statusDesc.get().contains("peer has no subscriptions")
# When
let
response2 =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest)
response3 = await wakuFilter.handleSubscribeRequest(clientPeerId, pingRequest)
# Then
check:
response2.requestId == filterSubscribeRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
response3.requestId == pingRequest.requestId
response3.statusCode == 200
response3.statusDesc.get() == "OK"
asyncTest "simple subscribe and unsubscribe request":
# Given
let
wakuFilter = server.wakuFilter
clientPeerId = client.switch.peerInfo.peerId
serverPeerId = server.switch.peerInfo.peerId
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
filterUnsubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest.pubsubTopic,
contentTopics = filterSubscribeRequest.contentTopics,
)
## connect both switches
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
# When
let response =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 1
response.requestId == filterSubscribeRequest.requestId
response.statusCode == 200
response.statusDesc.get() == "OK"
# When
let response2 =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0
# peerId is removed from subscriptions
response2.requestId == filterUnsubscribeRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
asyncTest "simple subscribe and unsubscribe all for multiple content topics":
# Given
let
wakuFilter = server.wakuFilter
clientPeerId = client.switch.peerInfo.peerId
serverPeerId = server.switch.peerInfo.peerId
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic, nonDefaultContentTopic],
)
filterUnsubscribeAllRequest =
createRequest(filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL)
## connect both switches
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
# When
let response =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 2
unorderedCompare(
wakuFilter.getSubscribedContentTopics(clientPeerId),
filterSubscribeRequest.contentTopics,
)
response.requestId == filterSubscribeRequest.requestId
response.statusCode == 200
response.statusDesc.get() == "OK"
# When
let response2 =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeAllRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0
# peerId is removed from subscriptions
response2.requestId == filterUnsubscribeAllRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
asyncTest "subscribe and unsubscribe to multiple content topics":
# Given
let
wakuFilter = server.wakuFilter
clientPeerId = client.switch.peerInfo.peerId
serverPeerId = server.switch.peerInfo.peerId
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest1 = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
filterSubscribeRequest2 = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
contentTopics = @[nonDefaultContentTopic],
)
filterUnsubscribeRequest1 = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
contentTopics = filterSubscribeRequest1.contentTopics,
)
filterUnsubscribeRequest2 = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest2.pubsubTopic,
contentTopics = filterSubscribeRequest2.contentTopics,
)
## connect both switches
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
# When
let response1 =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest1)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 1
unorderedCompare(
wakuFilter.getSubscribedContentTopics(clientPeerId),
filterSubscribeRequest1.contentTopics,
)
response1.requestId == filterSubscribeRequest1.requestId
response1.statusCode == 200
response1.statusDesc.get() == "OK"
# When
let response2 =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterSubscribeRequest2)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 2
unorderedCompare(
wakuFilter.getSubscribedContentTopics(clientPeerId),
filterSubscribeRequest1.contentTopics & filterSubscribeRequest2.contentTopics,
)
response2.requestId == filterSubscribeRequest2.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
# When
let response3 =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeRequest1)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[clientPeerId].criteriaCount == 1
unorderedCompare(
wakuFilter.getSubscribedContentTopics(clientPeerId),
filterSubscribeRequest2.contentTopics,
)
response3.requestId == filterUnsubscribeRequest1.requestId
response3.statusCode == 200
response3.statusDesc.get() == "OK"
# When
let response4 =
await wakuFilter.handleSubscribeRequest(clientPeerId, filterUnsubscribeRequest2)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0
# peerId is removed from subscriptions
response4.requestId == filterUnsubscribeRequest2.requestId
response4.statusCode == 200
response4.statusDesc.get() == "OK"
asyncTest "subscribe errors":
## Tests most common error paths while subscribing
# Given
let
wakuFilter = server.wakuFilter
clientPeerId = client.switch.peerInfo.peerId
serverPeerId = server.switch.peerInfo.peerId
peerManager = server.peerManager
## connect both switches
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
## Incomplete filter criteria
# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic],
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[],
)
response1 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqNoPubsubTopic)
response2 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqNoContentTopics)
# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
response2.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
## Max content topics per request exceeded
# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(
ContentTopic("/waku/2/content-$#/proto" % [$it])
)
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics,
)
response3 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqTooManyContentTopics)
# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")
## Max filter criteria exceeded
# When
let filterCriteria = toSeq(1 .. MaxFilterCriteriaPerPeer).mapIt(
(DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it]))
)
discard await wakuFilter.subscriptions.addSubscription(
clientPeerId, filterCriteria.toHashSet(), peerManager
)
let
reqTooManyFilterCriteria = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
response4 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqTooManyFilterCriteria)
# Then
check:
response4.requestId == reqTooManyFilterCriteria.requestId
response4.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response4.statusDesc.get().contains(
"peer has reached maximum number of filter criteria"
)
## Max subscriptions exceeded
# When
await wakuFilter.subscriptions.removePeer(clientPeerId)
wakuFilter.subscriptions.cleanUp()
var peers = newSeq[WakuNode](MaxFilterPeers)
for index in 0 ..< MaxFilterPeers:
peers[index] = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23551 + index)
)
await peers[index].start()
await peers[index].mountFilterClient()
## connect switches
debug "establish connection", peerId = peers[index].peerInfo.peerId
await server.switch.connect(
peers[index].switch.peerInfo.peerId, peers[index].switch.peerInfo.listenAddrs
)
debug "adding subscription"
(
await wakuFilter.subscriptions.addSubscription(
peers[index].switch.peerInfo.peerId,
@[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet(),
peerManager,
)
).isOkOr:
assert false, $error
let
reqTooManySubscriptions = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
response5 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqTooManySubscriptions)
# Then
check:
response5.requestId == reqTooManySubscriptions.requestId
response5.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response5.statusDesc.get().contains(
"node has reached maximum number of subscriptions"
)
## stop the peers
for index in 0 ..< MaxFilterPeers:
await peers[index].stop()
asyncTest "unsubscribe errors":
## Tests most common error paths while unsubscribing
# Given
let
wakuFilter = server.wakuFilter
clientPeerId = client.switch.peerInfo.peerId
serverPeerId = server.switch.peerInfo.peerId
## connect both switches
await client.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
## Incomplete filter criteria
# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic],
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[],
)
response1 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqNoPubsubTopic)
response2 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqNoContentTopics)
# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
response2.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
## Max content topics per request exceeded
# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(
ContentTopic("/waku/2/content-$#/proto" % [$it])
)
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics,
)
response3 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqTooManyContentTopics)
# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")
## Subscription not found - unsubscribe
# When
let
reqSubscriptionNotFound = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
response4 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqSubscriptionNotFound)
# Then
check:
response4.requestId == reqSubscriptionNotFound.requestId
response4.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response4.statusDesc.get().contains("peer has no subscriptions")
## Subscription not found - unsubscribe all
# When
let
reqUnsubscribeAll =
createRequest(filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL)
response5 =
await wakuFilter.handleSubscribeRequest(clientPeerId, reqUnsubscribeAll)
# Then
check:
response5.requestId == reqUnsubscribeAll.requestId
response5.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response5.statusDesc.get().contains("peer has no subscriptions")
suite "Waku Filter - subscription maintenance":
asyncTest "simple maintenance":
# Given
let
wakuFilter = server.wakuFilter
clientPeerId = client.switch.peerInfo.peerId
serverPeerId = server.switch.peerInfo.peerId
peerManager = server.peerManager
let
client1 = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23552)
)
client2 = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23553)
)
client3 = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23554)
)
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
## connect both switches
await client1.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
await client2.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
await client3.switch.connect(serverPeerId, server.switch.peerInfo.listenAddrs)
await client1.start()
await client2.start()
await client3.start()
defer:
await client1.stop()
await client2.stop()
await client3.stop()
await client1.mountFilterClient()
await client2.mountFilterClient()
await client3.mountFilterClient()
# When
server.switch.peerStore[ProtoBook][client1.switch.peerInfo.peerId] =
@[WakuFilterPushCodec]
server.switch.peerStore[ProtoBook][client2.switch.peerInfo.peerId] =
@[WakuFilterPushCodec]
server.switch.peerStore[ProtoBook][client3.switch.peerInfo.peerId] =
@[WakuFilterPushCodec]
check:
(
await wakuFilter.handleSubscribeRequest(
client1.switch.peerInfo.peerId, filterSubscribeRequest
)
).statusCode == 200
(
await wakuFilter.handleSubscribeRequest(
client2.switch.peerInfo.peerId, filterSubscribeRequest
)
).statusCode == 200
(
await wakuFilter.handleSubscribeRequest(
client3.switch.peerInfo.peerId, filterSubscribeRequest
)
).statusCode == 200
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 3
wakuFilter.subscriptions.isSubscribed(client1.switch.peerInfo.peerId)
wakuFilter.subscriptions.isSubscribed(client2.switch.peerInfo.peerId)
wakuFilter.subscriptions.isSubscribed(client1.switch.peerInfo.peerId)
# When
# Maintenance loop should leave all peers in peer store intact
await wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 3
wakuFilter.subscriptions.isSubscribed(client1.switch.peerInfo.peerId)
wakuFilter.subscriptions.isSubscribed(client2.switch.peerInfo.peerId)
wakuFilter.subscriptions.isSubscribed(client1.switch.peerInfo.peerId)
# When
# Remove peerId1 and peerId3 from peer store
server.switch.peerStore.del(client1.switch.peerInfo.peerId)
server.switch.peerStore.del(client3.switch.peerInfo.peerId)
await wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.isSubscribed(client2.switch.peerInfo.peerId)
# When
# Remove peerId2 from peer store
server.switch.peerStore.del(client2.switch.peerInfo.peerId)
await wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0

View File

@ -1,4 +1,3 @@
{.used.}
import
./test_waku_client, ./test_waku_filter_protocol, ./test_waku_filter_dos_protection
import ./test_waku_client, ./test_waku_filter_dos_protection

View File

@ -10,10 +10,10 @@ import
libp2p/peerstore
import
waku/node/peer_manager,
waku/node/[peer_manager, waku_node],
waku/waku_core,
waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec],
../testlib/[wakucore, testasync, testutils, futures, sequtils],
../testlib/[wakucore, testasync, testutils, futures, sequtils, wakunode],
./waku_filter_utils,
../resources/payloads
@ -2225,12 +2225,9 @@ suite "Waku Filter - End to End":
pushedMsg == msg
suite "Subscription timeout":
var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch
var clientSwitch2nd {.threadvar.}: Switch
var wakuFilter {.threadvar.}: WakuFilter
var wakuFilterClient {.threadvar.}: WakuFilterClient
var wakuFilterClient2nd {.threadvar.}: WakuFilterClient
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
var client2nd {.threadvar.}: WakuNode
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
var pubsubTopic {.threadvar.}: PubsubTopic
var contentTopic {.threadvar.}: ContentTopic
@ -2264,43 +2261,42 @@ suite "Waku Filter - End to End":
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
contentTopicSeq = @[contentTopic]
serverSwitch = newStandardSwitch()
clientSwitch = newStandardSwitch()
clientSwitch2nd = newStandardSwitch()
wakuFilter = await newTestWakuFilter(serverSwitch, 2.seconds)
wakuFilterClient = await newTestWakuFilterClient(clientSwitch)
wakuFilterClient2nd = await newTestWakuFilterClient(clientSwitch2nd)
await allFutures(
serverSwitch.start(), clientSwitch.start(), clientSwitch2nd.start()
)
wakuFilterClient.registerPushHandler(messagePushHandler)
wakuFilterClient2nd.registerPushHandler(messagePushHandler2nd)
serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId
clientPeerId2nd = clientSwitch2nd.peerInfo.toRemotePeerInfo().peerId
client =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23450))
server =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23451))
client2nd =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(23452))
await allFutures(server.start(), client.start(), client2nd.start())
await client.mountFilterClient()
await client2nd.mountFilterClient()
await server.mountFilter()
client.wakuFilterClient.registerPushHandler(messagePushHandler)
client2nd.wakuFilterClient.registerPushHandler(messagePushHandler2nd)
clientPeerId = client.switch.peerInfo.peerId
clientPeerId2nd = client2nd.switch.peerInfo.peerId
serverRemotePeerInfo = server.switch.peerInfo
asyncTeardown:
await allFutures(
wakuFilter.stop(),
wakuFilterClient.stop(),
wakuFilterClient2nd.stop(),
serverSwitch.stop(),
clientSwitch.stop(),
clientSwitch2nd.stop(),
)
await allFutures(client2nd.stop(), client.stop(), server.stop())
asyncTest "client unsubscribe by timeout":
server.wakuFilter.setSubscriptionTimeout(1.seconds)
# Given
let subscribeResponse = await wakuFilterClient.subscribe(
let subscribeResponse = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse.isOk(), $subscribeResponse.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg1 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg1)
await server.wakuFilter.handleMessage(pubsubTopic, msg1)
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read()
@ -2308,48 +2304,57 @@ suite "Waku Filter - End to End":
pushedMsgPubsubTopic1 == pubsubTopic
pushedMsg1 == msg1
await sleepAsync(2500)
await sleepAsync(1500)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg2)
await server.wakuFilter.handleMessage(pubsubTopic, msg2)
check:
wakuFilter.subscriptions.isSubscribed(clientPeerId) == false
server.wakuFilter.subscriptions.isSubscribed(clientPeerId) == false
not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
asyncTest "client reset subscription timeout with ping":
server.wakuFilter.setSubscriptionTimeout(1.seconds)
# Given
let subscribeResponse = await wakuFilterClient.subscribe(
let subscribeResponse = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse.isOk(), $subscribeResponse.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
assert server.wakuFilter.subscriptions.subscribedPeerCount() == 1,
"wrong num of subscribed peers"
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg1 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg1)
var msg1 = fakeWakuMessage(contentTopic = contentTopic)
await server.wakuFilter.handleMessage(pubsubTopic, msg1)
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read()
var (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read()
check:
pushedMsgPubsubTopic1 == pubsubTopic
pushedMsg1 == msg1
await sleepAsync(1000)
await sleepAsync(500)
let pingResponse = await wakuFilterClient.ping(serverRemotePeerInfo)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
msg1 = fakeWakuMessage(contentTopic = contentTopic)
await server.wakuFilter.handleMessage(pubsubTopic, msg1)
# the ping restarts the timeout counting. We will have 1 sec from now
let pingResponse = await client.wakuFilterClient.ping(serverRemotePeerInfo)
assert pingResponse.isOk(), $pingResponse.error
# wait more in sum of the timeout
await sleepAsync(1200)
await sleepAsync(700)
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg2)
await server.wakuFilter.handleMessage(pubsubTopic, msg2)
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read()
@ -2359,15 +2364,15 @@ suite "Waku Filter - End to End":
asyncTest "client reset subscription timeout with subscribe":
# Given
let subscribeResponse = await wakuFilterClient.subscribe(
let subscribeResponse = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse.isOk(), $subscribeResponse.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg1 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg1)
await server.wakuFilter.handleMessage(pubsubTopic, msg1)
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read()
@ -2379,7 +2384,7 @@ suite "Waku Filter - End to End":
let contentTopic2nd = "content-topic-2nd"
contentTopicSeq = @[contentTopic2nd]
let subscribeResponse2nd = await wakuFilterClient.subscribe(
let subscribeResponse2nd = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
@ -2388,11 +2393,11 @@ suite "Waku Filter - End to End":
# wait more in sum of the timeout
await sleepAsync(1200)
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg2 = fakeWakuMessage(contentTopic = contentTopic2nd)
await wakuFilter.handleMessage(pubsubTopic, msg2)
await server.wakuFilter.handleMessage(pubsubTopic, msg2)
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read()
@ -2404,15 +2409,15 @@ suite "Waku Filter - End to End":
# Given
let contentTopic2nd = "content-topic-2nd"
contentTopicSeq.add(contentTopic2nd)
let subscribeResponse = await wakuFilterClient.subscribe(
let subscribeResponse = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse.isOk(), $subscribeResponse.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg1 = fakeWakuMessage(contentTopic = contentTopic2nd)
await wakuFilter.handleMessage(pubsubTopic, msg1)
await server.wakuFilter.handleMessage(pubsubTopic, msg1)
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
let (pushedMsgPubsubTopic1, pushedMsg1) = pushHandlerFuture.read()
@ -2423,7 +2428,7 @@ suite "Waku Filter - End to End":
await sleepAsync(1000)
contentTopicSeq = @[contentTopic2nd]
let unsubscribeResponse = await wakuFilterClient.subscribe(
let unsubscribeResponse = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
@ -2432,11 +2437,11 @@ suite "Waku Filter - End to End":
# wait more in sum of the timeout
await sleepAsync(1200)
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg2)
await server.wakuFilter.handleMessage(pubsubTopic, msg2)
# shall still receive message on default content topic
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -2446,28 +2451,29 @@ suite "Waku Filter - End to End":
pushedMsg2 == msg2
asyncTest "two clients shifted subscription and timeout":
server.wakuFilter.setSubscriptionTimeout(1.seconds)
# Given
let contentTopic2nd = "content-topic-2nd"
contentTopicSeq.add(contentTopic2nd)
let subscribeResponse = await wakuFilterClient.subscribe(
let subscribeResponse = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse.isOk(), $subscribeResponse.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
await sleepAsync(1000)
await sleepAsync(500)
let subscribeResponse2nd = await wakuFilterClient2nd.subscribe(
let subscribeResponse2nd = await client2nd.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse2nd.isOk(), $subscribeResponse2nd.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
pushHandlerFuture2nd = newPushHandlerFuture() # Clear previous future
let msg1 = fakeWakuMessage(contentTopic = contentTopic2nd)
await wakuFilter.handleMessage(pubsubTopic, msg1)
await server.wakuFilter.handleMessage(pubsubTopic, msg1)
# both clients get messages
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -2483,14 +2489,14 @@ suite "Waku Filter - End to End":
pushedMsgPubsubTopic1 == pubsubTopic
pushedMsg1 == msg1
await sleepAsync(1200)
await sleepAsync(700)
check not wakuFilter.subscriptions.isSubscribed(clientPeerId)
check not server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
pushHandlerFuture2nd = newPushHandlerFuture() # Clear previous future
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg2)
await server.wakuFilter.handleMessage(pubsubTopic, msg2)
# shall still receive message on default content topic
check not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -2500,31 +2506,31 @@ suite "Waku Filter - End to End":
pushedMsgPubsubTopic2 == pubsubTopic
pushedMsg2 == msg2
await sleepAsync(1000)
await sleepAsync(500)
check not wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
check not server.wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
asyncTest "two clients timeout maintenance":
server.wakuFilter.setSubscriptionTimeout(500.milliseconds)
# Given
let contentTopic2nd = "content-topic-2nd"
contentTopicSeq.add(contentTopic2nd)
let subscribeResponse = await wakuFilterClient.subscribe(
let subscribeResponse = await client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse.isOk(), $subscribeResponse.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
let subscribeResponse2nd = await wakuFilterClient2nd.subscribe(
let subscribeResponse2nd = await client2nd.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
assert subscribeResponse2nd.isOk(), $subscribeResponse2nd.error
check wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
check server.wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
pushHandlerFuture2nd = newPushHandlerFuture() # Clear previous future
let msg1 = fakeWakuMessage(contentTopic = contentTopic2nd)
await wakuFilter.handleMessage(pubsubTopic, msg1)
await server.wakuFilter.handleMessage(pubsubTopic, msg1)
# both clients get messages
check await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
@ -2540,17 +2546,17 @@ suite "Waku Filter - End to End":
pushedMsgPubsubTopic1 == pubsubTopic
pushedMsg1 == msg1
await sleepAsync(2200)
await sleepAsync(700)
wakuFilter.maintainSubscriptions()
await server.wakuFilter.maintainSubscriptions()
check not wakuFilter.subscriptions.isSubscribed(clientPeerId)
check not wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
check not server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
check not server.wakuFilter.subscriptions.isSubscribed(clientPeerId2nd)
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
pushHandlerFuture2nd = newPushHandlerFuture() # Clear previous future
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
await wakuFilter.handleMessage(pubsubTopic, msg2)
await server.wakuFilter.handleMessage(pubsubTopic, msg2)
# shall still receive message on default content topic
check not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)

View File

@ -1,524 +0,0 @@
{.used.}
import
std/[options, sequtils, sets, strutils, tables],
testutils/unittests,
chronos,
chronicles,
libp2p/peerstore
import
waku/[
node/peer_manager,
waku_filter_v2,
waku_filter_v2/rpc,
waku_filter_v2/subscriptions,
waku_core,
],
../testlib/common,
../testlib/wakucore,
./waku_filter_utils
proc newTestWakuFilter(switch: Switch): WakuFilter =
let
peerManager = PeerManager.new(switch)
proto = WakuFilter.new(peerManager)
return proto
proc generateRequestId(rng: ref HmacDrbgContext): string =
var bytes: array[10, byte]
hmacDrbgGenerate(rng[], bytes)
return toHex(bytes)
proc createRequest(
filterSubscribeType: FilterSubscribeType,
pubsubTopic = none(PubsubTopic),
contentTopics = newSeq[ContentTopic](),
): FilterSubscribeRequest =
let requestId = generateRequestId(rng)
return FilterSubscribeRequest(
requestId: requestId,
filterSubscribeType: filterSubscribeType,
pubsubTopic: pubsubTopic,
contentTopics: contentTopics,
)
proc getSubscribedContentTopics(
wakuFilter: WakuFilter, peerId: PeerId
): seq[ContentTopic] =
var contentTopics: seq[ContentTopic] = @[]
let peersCreitera = wakuFilter.subscriptions.getPeerSubscriptions(peerId)
for filterCriterion in peersCreitera:
contentTopics.add(filterCriterion.contentTopic)
return contentTopics
suite "Waku Filter - handling subscribe requests":
asyncTest "simple subscribe and unsubscribe request":
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
filterUnsubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest.pubsubTopic,
contentTopics = filterSubscribeRequest.contentTopics,
)
# When
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
response.requestId == filterSubscribeRequest.requestId
response.statusCode == 200
response.statusDesc.get() == "OK"
# When
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0
# peerId is removed from subscriptions
response2.requestId == filterUnsubscribeRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
asyncTest "simple subscribe and unsubscribe all for multiple content topics":
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic, nonDefaultContentTopic],
)
filterUnsubscribeAllRequest =
createRequest(filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL)
# When
let response = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
unorderedCompare(
wakuFilter.getSubscribedContentTopics(peerId),
filterSubscribeRequest.contentTopics,
)
response.requestId == filterSubscribeRequest.requestId
response.statusCode == 200
response.statusDesc.get() == "OK"
# When
let response2 =
wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeAllRequest)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0
# peerId is removed from subscriptions
response2.requestId == filterUnsubscribeAllRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
asyncTest "subscribe and unsubscribe to multiple content topics":
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto")
filterSubscribeRequest1 = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
filterSubscribeRequest2 = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
contentTopics = @[nonDefaultContentTopic],
)
filterUnsubscribeRequest1 = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest1.pubsubTopic,
contentTopics = filterSubscribeRequest1.contentTopics,
)
filterUnsubscribeRequest2 = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = filterSubscribeRequest2.pubsubTopic,
contentTopics = filterSubscribeRequest2.contentTopics,
)
# When
let response1 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest1)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
unorderedCompare(
wakuFilter.getSubscribedContentTopics(peerId),
filterSubscribeRequest1.contentTopics,
)
response1.requestId == filterSubscribeRequest1.requestId
response1.statusCode == 200
response1.statusDesc.get() == "OK"
# When
let response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest2)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
unorderedCompare(
wakuFilter.getSubscribedContentTopics(peerId),
filterSubscribeRequest1.contentTopics & filterSubscribeRequest2.contentTopics,
)
response2.requestId == filterSubscribeRequest2.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
# When
let response3 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest1)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
unorderedCompare(
wakuFilter.getSubscribedContentTopics(peerId),
filterSubscribeRequest2.contentTopics,
)
response3.requestId == filterUnsubscribeRequest1.requestId
response3.statusCode == 200
response3.statusDesc.get() == "OK"
# When
let response4 = wakuFilter.handleSubscribeRequest(peerId, filterUnsubscribeRequest2)
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0
# peerId is removed from subscriptions
response4.requestId == filterUnsubscribeRequest2.requestId
response4.statusCode == 200
response4.statusDesc.get() == "OK"
asyncTest "subscribe errors":
## Tests most common error paths while subscribing
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
## Incomplete filter criteria
# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic],
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[],
)
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)
# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
response2.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
## Max content topics per request exceeded
# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(
ContentTopic("/waku/2/content-$#/proto" % [$it])
)
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics,
)
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)
# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")
## Max filter criteria exceeded
# When
let filterCriteria = toSeq(1 .. MaxFilterCriteriaPerPeer).mapIt(
(DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it]))
)
discard wakuFilter.subscriptions.addSubscription(peerId, filterCriteria.toHashSet())
let
reqTooManyFilterCriteria = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
response4 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyFilterCriteria)
# Then
check:
response4.requestId == reqTooManyFilterCriteria.requestId
response4.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response4.statusDesc.get().contains(
"peer has reached maximum number of filter criteria"
)
## Max subscriptions exceeded
# When
wakuFilter.subscriptions.removePeer(peerId)
wakuFilter.subscriptions.cleanUp()
for _ in 1 .. MaxFilterPeers:
discard wakuFilter.subscriptions.addSubscription(
PeerId.random().get(), @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet()
)
let
reqTooManySubscriptions = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
response5 = wakuFilter.handleSubscribeRequest(peerId, reqTooManySubscriptions)
# Then
check:
response5.requestId == reqTooManySubscriptions.requestId
response5.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response5.statusDesc.get().contains(
"node has reached maximum number of subscriptions"
)
asyncTest "unsubscribe errors":
## Tests most common error paths while unsubscribing
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
## Incomplete filter criteria
# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic],
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[],
)
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)
# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
response2.statusDesc.get().contains(
"pubsubTopic and contentTopics must be specified"
)
## Max content topics per request exceeded
# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(
ContentTopic("/waku/2/content-$#/proto" % [$it])
)
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics,
)
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)
# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")
## Subscription not found - unsubscribe
# When
let
reqSubscriptionNotFound = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
response4 = wakuFilter.handleSubscribeRequest(peerId, reqSubscriptionNotFound)
# Then
check:
response4.requestId == reqSubscriptionNotFound.requestId
response4.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response4.statusDesc.get().contains("peer has no subscriptions")
## Subscription not found - unsubscribe all
# When
let
reqUnsubscribeAll =
createRequest(filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL)
response5 = wakuFilter.handleSubscribeRequest(peerId, reqUnsubscribeAll)
# Then
check:
response5.requestId == reqUnsubscribeAll.requestId
response5.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response5.statusDesc.get().contains("peer has no subscriptions")
asyncTest "ping subscriber":
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
pingRequest =
createRequest(filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING)
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
# When
let response1 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)
# Then
check:
response1.requestId == pingRequest.requestId
response1.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response1.statusDesc.get().contains("peer has no subscriptions")
# When
let
response2 = wakuFilter.handleSubscribeRequest(peerId, filterSubscribeRequest)
response3 = wakuFilter.handleSubscribeRequest(peerId, pingRequest)
# Then
check:
response2.requestId == filterSubscribeRequest.requestId
response2.statusCode == 200
response2.statusDesc.get() == "OK"
response3.requestId == pingRequest.requestId
response3.statusCode == 200
response3.statusDesc.get() == "OK"
suite "Waku Filter - subscription maintenance":
asyncTest "simple maintenance":
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId1 = PeerId.random().get()
peerId2 = PeerId.random().get()
peerId3 = PeerId.random().get()
filterSubscribeRequest = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic],
)
# When
switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec]
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).statusCode ==
200
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).statusCode ==
200
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).statusCode ==
200
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 3
wakuFilter.subscriptions.isSubscribed(peerId1)
wakuFilter.subscriptions.isSubscribed(peerId2)
wakuFilter.subscriptions.isSubscribed(peerId3)
# When
# Maintenance loop should leave all peers in peer store intact
wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 3
wakuFilter.subscriptions.isSubscribed(peerId1)
wakuFilter.subscriptions.isSubscribed(peerId2)
wakuFilter.subscriptions.isSubscribed(peerId3)
# When
# Remove peerId1 and peerId3 from peer store
switch.peerStore.del(peerId1)
switch.peerStore.del(peerId3)
wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 1
wakuFilter.subscriptions.isSubscribed(peerId2)
# When
# Remove peerId2 from peer store
switch.peerStore.del(peerId2)
wakuFilter.maintainSubscriptions()
# Then
check:
wakuFilter.subscriptions.subscribedPeerCount() == 0

View File

@ -468,11 +468,10 @@ proc mountFilter*(
some(rateLimitSetting),
)
if node.started:
try:
await node.wakuFilter.start()
except CatchableError:
error "failed to start wakuFilter", error = getCurrentExceptionMsg()
try:
await node.wakuFilter.start()
except CatchableError:
error "failed to start wakuFilter", error = getCurrentExceptionMsg()
try:
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
@ -496,11 +495,10 @@ proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} =
node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)
if node.started:
try:
await node.wakuFilterClient.start()
except CatchableError:
error "failed to start wakuFilterClient", error = getCurrentExceptionMsg()
try:
await node.wakuFilterClient.start()
except CatchableError:
error "failed to start wakuFilterClient", error = getCurrentExceptionMsg()
try:
node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec))

View File

@ -2,7 +2,13 @@
{.push raises: [].}
import std/options, chronicles, chronos, libp2p/protocols/protocol, bearssl/rand
import
std/options,
chronicles,
chronos,
libp2p/protocols/protocol,
bearssl/rand,
stew/byteutils
import
../node/peer_manager,
../node/delivery_monitor/subscriptions_observer,
@ -101,6 +107,7 @@ proc sendSubscribeRequest(
proc ping*(
wfc: WakuFilterClient, servicePeer: RemotePeerInfo
): Future[FilterSubscribeResult] {.async.} =
debug "sending ping", servicePeer = shortLog($servicePeer)
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.ping(requestId)
@ -168,22 +175,34 @@ proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) =
proc initProtocolHandler(wfc: WakuFilterClient) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(int(DefaultMaxPushSize))
## Notice that the client component is acting as a server of WakuFilterPushCodec messages
while not conn.atEof():
var buf: seq[byte]
try:
buf = await conn.readLp(int(DefaultMaxPushSize))
except CancelledError, LPStreamError:
error "error while reading conn", error = getCurrentExceptionMsg()
let decodeRes = MessagePush.decode(buf)
if decodeRes.isErr():
error "Failed to decode message push", peerId = conn.peerId
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
let msgPush = MessagePush.decode(buf).valueOr:
error "Failed to decode message push", peerId = conn.peerId, error = $error
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
let messagePush = decodeRes.value #TODO: toAPI() split here
trace "Received message push", peerId = conn.peerId, messagePush
let msg_hash =
computeMessageHash(msgPush.pubsubTopic, msgPush.wakuMessage).to0xHex()
for handler in wfc.pushHandlers:
asyncSpawn handler(messagePush.pubsubTopic, messagePush.wakuMessage)
debug "Received message push",
peerId = conn.peerId,
msg_hash,
payload = shortLog(msgPush.wakuMessage.payload),
pubsubTopic = msgPush.pubsubTopic,
content_topic = msgPush.wakuMessage.contentTopic,
conn
# Protocol specifies no response for now
return
for handler in wfc.pushHandlers:
asyncSpawn handler(msgPush.pubsubTopic, msgPush.wakuMessage)
# Protocol specifies no response for now
wfc.handler = handler
wfc.codec = WakuFilterPushCodec

View File

@ -25,34 +25,39 @@ type WakuFilter* = ref object of LPProtocol
subscriptions*: FilterSubscriptions
# a mapping of peer ids to a sequence of filter criteria
peerManager: PeerManager
maintenanceTask: TimerCallback
messageCache: TimedCache[string]
peerRequestRateLimiter*: PerPeerRateLimiter
subscriptionsManagerFut: Future[void]
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
trace "pinging subscriber", peerId = peerId
debug "pinging subscriber", peerId = peerId
if not wf.subscriptions.isSubscribed(peerId):
debug "pinging peer has no subscriptions", peerId = peerId
error "pinging peer has no subscriptions", peerId = peerId
return err(FilterSubscribeError.notFound())
wf.subscriptions.refreshSubscription(peerId)
ok()
proc setSubscriptionTimeout*(wf: WakuFilter, newTimeout: Duration) =
wf.subscriptions.setSubscriptionTimeout(newTimeout)
proc subscribe(
wf: WakuFilter,
peerId: PeerID,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
): FilterSubscribeResult =
): Future[FilterSubscribeResult] {.async.} =
# TODO: check if this condition is valid???
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)
if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
return err(
FilterSubscribeError.badRequest(
"exceeds maximum content topics: " & $MaxContentTopicsPerRequest
@ -61,12 +66,14 @@ proc subscribe(
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
trace "subscribing peer to filter criteria",
debug "subscribing peer to filter criteria",
peerId = peerId, filterCriteria = filterCriteria
wf.subscriptions.addSubscription(peerId, filterCriteria).isOkOr:
(await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr:
return err(FilterSubscribeError.serviceUnavailable(error))
debug "correct subscription", peerId = peerId
ok()
proc unsubscribe(
@ -76,11 +83,13 @@ proc unsubscribe(
contentTopics: seq[ContentTopic],
): FilterSubscribeResult =
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)
if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
return err(
FilterSubscribeError.badRequest(
"exceeds maximum content topics: " & $MaxContentTopicsPerRequest
@ -93,27 +102,31 @@ proc unsubscribe(
peerId = peerId, filterCriteria = filterCriteria
wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
error "failed to remove subscription", error = $error
return err(FilterSubscribeError.notFound())
## Note: do not remove from peerRequestRateLimiter to prevent trick with subscribe/unsubscribe loop
## We remove only if peerManager removes the peer
debug "correct unsubscription", peerId = peerId
ok()
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
proc unsubscribeAll(
wf: WakuFilter, peerId: PeerID
): Future[FilterSubscribeResult] {.async.} =
if not wf.subscriptions.isSubscribed(peerId):
debug "unsubscribing peer has no subscriptions", peerId = peerId
return err(FilterSubscribeError.notFound())
debug "removing peer subscription", peerId = peerId
wf.subscriptions.removePeer(peerId)
await wf.subscriptions.removePeer(peerId)
wf.subscriptions.cleanUp()
ok()
proc handleSubscribeRequest*(
wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest
): FilterSubscribeResponse =
): Future[FilterSubscribeResponse] {.async.} =
info "received filter subscribe request", peerId = peerId, request = request
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
@ -127,12 +140,13 @@ proc handleSubscribeRequest*(
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
subscribeResult =
await wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult =
wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)
subscribeResult = await wf.unsubscribeAll(peerId)
let
requestDuration = Moment.now() - requestStartTime
@ -143,6 +157,7 @@ proc handleSubscribeRequest*(
)
if subscribeResult.isErr():
error "subscription request error", peerId = shortLog(peerId), request = request
return FilterSubscribeResponse(
requestId: request.requestId,
statusCode: subscribeResult.error.kind.uint32,
@ -152,22 +167,19 @@ proc handleSubscribeRequest*(
return FilterSubscribeResponse.ok(request.requestId)
proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
trace "pushing message to subscribed peer", peer_id = shortLog(peer)
debug "pushing message to subscribed peer", peerId = shortLog(peer)
if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
error "no addresses for peer", peer_id = shortLog(peer)
error "no addresses for peer", peerId = shortLog(peer)
return
## TODO: Check if dial is necessary always???
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
if conn.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
error "no connection to peer", peer_id = shortLog(peer)
let conn = wf.subscriptions.getConnectionByPeerId(peer).valueOr:
error "could not get connection by peer id", error = $error
return
await conn.get().writeLp(buffer)
await conn.writeLp(buffer)
debug "published successful", peerId = shortLog(peer), conn
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
)
@ -181,15 +193,17 @@ proc pushToPeers(
## it's also refresh expire of msghash, that's why update cache every time, even if it has a value.
if wf.messageCache.put(msgHash, Moment.now()):
notice "duplicate message found, not-pushing message to subscribed peers",
error "duplicate message found, not-pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
payload = shortLog(messagePush.wakuMessage.payload),
target_peer_ids = targetPeerIds,
msg_hash = msgHash
else:
notice "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
payload = shortLog(messagePush.wakuMessage.payload),
target_peer_ids = targetPeerIds,
msg_hash = msgHash
@ -201,19 +215,19 @@ proc pushToPeers(
pushFuts.add(pushFut)
await allFutures(pushFuts)
proc maintainSubscriptions*(wf: WakuFilter) =
trace "maintaining subscriptions"
proc maintainSubscriptions*(wf: WakuFilter) {.async.} =
debug "maintaining subscriptions"
## Remove subscriptions for peers that have been removed from peer store
var peersToRemove: seq[PeerId]
for peerId in wf.subscriptions.peersSubscribed.keys:
if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
debug "peer has been removed from peer store, removing subscription",
debug "peer has been removed from peer store, we will remove subscription",
peerId = peerId
peersToRemove.add(peerId)
if peersToRemove.len > 0:
wf.subscriptions.removePeers(peersToRemove)
await wf.subscriptions.removePeers(peersToRemove)
wf.peerRequestRateLimiter.unregister(peersToRemove)
wf.subscriptions.cleanUp()
@ -227,7 +241,7 @@ proc handleMessage*(
) {.async.} =
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
trace "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
let handleMessageStartTime = Moment.now()
@ -236,7 +250,7 @@ proc handleMessage*(
let subscribedPeers =
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len == 0:
trace "no subscribed peers found",
error "no subscribed peers found",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash
@ -270,7 +284,8 @@ proc handleMessage*(
proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId)
debug "filter subscribe request handler triggered",
peerId = shortLog(conn.peerId), conn
var response: FilterSubscribeResponse
@ -290,13 +305,13 @@ proc initProtocolHandler(wf: WakuFilter) =
let request = decodeRes.value #TODO: toAPI() split here
response = wf.handleSubscribeRequest(conn.peerId, request)
response = await wf.handleSubscribeRequest(conn.peerId, request)
debug "sending filter subscribe response",
peer_id = shortLog(conn.peerId), response = response
do:
debug "filter request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wf.peerRequestRateLimiter.setting
peerId = shortLog(conn.peerId), limit = $wf.peerRequestRateLimiter.setting
response = FilterSubscribeResponse(
requestId: "N/A",
statusCode: FilterSubscribeErrorKind.TOO_MANY_REQUESTS.uint32,
@ -319,7 +334,7 @@ proc new*(
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wf = WakuFilter(
subscriptions: FilterSubscriptions.init(
subscriptions: FilterSubscriptions.new(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
),
peerManager: peerManager,
@ -331,28 +346,19 @@ proc new*(
setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting)
return wf
const MaintainSubscriptionsInterval* = 1.minutes
proc periodicSubscriptionsMaintenance(wf: WakuFilter) {.async.} =
const MaintainSubscriptionsInterval = 1.minutes
debug "starting to maintain subscriptions"
while true:
await wf.maintainSubscriptions()
await sleepAsync(MaintainSubscriptionsInterval)
proc startMaintainingSubscriptions(wf: WakuFilter, interval: Duration) =
trace "starting to maintain subscriptions"
var maintainSubs: CallbackFunc
maintainSubs = CallbackFunc(
proc(udata: pointer) {.gcsafe.} =
maintainSubscriptions(wf)
wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
)
wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
method start*(wf: WakuFilter) {.async, base.} =
proc start*(wf: WakuFilter) {.async.} =
debug "starting filter protocol"
wf.startMaintainingSubscriptions(MaintainSubscriptionsInterval)
await procCall LPProtocol(wf).start()
wf.subscriptionsManagerFut = wf.periodicSubscriptionsMaintenance()
method stop*(wf: WakuFilter) {.async, base.} =
proc stop*(wf: WakuFilter) {.async.} =
debug "stopping filter protocol"
if not wf.maintenanceTask.isNil():
wf.maintenanceTask.clearTimer()
await wf.subscriptionsManagerFut.cancelAndWait()
await procCall LPProtocol(wf).stop()

View File

@ -90,3 +90,7 @@ proc writeValue*(
if value.contentTopics.len > 0:
writer.writeField("contentTopics", value.contentTopics)
writer.endRecord()
proc `$`*(self: MessagePush): string =
let msg_hash = computeMessageHash(self.pubsubTopic, self.wakuMessage)
return "msg_hash: " & shortLog(msg_hash) & " pubsubTopic: " & self.pubsubTopic

View File

@ -1,13 +1,24 @@
{.push raises: [].}
import std/[sets, tables], chronicles, chronos, libp2p/peerid, stew/shims/sets
import ../waku_core, ../utils/tableutils
import
std/[options, sets, tables, sequtils],
chronicles,
chronos,
libp2p/peerid,
libp2p/stream/connection,
stew/shims/sets
import
../waku_core,
../utils/tableutils,
../common/rate_limit/setting,
../node/peer_manager,
./common
logScope:
topics = "waku filter subscriptions"
const
MaxFilterPeers* = 1000
MaxFilterPeers* = 100
MaxFilterCriteriaPerPeer* = 1000
DefaultSubscriptionTimeToLiveSec* = 5.minutes
MessageCacheTTL* = 2.minutes
@ -20,16 +31,16 @@ type
SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids
PeerData* = tuple[lastSeen: Moment, criteriaCount: uint]
PeerData* = tuple[lastSeen: Moment, criteriaCount: uint, connection: Connection]
FilterSubscriptions* = object
FilterSubscriptions* = ref object
peersSubscribed*: Table[PeerID, PeerData]
subscriptions: Table[FilterCriterion, SubscribedPeers]
subscriptions*: Table[FilterCriterion, SubscribedPeers]
subscriptionTimeout: Duration
maxPeers: uint
maxCriteriaPerPeer: uint
proc init*(
proc new*(
T: type FilterSubscriptions,
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = MaxFilterPeers,
@ -44,7 +55,7 @@ proc init*(
maxCriteriaPerPeer: maxFilterCriteriaPerPeer,
)
proc isSubscribed*(s: var FilterSubscriptions, peerId: PeerID): bool =
proc isSubscribed*(s: FilterSubscriptions, peerId: PeerID): bool =
s.peersSubscribed.withValue(peerId, data):
return Moment.now() - data.lastSeen <= s.subscriptionTimeout
@ -54,7 +65,7 @@ proc subscribedPeerCount*(s: FilterSubscriptions): uint =
return cast[uint](s.peersSubscribed.len)
proc getPeerSubscriptions*(
s: var FilterSubscriptions, peerId: PeerID
s: FilterSubscriptions, peerId: PeerID
): seq[FilterCriterion] =
## Get all pubsub-content topics a peer is subscribed to
var subscribedContentTopics: seq[FilterCriterion] = @[]
@ -69,7 +80,7 @@ proc getPeerSubscriptions*(
return subscribedContentTopics
proc findSubscribedPeers*(
s: var FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
s: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
): seq[PeerID] =
let filterCriterion: FilterCriterion = (pubsubTopic, contentTopic)
@ -80,17 +91,43 @@ proc findSubscribedPeers*(
if s.isSubscribed(peer):
foundPeers.add(peer)
debug "findSubscribedPeers result",
filter_criterion = filterCriterion,
subscr_set = s.subscriptions,
found_peers = foundPeers
return foundPeers
proc removePeer*(s: var FilterSubscriptions, peerId: PeerID) =
proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} =
## Remove all subscriptions for a given peer
debug "removePeer",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
s.peersSubscribed.withValue(peerId, peerData):
debug "closing connection with peer", peerId = shortLog(peerId)
await peerData.connection.close()
s.peersSubscribed.del(peerId)
proc removePeers*(s: var FilterSubscriptions, peerIds: seq[PeerID]) =
## Remove all subscriptions for a given list of peers
s.peersSubscribed.keepItIf(key notin peerIds)
debug "removePeer after deletion",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
proc removePeers*(s: FilterSubscriptions, peerIds: seq[PeerID]) {.async.} =
## Remove all subscriptions for a given list of peers
debug "removePeers",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
peerIds = peerIds.mapIt(shortLog(it))
for peer in peerIds:
await s.removePeer(peer)
debug "removePeers after deletion",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
peerIds = peerIds.mapIt(shortLog(it))
proc cleanUp*(fs: FilterSubscriptions) =
debug "cleanUp", currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
proc cleanUp*(fs: var FilterSubscriptions) =
## Remove all subscriptions for peers that have not been seen for a while
let now = Moment.now()
fs.peersSubscribed.keepItIf(now - val.lastSeen <= fs.subscriptionTimeout)
@ -101,14 +138,23 @@ proc cleanUp*(fs: var FilterSubscriptions) =
fs.subscriptions.keepItIf(val.len > 0)
debug "after cleanUp",
currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
s.peersSubscribed.withValue(peerId, data):
data.lastSeen = Moment.now()
proc addSubscription*(
s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
): Result[void, string] =
s: FilterSubscriptions,
peerId: PeerID,
filterCriteria: FilterCriteria,
peerManager: PeerManager,
): Future[Result[void, string]] {.async.} =
## Add a subscription for a given peer
##
## The peerManager is needed to establish the first Connection through
## /vac/waku/filter-push/2.0.0-beta1
var peerData: ptr PeerData
s.peersSubscribed.withValue(peerId, data):
@ -120,9 +166,19 @@ proc addSubscription*(
do:
## not yet subscribed
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
return err("node has reached maximum number of subscriptions")
return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))
let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
return err("addSubscription no connection to peer: " & shortLog(peerId))
let newPeerData: PeerData =
(lastSeen: Moment.now(), criteriaCount: 0, connection: connRes.get())
debug "new WakuFilterPushCodec stream", conn = connRes.get()
let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))
for filterCriterion in filterCriteria:
@ -131,10 +187,13 @@ proc addSubscription*(
peersOfSub[].incl(peerId)
peerData.criteriaCount += 1
debug "subscription added correctly",
new_peer = shortLog(peerId), subscr_set = s.subscriptions
return ok()
proc removeSubscription*(
s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
): Result[void, string] =
## Remove a subscription for a given peer
@ -156,3 +215,15 @@ proc removeSubscription*(
return ok()
do:
return err("Peer has no subscriptions")
proc getConnectionByPeerId*(
s: FilterSubscriptions, peerId: PeerID
): Result[Connection, string] =
if not s.peersSubscribed.hasKey(peerId):
return err("peer not subscribed: " & shortLog(peerId))
let peerData = s.peersSubscribed.getOrDefault(peerId)
return ok(peerData.connection)
proc setSubscriptionTimeout*(s: FilterSubscriptions, newTimeout: Duration) =
s.subscriptionTimeout = newTimeout

View File

@ -71,6 +71,8 @@ proc request*(
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
error "exception when handling peer exchange request",
error = getCurrentExceptionMsg()
waku_px_errors.inc(labelValues = [exc.msg])
callResult = (
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
@ -81,10 +83,12 @@ proc request*(
await conn.closeWithEof()
if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS:
error "peer exchange request failed", status_code = callResult.status_code
return err(callResult)
let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr():
error "peer exchange request error decoding buffer", error = $decodedBuff.error
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
@ -92,6 +96,8 @@ proc request*(
)
)
if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
error "peer exchange request error",
status_code = decodedBuff.get().response.status_code
return err(
(
status_code: decodedBuff.get().response.status_code,
@ -107,6 +113,7 @@ proc request*(
try:
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
if connOpt.isNone():
error "error in request connOpt is none"
return err(
(
status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
@ -115,6 +122,7 @@ proc request*(
)
return await wpx.request(numPeers, connOpt.get())
except CatchableError:
error "peer exchange request exception", error = getCurrentExceptionMsg()
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
@ -128,6 +136,7 @@ proc request*(
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
error "peer exchange error peerOpt is none"
return err(
(
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
@ -144,6 +153,7 @@ proc respond(
try:
await conn.writeLP(rpc.encode().buffer)
except CatchableError as exc:
error "exception when trying to send a respond", error = getCurrentExceptionMsg()
waku_px_errors.inc(labelValues = [exc.msg])
return err(
(
@ -165,6 +175,7 @@ proc respondError(
try:
await conn.writeLP(rpc.encode().buffer)
except CatchableError as exc:
error "exception when trying to send a respond", error = getCurrentExceptionMsg()
waku_px_errors.inc(labelValues = [exc.msg])
return err(
(
@ -192,15 +203,15 @@ proc getEnrsFromCache(
proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
if peer.origin != Discv5:
trace "peer not from discv5", peer = $peer, origin = $peer.origin
debug "peer not from discv5", peer = $peer, origin = $peer.origin
return false
if peer.enr.isNone():
trace "peer has no ENR", peer = $peer
debug "peer has no ENR", peer = $peer
return false
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
trace "peer has mismatching cluster", peer = $peer
debug "peer has mismatching cluster", peer = $peer
return false
return true
@ -218,6 +229,7 @@ proc populateEnrCache(wpx: WakuPeerExchange) =
# swap cache for new
wpx.enrCache = newEnrCache
debug "ENR cache populated"
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
# try more aggressively to fill the cache at startup
@ -237,6 +249,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
try:
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
error "exception when handling px request", error = getCurrentExceptionMsg()
waku_px_errors.inc(labelValues = [exc.msg])
(
@ -260,8 +273,8 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
error "Failed to respond with BAD_REQUEST:", error = $error
return
trace "peer exchange request received"
let enrs = wpx.getEnrsFromCache(decBuf.get().request.numPeers)
debug "peer exchange request received", enrs = $enrs
(await wpx.respond(enrs, conn)).isErrOr:
waku_px_peers_sent.inc(enrs.len().int64())
do: