From 2b2eec9535d5b9c4cffc76d49875b4cf105fbdca Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Tue, 9 Jan 2024 11:42:29 +0100 Subject: [PATCH] feat: adding filter data admin endpoint (REST) (#2314) --- tests/wakunode_rest/test_rest_admin.nim | 60 +++++++++++++++++- waku/waku_api/rest/admin/client.nim | 8 +++ waku/waku_api/rest/admin/handlers.nim | 28 ++++++++- waku/waku_api/rest/admin/openapi.yaml | 41 ++++++++++++ waku/waku_api/rest/admin/types.nim | 84 +++++++++++++++++++++++++ 5 files changed, 218 insertions(+), 3 deletions(-) diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index 2bf66d624..a418e6ab6 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -1,7 +1,7 @@ {.used.} import - std/sequtils, + std/[sequtils,strformat], stew/shims/net, testutils/unittests, presto, presto/client as presto_client, @@ -10,6 +10,7 @@ import import ../../waku/waku_core, ../../waku/waku_node, + ../../waku/waku_filter_v2/client, ../../waku/node/peer_manager, ../../waku/waku_api/rest/server, ../../waku/waku_api/rest/client, @@ -26,6 +27,7 @@ suite "Waku v2 Rest API - Admin": var node1 {.threadvar.}: WakuNode var node2 {.threadvar.}: WakuNode var node3 {.threadvar.}: WakuNode + var peerInfo1 {.threadvar.}: RemotePeerInfo var peerInfo2 {.threadvar.}: RemotePeerInfo var peerInfo3 {.threadvar.}: RemotePeerInfo var restServer {.threadvar.}: RestServerRef @@ -33,6 +35,7 @@ suite "Waku v2 Rest API - Admin": asyncSetup: node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60600)) + peerInfo1 = node1.switch.peerInfo node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60602)) peerInfo2 = node2.switch.peerInfo node3 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60604)) @@ -94,3 +97,58 @@ suite "Waku v2 Rest API - Admin": getRes.status == 200 $getRes.contentType == $MIMETYPE_JSON getRes.data.len() == 0 + + asyncTest "Get filter data": + await allFutures(node1.mountFilter(), node2.mountFilterClient(), node3.mountFilterClient()) + + let + contentFiltersNode2 = @[DefaultContentTopic, ContentTopic("2"), ContentTopic("3")] + contentFiltersNode3 = @[ContentTopic("3"), ContentTopic("4")] + pubsubTopicNode2 = DefaultPubsubTopic + pubsubTopicNode3 = PubsubTopic("/waku/2/custom-waku/proto") + + expectedFilterData2 = fmt"(peerId: ""{$peerInfo2}"", filterCriteria:" & + fmt" @[(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[0]}""), " & + fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[1]}""), " & + fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[2]}"")]" + + expectedFilterData3 = fmt"(peerId: ""{$peerInfo3}"", filterCriteria:" & + fmt" @[(pubsubTopic: ""{pubsubTopicNode3}"", contentTopic: ""{contentFiltersNode3[0]}""), " & + fmt"(pubsubTopic: ""{pubsubTopicNode3}"", contentTopic: ""{contentFiltersNode3[1]}"")]" + + let + subscribeResponse2 = await node2.wakuFilterClient.subscribe( + peerInfo1, pubsubTopicNode2, contentFiltersNode2 + ) + subscribeResponse3 = await node3.wakuFilterClient.subscribe( + peerInfo1, pubsubTopicNode3, contentFiltersNode3 + ) + + assert subscribeResponse2.isOk(), $subscribeResponse2.error + assert subscribeResponse3.isOk(), $subscribeResponse3.error + + let getRes = await client.getFilterSubscriptions() + + check: + getRes.status == 200 + $getRes.contentType == $MIMETYPE_JSON + getRes.data.len() == 2 + ($getRes.data).contains(expectedFilterData2) + ($getRes.data).contains(expectedFilterData3) + + asyncTest "Get filter data - no filter subscribers": + await node1.mountFilter() + + let getRes = await client.getFilterSubscriptions() + + check: + getRes.status == 200 + $getRes.contentType == $MIMETYPE_JSON + getRes.data.len() == 0 + + asyncTest "Get filter data - filter not mounted": + let getRes = await client.getFilterSubscriptionsFilterNotMounted() + + check: + getRes.status == 400 + getRes.data == "Error: Filter Protocol is not mounted to the node" \ No newline at end of file diff --git a/waku/waku_api/rest/admin/client.nim b/waku/waku_api/rest/admin/client.nim index 7c0ee8a66..976853656 100644 --- a/waku/waku_api/rest/admin/client.nim +++ b/waku/waku_api/rest/admin/client.nim @@ -33,3 +33,11 @@ proc getPeers*(): proc postPeers*(body: seq[string]): RestResponse[string] {.rest, endpoint: "/admin/v1/peers", meth: HttpMethod.MethodPost.} + +proc getFilterSubscriptions*(): + RestResponse[seq[FilterSubscription]] + {.rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet.} + +proc getFilterSubscriptionsFilterNotMounted*(): + RestResponse[string] + {.rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet.} diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index bf4b5ebed..6621908b5 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -4,8 +4,7 @@ else: {.push raises: [].} import - std/strformat, - std/sequtils, + std/[strformat,sequtils,sets,tables], stew/byteutils, chronicles, json_serialization, @@ -32,6 +31,7 @@ logScope: topics = "waku node rest admin api" const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" +const ROUTE_ADMIN_V1_FILTER_SUBS* = "/admin/v1/filter/subscriptions" type PeerProtocolTuple = tuple[multiaddr: string, protocol: string, connected: bool] @@ -111,6 +111,30 @@ proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) = return RestApiResponse.ok() +proc installAdminV1GetFilterSubsHandler(router: var RestRouter, node: WakuNode) = + router.api(MethodGet, ROUTE_ADMIN_V1_FILTER_SUBS) do () -> RestApiResponse: + + if node.wakuFilter.isNil(): + return RestApiResponse.badRequest("Error: Filter Protocol is not mounted to the node") + + var + subscriptions: seq[FilterSubscription] = @[] + filterCriteria: seq[FilterTopic] + + for (peerId, criteria) in node.wakuFilter.subscriptions.pairs(): + filterCriteria = criteria.toSeq().mapIt(FilterTopic(pubsubTopic: it[0], + contentTopic: it[1])) + + subscriptions.add(FilterSubscription(peerId: $peerId, filterCriteria: filterCriteria)) + + let resp = RestApiResponse.jsonResponse(subscriptions, status=Http200) + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError(fmt("An error ocurred while building the json respose: {resp.error}")) + + return resp.get() + proc installAdminApiHandlers*(router: var RestRouter, node: WakuNode) = installAdminV1GetPeersHandler(router, node) installAdminV1PostPeersHandler(router, node) + installAdminV1GetFilterSubsHandler(router, node) diff --git a/waku/waku_api/rest/admin/openapi.yaml b/waku/waku_api/rest/admin/openapi.yaml index 2ce64e46c..f6118ad70 100644 --- a/waku/waku_api/rest/admin/openapi.yaml +++ b/waku/waku_api/rest/admin/openapi.yaml @@ -49,6 +49,26 @@ paths: description: Cannot connect to one or more peers. '5XX': description: Unexpected error. + /admin/v1/filter/subscriptions: + get: + summary: Get filter protocol subscribers + description: Retrieve information about the serving filter subscriptions + operationId: getFilterInfo + tags: + - admin + responses: + '200': + description: Information about subscribed filter peers and topics + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/FilterSubscription' + '400': + description: Filter Protocol is not mounted to the node + '5XX': + description: Unexpected error. components: schemas: @@ -72,3 +92,24 @@ components: type: string connected: type: boolean + + FilterSubscription: + type: object + required: + - peerId + - filterCriteria + properties: + peerId: + type: string + filterCriteria: + type: array + items: + type: object + required: + - pubsubTopic + - contentTopic + properties: + pubsubTopic: + type: string + contentTopic: + type: string diff --git a/waku/waku_api/rest/admin/types.nim b/waku/waku_api/rest/admin/types.nim index ab6b8f22c..1e051e633 100644 --- a/waku/waku_api/rest/admin/types.nim +++ b/waku/waku_api/rest/admin/types.nim @@ -25,6 +25,16 @@ type type WakuPeers* = seq[WakuPeer] +type + FilterTopic* = object + pubsubTopic*: string + contentTopic*: string + +type + FilterSubscription* = object + peerId*: string + filterCriteria*: seq[FilterTopic] + #### Serialization and deserialization proc writeValue*(writer: var JsonWriter[RestJson], value: ProtocolState) @@ -41,6 +51,20 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: WakuPeer) writer.writeField("protocols", value.protocols) writer.endRecord() +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterTopic) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("pubsubTopic", value.pubsubTopic) + writer.writeField("contentTopic", value.contentTopic) + writer.endRecord() + +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscription) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("peerId", value.peerId) + writer.writeField("filterCriteria", value.filterCriteria) + writer.endRecord() + proc readValue*(reader: var JsonReader[RestJson], value: var ProtocolState) {.gcsafe, raises: [SerializationError, IOError].} = var @@ -101,6 +125,66 @@ proc readValue*(reader: var JsonReader[RestJson], value: var WakuPeer) protocols: protocols.get() ) +proc readValue*(reader: var JsonReader[RestJson], value: var FilterTopic) + {.gcsafe, raises: [SerializationError, IOError].} = + var + pubsubTopic: Option[string] + contentTopic: Option[string] + + for fieldName in readObjectFields(reader): + case fieldName + of "pubsubTopic": + if pubsubTopic.isSome(): + reader.raiseUnexpectedField("Multiple `pubsubTopic` fields found", "FilterTopic") + pubsubTopic = some(reader.readValue(string)) + of "contentTopic": + if contentTopic.isSome(): + reader.raiseUnexpectedField("Multiple `contentTopic` fields found", "FilterTopic") + contentTopic = some(reader.readValue(string)) + else: + unrecognizedFieldWarning() + + if pubsubTopic.isNone(): + reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") + + if contentTopic.isNone(): + reader.raiseUnexpectedValue("Field `contentTopic` are missing") + + value = FilterTopic( + pubsubTopic: pubsubTopic.get(), + contentTopic: contentTopic.get() + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscription) + {.gcsafe, raises: [SerializationError, IOError].} = + var + peerId: Option[string] + filterCriteria: Option[seq[FilterTopic]] + + for fieldName in readObjectFields(reader): + case fieldName + of "peerId": + if peerId.isSome(): + reader.raiseUnexpectedField("Multiple `peerId` fields found", "FilterSubscription") + peerId = some(reader.readValue(string)) + of "filterCriteria": + if filterCriteria.isSome(): + reader.raiseUnexpectedField("Multiple `filterCriteria` fields found", "FilterSubscription") + filterCriteria = some(reader.readValue(seq[FilterTopic])) + else: + unrecognizedFieldWarning() + + if peerId.isNone(): + reader.raiseUnexpectedValue("Field `peerId` is missing") + + if filterCriteria.isNone(): + reader.raiseUnexpectedValue("Field `filterCriteria` are missing") + + value = FilterSubscription( + peerId: peerId.get(), + filterCriteria: filterCriteria.get() + ) + ## Utility for populating WakuPeers and ProtocolState func `==`*(a, b: ProtocolState): bool {.inline.} = return a.protocol == b.protocol