diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index d7ebb3a91..2d0764392 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -262,10 +262,12 @@ proc writeAndPrint(c: Chat) {.async.} = echo "You are now known as " & c.nick elif line.startsWith("/exit"): - if not c.node.wakuFilter.isNil(): + if not c.node.wakuFilterLegacy.isNil(): echo "unsubscribing from content filters..." - await c.node.unsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic) + let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec) + if peerOpt.isSome(): + await c.node.legacyFilterUnsubscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=c.contentTopic, peer=peerOpt.get()) echo "quitting..." @@ -464,14 +466,18 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if peerInfo.isOk(): await node.mountFilter() await node.mountFilterClient() - node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec) + node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec) proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = trace "Hit filter handler", contentTopic=msg.contentTopic chat.printReceivedMessage(msg) - await node.subscribe(pubsubTopic=some(DefaultPubsubTopic), contentTopics=chat.contentTopic, filterHandler) - + await node.legacyFilterSubscribe(pubsubTopic=some(DefaultPubsubTopic), + contentTopics=chat.contentTopic, + filterHandler, + peerInfo.value) + # TODO: Here to support FilterV2 relevant subscription, but still + # Legacy Filter is concurrent to V2 untill legacy filter will be removed else: error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error @@ -485,7 +491,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = chat.printReceivedMessage(msg) let topic = DefaultPubsubTopic - await node.subscribe(some(topic), @[ContentTopic("")], handler) + node.subscribe(topic, handler) if conf.rlnRelay: info "WakuRLNRelay is enabled" diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 4c9f8ba48..b67ee2be3 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -18,6 +18,7 @@ import ../../../waku/waku_node, ../../../waku/node/peer_manager, ../../waku/waku_filter, + ../../waku/waku_filter_v2, ../../waku/waku_store, # Chat 2 imports ../chat2/chat2, @@ -297,7 +298,8 @@ when isMainModule: if conf.filternode != "": let filterPeer = parsePeerInfo(conf.filternode) if filterPeer.isOk(): - bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterCodec) + bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec) + bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuFilterSubscribeCodec) else: error "Error parsing conf.filternode", error = filterPeer.error diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index c976e7de4..6cf55097b 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -37,6 +37,8 @@ import ../../waku/waku_store, ../../waku/waku_lightpush, ../../waku/waku_filter, + ../../waku/waku_filter_v2, + ../../waku/waku_filter_v2/client as waku_filter_client, ./wakunode2_validator_signed, ./internal_config, ./external_config @@ -46,6 +48,7 @@ import ../../waku/node/rest/debug/handlers as rest_debug_api, ../../waku/node/rest/relay/handlers as rest_relay_api, ../../waku/node/rest/relay/topic_cache, + ../../waku/node/rest/filter/legacy_handlers as rest_legacy_filter_api, ../../waku/node/rest/filter/handlers as rest_filter_api, ../../waku/node/rest/store/handlers as rest_store_api, ../../waku/node/rest/health/handlers as rest_health_api, @@ -470,8 +473,9 @@ proc setupProtocols(node: WakuNode, if conf.filternode != "": let filterNode = parsePeerInfo(conf.filternode) if filterNode.isOk(): - await mountFilterClient(node) - node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec) + await node.mountFilterClient() + node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec) + node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) else: return err("failed to set node waku filter peer: " & filterNode.error) @@ -577,8 +581,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo ## Filter REST API if conf.filter: - let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity) - installFilterApiHandlers(server.router, app.node, filterCache) + let legacyFilterCache = rest_legacy_filter_api.MessageCache.init() + rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache) + + let filterCache = rest_filter_api.MessageCache.init() + rest_filter_api.installFilterRestApiHandlers(server.router, app.node, filterCache) ## Store REST API installStoreApiHandlers(server.router, app.node) diff --git a/examples/filter_subscriber.nim b/examples/filter_subscriber.nim index 6c01ec6d0..8b0a05c97 100644 --- a/examples/filter_subscriber.nim +++ b/examples/filter_subscriber.nim @@ -28,13 +28,15 @@ proc unsubscribe(wfc: WakuFilterClient, else: notice "unsubscribe request successful" -proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) = +proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) + {.async, gcsafe.} = let payloadStr = string.fromBytes(message.payload) notice "message received", payload=payloadStr, pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, timestamp=message.timestamp + proc maintainSubscription(wfc: WakuFilterClient, filterPeer: RemotePeerInfo, filterPubsubTopic: PubsubTopic, @@ -68,11 +70,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) = var switch = newStandardSwitch() pm = PeerManager.new(switch) - wfc = WakuFilterClient.new(rng, messagePushHandler, pm) + wfc = WakuFilterClient.new(pm, rng) # Mount filter client protocol switch.mount(wfc) + wfc.registerPushHandler(messagePushHandler) + # Start maintaining subscription asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 0fc247807..6e6cb6037 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -58,8 +58,8 @@ import ./test_waku_lightpush, ./test_wakunode_lightpush, # Waku Filter - ./test_waku_filter, - ./test_wakunode_filter, + ./test_waku_filter_legacy, + ./test_wakunode_filter_legacy, ./test_waku_peer_exchange, ./test_peer_store_extended, ./test_message_cache, @@ -95,7 +95,9 @@ import ./wakunode_rest/test_rest_relay, ./wakunode_rest/test_rest_relay_serdes, ./wakunode_rest/test_rest_serdes, - ./wakunode_rest/test_rest_store + ./wakunode_rest/test_rest_store, + ./wakunode_rest/test_rest_filter, + ./wakunode_rest/test_rest_legacy_filter import ./waku_rln_relay/test_waku_rln_relay, diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 4b85babf7..cbd6d7ac2 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -52,7 +52,7 @@ procSuite "Peer Manager": await allFutures(nodes.mapIt(it.mountFilter())) # Dial node2 from node1 - let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) + let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) # Check connection check: conn.isSome() @@ -81,12 +81,12 @@ procSuite "Peer Manager": let nonExistentPeer = nonExistentPeerRes.value # Dial non-existent peer from node1 - let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuFilterCodec) + let conn1 = await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec) check: conn1.isNone() # Dial peer not supporting given protocol - let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterCodec) + let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) check: conn2.isNone() @@ -109,14 +109,14 @@ procSuite "Peer Manager": node.mountStoreClient() node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) - node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) + node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec) # Check peers were successfully added to peer manager check: node.peerManager.peerStore.peers().len == 2 - node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and + node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and - it.protocols.contains(WakuFilterCodec)) + it.protocols.contains(WakuLegacyFilterCodec)) node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and it.protocols.contains(WakuStoreCodec)) @@ -429,7 +429,7 @@ procSuite "Peer Manager": # service peers node.peerManager.addServicePeer(peers[0], WakuStoreCodec) - node.peerManager.addServicePeer(peers[1], WakuFilterCodec) + node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec) node.peerManager.addServicePeer(peers[2], WakuLightPushCodec) node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec) @@ -449,7 +449,7 @@ procSuite "Peer Manager": # all service peers are added to its service slot check: node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId - node.peerManager.serviceSlots[WakuFilterCodec].peerId == peers[1].peerId + node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId @@ -474,11 +474,11 @@ procSuite "Peer Manager": (await nodes[0].peerManager.connectRelay(pInfos[2])) == true (await nodes[1].peerManager.connectRelay(pInfos[2])) == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() == true # isolated dial creates a relay conn under the hood (libp2p behaviour) - (await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true + (await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() == true # assert physical connections @@ -486,26 +486,26 @@ procSuite "Peer Manager": nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0 nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2 - nodes[0].peerManager.connectedPeers(WakuFilterCodec)[0].len == 0 - nodes[0].peerManager.connectedPeers(WakuFilterCodec)[1].len == 2 + nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0 + nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 - nodes[1].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 - nodes[1].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 + nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 + nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2 nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1 - nodes[2].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 - nodes[2].peerManager.connectedPeers(WakuFilterCodec)[1].len == 1 + nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 + nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1 nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0 - nodes[3].peerManager.connectedPeers(WakuFilterCodec)[0].len == 1 - nodes[3].peerManager.connectedPeers(WakuFilterCodec)[1].len == 0 + nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1 + nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0 asyncTest "getNumStreams() returns expected number of connections per protocol": # Create 2 nodes @@ -521,17 +521,17 @@ procSuite "Peer Manager": require: # multiple streams are multiplexed over a single connection. # note that a relay connection is created under the hood when dialing a peer (libp2p behaviour) - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true - (await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true + (await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() == true check: nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) - nodes[0].peerManager.getNumStreams(WakuFilterCodec) == (0, 4) + nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4) nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1) - nodes[1].peerManager.getNumStreams(WakuFilterCodec) == (4, 0) + nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0) test "selectPeer() returns the correct peer": # Valid peer id missing the last digit @@ -552,7 +552,7 @@ procSuite "Peer Manager": # Add a peer[0] to the peerstore pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs - pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterCodec] + pm.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec] # When no service peers, we get one from the peerstore let selectedPeer1 = pm.selectPeer(WakuStoreCodec) @@ -561,7 +561,7 @@ procSuite "Peer Manager": selectedPeer1.get().peerId == peers[0].peerId # Same for other protocol - let selectedPeer2 = pm.selectPeer(WakuFilterCodec) + let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec) check: selectedPeer2.isSome() == true selectedPeer2.get().peerId == peers[0].peerId @@ -757,7 +757,7 @@ procSuite "Peer Manager": discard await nodes[0].peerManager.connectRelay(pInfos[3]) discard await nodes[0].peerManager.connectRelay(pInfos[4]) - # they are also prunned  + # they are also prunned check nodes[0].peerManager.switch.connManager.getConnections().len == 1 # we should have 4 peers (2in/2out) but due to collocation limit diff --git a/tests/test_waku_filter.nim b/tests/test_waku_filter_legacy.nim similarity index 100% rename from tests/test_waku_filter.nim rename to tests/test_waku_filter_legacy.nim diff --git a/tests/test_wakunode_filter.nim b/tests/test_wakunode_filter_legacy.nim similarity index 93% rename from tests/test_wakunode_filter.nim rename to tests/test_wakunode_filter_legacy.nim index 4ea7750ec..e0fd53039 100644 --- a/tests/test_wakunode_filter.nim +++ b/tests/test_wakunode_filter_legacy.nim @@ -44,7 +44,7 @@ suite "WakuNode - Filter": filterPushHandlerFut.complete((pubsubTopic, msg)) ## When - await client.filterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo) + await client.legacyFilterSubscribe(some(pubsubTopic), contentTopic, filterPushHandler, peer=serverPeerInfo) # Wait for subscription to take effect waitFor sleepAsync(100.millis) diff --git a/tests/waku_filter_v2/client_utils.nim b/tests/waku_filter_v2/client_utils.nim index 95d3d5c46..e42076ace 100644 --- a/tests/waku_filter_v2/client_utils.nim +++ b/tests/waku_filter_v2/client_utils.nim @@ -1,6 +1,5 @@ import std/[options,tables], - testutils/unittests, chronos, chronicles @@ -22,10 +21,10 @@ proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} = return proto -proc newTestWakuFilterClient*(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} = +proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.} = let peerManager = PeerManager.new(switch) - proto = WakuFilterClient.new(rng, messagePushHandler, peerManager) + proto = WakuFilterClient.new(peerManager, rng) await proto.start() switch.mount(proto) diff --git a/tests/waku_filter_v2/test_waku_client.nim b/tests/waku_filter_v2/test_waku_client.nim index 398bcddeb..f4cae3ed8 100644 --- a/tests/waku_filter_v2/test_waku_client.nim +++ b/tests/waku_filter_v2/test_waku_client.nim @@ -3,16 +3,13 @@ import std/[options,tables], testutils/unittests, - chronos, - chronicles, - libp2p/peerstore + chronos import - ../../../waku/node/peer_manager, - ../../../waku/waku_filter_v2, - ../../../waku/waku_filter_v2/client, - ../../../waku/waku_core, - ../testlib/common, + ../../waku/node/peer_manager, + ../../waku/waku_filter_v2, + ../../waku/waku_filter_v2/client, + ../../waku/waku_core, ../testlib/wakucore, ../testlib/testasync, ./client_utils.nim @@ -28,26 +25,23 @@ suite "Waku Filter": var contentTopics {.threadvar.}: seq[ContentTopic] asyncSetup: - let - voidHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = - discard pubsubTopic = DefaultPubsubTopic contentTopics = @[DefaultContentTopic] serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, voidHandler) - + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) + await allFutures(serverSwitch.start(), clientSwitch.start()) serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() - + asyncTeardown: await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) asyncTest "Active Subscription Identification": # Given - let + let clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopics @@ -75,12 +69,12 @@ suite "Waku Filter": asyncTest "After Unsubscription": # Given - let + let clientPeerId = clientSwitch.peerInfo.toRemotePeerInfo().peerId subscribeResponse = await wakuFilterClient.subscribe( serverRemotePeerInfo, pubsubTopic, contentTopics ) - + require: subscribeResponse.isOk() wakuFilter.subscriptions.hasKey(clientPeerId) diff --git a/tests/waku_filter_v2/test_waku_filter.nim b/tests/waku_filter_v2/test_waku_filter.nim index 76c90e4fb..fe04d012f 100644 --- a/tests/waku_filter_v2/test_waku_filter.nim +++ b/tests/waku_filter_v2/test_waku_filter.nim @@ -3,16 +3,13 @@ import std/[options,tables], testutils/unittests, - chronos, - chronicles, - libp2p/peerstore + chronos import - ../../../waku/node/peer_manager, - ../../../waku/waku_filter_v2, - ../../../waku/waku_filter_v2/client, - ../../../waku/waku_core, - ../testlib/common, + ../../waku/node/peer_manager, + ../../waku/waku_filter_v2, + ../../waku/waku_filter_v2/client, + ../../waku/waku_core, ../testlib/wakucore, ./client_utils.nim @@ -22,21 +19,29 @@ suite "Waku Filter - end to end": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic contentTopics = @[DefaultContentTopic] # When await allFutures(serverSwitch.start(), clientSwitch.start()) - let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + wakuFilterClient.registerPushHandler(messagePushHandler) + + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -57,7 +62,9 @@ suite "Waku Filter - end to end": pushedMsg == msg1 # When - let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -74,20 +81,24 @@ suite "Waku Filter - end to end": not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) asyncTest "subscribe, unsubscribe multiple content topics": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") @@ -95,7 +106,12 @@ suite "Waku Filter - end to end": # When await allFutures(serverSwitch.start(), clientSwitch.start()) - let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + wakuFilterClient.registerPushHandler(messagePushHandler) + + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -129,7 +145,9 @@ suite "Waku Filter - end to end": pushedMsg2 == msg2 # When - let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, @[contentTopic2]) # Unsubscribe only one content topic + let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + @[contentTopic2]) # Unsubscribe only one content topic # Then check: @@ -159,20 +177,24 @@ suite "Waku Filter - end to end": not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) asyncTest "subscribe to multiple content topics and unsubscribe all": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") @@ -180,7 +202,12 @@ suite "Waku Filter - end to end": # When await allFutures(serverSwitch.start(), clientSwitch.start()) - let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + wakuFilterClient.registerPushHandler(messagePushHandler) + + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) # Then check: @@ -234,20 +261,24 @@ suite "Waku Filter - end to end": not (await pushHandlerFuture.withTimeout(2.seconds)) # Neither message should be pushed # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) asyncTest "subscribe, unsubscribe multiple pubsub topics and content topics": # Given var pushHandlerFuture = newFuture[(string, WakuMessage)]() - messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + messagePushHandler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage): + Future[void] + {.async, closure, gcsafe.} = pushHandlerFuture.complete((pubsubTopic, message)) let serverSwitch = newStandardSwitch() clientSwitch = newStandardSwitch() wakuFilter = await newTestWakuFilter(serverSwitch) - wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch) clientPeerId = clientSwitch.peerInfo.peerId pubsubTopic = DefaultPubsubTopic pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto") @@ -258,9 +289,17 @@ suite "Waku Filter - end to end": # When await allFutures(serverSwitch.start(), clientSwitch.start()) + + wakuFilterClient.registerPushHandler(messagePushHandler) + let - response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) - response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, contentTopics) + response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic, + contentTopics) + + response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic2, + contentTopics) # Then check: @@ -299,7 +338,9 @@ suite "Waku Filter - end to end": ## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s) # When - let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, @[contentTopic2]) + let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), + pubsubTopic2, + @[contentTopic2]) require response3.isOk() let msg3 = fakeWakuMessage(contentTopic=contentTopic2) @@ -325,4 +366,5 @@ suite "Waku Filter - end to end": pushedMsg3 == msg3 # Teardown - await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), + serverSwitch.stop(), clientSwitch.stop()) diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index 6d01c36ee..117a1cc93 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -216,7 +216,7 @@ procSuite "WakuNode - Store": let mountArchiveRes = server.mountArchive(driver) assert mountArchiveRes.isOk(), mountArchiveRes.error - + waitFor server.mountStore() waitFor server.mountFilterClient() client.mountStoreClient() @@ -232,7 +232,7 @@ procSuite "WakuNode - Store": proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = filterFut.complete((pubsubTopic, msg)) - waitFor server.filterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer) + waitFor server.legacyFilterSubscribe(some(DefaultPubsubTopic), DefaultContentTopic, filterHandler, peer=filterSourcePeer) waitFor sleepAsync(100.millis) diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim b/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim index 1a4823fd3..e4e8c09e6 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_admin.nim @@ -169,11 +169,11 @@ procSuite "Waku v2 JSON-RPC API - Admin": filterPeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) storePeer = PeerInfo.new(generateEcdsaKey(), @[locationAddr]) - node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuFilterCodec) + node.peerManager.addServicePeer(filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec) node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec) # Mock that we connected in the past so Identify populated this - node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuFilterCodec] + node.peerManager.peerStore[ProtoBook][filterPeer.peerId] = @[WakuLegacyFilterCodec] node.peerManager.peerStore[ProtoBook][storePeer.peerId] = @[WakuStoreCodec] let response = await client.get_waku_v2_admin_v1_peers() @@ -182,7 +182,7 @@ procSuite "Waku v2 JSON-RPC API - Admin": check: response.len == 2 # Check filter peer - (response.filterIt(it.protocol == WakuFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer) + (response.filterIt(it.protocol == WakuLegacyFilterCodec)[0]).multiaddr == constructMultiaddrStr(filterPeer) # Check store peer (response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer) diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim index 17202df4e..8b2e43c9a 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_filter.nim @@ -41,7 +41,7 @@ procSuite "Waku v2 JSON-RPC API - Filter": await node1.mountFilter() await node2.mountFilterClient() - node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) + node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec) # RPC server setup let diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index e4276fa3b..c13cd08b0 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -21,6 +21,11 @@ import ../../waku/node/rest/filter/handlers as filter_api, ../../waku/node/rest/filter/client as filter_api_client, ../../waku/waku_relay, + ../../waku/waku_filter_v2/subscriptions, + ../../waku/waku_filter_v2/common, + ../../waku/node/rest/relay/topic_cache, + ../../waku/node/rest/relay/handlers as relay_api, + ../../waku/node/rest/relay/client as relay_api_client, ../testlib/wakucore, ../testlib/wakunode @@ -36,51 +41,64 @@ proc testWakuNode(): WakuNode = type RestFilterTest = object - node1: WakuNode - node2: WakuNode + serviceNode: WakuNode + subscriberNode: WakuNode restServer: RestServerRef + restServerForService: RestServerRef messageCache: filter_api.MessageCache client: RestClientRef + clientTwdServiceNode: RestClientRef -proc setupRestFilter(): Future[RestFilterTest] {.async.} = - result.node1 = testWakuNode() - result.node2 = testWakuNode() +proc init(T: type RestFilterTest): Future[T] {.async.} = + var testSetup = RestFilterTest() + testSetup.serviceNode = testWakuNode() + testSetup.subscriberNode = testWakuNode() - await allFutures(result.node1.start(), result.node2.start()) + await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start()) - await result.node1.mountFilter() - await result.node2.mountFilterClient() + await testSetup.serviceNode.mountRelay() + await testSetup.serviceNode.mountFilter() + await testSetup.subscriberNode.mountFilterClient() - result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) + testSetup.subscriberNode.peerManager.addServicePeer(testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec) let restPort = Port(58011) - let restAddress = ValidIpAddress.init("0.0.0.0") - result.restServer = RestServerRef.init(restAddress, restPort).tryGet() + let restAddress = ValidIpAddress.init("127.0.0.1") + testSetup.restServer = RestServerRef.init(restAddress, restPort).tryGet() - result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity) + let restPort2 = Port(58012) + testSetup.restServerForService = RestServerRef.init(restAddress, restPort2).tryGet() - installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) - installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) - installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache) + # through this one we will see if messages are pushed according to our content topic sub + testSetup.messageCache = filter_api.MessageCache.init() + installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache) - result.restServer.start() + let topicCache = TopicCache.init() + installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache) - result.client = newRestHttpClient(initTAddress(restAddress, restPort)) + testSetup.restServer.start() + testSetup.restServerForService.start() - return result + testSetup.client = newRestHttpClient(initTAddress(restAddress, restPort)) + testSetup.clientTwdServiceNode = newRestHttpClient(initTAddress(restAddress, restPort2)) + + return testSetup proc shutdown(self: RestFilterTest) {.async.} = await self.restServer.stop() await self.restServer.closeWait() - await allFutures(self.node1.stop(), self.node2.stop()) + await self.restServerForService.stop() + await self.restServerForService.closeWait() + await allFutures(self.serviceNode.stop(), self.subscriberNode.stop()) -suite "Waku v2 Rest API - Filter": - asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": +suite "Waku v2 Rest API - Filter V2": + asyncTest "Subscribe a node to an array of topics - POST /filter/v2/subscriptions": # Given - let restFilterTest: RestFilterTest = await setupRestFilter() + let restFilterTest = await RestFilterTest.init() + let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId # When let contentFilters = @[DefaultContentTopic @@ -89,103 +107,178 @@ suite "Waku v2 Rest API - Filter": ,ContentTopic("4") ] - let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, + let requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: contentFilters, pubsubTopic: some(DefaultPubsubTopic)) - let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) + let response = await restFilterTest.client.filterPostSubscriptions(requestBody) - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" + echo "response", $response - check: - restFilterTest.messageCache.isSubscribed(DefaultContentTopic) - restFilterTest.messageCache.isSubscribed("2") - restFilterTest.messageCache.isSubscribed("3") - restFilterTest.messageCache.isSubscribed("4") - - # When - error case - let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: none(string)) - let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) - - check: - badResponse.status == 400 - $badResponse.contentType == $MIMETYPE_TEXT - badResponse.data == "Invalid content body, could not decode. Unable to deserialize data" - - - await restFilterTest.shutdown() - - - asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": - # Given - let - restFilterTest: RestFilterTest = await setupRestFilter() - - # When - restFilterTest.messageCache.subscribe("1") - restFilterTest.messageCache.subscribe("2") - restFilterTest.messageCache.subscribe("3") - restFilterTest.messageCache.subscribe("4") - - let contentFilters = @[ContentTopic("1") - ,ContentTopic("2") - ,ContentTopic("3") - # ,ContentTopic("4") # Keep this subscription for check - ] - - # When - let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, - pubsubTopic: some(DefaultPubsubTopic)) - let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) - - # Then - check: - response.status == 200 - $response.contentType == $MIMETYPE_TEXT - response.data == "OK" - - check: - not restFilterTest.messageCache.isSubscribed("1") - not restFilterTest.messageCache.isSubscribed("2") - not restFilterTest.messageCache.isSubscribed("3") - restFilterTest.messageCache.isSubscribed("4") - - await restFilterTest.shutdown() - - - asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": - # Given - - let - restFilterTest = await setupRestFilter() - - let pubSubTopic = "/waku/2/default-waku/proto" - let contentTopic = ContentTopic( "content-topic-x" ) - - let messages = @[ - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), - ] - - restFilterTest.messageCache.subscribe(contentTopic) - for msg in messages: - restFilterTest.messageCache.addMessage(contentTopic, msg) - - # When - let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) + let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic) + let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2") + let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3") + let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4") # Then check: response.status == 200 $response.contentType == $MIMETYPE_JSON - response.data.len == 3 - response.data.all do (msg: FilterWakuMessage) -> bool: - msg.payload == base64.encode("TEST-1") and - msg.contentTopic.get().string == "content-topic-x" and - msg.version.get() == 2 and - msg.timestamp.get() != Timestamp(0) + response.data.requestId == "1234" + subscribedPeer1.len() == 1 + subPeerId in subscribedPeer1 + subPeerId in subscribedPeer2 + subPeerId in subscribedPeer3 + subPeerId in subscribedPeer4 + + # When - error case + let badRequestBody = FilterSubscribeRequest(requestId: "4567", contentFilters: @[], pubsubTopic: none(string)) + let badRequestResp = await restFilterTest.client.filterPostSubscriptions(badRequestBody) + + check: + badRequestResp.status == 400 + $badRequestResp.contentType == $MIMETYPE_JSON + badRequestResp.data.requestId == "unknown" + # badRequestResp.data.statusDesc == "*********" + badRequestResp.data.statusDesc.startsWith("BAD_REQUEST: Failed to decode request") + + await restFilterTest.shutdown() + + asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v2/subscriptions": + # Given + let + restFilterTest = await RestFilterTest.init() + subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + + # When + var requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + ,ContentTopic("4") + ], + pubsubTopic: some(DefaultPubsubTopic)) + discard await restFilterTest.client.filterPostSubscriptions(requestBody) + + let contentFilters = @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + # ,ContentTopic("4") # Keep this subscription for check + ] + + let requestBodyUnsub = FilterUnsubscribeRequest(requestId: "4321", + contentFilters: contentFilters, + pubsubTopic: some(DefaultPubsubTopic)) + let response = await restFilterTest.client.filterDeleteSubscriptions(requestBodyUnsub) + + let subscribedPeer1 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, DefaultContentTopic) + let subscribedPeer2 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "2") + let subscribedPeer3 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "3") + let subscribedPeer4 = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4") + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "4321" + subscribedPeer1.len() == 0 + subPeerId notin subscribedPeer1 + subPeerId notin subscribedPeer2 + subPeerId notin subscribedPeer3 + subscribedPeer4.len() == 1 + subPeerId in subscribedPeer4 + + # When - error case + let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "2143") + let responseUnsubAll = await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll) + + let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(DefaultPubsubTopic, "4") + + check: + responseUnsubAll.status == 200 + $responseUnsubAll.contentType == $MIMETYPE_JSON + responseUnsubAll.data.requestId == "2143" + subscribedPeer.len() == 0 + + await restFilterTest.shutdown() + + asyncTest "ping subscribed node - GET /filter/v2/subscriptions/{requestId}": + # Given + let + restFilterTest = await RestFilterTest.init() + subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + + # When + var requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: @[ContentTopic("1")], + pubsubTopic: some(DefaultPubsubTopic)) + discard await restFilterTest.client.filterPostSubscriptions(requestBody) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("9999") + + # Then + check: + pingResponse.status == 200 + $pingResponse.contentType == $MIMETYPE_JSON + pingResponse.data.requestId == "9999" + pingResponse.data.statusDesc.len() == 0 + + # When - error case + let requestBodyUnsubAll = FilterUnsubscribeAllRequest(requestId: "9988") + discard await restFilterTest.client.filterDeleteAllSubscriptions(requestBodyUnsubAll) + + let pingResponseFail = await restFilterTest.client.filterSubscriberPing("9977") + + # Then + check: + pingResponseFail.status == 404 # NOT_FOUND + $pingResponseFail.contentType == $MIMETYPE_JSON + pingResponseFail.data.requestId == "9977" + pingResponseFail.data.statusDesc == "NOT_FOUND: peer has no subscriptions" + + await restFilterTest.shutdown() + + asyncTest "push filtered message": + # Given + let + restFilterTest = await RestFilterTest.init() + subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + + restFilterTest.messageCache.subscribe(DefaultPubsubTopic) + restFilterTest.serviceNode.subscribe(DefaultPubsubTopic) + + # When + var requestBody = FilterSubscribeRequest(requestId: "1234", + contentFilters: @[ContentTopic("1")], + pubsubTopic: some(DefaultPubsubTopic)) + discard await restFilterTest.client.filterPostSubscriptions(requestBody) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("9999") + + # Then + check: + pingResponse.status == 200 + $pingResponse.contentType == $MIMETYPE_JSON + pingResponse.data.requestId == "9999" + pingResponse.data.statusDesc.len() == 0 + + # When - message push + let testMessage = WakuMessage( + payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(), + contentTopic: "1", + timestamp: int64(2022) + ) + + let postMsgResponse = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, + toRelayWakuMessage(testMessage) + ) + # Then + let messages = restFilterTest.messageCache.getMessages("1").tryGet() + + check: + postMsgResponse.status == 200 + $postMsgResponse.contentType == $MIMETYPE_TEXT + postMsgResponse.data == "OK" + messages == @[testMessage] await restFilterTest.shutdown() diff --git a/tests/wakunode_rest/test_rest_legacy_filter.nim b/tests/wakunode_rest/test_rest_legacy_filter.nim new file mode 100644 index 000000000..7b4ceba77 --- /dev/null +++ b/tests/wakunode_rest/test_rest_legacy_filter.nim @@ -0,0 +1,191 @@ +{.used.} + +import + std/sequtils, + stew/byteutils, + stew/shims/net, + testutils/unittests, + presto, presto/client as presto_client, + libp2p/crypto/crypto +import + ../../waku/node/message_cache, + ../../waku/common/base64, + ../../waku/waku_core, + ../../waku/waku_node, + ../../waku/node/peer_manager, + ../../waku/waku_filter, + ../../waku/node/rest/server, + ../../waku/node/rest/client, + ../../waku/node/rest/responses, + ../../waku/node/rest/filter/types, + ../../waku/node/rest/filter/legacy_handlers as filter_api, + ../../waku/node/rest/filter/legacy_client as filter_api_client, + ../../waku/waku_relay, + ../testlib/wakucore, + ../testlib/wakunode + + +proc testWakuNode(): WakuNode = + let + privkey = generateSecp256k1Key() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(0) + + return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + + +type RestFilterTest = object + filterNode: WakuNode + clientNode: WakuNode + restServer: RestServerRef + messageCache: filter_api.MessageCache + client: RestClientRef + + +proc setupRestFilter(): Future[RestFilterTest] {.async.} = + result.filterNode = testWakuNode() + result.clientNode = testWakuNode() + + await allFutures(result.filterNode.start(), result.clientNode.start()) + + await result.filterNode.mountFilter() + await result.clientNode.mountFilterClient() + + result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo() + ,WakuLegacyFilterCodec) + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + result.restServer = RestServerRef.init(restAddress, restPort).tryGet() + + result.messageCache = filter_api.MessageCache.init() + installLegacyFilterRestApiHandlers(result.restServer.router + ,result.clientNode + ,result.messageCache) + + result.restServer.start() + + result.client = newRestHttpClient(initTAddress(restAddress, restPort)) + + return result + + +proc shutdown(self: RestFilterTest) {.async.} = + await self.restServer.stop() + await self.restServer.closeWait() + await allFutures(self.filterNode.stop(), self.clientNode.stop()) + + +suite "Waku v2 Rest API - Filter": + asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": + # Given + let restFilterTest: RestFilterTest = await setupRestFilter() + + # When + let contentFilters = @[DefaultContentTopic + ,ContentTopic("2") + ,ContentTopic("3") + ,ContentTopic("4") + ] + + let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters, + pubsubTopic: some(DefaultPubsubTopic)) + let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + restFilterTest.messageCache.isSubscribed(DefaultContentTopic) + restFilterTest.messageCache.isSubscribed("2") + restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + # When - error case + let badRequestBody = FilterLegacySubscribeRequest(contentFilters: @[] + ,pubsubTopic: none(string)) + let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) + + check: + badResponse.status == 400 + $badResponse.contentType == $MIMETYPE_TEXT + badResponse.data == "Invalid content body, could not decode. Unable to deserialize data" + + + await restFilterTest.shutdown() + + + asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": + # Given + let + restFilterTest: RestFilterTest = await setupRestFilter() + + # When + restFilterTest.messageCache.subscribe("1") + restFilterTest.messageCache.subscribe("2") + restFilterTest.messageCache.subscribe("3") + restFilterTest.messageCache.subscribe("4") + + let contentFilters = @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + # ,ContentTopic("4") # Keep this subscription for check + ] + + # When + let requestBody = FilterLegacySubscribeRequest(contentFilters: contentFilters, + pubsubTopic: some(DefaultPubsubTopic)) + let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + not restFilterTest.messageCache.isSubscribed("1") + not restFilterTest.messageCache.isSubscribed("2") + not restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + await restFilterTest.shutdown() + + asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": + # Given + + let + restFilterTest = await setupRestFilter() + + let pubSubTopic = "/waku/2/default-waku/proto" + let contentTopic = ContentTopic( "content-topic-x" ) + + let messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + ] + + restFilterTest.messageCache.subscribe(contentTopic) + for msg in messages: + restFilterTest.messageCache.addMessage(contentTopic, msg) + + # When + let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.len == 3 + response.data.all do (msg: FilterWakuMessage) -> bool: + msg.payload == base64.encode("TEST-1") and + msg.contentTopic.get().string == "content-topic-x" and + msg.version.get() == 2 and + msg.timestamp.get() != Timestamp(0) + + await restFilterTest.shutdown() diff --git a/waku/README.md b/waku/README.md index 59f49c10f..c24fce873 100644 --- a/waku/README.md +++ b/waku/README.md @@ -50,18 +50,18 @@ To run a specific test. # Get a shell with the right environment variables set ./env.sh bash # Run a specific test -nim c -r ./tests/test_waku_filter.nim +nim c -r ./tests/test_waku_filter_legacy.nim ``` You can also alter compile options. For example, if you want a less verbose output you can do the following. For more, refer to the [compiler flags](https://nim-lang.org/docs/nimc.html#compiler-usage) and [chronicles documentation](https://github.com/status-im/nim-chronicles#compile-time-configuration). ```bash -nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/test_waku_filter.nim +nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off ./tests/waku_filter_v2/test_waku_filter.nim ``` You may also want to change the `outdir` to a folder ignored by git. ```bash -nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/test_waku_filter.nim +nim c -r -d:chronicles_log_level=WARN --verbosity=0 --hints=off --outdir=build ./tests/waku_filter_v2/test_waku_filter.nim ``` ### Waku Protocol Example diff --git a/waku/node/jsonrpc/admin/handlers.nim b/waku/node/jsonrpc/admin/handlers.nim index 5c7f1a29d..d60994a72 100644 --- a/waku/node/jsonrpc/admin/handlers.nim +++ b/waku/node/jsonrpc/admin/handlers.nim @@ -71,9 +71,9 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = if not node.wakuFilterLegacy.isNil(): # Map WakuFilter peers to WakuPeers and add to return list - let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec) + let filterPeers = node.peerManager.peerStore.peers(WakuLegacyFilterCodec) .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it), - protocol: WakuFilterCodec, + protocol: WakuLegacyFilterCodec, connected: it.connectedness == Connectedness.Connected)) peers.add(filterPeers) diff --git a/waku/node/jsonrpc/filter/handlers.nim b/waku/node/jsonrpc/filter/handlers.nim index 43174c0e1..312dcd036 100644 --- a/waku/node/jsonrpc/filter/handlers.nim +++ b/waku/node/jsonrpc/filter/handlers.nim @@ -34,7 +34,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message ## Subscribes a node to a list of content filters debug "post_waku_v2_filter_v1_subscription" - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) if peerOpt.isNone(): raise newException(ValueError, "no suitable remote filter peers") @@ -43,7 +43,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = cache.addMessage(msg.contentTopic, msg) - let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) + let subFut = node.legacyFilterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) if not await subFut.withTimeout(futTimeout): raise newException(ValueError, "Failed to subscribe to contentFilters") @@ -59,7 +59,11 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message let contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) - let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) + if peerOpt.isNone(): + raise newException(ValueError, "no suitable remote filter peers") + + let unsubFut = node.legacyFilterUnsubscribe(pubsubTopic, contentTopics, peerOpt.get()) if not await unsubFut.withTimeout(futTimeout): raise newException(ValueError, "Failed to unsubscribe from contentFilters") diff --git a/waku/node/message_cache.nim b/waku/node/message_cache.nim index 9d801ea78..dd0af1740 100644 --- a/waku/node/message_cache.nim +++ b/waku/node/message_cache.nim @@ -44,6 +44,8 @@ proc unsubscribe*[K](t: MessageCache[K], topic: K) = return t.table.del(topic) +proc unsubscribeAll*[K](t: MessageCache[K]) = + t.table.clear() proc addMessage*[K](t: MessageCache, topic: K, msg: WakuMessage) = if not t.isSubscribed(topic): diff --git a/waku/node/rest/filter/client.nim b/waku/node/rest/filter/client.nim index a5b53c01f..f4d8fd262 100644 --- a/waku/node/rest/filter/client.nim +++ b/waku/node/rest/filter/client.nim @@ -4,8 +4,10 @@ else: {.push raises: [].} import + json, std/sets, stew/byteutils, + strformat, chronicles, json_serialization, json_serialization/std/options, @@ -19,9 +21,9 @@ import export types logScope: - topics = "waku node rest client" + topics = "waku node rest client v2" -proc encodeBytes*(value: FilterSubscriptionsRequest, +proc encodeBytes*(value: FilterSubscribeRequest, contentType: string): RestResult[seq[byte]] = if MediaType.init(contentType) != MIMETYPE_JSON: error "Unsupported contentType value", contentType = contentType @@ -30,30 +32,68 @@ proc encodeBytes*(value: FilterSubscriptionsRequest, let encoded = ?encodeIntoJsonBytes(value) return ok(encoded) -proc decodeBytes*(t: typedesc[string], value: openarray[byte], - contentType: Opt[ContentTypeData]): RestResult[string] = - if MediaType.init($contentType) != MIMETYPE_TEXT: +proc encodeBytes*(value: FilterSubscriberPing, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: error "Unsupported contentType value", contentType = contentType return err("Unsupported contentType") - var res: string - if len(value) > 0: - res = newString(len(value)) - copyMem(addr res[0], unsafeAddr value[0], len(value)) - return ok(res) + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest): - RestResponse[string] - {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} +proc encodeBytes*(value: FilterUnsubscribeRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest): - RestResponse[string] - {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) -proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], - data: openArray[byte], +proc encodeBytes*(value: FilterUnsubscribeAllRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[FilterSubscriptionResponse], + value: openarray[byte], + contentType: Opt[ContentTypeData]): + + RestResult[FilterSubscriptionResponse] = + + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let decoded = ?decodeFromJsonBytes(FilterSubscriptionResponse, value) + return ok(decoded) + +proc filterSubscriberPing*(requestId: string): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions/{requestId}", meth: HttpMethod.MethodGet.} + +proc filterPostSubscriptions*(body: FilterSubscribeRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPost.} + +proc filterPutSubscriptions*(body: FilterSubscribeRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodPut.} + +proc filterDeleteSubscriptions*(body: FilterUnsubscribeRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions", meth: HttpMethod.MethodDelete.} + +proc filterDeleteAllSubscriptions*(body: FilterUnsubscribeAllRequest): + RestResponse[FilterSubscriptionResponse] + {.rest, endpoint: "/filter/v2/subscriptions/all", meth: HttpMethod.MethodDelete.} + +proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], + data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = if MediaType.init($contentType) != MIMETYPE_JSON: error "Unsupported response contentType value", contentType = contentType @@ -62,7 +102,6 @@ proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) return ok(decoded) -# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) -proc filterGetMessagesV1*(contentTopic: string): - RestResponse[FilterGetMessagesResponse] - {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} +proc filterGetMessagesV1*(contentTopic: string): + RestResponse[FilterGetMessagesResponse] + {.rest, endpoint: "/filter/v2/messages/{contentTopic}", meth: HttpMethod.MethodGet.} diff --git a/waku/node/rest/filter/handlers.nim b/waku/node/rest/filter/handlers.nim index d61ae52c7..1cfc5b011 100644 --- a/waku/node/rest/filter/handlers.nim +++ b/waku/node/rest/filter/handlers.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + std/strformat, std/sequtils, stew/byteutils, chronicles, @@ -14,24 +15,28 @@ import import ../../../waku_core, ../../../waku_filter, - ../../../waku_filter/client, + ../../../waku_filter_v2, + ../../../waku_filter_v2/client as filter_protocol_client, + ../../../waku_filter_v2/common as filter_protocol_type, ../../message_cache, ../../peer_manager, ../../waku_node, ../serdes, ../responses, ./types - + export types logScope: - topics = "waku node rest filter_api" + topics = "waku node rest filter_api_v2" -const futTimeoutForSubscriptionProcessing* = 5.seconds +const futTimeoutForSubscriptionProcessing* = 5.seconds #### Request handlers -const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" +const ROUTE_FILTER_SUBSCRIPTIONS* = "/filter/v2/subscriptions" + +const ROUTE_FILTER_ALL_SUBSCRIPTIONS* = "/filter/v2/subscriptions/all" const filterMessageCacheDefaultCapacity* = 30 @@ -50,85 +55,258 @@ func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiR let requestResult = decodeFromJsonBytes(T, reqBodyData) if requestResult.isErr(): - return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & $requestResult.error)) return ok(requestResult.get()) -proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - let pushHandler: FilterPushHandler = - proc(pubsubTopic: PubsubTopic, - msg: WakuMessage) {.async, gcsafe, closure.} = - cache.addMessage(msg.contentTopic, msg) +proc getErrorCause(err: filter_protocol_type.FilterSubscribeError): string = + ## Retrieve proper error cause of FilterSubscribeError - due stringify make some parts of text double - router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: - ## Subscribes a node to a list of contentTopics of a pubsubTopic - # debug "post_waku_v2_filter_v1_subscriptions" + case err.kind: + of FilterSubscribeErrorKind.PEER_DIAL_FAILURE: + err.address + of FilterSubscribeErrorKind.BAD_RESPONSE, FilterSubscribeErrorKind.BAD_REQUEST, + FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.SERVICE_UNAVAILABLE: + err.cause + of FilterSubscribeErrorKind.UNKNOWN: + "UNKNOWN" - let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) +proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): T = + ## Properly convert filter protocol's response to rest response - if decodedBody.isErr(): - return decodedBody.error + if protocolClientRes.isErr(): + return FilterSubscriptionResponse( + requestId: requestId, + statusCode: uint32(protocolClientRes.error().kind), + statusDesc: getErrorCause(protocolClientRes.error()) + ) + else: + return FilterSubscriptionResponse( + requestId: requestId, + statusCode: 0, + statusDesc: "" + ) - let req: FilterSubscriptionsRequest = decodedBody.value() +proc convertResponse(T: type FilterSubscriptionResponse, requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): T = + ## Properly convert filter protocol's response to rest response in case of error - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + return FilterSubscriptionResponse( + requestId: requestId, + statusCode: uint32(protocolClientRes.kind), + statusDesc: $protocolClientRes + ) - if peerOpt.isNone(): - return RestApiResponse.internalServerError("No suitable remote filter peers") +proc convertErrorKindToHttpStatus(kind: filter_protocol_type.FilterSubscribeErrorKind): HttpCode = + ## Filter protocol's error code is not directly convertible to HttpCodes hence this converter - let subFut = node.filterSubscribe(req.pubsubTopic, - req.contentFilters, - pushHandler, + case kind: + of filter_protocol_type.FilterSubscribeErrorKind.UNKNOWN: + return Http200 + of filter_protocol_type.FilterSubscribeErrorKind.PEER_DIAL_FAILURE: + return Http504 #gateway timout + of filter_protocol_type.FilterSubscribeErrorKind.BAD_RESPONSE: + return Http500 # internal server error + of filter_protocol_type.FilterSubscribeErrorKind.BAD_REQUEST: + return Http400 + of filter_protocol_type.FilterSubscribeErrorKind.NOT_FOUND: + return Http404 + of filter_protocol_type.FilterSubscribeErrorKind.SERVICE_UNAVAILABLE: + return Http503 + else: + return Http500 + +proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeResult): RestApiResponse = + let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes) + + var httpStatus : HttpCode = Http200 + + if protocolClientRes.isErr(): + httpStatus = convertErrorKindToHttpStatus(protocolClientRes.error().kind) # TODO: convert status codes! + + let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus) + + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}")) + + return resp.get() + +proc makeRestResponse(requestId: string, protocolClientRes: filter_protocol_type.FilterSubscribeError): RestApiResponse = + let filterSubscriptionResponse = FilterSubscriptionResponse.convertResponse(requestId, protocolClientRes) + + let httpStatus = convertErrorKindToHttpStatus(protocolClientRes.kind) # TODO: convert status codes! + + let resp = RestApiResponse.jsonResponse(filterSubscriptionResponse, status=httpStatus) + + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}")) + + return resp.get() + +proc filterPostPutSubscriptionRequestHandler(node: WakuNode, + contentBody: Option[ContentBody], + cache: MessageCache): + Future[RestApiResponse] + {.async.} = + ## handles any filter subscription requests, adds or modifies. + + let decodedBody = decodeRequestBody[FilterSubscribeRequest](contentBody) + + if decodedBody.isErr(): + return makeRestResponse("unknown", FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}"))) + + let req: FilterSubscribeRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + + if peerOpt.isNone(): + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let subFut = node.filterSubscribe(req.pubsubTopic, + req.contentFilters, peerOpt.get()) - if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): - error "Failed to subscribe to contentFilters do to timeout!" - return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") + if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to subscribe to contentFilters do to timeout!" + return makeRestResponse(req.requestId, FilterSubscribeError.serviceUnavailable("Subscription request timed out")) - # Successfully subscribed to all content filters - for cTopic in req.contentFilters: - cache.subscribe(cTopic) + # Successfully subscribed to all content filters + for cTopic in req.contentFilters: + cache.subscribe(cTopic) - return RestApiResponse.ok() + return makeRestResponse(req.requestId, subFut.read()) -proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: +proc installFilterPostSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a pubsubTopic + debug "post", ROUTE_FILTER_SUBSCRIPTIONS, contentBody + + let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache) + return response + +proc installFilterPutSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodPut, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Modifies a subscribtion of a node to a list of contentTopics of a pubsubTopic + debug "put", ROUTE_FILTER_SUBSCRIPTIONS, contentBody + + let response = await filterPostPutSubscriptionRequestHandler(node, contentBody, cache) + return response + +proc installFilterDeleteSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: ## Subscribes a node to a list of contentTopics of a PubSub topic - # debug "delete_waku_v2_filter_v1_subscriptions" + debug "delete", ROUTE_FILTER_SUBSCRIPTIONS, contentBody - let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + let decodedBody = decodeRequestBody[FilterUnsubscribeRequest](contentBody) if decodedBody.isErr(): - return decodedBody.error + return makeRestResponse("unknown", + FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}"))) - let req: FilterSubscriptionsRequest = decodedBody.value() + let req: FilterUnsubscribeRequest = decodedBody.value() - let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters) + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + + if peerOpt.isNone(): + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let unsubFut = node.filterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): error "Failed to unsubscribe from contentFilters due to timeout!" - return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable( + "Failed to unsubscribe from contentFilters due to timeout!")) + # Successfully subscribed to all content filters for cTopic in req.contentFilters: cache.unsubscribe(cTopic) # Successfully unsubscribed from all requested contentTopics - return RestApiResponse.ok() + return makeRestResponse(req.requestId, unsubFut.read()) -const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" +proc installFilterDeleteAllSubscriptionsHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_ALL_SUBSCRIPTIONS) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a PubSub topic + debug "delete", ROUTE_FILTER_ALL_SUBSCRIPTIONS, contentBody -proc installFilterGetMessagesV1Handler*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: + let decodedBody = decodeRequestBody[FilterUnsubscribeAllRequest](contentBody) + + if decodedBody.isErr(): + return makeRestResponse("unknown", + FilterSubscribeError.badRequest(fmt("Failed to decode request: {decodedBody.error}"))) + + let req: FilterUnsubscribeAllRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + + if peerOpt.isNone(): + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable("No suitable peers")) + + let unsubFut = node.filterUnsubscribeAll(peerOpt.get()) + if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to unsubscribe from contentFilters due to timeout!" + return makeRestResponse(req.requestId, + FilterSubscribeError.serviceUnavailable( + "Failed to unsubscribe from all contentFilters due to timeout!")) + + cache.unsubscribeAll() + + # Successfully unsubscribed from all requested contentTopics + return makeRestResponse(req.requestId, unsubFut.read()) + +const ROUTE_FILTER_SUBSCRIBER_PING* = "/filter/v2/subscriptions/{requestId}" + +proc installFilterPingSubscriberHandler(router: var RestRouter, + node: WakuNode) = + router.api(MethodGet, ROUTE_FILTER_SUBSCRIBER_PING) do (requestId: string) -> RestApiResponse: + ## Checks if a node has valid subscription or not. + debug "get", ROUTE_FILTER_SUBSCRIBER_PING, requestId + + let peerOpt = node.peerManager.selectPeer(WakuFilterSubscribeCodec) + if peerOpt.isNone(): + return makeRestResponse(requestId.get(), + FilterSubscribeError.serviceUnavailable("No suitable remote filter peers")) + + let pingFutRes = node.wakuFilterClient.ping(peerOpt.get()) + + if not await pingFutRes.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to ping filter service peer due to timeout!" + return makeRestResponse(requestId.get(), + FilterSubscribeError.serviceUnavailable("Ping timed out")) + + return makeRestResponse(requestId.get(), pingFutRes.read()) + +const ROUTE_FILTER_MESSAGES* = "/filter/v2/messages/{contentTopic}" + +proc installFilterGetMessagesHandler(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + + + let pushHandler : FilterPushHandler = proc (pubsubTopic: PubsubTopic, + msg: WakuMessage) + {.async, gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + node.wakuFilterClient.registerPushHandler(pushHandler) + + router.api(MethodGet, ROUTE_FILTER_MESSAGES) do (contentTopic: string) -> RestApiResponse: ## Returns all WakuMessages received on a specified content topic since the ## last time this method was called - ## TODO: ability to specify a return message limit - # debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic + ## TODO: ability to specify a return message limit, maybe use cursor to control paging response. + debug "get", ROUTE_FILTER_MESSAGES, contentTopic=contentTopic if contentTopic.isErr(): return RestApiResponse.badRequest("Missing contentTopic") @@ -147,9 +325,12 @@ proc installFilterGetMessagesV1Handler*(router: var RestRouter, return resp.get() -proc installFilterApiHandlers*(router: var RestRouter, - node: WakuNode, - cache: MessageCache) = - installFilterPostSubscriptionsV1Handler(router, node, cache) - installFilterDeleteSubscriptionsV1Handler(router, node, cache) - installFilterGetMessagesV1Handler(router, node, cache) +proc installFilterRestApiHandlers*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + installFilterPingSubscriberHandler(router, node) + installFilterPostSubscriptionsHandler(router, node, cache) + installFilterPutSubscriptionsHandler(router, node, cache) + installFilterDeleteSubscriptionsHandler(router, node, cache) + installFilterDeleteAllSubscriptionsHandler(router, node, cache) + installFilterGetMessagesHandler(router, node, cache) diff --git a/waku/node/rest/filter/legacy_client.nim b/waku/node/rest/filter/legacy_client.nim new file mode 100644 index 000000000..9434dd6e0 --- /dev/null +++ b/waku/node/rest/filter/legacy_client.nim @@ -0,0 +1,68 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sets, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../waku_core, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest client v1" + +proc encodeBytes*(value: FilterLegacySubscribeRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[string], value: openarray[byte], + contentType: Opt[ContentTypeData]): RestResult[string] = + if MediaType.init($contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterPostSubscriptionsV1*(body: FilterLegacySubscribeRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterDeleteSubscriptionsV1*(body: FilterLegacySubscribeRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + +proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], + data: openArray[byte], + contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported response contentType value", contentType = contentType + return err("Unsupported response contentType") + + let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) + return ok(decoded) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterGetMessagesV1*(contentTopic: string): + RestResponse[FilterGetMessagesResponse] + {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} diff --git a/waku/node/rest/filter/legacy_handlers.nim b/waku/node/rest/filter/legacy_handlers.nim new file mode 100644 index 000000000..92131cb0d --- /dev/null +++ b/waku/node/rest/filter/legacy_handlers.nim @@ -0,0 +1,160 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/route, + presto/common +import + ../../../waku_core, + ../../../waku_filter, + ../../../waku_filter/client, + ../../message_cache, + ../../peer_manager, + ../../waku_node, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest filter_api v1" + +const futTimeoutForSubscriptionProcessing* = 5.seconds + +#### Request handlers + +const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" + +const filterMessageCacheDefaultCapacity* = 30 + +type + MessageCache* = message_cache.MessageCache[ContentTopic] + +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) + +proc installFilterV1PostSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + let pushHandler: FilterPushHandler = + proc(pubsubTopic: PubsubTopic, + msg: WakuMessage) {.async, gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a pubsubTopic + debug "post", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody + + let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterLegacySubscribeRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) + + if peerOpt.isNone(): + return RestApiResponse.internalServerError("No suitable remote filter peers") + + let subFut = node.legacyFilterSubscribe(req.pubsubTopic, + req.contentFilters, + pushHandler, + peerOpt.get()) + + if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to subscribe to contentFilters do to timeout!" + return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") + + # Successfully subscribed to all content filters + for cTopic in req.contentFilters: + cache.subscribe(cTopic) + + return RestApiResponse.ok() + +proc installFilterV1DeleteSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a PubSub topic + debug "delete", ROUTE_FILTER_SUBSCRIPTIONSV1, contentBody + + let decodedBody = decodeRequestBody[FilterLegacySubscribeRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterLegacySubscribeRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuLegacyFilterCodec) + + if peerOpt.isNone(): + return RestApiResponse.internalServerError("No suitable remote filter peers") + + let unsubFut = node.legacyFilterUnsubscribe(req.pubsubTopic, req.contentFilters, peerOpt.get()) + if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to unsubscribe from contentFilters due to timeout!" + return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") + + for cTopic in req.contentFilters: + cache.unsubscribe(cTopic) + + # Successfully unsubscribed from all requested contentTopics + return RestApiResponse.ok() + +const ROUTE_FILTER_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" + +proc installFilterV1GetMessagesV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodGet, ROUTE_FILTER_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: + ## Returns all WakuMessages received on a specified content topic since the + ## last time this method was called + ## TODO: ability to specify a return message limit + debug "get", ROUTE_FILTER_MESSAGESV1, contentTopic=contentTopic + + if contentTopic.isErr(): + return RestApiResponse.badRequest("Missing contentTopic") + + let contentTopic = contentTopic.get() + + let msgRes = cache.getMessages(contentTopic, clear=true) + if msgRes.isErr(): + return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) + + let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage)) + let resp = RestApiResponse.jsonResponse(data, status=Http200) + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError("An error ocurred while building the json respose") + + return resp.get() + +proc installLegacyFilterRestApiHandlers*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + installFilterV1PostSubscriptionsV1Handler(router, node, cache) + installFilterV1DeleteSubscriptionsV1Handler(router, node, cache) + installFilterV1GetMessagesV1Handler(router, node, cache) diff --git a/waku/node/rest/filter/openapi.yaml b/waku/node/rest/filter/openapi.yaml index d913eb08a..49ee6d967 100644 --- a/waku/node/rest/filter/openapi.yaml +++ b/waku/node/rest/filter/openapi.yaml @@ -1,6 +1,6 @@ openapi: 3.0.3 info: - title: Waku V2 node REST API + title: Waku V2 node REST API version: 1.0.0 contact: name: VAC Team @@ -10,6 +10,8 @@ tags: description: Filter REST API for WakuV2 node paths: + # Legacy support for v1 waku filter + # TODO: legacy endpoint, remove in the future /filter/v1/subscriptions: post: # post_waku_v2_filter_v1_subscription summary: Subscribe a node to an array of topics @@ -21,7 +23,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterSubscriptionsRequest' + $ref: '#/components/schemas/FilterLegacySubscribeRequest' responses: '200': description: OK @@ -32,8 +34,16 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string delete: # delete_waku_v2_filter_v1_subscription summary: Unsubscribe a node from an array of topics @@ -45,7 +55,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterSubscriptionsRequest' + $ref: '#/components/schemas/FilterLegacySubscribeRequest' responses: '200': description: OK @@ -56,12 +66,24 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '404': description: Not found. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string - # TODO: Review the path of this endpoint due maybe query for list of contentTopics matching + # TODO: legacy endpoint, remove in the future /filter/v1/messages/{contentTopic}: get: # get_waku_v2_filter_v1_messages summary: Get the latest messages on the polled content topic @@ -86,10 +108,270 @@ paths: # TODO: Review the possible errors of this endpoint '400': description: Bad request. + content: + text/plain: + schema: + type: string '404': description: Not found. + content: + text/plain: + schema: + type: string '5XX': description: Unexpected error. + content: + text/plain: + schema: + type: string + + /filter/v2/subscriptions/{requestId}: + get: # get_waku_v2_filter_v2_subscription - ping + summary: Subscriber-ping - a peer can query if there is a registered subscription for it + description: | + Subscriber peer can query its subscription existence on service node. + Returns HTTP200 if exists and HTTP404 if not. + Client must not fill anything but requestId in the request body. + operationId: subscriberPing + tags: + - filter + parameters: + - in: path + name: requestId + required: true + schema: + type: string + description: Id of ping request + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + /filter/v2/subscriptions: + post: # post_waku_v2_filter_v2_subscription + summary: Subscribe a peer to an array of content topics under a pubsubTopic + description: | + Subscribe a peer to an array of content topics under a pubsubTopic. + + It is allowed to refresh or add new content topic to an existing subscription. + + Fields pubsubTopic and contentFilters must be filled. + operationId: postSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + put: # put_waku_v2_filter_v2_subscription + summary: Modify existing subscription of a peer under a pubsubTopic + description: | + Modify existing subscription of a peer under a pubsubTopic. + + It is allowed to refresh or add new content topic to an existing subscription. + + Fields pubsubTopic and contentFilters must be filled. + operationId: putSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + + + delete: # delete_waku_v2_filter_v2_subscription + summary: Unsubscribe a peer from content topics + description: | + Unsubscribe a peer from content topics + Only that subscription will be removed which matches existing. + operationId: deleteSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterUnsubscribeRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + /filter/v2/subscriptions/all: + delete: # delete_waku_v2_filter_v2_subscription + summary: Unsubscribe a peer from all content topics + description: | + Unsubscribe a peer from all content topics + operationId: deleteAllSubscriptions + tags: + - filter + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/FilterUnsubscribeAllRequest' + responses: + '200': + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '400': + description: Bad request. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '404': + description: Not found. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + '5XX': + description: Unexpected error. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterSubscriptionResponse' + /filter/v2/messages/{contentTopic}: + get: # get_waku_v2_filter_v2_messages + summary: Get the latest messages on the polled content topic + description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. + operationId: getMessagesByTopic + tags: + - filter + parameters: + - in: path + name: contentTopic # Note the name is the same as in the path + required: true + schema: + type: string + description: Content topic of message + responses: + '200': + description: The latest messages on the polled topic. + content: + application/json: + schema: + $ref: '#/components/schemas/FilterGetMessagesResponse' + # TODO: Review the possible errors of this endpoint + '400': + description: Bad request. + content: + text/plain: + schema: + type: string + '404': + description: Not found. + content: + text/plain: + schema: + type: string + '5XX': + description: Unexpected error. + content: + text/plain: + schema: + type: string components: schemas: @@ -97,7 +379,7 @@ components: type: string ContentTopic: type: string - + FilterWakuMessage: type: object properties: @@ -113,19 +395,68 @@ components: required: - payload - FilterSubscriptionsRequest: + FilterLegacySubscribeRequest: type: object - properties: + properties: contentFilters: type: array items: $ref: '#/components/schemas/ContentTopic' pubsubTopic: $ref: "#/components/schemas/PubSubTopic" - required: + required: - contentFilters - + FilterGetMessagesResponse: type: array items: - $ref: '#/components/schemas/FilterWakuMessage' \ No newline at end of file + $ref: '#/components/schemas/FilterWakuMessage' + + FilterSubscribeRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + - contentFilters + - pubsubTopic + + FilterUnsubscribeRequest: + type: object + properties: + requestId: + type: string + contentFilters: + type: array + items: + $ref: '#/components/schemas/ContentTopic' + pubsubTopic: + $ref: "#/components/schemas/PubSubTopic" + required: + - requestId + - contentFilters + + FilterUnsubscribeAllRequest: + type: object + properties: + requestId: + type: string + required: + - requestId + + FilterSubscriptionResponse: + type: object + properties: + requestId: + type: string + statusDesc: + type: string + required: + - requestId diff --git a/waku/node/rest/filter/types.nim b/waku/node/rest/filter/types.nim index 595c8b180..a7a4fa585 100644 --- a/waku/node/rest/filter/types.nim +++ b/waku/node/rest/filter/types.nim @@ -8,7 +8,8 @@ import chronicles, json_serialization, json_serialization/std/options, - presto/[route, client, common] + presto/[route, client, common], + libp2p/peerid import ../../../common/base64, ../../../waku_core, @@ -24,10 +25,32 @@ type FilterWakuMessage* = object type FilterGetMessagesResponse* = seq[FilterWakuMessage] -type FilterSubscriptionsRequest* = object +type FilterLegacySubscribeRequest* = object + # Subscription request for legacy filter support pubsubTopic*: Option[PubSubTopic] contentFilters*: seq[ContentTopic] +type FilterSubscriberPing* = object + requestId*: string + +type FilterSubscribeRequest* = object + requestId*: string + pubsubTopic*: Option[PubSubTopic] + contentFilters*: seq[ContentTopic] + +type FilterUnsubscribeRequest* = object + requestId*: string + pubsubTopic*: Option[PubSubTopic] + contentFilters*: seq[ContentTopic] + +type FilterUnsubscribeAllRequest* = object + requestId*: string + +type FilterSubscriptionResponse* = object + requestId*: string + statusCode*: uint32 + statusDesc*: string + #### Type conversion proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage = @@ -65,7 +88,7 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage) writer.writeField("timestamp", value.timestamp) writer.endRecord() -proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest) +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterLegacySubscribeRequest) {.raises: [IOError].} = writer.beginRecord() writer.writeField("pubsubTopic", value.pubsubTopic) @@ -114,8 +137,8 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) timestamp: timestamp ) -proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest) - {.raises: [SerializationError, IOError].} = +proc readValue*(reader: var JsonReader[RestJson], value: var FilterLegacySubscribeRequest) + {.raises: [SerializationError, IOError].} = var pubsubTopic = none(PubsubTopic) contentFilters = none(seq[ContentTopic]) @@ -126,7 +149,7 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions if keys.containsOrIncl(fieldName): let err = try: fmt"Multiple `{fieldName}` fields found" except CatchableError: "Multiple fields with the same name found" - reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest") + reader.raiseUnexpectedField(err, "FilterLegacySubscribeRequest") case fieldName of "pubsubTopic": @@ -136,8 +159,70 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions else: unrecognizedFieldWarning() - if pubsubTopic.isNone(): - reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") + if contentFilters.isNone(): + reader.raiseUnexpectedValue("Field `contentFilters` is missing") + + if contentFilters.get().len() == 0: + reader.raiseUnexpectedValue("Field `contentFilters` is empty") + + value = FilterLegacySubscribeRequest( + pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), + contentFilters: contentFilters.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriberPing) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscriberPing") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterSubscriberPing( + requestId: requestId.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscribeRequest) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscribeRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") if contentFilters.isNone(): reader.raiseUnexpectedValue("Field `contentFilters` is missing") @@ -145,7 +230,108 @@ proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptions if contentFilters.get().len() == 0: reader.raiseUnexpectedValue("Field `contentFilters` is empty") - value = FilterSubscriptionsRequest( - pubsubTopic: if pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), + value = FilterSubscribeRequest( + requestId: requestId.get(), + pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), contentFilters: contentFilters.get() - ) \ No newline at end of file + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeRequest) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterUnsubscribeRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + if contentFilters.isNone(): + reader.raiseUnexpectedValue("Field `contentFilters` is missing") + + if contentFilters.get().len() == 0: + reader.raiseUnexpectedValue("Field `contentFilters` is empty") + + value = FilterUnsubscribeRequest( + requestId: requestId.get(), + pubsubTopic: if pubsubTopic.isNone() or pubsubTopic.get() == "": none(string) else: some(pubsubTopic.get()), + contentFilters: contentFilters.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterUnsubscribeAllRequest) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterUnsubscribeAllRequest") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterUnsubscribeAllRequest( + requestId: requestId.get(), + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionResponse) + {.raises: [SerializationError, IOError].} = + var + requestId = none(string) + statusCode = none(uint32) + statusDesc = none(string) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscriptionResponse") + + case fieldName + of "requestId": + requestId = some(reader.readValue(string)) + of "statusCode": + statusCode = some(reader.readValue(uint32)) + of "statusDesc": + statusDesc = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if requestId.isNone(): + reader.raiseUnexpectedValue("Field `requestId` is missing") + + value = FilterSubscriptionResponse( + requestId: requestId.get(), + statusCode: statusCode.get(), + statusDesc: statusDesc.get("") + ) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 5d9aa54e9..2be13113e 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -31,8 +31,9 @@ import ../waku_store, ../waku_store/client as store_client, ../waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed - ../waku_filter/client as filter_client, #TODO: support for legacy filter protocol will be removed + ../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed ../waku_filter_v2, + ../waku_filter_v2/client as filter_client, ../waku_lightpush, ../waku_lightpush/client as lightpush_client, ../waku_enr, @@ -41,7 +42,8 @@ import ../waku_rln_relay, ./config, ./peer_manager, - ./waku_switch + ./waku_switch, + ./rest/relay/topic_cache declarePublicCounter waku_node_messages, "number of messages received", ["type"] @@ -88,8 +90,9 @@ type wakuStore*: WakuStore wakuStoreClient*: WakuStoreClient wakuFilter*: waku_filter_v2.WakuFilter + wakuFilterClient*: filter_client.WakuFilterClient wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed - wakuFilterClientLegacy*: WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed + wakuFilterClientLegacy*: legacy_filter_client.WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush wakuLightpushClient*: WakuLightPushClient @@ -335,10 +338,10 @@ proc mountRelay*(node: WakuNode, for topic in topics: node.subscribe(topic) - ## Waku filter -proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = +proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) + {.async, raises: [Defect, LPError]} = info "mounting filter protocol" node.wakuFilter = WakuFilter.new(node.peerManager) node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy @@ -348,32 +351,47 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) { await node.wakuFilterLegacy.start() #TODO: remove legacy node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec)) - node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuFilterCodec)) #TODO: remove legacy + node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec)) #TODO: remove legacy -proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}= - if node.wakuFilter.isNil(): +proc filterHandleMessage*(node: WakuNode, + pubsubTopic: PubsubTopic, + message: WakuMessage) + {.async.}= + + if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil(): error "cannot handle filter message", error="waku filter is nil" return - await node.wakuFilter.handleMessage(pubsubTopic, message) - await node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy - + await allFutures(node.wakuFilter.handleMessage(pubsubTopic, message), + node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy + ) proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = + ## Mounting both filter clients v1 - legacy and v2. + ## Giving option for application level to chose btw own push message handling or + ## rely on node provided cache. - This only applies for v2 filter client info "mounting filter client" node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng) + node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) + if node.started: - # Node has started already. Let's start filter too. - await node.wakuFilterClientLegacy.start() + await allFutures(node.wakuFilterClientLegacy.start(), node.wakuFilterClient.start()) - node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec)) + node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterSubscribeCodec)) + node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuLegacyFilterCodec)) -proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], - handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. +proc legacyFilterSubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic|seq[ContentTopic], + handler: FilterPushHandler, + peer: RemotePeerInfo|string) + {.async, gcsafe, raises: [Defect, ValueError].} = + + ## Registers for messages that match a specific filter. + ## Triggers the handler whenever a message is received. if node.wakuFilterClientLegacy.isNil(): - error "cannot register filter subscription to topic", error="waku filter client is nil" + error "cannot register filter subscription to topic", error="waku legacy filter client is not set up" return let remotePeerRes = parsePeerInfo(peer) @@ -385,21 +403,31 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # TODO: Move this logic to wakunode2 app - let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = - if node.wakuRelay.isNil() and not node.wakuStore.isNil(): - await node.wakuArchive.handleMessage(pubSubTopic, message) - - await handler(pubsubTopic, message) + # FIXME: This part needs refactoring. It seems possible that in special cases archiver will store same message multiple times. + let handlerWrapper: FilterPushHandler = + if node.wakuRelay.isNil() and not node.wakuStore.isNil(): + proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = + await allFutures(node.wakuArchive.handleMessage(pubSubTopic, message), + handler(pubsubTopic, message)) + else: + handler if pubsubTopic.isSome(): - info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + info "registering legacy filter subscription to content", + pubsubTopic=pubsubTopic.get(), + contentTopics=contentTopics, + peer=remotePeer.peerId - let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), contentTopics, handlerWrapper, peer=remotePeer) + let res = await node.wakuFilterClientLegacy.subscribe(pubsubTopic.get(), + contentTopics, + handlerWrapper, + peer=remotePeer) if res.isOk(): - info "subscribed to topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics + info "subscribed to topic", pubsubTopic=pubsubTopic.get(), + contentTopics=contentTopics else: - error "failed filter subscription", error=res.error + error "failed legacy filter subscription", error=res.error waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) else: let topicMapRes = parseSharding(pubsubTopic, contentTopics) @@ -412,7 +440,11 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT var futures = collect(newSeq): for pubsub, topics in topicMap.pairs: - info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + info "registering legacy filter subscription to content", + pubsubTopic=pubsub, + contentTopics=topics, + peer=remotePeer.peerId + let content = topics.mapIt($it) node.wakuFilterClientLegacy.subscribe($pubsub, content, handlerWrapper, peer=remotePeer) @@ -422,15 +454,82 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT let res = fut.read() if res.isErr(): - error "failed filter subscription", error=res.error + error "failed legacy filter subscription", error=res.error waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) for pubsub, topics in topicMap.pairs: info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics -proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], - peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = - ## Unsubscribe from a content filter. +proc filterSubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic|seq[ContentTopic], + peer: RemotePeerInfo|string): + + Future[FilterSubscribeResult] + + {.async, gcsafe, raises: [Defect, ValueError].} = + + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + if node.wakuFilterClient.isNil(): + error "cannot register filter subscription to topic", error="waku filter client is not set up" + return err(FilterSubscribeError.serviceUnavailable()) + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "Couldn't parse the peer info properly", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + if pubsubTopic.isSome(): + info "registering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + + let subRes = await node.wakuFilterClient.subscribe(remotePeer, pubsubTopic.get(), contentTopics) + if subRes.isOk(): + info "v2 subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + else: + error "failed filter v2 subscription", error=subRes.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + return subRes + else: + let topicMapRes = parseSharding(pubsubTopic, contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error=topicMapRes.error + return err(FilterSubscribeError.badResponse("can't get shard")) + else: topicMapRes.get() + + var futures = collect(newSeq): + for pubsub, topics in topicMap.pairs: + info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClient.subscribe(remotePeer, $pubsub, content) + + let finished = await allFinished(futures) + + var subRes: FilterSubscribeResult = FilterSubscribeResult.ok() + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter subscription", error=res.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + subRes = FilterSubscribeResult.err(res.error) + + for pubsub, topics in topicMap.pairs: + info "subscribed to topic", pubsubTopic=pubsub, contentTopics=topics + + # return the last error or ok + return subRes + +proc legacyFilterUnsubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: ContentTopic|seq[ContentTopic], + peer: RemotePeerInfo|string) + {.async, gcsafe, raises: [Defect, ValueError].} = + ## Unsubscribe from a content legacy filter. if node.wakuFilterClientLegacy.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" return @@ -443,7 +542,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten let remotePeer = remotePeerRes.value if pubsubTopic.isSome(): - info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + info "deregistering legacy filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId let res = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic.get(), contentTopics, peer=remotePeer) @@ -479,35 +578,103 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten for pubsub, topics in topicMap.pairs: info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics -# TODO: Move to application module (e.g., wakunode2.nim) -proc subscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, - deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - if node.wakuFilterClientLegacy.isNil(): - error "cannot register filter subscription to topic", error="waku filter client is nil" - return +proc filterUnsubscribe*(node: WakuNode, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: RemotePeerInfo|string): - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - error "cannot register filter subscription to topic", error="no suitable remote peers" - return + Future[FilterSubscribeResult] - await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get()) + {.async, gcsafe, raises: [Defect, ValueError].} = -# TODO: Move to application module (e.g., wakunode2.nim) -proc unsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe, - deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} = - ## Unsubscribe from a content filter. + ## Unsubscribe from a content filter V2". if node.wakuFilterClientLegacy.isNil(): error "cannot unregister filter subscription to content", error="waku filter client is nil" - return + return err(FilterSubscribeError.serviceUnavailable()) - let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - error "cannot register filter subscription to topic", error="no suitable remote peers" - return + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "couldn't parse remotePeerInfo", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) - await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get()) + let remotePeer = remotePeerRes.value + + if pubsubTopic.isSome(): + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics, peer=remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribe(remotePeer, pubsubTopic.get(), contentTopics) + if unsubRes.isOk(): + info "unsubscribed from topic", pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics + else: + error "failed filter unsubscription", error=unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + return unsubRes + + else: # pubsubTopic.isNone + let topicMapRes = parseSharding(pubsubTopic, contentTopics) + + let topicMap = + if topicMapRes.isErr(): + error "can't get shard", error = topicMapRes.error + return err(FilterSubscribeError.badResponse("can't get shard")) + else: topicMapRes.get() + + var futures = collect(newSeq): + for pubsub, topics in topicMap.pairs: + info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId + let content = topics.mapIt($it) + node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content) + + let finished = await allFinished(futures) + + var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok() + for fut in finished: + let res = fut.read() + + if res.isErr(): + error "failed filter unsubscription", error=res.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + unsubRes = FilterSubscribeResult.err(res.error) + + for pubsub, topics in topicMap.pairs: + info "unsubscribed from topic", pubsubTopic=pubsub, contentTopics=topics + + # return the last error or ok + return unsubRes + +proc filterUnsubscribeAll*(node: WakuNode, + peer: RemotePeerInfo|string): + + Future[FilterSubscribeResult] + + {.async, gcsafe, raises: [Defect, ValueError].} = + + ## Unsubscribe from a content filter V2". + if node.wakuFilterClientLegacy.isNil(): + error "cannot unregister filter subscription to content", error="waku filter client is nil" + return err(FilterSubscribeError.serviceUnavailable()) + + let remotePeerRes = parsePeerInfo(peer) + if remotePeerRes.isErr(): + error "couldn't parse remotePeerInfo", error = remotePeerRes.error + return err(FilterSubscribeError.serviceUnavailable("No peers available")) + + let remotePeer = remotePeerRes.value + + info "deregistering all filter subscription to content", peer=remotePeer.peerId + + let unsubRes = await node.wakuFilterClient.unsubscribeAll(remotePeer) + if unsubRes.isOk(): + info "unsubscribed from all content-topic", peerId=remotePeer.peerId + else: + error "failed filter unsubscription from all content-topic", error=unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + return unsubRes + +# NOTICE: subscribe / unsubscribe methods are removed - they were already depricated +# yet incompatible to handle both type of filters - use specific filter registration instead ## Waku archive const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 19520ebcb..bb918b368 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -17,8 +17,8 @@ import libp2p/multicodec, libp2p/peerid, libp2p/peerinfo, - libp2p/routing_record - + libp2p/routing_record, + json_serialization type Connectedness* = enum @@ -62,6 +62,9 @@ type RemotePeerInfo* = ref object func `$`*(remotePeerInfo: RemotePeerInfo): string = $remotePeerInfo.peerId +proc writeValue*(w: var JsonWriter, value: RemotePeerInfo) {.inline, raises: [IOError].} = + w.writeValue $value + proc init*( T: typedesc[RemotePeerInfo], peerId: PeerID, diff --git a/waku/waku_filter/client.nim b/waku/waku_filter/client.nim index ae7b46b66..40ba73119 100644 --- a/waku/waku_filter/client.nim +++ b/waku/waku_filter/client.nim @@ -20,7 +20,6 @@ import ./protocol, ./protocol_metrics - logScope: topics = "waku filter client" @@ -71,7 +70,7 @@ proc initProtocolHandler(wf: WakuFilterClientLegacy) = wf.handleMessagePush(peerId, requestId, push) wf.handler = handle - wf.codec = WakuFilterCodec + wf.codec = WakuLegacyFilterCodec proc new*(T: type WakuFilterClientLegacy, peerManager: PeerManager, @@ -87,7 +86,7 @@ proc new*(T: type WakuFilterClientLegacy, proc sendFilterRpc(wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec) if connOpt.isNone(): return err(dialFailure) let connection = connOpt.get() @@ -155,6 +154,8 @@ proc unsubscribe*(wf: WakuFilterClientLegacy, if sendRes.isErr(): return err(sendRes.error) + # FIXME: I see an issue here that such solution prevents filtering client to properly manage its + # subscriptions on different peers and get notified correctly! for topic in topics: wf.subManager.removeSubscription(pubsubTopic, topic) diff --git a/waku/waku_filter/protocol.nim b/waku/waku_filter/protocol.nim index 9ad6818a0..62c0d618a 100644 --- a/waku/waku_filter/protocol.nim +++ b/waku/waku_filter/protocol.nim @@ -20,7 +20,7 @@ logScope: const - WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1" WakuFilterTimeout: Duration = 2.hours @@ -60,8 +60,6 @@ proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsu ## Protocol type - MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} - WakuFilterLegacy* = ref object of LPProtocol rng*: ref rand.HmacDrbgContext peerManager*: PeerManager @@ -110,7 +108,7 @@ proc initProtocolHandler(wf: WakuFilterLegacy) = wf.handleFilterRequest(conn.peerId, rpc) wf.handler = handler - wf.codec = WakuFilterCodec + wf.codec = WakuLegacyFilterCodec proc new*(T: type WakuFilterLegacy, peerManager: PeerManager, @@ -131,7 +129,7 @@ proc init*(T: type WakuFilterLegacy, proc sendFilterRpc(wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + let connOpt = await wf.peerManager.dialPeer(peer, WakuLegacyFilterCodec) if connOpt.isNone(): return err(dialFailure) let connection = connOpt.get() diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index 5a7d86d90..72da09048 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -23,19 +23,21 @@ logScope: topics = "waku filter client" type - MessagePushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} WakuFilterClient* = ref object of LPProtocol rng: ref HmacDrbgContext - messagePushHandler: MessagePushHandler peerManager: PeerManager + pushHandlers: seq[FilterPushHandler] func generateRequestId(rng: ref HmacDrbgContext): string = var bytes: array[10, byte] hmacDrbgGenerate(rng[], bytes) return toHex(bytes) -proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, filterSubscribeRequest: FilterSubscribeRequest): Future[FilterSubscribeResult] {.async.} = - trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest +proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, + filterSubscribeRequest: FilterSubscribeRequest): + Future[FilterSubscribeResult] + {.async.} = + trace "Sending filter subscribe request", peerId=servicePeer.peerId, filterSubscribeRequest let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec) if connOpt.isNone(): @@ -77,7 +79,13 @@ proc ping*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSub return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) -proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = +proc subscribe*(wfc: WakuFilterClient, + servicePeer: RemotePeerInfo, + pubsubTopic: PubsubTopic, + contentTopics: seq[ContentTopic]): + Future[FilterSubscribeResult] + {.async.} = + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.subscribe( requestId = requestId, @@ -87,7 +95,13 @@ proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) -proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = +proc unsubscribe*(wfc: WakuFilterClient, + servicePeer: RemotePeerInfo, + pubsubTopic: PubsubTopic, + contentTopics: seq[ContentTopic]): + Future[FilterSubscribeResult] + {.async.} = + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe( requestId = requestId, @@ -97,7 +111,10 @@ proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopi return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) -proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} = +proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): + Future[FilterSubscribeResult] + {.async.} = + let requestId = generateRequestId(wfc.rng) let filterSubscribeRequest = FilterSubscribeRequest.unsubscribeAll( requestId = requestId @@ -105,6 +122,9 @@ proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) +proc registerPushHandler*(wfc: WakuFilterClient, handler: FilterPushHandler) = + wfc.pushHandlers.add(handler) + proc initProtocolHandler(wfc: WakuFilterClient) = proc handler(conn: Connection, proto: string) {.async.} = @@ -119,7 +139,9 @@ proc initProtocolHandler(wfc: WakuFilterClient) = let messagePush = decodeRes.value #TODO: toAPI() split here trace "Received message push", peerId=conn.peerId, messagePush - wfc.messagePushHandler(messagePush.pubsubTopic, messagePush.wakuMessage) + for handler in wfc.pushHandlers: + asyncSpawn handler(messagePush.pubsubTopic, + messagePush.wakuMessage) # Protocol specifies no response for now return @@ -128,14 +150,14 @@ proc initProtocolHandler(wfc: WakuFilterClient) = wfc.codec = WakuFilterPushCodec proc new*(T: type WakuFilterClient, - rng: ref HmacDrbgContext, - messagePushHandler: MessagePushHandler, - peerManager: PeerManager): T = + peerManager: PeerManager, + rng: ref HmacDrbgContext + ): T = let wfc = WakuFilterClient( rng: rng, - messagePushHandler: messagePushHandler, - peerManager: peerManager + peerManager: peerManager, + pushHandlers: @[] ) wfc.initProtocolHandler() wfc diff --git a/waku/waku_filter_v2/rpc.nim b/waku/waku_filter_v2/rpc.nim index e6718dfa9..7807651b7 100644 --- a/waku/waku_filter_v2/rpc.nim +++ b/waku/waku_filter_v2/rpc.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + json_serialization, std/options import ../waku_core @@ -70,3 +71,29 @@ proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T = statusCode: 200, statusDesc: some(desc) ) + +proc `$`*(err: FilterSubscribeResponse): string = + "FilterSubscribeResponse of req:" & err.requestId & " [" & $err.statusCode & "] " & $err.statusDesc + +proc `$`*(req: FilterSubscribeRequest): string = + "FilterSubscribeRequest of req:" & req.requestId & " [" & $req.filterSubscribeType & "] " & $req.pubsubTopic + +proc `$`*(t: FilterSubscribeType): string = + result = case t: + of SUBSCRIBER_PING: "SUBSCRIBER_PING" + of SUBSCRIBE: "SUBSCRIBE" + of UNSUBSCRIBE: "UNSUBSCRIBE" + of UNSUBSCRIBE_ALL: "UNSUBSCRIBE_ALL" + +proc writeValue*(writer: var JsonWriter, value: FilterSubscribeRequest) {.inline, raises: [IOError].} = + writer.beginRecord() + writer.writeField("requestId", value.requestId) + writer.writeField("type", value.filterSubscribeType) + if value.pubsubTopic.isSome: + writer.writeField("pubsubTopic", value.pubsubTopic) + if value.contentTopics.len>0: + writer.writeField("contentTopics", value.contentTopics) + writer.endRecord() + + +