diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index dd5ec22fd..9ce4b56ab 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -32,6 +32,10 @@ import ./v2/waku_relay/test_waku_relay, ./v2/waku_relay/test_wakunode_relay +# Waku filter test suite +import + ./v2/waku_filter_v2/test_waku_filter, + ./v2/waku_filter_v2/test_waku_filter_protocol import # Waku v2 tests diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 0eb6d33eb..051cbb85f 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -15,20 +15,20 @@ import ./testlib/wakucore -proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilter] {.async.} = +proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilterLegacy] {.async.} = let peerManager = PeerManager.new(switch) - proto = WakuFilter.new(peerManager, rng, timeout) + proto = WakuFilterLegacy.new(peerManager, rng, timeout) await proto.start() switch.mount(proto) return proto -proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClient] {.async.} = +proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClientLegacy] {.async.} = let peerManager = PeerManager.new(switch) - proto = WakuFilterClient.new(peerManager, rng) + proto = WakuFilterClientLegacy.new(peerManager, rng) await proto.start() switch.mount(proto) diff --git a/tests/v2/waku_filter_v2/test_waku_filter.nim b/tests/v2/waku_filter_v2/test_waku_filter.nim index 066552b01..5f4824184 100644 --- a/tests/v2/waku_filter_v2/test_waku_filter.nim +++ b/tests/v2/waku_filter_v2/test_waku_filter.nim @@ -211,6 +211,9 @@ suite "Waku Filter - end to end": check: not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed + # Teardown + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + asyncTest "subscribe to multiple content topics and unsubscribe all": # Given var @@ -373,3 +376,6 @@ suite "Waku Filter - end to end": check: pushedMsgPubsubTopic3 == DefaultPubsubTopic pushedMsg3 == msg3 + + # Teardown + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) diff --git a/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim b/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim index 1280386e2..3ff12bc52 100644 --- a/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim +++ b/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options,sets,strutils,tables], + std/[options,sequtils,sets,strutils,tables], testutils/unittests, chronos, chronicles, @@ -10,6 +10,7 @@ import ../../../waku/v2/node/peer_manager, ../../../waku/v2/protocol/waku_filter_v2, ../../../waku/v2/protocol/waku_filter_v2/rpc, + ../../../waku/v2/protocol/waku_filter_v2/subscriptions, ../../../waku/v2/protocol/waku_message, ../testlib/common, ../testlib/wakucore @@ -197,6 +198,187 @@ suite "Waku Filter - handling subscribe requests": response4.statusCode == 200 response4.statusDesc.get() == "OK" + asyncTest "subscribe errors": + ## Tests most common error paths while subscribing + + # Given + let + switch = newStandardSwitch() + wakuFilter = newTestWakuFilter(switch) + peerId = PeerId.random().get() + + ## Incomplete filter criteria + + # When + let + reqNoPubsubTopic = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = none(PubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + reqNoContentTopics = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[] + ) + response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic) + response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics) + + # Then + check: + response1.requestId == reqNoPubsubTopic.requestId + response2.requestId == reqNoContentTopics.requestId + response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32 + response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32 + response1.statusDesc.get().contains("pubsubTopic and contentTopics must be specified") + response2.statusDesc.get().contains("pubsubTopic and contentTopics must be specified") + + ## Max content topics per request exceeded + + # When + let + contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(ContentTopic("/waku/2/content-$#/proto" % [$it])) + reqTooManyContentTopics = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = contentTopics + ) + response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics) + + # Then + check: + response3.requestId == reqTooManyContentTopics.requestId + response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32 + response3.statusDesc.get().contains("exceeds maximum content topics") + + ## Max filter criteria exceeded + + # When + let + filterCriteria = toSeq(1 .. MaxCriteriaPerSubscription + 1).mapIt((DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it]))) + + wakuFilter.subscriptions[peerId] = filterCriteria.toHashSet() + + let + reqTooManyFilterCriteria = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + response4 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyFilterCriteria) + + # Then + check: + response4.requestId == reqTooManyFilterCriteria.requestId + response4.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32 + response4.statusDesc.get().contains("peer has reached maximum number of filter criteria") + + ## Max subscriptions exceeded + + # When + wakuFilter.subscriptions.clear() + for _ in 1 .. MaxTotalSubscriptions: + wakuFilter.subscriptions[PeerId.random().get()] = @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet() + + let + reqTooManySubscriptions = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + response5 = wakuFilter.handleSubscribeRequest(peerId, reqTooManySubscriptions) + + # Then + check: + response5.requestId == reqTooManySubscriptions.requestId + response5.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32 + response5.statusDesc.get().contains("node has reached maximum number of subscriptions") + + asyncTest "unsubscribe errors": + ## Tests most common error paths while unsubscribing + + # Given + let + switch = newStandardSwitch() + wakuFilter = newTestWakuFilter(switch) + peerId = PeerId.random().get() + + ## Incomplete filter criteria + + # When + let + reqNoPubsubTopic = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic = none(PubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + reqNoContentTopics = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[] + ) + response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic) + response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics) + + # Then + check: + response1.requestId == reqNoPubsubTopic.requestId + response2.requestId == reqNoContentTopics.requestId + response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32 + response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32 + response1.statusDesc.get().contains("pubsubTopic and contentTopics must be specified") + response2.statusDesc.get().contains("pubsubTopic and contentTopics must be specified") + + ## Max content topics per request exceeded + + # When + let + contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(ContentTopic("/waku/2/content-$#/proto" % [$it])) + reqTooManyContentTopics = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = contentTopics + ) + response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics) + + # Then + check: + response3.requestId == reqTooManyContentTopics.requestId + response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32 + response3.statusDesc.get().contains("exceeds maximum content topics") + + ## Subscription not found - unsubscribe + + # When + let + reqSubscriptionNotFound = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + response4 = wakuFilter.handleSubscribeRequest(peerId, reqSubscriptionNotFound) + + # Then + check: + response4.requestId == reqSubscriptionNotFound.requestId + response4.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32 + response4.statusDesc.get().contains("peer has no subscriptions") + + ## Subscription not found - unsubscribe all + + # When + let + reqUnsubscribeAll = createRequest( + filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL + ) + response5 = wakuFilter.handleSubscribeRequest(peerId, reqUnsubscribeAll) + + # Then + check: + response5.requestId == reqUnsubscribeAll.requestId + response5.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32 + response5.statusDesc.get().contains("peer has no subscriptions") + asyncTest "ping subscriber": # Given let diff --git a/tests/v2/waku_store/test_wakunode_store.nim b/tests/v2/waku_store/test_wakunode_store.nim index f0d271a21..6d81130e9 100644 --- a/tests/v2/waku_store/test_wakunode_store.nim +++ b/tests/v2/waku_store/test_wakunode_store.nim @@ -230,7 +230,7 @@ procSuite "WakuNode - Store": await sleepAsync(100.millis) # Send filter push message to server from source node - await filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message) + await filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message) # Wait for the server filter to receive the push message require await filterFut.withTimeout(5.seconds) diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim index f499da61b..abeb64679 100644 --- a/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim @@ -58,7 +58,7 @@ procSuite "Waku v2 JSON-RPC API - Filter": check: # Light node has not yet subscribed to any filters - node2.wakuFilterClient.getSubscriptionsCount() == 0 + node2.wakuFilterClientLegacy.getSubscriptionsCount() == 0 let contentFilters = @[ ContentFilter(contentTopic: DefaultContentTopic), @@ -70,13 +70,13 @@ procSuite "Waku v2 JSON-RPC API - Filter": check: response == true # Light node has successfully subscribed to 4 content topics - node2.wakuFilterClient.getSubscriptionsCount() == 4 + node2.wakuFilterClientLegacy.getSubscriptionsCount() == 4 response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) check: response == true # Light node has successfully unsubscribed from all filters - node2.wakuFilterClient.getSubscriptionsCount() == 0 + node2.wakuFilterClientLegacy.getSubscriptionsCount() == 0 ## Cleanup await server.stop() diff --git a/waku/v2/node/jsonrpc/admin/handlers.nim b/waku/v2/node/jsonrpc/admin/handlers.nim index 4ca5970e5..067c05f18 100644 --- a/waku/v2/node/jsonrpc/admin/handlers.nim +++ b/waku/v2/node/jsonrpc/admin/handlers.nim @@ -65,7 +65,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = connected: it.connectedness == Connectedness.Connected)) peers.add(relayPeers) - if not node.wakuFilter.isNil(): + if not node.wakuFilterLegacy.isNil(): # Map WakuFilter peers to WakuPeers and add to return list let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), diff --git a/waku/v2/node/waku_metrics.nim b/waku/v2/node/waku_metrics.nim index 3f1485a22..1986dee22 100644 --- a/waku/v2/node/waku_metrics.nim +++ b/waku/v2/node/waku_metrics.nim @@ -62,7 +62,7 @@ proc startMetricsLog*() = let pxPeers = collectorAsF64(waku_px_peers) let lightpushPeers = collectorAsF64(waku_lightpush_peers) let filterPeers = collectorAsF64(waku_filter_peers) - let filterSubscribers = collectorAsF64(waku_filter_subscribers) + let filterSubscribers = collectorAsF64(waku_legacy_filter_subscribers) info "Total connections initiated", count = $freshConnCount info "Total messages", count = totalMessages diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 135f1eae1..ad5e3064e 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -29,8 +29,9 @@ import ../protocol/waku_archive, ../protocol/waku_store, ../protocol/waku_store/client as store_client, - ../protocol/waku_filter, - ../protocol/waku_filter/client as filter_client, + ../protocol/waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed + ../protocol/waku_filter/client as filter_client, #TODO: support for legacy filter protocol will be removed + ../protocol/waku_filter_v2, ../protocol/waku_lightpush, ../protocol/waku_lightpush/client as lightpush_client, ../protocol/waku_enr, @@ -87,8 +88,9 @@ type wakuArchive*: WakuArchive wakuStore*: WakuStore wakuStoreClient*: WakuStoreClient - wakuFilter*: WakuFilter - wakuFilterClient*: WakuFilterClient + wakuFilter*: waku_filter_v2.WakuFilter + wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed + wakuFilterClientLegacy*: WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed when defined(rln): wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush @@ -247,6 +249,12 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuFilter.handleMessage(topic, msg) + ##TODO: Support for legacy filter will be removed + if node.wakuFilterLegacy.isNil(): + return + + await node.wakuFilterLegacy.handleMessage(topic, msg) + proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = if node.wakuArchive.isNil(): return @@ -395,12 +403,15 @@ proc mountRelay*(node: WakuNode, proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = info "mounting filter protocol" - node.wakuFilter = WakuFilter.new(node.peerManager, node.rng, filterTimeout) + node.wakuFilter = WakuFilter.new(node.peerManager) + node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy if node.started: await node.wakuFilter.start() + await node.wakuFilterLegacy.start() #TODO: remove legacy - node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) + node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec)) + node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuFilterCodec)) #TODO: remove legacy proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}= if node.wakuFilter.isNil(): @@ -408,22 +419,23 @@ proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: Wak return await node.wakuFilter.handleMessage(pubsubTopic, message) + await node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = info "mounting filter client" - node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) + node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng) if node.started: # Node has started already. Let's start filter too. - await node.wakuFilterClient.start() + await node.wakuFilterClientLegacy.start() - node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec)) + node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec)) proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - if node.wakuFilterClient.isNil(): + if node.wakuFilterClientLegacy.isNil(): error "cannot register filter subscription to topic", error="waku filter client is nil" return @@ -440,7 +452,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C handler(pubsubTopic, message) - let subRes = await node.wakuFilterClient.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer) + let subRes = await node.wakuFilterClientLegacy.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer) if subRes.isOk(): info "subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics else: @@ -450,7 +462,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = ## Unsubscribe from a content filter. - if node.wakuFilterClient.isNil(): + if node.wakuFilterClientLegacy.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" return @@ -459,7 +471,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId - let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer) + let unsubRes = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer) if unsubRes.isOk(): info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics else: @@ -470,7 +482,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - if node.wakuFilterClient.isNil(): + if node.wakuFilterClientLegacy.isNil(): error "cannot register filter subscription to topic", error="waku filter client is nil" return @@ -485,7 +497,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe, deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} = ## Unsubscribe from a content filter. - if node.wakuFilterClient.isNil(): + if node.wakuFilterClientLegacy.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" return diff --git a/waku/v2/protocol/waku_filter/client.nim b/waku/v2/protocol/waku_filter/client.nim index 6acda3b94..4ab848de7 100644 --- a/waku/v2/protocol/waku_filter/client.nim +++ b/waku/v2/protocol/waku_filter/client.nim @@ -71,13 +71,13 @@ proc getSubscriptionsCount(m: SubscriptionManager): int = type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} -type WakuFilterClient* = ref object of LPProtocol +type WakuFilterClientLegacy* = ref object of LPProtocol rng: ref rand.HmacDrbgContext peerManager: PeerManager subManager: SubscriptionManager -proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string, rpc: MessagePush) = +proc handleMessagePush(wf: WakuFilterClientLegacy, peerId: PeerId, requestId: string, rpc: MessagePush) = for msg in rpc.messages: let pubsubTopic = Defaultstring # TODO: Extend the filter push rpc to provide the pubsub topic. This is a limitation @@ -86,24 +86,24 @@ proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string, wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg) -proc initProtocolHandler(wf: WakuFilterClient) = +proc initProtocolHandler(wf: WakuFilterClientLegacy) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = let buffer = await conn.readLp(MaxRpcSize.int) let decodeReqRes = FilterRPC.decode(buffer) if decodeReqRes.isErr(): - waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure]) return let rpc = decodeReqRes.get() trace "filter message received" if rpc.push.isNone(): - waku_filter_errors.inc(labelValues = [emptyMessagePushFailure]) + waku_legacy_filter_errors.inc(labelValues = [emptyMessagePushFailure]) # TODO: Manage the empty push message error. Perform any action? return - waku_filter_messages.inc(labelValues = ["MessagePush"]) + waku_legacy_filter_messages.inc(labelValues = ["MessagePush"]) let peerId = conn.peerId @@ -116,11 +116,11 @@ proc initProtocolHandler(wf: WakuFilterClient) = wf.handler = handle wf.codec = WakuFilterCodec -proc new*(T: type WakuFilterClient, +proc new*(T: type WakuFilterClientLegacy, peerManager: PeerManager, rng: ref rand.HmacDrbgContext): T = - let wf = WakuFilterClient( + let wf = WakuFilterClientLegacy( peerManager: peerManager, rng: rng, subManager: SubscriptionManager.init() @@ -129,7 +129,7 @@ proc new*(T: type WakuFilterClient, wf -proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= +proc sendFilterRpc(wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) if connOpt.isNone(): return err(dialFailure) @@ -138,7 +138,7 @@ proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeer await connection.writeLP(rpc.encode().buffer) return ok() -proc sendFilterRequestRpc(wf: WakuFilterClient, +proc sendFilterRequestRpc(wf: WakuFilterClientLegacy, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic], subscribe: bool, @@ -158,13 +158,13 @@ proc sendFilterRequestRpc(wf: WakuFilterClient, let sendRes = await wf.sendFilterRpc(rpc, peer) if sendRes.isErr(): - waku_filter_errors.inc(labelValues = [sendRes.error]) + waku_legacy_filter_errors.inc(labelValues = [sendRes.error]) return err(sendRes.error) return ok() -proc subscribe*(wf: WakuFilterClient, +proc subscribe*(wf: WakuFilterClientLegacy, pubsubTopic: PubsubTopic, contentTopic: ContentTopic|seq[ContentTopic], handler: FilterPushHandler, @@ -184,7 +184,7 @@ proc subscribe*(wf: WakuFilterClient, return ok() -proc unsubscribe*(wf: WakuFilterClient, +proc unsubscribe*(wf: WakuFilterClientLegacy, pubsubTopic: PubsubTopic, contentTopic: ContentTopic|seq[ContentTopic], peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = @@ -203,8 +203,8 @@ proc unsubscribe*(wf: WakuFilterClient, return ok() -proc clearSubscriptions*(wf: WakuFilterClient) = +proc clearSubscriptions*(wf: WakuFilterClientLegacy) = wf.subManager.clear() -proc getSubscriptionsCount*(wf: WakuFilterClient): int = +proc getSubscriptionsCount*(wf: WakuFilterClientLegacy): int = wf.subManager.getSubscriptionsCount() diff --git a/waku/v2/protocol/waku_filter/protocol.nim b/waku/v2/protocol/waku_filter/protocol.nim index 05428f7df..6f9c3563c 100644 --- a/waku/v2/protocol/waku_filter/protocol.nim +++ b/waku/v2/protocol/waku_filter/protocol.nim @@ -1,201 +1,201 @@ -import - std/[options, sets, tables, sequtils], - stew/results, - chronicles, - chronos, - metrics, - bearssl/rand, - libp2p/protocols/protocol, - libp2p/crypto/crypto -import - ../waku_message, - ../../node/peer_manager, - ./rpc, - ./rpc_codec, - ./protocol_metrics - - -logScope: - topics = "waku filter" - - -const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" - - WakuFilterTimeout: Duration = 2.hours - - -type WakuFilterResult*[T] = Result[T, string] - - -## Subscription manager - -type Subscription = object - requestId: string - peer: PeerID - pubsubTopic: PubsubTopic - contentTopics: HashSet[ContentTopic] - - -proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]) = - let subscription = Subscription( - requestId: requestId, - peer: peer, - pubsubTopic: pubsubTopic, - contentTopics: toHashSet(contentTopics) - ) - subscriptions.add(subscription) - -proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsubscribeTopics: seq[ContentTopic]) = - for sub in subscriptions.mitems: - if sub.peer != peer: - continue - - sub.contentTopics.excl(toHashSet(unsubscribeTopics)) - - # Delete the subscriber if no more content filters left - subscriptions.keepItIf(it.contentTopics.len > 0) - - -## Protocol - -type - MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} - - WakuFilter* = ref object of LPProtocol - rng*: ref rand.HmacDrbgContext - peerManager*: PeerManager - subscriptions*: seq[Subscription] - failedPeers*: Table[string, chronos.Moment] - timeout*: chronos.Duration - -proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) = - let - requestId = rpc.requestId - subscribe = rpc.request.get().subscribe - pubsubTopic = rpc.request.get().pubsubTopic - contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic) - - if subscribe: - info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics - wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics) - else: - info "removed filter subscritpiton", peerId=peerId, contentTopics=contentTopics - wf.subscriptions.removeSubscription(peerId, contentTopics) - - waku_filter_subscribers.set(wf.subscriptions.len.int64) - - -proc initProtocolHandler(wf: WakuFilter) = - proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let buffer = await conn.readLp(MaxRpcSize.int) - - let decodeRpcRes = FilterRPC.decode(buffer) - if decodeRpcRes.isErr(): - waku_filter_errors.inc(labelValues = [decodeRpcFailure]) - return - - trace "filter message received" - - let rpc = decodeRpcRes.get() - - ## Filter request - # Subscription/unsubscription request - if rpc.request.isNone(): - waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure]) - # TODO: Manage the empty filter request message error. Perform any action? - return - - waku_filter_messages.inc(labelValues = ["FilterRequest"]) - wf.handleFilterRequest(conn.peerId, rpc) - - wf.handler = handler - wf.codec = WakuFilterCodec - -proc new*(T: type WakuFilter, - peerManager: PeerManager, - rng: ref rand.HmacDrbgContext, - timeout: Duration = WakuFilterTimeout): T = - let wf = WakuFilter(rng: rng, - peerManager: peerManager, - timeout: timeout) - wf.initProtocolHandler() - return wf - -proc init*(T: type WakuFilter, - peerManager: PeerManager, - rng: ref rand.HmacDrbgContext, - timeout: Duration = WakuFilterTimeout): T {. - deprecated: "WakuFilter.new()' instead".} = - WakuFilter.new(peerManager, rng, timeout) - - -proc sendFilterRpc(wf: WakuFilter, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) - if connOpt.isNone(): - return err(dialFailure) - let connection = connOpt.get() - - await connection.writeLP(rpc.encode().buffer) - return ok() - - -### Send message to subscriptors -proc removePeerFromFailedPeersTable(wf: WakuFilter, subs: seq[Subscription]) = - ## Clear the failed peer table if subscriber was able to connect - for sub in subs: - wf.failedPeers.del($sub) - -proc handleClientError(wf: WakuFilter, subs: seq[Subscription]) {.raises: [Defect, KeyError].} = - ## If we have already failed to send message to this peer, - ## check for elapsed time and if it's been too long, remove the peer. - for sub in subs: - let subKey: string = $(sub) - - if not wf.failedPeers.hasKey(subKey): - # add the peer to the failed peers table. - wf.failedPeers[subKey] = Moment.now() - return - - let elapsedTime = Moment.now() - wf.failedPeers[subKey] - if elapsedTime > wf.timeout: - wf.failedPeers.del(subKey) - - let index = wf.subscriptions.find(sub) - wf.subscriptions.delete(index) - - -proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = - - trace "handling message", pubsubTopic, contentTopic=msg.contentTopic, subscriptions=wf.subscriptions.len - - if wf.subscriptions.len <= 0: - return - - var failedSubscriptions: seq[Subscription] - var connectedSubscriptions: seq[Subscription] - - for sub in wf.subscriptions: - # TODO: Review when pubsubTopic can be empty and if it is a valid case - if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic: - continue - - if msg.contentTopic notin sub.contentTopics: - continue - - let rpc = FilterRPC( - requestId: sub.requestId, - push: some(MessagePush(messages: @[msg])) - ) - - let res = await wf.sendFilterRpc(rpc, sub.peer) - if res.isErr(): - waku_filter_errors.inc(labelValues = [res.error()]) - failedSubscriptions.add(sub) - continue - - connectedSubscriptions.add(sub) - - wf.removePeerFromFailedPeersTable(connectedSubscriptions) - - wf.handleClientError(failedSubscriptions) +import + std/[options, sets, tables, sequtils], + stew/results, + chronicles, + chronos, + metrics, + bearssl/rand, + libp2p/protocols/protocol, + libp2p/crypto/crypto +import + ../waku_message, + ../../node/peer_manager, + ./rpc, + ./rpc_codec, + ./protocol_metrics + + +logScope: + topics = "waku filter" + + +const + WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + + WakuFilterTimeout: Duration = 2.hours + + +type WakuFilterResult*[T] = Result[T, string] + + +## Subscription manager + +type Subscription = object + requestId: string + peer: PeerID + pubsubTopic: PubsubTopic + contentTopics: HashSet[ContentTopic] + + +proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]) = + let subscription = Subscription( + requestId: requestId, + peer: peer, + pubsubTopic: pubsubTopic, + contentTopics: toHashSet(contentTopics) + ) + subscriptions.add(subscription) + +proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsubscribeTopics: seq[ContentTopic]) = + for sub in subscriptions.mitems: + if sub.peer != peer: + continue + + sub.contentTopics.excl(toHashSet(unsubscribeTopics)) + + # Delete the subscriber if no more content filters left + subscriptions.keepItIf(it.contentTopics.len > 0) + + +## Protocol + +type + MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} + + WakuFilterLegacy* = ref object of LPProtocol + rng*: ref rand.HmacDrbgContext + peerManager*: PeerManager + subscriptions*: seq[Subscription] + failedPeers*: Table[string, chronos.Moment] + timeout*: chronos.Duration + +proc handleFilterRequest(wf: WakuFilterLegacy, peerId: PeerId, rpc: FilterRPC) = + let + requestId = rpc.requestId + subscribe = rpc.request.get().subscribe + pubsubTopic = rpc.request.get().pubsubTopic + contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic) + + if subscribe: + info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics + wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics) + else: + info "removed filter subscritpiton", peerId=peerId, contentTopics=contentTopics + wf.subscriptions.removeSubscription(peerId, contentTopics) + + waku_legacy_filter_subscribers.set(wf.subscriptions.len.int64) + + +proc initProtocolHandler(wf: WakuFilterLegacy) = + proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let buffer = await conn.readLp(MaxRpcSize.int) + + let decodeRpcRes = FilterRPC.decode(buffer) + if decodeRpcRes.isErr(): + waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure]) + return + + trace "filter message received" + + let rpc = decodeRpcRes.get() + + ## Filter request + # Subscription/unsubscription request + if rpc.request.isNone(): + waku_legacy_filter_errors.inc(labelValues = [emptyFilterRequestFailure]) + # TODO: Manage the empty filter request message error. Perform any action? + return + + waku_legacy_filter_messages.inc(labelValues = ["FilterRequest"]) + wf.handleFilterRequest(conn.peerId, rpc) + + wf.handler = handler + wf.codec = WakuFilterCodec + +proc new*(T: type WakuFilterLegacy, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + timeout: Duration = WakuFilterTimeout): T = + let wf = WakuFilterLegacy(rng: rng, + peerManager: peerManager, + timeout: timeout) + wf.initProtocolHandler() + return wf + +proc init*(T: type WakuFilterLegacy, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + timeout: Duration = WakuFilterTimeout): T {. + deprecated: "WakuFilterLegacy.new()' instead".} = + WakuFilterLegacy.new(peerManager, rng, timeout) + + +proc sendFilterRpc(wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + if connOpt.isNone(): + return err(dialFailure) + let connection = connOpt.get() + + await connection.writeLP(rpc.encode().buffer) + return ok() + + +### Send message to subscriptors +proc removePeerFromFailedPeersTable(wf: WakuFilterLegacy, subs: seq[Subscription]) = + ## Clear the failed peer table if subscriber was able to connect + for sub in subs: + wf.failedPeers.del($sub) + +proc handleClientError(wf: WakuFilterLegacy, subs: seq[Subscription]) {.raises: [Defect, KeyError].} = + ## If we have already failed to send message to this peer, + ## check for elapsed time and if it's been too long, remove the peer. + for sub in subs: + let subKey: string = $(sub) + + if not wf.failedPeers.hasKey(subKey): + # add the peer to the failed peers table. + wf.failedPeers[subKey] = Moment.now() + return + + let elapsedTime = Moment.now() - wf.failedPeers[subKey] + if elapsedTime > wf.timeout: + wf.failedPeers.del(subKey) + + let index = wf.subscriptions.find(sub) + wf.subscriptions.delete(index) + + +proc handleMessage*(wf: WakuFilterLegacy, pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = + + trace "handling message", pubsubTopic, contentTopic=msg.contentTopic, subscriptions=wf.subscriptions.len + + if wf.subscriptions.len <= 0: + return + + var failedSubscriptions: seq[Subscription] + var connectedSubscriptions: seq[Subscription] + + for sub in wf.subscriptions: + # TODO: Review when pubsubTopic can be empty and if it is a valid case + if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic: + continue + + if msg.contentTopic notin sub.contentTopics: + continue + + let rpc = FilterRPC( + requestId: sub.requestId, + push: some(MessagePush(messages: @[msg])) + ) + + let res = await wf.sendFilterRpc(rpc, sub.peer) + if res.isErr(): + waku_legacy_filter_errors.inc(labelValues = [res.error()]) + failedSubscriptions.add(sub) + continue + + connectedSubscriptions.add(sub) + + wf.removePeerFromFailedPeersTable(connectedSubscriptions) + + wf.handleClientError(failedSubscriptions) diff --git a/waku/v2/protocol/waku_filter/protocol_metrics.nim b/waku/v2/protocol/waku_filter/protocol_metrics.nim index de266e16b..deb9dd158 100644 --- a/waku/v2/protocol/waku_filter/protocol_metrics.nim +++ b/waku/v2/protocol/waku_filter/protocol_metrics.nim @@ -6,9 +6,9 @@ else: import metrics -declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" -declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] -declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"] +declarePublicGauge waku_legacy_filter_subscribers, "number of light node filter subscribers" +declarePublicGauge waku_legacy_filter_errors, "number of filter protocol errors", ["type"] +declarePublicGauge waku_legacy_filter_messages, "number of filter messages received", ["type"] declarePublicGauge waku_node_filters, "number of content filter subscriptions" diff --git a/waku/v2/protocol/waku_filter/rpc.nim b/waku/v2/protocol/waku_filter/rpc.nim index 39c842421..28b975a9d 100644 --- a/waku/v2/protocol/waku_filter/rpc.nim +++ b/waku/v2/protocol/waku_filter/rpc.nim @@ -1,27 +1,27 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/options -import - ../waku_message - - -type - ContentFilter* = object - contentTopic*: string - - FilterRequest* = object - contentFilters*: seq[ContentFilter] - pubsubTopic*: string - subscribe*: bool - - MessagePush* = object - messages*: seq[WakuMessage] - - FilterRPC* = object - requestId*: string - request*: Option[FilterRequest] - push*: Option[MessagePush] +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options +import + ../waku_message + + +type + ContentFilter* = object + contentTopic*: string + + FilterRequest* = object + contentFilters*: seq[ContentFilter] + pubsubTopic*: string + subscribe*: bool + + MessagePush* = object + messages*: seq[WakuMessage] + + FilterRPC* = object + requestId*: string + request*: Option[FilterRequest] + push*: Option[MessagePush] diff --git a/waku/v2/protocol/waku_filter_v2/protocol.nim b/waku/v2/protocol/waku_filter_v2/protocol.nim index bc73ed658..de1af7d30 100644 --- a/waku/v2/protocol/waku_filter_v2/protocol.nim +++ b/waku/v2/protocol/waku_filter_v2/protocol.nim @@ -6,7 +6,7 @@ else: {.push raises: [].} import - std/[options,sequtils,sets,tables], + std/[options,sequtils,sets,strutils,tables], chronicles, chronos, libp2p/peerid, @@ -24,12 +24,13 @@ logScope: topics = "waku filter" const - MaxContentTopicsPerRequest = 30 + MaxContentTopicsPerRequest* = 30 type WakuFilter* = ref object of LPProtocol subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria peerManager: PeerManager + maintenanceTask: TimerCallback proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = trace "pinging subscriber", peerId=peerId @@ -52,6 +53,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria if peerId in wf.subscriptions: + # We already have a subscription for this peer. Try to add the new filter criteria. var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) if peerSubscription.len() + filterCriteria.len() >= MaxCriteriaPerSubscription: return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria")) @@ -59,6 +61,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], peerSubscription.incl(filterCriteria) wf.subscriptions[peerId] = peerSubscription else: + # We don't have a subscription for this peer yet. Try to add it. if wf.subscriptions.len() >= MaxTotalSubscriptions: return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions")) debug "creating new subscription", peerId=peerId @@ -78,7 +81,7 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria if peerId notin wf.subscriptions: - debug "unsubscibing peer has no subscriptions", peerId=peerId + debug "unsubscribing peer has no subscriptions", peerId=peerId return err(FilterSubscribeError.notFound()) var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]()) @@ -95,7 +98,7 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = if peerId notin wf.subscriptions: - debug "unsubscibing peer has no subscriptions", peerId=peerId + debug "unsubscribing peer has no subscriptions", peerId=peerId return err(FilterSubscribeError.notFound()) debug "removing peer subscription", peerId=peerId @@ -169,6 +172,7 @@ proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) { proc maintainSubscriptions*(wf: WakuFilter) = trace "maintaining subscriptions" + ## Remove subscriptions for peers that have been removed from peer store var peersToRemove: seq[PeerId] for peerId, peerSubscription in wf.subscriptions.pairs(): ## TODO: currently we only maintain by syncing with peer store. We could @@ -177,7 +181,11 @@ proc maintainSubscriptions*(wf: WakuFilter) = debug "peer has been removed from peer store, removing subscription", peerId=peerId peersToRemove.add(peerId) - wf.subscriptions.removePeers(peersToRemove) + if peersToRemove.len() > 0: + wf.subscriptions.removePeers(peersToRemove) + + ## Periodic report of number of subscriptions + waku_filter_subscriptions.set(wf.subscriptions.len().float64) const MessagePushTimeout = 20.seconds proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} = @@ -208,6 +216,8 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa proc initProtocolHandler(wf: WakuFilter) = proc handler(conn: Connection, proto: string) {.async.} = + trace "filter subscribe request handler triggered", peerId=conn.peerId + let buf = await conn.readLp(MaxSubscribeSize) let decodeRes = FilterSubscribeRequest.decode(buf) @@ -220,6 +230,8 @@ proc initProtocolHandler(wf: WakuFilter) = let response = wf.handleSubscribeRequest(conn.peerId, request) + debug "sending filter subscribe response", peerId=conn.peerId, response=response + await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here return @@ -242,9 +254,9 @@ proc startMaintainingSubscriptions*(wf: WakuFilter, interval: Duration) = var maintainSubs: proc(udata: pointer) {.gcsafe, raises: [Defect].} maintainSubs = proc(udata: pointer) {.gcsafe.} = maintainSubscriptions(wf) - discard setTimer(Moment.fromNow(interval), maintainSubs) + wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs) - discard setTimer(Moment.fromNow(interval), maintainSubs) + wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs) method start*(wf: WakuFilter) {.async.} = debug "starting filter protocol" @@ -254,4 +266,5 @@ method start*(wf: WakuFilter) {.async.} = method stop*(wf: WakuFilter) {.async.} = debug "stopping filter protocol" + wf.maintenanceTask.clearTimer() await procCall LPProtocol(wf).stop() diff --git a/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim index 16a4c9bba..894407d53 100644 --- a/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim +++ b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim @@ -9,6 +9,7 @@ export metrics declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"] +declarePublicGauge waku_filter_subscriptions, "number of subscribed filter clients" declarePublicHistogram waku_filter_request_duration_seconds, "duration of filter subscribe requests", ["type"] declarePublicHistogram waku_filter_handle_message_duration_seconds, "duration to push message to filter subscribers"