diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 828e7fbbd..877c7044e 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -468,6 +468,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = let peerInfo = parsePeerInfo(conf.filternode) if peerInfo.isOk(): await node.mountFilter() + await node.mountLegacyFilter() await node.mountFilterClient() node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec) @@ -507,7 +508,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = echo "A spam message is found and discarded" chat.prompt = false showChatPrompt(chat) - + echo "rln-relay preparation is in progress..." let rlnConf = WakuRlnConfig( diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 106eaf7e5..375f6c196 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -288,6 +288,7 @@ when isMainModule: if conf.filter: waitFor mountFilter(bridge.nodev2) + waitFor mountLegacyFilter(bridge.nodev2) if conf.staticnodes.len > 0: waitFor connectToNodes(bridge.nodev2, conf.staticnodes) diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index e724f17a9..0863eefe6 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -254,7 +254,7 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 = ) WakuDiscoveryV5.new( - app.rng, + app.rng, discv5Conf, some(app.record), some(app.node.peerManager), @@ -326,7 +326,7 @@ proc setupWakuApp*(app: var App): AppResult[void] = ok() proc getPorts(listenAddrs: seq[MultiAddress]): - AppResult[tuple[tcpPort, websocketPort: Option[Port]]] = + AppResult[tuple[tcpPort, websocketPort: Option[Port]]] = var tcpPort, websocketPort = none(Port) @@ -548,7 +548,15 @@ proc setupProtocols(node: WakuNode, # Filter setup. NOTE Must be mounted after relay if conf.filter: try: - await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) + await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) + except CatchableError: + return err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg()) + + try: + await mountFilter(node, + subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout), + maxFilterPeers = conf.filterMaxPeersToServe, + maxFilterCriteriaPerPeer = conf.filterMaxCriteria) except CatchableError: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) @@ -724,7 +732,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon let filterCache = MessageCache.init() - let filterDiscoHandler = + let filterDiscoHandler = if app.wakuDiscv5.isSome(): some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter)) else: none(DiscoveryHandler) @@ -739,7 +747,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode" ## Store REST API - let storeDiscoHandler = + let storeDiscoHandler = if app.wakuDiscv5.isSome(): some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store)) else: none(DiscoveryHandler) @@ -749,7 +757,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon ## Light push API if conf.lightpushnode != "" and app.node.wakuLightpushClient != nil: - let lightDiscoHandler = + let lightDiscoHandler = if app.wakuDiscv5.isSome(): some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush)) else: none(DiscoveryHandler) diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index 111c622e9..22235af69 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -95,6 +95,7 @@ type defaultValue: false, name: "execute" .}: bool + of noCommand: ## Application-level configuration protectedTopics* {. @@ -221,7 +222,7 @@ type desc: "Rln relay identity commitment key as a Hex string", defaultValue: "" name: "rln-relay-id-commitment-key" }: string - + rlnRelayTreePath* {. desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)", defaultValue: "" @@ -304,10 +305,25 @@ type name: "filternode" }: string filterTimeout* {. - desc: "Timeout for filter node in seconds.", + desc: "Filter clients will be wiped out if not able to receive push messages within this timeout. In seconds.", defaultValue: 14400 # 4 hours name: "filter-timeout" }: int64 + filterSubscriptionTimeout* {. + desc: "Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.", + defaultValue: 300 # 5 minutes + name: "filter-subscription-timeout" }: int64 + + filterMaxPeersToServe* {. + desc: "Maximum number of peers to serve at a time. Only for v2 filter protocol.", + defaultValue: 1000 + name: "filter-max-peers-to-serve" }: uint32 + + filterMaxCriteria* {. + desc: "Maximum number of pubsub- and content topic combination per peers at a time. Only for v2 filter protocol.", + defaultValue: 1000 + name: "filter-max-criteria" }: uint32 + ## Lightpush config lightpush* {. diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim index 7b7024b54..d486eb180 100644 --- a/tests/node/test_wakunode_filter.nim +++ b/tests/node/test_wakunode_filter.nim @@ -1,9 +1,9 @@ -{.used.} +{.used.} import std/[ - options, - tables, + options, + tables, sequtils ], stew/shims/net as stewNet, @@ -12,7 +12,7 @@ import chronicles, os, libp2p/[ - peerstore, + peerstore, crypto/crypto ] @@ -44,7 +44,7 @@ suite "Waku Filter - End to End": var contentTopicSeq {.threadvar.}: seq[ContentTopic] var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)] var messagePushHandler {.threadvar.}: FilterPushHandler - + asyncSetup: pushHandlerFuture = newFuture[(string, WakuMessage)]() messagePushHandler = proc( @@ -84,8 +84,8 @@ suite "Waku Filter - End to End": # Then the subscription is successful check: subscribeResponse.isOk() - server.wakuFilter.subscriptions.len == 1 - server.wakuFilter.subscriptions.hasKey(clientPeerId) + server.wakuFilter.subscriptions.subscribedPeerCount() == 1 + server.wakuFilter.subscriptions.isSubscribed(clientPeerId) # When sending a message to the subscribed content topic let msg1 = fakeWakuMessage(contentTopic=contentTopic) @@ -106,7 +106,7 @@ suite "Waku Filter - End to End": # Then the unsubscription is successful check: unsubscribeResponse.isOk() - server.wakuFilter.subscriptions.len == 0 + server.wakuFilter.subscriptions.subscribedPeerCount() == 0 # When sending a message to the previously subscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future @@ -116,7 +116,7 @@ suite "Waku Filter - End to End": # Then the message is not pushed to the client check: not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - + asyncTest "Client Node can't receive Push from Server Node, via Relay": # Given the server node has Relay enabled await server.mountRelay() @@ -127,7 +127,7 @@ suite "Waku Filter - End to End": ) require: subscribeResponse.isOk() - server.wakuFilter.subscriptions.len == 1 + server.wakuFilter.subscriptions.subscribedPeerCount() == 1 # When a server node gets a Relay message let msg1 = fakeWakuMessage(contentTopic=contentTopic) @@ -141,7 +141,7 @@ suite "Waku Filter - End to End": let serverKey = generateSecp256k1Key() server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) - + waitFor server.start() waitFor server.mountRelay() @@ -162,8 +162,8 @@ suite "Waku Filter - End to End": ) require: subscribeResponse.isOk() - server.wakuFilter.subscriptions.len == 1 - + server.wakuFilter.subscriptions.subscribedPeerCount() == 1 + # And the client node reboots waitFor client.stop() waitFor client.start() @@ -189,8 +189,8 @@ suite "Waku Filter - End to End": ) require: subscribeResponse.isOk() - server.wakuFilter.subscriptions.len == 1 - + server.wakuFilter.subscriptions.subscribedPeerCount() == 1 + # And the client node reboots waitFor client.stop() waitFor client.start() @@ -209,7 +209,7 @@ suite "Waku Filter - End to End": ) check: subscribeResponse2.isOk() - server.wakuFilter.subscriptions.len == 1 + server.wakuFilter.subscriptions.subscribedPeerCount() == 1 # When a message is sent to the subscribed content topic, via Relay pushHandlerFuture = newPushHandlerFuture() diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 8576f2725..25f4a9e77 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -53,6 +53,7 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) await allFutures(nodes.mapIt(it.mountFilter())) + await allFutures(nodes.mapIt(it.mountLegacyFilter())) # Dial node2 from node1 let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) @@ -528,6 +529,7 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) await allFutures(nodes.mapIt(it.mountFilter())) + await allFutures(nodes.mapIt(it.mountLegacyFilter())) let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) @@ -579,6 +581,7 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) await allFutures(nodes.mapIt(it.mountFilter())) + await allFutures(nodes.mapIt(it.mountLegacyFilter())) let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) diff --git a/tests/test_wakunode_filter_legacy.nim b/tests/test_wakunode_filter_legacy.nim index cf0e37bc9..85b8c37db 100644 --- a/tests/test_wakunode_filter_legacy.nim +++ b/tests/test_wakunode_filter_legacy.nim @@ -29,6 +29,7 @@ suite "WakuNode - Filter": waitFor allFutures(server.start(), client.start()) waitFor server.mountFilter() + waitFor server.mountLegacyFilter() waitFor client.mountFilterClient() ## Given diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 3d4d489f3..b0f2a653f 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -2,10 +2,10 @@ import std/[ - options, - tables, - sequtils, - strutils, + options, + tables, + sequtils, + strutils, json ], testutils/unittests, @@ -40,7 +40,6 @@ import ./waku_filter_utils, ../resources/payloads - suite "Waku Filter - End to End": suite "MessagePushHandler - Void": var serverSwitch {.threadvar.}: Switch @@ -77,27 +76,26 @@ suite "Waku Filter - End to End": wakuFilterClient.registerPushHandler(messagePushHandler) serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId - + asyncTeardown: await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) suite "Subscriber Ping": asyncTest "Active Subscription Identification": # Given - let + let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) assert subscribeResponse.isOk(), $subscribeResponse.error - check wakuFilter.subscriptions.hasKey(clientPeerId) + check wakuFilter.subscriptions.isSubscribed(clientPeerId) # When let subscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) - # Then assert subscribedPingResponse.isOk(), $subscribedPingResponse.error check: - wakuFilter.subscriptions.hasKey(clientPeerId) + wakuFilter.subscriptions.isSubscribed(clientPeerId) asyncTest "No Active Subscription Identification": # When @@ -107,26 +105,24 @@ suite "Waku Filter - End to End": check: unsubscribedPingResponse.isErr() # Not subscribed unsubscribedPingResponse.error().kind == FilterSubscribeErrorKind.NOT_FOUND - asyncTest "After Unsubscription": # Given - let + let subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) assert subscribeResponse.isOk(), $subscribeResponse.error - check wakuFilter.subscriptions.hasKey(clientPeerId) + check wakuFilter.subscriptions.isSubscribed(clientPeerId) # When let unsubscribeResponse = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) assert unsubscribeResponse.isOk(), $unsubscribeResponse.error - check not wakuFilter.subscriptions.hasKey(clientPeerId) + check not wakuFilter.subscriptions.isSubscribed(clientPeerId) let unsubscribedPingResponse = await wakuFilterClient.ping(serverRemotePeerInfo) - # Then check: unsubscribedPingResponse.isErr() # Not subscribed @@ -171,9 +167,9 @@ suite "Waku Filter - End to End": # Then the subscription is successful assert subscribeResponse.isOk(), $subscribeResponse.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicSeq) # When sending a message to the subscribed content topic let msg1 = fakeWakuMessage(contentTopic=contentTopic) @@ -185,7 +181,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic == pubsubTopic pushedMsg == msg1 - + # When sending a message to a non-subscribed content topic (before unsubscription) pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg2 = fakeWakuMessage(contentTopic=nonExistentContentTopic) @@ -201,7 +197,7 @@ suite "Waku Filter - End to End": ) assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - wakuFilter.subscriptions.len == 0 + wakuFilter.subscriptions.subscribedPeerCount() == 0 # When sending a message to the previously unsubscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future @@ -211,7 +207,7 @@ suite "Waku Filter - End to End": # Then the message is not pushed to the client check: not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - + # When sending a message to a non-subscribed content topic (after unsubscription) pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg4 = fakeWakuMessage(contentTopic=nonExistentContentTopic) @@ -233,9 +229,9 @@ suite "Waku Filter - End to End": ) assert subscribeResponse.isOk(), $subscribeResponse.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicsSeq) # When sending a message to the one of the subscribed content topics let msg1 = fakeWakuMessage(contentTopic=contentTopic) @@ -247,7 +243,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic1 == pubsubTopic pushedMsg1 == msg1 - + # When sending a message to the other subscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg2 = fakeWakuMessage(contentTopic=otherContentTopic) @@ -259,7 +255,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic2 == pubsubTopic pushedMsg2 == msg2 - + # When sending a message to a non-subscribed content topic (before unsubscription) pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg3 = fakeWakuMessage(contentTopic=nonExistentContentTopic) @@ -268,14 +264,14 @@ suite "Waku Filter - End to End": # Then the message is not pushed to the client check: not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - + # Given a valid unsubscription to an existing subscription let unsubscribeResponse = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, contentTopicsSeq ) assert unsubscribeResponse.isOk(), $unsubscribeResponse.error - check wakuFilter.subscriptions.len == 0 - + check wakuFilter.subscriptions.subscribedPeerCount() == 0 + # When sending a message to the previously unsubscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg4 = fakeWakuMessage(contentTopic=contentTopic) @@ -293,7 +289,7 @@ suite "Waku Filter - End to End": # Then the message is not pushed to the client check: not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - + # When sending a message to a non-subscribed content topic (after unsubscription) pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg6 = fakeWakuMessage(contentTopic=nonExistentContentTopic) @@ -311,15 +307,15 @@ suite "Waku Filter - End to End": # When subscribing to a pubsub topic let subscribeResponse1 = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, contentTopicSeq + serverRemotePeerInfo, pubsubTopic, contentTopicSeq ) # Then the subscription is successful assert subscribeResponse1.isOk(), $subscribeResponse1.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicSeq) # When subscribing to a different pubsub topic let subscribeResponse2 = await wakuFilterClient.subscribe( @@ -329,9 +325,9 @@ suite "Waku Filter - End to End": # Then the subscription is successful assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq & otherContentTopicSeq + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicSeq & otherContentTopicSeq) # When sending a message to one of the subscribed content topics let msg1 = fakeWakuMessage(contentTopic=contentTopic) @@ -343,7 +339,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic1 == pubsubTopic pushedMsg1 == msg1 - + # When sending a message to the other subscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg2 = fakeWakuMessage(contentTopic=otherContentTopic) @@ -373,10 +369,10 @@ suite "Waku Filter - End to End": # Then the unsubscription is successful assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == otherContentTopicSeq - + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), otherContentTopicSeq) + # When sending a message to the previously subscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg4 = fakeWakuMessage(contentTopic=contentTopic) @@ -385,7 +381,7 @@ suite "Waku Filter - End to End": # Then the message is not pushed to the client check: not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) - + # When sending a message to the still subscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg5 = fakeWakuMessage(contentTopic=otherContentTopic) @@ -397,7 +393,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic3 == otherPubsubTopic pushedMsg3 == msg5 - + # When unsubscribing from the other subscription let unsubscribeResponse2 = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, otherPubsubTopic, otherContentTopicSeq @@ -406,7 +402,7 @@ suite "Waku Filter - End to End": # Then the unsubscription is successful assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - wakuFilter.subscriptions.len == 0 + wakuFilter.subscriptions.subscribedPeerCount() == 0 # When sending a message to the previously unsubscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future @@ -431,9 +427,9 @@ suite "Waku Filter - End to End": # Then assert subscribeResponse1.isOk(), $subscribeResponse1.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicSeq) # When subscribing to a different content topic let subscribeResponse2 = await wakuFilterClient.subscribe( @@ -443,9 +439,9 @@ suite "Waku Filter - End to End": # Then assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicSeq & otherContentTopicSeq + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicSeq & otherContentTopicSeq) # When sending a message to one of the subscribed content topics let msg1 = fakeWakuMessage(contentTopic=contentTopic) @@ -457,7 +453,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic1 == pubsubTopic pushedMsg1 == msg1 - + # When sending a message to the other subscribed content topic pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg2 = fakeWakuMessage(contentTopic=otherContentTopic) @@ -485,8 +481,8 @@ suite "Waku Filter - End to End": # Then the unsubscription is successful assert unsubscribeResponse.isOk(), $unsubscribeResponse.error check: - wakuFilter.subscriptions.len == 0 - + wakuFilter.subscriptions.subscribedPeerCount() == 0 + # When sending a message the previously subscribed content topics pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg4 = fakeWakuMessage(contentTopic=contentTopic) @@ -508,15 +504,15 @@ suite "Waku Filter - End to End": # When subscribing to a pubsub topic let subscribeResponse1 = await wakuFilterClient.subscribe( - serverRemotePeerInfo, pubsubTopic, contentTopicsSeq1 + serverRemotePeerInfo, pubsubTopic, contentTopicsSeq1 ) # Then the subscription is successful assert subscribeResponse1.isOk(), $subscribeResponse1.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq1 + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicsSeq1) # When subscribing to a different pubsub topic let subscribeResponse2 = await wakuFilterClient.subscribe( @@ -526,9 +522,9 @@ suite "Waku Filter - End to End": # Then the subscription is successful assert subscribeResponse2.isOk(), $subscribeResponse2.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == contentTopicsSeq1 & contentTopicsSeq2 + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), contentTopicsSeq1 & contentTopicsSeq2) # When sending a message to (pubsubTopic, contentTopic) let msg1 = fakeWakuMessage(contentTopic=contentTopic) @@ -552,7 +548,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic2 == pubsubTopic pushedMsg2 == msg2 - + # When sending a message to (otherPubsubTopic, contentTopic) pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg3 = fakeWakuMessage(contentTopic=contentTopic) @@ -564,7 +560,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic3 == otherPubsubTopic pushedMsg3 == msg3 - + # When sending a message to (otherPubsubTopic, otherContentTopic2) pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg4 = fakeWakuMessage(contentTopic=otherContentTopic2) @@ -576,7 +572,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic4 == otherPubsubTopic pushedMsg4 == msg4 - + # When selectively unsubscribing from (pubsubTopic, otherContentTopic1) and (otherPubsubTopic, contentTopic) let unsubscribeResponse1 = await wakuFilterClient.unsubscribe( serverRemotePeerInfo, pubsubTopic, @[otherContentTopic1] @@ -589,9 +585,9 @@ suite "Waku Filter - End to End": assert unsubscribeResponse1.isOk(), $unsubscribeResponse1.error assert unsubscribeResponse2.isOk(), $unsubscribeResponse2.error check: - wakuFilter.subscriptions.len == 1 - wakuFilter.subscriptions.hasKey(clientPeerId) - wakuFilter.getSubscribedContentTopics(clientPeerId) == @[contentTopic, otherContentTopic2] + wakuFilter.subscriptions.subscribedPeerCount() == 1 + wakuFilter.subscriptions.isSubscribed(clientPeerId) + unorderedCompare(wakuFilter.getSubscribedContentTopics(clientPeerId), @[contentTopic, otherContentTopic2]) # When sending a message to (pubsubTopic, contentTopic) pushHandlerFuture = newPushHandlerFuture() # Clear previous future @@ -616,7 +612,7 @@ suite "Waku Filter - End to End": check: pushedMsgPubsubTopic6 == otherPubsubTopic pushedMsg6 == msg6 - + # When sending a message to (pubsubTopic, otherContentTopic1) and (otherPubsubTopic, contentTopic) pushHandlerFuture = newPushHandlerFuture() # Clear previous future let msg7 = fakeWakuMessage(contentTopic=otherContentTopic1) @@ -629,7 +625,7 @@ suite "Waku Filter - End to End": not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT) asyncTest "Max Topic Size": - # Given a topic list of 30 topics + # Given a topic list of 100 topics var topicSeq: seq[string] = toSeq(0.. 0: let takeNumber = min(topicSeq.len, MaxContentTopicsPerRequest) @@ -687,14 +683,14 @@ suite "Waku Filter - End to End": assert subscribeResponse.isOk(), $subscribeResponse.error subscribedTopics.add(topicSeqBatch) topicSeq.delete(0.. MaxContentTopicsPerRequest: - return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest)) + if contentTopics.len > MaxContentTopicsPerRequest: + return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & + $MaxContentTopicsPerRequest)) let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria - if peerId in wf.subscriptions: - # We already have a subscription for this peer. Try to add the new filter criteria. - var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) - if peerSubscription.len() + filterCriteria.len() > MaxCriteriaPerSubscription: - return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria")) - - peerSubscription.incl(filterCriteria) - wf.subscriptions[peerId] = peerSubscription - else: - # We don't have a subscription for this peer yet. Try to add it. - if wf.subscriptions.len() >= MaxTotalSubscriptions: - return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions")) - debug "creating new subscription", peerId=peerId - wf.subscriptions[peerId] = filterCriteria + wf.subscriptions.addSubscription(peerId, filterCriteria).isOkOr: + return err(FilterSubscribeError.serviceUnavailable(error)) ok() -proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult = - if pubsubTopic.isNone() or contentTopics.len() == 0: +proc unsubscribe(wf: WakuFilter, + peerId: PeerID, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic]): FilterSubscribeResult = + if pubsubTopic.isNone() or contentTopics.len == 0: return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")) - if contentTopics.len() > MaxContentTopicsPerRequest: + if contentTopics.len > MaxContentTopicsPerRequest: return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest)) let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) - trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria + debug "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria - if peerId notin wf.subscriptions: - debug "unsubscribing peer has no subscriptions", peerId=peerId + wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr: return err(FilterSubscribeError.notFound()) - var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) - - if not peerSubscription.containsAny(filterCriteria): - debug "unsubscribing peer is not subscribed to any of the content topics in this pubsub topic", peerId=peerId, pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics - return err(FilterSubscribeError.notFound()) - - peerSubscription.excl(filterCriteria) - - if peerSubscription.len() == 0: - debug "peer has no more subscriptions, removing subscription", peerId=peerId - wf.subscriptions.del(peerId) - else: - wf.subscriptions[peerId] = peerSubscription - ok() proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = - if peerId notin wf.subscriptions: + if not wf.subscriptions.isSubscribed(peerId): debug "unsubscribing peer has no subscriptions", peerId=peerId return err(FilterSubscribeError.notFound()) debug "removing peer subscription", peerId=peerId - wf.subscriptions.del(peerId) + wf.subscriptions.removePeer(peerId) + wf.subscriptions.cleanUp() ok() -proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest): FilterSubscribeResponse = +proc handleSubscribeRequest*(wf: WakuFilter, + peerId: PeerId, + request: FilterSubscribeRequest): FilterSubscribeResponse = info "received filter subscribe request", peerId=peerId, request=request waku_filter_requests.inc(labelValues = [$request.filterSubscribeType]) @@ -134,7 +121,8 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs let requestDuration = Moment.now() - requestStartTime requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point - waku_filter_request_duration_seconds.observe(requestDurationSec, labelValues = [$request.filterSubscribeType]) + waku_filter_request_duration_seconds.observe( + requestDurationSec, labelValues = [$request.filterSubscribeType]) if subscribeResult.isErr(): return FilterSubscribeResponse( @@ -153,6 +141,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = trace "no addresses for peer", peer=peer return + ## TODO: Check if dial is necessary always??? let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec) if conn.isNone(): ## We do not remove this peer, but allow the underlying peer manager @@ -163,7 +152,10 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = await conn.get().writeLp(buffer) proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} = - debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, contentTopic=messagePush.wakuMessage.contentTopic, peers=peers, hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() + debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, + contentTopic=messagePush.wakuMessage.contentTopic, + peers=peers, + hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() let bufferToPublish = messagePush.encode().buffer @@ -179,18 +171,18 @@ proc maintainSubscriptions*(wf: WakuFilter) = ## Remove subscriptions for peers that have been removed from peer store var peersToRemove: seq[PeerId] - for peerId, peerSubscription in wf.subscriptions.pairs(): - ## TODO: currently we only maintain by syncing with peer store. We could - ## consider other metrics, such as subscription age, activity, etc. + for peerId in wf.subscriptions.peersSubscribed.keys: if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec): debug "peer has been removed from peer store, removing subscription", peerId=peerId peersToRemove.add(peerId) - if peersToRemove.len() > 0: + if peersToRemove.len > 0: wf.subscriptions.removePeers(peersToRemove) + wf.subscriptions.cleanUp() + ## Periodic report of number of subscriptions - waku_filter_subscriptions.set(wf.subscriptions.len().float64) + waku_filter_subscriptions.set(wf.subscriptions.peersSubscribed.len.float64) const MessagePushTimeout = 20.seconds proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} = @@ -201,7 +193,7 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa block: ## Find subscribers and push message to them let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) - if subscribedPeers.len() == 0: + if subscribedPeers.len == 0: trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic return @@ -210,11 +202,15 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa wakuMessage: message) if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout): - debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex() + debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, + contentTopic=message.contentTopic, + hash=pubsubTopic.computeMessageHash(message).to0xHex() waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) else: - debug "pushed message succesfully to all subscribers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex() - + debug "pushed message succesfully to all subscribers", + pubsubTopic=pubsubTopic, + contentTopic=message.contentTopic, + hash=pubsubTopic.computeMessageHash(message).to0xHex() let handleMessageDuration = Moment.now() - handleMessageStartTime @@ -247,13 +243,21 @@ proc initProtocolHandler(wf: WakuFilter) = wf.codec = WakuFilterSubscribeCodec proc new*(T: type WakuFilter, - peerManager: PeerManager): T = + peerManager: PeerManager, + subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, + maxFilterPeers: uint32 = MaxFilterPeers, + maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer): T = let wf = WakuFilter( + subscriptions: FilterSubscriptions.init(subscriptionTimeout, + maxFilterPeers, + maxFilterCriteriaPerPeer + ), peerManager: peerManager ) + wf.initProtocolHandler() - wf + return wf const MaintainSubscriptionsInterval* = 1.minutes @@ -278,4 +282,5 @@ method stop*(wf: WakuFilter) {.async.} = debug "stopping filter protocol" if not wf.maintenanceTask.isNil(): wf.maintenanceTask.clearTimer() + await procCall LPProtocol(wf).stop() diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index bd248f54c..ba568f331 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -6,48 +6,163 @@ else: import std/[sets,tables], chronicles, - libp2p/peerid + chronos, + libp2p/peerid, + stew/shims/sets import - ../waku_core + ../waku_core, + ../utils/tableutils logScope: topics = "waku filter subscriptions" const - MaxTotalSubscriptions* = 1000 # TODO make configurable - MaxCriteriaPerSubscription* = 1000 + MaxFilterPeers* = 1000 + MaxFilterCriteriaPerPeer* = 1000 + DefaultSubscriptionTimeToLiveSec* = 5.minutes type - FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic + # a single filter criterion is fully defined by a pubsub topic and content topic + FilterCriterion* = tuple + pubsubTopic: PubsubTopic + contentTopic: ContentTopic + FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria - FilterSubscriptions* = Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria -proc findSubscribedPeers*(subscriptions: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] = - ## Find all peers subscribed to a given topic and content topic - let filterCriterion = (pubsubTopic, contentTopic) + SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids - var subscribedPeers: seq[PeerID] + PeerData* = tuple + lastSeen: Moment + criteriaCount: uint - # TODO: for large maps, this can be optimized using a reverse index - for (peerId, criteria) in subscriptions.pairs(): - if filterCriterion in criteria: - subscribedPeers.add(peerId) + FilterSubscriptions* = object + peersSubscribed* : Table[PeerID, PeerData] + subscriptions : Table[FilterCriterion, SubscribedPeers] + subscriptionTimeout : Duration + maxPeers : uint + maxCriteriaPerPeer : uint - subscribedPeers +proc init*(T: type FilterSubscriptions, + subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, + maxFilterPeers: uint32 = MaxFilterPeers, + maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer): FilterSubscriptions = + ## Create a new filter subscription object + return FilterSubscriptions( + peersSubscribed: initTable[PeerID, PeerData](), + subscriptions: initTable[FilterCriterion, SubscribedPeers](), + subscriptionTimeout: subscriptionTimeout, + maxPeers: maxFilterPeers, + maxCriteriaPerPeer: maxFilterCriteriaPerPeer + ) -proc removePeer*(subscriptions: var FilterSubscriptions, peerId: PeerID) = +proc isSubscribed*(s: var FilterSubscriptions, peerId: PeerID): bool = + s.peersSubscribed.withValue(peerId, data): + return Moment.now() - data.lastSeen <= s.subscriptionTimeout + + return false + +proc subscribedPeerCount*(s: FilterSubscriptions): uint = + return cast[uint](s.peersSubscribed.len) + +proc getPeerSubscriptions*(s: var FilterSubscriptions, peerId: PeerID): seq[FilterCriterion] = + ## Get all pubsub-content topics a peer is subscribed to + var subscribedContentTopics: seq[FilterCriterion] = @[] + s.peersSubscribed.withValue(peerId, data): + if data.criteriaCount == 0: + return subscribedContentTopics + + for filterCriterion, subscribedPeers in s.subscriptions.mpairs: + if peerId in subscribedPeers: + subscribedContentTopics.add(filterCriterion) + + return subscribedContentTopics + +proc findSubscribedPeers*(s: var FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] = + let filterCriterion : FilterCriterion = (pubsubTopic, contentTopic) + + var foundPeers : seq[PeerID] = @[] + # only peers subscribed to criteria and with legit subscription is counted + s.subscriptions.withValue(filterCriterion, peers): + for peer in peers[]: + if s.isSubscribed(peer): + foundPeers.add(peer) + + return foundPeers + +proc removePeer*(s: var FilterSubscriptions, peerId: PeerID) = ## Remove all subscriptions for a given peer - subscriptions.del(peerId) + s.peersSubscribed.del(peerId) -proc removePeers*(subscriptions: var FilterSubscriptions, peerIds: seq[PeerID]) = +proc removePeers*(s: var FilterSubscriptions, peerIds: seq[PeerID]) = ## Remove all subscriptions for a given list of peers - for peerId in peerIds: - subscriptions.removePeer(peerId) + s.peersSubscribed.keepItIf(key notin peerIds) + +proc cleanUp*(fs: var FilterSubscriptions) = + ## Remove all subscriptions for peers that have not been seen for a while + let now = Moment.now() + fs.peersSubscribed.keepItIf(now - val.lastSeen <= fs.subscriptionTimeout) + + var filtersToRemove: seq[FilterCriterion] = @[] + for filterCriterion, subscribedPeers in fs.subscriptions.mpairs: + subscribedPeers.keepItIf(fs.isSubscribed(it)==true) + + fs.subscriptions.keepItIf(val.len > 0) + +proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) = + s.peersSubscribed.withValue(peerId, data): + data.lastSeen = Moment.now() + +proc addSubscription*(s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria): Result[void, string] = + ## Add a subscription for a given peer + var peerData: ptr PeerData + + s.peersSubscribed.withValue(peerId, data): + if data.criteriaCount + cast[uint](filterCriteria.len) > s.maxCriteriaPerPeer: + return err("peer has reached maximum number of filter criteria") + + data.lastSeen = Moment.now() + peerData = data + + do: + ## not yet subscribed + if cast[uint](s.peersSubscribed.len) >= s.maxPeers: + return err("node has reached maximum number of subscriptions") + + let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0) + peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData)) + + for filterCriterion in filterCriteria: + var peersOfSub = addr(s.subscriptions.mgetOrPut(filterCriterion, SubscribedPeers())) + if peerId notin peersOfSub[]: + peersOfSub[].incl(peerId) + peerData.criteriaCount += 1 + + return ok() + +proc removeSubscription*(s: var FilterSubscriptions, + peerId: PeerID, + filterCriteria: FilterCriteria): + Result[void, string] = + ## Remove a subscription for a given peer + + s.peersSubscribed.withValue(peerId, peerData): + peerData.lastSeen = Moment.now() + for filterCriterion in filterCriteria: + s.subscriptions.withValue(filterCriterion, peers): + if peers[].missingOrexcl(peerId) == false: + peerData.criteriaCount -= 1 + + if peers[].len == 0: + s.subscriptions.del(filterCriterion) + if peerData.criteriaCount == 0: + s.peersSubscribed.del(peerId) + do: + ## Maybe let just run through and log it as a warning + return err("Peer was not subscribed to criterion") + + return ok() + + do: + return err("Peer has no subscriptions") + -proc containsAny*(criteria: FilterCriteria, otherCriteria: FilterCriteria): bool = - ## Check if a given pubsub topic is contained in a set of filter criteria - ## TODO: Better way to achieve this? - for criterion in otherCriteria: - if criterion in criteria: - return true - false