diff --git a/.gitignore b/.gitignore index cf8c18321..469e90e52 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,9 @@ node_modules/ # Ignore Jetbrains IDE files .idea/ +# ignore vscode files +.vscode/ + # RLN / keystore rlnKeystore.json *.tar.gz diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 83ce9e0dd..d2d37bfae 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -45,6 +45,7 @@ import ../../waku/v2/node/rest/debug/handlers as rest_debug_api, ../../waku/v2/node/rest/relay/handlers as rest_relay_api, ../../waku/v2/node/rest/relay/topic_cache, + ../../waku/v2/node/rest/filter/handlers as rest_filter_api, ../../waku/v2/node/rest/store/handlers as rest_store_api, ../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api, ../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api, @@ -566,6 +567,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) installRelayApiHandlers(server.router, app.node, relayCache) + ## Filter REST API + if conf.filter: + let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity) + installFilterApiHandlers(server.router, app.node, filterCache) + ## Store REST API installStoreApiHandlers(server.router, app.node) diff --git a/tests/v2/wakunode_rest/test_rest_filter.nim b/tests/v2/wakunode_rest/test_rest_filter.nim new file mode 100644 index 000000000..c17bdccc3 --- /dev/null +++ b/tests/v2/wakunode_rest/test_rest_filter.nim @@ -0,0 +1,191 @@ +{.used.} + +import + std/sequtils, + stew/byteutils, + stew/shims/net, + testutils/unittests, + presto, presto/client as presto_client, + libp2p/crypto/crypto +import + ../../waku/v2/node/message_cache, + ../../waku/common/base64, + ../../waku/v2/waku_core, + ../../waku/v2/waku_node, + ../../waku/v2/node/peer_manager, + ../../waku/v2/waku_filter, + ../../waku/v2/node/rest/server, + ../../waku/v2/node/rest/client, + ../../waku/v2/node/rest/responses, + ../../waku/v2/node/rest/filter/types, + ../../waku/v2/node/rest/filter/handlers as filter_api, + ../../waku/v2/node/rest/filter/client as filter_api_client, + ../../waku/v2/waku_relay, + ../testlib/wakucore, + ../testlib/wakunode + + +proc testWakuNode(): WakuNode = + let + privkey = generateSecp256k1Key() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(0) + + return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + + +type RestFilterTest = object + node1: WakuNode + node2: WakuNode + restServer: RestServerRef + messageCache: filter_api.MessageCache + client: RestClientRef + + +proc setupRestFilter(): Future[RestFilterTest] {.async.} = + result.node1 = testWakuNode() + result.node2 = testWakuNode() + + await allFutures(result.node1.start(), result.node2.start()) + + await result.node1.mountFilter() + await result.node2.mountFilterClient() + + result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec) + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + result.restServer = RestServerRef.init(restAddress, restPort).tryGet() + + result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity) + + installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) + installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache) + installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache) + + result.restServer.start() + + result.client = newRestHttpClient(initTAddress(restAddress, restPort)) + + return result + + +proc shutdown(self: RestFilterTest) {.async.} = + await self.restServer.stop() + await self.restServer.closeWait() + await allFutures(self.node1.stop(), self.node2.stop()) + + +suite "Waku v2 Rest API - Filter": + asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions": + # Given + let restFilterTest: RestFilterTest = await setupRestFilter() + + # When + let contentFilters = @[DefaultContentTopic + ,ContentTopic("2") + ,ContentTopic("3") + ,ContentTopic("4") + ] + + let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, + pubsubTopic: DefaultPubsubTopic) + let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + restFilterTest.messageCache.isSubscribed(DefaultContentTopic) + restFilterTest.messageCache.isSubscribed("2") + restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + # When - error case + let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "") + let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody) + + check: + badResponse.status == 400 + $badResponse.contentType == $MIMETYPE_TEXT + badResponse.data == "Invalid content body, could not decode. Unable to deserialize data" + + + await restFilterTest.shutdown() + + + asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions": + # Given + let + restFilterTest: RestFilterTest = await setupRestFilter() + + # When + restFilterTest.messageCache.subscribe("1") + restFilterTest.messageCache.subscribe("2") + restFilterTest.messageCache.subscribe("3") + restFilterTest.messageCache.subscribe("4") + + let contentFilters = @[ContentTopic("1") + ,ContentTopic("2") + ,ContentTopic("3") + # ,ContentTopic("4") # Keep this subscription for check + ] + + # When + let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters, + pubsubTopic: DefaultPubsubTopic) + let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + not restFilterTest.messageCache.isSubscribed("1") + not restFilterTest.messageCache.isSubscribed("2") + not restFilterTest.messageCache.isSubscribed("3") + restFilterTest.messageCache.isSubscribed("4") + + await restFilterTest.shutdown() + + + asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}": + # Given + + let + restFilterTest = await setupRestFilter() + + let pubSubTopic = "/waku/2/default-waku/proto" + let contentTopic = ContentTopic( "content-topic-x" ) + + let messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + ] + + restFilterTest.messageCache.subscribe(contentTopic) + for msg in messages: + restFilterTest.messageCache.addMessage(contentTopic, msg) + + # When + let response = await restFilterTest.client.filterGetMessagesV1(contentTopic) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.len == 3 + response.data.all do (msg: FilterWakuMessage) -> bool: + msg.payload == base64.encode("TEST-1") and + msg.contentTopic.get().string == "content-topic-x" and + msg.version.get() == 2 and + msg.timestamp.get() != Timestamp(0) + + await restFilterTest.shutdown() diff --git a/waku/v2/node/rest/filter/client.nim b/waku/v2/node/rest/filter/client.nim new file mode 100644 index 000000000..a5b53c01f --- /dev/null +++ b/waku/v2/node/rest/filter/client.nim @@ -0,0 +1,68 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sets, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../waku_core, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest client" + +proc encodeBytes*(value: FilterSubscriptionsRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[string], value: openarray[byte], + contentType: Opt[ContentTypeData]): RestResult[string] = + if MediaType.init($contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.} + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest): + RestResponse[string] + {.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.} + +proc decodeBytes*(t: typedesc[FilterGetMessagesResponse], + data: openArray[byte], + contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] = + if MediaType.init($contentType) != MIMETYPE_JSON: + error "Unsupported response contentType value", contentType = contentType + return err("Unsupported response contentType") + + let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data) + return ok(decoded) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc filterGetMessagesV1*(contentTopic: string): + RestResponse[FilterGetMessagesResponse] + {.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.} diff --git a/waku/v2/node/rest/filter/handlers.nim b/waku/v2/node/rest/filter/handlers.nim new file mode 100644 index 000000000..d61ae52c7 --- /dev/null +++ b/waku/v2/node/rest/filter/handlers.nim @@ -0,0 +1,155 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/route, + presto/common +import + ../../../waku_core, + ../../../waku_filter, + ../../../waku_filter/client, + ../../message_cache, + ../../peer_manager, + ../../waku_node, + ../serdes, + ../responses, + ./types + +export types + +logScope: + topics = "waku node rest filter_api" + +const futTimeoutForSubscriptionProcessing* = 5.seconds + +#### Request handlers + +const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions" + +const filterMessageCacheDefaultCapacity* = 30 + +type + MessageCache* = message_cache.MessageCache[ContentTopic] + +func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] = + if contentBody.isNone(): + return err(RestApiResponse.badRequest("Missing content body")) + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json")) + + let reqBodyData = contentBody.get().data + + let requestResult = decodeFromJsonBytes(T, reqBodyData) + if requestResult.isErr(): + return err(RestApiResponse.badRequest("Invalid content body, could not decode. " & + $requestResult.error)) + + return ok(requestResult.get()) + +proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + let pushHandler: FilterPushHandler = + proc(pubsubTopic: PubsubTopic, + msg: WakuMessage) {.async, gcsafe, closure.} = + cache.addMessage(msg.contentTopic, msg) + + router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a pubsubTopic + # debug "post_waku_v2_filter_v1_subscriptions" + + let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterSubscriptionsRequest = decodedBody.value() + + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + + if peerOpt.isNone(): + return RestApiResponse.internalServerError("No suitable remote filter peers") + + let subFut = node.filterSubscribe(req.pubsubTopic, + req.contentFilters, + pushHandler, + peerOpt.get()) + + if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to subscribe to contentFilters do to timeout!" + return RestApiResponse.internalServerError("Failed to subscribe to contentFilters") + + # Successfully subscribed to all content filters + for cTopic in req.contentFilters: + cache.subscribe(cTopic) + + return RestApiResponse.ok() + +proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + ## Subscribes a node to a list of contentTopics of a PubSub topic + # debug "delete_waku_v2_filter_v1_subscriptions" + + let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody) + + if decodedBody.isErr(): + return decodedBody.error + + let req: FilterSubscriptionsRequest = decodedBody.value() + + let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters) + if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing): + error "Failed to unsubscribe from contentFilters due to timeout!" + return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters") + + for cTopic in req.contentFilters: + cache.unsubscribe(cTopic) + + # Successfully unsubscribed from all requested contentTopics + return RestApiResponse.ok() + +const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}" + +proc installFilterGetMessagesV1Handler*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse: + ## Returns all WakuMessages received on a specified content topic since the + ## last time this method was called + ## TODO: ability to specify a return message limit + # debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic + + if contentTopic.isErr(): + return RestApiResponse.badRequest("Missing contentTopic") + + let contentTopic = contentTopic.get() + + let msgRes = cache.getMessages(contentTopic, clear=true) + if msgRes.isErr(): + return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic) + + let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage)) + let resp = RestApiResponse.jsonResponse(data, status=Http200) + if resp.isErr(): + error "An error ocurred while building the json respose: ", error=resp.error + return RestApiResponse.internalServerError("An error ocurred while building the json respose") + + return resp.get() + +proc installFilterApiHandlers*(router: var RestRouter, + node: WakuNode, + cache: MessageCache) = + installFilterPostSubscriptionsV1Handler(router, node, cache) + installFilterDeleteSubscriptionsV1Handler(router, node, cache) + installFilterGetMessagesV1Handler(router, node, cache) diff --git a/waku/v2/node/rest/filter/openapi.yaml b/waku/v2/node/rest/filter/openapi.yaml index 38b9ae730..d913eb08a 100644 --- a/waku/v2/node/rest/filter/openapi.yaml +++ b/waku/v2/node/rest/filter/openapi.yaml @@ -21,7 +21,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterPostSubscriptionsRequest' + $ref: '#/components/schemas/FilterSubscriptionsRequest' responses: '200': description: OK @@ -45,7 +45,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/FilterDeleteSubscriptionsRequest' + $ref: '#/components/schemas/FilterSubscriptionsRequest' responses: '200': description: OK @@ -61,7 +61,8 @@ paths: '5XX': description: Unexpected error. - /filter/v1/messages/{topic}: + # TODO: Review the path of this endpoint due maybe query for list of contentTopics matching + /filter/v1/messages/{contentTopic}: get: # get_waku_v2_filter_v1_messages summary: Get the latest messages on the polled content topic description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. @@ -70,11 +71,11 @@ paths: - filter parameters: - in: path - name: topic # Note the name is the same as in the path + name: contentTopic # Note the name is the same as in the path required: true schema: type: string - description: The user ID + description: Content topic of message responses: '200': description: The latest messages on the polled topic. @@ -112,19 +113,7 @@ components: required: - payload - FilterPostSubscriptionsRequest: - type: object - properties: - contentFilters: - type: array - items: - $ref: '#/components/schemas/ContentTopic' - pubsubTopic: - $ref: "#/components/schemas/PubSubTopic" - required: - - contentFilters - - FilterDeleteSubscriptionsRequest: + FilterSubscriptionsRequest: type: object properties: contentFilters: diff --git a/waku/v2/node/rest/filter/types.nim b/waku/v2/node/rest/filter/types.nim new file mode 100644 index 000000000..22b2ee4da --- /dev/null +++ b/waku/v2/node/rest/filter/types.nim @@ -0,0 +1,151 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[sets, strformat], + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../../common/base64, + ../../../waku_core, + ../serdes + +#### Types + +type FilterWakuMessage* = object + payload*: Base64String + contentTopic*: Option[ContentTopic] + version*: Option[Natural] + timestamp*: Option[int64] + +type FilterGetMessagesResponse* = seq[FilterWakuMessage] + +type FilterSubscriptionsRequest* = object + pubsubTopic*: PubSubTopic + contentFilters*: seq[ContentTopic] + +#### Type conversion + +proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage = + FilterWakuMessage( + payload: base64.encode(msg.payload), + contentTopic: some(msg.contentTopic), + version: some(Natural(msg.version)), + timestamp: some(msg.timestamp) + ) + +proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, string] = + let + payload = ?msg.payload.decode() + contentTopic = msg.contentTopic.get(DefaultContentTopic) + version = uint32(msg.version.get(version)) + timestamp = msg.timestamp.get(0) + + ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp)) + +#### Serialization and deserialization + +proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String) + {.raises: [IOError].} = + writer.writeValue(string(value)) + +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("payload", value.payload) + if value.contentTopic.isSome: + writer.writeField("contentTopic", value.contentTopic) + if value.version.isSome: + writer.writeField("version", value.version) + if value.timestamp.isSome: + writer.writeField("timestamp", value.timestamp) + writer.endRecord() + +proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest) + {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("pubsubTopic", value.pubsubTopic) + writer.writeField("contentFilters", value.contentFilters) + writer.endRecord() + +proc readValue*(reader: var JsonReader[RestJson], value: var Base64String) + {.raises: [SerializationError, IOError].} = + value = Base64String(reader.readValue(string)) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage) + {.raises: [SerializationError, IOError].} = + var + payload = none(Base64String) + contentTopic = none(ContentTopic) + version = none(Natural) + timestamp = none(int64) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterWakuMessage") + + case fieldName + of "payload": + payload = some(reader.readValue(Base64String)) + of "contentTopic": + contentTopic = some(reader.readValue(ContentTopic)) + of "version": + version = some(reader.readValue(Natural)) + of "timestamp": + timestamp = some(reader.readValue(int64)) + else: + unrecognizedFieldWarning() + + if payload.isNone(): + reader.raiseUnexpectedValue("Field `payload` is missing") + + value = FilterWakuMessage( + payload: payload.get(), + contentTopic: contentTopic, + version: version, + timestamp: timestamp + ) + +proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest) + {.raises: [SerializationError, IOError].} = + var + pubsubTopic = none(PubsubTopic) + contentFilters = none(seq[ContentTopic]) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except CatchableError: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest") + + case fieldName + of "pubsubTopic": + pubsubTopic = some(reader.readValue(PubsubTopic)) + of "contentFilters": + contentFilters = some(reader.readValue(seq[ContentTopic])) + else: + unrecognizedFieldWarning() + + if pubsubTopic.isNone(): + reader.raiseUnexpectedValue("Field `pubsubTopic` is missing") + + if contentFilters.isNone(): + reader.raiseUnexpectedValue("Field `contentFilters` is missing") + + if contentFilters.get().len() == 0: + reader.raiseUnexpectedValue("Field `contentFilters` is empty") + + value = FilterSubscriptionsRequest( + pubsubTopic: pubsubTopic.get(), + contentFilters: contentFilters.get() + ) \ No newline at end of file