From dd86da3247a4c7ec3f246d8202674dba3360289c Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 14 Sep 2023 21:28:57 +0200 Subject: [PATCH] feat: HTTP REST API: Filter support v2 (#1890) Filter v2 rest api support implemented Filter rest api documentation updated with v1 and v2 interface support. Separated legacy filter rest interface Fix code and tests of v2 Filter rest api Filter v2 message push test added Applied autoshard to Filter V2 Redesigned FilterPushHandling, code style, catch up apps and tests with filter v2 interface changes Rename of FilterV1SubscriptionsRequest to FilterLegacySubscribeRequest, fix broken chat2 app, fix tests Changed Filter v2 push handler subscription to simple register Separate node's filterUnsubscribe and filterUnsubscribeAll --- apps/chat2/chat2.nim | 18 +- apps/chat2bridge/chat2bridge.nim | 4 +- apps/wakunode2/app.nim | 15 +- examples/filter_subscriber.nim | 8 +- tests/all_tests_waku.nim | 8 +- tests/test_peer_manager.nim | 56 +-- ...filter.nim => test_waku_filter_legacy.nim} | 0 ...er.nim => test_wakunode_filter_legacy.nim} | 2 +- tests/waku_filter_v2/client_utils.nim | 5 +- tests/waku_filter_v2/test_waku_client.nim | 28 +- tests/waku_filter_v2/test_waku_filter.nim | 98 +++-- tests/waku_store/test_wakunode_store.nim | 4 +- tests/wakunode_jsonrpc/test_jsonrpc_admin.nim | 6 +- .../wakunode_jsonrpc/test_jsonrpc_filter.nim | 2 +- tests/wakunode_rest/test_rest_filter.nim | 317 ++++++++++------ .../wakunode_rest/test_rest_legacy_filter.nim | 191 ++++++++++ waku/README.md | 6 +- waku/node/jsonrpc/admin/handlers.nim | 4 +- waku/node/jsonrpc/filter/handlers.nim | 10 +- waku/node/message_cache.nim | 2 + waku/node/rest/filter/client.nim | 87 +++-- waku/node/rest/filter/handlers.nim | 295 ++++++++++++--- waku/node/rest/filter/legacy_client.nim | 68 ++++ waku/node/rest/filter/legacy_handlers.nim | 160 ++++++++ waku/node/rest/filter/openapi.yaml | 351 +++++++++++++++++- waku/node/rest/filter/types.nim | 208 ++++++++++- waku/node/waku_node.nim | 277 +++++++++++--- waku/waku_core/peers.nim | 7 +- waku/waku_filter/client.nim | 7 +- waku/waku_filter/protocol.nim | 8 +- waku/waku_filter_v2/client.nim | 48 ++- waku/waku_filter_v2/rpc.nim | 27 ++ 32 files changed, 1928 insertions(+), 399 deletions(-) rename tests/{test_waku_filter.nim => test_waku_filter_legacy.nim} (100%) rename tests/{test_wakunode_filter.nim => test_wakunode_filter_legacy.nim} (93%) create mode 100644 tests/wakunode_rest/test_rest_legacy_filter.nim create mode 100644 waku/node/rest/filter/legacy_client.nim create mode 100644 waku/node/rest/filter/legacy_handlers.nim diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index d7ebb3a91..2d0764392 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -262,10 +262,12 @@ proc writeAndPrint(c: Chat) {.async.} = echo "You are now known as " & c.nick elif line.startsWith("/exit"): - if not c.node.wakuFilter.isNil(): + if not c.node.wakuFilterLegacy.isNil(): echo "unsubscribing from content filters..." - await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic) + let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec) + if peerOpt.isSome(): + await c.node.legacyFilterUnsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic, peer=peerOpt.get()) echo "quitting..." @@ -464,14 +466,18 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if peerInfo.isOk(): await node.mountFilter() await node.mountFilterClient() - node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec) + node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec) proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = trace "Hit filter handler", contentTopic=msg.contentTopic chat.printReceivedMessage(msg) - await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler) - + await node.legacyFilterSubscribe(pubsubTopic=some(DefaultPubsubTopic), + contentTopics=chat.contentTopic, + filterHandler, + peerInfo.value) + # TODO: Here to support FilterV2 relevant subscription, but still + # Legacy Filter is concurrent to V2 untill legacy filter will be removed else: error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error @@ -485,7 +491,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = chat.printReceivedMessage(msg) let topic = DefaultPubsubTopic - await node.subscribe(some(topic), @[ContentTopic("")], handler) + node.subscribe(topic, handler) if conf.rlnRelay: info "WakuRLNRelay is enabled" diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 4c9f8ba48..b67ee2be3 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -18,6 +18,7 @@ import ../../../waku/waku_node, ../../../waku/node/peer_manager, ../../waku/waku_filter, + ../../waku/waku_filter_v2, ../../waku/waku_store, # Chat 2 imports ../chat2/chat2, @@ -297,7 +298,8 @@ when isMainModule: if conf.filternode != "": let filterPeer = parsePeerInfo(conf.filternode) if filterPeer.isOk(): - bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec) + bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec) + bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec) else: error "Error parsing conf.filternode", error = filterPeer.error diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index c976e7de4..6cf55097b 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -37,6 +37,8 @@ import ../../waku/waku_store, ../../waku/waku_lightpush, ../../waku/waku_filter, + ../../waku/waku_filter_v2, + ../../waku/waku_filter_v2/client as waku_filter_client, ./wakunode2_validator_signed, ./internal_config, ./external_config @@ -46,6 +48,7 @@ import ../../waku/node/rest/debug/handlers as rest_debug_api, ../../waku/node/rest/relay/handlers as rest_relay_api, ../../waku/node/rest/relay/topic_cache, + ../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api, ../../waku/node/rest/filter/handlers as rest_filter_api, ../../waku/node/rest/store/handlers as rest_store_api, ../../waku/node/rest/health/handlers as rest_health_api, @@ -470,8 +473,9 @@ proc setupProtocols(node: WakuNode, if conf.filternode != "": let filterNode = parsePeerInfo(conf.filternode) if filterNode.isOk(): - await mountFilterClient(node) - node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec) + await node.mountFilterClient() + node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec) + node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) else: return err("failed to set node waku filter peer: " & filterNode.error) @@ -577,8 +581,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo ## Filter REST API if conf.filter: - let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity) - installFilterApiHandlers(server.router, app.node, filterCache) + let legacyFilterCache = rest_legacy_filter_api.MessageCache.init() + rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache) + + let filterCache = rest_filter_api.MessageCache.init() + rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache) ## Store REST API installStoreApiHandlers(server.router, app.node) diff --git a/examples/filter_subscriber.nim b/examples/filter_subscriber.nim index 6c01ec6d0..8b0a05c97 100644 --- a/examples/filter_subscriber.nim +++ b/examples/filter_subscriber.nim @@ -28,13 +28,15 @@ proc unsubscribe(wfc: WakuFilterClient, else: notice "unsubscribe request successful" -proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) = +proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) + {.async, gcsafe.} = let payloadStr = string.fromBytes(message.payload) notice "message received", payload=payloadStr, pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, timestamp=message.timestamp + proc maintainSubscription(wfc: WakuFilterClient, filterPeer: RemotePeerInfo, filterPubsubTopic: PubsubTopic, @@ -68,11 +70,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) = var switch = newStandardSwitch() pm = PeerManager.new(switch) - wfc = WakuFilterClient.new(rng, messagePushHandler, pm) + wfc = WakuFilterClient.new(pm, rng) # Mount filter client protocol switch.mount(wfc) + wfc.registerPushHandler(messagePushHandler) + # Start maintaining subscription asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 0fc247807..6e6cb6037 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -58,8 +58,8 @@ import ./test_waku_lightpush, ./test_wakunode_lightpush, # Waku Filter - ./test_waku_filter, - ./test_wakunode_filter, + ./test_waku_filter_legacy, + ./test_wakunode_filter_legacy, ./test_waku_peer_exchange, ./test_peer_store_extended, ./test_message_cache, @@ -95,7 +95,9 @@ import ./wakunode_rest/test_rest_relay, ./wakunode_rest/test_rest_relay_serdes, ./wakunode_rest/test_rest_serdes, - ./wakunode_rest/test_rest_store + ./wakunode_rest/test_rest_store, + ./wakunode_rest/test_rest_filter, + ./wakunode_rest/test_rest_legacy_filter import ./waku_rln_relay/test_waku_rln_relay, diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 4b85babf7..cbd6d7ac2 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -52,7 +52,7 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.mountFilter())) # Dial node2 from node1 - let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) + let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) # Check connection check: conn.isSome() @@ -81,12 +81,12 @@ procSuite "Peer Manager": let nonExistentPeer = nonExistentPeerRes.value # Dial non-existent peer from node1 - let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec) + let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec) check: conn1.isNone() # Dial peer not supporting given protocol - let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) + let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) check: conn2.isNone() @@ -109,14 +109,14 @@ procSuite "Peer Manager": node.mountStoreClient() node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) - node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) + node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec) # Check peers were successfully added to peer manager check: node.peerManager.peerStore.peers().len == 2 - node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and + node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and - it.protocols.contains(WakuFilterCodec)) + it.protocols.contains(WakuLegacyFilterCodec)) node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and it.protocols.contains(WakuStoreCodec)) @@ -429,7 +429,7 @@ procSuite "Peer Manager": # service peers node.peerManager.addServicePeer(peers[0], WakuStoreCodec) - node.peerManager.addServicePeer(peers[1], WakuFilterCodec) + node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec) node.peerManager.addServicePeer(peers[2], WakuLightPushCodec) node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec) @@ -449,7 +449,7 @@ procSuite "Peer Manager": # all service peers are added to its service slot check: node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId - node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId + node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId @@ -474,11 +474,11 @@ procSuite "Peer Manager": (await nodes[0].peerManager.connectRelay(pInfos[2])) == true (await nodes[1].peerManager.connectRelay(pInfos[2])) == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == true # isolated dial creates a relay conn under the hood (libp2p behaviour) - (await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true + (await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == true # assert physical connections @@ -486,26 +486,26 @@ procSuite "Peer Manager": nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0 nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2 - nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0 - nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2 + nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0 + nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 - nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 - nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 + nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 + nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 - nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 - nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1 + nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 + nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0 - nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 - nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 + nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 + nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0 asyncTest "getNumStreams() returns expected number of connections per protocol": # Create 2 nodes @@ -521,17 +521,17 @@ procSuite "Peer Manager": require: # multiple streams are multiplexed over a single connection. # note that a relay connection is created under the hood when dialing a peer (libp2p behaviour) - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true check: nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) - nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4) + nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4) nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) - nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0) + nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0) test "selectPeer() returns the correct peer": # Valid peer id missing the last digit @@ -552,7 +552,7 @@ procSuite "Peer Manager": # Add a peer[0] to the peerstore pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs - pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec] + pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec] # When no service peers, we get one from the peerstore let selectedPeer1 = pm.selectPeer(WakuStoreCodec) @@ -561,7 +561,7 @@ procSuite "Peer Manager": selectedPeer1.get().peerId == peers[0].peerId # Same for other protocol - let selectedPeer2 = pm.selectPeer(WakuFilterCodec) + let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec) check: selectedPeer2.isSome() == true selectedPeer2.get().peerId == peers[0].peerId @@ -757,7 +757,7 @@ procSuite "Peer Manager": discard await nodes[0].peerManager.connectRelay(pInfos[3]) discard await nodes[0].peerManager.connectRelay(pInfos[4]) - # they are also prunned  + # they are also prunned check nodes[0].peerManager.switch.connManager.getConnections().len == 1 # we should have 4 peers (2in/2out) but due to collocation limit diff --git a/tests/test_waku_filter.nim b/tests/test_waku_filter_legacy.nim similarity index 100% rename from tests/test_waku_filter.nim rename to tests/test_waku_filter_legacy.nim diff --git a/tests/test_wakunode_filter.nim b/tests/test_wakunode_filter_legacy.nim similarity index 93% rename from tests/test_wakunode_filter.nim rename to tests/test_wakunode_filter_legacy.nim index 4ea7750ec..e0fd53039 100644 --- a/tests/test_wakunode_filter.nim +++ b/tests/test_wakunode_filter_legacy.nim @@ -44,7 +44,7 @@ suite "WakuNode - Filter": filterPushHandlerFut.complete((pubsubTopic, msg)) ## When - await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo) + await client.legacyFilterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo) # Wait for subscription to take effect waitFor sleepAsync(100.millis) diff --git a/tests/waku_filter_v2/client_utils.nim b/tests/waku_filter_v2/client_utils.nim index 95d3d5c46..e42076ace 100644 --- a/tests/waku_filter_v2/client_utils.nim +++ b/tests/waku_filter_v2/client_utils.nim @@ -1,6 +1,5 @@ import std/[options,tables], - testutils/unittests, chronos, chronicles @@ -22,10 +21,10 @@ proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} = return proto -proc newTestWakuFilterClient*(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} = +proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.} = let peerManager = PeerManager.new(switch) - proto = WakuFilterClient.new(rng, messagePushHandler, peerManager) + proto = WakuFilterClient.new(peerManager, rng) await proto.start() switch.mount(proto) diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 398bcddeb..f4cae3ed8 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -3,16 +3,13 @@ import std/[options,tables], testutils/unittests, - chronos, - chronicles, - libp2p/peerstore + chronos import - ../../../waku/node/peer_manager, - ../../../waku/waku_filter_v2, - ../../../waku/waku_filter_v2/client, - ../../../waku/waku_core, - ../testlib/common, + ../../waku/node/peer_manager, + ../../waku/waku_filter_v2, + ../../waku/waku_filter_v2/client, + ../../waku/waku_core, ../testlib/wakucore, ../testlib/testasync, ./client_utils.nim @@ -28,26 +25,23 @@ suite "Waku Filter": var contentTopics {.threadvar.}: seq[ContentTopic] asyncSetup: - let - voidHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = - discard pubsubTopic = DefaultPubsubTopic contentTopics = @[DefaultContentTopic] serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, voidHandler) - + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) + await allFutures(serverSwitch.start(), clientSwitch.start()) serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() - + asyncTeardown: await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) asyncTest "Active Subscription Identification": # Given - let + let clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopics @@ -75,12 +69,12 @@ suite "Waku Filter": asyncTest "After Unsubscription": # Given - let + let clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopics ) - + require: subscribeResponse.isOk() wakuFilter.subscriptions.hasKey(clientPeerId) diff --git a/tests/waku_filter_v2/test_waku_filter.nim b/tests/waku_filter_v2/test_waku_filter.nim index 76c90e4fb..fe04d012f 100644 --- a/tests/waku_filter_v2/test_waku_filter.nim +++ b/tests/waku_filter_v2/test_waku_filter.nim @@ -3,16 +3,13 @@ import std/[options,tables], testutils/unittests, - chronos, - chronicles, - libp2p/peerstore + chronos import - ../../../waku/node/peer_manager, - ../../../waku/waku_filter_v2, - ../../../waku/waku_filter_v2/client, - ../../../waku/waku_core, - ../testlib/common, + ../../waku/node/peer_manager, + ../../waku/waku_filter_v2, + ../../waku/waku_filter_v2/client, + ../../waku/waku_core, ../testlib/wakucore, ./client_utils.nim @@ -22,21 +19,29 @@ suite "Waku Filter - end to end": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic contentTopics = @[DefaultContentTopic] # When await allFutures(serverSwitch.start(), clientSwitch.start()) - let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + wakuFilterClient.registerPushHandler(messagePushHandler) + + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -57,7 +62,9 @@ suite "Waku Filter - end to end": pushedMsg == msg1 # When - let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -74,20 +81,24 @@ suite "Waku Filter - end to end": not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) asyncTest "subscribe, unsubscribe multiple content topics": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") @@ -95,7 +106,12 @@ suite "Waku Filter - end to end": # When await allFutures(serverSwitch.start(), clientSwitch.start()) - let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + wakuFilterClient.registerPushHandler(messagePushHandler) + + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -129,7 +145,9 @@ suite "Waku Filter - end to end": pushedMsg2 == msg2 # When - let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, @[contentTopic2]) # Unsubscribe only one content topic + let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + @[contentTopic2]) # Unsubscribe only one content topic # Then check: @@ -159,20 +177,24 @@ suite "Waku Filter - end to end": not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) asyncTest "subscribe to multiple content topics and unsubscribe all": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") @@ -180,7 +202,12 @@ suite "Waku Filter - end to end": # When await allFutures(serverSwitch.start(), clientSwitch.start()) - let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + wakuFilterClient.registerPushHandler(messagePushHandler) + + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -234,20 +261,24 @@ suite "Waku Filter - end to end": not (await pushHandlerFuture.withTimeout(2.seconds)) # Neither message should be pushed # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) asyncTest "subscribe, unsubscribe multiple pubsub topics and content topics": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto") @@ -258,9 +289,17 @@ suite "Waku Filter - end to end": # When await allFutures(serverSwitch.start(), clientSwitch.start()) + + wakuFilterClient.registerPushHandler(messagePushHandler) + let - response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) - response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, contentTopics) + response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) + + response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic2, + contentTopics) # Then check: @@ -299,7 +338,9 @@ suite "Waku Filter - end to end": ## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s) # When - let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, @[contentTopic2]) + let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic2, + @[contentTopic2]) require response3.isOk() let msg3 = fakeWakuMessage(contentTopic=contentTopic2) @@ -325,4 +366,5 @@ suite "Waku Filter - end to end": pushedMsg3 == msg3 # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index 6d01c36ee..117a1cc93 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -216,7 +216,7 @@ procSuite "WakuNode - Store": let mountArchiveRes = server.mountArchive(driver) assert mountArchiveRes.isOk(), mountArchiveRes.error - + waitFor server.mountStore() waitFor server.mountFilterClient() client.mountStoreClient() @@ -232,7 +232,7 @@ procSuite "WakuNode - Store": proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = filterFut.complete((pubsubTopic, msg)) - waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer) + waitFor server.legacyFilterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer) waitFor sleepAsync(100.millis) diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim b/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim index 1a4823fd3..e4e8c09e6 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim @@ -169,11 +169,11 @@ procSuite "Waku v2 JSON-RPC API - Admin": filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) - node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) + node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec) node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) # Mock that we connected in the past so Identify populated this - node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec] + node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuLegacyFilterCodec] node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec] let response = await client.get_waku_v2_admin_v1_peers() @@ -182,7 +182,7 @@ procSuite "Waku v2 JSON-RPC API - Admin": check: response.len == 2 # Check filter peer - (response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer) + (response.filterIt(it.protocol == WakuLegacyFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer) # Check store peer (response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer) diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim index 17202df4e..8b2e43c9a 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim @@ -41,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter": await node1.mountFilter() await node2.mountFilterClient() - node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) + node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) # RPC server setup let diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index e4276fa3b..c13cd08b0 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -21,6 +21,11 @@ import ../../waku/node/rest/filter/handlers as filter_api, ../../waku/node/rest/filter/client as filter_api_client, ../../waku/waku_relay, + ../../waku/waku_filter_v2/subscriptions, + ../../waku/waku_filter_v2/common, + ../../waku/node/rest/relay/topic_cache, + ../../waku/node/rest/relay/handlers as relay_api, + ../../waku/node/rest/relay/client as relay_api_client, ../testlib/wakucore, ../testlib/wakunode @@ -36,51 +41,64 @@ proc testWakuNode(): WakuNode = type RestFilterTest = object - node1: WakuNode - node2: WakuNode + serviceNode: WakuNode + subscriberNode: WakuNode restServer: RestServerRef + restServerForService: RestServerRef messageCache: filter_api.MessageCache client: RestClientRef + clientTwdServiceNode: RestClientRef -proc setupRestFilter(): Future[RestFilterTest] {.async.} = - result.node1 = testWakuNode() - result.node2 = testWakuNode() +proc init(T: type RestFilterTest): Future[T] {.async.} = + var testSetup = RestFilterTest() + testSetup.serviceNode = testWakuNode() + testSetup.subscriberNode = testWakuNode() - await allFutures(result.node1.start(), result.node2.start()) + await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start()) - await result.node1.mountFilter() - await result.node2.mountFilterClient() + await testSetup.serviceNode.mountRelay() + await testSetup.serviceNode.mountFilter() + await testSetup.subscriberNode.mountFilterClient() - result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) + testSetup.subscriberNode.peerManager.addServicePeer(testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec) let restPort = Port(58011) - let restAddress = ValidIpAddress.init("0.0.0.0") - result.restServer = RestServerRef.init(restAddress, restPort).tryGet() + let restAddress = ValidIpAddress.init("127.0.0.1") + testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet() - result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity) + let restPort2 = Port(58012) + testSetup.restServerForService = RestServerRef.init(restAddress, restPort2).tryGet() - installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) - installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) - installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache) + # through this one we will see if messages are pushed according to our content topic sub + testSetup.messageCache = filter_api.MessageCache.init() + installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache) - result.restServer.start() + let topicCache = TopicCache.init() + installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache) - result.client = newRestHttpClient(initTAddress(restAddress, restPort)) + testSetup.restServer.start() + testSetup.restServerForService.start() - return result + testSetup.client = newRestHttpClient(initTAddress(restAddress, restPort)) + testSetup.clientTwdServiceNode = newRestHttpClient(initTAddress(restAddress, restPort2)) + + return testSetup proc shutdown(self: RestFilterTest) {.async.} = await self.restServer.stop() await self.restServer.closeWait() - await allFutures(self.node1.stop(), self.node2.stop()) + await self.restServerForService.stop() + await self.restServerForService.closeWait() + await allFutures(self.serviceNode.stop(), self.subscriberNode.stop()) -suite "Waku v2 Rest API - Filter": - asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": +suite "Waku v2 Rest API - Filter V2": + asyncTest "Subscribe a node to an array of topics - POST /filter/v2/subscriptions": # Given - let restFilterTest: RestFilterTest = await setupRestFilter() + let restFilterTest = await RestFilterTest.init() + let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId # When let contentFilters = @[DefaultContentTopic @@ -89,103 +107,178 @@ suite "Waku v2 Rest API - Filter": ,ContentTopic("4") ] - let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, + let requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: contentFilters, pubsubTopic: some(DefaultPubsubTopic)) - let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) + let response = await restFilterTest.client.filterPostSubscriptions(requestBody) - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" + echo "response", $response - check: - restFilterTest.messageCache.isSubscribed(DefaultContentTopic) - restFilterTest.messageCache.isSubscribed("2") - restFilterTest.messageCache.isSubscribed("3") - restFilterTest.messageCache.isSubscribed("4") - - # When - error case - let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string)) - let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) - - check: - badResponse.status == 400 - $badResponse.contentType == $MIMETYPE_TEXT - badResponse.data == "Invalid content body, could not decode. Unable to deserialize data" - - - await restFilterTest.shutdown() - - - asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": - # Given - let - restFilterTest: RestFilterTest = await setupRestFilter() - - # When - restFilterTest.messageCache.subscribe("1") - restFilterTest.messageCache.subscribe("2") - restFilterTest.messageCache.subscribe("3") - restFilterTest.messageCache.subscribe("4") - - let contentFilters = @[ContentTopic("1") - ,ContentTopic("2") - ,ContentTopic("3") - # ,ContentTopic("4") # Keep this subscription for check - ] - - # When - let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, - pubsubTopic: some(DefaultPubsubTopic)) - let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) - - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" - - check: - not restFilterTest.messageCache.isSubscribed("1") - not restFilterTest.messageCache.isSubscribed("2") - not restFilterTest.messageCache.isSubscribed("3") - restFilterTest.messageCache.isSubscribed("4") - - await restFilterTest.shutdown() - - - asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": - # Given - - let - restFilterTest = await setupRestFilter() - - let pubSubTopic = "/waku/2/default-waku/proto" - let contentTopic = ContentTopic( "content-topic-x" ) - - let messages = @[ - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - ] - - restFilterTest.messageCache.subscribe(contentTopic) - for msg in messages: - restFilterTest.messageCache.addMessage(contentTopic, msg) - - # When - let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) + let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic) + let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2") + let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3") + let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4") # Then check: response.status == 200 $response.contentType == $MIMETYPE_JSON - response.data.len == 3 - response.data.all do (msg: FilterWakuMessage) -> bool: - msg.payload == base64.encode("TEST-1") and - msg.contentTopic.get().string == "content-topic-x" and - msg.version.get() == 2 and - msg.timestamp.get() != Timestamp(0) + response.data.requestId == "1234" + subscribedPeer1.len() == 1 + subPeerId in subscribedPeer1 + subPeerId in subscribedPeer2 + subPeerId in subscribedPeer3 + subPeerId in subscribedPeer4 + + # When - error case + let badRequestBody = FilterSubscribeRequest(requestId: "4567", contentFilters: @[], pubsubTopic: none(string)) + let badRequestResp = await restFilterTest.client.filterPostSubscriptions(badRequestBody) + + check: + badRequestResp.status == 400 + $badRequestResp.contentType == $MIMETYPE_JSON + badRequestResp.data.requestId == "unknown" + # badRequestResp.data.statusDesc == "*********" + badRequestResp.data.statusDesc.startsWith("BAD_REQUEST: Failed to decode request") + + await restFilterTest.shutdown() + + asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v2/subscriptions": + # Given + let + restFilterTest = await RestFilterTest.init() + subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + + # When + var requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + ,ContentTopic("4") + ], + pubsubTopic: some(DefaultPubsubTopic)) + discard await restFilterTest.client.filterPostSubscriptions(requestBody) + + let contentFilters = @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + # ,ContentTopic("4") # Keep this subscription for check + ] + + let requestBodyUnsub = FilterUnsubscribeRequest(requestId: "4321", + contentFilters: contentFilters, + pubsubTopic: some(DefaultPubsubTopic)) + let response = await restFilterTest.client.filterDeleteSubscriptions(requestBodyUnsub) + + let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic) + let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2") + let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3") + let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4") + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "4321" + subscribedPeer1.len() == 0 + subPeerId notin subscribedPeer1 + subPeerId notin subscribedPeer2 + subPeerId notin subscribedPeer3 + subscribedPeer4.len() == 1 + subPeerId in subscribedPeer4 + + # When - error case + let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "2143") + let responseUnsubAll = await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll) + + let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4") + + check: + responseUnsubAll.status == 200 + $responseUnsubAll.contentType == $MIMETYPE_JSON + responseUnsubAll.data.requestId == "2143" + subscribedPeer.len() == 0 + + await restFilterTest.shutdown() + + asyncTest "ping subscribed node - GET /filter/v2/subscriptions/{requestId}": + # Given + let + restFilterTest = await RestFilterTest.init() + subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + + # When + var requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: @[ContentTopic("1")], + pubsubTopic: some(DefaultPubsubTopic)) + discard await restFilterTest.client.filterPostSubscriptions(requestBody) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("9999") + + # Then + check: + pingResponse.status == 200 + $pingResponse.contentType == $MIMETYPE_JSON + pingResponse.data.requestId == "9999" + pingResponse.data.statusDesc.len() == 0 + + # When - error case + let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "9988") + discard await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll) + + let pingResponseFail = await restFilterTest.client.filterSubscriberPing("9977") + + # Then + check: + pingResponseFail.status == 404 # NOT_FOUND + $pingResponseFail.contentType == $MIMETYPE_JSON + pingResponseFail.data.requestId == "9977" + pingResponseFail.data.statusDesc == "NOT_FOUND: peer has no subscriptions" + + await restFilterTest.shutdown() + + asyncTest "push filtered message": + # Given + let + restFilterTest = await RestFilterTest.init() + subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + + restFilterTest.messageCache.subscribe(DefaultPubsubTopic) + restFilterTest.serviceNode.subscribe(DefaultPubsubTopic) + + # When + var requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: @[ContentTopic("1")], + pubsubTopic: some(DefaultPubsubTopic)) + discard await restFilterTest.client.filterPostSubscriptions(requestBody) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("9999") + + # Then + check: + pingResponse.status == 200 + $pingResponse.contentType == $MIMETYPE_JSON + pingResponse.data.requestId == "9999" + pingResponse.data.statusDesc.len() == 0 + + # When - message push + let testMessage = WakuMessage( + payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(), + contentTopic: "1", + timestamp: int64(2022) + ) + + let postMsgResponse = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, + toRelayWakuMessage(testMessage) + ) + # Then + let messages = restFilterTest.messageCache.getMessages("1").tryGet() + + check: + postMsgResponse.status == 200 + $postMsgResponse.contentType == $MIMETYPE_TEXT + postMsgResponse.data == "OK" + messages == @[testMessage] await restFilterTest.shutdown() diff --git a/tests/wakunode_rest/test_rest_legacy_filter.nim b/tests/wakunode_rest/test_rest_legacy_filter.nim new file mode 100644 index 000000000..7b4ceba77 --- /dev/null +++ b/tests/wakunode_rest/test_rest_legacy_filter.nim @@ -0,0 +1,191 @@ +{.used.} + +import + std/sequtils, + stew/byteutils, + stew/shims/net, + testutils/unittests, + presto, presto/client as presto_client, + libp2p/crypto/crypto +import + ../../waku/node/message_cache, + ../../waku/common/base64, + ../../waku/waku_core, + ../../waku/waku_node, + ../../waku/node/peer_manager, + ../../waku/waku_filter, + ../../waku/node/rest/server, + ../../waku/node/rest/client, + ../../waku/node/rest/responses, + ../../waku/node/rest/filter/types, + ../../waku/node/rest/filter/legacy_handlers as filter_api, + ../../waku/node/rest/filter/legacy_client as filter_api_client, + ../../waku/waku_relay, + ../testlib/wakucore, + ../testlib/wakunode + + +proc testWakuNode(): WakuNode = + let + privkey = generateSecp256k1Key() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(0) + + return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + + +type RestFilterTest = object + filterNode: WakuNode + clientNode: WakuNode + restServer: RestServerRef + messageCache: filter_api.MessageCache + client: RestClientRef + + +proc setupRestFilter(): Future[RestFilterTest] {.async.} = + result.filterNode = testWakuNode() + result.clientNode = testWakuNode() + + await allFutures(result.filterNode.start(), result.clientNode.start()) + + await result.filterNode.mountFilter() + await result.clientNode.mountFilterClient() + + result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo() + ,WakuLegacyFilterCodec) + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + result.restServer = RestServerRef.init(restAddress, restPort).tryGet() + + result.messageCache = filter_api.MessageCache.init() + installLegacyFilterRestApiHandlers(result.restServer.router + ,result.clientNode + ,result.messageCache) + + result.restServer.start() + + result.client = newRestHttpClient(initTAddress(restAddress, restPort)) + + return result + + +proc shutdown(self: RestFilterTest) {.async.} = + await self.restServer.stop() + await self.restServer.closeWait() + await allFutures(self.filterNode.stop(), self.clientNode.stop()) + + +suite "Waku v2 Rest API - Filter": + asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": + # Given + let restFilterTest: RestFilterTest = await setupRestFilter() + + # When + let contentFilters = @[DefaultContentTopic + ,ContentTopic("2") + ,ContentTopic("3") + ,ContentTopic("4") + ] + + let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters, + pubsubTopic: some(DefaultPubsubTopic)) + let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + restFilterTest.messageCache.isSubscribed(DefaultContentTopic) + restFilterTest.messageCache.isSubscribed("2") + restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + # When - error case + let badRequestBody = FilterLegacySubscribeRequest(contentFilters: @[] + ,pubsubTopic: none(string)) + let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) + + check: + badResponse.status == 400 + $badResponse.contentType == $MIMETYPE_TEXT + badResponse.data == "Invalid content body, could not decode. Unable to deserialize data" + + + await restFilterTest.shutdown() + + + asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": + # Given + let + restFilterTest: RestFilterTest = await setupRestFilter() + + # When + restFilterTest.messageCache.subscribe("1") + restFilterTest.messageCache.subscribe("2") + restFilterTest.messageCache.subscribe("3") + restFilterTest.messageCache.subscribe("4") + + let contentFilters = @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + # ,ContentTopic("4") # Keep this subscription for check + ] + + # When + let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters, + pubsubTopic: some(DefaultPubsubTopic)) + let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + not restFilterTest.messageCache.isSubscribed("1") + not restFilterTest.messageCache.isSubscribed("2") + not restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + await restFilterTest.shutdown() + + asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": + # Given + + let + restFilterTest = await setupRestFilter() + + let pubSubTopic = "/waku/2/default-waku/proto" + let contentTopic = ContentTopic( "content-topic-x" ) + + let messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + ] + + restFilterTest.messageCache.subscribe(contentTopic) + for msg in messages: + restFilterTest.messageCache.addMessage(contentTopic, msg) + + # When + let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.len == 3 + response.data.all do (msg: FilterWakuMessage) -> bool: + msg.payload == base64.encode("TEST-1") and + msg.contentTopic.get().string == "content-topic-x" and + msg.version.get() == 2 and + msg.timestamp.get() != Timestamp(0) + + await restFilterTest.shutdown() diff --git a/waku/README.md b/waku/README.md index 59f49c10f..c24fce873 100644 --- a/waku/README.md +++ b/waku/README.md @@ -50,18 +50,18 @@ To run a specific test. # Get a shell with the right environment variables set ./env.sh bash # Run a specific test -nim c -r ./tests/test_waku_filter.nim +nim c -r ./tests/test_waku_filter_legacy.nim ``` You can also alter compile options. For example, if you want a less verbose output you can do the following. For more, refer to the [compiler flags](https://nim-lang.org/docs/nimc.html#compiler-usage) and [chronicles documentation](https://github.com/status-im/nim-chronicles#compile-time-configuration). ```bash -nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/test_waku_filter.nim +nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/waku_filter_v2/test_waku_filter.nim ``` You may also want to change the `outdir` to a folder ignored by git. ```bash -nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/test_waku_filter.nim +nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/waku_filter_v2/test_waku_filter.nim ``` ### Waku Protocol Example diff --git a/waku/node/jsonrpc/admin/handlers.nim b/waku/node/jsonrpc/admin/handlers.nim index 5c7f1a29d..d60994a72 100644 --- a/waku/node/jsonrpc/admin/handlers.nim +++ b/waku/node/jsonrpc/admin/handlers.nim @@ -71,9 +71,9 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = if not node.wakuFilterLegacy.isNil(): # Map WakuFilter peers to WakuPeers and add to return list - let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec) + let filterPeers = node.peerManager.peerStore.peers(WakuLegacyFilterCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), - protocol: WakuFilterCodec, + protocol: WakuLegacyFilterCodec, connected: it.connectedness == Connectedness.Connected)) peers.add(filterPeers) diff --git a/waku/node/jsonrpc/filter/handlers.nim b/waku/node/jsonrpc/filter/handlers.nim index 43174c0e1..312dcd036 100644 --- a/waku/node/jsonrpc/filter/handlers.nim +++ b/waku/node/jsonrpc/filter/handlers.nim @@ -34,7 +34,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message ## Subscribes a node to a list of content filters debug "post_waku_v2_filter_v1_subscription" - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) if peerOpt.isNone(): raise newException(ValueError, "no suitable remote filter peers") @@ -43,7 +43,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = cache.addMessage(msg.contentTopic, msg) - let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) + let subFut = node.legacyFilterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) if not await subFut.withTimeout(futTimeout): raise newException(ValueError, "Failed to subscribe to contentFilters") @@ -59,7 +59,11 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) - let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) + if peerOpt.isNone(): + raise newException(ValueError, "no suitable remote filter peers") + + let unsubFut = node.legacyFilterUnsubscribe(pubsubTopic, contentTopics, peerOpt.get()) if not await unsubFut.withTimeout(futTimeout): raise newException(ValueError, "Failed to unsubscribe from contentFilters") diff --git a/waku/node/message_cache.nim b/waku/node/message_cache.nim index 9d801ea78..dd0af1740 100644 --- a/waku/node/message_cache.nim +++ b/waku/node/message_cache.nim @@ -44,6 +44,8 @@ proc unsubscribe*[K](t: MessageCache[K], topic: K) = return t.table.del(topic) +proc unsubscribeAll*[K](t: MessageCache[K]) = + t.table.clear() proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) = if not t.isSubscribed(topic): diff --git a/waku/node/rest/filter/client.nim b/waku/node/rest/filter/client.nim index a5b53c01f..f4d8fd262 100644 --- a/waku/node/rest/filter/client.nim +++ b/waku/node/rest/filter/client.nim @@ -4,8 +4,10 @@ else: {.push raises: [].} import + json, std/sets, stew/byteutils, + strformat, chronicles, json_serialization, json_serialization/std/options, @@ -19,9 +21,9 @@ import export types logScope: - topics = "waku node rest client" + topics = "waku node rest client v2" -proc encodeBytes*(value: FilterSubscriptionsRequest, +proc encodeBytes*(value: FilterSubscribeRequest, contentType: string): RestResult[seq[byte]] = if MediaType.init(contentType) != MIMETYPE_JSON: error "Unsupported contentType value", contentType = contentType @@ -30,30 +32,68 @@ proc encodeBytes*(value: FilterSubscriptionsRequest, let encoded = ?encodeIntoJsonBytes(value) return ok(encoded) -proc decodeBytes*(t: typedesc[string], value: openarray[byte], - contentType: Opt[ContentTypeData]): RestResult[string] = - if MediaType.init($contentType) != MIMETYPE_TEXT: +proc encodeBytes*(value: FilterSubscriberPing, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: error "Unsupported contentType value", contentType = contentType return err("Unsupported contentType") - var res: string - if len(value) > 0: - res = newString(len(value)) - copyMem(addr res[0], unsafeAddr value[0], len(value)) - return ok(res) + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest): - RestResponse[string] - {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} +proc encodeBytes*(value: FilterUnsubscribeRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest): - RestResponse[string] - {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) -proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], - data: openArray[byte], +proc encodeBytes*(value: FilterUnsubscribeAllRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[FilterSubscriptionResponse], + value: openarray[byte], + contentType: Opt[ContentTypeData]): + + RestResult[FilterSubscriptionResponse] = + + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let decoded = ?decodeFromJsonBytes(FilterSubscriptionResponse, value) + return ok(decoded) + +proc filterSubscriberPing*(requestId: string): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions/{requestId}", meth: HttpMethod.MethodGet.} + +proc filterPostSubscriptions*(body: FilterSubscribeRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPost.} + +proc filterPutSubscriptions*(body: FilterSubscribeRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPut.} + +proc filterDeleteSubscriptions*(body: FilterUnsubscribeRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodDelete.} + +proc filterDeleteAllSubscriptions*(body: FilterUnsubscribeAllRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions/all", meth: HttpMethod.MethodDelete.} + +proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], + data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = if MediaType.init($contentType) != MIMETYPE_JSON: error "Unsupported response contentType value", contentType = contentType @@ -62,7 +102,6 @@ proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) return ok(decoded) -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterGetMessagesV1*(contentTopic: string): - RestResponse[FilterGetMessagesResponse] - {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} +proc filterGetMessagesV1*(contentTopic: string): + RestResponse[FilterGetMessagesResponse] + {.rest, endpoint: "/filter/v2/messages/{contentTopic}", meth: HttpMethod.MethodGet.} diff --git a/waku/node/rest/filter/handlers.nim b/waku/node/rest/filter/handlers.nim index d61ae52c7..1cfc5b011 100644 --- a/waku/node/rest/filter/handlers.nim +++ b/waku/node/rest/filter/handlers.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + std/strformat, std/sequtils, stew/byteutils, chronicles, @@ -14,24 +15,28 @@ import import ../../../waku_core, ../../../waku_filter, - ../../../waku_filter/client, + ../../../waku_filter_v2, + ../../../waku_filter_v2/client as filter_protocol_client, + ../../../waku_filter_v2/common as filter_protocol_type, ../../message_cache, ../../peer_manager, ../../waku_node, ../serdes, ../responses, ./types - + export types logScope: - topics = "waku node rest filter_api" + topics = "waku node rest filter_api_v2" -const futTimeoutForSubscriptionProcessing* = 5.seconds +const futTimeoutForSubscriptionProcessing* = 5.seconds #### Request handlers -const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" +const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions" + +const ROUTE_FILTER_ALL_SUBSCRIPTIONS* = "/filter/v2/subscriptions/all" const filterMessageCacheDefaultCapacity* = 30 @@ -50,85 +55,258 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR let requestResult = decodeFromJsonBytes(T, reqBodyData) if requestResult.isErr(): - return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & $requestResult.error)) return ok(requestResult.get()) -proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - let pushHandler: FilterPushHandler = - proc(pubsubTopic: PubsubTopic, - msg: WakuMessage) {.async, gcsafe, closure.} = - cache.addMessage(msg.contentTopic, msg) +proc getErrorCause(err: filter_protocol_type.FilterSubscribeError): string = + ## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double - router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: - ## Subscribes a node to a list of contentTopics of a pubsubTopic - # debug "post_waku_v2_filter_v1_subscriptions" + case err.kind: + of FilterSubscribeErrorKind.PEER_DIAL_FAILURE: + err.address + of FilterSubscribeErrorKind.BAD_RESPONSE, FilterSubscribeErrorKind.BAD_REQUEST, + FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.SERVICE_UNAVAILABLE: + err.cause + of FilterSubscribeErrorKind.UNKNOWN: + "UNKNOWN" - let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) +proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): T = + ## Properly convert filter protocol's response to rest response - if decodedBody.isErr(): - return decodedBody.error + if protocolClientRes.isErr(): + return FilterSubscriptionResponse( + requestId: requestId, + statusCode: uint32(protocolClientRes.error().kind), + statusDesc: getErrorCause(protocolClientRes.error()) + ) + else: + return FilterSubscriptionResponse( + requestId: requestId, + statusCode: 0, + statusDesc: "" + ) - let req: FilterSubscriptionsRequest = decodedBody.value() +proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): T = + ## Properly convert filter protocol's response to rest response in case of error - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + return FilterSubscriptionResponse( + requestId: requestId, + statusCode: uint32(protocolClientRes.kind), + statusDesc: $protocolClientRes + ) - if peerOpt.isNone(): - return RestApiResponse.internalServerError("No suitable remote filter peers") +proc convertErrorKindToHttpStatus(kind: filter_protocol_type.FilterSubscribeErrorKind): HttpCode = + ## Filter protocol's error code is not directly convertible to HttpCodes hence this converter - let subFut = node.filterSubscribe(req.pubsubTopic, - req.contentFilters, - pushHandler, + case kind: + of filter_protocol_type.FilterSubscribeErrorKind.UNKNOWN: + return Http200 + of filter_protocol_type.FilterSubscribeErrorKind.PEER_DIAL_FAILURE: + return Http504 #gateway timout + of filter_protocol_type.FilterSubscribeErrorKind.BAD_RESPONSE: + return Http500 # internal server error + of filter_protocol_type.FilterSubscribeErrorKind.BAD_REQUEST: + return Http400 + of filter_protocol_type.FilterSubscribeErrorKind.NOT_FOUND: + return Http404 + of filter_protocol_type.FilterSubscribeErrorKind.SERVICE_UNAVAILABLE: + return Http503 + else: + return Http500 + +proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): RestApiResponse = + let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes) + + var httpStatus : HttpCode = Http200 + + if protocolClientRes.isErr(): + httpStatus = convertErrorKindToHttpStatus(protocolClientRes.error().kind) # TODO: convert status codes! + + let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus) + + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}")) + + return resp.get() + +proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): RestApiResponse = + let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes) + + let httpStatus = convertErrorKindToHttpStatus(protocolClientRes.kind) # TODO: convert status codes! + + let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus) + + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}")) + + return resp.get() + +proc filterPostPutSubscriptionRequestHandler(node: WakuNode, + contentBody: Option[ContentBody], + cache: MessageCache): + Future[RestApiResponse] + {.async.} = + ## handles any filter subscription requests, adds or modifies. + + let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody) + + if decodedBody.isErr(): + return makeRestResponse("unknown", FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}"))) + + let req: FilterSubscribeRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + + if peerOpt.isNone(): + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let subFut = node.filterSubscribe(req.pubsubTopic, + req.contentFilters, peerOpt.get()) - if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): - error "Failed to subscribe to contentFilters do to timeout!" - return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") + if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to subscribe to contentFilters do to timeout!" + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("Subscription request timed out")) - # Successfully subscribed to all content filters - for cTopic in req.contentFilters: - cache.subscribe(cTopic) + # Successfully subscribed to all content filters + for cTopic in req.contentFilters: + cache.subscribe(cTopic) - return RestApiResponse.ok() + return makeRestResponse(req.requestId, subFut.read()) -proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: +proc installFilterPostSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a pubsubTopic + debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody + + let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache) + return response + +proc installFilterPutSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic + debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody + + let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache) + return response + +proc installFilterDeleteSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Subscribes a node to a list of contentTopics of a PubSub topic - # debug "delete_waku_v2_filter_v1_subscriptions" + debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody - let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + let decodedBody = decodeRequestBody[FilterUnsubscribeRequest](contentBody) if decodedBody.isErr(): - return decodedBody.error + return makeRestResponse("unknown", + FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}"))) - let req: FilterSubscriptionsRequest = decodedBody.value() + let req: FilterUnsubscribeRequest = decodedBody.value() - let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters) + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + + if peerOpt.isNone(): + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): error "Failed to unsubscribe from contentFilters due to timeout!" - return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable( + "Failed to unsubscribe from contentFilters due to timeout!")) + # Successfully subscribed to all content filters for cTopic in req.contentFilters: cache.unsubscribe(cTopic) # Successfully unsubscribed from all requested contentTopics - return RestApiResponse.ok() + return makeRestResponse(req.requestId, unsubFut.read()) -const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" +proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a PubSub topic + debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody -proc installFilterGetMessagesV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: + let decodedBody = decodeRequestBody[FilterUnsubscribeAllRequest](contentBody) + + if decodedBody.isErr(): + return makeRestResponse("unknown", + FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}"))) + + let req: FilterUnsubscribeAllRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + + if peerOpt.isNone(): + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let unsubFut = node.filterUnsubscribeAll(peerOpt.get()) + if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to unsubscribe from contentFilters due to timeout!" + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable( + "Failed to unsubscribe from all contentFilters due to timeout!")) + + cache.unsubscribeAll() + + # Successfully unsubscribed from all requested contentTopics + return makeRestResponse(req.requestId, unsubFut.read()) + +const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}" + +proc installFilterPingSubscriberHandler(router: var RestRouter, + node: WakuNode) = + router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse: + ## Checks if a node has valid subscription or not. + debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId + + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + if peerOpt.isNone(): + return makeRestResponse(requestId.get(), + FilterSubscribeError.serviceUnavailable("No suitable remote filter peers")) + + let pingFutRes = node.wakuFilterClient.ping(peerOpt.get()) + + if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to ping filter service peer due to timeout!" + return makeRestResponse(requestId.get(), + FilterSubscribeError.serviceUnavailable("Ping timed out")) + + return makeRestResponse(requestId.get(), pingFutRes.read()) + +const ROUTE_FILTER_MESSAGES* = "/filter/v2/messages/{contentTopic}" + +proc installFilterGetMessagesHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + + + let pushHandler : FilterPushHandler = proc (pubsubTopic: PubsubTopic, + msg: WakuMessage) + {.async, gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + node.wakuFilterClient.registerPushHandler(pushHandler) + + router.api(MethodGet, ROUTE_FILTER_MESSAGES) do (contentTopic: string) -> RestApiResponse: ## Returns all WakuMessages received on a specified content topic since the ## last time this method was called - ## TODO: ability to specify a return message limit - # debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic + ## TODO: ability to specify a return message limit, maybe use cursor to control paging response. + debug "get", ROUTE_FILTER_MESSAGES, contentTopic=contentTopic if contentTopic.isErr(): return RestApiResponse.badRequest("Missing contentTopic") @@ -147,9 +325,12 @@ proc installFilterGetMessagesV1Handler*(router: var RestRouter, return resp.get() -proc installFilterApiHandlers*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - installFilterPostSubscriptionsV1Handler(router, node, cache) - installFilterDeleteSubscriptionsV1Handler(router, node, cache) - installFilterGetMessagesV1Handler(router, node, cache) +proc installFilterRestApiHandlers*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + installFilterPingSubscriberHandler(router, node) + installFilterPostSubscriptionsHandler(router, node, cache) + installFilterPutSubscriptionsHandler(router, node, cache) + installFilterDeleteSubscriptionsHandler(router, node, cache) + installFilterDeleteAllSubscriptionsHandler(router, node, cache) + installFilterGetMessagesHandler(router, node, cache) diff --git a/waku/node/rest/filter/legacy_client.nim b/waku/node/rest/filter/legacy_client.nim new file mode 100644 index 000000000..9434dd6e0 --- /dev/null +++ b/waku/node/rest/filter/legacy_client.nim @@ -0,0 +1,68 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sets, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../waku_core, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest client v1" + +proc encodeBytes*(value: FilterLegacySubscribeRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[string], value: openarray[byte], + contentType: Opt[ContentTypeData]): RestResult[string] = + if MediaType.init($contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterPostSubscriptionsV1*(body: FilterLegacySubscribeRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterDeleteSubscriptionsV1*(body: FilterLegacySubscribeRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + +proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], + data: openArray[byte], + contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported response contentType value", contentType = contentType + return err("Unsupported response contentType") + + let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) + return ok(decoded) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterGetMessagesV1*(contentTopic: string): + RestResponse[FilterGetMessagesResponse] + {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} diff --git a/waku/node/rest/filter/legacy_handlers.nim b/waku/node/rest/filter/legacy_handlers.nim new file mode 100644 index 000000000..92131cb0d --- /dev/null +++ b/waku/node/rest/filter/legacy_handlers.nim @@ -0,0 +1,160 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/route, + presto/common +import + ../../../waku_core, + ../../../waku_filter, + ../../../waku_filter/client, + ../../message_cache, + ../../peer_manager, + ../../waku_node, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest filter_api v1" + +const futTimeoutForSubscriptionProcessing* = 5.seconds + +#### Request handlers + +const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" + +const filterMessageCacheDefaultCapacity* = 30 + +type + MessageCache* = message_cache.MessageCache[ContentTopic] + +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) + +proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + let pushHandler: FilterPushHandler = + proc(pubsubTopic: PubsubTopic, + msg: WakuMessage) {.async, gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a pubsubTopic + debug "post", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody + + let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterLegacySubscribeRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) + + if peerOpt.isNone(): + return RestApiResponse.internalServerError("No suitable remote filter peers") + + let subFut = node.legacyFilterSubscribe(req.pubsubTopic, + req.contentFilters, + pushHandler, + peerOpt.get()) + + if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to subscribe to contentFilters do to timeout!" + return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") + + # Successfully subscribed to all content filters + for cTopic in req.contentFilters: + cache.subscribe(cTopic) + + return RestApiResponse.ok() + +proc installFilterV1DeleteSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a PubSub topic + debug "delete", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody + + let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterLegacySubscribeRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) + + if peerOpt.isNone(): + return RestApiResponse.internalServerError("No suitable remote filter peers") + + let unsubFut = node.legacyFilterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) + if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to unsubscribe from contentFilters due to timeout!" + return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") + + for cTopic in req.contentFilters: + cache.unsubscribe(cTopic) + + # Successfully unsubscribed from all requested contentTopics + return RestApiResponse.ok() + +const ROUTE_FILTER_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" + +proc installFilterV1GetMessagesV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodGet, ROUTE_FILTER_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: + ## Returns all WakuMessages received on a specified content topic since the + ## last time this method was called + ## TODO: ability to specify a return message limit + debug "get", ROUTE_FILTER_MESSAGESV1, contentTopic=contentTopic + + if contentTopic.isErr(): + return RestApiResponse.badRequest("Missing contentTopic") + + let contentTopic = contentTopic.get() + + let msgRes = cache.getMessages(contentTopic, clear=true) + if msgRes.isErr(): + return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) + + let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage)) + let resp = RestApiResponse.jsonResponse(data, status=Http200) + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError("An error ocurred while building the json respose") + + return resp.get() + +proc installLegacyFilterRestApiHandlers*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + installFilterV1PostSubscriptionsV1Handler(router, node, cache) + installFilterV1DeleteSubscriptionsV1Handler(router, node, cache) + installFilterV1GetMessagesV1Handler(router, node, cache) diff --git a/waku/node/rest/filter/openapi.yaml b/waku/node/rest/filter/openapi.yaml index d913eb08a..49ee6d967 100644 --- a/waku/node/rest/filter/openapi.yaml +++ b/waku/node/rest/filter/openapi.yaml @@ -1,6 +1,6 @@ openapi: 3.0.3 info: - title: Waku V2 node REST API + title: Waku V2 node REST API version: 1.0.0 contact: name: VAC Team @@ -10,6 +10,8 @@ tags: description: Filter REST API for WakuV2 node paths: + # Legacy support for v1 waku filter + # TODO: legacy endpoint, remove in the future /filter/v1/subscriptions: post: # post_waku_v2_filter_v1_subscription summary: Subscribe a node to an array of topics @@ -21,7 +23,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterSubscriptionsRequest' + $ref: '#/components/schemas/FilterLegacySubscribeRequest' responses: '200': description: OK @@ -32,8 +34,16 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string delete: # delete_waku_v2_filter_v1_subscription summary: Unsubscribe a node from an array of topics @@ -45,7 +55,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterSubscriptionsRequest' + $ref: '#/components/schemas/FilterLegacySubscribeRequest' responses: '200': description: OK @@ -56,12 +66,24 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '404': description: Not found. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string - # TODO: Review the path of this endpoint due maybe query for list of contentTopics matching + # TODO: legacy endpoint, remove in the future /filter/v1/messages/{contentTopic}: get: # get_waku_v2_filter_v1_messages summary: Get the latest messages on the polled content topic @@ -86,10 +108,270 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '404': description: Not found. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string + + /filter/v2/subscriptions/{requestId}: + get: # get_waku_v2_filter_v2_subscription - ping + summary: Subscriber-ping - a peer can query if there is a registered subscription for it + description: | + Subscriber peer can query its subscription existence on service node. + Returns HTTP200 if exists and HTTP404 if not. + Client must not fill anything but requestId in the request body. + operationId: subscriberPing + tags: + - filter + parameters: + - in: path + name: requestId + required: true + schema: + type: string + description: Id of ping request + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + /filter/v2/subscriptions: + post: # post_waku_v2_filter_v2_subscription + summary: Subscribe a peer to an array of content topics under a pubsubTopic + description: | + Subscribe a peer to an array of content topics under a pubsubTopic. + + It is allowed to refresh or add new content topic to an existing subscription. + + Fields pubsubTopic and contentFilters must be filled. + operationId: postSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + put: # put_waku_v2_filter_v2_subscription + summary: Modify existing subscription of a peer under a pubsubTopic + description: | + Modify existing subscription of a peer under a pubsubTopic. + + It is allowed to refresh or add new content topic to an existing subscription. + + Fields pubsubTopic and contentFilters must be filled. + operationId: putSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + + delete: # delete_waku_v2_filter_v2_subscription + summary: Unsubscribe a peer from content topics + description: | + Unsubscribe a peer from content topics + Only that subscription will be removed which matches existing. + operationId: deleteSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterUnsubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + /filter/v2/subscriptions/all: + delete: # delete_waku_v2_filter_v2_subscription + summary: Unsubscribe a peer from all content topics + description: | + Unsubscribe a peer from all content topics + operationId: deleteAllSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterUnsubscribeAllRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + /filter/v2/messages/{contentTopic}: + get: # get_waku_v2_filter_v2_messages + summary: Get the latest messages on the polled content topic + description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. + operationId: getMessagesByTopic + tags: + - filter + parameters: + - in: path + name: contentTopic # Note the name is the same as in the path + required: true + schema: + type: string + description: Content topic of message + responses: + '200': + description: The latest messages on the polled topic. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterGetMessagesResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '404': + description: Not found. + content: + text/plain: + schema: + type: string + '5XX': + description: Unexpected error. + content: + text/plain: + schema: + type: string components: schemas: @@ -97,7 +379,7 @@ components: type: string ContentTopic: type: string - + FilterWakuMessage: type: object properties: @@ -113,19 +395,68 @@ components: required: - payload - FilterSubscriptionsRequest: + FilterLegacySubscribeRequest: type: object - properties: + properties: contentFilters: type: array items: $ref: '#/components/schemas/ContentTopic' pubsubTopic: $ref: "#/components/schemas/PubSubTopic" - required: + required: - contentFilters - + FilterGetMessagesResponse: type: array items: - $ref: '#/components/schemas/FilterWakuMessage' \ No newline at end of file + $ref: '#/components/schemas/FilterWakuMessage' + + FilterSubscribeRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + - contentFilters + - pubsubTopic + + FilterUnsubscribeRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + - contentFilters + + FilterUnsubscribeAllRequest: + type: object + properties: + requestId: + type: string + required: + - requestId + + FilterSubscriptionResponse: + type: object + properties: + requestId: + type: string + statusDesc: + type: string + required: + - requestId diff --git a/waku/node/rest/filter/types.nim b/waku/node/rest/filter/types.nim index 595c8b180..a7a4fa585 100644 --- a/waku/node/rest/filter/types.nim +++ b/waku/node/rest/filter/types.nim @@ -8,7 +8,8 @@ import chronicles, json_serialization, json_serialization/std/options, - presto/[route, client, common] + presto/[route, client, common], + libp2p/peerid import ../../../common/base64, ../../../waku_core, @@ -24,10 +25,32 @@ type FilterWakuMessage* = object type FilterGetMessagesResponse* = seq[FilterWakuMessage] -type FilterSubscriptionsRequest* = object +type FilterLegacySubscribeRequest* = object + # Subscription request for legacy filter support pubsubTopic*: Option[PubSubTopic] contentFilters*: seq[ContentTopic] +type FilterSubscriberPing* = object + requestId*: string + +type FilterSubscribeRequest* = object + requestId*: string + pubsubTopic*: Option[PubSubTopic] + contentFilters*: seq[ContentTopic] + +type FilterUnsubscribeRequest* = object + requestId*: string + pubsubTopic*: Option[PubSubTopic] + contentFilters*: seq[ContentTopic] + +type FilterUnsubscribeAllRequest* = object + requestId*: string + +type FilterSubscriptionResponse* = object + requestId*: string + statusCode*: uint32 + statusDesc*: string + #### Type conversion proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage = @@ -65,7 +88,7 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage) writer.writeField("timestamp", value.timestamp) writer.endRecord() -proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest) +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterLegacySubscribeRequest) {.raises: [IOError].} = writer.beginRecord() writer.writeField("pubsubTopic", value.pubsubTopic) @@ -114,8 +137,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) timestamp: timestamp ) -proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest) - {.raises: [SerializationError, IOError].} = +proc readValue*(reader: var JsonReader[RestJson], value: var FilterLegacySubscribeRequest) + {.raises: [SerializationError, IOError].} = var pubsubTopic = none(PubsubTopic) contentFilters = none(seq[ContentTopic]) @@ -126,7 +149,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions if keys.containsOrIncl(fieldName): let err = try: fmt"Multiple `{fieldName}` fields found" except CatchableError: "Multiple fields with the same name found" - reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest") + reader.raiseUnexpectedField(err, "FilterLegacySubscribeRequest") case fieldName of "pubsubTopic": @@ -136,8 +159,70 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions else: unrecognizedFieldWarning() - if pubsubTopic.isNone(): - reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") + if contentFilters.isNone(): + reader.raiseUnexpectedValue("Field `contentFilters` is missing") + + if contentFilters.get().len() == 0: + reader.raiseUnexpectedValue("Field `contentFilters` is empty") + + value = FilterLegacySubscribeRequest( + pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), + contentFilters: contentFilters.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriberPing) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscriberPing") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterSubscriberPing( + requestId: requestId.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscribeRequest) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscribeRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") if contentFilters.isNone(): reader.raiseUnexpectedValue("Field `contentFilters` is missing") @@ -145,7 +230,108 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions if contentFilters.get().len() == 0: reader.raiseUnexpectedValue("Field `contentFilters` is empty") - value = FilterSubscriptionsRequest( - pubsubTopic: if pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), + value = FilterSubscribeRequest( + requestId: requestId.get(), + pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), contentFilters: contentFilters.get() - ) \ No newline at end of file + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeRequest) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterUnsubscribeRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + if contentFilters.isNone(): + reader.raiseUnexpectedValue("Field `contentFilters` is missing") + + if contentFilters.get().len() == 0: + reader.raiseUnexpectedValue("Field `contentFilters` is empty") + + value = FilterUnsubscribeRequest( + requestId: requestId.get(), + pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), + contentFilters: contentFilters.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeAllRequest) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterUnsubscribeAllRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterUnsubscribeAllRequest( + requestId: requestId.get(), + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionResponse) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + statusCode = none(uint32) + statusDesc = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscriptionResponse") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "statusCode": + statusCode = some(reader.readValue(uint32)) + of "statusDesc": + statusDesc = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterSubscriptionResponse( + requestId: requestId.get(), + statusCode: statusCode.get(), + statusDesc: statusDesc.get("") + ) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 5d9aa54e9..2be13113e 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -31,8 +31,9 @@ import ../waku_store, ../waku_store/client as store_client, ../waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed - ../waku_filter/client as filter_client, #TODO: support for legacy filter protocol will be removed + ../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed ../waku_filter_v2, + ../waku_filter_v2/client as filter_client, ../waku_lightpush, ../waku_lightpush/client as lightpush_client, ../waku_enr, @@ -41,7 +42,8 @@ import ../waku_rln_relay, ./config, ./peer_manager, - ./waku_switch + ./waku_switch, + ./rest/relay/topic_cache declarePublicCounter waku_node_messages, "number of messages received", ["type"] @@ -88,8 +90,9 @@ type wakuStore*: WakuStore wakuStoreClient*: WakuStoreClient wakuFilter*: waku_filter_v2.WakuFilter + wakuFilterClient*: filter_client.WakuFilterClient wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed - wakuFilterClientLegacy*: WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed + wakuFilterClientLegacy*: legacy_filter_client.WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush wakuLightpushClient*: WakuLightPushClient @@ -335,10 +338,10 @@ proc mountRelay*(node: WakuNode, for topic in topics: node.subscribe(topic) - ## Waku filter -proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = +proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) + {.async, raises: [Defect, LPError]} = info "mounting filter protocol" node.wakuFilter = WakuFilter.new(node.peerManager) node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy @@ -348,32 +351,47 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) { await node.wakuFilterLegacy.start() #TODO: remove legacy node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec)) - node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuFilterCodec)) #TODO: remove legacy + node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec)) #TODO: remove legacy -proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}= - if node.wakuFilter.isNil(): +proc filterHandleMessage*(node: WakuNode, + pubsubTopic: PubsubTopic, + message: WakuMessage) + {.async.}= + + if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil(): error "cannot handle filter message", error="waku filter is nil" return - await node.wakuFilter.handleMessage(pubsubTopic, message) - await node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy - + await allFutures(node.wakuFilter.handleMessage(pubsubTopic, message), + node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy + ) proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = + ## Mounting both filter clients v1 - legacy and v2. + ## Giving option for application level to chose btw own push message handling or + ## rely on node provided cache. - This only applies for v2 filter client info "mounting filter client" node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng) + node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) + if node.started: - # Node has started already. Let's start filter too. - await node.wakuFilterClientLegacy.start() + await allFutures(node.wakuFilterClientLegacy.start(), node.wakuFilterClient.start()) - node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec)) + node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec)) + node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuLegacyFilterCodec)) -proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], - handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. +proc legacyFilterSubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic|seq[ContentTopic], + handler: FilterPushHandler, + peer: RemotePeerInfo|string) + {.async, gcsafe, raises: [Defect, ValueError].} = + + ## Registers for messages that match a specific filter. + ## Triggers the handler whenever a message is received. if node.wakuFilterClientLegacy.isNil(): - error "cannot register filter subscription to topic", error="waku filter client is nil" + error "cannot register filter subscription to topic", error="waku legacy filter client is not set up" return let remotePeerRes = parsePeerInfo(peer) @@ -385,21 +403,31 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # TODO: Move this logic to wakunode2 app - let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = - if node.wakuRelay.isNil() and not node.wakuStore.isNil(): - await node.wakuArchive.handleMessage(pubSubTopic, message) - - await handler(pubsubTopic, message) + # FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times. + let handlerWrapper: FilterPushHandler = + if node.wakuRelay.isNil() and not node.wakuStore.isNil(): + proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = + await allFutures(node.wakuArchive.handleMessage(pubSubTopic, message), + handler(pubsubTopic, message)) + else: + handler if pubsubTopic.isSome(): - info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + info "registering legacy filter subscription to content", + pubsubTopic=pubsubTopic.get(), + contentTopics=contentTopics, + peer=remotePeer.peerId - let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), contentTopics, handlerWrapper, peer=remotePeer) + let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), + contentTopics, + handlerWrapper, + peer=remotePeer) if res.isOk(): - info "subscribed to topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics + info "subscribed to topic", pubsubTopic=pubsubTopic.get(), + contentTopics=contentTopics else: - error "failed filter subscription", error=res.error + error "failed legacy filter subscription", error=res.error waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) else: let topicMapRes = parseSharding(pubsubTopic, contentTopics) @@ -412,7 +440,11 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT var futures = collect(newSeq): for pubsub, topics in topicMap.pairs: - info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + info "registering legacy filter subscription to content", + pubsubTopic=pubsub, + contentTopics=topics, + peer=remotePeer.peerId + let content = topics.mapIt($it) node.wakuFilterClientLegacy.subscribe($pubsub, content, handlerWrapper, peer=remotePeer) @@ -422,15 +454,82 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT let res = fut.read() if res.isErr(): - error "failed filter subscription", error=res.error + error "failed legacy filter subscription", error=res.error waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) for pubsub, topics in topicMap.pairs: info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics -proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], - peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = - ## Unsubscribe from a content filter. +proc filterSubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic|seq[ContentTopic], + peer: RemotePeerInfo|string): + + Future[FilterSubscribeResult] + + {.async, gcsafe, raises: [Defect, ValueError].} = + + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + if node.wakuFilterClient.isNil(): + error "cannot register filter subscription to topic", error="waku filter client is not set up" + return err(FilterSubscribeError.serviceUnavailable()) + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "Couldn't parse the peer info properly", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + if pubsubTopic.isSome(): + info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + + let subRes = await node.wakuFilterClient.subscribe(remotePeer, pubsubTopic.get(), contentTopics) + if subRes.isOk(): + info "v2 subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + else: + error "failed filter v2 subscription", error=subRes.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + return subRes + else: + let topicMapRes = parseSharding(pubsubTopic, contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error=topicMapRes.error + return err(FilterSubscribeError.badResponse("can't get shard")) + else: topicMapRes.get() + + var futures = collect(newSeq): + for pubsub, topics in topicMap.pairs: + info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClient.subscribe(remotePeer, $pubsub, content) + + let finished = await allFinished(futures) + + var subRes: FilterSubscribeResult = FilterSubscribeResult.ok() + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter subscription", error=res.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + subRes = FilterSubscribeResult.err(res.error) + + for pubsub, topics in topicMap.pairs: + info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics + + # return the last error or ok + return subRes + +proc legacyFilterUnsubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic|seq[ContentTopic], + peer: RemotePeerInfo|string) + {.async, gcsafe, raises: [Defect, ValueError].} = + ## Unsubscribe from a content legacy filter. if node.wakuFilterClientLegacy.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" return @@ -443,7 +542,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten let remotePeer = remotePeerRes.value if pubsubTopic.isSome(): - info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + info "deregistering legacy filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer) @@ -479,35 +578,103 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten for pubsub, topics in topicMap.pairs: info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics -# TODO: Move to application module (e.g., wakunode2.nim) -proc subscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, - deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - if node.wakuFilterClientLegacy.isNil(): - error "cannot register filter subscription to topic", error="waku filter client is nil" - return +proc filterUnsubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: RemotePeerInfo|string): - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - error "cannot register filter subscription to topic", error="no suitable remote peers" - return + Future[FilterSubscribeResult] - await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get()) + {.async, gcsafe, raises: [Defect, ValueError].} = -# TODO: Move to application module (e.g., wakunode2.nim) -proc unsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe, - deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} = - ## Unsubscribe from a content filter. + ## Unsubscribe from a content filter V2". if node.wakuFilterClientLegacy.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" - return + return err(FilterSubscribeError.serviceUnavailable()) - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - error "cannot register filter subscription to topic", error="no suitable remote peers" - return + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "couldn't parse remotePeerInfo", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) - await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get()) + let remotePeer = remotePeerRes.value + + if pubsubTopic.isSome(): + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribe(remotePeer, pubsubTopic.get(), contentTopics) + if unsubRes.isOk(): + info "unsubscribed from topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics + else: + error "failed filter unsubscription", error=unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + return unsubRes + + else: # pubsubTopic.isNone + let topicMapRes = parseSharding(pubsubTopic, contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error = topicMapRes.error + return err(FilterSubscribeError.badResponse("can't get shard")) + else: topicMapRes.get() + + var futures = collect(newSeq): + for pubsub, topics in topicMap.pairs: + info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content) + + let finished = await allFinished(futures) + + var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok() + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter unsubscription", error=res.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + unsubRes = FilterSubscribeResult.err(res.error) + + for pubsub, topics in topicMap.pairs: + info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics + + # return the last error or ok + return unsubRes + +proc filterUnsubscribeAll*(node: WakuNode, + peer: RemotePeerInfo|string): + + Future[FilterSubscribeResult] + + {.async, gcsafe, raises: [Defect, ValueError].} = + + ## Unsubscribe from a content filter V2". + if node.wakuFilterClientLegacy.isNil(): + error "cannot unregister filter subscription to content", error="waku filter client is nil" + return err(FilterSubscribeError.serviceUnavailable()) + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "couldn't parse remotePeerInfo", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + info "deregistering all filter subscription to content", peer=remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer) + if unsubRes.isOk(): + info "unsubscribed from all content-topic", peerId=remotePeer.peerId + else: + error "failed filter unsubscription from all content-topic", error=unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + return unsubRes + +# NOTICE: subscribe / unsubscribe methods are removed - they were already depricated +# yet incompatible to handle both type of filters - use specific filter registration instead ## Waku archive const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 19520ebcb..bb918b368 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -17,8 +17,8 @@ import libp2p/multicodec, libp2p/peerid, libp2p/peerinfo, - libp2p/routing_record - + libp2p/routing_record, + json_serialization type Connectedness* = enum @@ -62,6 +62,9 @@ type RemotePeerInfo* = ref object func `$`*(remotePeerInfo: RemotePeerInfo): string = $remotePeerInfo.peerId +proc writeValue*(w: var JsonWriter, value: RemotePeerInfo) {.inline, raises: [IOError].} = + w.writeValue $value + proc init*( T: typedesc[RemotePeerInfo], peerId: PeerID, diff --git a/waku/waku_filter/client.nim b/waku/waku_filter/client.nim index ae7b46b66..40ba73119 100644 --- a/waku/waku_filter/client.nim +++ b/waku/waku_filter/client.nim @@ -20,7 +20,6 @@ import ./protocol, ./protocol_metrics - logScope: topics = "waku filter client" @@ -71,7 +70,7 @@ proc initProtocolHandler(wf: WakuFilterClientLegacy) = wf.handleMessagePush(peerId, requestId, push) wf.handler = handle - wf.codec = WakuFilterCodec + wf.codec = WakuLegacyFilterCodec proc new*(T: type WakuFilterClientLegacy, peerManager: PeerManager, @@ -87,7 +86,7 @@ proc new*(T: type WakuFilterClientLegacy, proc sendFilterRpc(wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec) if connOpt.isNone(): return err(dialFailure) let connection = connOpt.get() @@ -155,6 +154,8 @@ proc unsubscribe*(wf: WakuFilterClientLegacy, if sendRes.isErr(): return err(sendRes.error) + # FIXME: I see an issue here that such solution prevents filtering client to properly manage its + # subscriptions on different peers and get notified correctly! for topic in topics: wf.subManager.removeSubscription(pubsubTopic, topic) diff --git a/waku/waku_filter/protocol.nim b/waku/waku_filter/protocol.nim index 9ad6818a0..62c0d618a 100644 --- a/waku/waku_filter/protocol.nim +++ b/waku/waku_filter/protocol.nim @@ -20,7 +20,7 @@ logScope: const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1" WakuFilterTimeout: Duration = 2.hours @@ -60,8 +60,6 @@ proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsu ## Protocol type - MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} - WakuFilterLegacy* = ref object of LPProtocol rng*: ref rand.HmacDrbgContext peerManager*: PeerManager @@ -110,7 +108,7 @@ proc initProtocolHandler(wf: WakuFilterLegacy) = wf.handleFilterRequest(conn.peerId, rpc) wf.handler = handler - wf.codec = WakuFilterCodec + wf.codec = WakuLegacyFilterCodec proc new*(T: type WakuFilterLegacy, peerManager: PeerManager, @@ -131,7 +129,7 @@ proc init*(T: type WakuFilterLegacy, proc sendFilterRpc(wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec) if connOpt.isNone(): return err(dialFailure) let connection = connOpt.get() diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index 5a7d86d90..72da09048 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -23,19 +23,21 @@ logScope: topics = "waku filter client" type - MessagePushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} WakuFilterClient* = ref object of LPProtocol rng: ref HmacDrbgContext - messagePushHandler: MessagePushHandler peerManager: PeerManager + pushHandlers: seq[FilterPushHandler] func generateRequestId(rng: ref HmacDrbgContext): string = var bytes: array[10, byte] hmacDrbgGenerate(rng[], bytes) return toHex(bytes) -proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, filterSubscribeRequest: FilterSubscribeRequest): Future[FilterSubscribeResult] {.async.} = - trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest +proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, + filterSubscribeRequest: FilterSubscribeRequest): + Future[FilterSubscribeResult] + {.async.} = + trace "Sending filter subscribe request", peerId=servicePeer.peerId, filterSubscribeRequest let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec) if connOpt.isNone(): @@ -77,7 +79,13 @@ proc ping*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSub return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) -proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = +proc subscribe*(wfc: WakuFilterClient, + servicePeer: RemotePeerInfo, + pubsubTopic: PubsubTopic, + contentTopics: seq[ContentTopic]): + Future[FilterSubscribeResult] + {.async.} = + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.subscribe( requestId = requestId, @@ -87,7 +95,13 @@ proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) -proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = +proc unsubscribe*(wfc: WakuFilterClient, + servicePeer: RemotePeerInfo, + pubsubTopic: PubsubTopic, + contentTopics: seq[ContentTopic]): + Future[FilterSubscribeResult] + {.async.} = + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe( requestId = requestId, @@ -97,7 +111,10 @@ proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopi return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) -proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} = +proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): + Future[FilterSubscribeResult] + {.async.} = + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.unsubscribeAll( requestId = requestId @@ -105,6 +122,9 @@ proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) +proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) = + wfc.pushHandlers.add(handler) + proc initProtocolHandler(wfc: WakuFilterClient) = proc handler(conn: Connection, proto: string) {.async.} = @@ -119,7 +139,9 @@ proc initProtocolHandler(wfc: WakuFilterClient) = let messagePush = decodeRes.value #TODO: toAPI() split here trace "Received message push", peerId=conn.peerId, messagePush - wfc.messagePushHandler(messagePush.pubsubTopic, messagePush.wakuMessage) + for handler in wfc.pushHandlers: + asyncSpawn handler(messagePush.pubsubTopic, + messagePush.wakuMessage) # Protocol specifies no response for now return @@ -128,14 +150,14 @@ proc initProtocolHandler(wfc: WakuFilterClient) = wfc.codec = WakuFilterPushCodec proc new*(T: type WakuFilterClient, - rng: ref HmacDrbgContext, - messagePushHandler: MessagePushHandler, - peerManager: PeerManager): T = + peerManager: PeerManager, + rng: ref HmacDrbgContext + ): T = let wfc = WakuFilterClient( rng: rng, - messagePushHandler: messagePushHandler, - peerManager: peerManager + peerManager: peerManager, + pushHandlers: @[] ) wfc.initProtocolHandler() wfc diff --git a/waku/waku_filter_v2/rpc.nim b/waku/waku_filter_v2/rpc.nim index e6718dfa9..7807651b7 100644 --- a/waku/waku_filter_v2/rpc.nim +++ b/waku/waku_filter_v2/rpc.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + json_serialization, std/options import ../waku_core @@ -70,3 +71,29 @@ proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T = statusCode: 200, statusDesc: some(desc) ) + +proc `$`*(err: FilterSubscribeResponse): string = + "FilterSubscribeResponse of req:" & err.requestId & " [" & $err.statusCode & "] " & $err.statusDesc + +proc `$`*(req: FilterSubscribeRequest): string = + "FilterSubscribeRequest of req:" & req.requestId & " [" & $req.filterSubscribeType & "] " & $req.pubsubTopic + +proc `$`*(t: FilterSubscribeType): string = + result = case t: + of SUBSCRIBER_PING: "SUBSCRIBER_PING" + of SUBSCRIBE: "SUBSCRIBE" + of UNSUBSCRIBE: "UNSUBSCRIBE" + of UNSUBSCRIBE_ALL: "UNSUBSCRIBE_ALL" + +proc writeValue*(writer: var JsonWriter, value: FilterSubscribeRequest) {.inline, raises: [IOError].} = + writer.beginRecord() + writer.writeField("requestId", value.requestId) + writer.writeField("type", value.filterSubscribeType) + if value.pubsubTopic.isSome: + writer.writeField("pubsubTopic", value.pubsubTopic) + if value.contentTopics.len>0: + writer.writeField("contentTopics", value.contentTopics) + writer.endRecord() + + +