diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index a70a21a88..c04e7b759 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -37,7 +37,6 @@ import ../../waku/waku_core, ../../waku/waku_lightpush/common, ../../waku/waku_lightpush/rpc, - ../../waku/waku_filter, ../../waku/waku_enr, ../../waku/waku_store, ../../waku/waku_dnsdisc, @@ -283,17 +282,6 @@ proc writeAndPrint(c: Chat) {.async.} = c.nick = await readNick(c.transp) echo "You are now known as " & c.nick elif line.startsWith("/exit"): - if not c.node.wakuFilterLegacy.isNil(): - echo "unsubscribing from content filters..." - - let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec) - if peerOpt.isSome(): - await c.node.legacyFilterUnsubscribe( - pubsubTopic = some(DefaultPubsubTopic), - contentTopics = c.contentTopic, - peer = peerOpt.get(), - ) - echo "quitting..." try: @@ -514,9 +502,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = let peerInfo = parsePeerInfo(conf.filternode) if peerInfo.isOk(): await node.mountFilter() - await node.mountLegacyFilter() await node.mountFilterClient() - node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec) proc filterHandler( pubsubTopic: PubsubTopic, msg: WakuMessage @@ -524,14 +510,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = trace "Hit filter handler", contentTopic = msg.contentTopic chat.printReceivedMessage(msg) - await node.legacyFilterSubscribe( - pubsubTopic = some(DefaultPubsubTopic), - contentTopics = chat.contentTopic, - filterHandler, - peerInfo.value, - ) - # TODO: Here to support FilterV2 relevant subscription, but still - # Legacy Filter is concurrent to V2 untill legacy filter will be removed + # TODO: Here to support FilterV2 relevant subscription. else: error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 97e0f328f..6a0aaef6c 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -23,7 +23,6 @@ import ../../../waku/waku_core, ../../../waku/waku_node, ../../../waku/node/peer_manager, - ../../waku/waku_filter, ../../waku/waku_filter_v2, ../../waku/waku_store, ../../waku/factory/builder, @@ -294,7 +293,6 @@ when isMainModule: if conf.filter: waitFor mountFilter(bridge.nodev2) - waitFor mountLegacyFilter(bridge.nodev2) if conf.staticnodes.len > 0: waitFor connectToNodes(bridge.nodev2, conf.staticnodes) @@ -309,7 +307,6 @@ when isMainModule: if conf.filternode != "": let filterPeer = parsePeerInfo(conf.filternode) if filterPeer.isOk(): - bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec) bridge.nodev2.peerManager.addServicePeer( filterPeer.value, WakuFilterSubscribeCodec ) diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index eba83297b..5f16b5198 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -34,7 +34,6 @@ import ../../waku/waku_api/rest/server, ../../waku/waku_api/rest/debug/handlers as rest_debug_api, ../../waku/waku_api/rest/relay/handlers as rest_relay_api, - ../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api, ../../waku/waku_api/rest/filter/handlers as rest_filter_api, ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api, ../../waku/waku_api/rest/store/handlers as rest_store_api, @@ -48,7 +47,6 @@ import ../../waku/waku_rln_relay, ../../waku/waku_store, ../../waku/waku_lightpush/common, - ../../waku/waku_filter, ../../waku/waku_filter_v2, ../../waku/factory/node_factory, ../../waku/factory/internal_config, @@ -368,12 +366,8 @@ proc startRestServer( "/relay endpoints are not available. Please check your configuration: --relay" ## Filter REST API - if conf.filternode != "" and app.node.wakuFilterClient != nil and - app.node.wakuFilterClientLegacy != nil: - let legacyFilterCache = MessageCache.init() - rest_legacy_filter_api.installLegacyFilterRestApiHandlers( - server.router, app.node, legacyFilterCache - ) + if conf.filternode != "" and + app.node.wakuFilterClient != nil: let filterCache = MessageCache.init() diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 86f2dc762..36cfba9cd 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -48,9 +48,6 @@ import # Waku v2 tests ./test_wakunode, ./test_wakunode_lightpush, - # Waku Filter - ./test_waku_filter_legacy, - ./test_wakunode_filter_legacy, ./test_peer_store_extended, ./test_message_cache, ./test_peer_manager, @@ -77,7 +74,6 @@ import ./wakunode_rest/test_rest_serdes, ./wakunode_rest/test_rest_store, ./wakunode_rest/test_rest_filter, - ./wakunode_rest/test_rest_legacy_filter, ./wakunode_rest/test_rest_lightpush, ./wakunode_rest/test_rest_admin, ./wakunode_rest/test_rest_cors diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim index f29b9f252..6bc554232 100644 --- a/tests/node/test_wakunode_filter.nim +++ b/tests/node/test_wakunode_filter.nim @@ -55,7 +55,6 @@ suite "Waku Filter - End to End": await allFutures(server.start(), client.start()) await server.mountFilter() - await server.mountLegacyFilter() await client.mountFilterClient() client.wakuFilterClient.registerPushHandler(messagePushHandler) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 304942bf3..d16624e75 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -24,8 +24,8 @@ import ../../waku/waku_core, ../../waku/waku_enr/capabilities, ../../waku/waku_relay/protocol, + ../../waku/waku_filter_v2/common, ../../waku/waku_store/common, - ../../waku/waku_filter/protocol, ../../waku/waku_lightpush/common, ../../waku/waku_peer_exchange, ../../waku/waku_metadata, @@ -63,11 +63,10 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) await allFutures(nodes.mapIt(it.mountFilter())) - await allFutures(nodes.mapIt(it.mountLegacyFilter())) # Dial node2 from node1 let conn = await nodes[0].peerManager.dialPeer( - nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec + nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec ) await sleepAsync(chronos.milliseconds(500)) @@ -107,13 +106,13 @@ procSuite "Peer Manager": # Dial non-existent peer from node1 let conn1 = - await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec) + await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuStoreCodec) check: conn1.isNone() # Dial peer not supporting given protocol let conn2 = await nodes[0].peerManager.dialPeer( - nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec + nodes[1].peerInfo.toRemotePeerInfo(), WakuStoreCodec ) check: conn2.isNone() @@ -134,20 +133,17 @@ procSuite "Peer Manager": await node.start() - await node.mountFilterClient() - node.mountStoreClient() - node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) node.peerManager.addServicePeer( - filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec + filterPeer.toRemotePeerInfo(), WakuFilterSubscribeCodec ) # Check peers were successfully added to peer manager check: node.peerManager.peerStore.peers().len == 2 - node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt( + node.peerManager.peerStore.peers(WakuFilterSubscribeCodec).allIt( it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and - it.protocols.contains(WakuLegacyFilterCodec) + it.protocols.contains(WakuFilterSubscribeCodec) ) node.peerManager.peerStore.peers(WakuStoreCodec).allIt( it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and @@ -762,39 +758,36 @@ procSuite "Peer Manager": let node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) - peers = toSeq(1 .. 5) + peers = toSeq(1 .. 4) .mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it)) .filterIt(it.isOk()) .mapIt(it.value) require: - peers.len == 5 + peers.len == 4 # service peers node.peerManager.addServicePeer(peers[0], WakuStoreCodec) - node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec) - node.peerManager.addServicePeer(peers[2], WakuLightPushCodec) - node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec) + node.peerManager.addServicePeer(peers[1], WakuLightPushCodec) + node.peerManager.addServicePeer(peers[2], WakuPeerExchangeCodec) # relay peers (should not be added) - node.peerManager.addServicePeer(peers[4], WakuRelayCodec) + node.peerManager.addServicePeer(peers[3], WakuRelayCodec) # all peers are stored in the peerstore check: node.peerManager.peerStore.peers().anyIt(it.peerId == peers[0].peerId) node.peerManager.peerStore.peers().anyIt(it.peerId == peers[1].peerId) node.peerManager.peerStore.peers().anyIt(it.peerId == peers[2].peerId) - node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId) # but the relay peer is not - node.peerManager.peerStore.peers().anyIt(it.peerId == peers[4].peerId) == false + node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId) == false # all service peers are added to its service slot check: node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId - node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId - node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId - node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId + node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[1].peerId + node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[2].peerId # but the relay peer is not node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false @@ -809,24 +802,23 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) await allFutures(nodes.mapIt(it.mountFilter())) - await allFutures(nodes.mapIt(it.mountLegacyFilter())) let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) # create some connections/streams - require: + check: # some relay connections (await nodes[0].peerManager.connectRelay(pInfos[1])) == true (await nodes[0].peerManager.connectRelay(pInfos[2])) == true (await nodes[1].peerManager.connectRelay(pInfos[2])) == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == + (await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterSubscribeCodec)).isSome() == true # isolated dial creates a relay conn under the hood (libp2p behaviour) - (await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == + (await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterSubscribeCodec)).isSome() == true # assert physical connections @@ -834,26 +826,26 @@ procSuite "Peer Manager": nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0 nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2 - nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0 - nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2 + nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 0 + nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 2 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 - nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 - nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0 + nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1 + nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 - nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 - nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1 + nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1 + nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 1 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0 - nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 - nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0 + nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1 + nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0 asyncTest "getNumStreams() returns expected number of connections per protocol": # Create 2 nodes @@ -865,28 +857,27 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) await allFutures(nodes.mapIt(it.mountFilter())) - await allFutures(nodes.mapIt(it.mountLegacyFilter())) let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo()) require: # multiple streams are multiplexed over a single connection. # note that a relay connection is created under the hood when dialing a peer (libp2p behaviour) - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() == true check: nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) - nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4) + nodes[0].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (0, 4) nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) - nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0) + nodes[1].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (4, 0) test "selectPeer() returns the correct peer": # Valid peer id missing the last digit @@ -909,7 +900,7 @@ procSuite "Peer Manager": # Add a peer[0] to the peerstore pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs pm.peerStore[ProtoBook][peers[0].peerId] = - @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec] + @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec] # When no service peers, we get one from the peerstore let selectedPeer1 = pm.selectPeer(WakuStoreCodec) @@ -918,7 +909,7 @@ procSuite "Peer Manager": selectedPeer1.get().peerId == peers[0].peerId # Same for other protocol - let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec) + let selectedPeer2 = pm.selectPeer(WakuFilterSubscribeCodec) check: selectedPeer2.isSome() == true selectedPeer2.get().peerId == peers[0].peerId diff --git a/tests/test_waku_filter_legacy.nim b/tests/test_waku_filter_legacy.nim deleted file mode 100644 index 9cb222d3a..000000000 --- a/tests/test_waku_filter_legacy.nim +++ /dev/null @@ -1,300 +0,0 @@ -{.used.} - -import - std/[options, tables], testutils/unittests, chronicles, chronos, libp2p/crypto/crypto -import - ../../waku/node/peer_manager, - ../../waku/waku_core, - ../../waku/waku_filter, - ../../waku/waku_filter/client, - ./testlib/common, - ./testlib/wakucore - -proc newTestWakuFilterNode( - switch: Switch, timeout: Duration = 2.hours -): Future[WakuFilterLegacy] {.async.} = - let - peerManager = PeerManager.new(switch) - proto = WakuFilterLegacy.new(peerManager, rng, timeout) - - await proto.start() - switch.mount(proto) - - return proto - -proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClientLegacy] {.async.} = - let - peerManager = PeerManager.new(switch) - proto = WakuFilterClientLegacy.new(peerManager, rng) - - await proto.start() - switch.mount(proto) - - return proto - -# TODO: Extend test coverage -suite "Waku Filter": - asyncTest "should forward messages to client after subscribed": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - let pushHandlerFuture = newFuture[(string, WakuMessage)]() - proc pushHandler( - pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, gcsafe, closure.} = - pushHandlerFuture.complete((pubsubTopic, message)) - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic = contentTopic) - - ## When - require ( - await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr) - ).isOk() - - # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc - await sleepAsync(500.milliseconds) - - await server.handleMessage(pubsubTopic, msg) - - require await pushHandlerFuture.withTimeout(3.seconds) - - ## Then - let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() - check: - pushedMsgPubsubTopic == pubsubTopic - pushedMsg == msg - - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) - - asyncTest "should not forward messages to client after unsuscribed": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - var pushHandlerFuture = newFuture[void]() - proc pushHandler( - pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, gcsafe, closure.} = - pushHandlerFuture.complete() - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic = contentTopic) - - ## When - require ( - await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr) - ).isOk() - - # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc - await sleepAsync(500.milliseconds) - - await server.handleMessage(pubsubTopic, msg) - - require await pushHandlerFuture.withTimeout(1.seconds) - - # Reset to test unsubscribe - pushHandlerFuture = newFuture[void]() - - require (await client.unsubscribe(pubsubTopic, contentTopic, peer = serverAddr)).isOk() - - # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc - await sleepAsync(500.milliseconds) - - await server.handleMessage(pubsubTopic, msg) - - ## Then - let handlerWasCalledAfterUnsubscription = - await pushHandlerFuture.withTimeout(1.seconds) - check: - not handlerWasCalledAfterUnsubscription - - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) - - asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch, timeout = 200.milliseconds) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - var pushHandlerFuture = newFuture[void]() - proc pushHandler( - pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, gcsafe, closure.} = - pushHandlerFuture.complete() - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic = contentTopic) - - ## When - require ( - await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr) - ).isOk() - - # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc - await sleepAsync(500.milliseconds) - - await server.handleMessage(DefaultPubsubTopic, msg) - - # Push handler should be called - require await pushHandlerFuture.withTimeout(1.seconds) - - # Stop client node to test timeout unsubscription - await clientSwitch.stop() - - await sleepAsync(500.milliseconds) - - # First failure should not remove the subscription - await server.handleMessage(DefaultPubsubTopic, msg) - let - subscriptionsBeforeTimeout = server.subscriptions.len() - failedPeersBeforeTimeout = server.failedPeers.len() - - # Wait for the configured peer connection timeout to elapse (200ms) - await sleepAsync(200.milliseconds) - - # Second failure should remove the subscription - await server.handleMessage(DefaultPubsubTopic, msg) - let - subscriptionsAfterTimeout = server.subscriptions.len() - failedPeersAfterTimeout = server.failedPeers.len() - - ## Then - check: - subscriptionsBeforeTimeout == 1 - failedPeersBeforeTimeout == 1 - subscriptionsAfterTimeout == 0 - failedPeersAfterTimeout == 0 - - ## Cleanup - await serverSwitch.stop() - - asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch, timeout = 200.milliseconds) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - var pushHandlerFuture = newFuture[void]() - proc pushHandler( - pubsubTopic: PubsubTopic, message: WakuMessage - ) {.async, gcsafe, closure.} = - pushHandlerFuture.complete() - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic = contentTopic) - - ## When - require ( - await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer = serverAddr) - ).isOk() - - # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc - await sleepAsync(500.milliseconds) - - await server.handleMessage(DefaultPubsubTopic, msg) - - # Push handler should be called - require await pushHandlerFuture.withTimeout(1.seconds) - - let - subscriptionsBeforeFailure = server.subscriptions.len() - failedPeersBeforeFailure = server.failedPeers.len() - - # Stop switch to test unsubscribe - await clientSwitch.stop() - - await sleepAsync(500.milliseconds) - - # First failure should add to failure list - await server.handleMessage(DefaultPubsubTopic, msg) - - pushHandlerFuture = newFuture[void]() - - let - subscriptionsAfterFailure = server.subscriptions.len() - failedPeersAfterFailure = server.failedPeers.len() - - await sleepAsync(100.milliseconds) - - # Start switch with same key as before - let clientSwitch2 = newTestSwitch( - some(clientSwitch.peerInfo.privateKey), some(clientSwitch.peerInfo.addrs[0]) - ) - await clientSwitch2.start() - await client.start() - clientSwitch2.mount(client) - - # If push succeeds after failure, the peer should removed from failed peers list - await server.handleMessage(DefaultPubsubTopic, msg) - let handlerShouldHaveBeenCalled = await pushHandlerFuture.withTimeout(1.seconds) - - let - subscriptionsAfterSuccessfulConnection = server.subscriptions.len() - failedPeersAfterSuccessfulConnection = server.failedPeers.len() - - ## Then - check: - handlerShouldHaveBeenCalled - - check: - subscriptionsBeforeFailure == 1 - subscriptionsAfterFailure == 1 - subscriptionsAfterSuccessfulConnection == 1 - - check: - failedPeersBeforeFailure == 0 - failedPeersAfterFailure == 1 - failedPeersAfterSuccessfulConnection == 0 - - ## Cleanup - await allFutures(clientSwitch2.stop(), serverSwitch.stop()) diff --git a/tests/test_wakunode_filter_legacy.nim b/tests/test_wakunode_filter_legacy.nim deleted file mode 100644 index 3f7e1d671..000000000 --- a/tests/test_wakunode_filter_legacy.nim +++ /dev/null @@ -1,67 +0,0 @@ -{.used.} - -import - std/options, - stew/shims/net as stewNet, - testutils/unittests, - chronicles, - chronos, - libp2p/crypto/crypto -import - ../../waku/waku_core, - ../../waku/node/peer_manager, - ../../waku/waku_node, - ./testlib/common, - ./testlib/wakucore, - ./testlib/wakunode - -suite "WakuNode - Filter": - asyncTest "subscriber should receive the message handled by the publisher": - ## Setup - let - serverKey = generateSecp256k1Key() - server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) - clientKey = generateSecp256k1Key() - client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0)) - - waitFor allFutures(server.start(), client.start()) - - waitFor server.mountFilter() - waitFor server.mountLegacyFilter() - waitFor client.mountFilterClient() - - ## Given - let serverPeerInfo = server.peerInfo.toRemotePeerInfo() - - let - pubSubTopic = DefaultPubsubTopic - contentTopic = DefaultContentTopic - message = fakeWakuMessage(contentTopic = contentTopic) - - var filterPushHandlerFut = newFuture[(PubsubTopic, WakuMessage)]() - proc filterPushHandler( - pubsubTopic: PubsubTopic, msg: WakuMessage - ) {.async, gcsafe, closure.} = - filterPushHandlerFut.complete((pubsubTopic, msg)) - - ## When - await client.legacyFilterSubscribe( - some(pubsubTopic), contentTopic, filterPushHandler, peer = serverPeerInfo - ) - - # Wait for subscription to take effect - waitFor sleepAsync(100.millis) - - waitFor server.filterHandleMessage(pubSubTopic, message) - - require waitFor filterPushHandlerFut.withTimeout(5.seconds) - - ## Then - check filterPushHandlerFut.completed() - let (filterPubsubTopic, filterMessage) = filterPushHandlerFut.read() - check: - filterPubsubTopic == pubsubTopic - filterMessage == message - - ## Cleanup - waitFor allFutures(client.stop(), server.stop()) diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 25d86d8c2..f356d2f1e 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -10,8 +10,8 @@ import libp2p/peerstore import - ../../../waku/[node/peer_manager, waku_core, waku_filter/rpc_codec], - ../../../waku/waku_filter_v2/[common, client, subscriptions, protocol], + ../../../waku/[node/peer_manager, waku_core], + ../../../waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec], ../testlib/[wakucore, testasync, testutils, futures, sequtils], ./waku_filter_utils, ../resources/payloads @@ -127,7 +127,7 @@ suite "Waku Filter - End to End": asyncTest "Subscribing to an empty content topic": # When subscribing to an empty content topic let subscribeResponse = - await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, @[]) + await wakuFilterClient.subscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()) # Then the subscription is not successful check: @@ -1839,7 +1839,7 @@ suite "Waku Filter - End to End": # When unsubscribing from an empty content topic let unsubscribeResponse = - await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, @[]) + await wakuFilterClient.unsubscribe(serverRemotePeerInfo, pubsubTopic, newSeq[ContentTopic]()) # Then the unsubscription is not successful check: @@ -2076,10 +2076,10 @@ suite "Waku Filter - End to End": contentTopic = contentTopic, payload = getByteSequence(100 * 1024) ) # 100KiB msg4 = fakeWakuMessage( - contentTopic = contentTopic, payload = getByteSequence(MaxRpcSize - 1024) + contentTopic = contentTopic, payload = getByteSequence(MaxPushSize - 1024) ) # Max Size (Inclusive Limit) msg5 = fakeWakuMessage( - contentTopic = contentTopic, payload = getByteSequence(MaxRpcSize) + contentTopic = contentTopic, payload = getByteSequence(MaxPushSize) ) # Max Size (Exclusive Limit) # When sending the 1KiB message @@ -2114,7 +2114,7 @@ suite "Waku Filter - End to End": pushedMsgPubsubTopic3 == pubsubTopic pushedMsg3 == msg3 - # When sending the MaxRpcSize - 1024B message + # When sending the MaxPushSize - 1024B message pushHandlerFuture = newPushHandlerFuture() # Clear previous future await wakuFilter.handleMessage(pubsubTopic, msg4) @@ -2125,7 +2125,7 @@ suite "Waku Filter - End to End": pushedMsgPubsubTopic4 == pubsubTopic pushedMsg4 == msg4 - # When sending the MaxRpcSize message + # When sending the MaxPushSize message pushHandlerFuture = newPushHandlerFuture() # Clear previous future await wakuFilter.handleMessage(pubsubTopic, msg5) diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index e404d9165..e8727d6f7 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -17,11 +17,13 @@ import ../../../waku/common/paging, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, + ../../../waku/waku_core/subscription, ../../../waku/node/peer_manager, ../../../waku/waku_archive, ../../../waku/waku_archive/driver/sqlite_driver, + ../../../waku/waku_filter_v2, + ../../../waku/waku_filter_v2/client, ../../../waku/waku_store, - ../../../waku/waku_filter, ../../../waku/waku_node, ../waku_store/store_utils, ../waku_archive/archive_utils, @@ -217,7 +219,6 @@ procSuite "WakuNode - Store": waitFor allFutures(client.start(), server.start(), filterSource.start()) waitFor filterSource.mountFilter() - waitFor filterSource.mountLegacyFilter() let driver = newSqliteArchiveDriver() let mountArchiveRes = server.mountArchive(driver) @@ -238,19 +239,19 @@ procSuite "WakuNode - Store": proc filterHandler( pubsubTopic: PubsubTopic, msg: WakuMessage ) {.async, gcsafe, closure.} = + await server.wakuArchive.handleMessage(pubsubTopic, msg) filterFut.complete((pubsubTopic, msg)) - waitFor server.legacyFilterSubscribe( + server.wakuFilterClient.registerPushHandler(filterHandler) + let resp = waitFor server.filterSubscribe( some(DefaultPubsubTopic), DefaultContentTopic, - filterHandler, peer = filterSourcePeer, ) waitFor sleepAsync(100.millis) - # Send filter push message to server from source node - waitFor filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message) + waitFor filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message) # Wait for the server filter to receive the push message require waitFor filterFut.withTimeout(5.seconds) diff --git a/tests/wakunode_rest/test_all.nim b/tests/wakunode_rest/test_all.nim index 9829a78f2..9c3de0f13 100644 --- a/tests/wakunode_rest/test_all.nim +++ b/tests/wakunode_rest/test_all.nim @@ -5,7 +5,6 @@ import ./test_rest_debug, ./test_rest_filter, ./test_rest_health, - ./test_rest_legacy_filter, ./test_rest_relay_serdes, ./test_rest_relay, ./test_rest_serdes, diff --git a/tests/wakunode_rest/test_rest_legacy_filter.nim b/tests/wakunode_rest/test_rest_legacy_filter.nim deleted file mode 100644 index 4f2fb3841..000000000 --- a/tests/wakunode_rest/test_rest_legacy_filter.nim +++ /dev/null @@ -1,194 +0,0 @@ -{.used.} - -import - std/sequtils, - stew/byteutils, - stew/shims/net, - testutils/unittests, - presto, - presto/client as presto_client, - libp2p/crypto/crypto -import - ../../waku/waku_api/message_cache, - ../../waku/common/base64, - ../../waku/waku_core, - ../../waku/waku_node, - ../../waku/node/peer_manager, - ../../waku/waku_filter, - ../../waku/waku_api/rest/server, - ../../waku/waku_api/rest/client, - ../../waku/waku_api/rest/responses, - ../../waku/waku_api/rest/filter/types, - ../../waku/waku_api/rest/filter/legacy_handlers as filter_api, - ../../waku/waku_api/rest/filter/legacy_client as filter_api_client, - ../../waku/waku_relay, - ../testlib/wakucore, - ../testlib/wakunode - -proc testWakuNode(): WakuNode = - let - privkey = generateSecp256k1Key() - bindIp = parseIpAddress("0.0.0.0") - extIp = parseIpAddress("127.0.0.1") - port = Port(0) - - return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) - -type RestFilterTest = object - filterNode: WakuNode - clientNode: WakuNode - restServer: WakuRestServerRef - messageCache: MessageCache - client: RestClientRef - -proc setupRestFilter(): Future[RestFilterTest] {.async.} = - result.filterNode = testWakuNode() - result.clientNode = testWakuNode() - - await allFutures(result.filterNode.start(), result.clientNode.start()) - - await result.filterNode.mountFilter() - await result.filterNode.mountLegacyFilter() - await result.clientNode.mountFilterClient() - - result.clientNode.peerManager.addServicePeer( - result.filterNode.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec - ) - - let restPort = Port(58011) - let restAddress = parseIpAddress("0.0.0.0") - result.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() - - result.messageCache = MessageCache.init() - installLegacyFilterRestApiHandlers( - result.restServer.router, result.clientNode, result.messageCache - ) - - result.restServer.start() - - result.client = newRestHttpClient(initTAddress(restAddress, restPort)) - - return result - -proc shutdown(self: RestFilterTest) {.async.} = - await self.restServer.stop() - await self.restServer.closeWait() - await allFutures(self.filterNode.stop(), self.clientNode.stop()) - -suite "Waku v2 Rest API - Filter": - asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": - # Given - let restFilterTest: RestFilterTest = await setupRestFilter() - - # When - let contentFilters = - @[DefaultContentTopic, ContentTopic("2"), ContentTopic("3"), ContentTopic("4")] - - let requestBody = FilterLegacySubscribeRequest( - contentFilters: contentFilters, pubsubTopic: some(DefaultPubsubTopic) - ) - let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) - - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" - - check: - restFilterTest.messageCache.isContentSubscribed(DefaultContentTopic) - restFilterTest.messageCache.isContentSubscribed("2") - restFilterTest.messageCache.isContentSubscribed("3") - restFilterTest.messageCache.isContentSubscribed("4") - - # When - error case - let badRequestBody = - FilterLegacySubscribeRequest(contentFilters: @[], pubsubTopic: none(string)) - let badResponse = - await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) - - check: - badResponse.status == 400 - $badResponse.contentType == $MIMETYPE_TEXT - badResponse.data == - "Invalid content body, could not decode. Unable to deserialize data" - - await restFilterTest.shutdown() - - asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": - # Given - let restFilterTest: RestFilterTest = await setupRestFilter() - - # When - restFilterTest.messageCache.contentSubscribe("1") - restFilterTest.messageCache.contentSubscribe("2") - restFilterTest.messageCache.contentSubscribe("3") - restFilterTest.messageCache.contentSubscribe("4") - - let contentFilters = - @[ - ContentTopic("1"), - ContentTopic("2"), - ContentTopic("3"), # ,ContentTopic("4") # Keep this subscription for check - ] - - # When - let requestBody = FilterLegacySubscribeRequest( - contentFilters: contentFilters, pubsubTopic: some(DefaultPubsubTopic) - ) - let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) - - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" - - check: - not restFilterTest.messageCache.isContentSubscribed("1") - not restFilterTest.messageCache.isContentSubscribed("2") - not restFilterTest.messageCache.isContentSubscribed("3") - restFilterTest.messageCache.isContentSubscribed("4") - - await restFilterTest.shutdown() - - asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": - # Given - - let restFilterTest = await setupRestFilter() - - let pubSubTopic = "/waku/2/default-waku/proto" - let contentTopic = ContentTopic("content-topic-x") - - var messages = - @[fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))] - - # Prevent duplicate messages - for i in 0 ..< 2: - var msg = - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) - - while msg == messages[i]: - msg = - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")) - - messages.add(msg) - - restFilterTest.messageCache.contentSubscribe(contentTopic) - for msg in messages: - restFilterTest.messageCache.addMessage(pubSubTopic, msg) - - # When - let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) - - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_JSON - response.data.len == 3 - response.data.all do(msg: FilterWakuMessage) -> bool: - msg.payload == base64.encode("TEST-1") and - msg.contentTopic.get().string == "content-topic-x" and msg.version.get() == 2 and - msg.timestamp.get() != Timestamp(0) - - await restFilterTest.shutdown() diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 624014681..9973036cc 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -19,7 +19,6 @@ import ../waku_dnsdisc, ../waku_archive, ../waku_store, - ../waku_filter, ../waku_filter_v2, ../waku_peer_exchange, ../node/peer_manager, @@ -277,12 +276,6 @@ proc setupProtocols( # Filter setup. NOTE Must be mounted after relay if conf.filter: - try: - await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) - except CatchableError: - return - err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg()) - try: await mountFilter( node, @@ -298,7 +291,6 @@ proc setupProtocols( if filterNode.isOk(): try: await node.mountFilterClient() - node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec) node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) except CatchableError: return err( diff --git a/waku/node/waku_metrics.nim b/waku/node/waku_metrics.nim index 145835f41..776eab365 100644 --- a/waku/node/waku_metrics.nim +++ b/waku/node/waku_metrics.nim @@ -5,7 +5,6 @@ else: import chronicles, chronos, metrics, metrics/chronos_httpserver import - ../waku_filter/protocol_metrics as filter_metrics, ../waku_rln_relay/protocol_metrics as rln_metrics, ../utils/collector, ./peer_manager, @@ -39,7 +38,6 @@ proc startMetricsLog*() = let pxPeers = collectorAsF64(waku_px_peers) let lightpushPeers = collectorAsF64(waku_lightpush_peers) let filterPeers = collectorAsF64(waku_filter_peers) - let filterSubscribers = collectorAsF64(waku_legacy_filter_subscribers) info "Total connections initiated", count = $freshConnCount info "Total messages", count = totalMessages @@ -47,7 +45,6 @@ proc startMetricsLog*() = info "Total peer exchange peers", count = pxPeers info "Total lightpush peers", count = lightpushPeers info "Total filter peers", count = filterPeers - info "Total active filter subscriptions", count = filterSubscribers info "Total errors", count = $freshErrorCount # Start protocol specific metrics logging diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c2c0d8ed3..41741b3d6 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -33,10 +33,6 @@ import ../waku_archive, ../waku_store, ../waku_store/client as store_client, - ../waku_filter as legacy_filter, - #TODO: support for legacy filter protocol will be removed - ../waku_filter/client as legacy_filter_client, - #TODO: support for legacy filter protocol will be removed ../waku_filter_v2, ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, @@ -94,10 +90,6 @@ type wakuStoreClient*: WakuStoreClient wakuFilter*: waku_filter_v2.WakuFilter wakuFilterClient*: filter_client.WakuFilterClient - wakuFilterLegacy*: legacy_filter.WakuFilterLegacy - #TODO: support for legacy filter protocol will be removed - wakuFilterClientLegacy*: legacy_filter_client.WakuFilterClientLegacy - #TODO: support for legacy filter protocol will be removed wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush wakuLightpushClient*: WakuLightPushClient @@ -245,12 +237,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuFilter.handleMessage(topic, msg) - ##TODO: Support for legacy filter will be removed - if node.wakuFilterLegacy.isNil(): - return - - await node.wakuFilterLegacy.handleMessage(topic, msg) - proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = if node.wakuArchive.isNil(): return @@ -436,21 +422,6 @@ proc mountRelay*( ## Waku filter -proc mountLegacyFilter*( - node: WakuNode, filterTimeout: Duration = WakuLegacyFilterTimeout -) {.async, raises: [Defect, LPError].} = - ## Mounting legacy filter protocol with separation from new v2 filter protocol for easier removal later - ## TODO: remove legacy filter protocol - - info "mounting legacy filter protocol" - node.wakuFilterLegacy = - WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) - - if node.started: - await node.wakuFilterLegacy.start() #TODO: remove legacy - - node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec)) - proc mountFilter*( node: WakuNode, subscriptionTimeout: Duration = @@ -473,112 +444,25 @@ proc mountFilter*( proc filterHandleMessage*( node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage ) {.async.} = - if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil(): + if node.wakuFilter.isNil(): error "cannot handle filter message", - error = "waku filter and waku filter legacy are both required" + error = "waku filter is required" return - await allFutures( - node.wakuFilter.handleMessage(pubsubTopic, message), - node.wakuFilterLegacy.handleMessage(pubsubTopic, message), #TODO: remove legacy - ) + await node.wakuFilter.handleMessage(pubsubTopic, message) proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = - ## Mounting both filter clients v1 - legacy and v2. + ## Mounting both filter ## Giving option for application level to choose btw own push message handling or ## rely on node provided cache. - This only applies for v2 filter client info "mounting filter client" - node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng) node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) if node.started: - await allFutures(node.wakuFilterClientLegacy.start(), node.wakuFilterClient.start()) + await node.wakuFilterClient.start() node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec)) - node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuLegacyFilterCodec)) - -proc legacyFilterSubscribe*( - node: WakuNode, - pubsubTopic: Option[PubsubTopic], - contentTopics: ContentTopic | seq[ContentTopic], - handler: FilterPushHandler, - peer: RemotePeerInfo | string, -) {.async, gcsafe, raises: [Defect, ValueError].} = - ## Registers for messages that match a specific filter. - ## Triggers the handler whenever a message is received. - if node.wakuFilterClientLegacy.isNil(): - error "cannot register filter subscription to topic", - error = "waku legacy filter client is not set up" - return - - let remotePeerRes = parsePeerInfo(peer) - if remotePeerRes.isErr(): - error "Couldn't parse the peer info properly", error = remotePeerRes.error - return - - let remotePeer = remotePeerRes.value - - # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled - # TODO: Move this logic to wakunode2 app - # FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times. - let handlerWrapper: FilterPushHandler = - if node.wakuRelay.isNil() and not node.wakuStore.isNil(): - proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = - await allFutures( - node.wakuArchive.handleMessage(pubSubTopic, message), - handler(pubsubTopic, message), - ) - else: - handler - - if pubsubTopic.isSome(): - info "registering legacy filter subscription to content", - pubsubTopic = pubsubTopic.get(), - contentTopics = contentTopics, - peer = remotePeer.peerId - - let res = await node.wakuFilterClientLegacy.subscribe( - pubsubTopic.get(), contentTopics, handlerWrapper, peer = remotePeer - ) - - if res.isOk(): - info "subscribed to topic", - pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics - else: - error "failed legacy filter subscription", error = res.error - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - else: - let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) - - let topicMap = - if topicMapRes.isErr(): - error "can't get shard", error = topicMapRes.error - return - else: - topicMapRes.get() - - var futures = collect(newSeq): - for pubsub, topics in topicMap.pairs: - info "registering legacy filter subscription to content", - pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId - - let content = topics.mapIt($it) - node.wakuFilterClientLegacy.subscribe( - $pubsub, content, handlerWrapper, peer = remotePeer - ) - - let finished = await allFinished(futures) - - for fut in finished: - let res = fut.read() - - if res.isErr(): - error "failed legacy filter subscription", error = res.error - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - - for pubsub, topics in topicMap.pairs: - info "subscribed to topic", pubsubTopic = pubsub, contentTopics = topics proc filterSubscribe*( node: WakuNode, @@ -658,81 +542,13 @@ proc filterSubscribe*( # return the last error or ok return subRes -proc legacyFilterUnsubscribe*( - node: WakuNode, - pubsubTopic: Option[PubsubTopic], - contentTopics: ContentTopic | seq[ContentTopic], - peer: RemotePeerInfo | string, -) {.async, gcsafe, raises: [Defect, ValueError].} = - ## Unsubscribe from a content legacy filter. - if node.wakuFilterClientLegacy.isNil(): - error "cannot unregister filter subscription to content", - error = "waku filter client is nil" - return - - let remotePeerRes = parsePeerInfo(peer) - if remotePeerRes.isErr(): - error "couldn't parse remotePeerInfo", error = remotePeerRes.error - return - - let remotePeer = remotePeerRes.value - - if pubsubTopic.isSome(): - info "deregistering legacy filter subscription to content", - pubsubTopic = pubsubTopic.get(), - contentTopics = contentTopics, - peer = remotePeer.peerId - - let res = await node.wakuFilterClientLegacy.unsubscribe( - pubsubTopic.get(), contentTopics, peer = remotePeer - ) - - if res.isOk(): - info "unsubscribed from topic", - pubsubTopic = pubsubTopic.get(), contentTopics = contentTopics - else: - error "failed filter unsubscription", error = res.error - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - else: - let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) - - let topicMap = - if topicMapRes.isErr(): - error "can't get shard", error = topicMapRes.error - return - else: - topicMapRes.get() - - var futures = collect(newSeq): - for pubsub, topics in topicMap.pairs: - info "deregistering filter subscription to content", - pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId - let content = topics.mapIt($it) - node.wakuFilterClientLegacy.unsubscribe($pubsub, content, peer = remotePeer) - - let finished = await allFinished(futures) - - for fut in finished: - let res = fut.read() - - if res.isErr(): - error "failed filter unsubscription", error = res.error - waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) - - for pubsub, topics in topicMap.pairs: - info "unsubscribed from topic", pubsubTopic = pubsub, contentTopics = topics - proc filterUnsubscribe*( node: WakuNode, pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], + contentTopics: ContentTopic|seq[ContentTopic], peer: RemotePeerInfo | string, ): Future[FilterSubscribeResult] {.async, gcsafe, raises: [Defect, ValueError].} = ## Unsubscribe from a content filter V2". - if node.wakuFilterClientLegacy.isNil(): - error "cannot unregister filter subscription to content", - error = "waku filter client is nil" - return err(FilterSubscribeError.serviceUnavailable()) let remotePeerRes = parsePeerInfo(peer) if remotePeerRes.isErr(): @@ -802,10 +618,6 @@ proc filterUnsubscribeAll*( node: WakuNode, peer: RemotePeerInfo | string ): Future[FilterSubscribeResult] {.async, gcsafe, raises: [Defect, ValueError].} = ## Unsubscribe from a content filter V2". - if node.wakuFilterClientLegacy.isNil(): - error "cannot unregister filter subscription to content", - error = "waku filter client is nil" - return err(FilterSubscribeError.serviceUnavailable()) let remotePeerRes = parsePeerInfo(peer) if remotePeerRes.isErr(): diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 06bf5376b..05724b113 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -14,7 +14,6 @@ import import ../../../waku_core, ../../../waku_store, - ../../../waku_filter, ../../../waku_filter_v2, ../../../waku_lightpush/common, ../../../waku_relay, @@ -54,17 +53,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, relayPeers) - if not node.wakuFilterLegacy.isNil(): - # Map WakuFilter peers to WakuPeers and add to return list - let filterPeers = node.peerManager.peerStore.peers(WakuLegacyFilterCodec).mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuLegacyFilterCodec, - connected: it.connectedness == Connectedness.Connected, - ) - ) - tuplesToWakuPeers(peers, filterPeers) - if not node.wakuFilter.isNil(): # Map WakuFilter peers to WakuPeers and add to return list let filterV2Peers = node.peerManager.peerStore diff --git a/waku/waku_api/rest/filter/handlers.nim b/waku/waku_api/rest/filter/handlers.nim index 315a2f948..e30efa6ec 100644 --- a/waku/waku_api/rest/filter/handlers.nim +++ b/waku/waku_api/rest/filter/handlers.nim @@ -16,7 +16,6 @@ import ../../../waku_core, ../../../waku_node, ../../../node/peer_manager, - ../../../waku_filter, ../../../waku_filter_v2, ../../../waku_filter_v2/client as filter_protocol_client, ../../../waku_filter_v2/common as filter_protocol_type, diff --git a/waku/waku_api/rest/filter/legacy_client.nim b/waku/waku_api/rest/filter/legacy_client.nim deleted file mode 100644 index 41fb55f55..000000000 --- a/waku/waku_api/rest/filter/legacy_client.nim +++ /dev/null @@ -1,44 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/sets, - stew/byteutils, - chronicles, - json_serialization, - json_serialization/std/options, - presto/[route, client, common] -import ../../../waku_core, ../serdes, ../responses, ../rest_serdes, ./types - -export types - -logScope: - topics = "waku node rest client v1" - -proc encodeBytes*( - value: FilterLegacySubscribeRequest, contentType: string -): RestResult[seq[byte]] = - return encodeBytesOf(value, contentType) - -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterPostSubscriptionsV1*( - body: FilterLegacySubscribeRequest -): RestResponse[string] {. - rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost -.} - -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterDeleteSubscriptionsV1*( - body: FilterLegacySubscribeRequest -): RestResponse[string] {. - rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete -.} - -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterGetMessagesV1*( - contentTopic: string -): RestResponse[FilterGetMessagesResponse] {. - rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet -.} diff --git a/waku/waku_api/rest/filter/legacy_handlers.nim b/waku/waku_api/rest/filter/legacy_handlers.nim deleted file mode 100644 index f926e7027..000000000 --- a/waku/waku_api/rest/filter/legacy_handlers.nim +++ /dev/null @@ -1,172 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/sequtils, - stew/byteutils, - chronicles, - json_serialization, - json_serialization/std/options, - presto/route, - presto/common -import - ../../../waku_core, - ../../../waku_filter, - ../../../waku_filter/client, - ../../../node/peer_manager, - ../../../waku_node, - ../../message_cache, - ../serdes, - ../responses, - ../rest_serdes, - ./types - -export types - -logScope: - topics = "waku node rest filter_api v1" - -const futTimeoutForSubscriptionProcessing* = 5.seconds - -#### Request handlers - -const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" - -func decodeRequestBody[T]( - contentBody: Option[ContentBody] -): Result[T, RestApiResponse] = - if contentBody.isNone(): - return err(RestApiResponse.badRequest("Missing content body")) - - let reqBodyContentType = MediaType.init($contentBody.get().contentType) - if reqBodyContentType != MIMETYPE_JSON: - return - err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) - - let reqBodyData = contentBody.get().data - - let requestResult = decodeFromJsonBytes(T, reqBodyData) - if requestResult.isErr(): - return err( - RestApiResponse.badRequest( - "Invalid content body, could not decode. " & $requestResult.error - ) - ) - - return ok(requestResult.get()) - -proc installFilterV1PostSubscriptionsV1Handler*( - router: var RestRouter, node: WakuNode, cache: MessageCache -) = - let pushHandler: FilterPushHandler = proc( - pubsubTopic: PubsubTopic, msg: WakuMessage - ) {.async, gcsafe, closure.} = - cache.addMessage(pubsubTopic, msg) - - router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do( - contentBody: Option[ContentBody] - ) -> RestApiResponse: - ## Subscribes a node to a list of contentTopics of a pubsubTopic - debug "post", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody - - let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody) - - if decodedBody.isErr(): - return decodedBody.error - - let req: FilterLegacySubscribeRequest = decodedBody.value() - - let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) - - if peerOpt.isNone(): - return RestApiResponse.internalServerError("No suitable remote filter peers") - - let subFut = node.legacyFilterSubscribe( - req.pubsubTopic, req.contentFilters, pushHandler, peerOpt.get() - ) - - if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): - error "Failed to subscribe to contentFilters do to timeout!" - return - RestApiResponse.internalServerError("Failed to subscribe to contentFilters") - - # Successfully subscribed to all content filters - for cTopic in req.contentFilters: - cache.contentSubscribe(cTopic) - - return RestApiResponse.ok() - -proc installFilterV1DeleteSubscriptionsV1Handler*( - router: var RestRouter, node: WakuNode, cache: MessageCache -) = - router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do( - contentBody: Option[ContentBody] - ) -> RestApiResponse: - ## Subscribes a node to a list of contentTopics of a PubSub topic - debug "delete", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody - - let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody) - - if decodedBody.isErr(): - return decodedBody.error - - let req: FilterLegacySubscribeRequest = decodedBody.value() - - let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) - - if peerOpt.isNone(): - return RestApiResponse.internalServerError("No suitable remote filter peers") - - let unsubFut = - node.legacyFilterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) - if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): - error "Failed to unsubscribe from contentFilters due to timeout!" - return - RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") - - for cTopic in req.contentFilters: - cache.contentUnsubscribe(cTopic) - - # Successfully unsubscribed from all requested contentTopics - return RestApiResponse.ok() - -const ROUTE_FILTER_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" - -proc installFilterV1GetMessagesV1Handler*( - router: var RestRouter, node: WakuNode, cache: MessageCache -) = - router.api(MethodGet, ROUTE_FILTER_MESSAGESV1) do( - contentTopic: string - ) -> RestApiResponse: - ## Returns all WakuMessages received on a specified content topic since the - ## last time this method was called - ## TODO: ability to specify a return message limit - debug "get", ROUTE_FILTER_MESSAGESV1, contentTopic = contentTopic - - if contentTopic.isErr(): - return RestApiResponse.badRequest("Missing contentTopic") - - let contentTopic = contentTopic.get() - - let msgRes = cache.getAutoMessages(contentTopic, clear = true) - if msgRes.isErr(): - return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) - - let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage)) - let resp = RestApiResponse.jsonResponse(data, status = Http200) - if resp.isErr(): - error "An error ocurred while building the json respose: ", error = resp.error - return RestApiResponse.internalServerError( - "An error ocurred while building the json respose" - ) - - return resp.get() - -proc installLegacyFilterRestApiHandlers*( - router: var RestRouter, node: WakuNode, cache: MessageCache -) = - installFilterV1PostSubscriptionsV1Handler(router, node, cache) - installFilterV1DeleteSubscriptionsV1Handler(router, node, cache) - installFilterV1GetMessagesV1Handler(router, node, cache) diff --git a/waku/waku_filter.nim b/waku/waku_filter.nim deleted file mode 100644 index 3a091ecea..000000000 --- a/waku/waku_filter.nim +++ /dev/null @@ -1,8 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import ./waku_filter/protocol - -export protocol diff --git a/waku/waku_filter/README.md b/waku/waku_filter/README.md deleted file mode 100644 index 2deb6f170..000000000 --- a/waku/waku_filter/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Waku Filter protocol - -The filter protocol implements bandwidth preserving filtering for light nodes. See https://rfc.vac.dev/spec/12/ for more information. diff --git a/waku/waku_filter/client.nim b/waku/waku_filter/client.nim deleted file mode 100644 index 591f2cd6c..000000000 --- a/waku/waku_filter/client.nim +++ /dev/null @@ -1,174 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[options, tables, sequtils], - stew/results, - chronicles, - chronos, - metrics, - bearssl/rand, - libp2p/protocols/protocol as libp2p_protocol -import - ../waku_core, - ../node/peer_manager, - ../utils/requests, - ./rpc, - ./rpc_codec, - ./protocol, - ./protocol_metrics - -logScope: - topics = "waku filter client" - -const Defaultstring = "/waku/2/default-waku/proto" - -## Client -type WakuFilterClientLegacy* = ref object of LPProtocol - rng: ref rand.HmacDrbgContext - peerManager: PeerManager - subManager: SubscriptionManager - -proc handleMessagePush( - wf: WakuFilterClientLegacy, peerId: PeerId, requestId: string, rpc: MessagePush -) = - for msg in rpc.messages: - let - pubsubTopic = Defaultstring - # TODO: Extend the filter push rpc to provide the pubsub topic. This is a limitation - contentTopic = msg.contentTopic - - wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg) - -proc initProtocolHandler(wf: WakuFilterClientLegacy) = - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let buffer = await conn.readLp(MaxRpcSize.int) - - let decodeReqRes = FilterRPC.decode(buffer) - if decodeReqRes.isErr(): - waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure]) - return - - let rpc = decodeReqRes.get() - trace "filter message received" - - if rpc.push.isNone(): - waku_legacy_filter_errors.inc(labelValues = [emptyMessagePushFailure]) - # TODO: Manage the empty push message error. Perform any action? - return - - waku_legacy_filter_messages.inc(labelValues = ["MessagePush"]) - - let - peerId = conn.peerId - requestId = rpc.requestId - push = rpc.push.get() - - info "received filter message push", peerId = conn.peerId, requestId = requestId - wf.handleMessagePush(peerId, requestId, push) - - wf.handler = handle - wf.codec = WakuLegacyFilterCodec - -proc new*( - T: type WakuFilterClientLegacy, - peerManager: PeerManager, - rng: ref rand.HmacDrbgContext, -): T = - let wf = WakuFilterClientLegacy( - peerManager: peerManager, rng: rng, subManager: SubscriptionManager.init() - ) - wf.initProtocolHandler() - wf - -proc sendFilterRpc( - wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId | RemotePeerInfo -): Future[WakuFilterResult[void]] {.async, gcsafe.} = - let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec) - if connOpt.isNone(): - return err(dialFailure) - let connection = connOpt.get() - - await connection.writeLP(rpc.encode().buffer) - return ok() - -proc sendFilterRequestRpc( - wf: WakuFilterClientLegacy, - pubsubTopic: PubsubTopic, - contentTopics: seq[ContentTopic], - subscribe: bool, - peer: PeerId | RemotePeerInfo, -): Future[WakuFilterResult[void]] {.async.} = - let requestId = generateRequestId(wf.rng) - let contentFilters = contentTopics.mapIt(ContentFilter(contentTopic: it)) - - let rpc = FilterRpc( - requestId: requestId, - request: some( - FilterRequest( - subscribe: subscribe, pubSubTopic: pubsubTopic, contentFilters: contentFilters - ) - ), - ) - - let sendRes = await wf.sendFilterRpc(rpc, peer) - if sendRes.isErr(): - waku_legacy_filter_errors.inc(labelValues = [sendRes.error]) - return err(sendRes.error) - - return ok() - -proc subscribe*( - wf: WakuFilterClientLegacy, - pubsubTopic: PubsubTopic, - contentTopic: ContentTopic | seq[ContentTopic], - handler: FilterPushHandler, - peer: PeerId | RemotePeerInfo, -): Future[WakuFilterResult[void]] {.async.} = - var topics: seq[ContentTopic] - when contentTopic is seq[ContentTopic]: - topics = contentTopic - else: - topics = @[contentTopic] - - let sendRes = - await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe = true, peer = peer) - if sendRes.isErr(): - return err(sendRes.error) - - for topic in topics: - wf.subManager.registerSubscription(pubsubTopic, topic, handler) - - return ok() - -proc unsubscribe*( - wf: WakuFilterClientLegacy, - pubsubTopic: PubsubTopic, - contentTopic: ContentTopic | seq[ContentTopic], - peer: PeerId | RemotePeerInfo, -): Future[WakuFilterResult[void]] {.async.} = - var topics: seq[ContentTopic] - when contentTopic is seq[ContentTopic]: - topics = contentTopic - else: - topics = @[contentTopic] - - let sendRes = - await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe = false, peer = peer) - if sendRes.isErr(): - return err(sendRes.error) - - # FIXME: I see an issue here that such solution prevents filtering client to properly manage its - # subscriptions on different peers and get notified correctly! - for topic in topics: - wf.subManager.removeSubscription(pubsubTopic, topic) - - return ok() - -proc clearSubscriptions*(wf: WakuFilterClientLegacy) = - wf.subManager.clear() - -proc getSubscriptionsCount*(wf: WakuFilterClientLegacy): int = - wf.subManager.getSubscriptionsCount() diff --git a/waku/waku_filter/protocol.nim b/waku/waku_filter/protocol.nim deleted file mode 100644 index 683392484..000000000 --- a/waku/waku_filter/protocol.nim +++ /dev/null @@ -1,191 +0,0 @@ -import - std/[options, sets, tables, sequtils], - stew/results, - chronicles, - chronos, - metrics, - bearssl/rand, - libp2p/protocols/protocol, - libp2p/crypto/crypto -import ../waku_core, ../node/peer_manager, ./rpc, ./rpc_codec, ./protocol_metrics - -logScope: - topics = "waku filter" - -const - WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1" - - WakuLegacyFilterTimeout*: Duration = 2.hours - -type WakuFilterResult*[T] = Result[T, string] - -## Subscription manager - -type Subscription = object - requestId: string - peer: PeerID - pubsubTopic: PubsubTopic - contentTopics: HashSet[ContentTopic] - -proc addSubscription( - subscriptions: var seq[Subscription], - peer: PeerID, - requestId: string, - pubsubTopic: PubsubTopic, - contentTopics: seq[ContentTopic], -) = - let subscription = Subscription( - requestId: requestId, - peer: peer, - pubsubTopic: pubsubTopic, - contentTopics: toHashSet(contentTopics), - ) - subscriptions.add(subscription) - -proc removeSubscription( - subscriptions: var seq[Subscription], - peer: PeerId, - unsubscribeTopics: seq[ContentTopic], -) = - for sub in subscriptions.mitems: - if sub.peer != peer: - continue - - sub.contentTopics.excl(toHashSet(unsubscribeTopics)) - - # Delete the subscriber if no more content filters left - subscriptions.keepItIf(it.contentTopics.len > 0) - -## Protocol - -type WakuFilterLegacy* = ref object of LPProtocol - rng*: ref rand.HmacDrbgContext - peerManager*: PeerManager - subscriptions*: seq[Subscription] - failedPeers*: Table[string, chronos.Moment] - timeout*: chronos.Duration - -proc handleFilterRequest(wf: WakuFilterLegacy, peerId: PeerId, rpc: FilterRPC) = - let - requestId = rpc.requestId - subscribe = rpc.request.get().subscribe - pubsubTopic = rpc.request.get().pubsubTopic - contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic) - - if subscribe: - info "added filter subscritpiton", - peerId = peerId, pubsubTopic = pubsubTopic, contentTopics = contentTopics - wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics) - else: - info "removed filter subscritpiton", peerId = peerId, contentTopics = contentTopics - wf.subscriptions.removeSubscription(peerId, contentTopics) - - waku_legacy_filter_subscribers.set(wf.subscriptions.len.int64) - -proc initProtocolHandler(wf: WakuFilterLegacy) = - proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let buffer = await conn.readLp(MaxRpcSize.int) - - let decodeRpcRes = FilterRPC.decode(buffer) - if decodeRpcRes.isErr(): - waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure]) - return - - trace "filter message received" - - let rpc = decodeRpcRes.get() - - ## Filter request - # Subscription/unsubscription request - if rpc.request.isNone(): - waku_legacy_filter_errors.inc(labelValues = [emptyFilterRequestFailure]) - # TODO: Manage the empty filter request message error. Perform any action? - return - - waku_legacy_filter_messages.inc(labelValues = ["FilterRequest"]) - wf.handleFilterRequest(conn.peerId, rpc) - - wf.handler = handler - wf.codec = WakuLegacyFilterCodec - -proc new*( - T: type WakuFilterLegacy, - peerManager: PeerManager, - rng: ref rand.HmacDrbgContext, - timeout: Duration = WakuLegacyFilterTimeout, -): T = - let wf = WakuFilterLegacy(rng: rng, peerManager: peerManager, timeout: timeout) - wf.initProtocolHandler() - return wf - -proc sendFilterRpc( - wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId | RemotePeerInfo -): Future[WakuFilterResult[void]] {.async, gcsafe.} = - let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec) - if connOpt.isNone(): - return err(dialFailure) - let connection = connOpt.get() - - await connection.writeLP(rpc.encode().buffer) - return ok() - -### Send message to subscriptors -proc removePeerFromFailedPeersTable(wf: WakuFilterLegacy, subs: seq[Subscription]) = - ## Clear the failed peer table if subscriber was able to connect - for sub in subs: - wf.failedPeers.del($sub) - -proc handleClientError( - wf: WakuFilterLegacy, subs: seq[Subscription] -) {.raises: [Defect, KeyError].} = - ## If we have already failed to send message to this peer, - ## check for elapsed time and if it's been too long, remove the peer. - for sub in subs: - let subKey: string = $(sub) - - if not wf.failedPeers.hasKey(subKey): - # add the peer to the failed peers table. - wf.failedPeers[subKey] = Moment.now() - return - - let elapsedTime = Moment.now() - wf.failedPeers[subKey] - if elapsedTime > wf.timeout: - wf.failedPeers.del(subKey) - - let index = wf.subscriptions.find(sub) - wf.subscriptions.delete(index) - -proc handleMessage*( - wf: WakuFilterLegacy, pubsubTopic: PubsubTopic, msg: WakuMessage -) {.async.} = - trace "handling message", - pubsubTopic, contentTopic = msg.contentTopic, subscriptions = wf.subscriptions.len - - if wf.subscriptions.len <= 0: - return - - var failedSubscriptions: seq[Subscription] - var connectedSubscriptions: seq[Subscription] - - for sub in wf.subscriptions: - # TODO: Review when pubsubTopic can be empty and if it is a valid case - if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic: - continue - - if msg.contentTopic notin sub.contentTopics: - continue - - let rpc = - FilterRPC(requestId: sub.requestId, push: some(MessagePush(messages: @[msg]))) - - let res = await wf.sendFilterRpc(rpc, sub.peer) - if res.isErr(): - waku_legacy_filter_errors.inc(labelValues = [res.error()]) - failedSubscriptions.add(sub) - continue - - connectedSubscriptions.add(sub) - - wf.removePeerFromFailedPeersTable(connectedSubscriptions) - - wf.handleClientError(failedSubscriptions) diff --git a/waku/waku_filter/protocol_metrics.nim b/waku/waku_filter/protocol_metrics.nim deleted file mode 100644 index 520a92064..000000000 --- a/waku/waku_filter/protocol_metrics.nim +++ /dev/null @@ -1,22 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import metrics - -declarePublicGauge waku_legacy_filter_subscribers, - "number of light node filter subscribers" -declarePublicGauge waku_legacy_filter_errors, - "number of filter protocol errors", ["type"] -declarePublicGauge waku_legacy_filter_messages, - "number of filter messages received", ["type"] -declarePublicGauge waku_node_filters, "number of content filter subscriptions" - -# Error types (metric label values) -const - dialFailure* = "dial_failure" - decodeRpcFailure* = "decode_rpc_failure" - peerNotFoundFailure* = "peer_not_found_failure" - emptyMessagePushFailure* = "empty_message_push_failure" - emptyFilterRequestFailure* = "empty_filter_request_failure" diff --git a/waku/waku_filter/rpc.nim b/waku/waku_filter/rpc.nim deleted file mode 100644 index 3014780f5..000000000 --- a/waku/waku_filter/rpc.nim +++ /dev/null @@ -1,24 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import std/options -import ../waku_core - -type - ContentFilter* = object - contentTopic*: string - - FilterRequest* = object - contentFilters*: seq[ContentFilter] - pubsubTopic*: string - subscribe*: bool - - MessagePush* = object - messages*: seq[WakuMessage] - - FilterRPC* = object - requestId*: string - request*: Option[FilterRequest] - push*: Option[MessagePush] diff --git a/waku/waku_filter/rpc_codec.nim b/waku/waku_filter/rpc_codec.nim deleted file mode 100644 index 9ade0ba04..000000000 --- a/waku/waku_filter/rpc_codec.nim +++ /dev/null @@ -1,130 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import std/options -import ../common/protobuf, ../waku_core, ./rpc - -# Multiply by 10 for safety. Currently we never push more than 1 message at a time -# We add a 64kB safety buffer for protocol overhead. -const MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 - -proc encode*(filter: ContentFilter): ProtoBuffer = - var pb = initProtoBuffer() - - pb.write3(1, filter.contentTopic) - pb.finish3() - - pb - -proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtobufResult[T] = - let pb = initProtoBuffer(buffer) - var rpc = ContentFilter() - - var topic: string - if not ?pb.getField(1, topic): - return err(ProtobufError.missingRequiredField("content_topic")) - else: - rpc.contentTopic = topic - - ok(rpc) - -proc encode*(rpc: FilterRequest): ProtoBuffer = - var pb = initProtoBuffer() - - pb.write3(1, rpc.subscribe) - pb.write3(2, rpc.pubSubTopic) - - for filter in rpc.contentFilters: - pb.write3(3, filter.encode()) - - pb.finish3() - - pb - -proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtobufResult[T] = - let pb = initProtoBuffer(buffer) - var rpc = FilterRequest() - - var subflag: uint64 - if not ?pb.getField(1, subflag): - return err(ProtobufError.missingRequiredField("subscribe")) - else: - rpc.subscribe = bool(subflag) - - var topic: string - if not ?pb.getField(2, topic): - return err(ProtobufError.missingRequiredField("topic")) - else: - rpc.pubsubTopic = topic - - var buffs: seq[seq[byte]] - if not ?pb.getRepeatedField(3, buffs): - return err(ProtobufError.missingRequiredField("content_filters")) - else: - for buf in buffs: - let filter = ?ContentFilter.decode(buf) - rpc.contentFilters.add(filter) - - ok(rpc) - -proc encode*(push: MessagePush): ProtoBuffer = - var pb = initProtoBuffer() - - for push in push.messages: - pb.write3(1, push.encode()) - - pb.finish3() - - pb - -proc decode*(T: type MessagePush, buffer: seq[byte]): ProtobufResult[T] = - let pb = initProtoBuffer(buffer) - var rpc = MessagePush() - - var messages: seq[seq[byte]] - if not ?pb.getRepeatedField(1, messages): - return err(ProtobufError.missingRequiredField("messages")) - else: - for buf in messages: - let msg = ?WakuMessage.decode(buf) - rpc.messages.add(msg) - - ok(rpc) - -proc encode*(rpc: FilterRPC): ProtoBuffer = - var pb = initProtoBuffer() - - pb.write3(1, rpc.requestId) - pb.write3(2, rpc.request.map(encode)) - pb.write3(3, rpc.push.map(encode)) - pb.finish3() - - pb - -proc decode*(T: type FilterRPC, buffer: seq[byte]): ProtobufResult[T] = - let pb = initProtoBuffer(buffer) - var rpc = FilterRPC() - - var requestId: string - if not ?pb.getField(1, requestId): - return err(ProtobufError.missingRequiredField("request_id")) - else: - rpc.requestId = requestId - - var requestBuffer: seq[byte] - if not ?pb.getField(2, requestBuffer): - rpc.request = none(FilterRequest) - else: - let request = ?FilterRequest.decode(requestBuffer) - rpc.request = some(request) - - var pushBuffer: seq[byte] - if not ?pb.getField(3, pushBuffer): - rpc.push = none(MessagePush) - else: - let push = ?MessagePush.decode(pushBuffer) - rpc.push = some(push) - - ok(rpc) diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index d53566336..7f1b390df 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -79,11 +79,17 @@ proc subscribe*( wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, - contentTopics: seq[ContentTopic], + contentTopics: ContentTopic|seq[ContentTopic], ): Future[FilterSubscribeResult] {.async.} = + var contentTopicSeq: seq[ContentTopic] + when contentTopics is seq[ContentTopic]: + contentTopicSeq = contentTopics + else: + contentTopicSeq = @[contentTopics] + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.subscribe( - requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopics + requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq ) return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) @@ -92,11 +98,17 @@ proc unsubscribe*( wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, - contentTopics: seq[ContentTopic], + contentTopics: ContentTopic|seq[ContentTopic], ): Future[FilterSubscribeResult] {.async.} = + var contentTopicSeq: seq[ContentTopic] + when contentTopics is seq[ContentTopic]: + contentTopicSeq = contentTopics + else: + contentTopicSeq = @[contentTopics] + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe( - requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopics + requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq ) return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) @@ -140,4 +152,4 @@ proc new*( ): T = let wfc = WakuFilterClient(rng: rng, peerManager: peerManager, pushHandlers: @[]) wfc.initProtocolHandler() - wfc + wfc \ No newline at end of file