From b2acb54d6a77acbff50f006ed3d9d30a169ea6cd Mon Sep 17 00:00:00 2001 From: Ivansete-status <128452529+Ivansete-status@users.noreply.github.com> Date: Thu, 6 Apr 2023 11:43:19 +0200 Subject: [PATCH] feat(rest-api-store): new rest api to retrieve store waku messages (#1611) (#1630) * feat: new rest api based on the current store json-rpc api and following the same structure as the current relay rest api. * feat: the store api attend GET requests to retrieve historical messages * feat: unit tests. * feat: allow return message to rest-client in case error (4XX or 5XX) * chore: always allow to call the store api endpoints (only rest) without explicit storenode (#1575) * feat: always mounting the current node as storenode client --- apps/wakunode2/wakunode2.nim | 2 +- apps/wakunode2/wakunode2_setup_rest.nim | 4 + docs/api/v2/rest-api.md | 33 +- docs/operators/how-to/configure-rest-api.md | 23 + docs/operators/how-to/configure.md | 3 +- tests/all_tests_v2.nim | 3 +- tests/v2/wakunode_rest/test_rest_store.nim | 522 ++++++++++++++++++++ waku/v2/node/rest/responses.nim | 23 +- waku/v2/node/rest/store/client.nim | 72 +++ waku/v2/node/rest/store/handlers.nim | 248 ++++++++++ waku/v2/node/rest/store/openapi.yaml | 203 ++++++++ waku/v2/node/rest/store/types.nim | 375 ++++++++++++++ waku/v2/utils/peers.nim | 17 +- 13 files changed, 1503 insertions(+), 25 deletions(-) create mode 100644 docs/operators/how-to/configure-rest-api.md create mode 100644 tests/v2/wakunode_rest/test_rest_store.nim create mode 100644 waku/v2/node/rest/store/client.nim create mode 100644 waku/v2/node/rest/store/handlers.nim create mode 100644 waku/v2/node/rest/store/openapi.yaml create mode 100644 waku/v2/node/rest/store/types.nim diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 6a4917aa6..e3f9acca4 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -437,9 +437,9 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, executeMessageRetentionPolicy(node) startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval) + mountStoreClient(node) if conf.storenode != "": try: - mountStoreClient(node) let storenode = parseRemotePeerInfo(conf.storenode) node.peerManager.addServicePeer(storenode, WakuStoreCodec) except CatchableError: diff --git a/apps/wakunode2/wakunode2_setup_rest.nim b/apps/wakunode2/wakunode2_setup_rest.nim index 5b2491de8..69e83fa56 100644 --- a/apps/wakunode2/wakunode2_setup_rest.nim +++ b/apps/wakunode2/wakunode2_setup_rest.nim @@ -13,6 +13,7 @@ import ../../waku/v2/node/rest/debug/handlers as debug_api, ../../waku/v2/node/rest/relay/handlers as relay_api, ../../waku/v2/node/rest/relay/topic_cache, + ../../waku/v2/node/rest/store/handlers as store_api, ./config @@ -36,5 +37,8 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) installRelayApiHandlers(server.router, node, relayCache) + ## Store REST API + installStoreApiHandlers(server.router, node) + server.start() info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" diff --git a/docs/api/v2/rest-api.md b/docs/api/v2/rest-api.md index f874dcc62..bffbf328c 100644 --- a/docs/api/v2/rest-api.md +++ b/docs/api/v2/rest-api.md @@ -13,14 +13,29 @@ This API is divided in different _namespaces_ which group a set of resources: | `/admin` | Privileged access to the internal operations of the node. | | `/private` | Provides functionality to encrypt/decrypt `WakuMessage` payloads using either symmetric or asymmetric cryptography. This allows backwards compatibility with Waku v1 nodes. | -The full HTTP REST API documentation can be found here: [TBD]() ### API Specification -The HTTP REST API has been designed following the OpenAPI 3.0.3 standard specification format. The OpenAPI specification file can be found here: [TBD]() +The HTTP REST API has been designed following the OpenAPI 3.0.3 standard specification format. +The OpenAPI specification files can be found here: + +| Namespace | OpenAPI file | +------------|-------------- +| `/debug` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/debug/openapi.yaml) | +| `/relay` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/relay/openapi.yaml) | +| `/store` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/store/openapi.yaml) | +| `/filter` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/filter/openapi.yaml) | + +The OpenAPI files can be analysed online with [Redocly](https://redocly.github.io/redoc/) Check the [OpenAPI Tools](https://openapi.tools/) site for the right tool for you (e.g. REST API client generator) +A particular OpenAPI spec can be easily imported into [Postman](https://www.postman.com/downloads/) + 1. Open Postman. + 2. Click on File -> Import... + 2. Load the openapi.yaml of interest, stored in your computer. + 3. Then, requests can be made from within the 'Collections' section. + ### Usage example @@ -40,16 +55,4 @@ curl http://localhost:8645/debug/v1/info -s | jq ### Node configuration - -A subset of the node configuration can be used to modify the behaviour of the HTTP REST API. These are the relevant command line options: - -| CLI option | Description | Default value | -|------------|-------------|---------------| -|`--rest` | Enable Waku REST HTTP server. | `false` | -|`--rest-address` | Listening address of the REST HTTP server. | `127.0.0.1` | -|`--rest-port` | Listening port of the REST HTTP server. | `8645` | -|`--rest-relay-cache-capacity` | Capacity of the Relay REST API message cache. | `30` | -|`--rest-admin` | Enable access to REST HTTP Admin API. | `false` | -|`--rest-private` | Enable access to REST HTTP Private API. | `false` | - -Note that these command line options have their counterpart option in the node configuration file. +Find details [here](https://github.com/waku-org/nwaku/tree/master/docs/operators/how-to/configure-rest-api.md) diff --git a/docs/operators/how-to/configure-rest-api.md b/docs/operators/how-to/configure-rest-api.md new file mode 100644 index 000000000..3fe070aab --- /dev/null +++ b/docs/operators/how-to/configure-rest-api.md @@ -0,0 +1,23 @@ + +# Configure a REST API node + +A subset of the node configuration can be used to modify the behaviour of the HTTP REST API. + +These are the relevant command line options: + +| CLI option | Description | Default value | +|------------|-------------|---------------| +|`--rest` | Enable Waku REST HTTP server. | `false` | +|`--rest-address` | Listening address of the REST HTTP server. | `127.0.0.1` | +|`--rest-port` | Listening port of the REST HTTP server. | `8645` | +|`--rest-relay-cache-capacity` | Capacity of the Relay REST API message cache. | `30` | +|`--rest-admin` | Enable access to REST HTTP Admin API. | `false` | +|`--rest-private` | Enable access to REST HTTP Private API. | `false` | + +Note that these command line options have their counterpart option in the node configuration file. + +Example: + +```shell +wakunode2 --rest=true +``` diff --git a/docs/operators/how-to/configure.md b/docs/operators/how-to/configure.md index b7aea2d9d..6d777bf78 100644 --- a/docs/operators/how-to/configure.md +++ b/docs/operators/how-to/configure.md @@ -124,7 +124,7 @@ The following options are available: ## Configuration use cases -This an index of tutorials explaining how to configure your nwaku node for different use cases. +This is an index of tutorials explaining how to configure your nwaku node for different use cases. 1. [Connect to other peers](./connect.md) 2. [Configure a domain name](./configure-domain.md) @@ -133,3 +133,4 @@ This an index of tutorials explaining how to configure your nwaku node for diffe 5. [Generate and configure a node key](./configure-key.md) 6. [Configure websocket transport](./configure-websocket.md) 7. [Run nwaku with rate limiting enabled](./run-with-rln.md) +8. [Configure a REST API node](./configure-rest-api.md) diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index aa73ba452..dd5ec22fd 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -78,7 +78,8 @@ import ./v2/wakunode_rest/test_rest_debug_serdes, ./v2/wakunode_rest/test_rest_relay, ./v2/wakunode_rest/test_rest_relay_serdes, - ./v2/wakunode_rest/test_rest_serdes + ./v2/wakunode_rest/test_rest_serdes, + ./v2/wakunode_rest/test_rest_store ## Apps diff --git a/tests/v2/wakunode_rest/test_rest_store.nim b/tests/v2/wakunode_rest/test_rest_store.nim new file mode 100644 index 000000000..e3012735d --- /dev/null +++ b/tests/v2/wakunode_rest/test_rest_store.nim @@ -0,0 +1,522 @@ +{.used.} + +import + std/[options, times], + stew/shims/net as stewNet, + chronicles, + testutils/unittests, + eth/keys, + presto, presto/client as presto_client, + libp2p/crypto/crypto +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/node/waku_node, + ../../waku/v2/node/rest/server, + ../../waku/v2/node/rest/client, + ../../waku/v2/node/rest/responses, + ../../../waku/v2/node/rest/store/handlers as store_api, + ../../../waku/v2/node/rest/store/client as store_api_client, + ../../../waku/v2/node/rest/store/types, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/protocol/waku_archive/driver/queue_driver, + ../../../waku/v2/protocol/waku_store as waku_store, + ../../../waku/v2/utils/peers, + ../../../waku/v2/utils/time, + ../../v2/testlib/common, + ../../v2/testlib/wakucore, + ../../v2/testlib/wakunode + +logScope: + topics = "waku node rest store_api test" + +proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = + let + digest = waku_archive.computeDigest(message) + receivedTime = if message.timestamp > 0: message.timestamp + else: getNanosecondTime(getTime().toUnixFloat()) + + store.put(pubsubTopic, message, digest, receivedTime) + +# Creates a new WakuNode +proc testWakuNode(): WakuNode = + let + privkey = generateSecp256k1Key() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(0) + + return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port)) + +################################################################################ +# Beginning of the tests +################################################################################ +procSuite "Waku v2 Rest API - Store": + + asyncTest "MessageDigest <-> string conversions": + # Validate MessageDigest conversion from a WakuMessage obj + let wakuMsg = WakuMessage( + contentTopic: "Test content topic", + payload: @[byte('H'), byte('i'), byte('!')] + ) + + let messageDigest = waku_store.computeDigest(wakuMsg) + let restMsgDigest = some(messageDigest.toRestStringMessageDigest()) + let parsedMsgDigest = restMsgDigest.parseMsgDigest().value + + check: + messageDigest == parsedMsgDigest.get() + + # Random validation. Obtained the raw values manually + let expected = some("ZjNhM2Q2NDkwMTE0MjMzNDg0MzJlMDdiZGI3NzIwYTc%3D") + let msgDigest = expected.parseMsgDigest().value + check: + expected.get() == msgDigest.get().toRestStringMessageDigest() + + asyncTest "Filter by start and end time": + let node = testWakuNode() + await node.start() + await node.mountRelay() + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + installStoreApiHandlers(restServer.router, node) + restServer.start() + + # WakuStore setup + let driver: ArchiveDriver = QueueDriver.new() + node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + await node.mountStore() + node.mountStoreClient() + + let key = generateEcdsaKey() + var peerSwitch = newStandardSwitch(some(key)) + await peerSwitch.start() + + peerSwitch.mount(node.wakuStore) + + # Now prime it with some history before tests + let msgList = @[ + fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0), + fakeWakuMessage(@[byte 1], ts=1), + fakeWakuMessage(@[byte 1, byte 2], ts=2), + fakeWakuMessage(@[byte 1], ts=3), + fakeWakuMessage(@[byte 1], ts=4), + fakeWakuMessage(@[byte 1], ts=5), + fakeWakuMessage(@[byte 1], ts=6), + fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("c2"), ts=9) + ] + for msg in msgList: + require driver.put(DefaultPubsubTopic, msg).isOk() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo() + let fullAddr = $remotePeerInfo.addrs[0] & + "/p2p/" & $remotePeerInfo.peerId + + # Apply filter by start and end timestamps + var response = + await client.getStoreMessagesV1( + encodeUrl(fullAddr), + encodeUrl(DefaultPubsubTopic), + "", # empty content topics. Don't filter by this field + "3", # start time + "6", # end time + "", # sender time + "", # store time + "", # base64-encoded digest + "", # empty implies default page size + "true" # ascending + ) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.messages.len == 4 + + await restServer.stop() + await restServer.closeWait() + await node.stop() + + asyncTest "Store node history response - forward pagination": + # Test adapted from the analogous present at waku_store/test_wakunode_store.nim + let node = testWakuNode() + await node.start() + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + installStoreApiHandlers(restServer.router, node) + restServer.start() + + # WakuStore setup + let driver: ArchiveDriver = QueueDriver.new() + node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + await node.mountStore() + node.mountStoreClient() + + let key = generateEcdsaKey() + var peerSwitch = newStandardSwitch(some(key)) + await peerSwitch.start() + + peerSwitch.mount(node.wakuStore) + + # Now prime it with some history before tests + let timeOrigin = common.now() + let msgList = @[ + fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 05], ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin)) + ] + for msg in msgList: + require driver.put(DefaultPubsubTopic, msg).isOk() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo() + let fullAddr = $remotePeerInfo.addrs[0] & + "/p2p/" & $remotePeerInfo.peerId + + var pages = newSeq[seq[WakuMessage]](2) + + # Fields that compose a HistoryCursor object + var reqPubsubTopic = DefaultPubsubTopic + var reqSenderTime = Timestamp(0) + var reqStoreTime = Timestamp(0) + var reqDigest = waku_store.MessageDigest() + + for i in 0..<2: + let response = + await client.getStoreMessagesV1( + encodeUrl(fullAddr), + encodeUrl(reqPubsubTopic), + "", # content topics. Empty ignores the field. + "", # start time. Empty ignores the field. + "", # end time. Empty ignores the field. + encodeUrl($reqSenderTime), # sender time + encodeUrl($reqStoreTime), # store time + reqDigest.toRestStringMessageDigest(), # base64-encoded digest. Empty ignores the field. + "7", # page size. Empty implies default page size. + "true" # ascending + ) + + var wakuMessages = newSeq[WakuMessage](0) + for j in 0.. 0: + res = newString(len(data)) + copyMem(addr res[0], unsafeAddr data[0], len(data)) + + return ok(StoreResponseRest( + messages: newSeq[StoreWakuMessage](0), + cursor: none(HistoryCursorRest), + # field that contain error information + errorMessage: some(res) + )) + + # If everything goes wrong + return err(cstring("Unsupported contentType " & $contentType)) + + +proc getStoreMessagesV1*( + # URL-encoded reference to the store-node + peerAddr: string = "", + pubsubTopic: string = "", + # URL-encoded comma-separated list of content topics + contentTopics: string = "", + startTime: string = "", + endTime: string = "", + + # Optional cursor fields + senderTime: string = "", + storeTime: string = "", + digest: string = "", # base64-encoded digest + + pageSize: string = "", + ascending: string = "" + ): + RestResponse[StoreResponseRest] + + {.rest, + endpoint: "/store/v1/messages", + meth: HttpMethod.MethodGet.} diff --git a/waku/v2/node/rest/store/handlers.nim b/waku/v2/node/rest/store/handlers.nim new file mode 100644 index 000000000..67bb6d340 --- /dev/null +++ b/waku/v2/node/rest/store/handlers.nim @@ -0,0 +1,248 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/strformat, + stew/results, + chronicles, + json_serialization, + presto/route +import + ../../../../common/base64, + ../../../protocol/waku_message/topics/content_topic, + ../../../protocol/waku_store/common, + ../../../utils/time, + ../../waku_node, + ../../peer_manager, + ../responses, + ../serdes, + ./types + +export types + +logScope: + topics = "waku node rest store_api" + +const futTimeout* = 5.seconds # Max time to wait for futures + +# Queries the store-node with the query parameters and +# returns a RestApiResponse that is sent back to the api client. +proc performHistoryQuery(selfNode: WakuNode, + histQuery: HistoryQuery, + storePeer: RemotePeerInfo): + + Future[RestApiResponse] {.async.} = + + let queryFut = selfNode.query(histQuery, storePeer) + if not await queryFut.withTimeout(futTimeout): + const msg = "No history response received (timeout)" + error msg + return RestApiResponse.internalServerError(msg) + + let res = queryFut.read() + if res.isErr(): + const msg = "Error occurred in queryFut.read()" + error msg, error=res.error + return RestApiResponse.internalServerError( + fmt("{msg} [{res.error}]")) + + let storeResp = res.value.toStoreResponseRest() + let resp = RestApiResponse.jsonResponse(storeResp, status=Http200) + if resp.isErr(): + const msg = "Error building the json respose" + error msg, error=resp.error + return RestApiResponse.internalServerError( + fmt("{msg} [{resp.error}]")) + + return resp.get() + +# Converts a string time representation into an Option[Timestamp]. +# Only positive time is considered a valid Timestamp in the request +proc parseTime(input: Option[string]): + Result[Option[Timestamp], string] = + if input.isSome() and input.get() != "": + try: + let time = parseInt(input.get()) + if time > 0: + return ok(some(Timestamp(time))) + except ValueError: + return err("Problem parsing time [" & + getCurrentExceptionMsg() & "]") + + return ok(none(Timestamp)) + +# Generates a history query cursor as per the given params +proc parseCursor(parsedPubsubTopic: Option[string], + senderTime: Option[string], + storeTime: Option[string], + digest: Option[string]): + Result[Option[HistoryCursor], string] = + + # Parse sender time + let parsedSenderTime = parseTime(senderTime) + if not parsedSenderTime.isOk(): + return err(parsedSenderTime.error) + + # Parse store time + let parsedStoreTime = parseTime(storeTime) + if not parsedStoreTime.isOk(): + return err(parsedStoreTime.error) + + # Parse message digest + let parsedMsgDigest = parseMsgDigest(digest) + if not parsedMsgDigest.isOk(): + return err(parsedMsgDigest.error) + + # Parse cursor information + if parsedPubsubTopic.isSome() and + parsedSenderTime.value.isSome() and + parsedStoreTime.value.isSome() and + parsedMsgDigest.value.isSome(): + + return ok(some( + HistoryCursor( + pubsubTopic: parsedPubsubTopic.get(), + senderTime: parsedSenderTime.value.get(), + storeTime: parsedStoreTime.value.get(), + digest: parsedMsgDigest.value.get()) + )) + else: + return ok(none(HistoryCursor)) + +# Creates a HistoryQuery from the given params +proc createHistoryQuery(pubsubTopic: Option[string], + contentTopics: Option[string], + senderTime: Option[string], + storeTime: Option[string], + digest: Option[string], + startTime: Option[string], + endTime: Option[string], + pageSize: Option[string], + ascending: Option[string]): + + Result[HistoryQuery, string] = + + # Parse pubsubTopic parameter + var parsedPubsubTopic = none(string) + if pubsubTopic.isSome(): + let decodedPubsubTopic = decodeUrl(pubsubTopic.get()) + if decodedPubsubTopic != "": + parsedPubsubTopic = some(decodedPubsubTopic) + + # Parse the content topics + var parsedContentTopics = newSeq[ContentTopic](0) + if contentTopics.isSome(): + let ctList = decodeUrl(contentTopics.get()) + if ctList != "": + for ct in ctList.split(','): + parsedContentTopics.add(ct) + + # Parse cursor information + let parsedCursor = ? parseCursor(parsedPubsubTopic, + senderTime, + storeTime, + digest) + + # Parse page size field + var parsedPagedSize = DefaultPageSize + if pageSize.isSome() and pageSize.get() != "": + try: + parsedPagedSize = uint64(parseInt(pageSize.get())) + except CatchableError: + return err("Problem parsing page size [" & + getCurrentExceptionMsg() & "]") + + # Parse start time + let parsedStartTime = ? parseTime(startTime) + + # Parse end time + let parsedEndTime = ? parseTime(endTime) + + # Parse ascending field + var parsedAscending = true + if ascending.isSome() and ascending.get() != "": + parsedAscending = ascending.get() == "true" + + return ok( + HistoryQuery(pubsubTopic: parsedPubsubTopic, + contentTopics: parsedContentTopics, + startTime: parsedStartTime, + endTime: parsedEndTime, + ascending: parsedAscending, + pageSize: parsedPagedSize, + cursor: parsedCursor + )) + +# Simple type conversion. The "Option[Result[string, cstring]]" +# type is used by the nim-presto library. +proc toOpt(self: Option[Result[string, cstring]]): Option[string] = + if not self.isSome() or self.get().value == "": + return none(string) + if self.isSome() and self.get().value != "": + return some(self.get().value) + + +# Subscribes the rest handler to attend "/store/v1/messages" requests +proc installStoreV1Handler(router: var RestRouter, + node: WakuNode) = + + # Handles the store-query request according to the passed parameters + router.api(MethodGet, + "/store/v1/messages") do ( + peerAddr: Option[string], + pubsubTopic: Option[string], + contentTopics: Option[string], + senderTime: Option[string], + storeTime: Option[string], + digest: Option[string], + startTime: Option[string], + endTime: Option[string], + pageSize: Option[string], + ascending: Option[string] + ) -> RestApiResponse: + + debug "REST-GET /store/v1/messages ", peer_addr = $peerAddr + + # All the GET parameters are URL-encoded (https://en.wikipedia.org/wiki/URL_encoding) + # Example: + # /store/v1/messages?peerAddr=%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\&pubsubTopic=my-waku-topic + + # Parse the peer address parameter + var parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()) + if not parsedPeerAddr.isOk(): + return RestApiResponse.badRequest(parsedPeerAddr.error) + + var peerOpt = none(RemotePeerInfo) + if parsedPeerAddr.value.isSome(): + peerOpt = parsedPeerAddr.value + else: + # The user didn't specify any store peer address. + peerOpt = node.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + return RestApiResponse.preconditionFailed("Missing known store-peer node") + + # Parse the rest of the parameters and create a HistoryQuery + let histQuery = createHistoryQuery( + pubsubTopic.toOpt(), + contentTopics.toOpt(), + senderTime.toOpt(), + storeTime.toOpt(), + digest.toOpt(), + startTime.toOpt(), + endTime.toOpt(), + pageSize.toOpt(), + ascending.toOpt() + ) + + if not histQuery.isOk(): + return RestApiResponse.badRequest(histQuery.error) + + return await node.performHistoryQuery(histQuery.value, + peerOpt.get()) + +# Registers the Api Handlers +proc installStoreApiHandlers*(router: var RestRouter, + node: WakuNode) = + installStoreV1Handler(router, node) diff --git a/waku/v2/node/rest/store/openapi.yaml b/waku/v2/node/rest/store/openapi.yaml new file mode 100644 index 000000000..cbd653ca3 --- /dev/null +++ b/waku/v2/node/rest/store/openapi.yaml @@ -0,0 +1,203 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: store + description: Store REST API for WakuV2 node + +paths: + /store/v1/messages: + get: + summary: Gets message history + description: > + Retrieves WakuV2 message history. The returned history + can be potentially filtered by optional request parameters. + operationId: getMessageHistory + tags: + - store + parameters: + - name: peerAddr + in: query + schema: + type: string + required: true + description: > + P2P fully qualified peer multiaddress + in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded. + example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN' + + - name: pubsubTopic + in: query + schema: + type: string + description: > + The pubsub topic on which a WakuMessage is published. + If left empty, no filtering is applied. + It is also intended for pagination purposes. + It should be a URL-encoded string. + example: 'my%20pubsub%20topic' + + - name: contentTopics + in: query + schema: string + description: > + Comma-separated list of content topics. When specified, + only WakuMessages that are linked to any of the given + content topics will be delivered in the get response. + It should be a URL-encoded-comma-separated string. + example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic' + + - name: startTime + in: query + schema: + type: string + description: > + The inclusive lower bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' + + - name: endTime + in: query + schema: + type: string + description: > + The inclusive upper bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' + + - name: senderTime + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + Represents the Unix time in nanoseconds at which a message was generated. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: '1680590947000000000' + + - name: storeTime + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + Represents the Unix time in nanoseconds at which a message was stored. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: '1680590945000000000' + + - name: digest + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + URL-base64-encoded string computed as a hash of the + a message content topic plus a message payload. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D' + + - name: pageSize + in: query + schema: + type: string + description: > + Number of messages to retrieve per page + example: '5' + + - name: ascending + in: query + schema: + type: string + description: > + "true" for paging forward, "false" for paging backward + example: "true" + + responses: + '200': + description: WakuV2 message history. + content: + application/json: + schema: + $ref: '#/components/schemas/StoreResponse' + '400': + description: Bad request error. + content: + text/plain: + type: string + '412': + description: Precondition failed. + content: + text/plain: + type: string + '500': + description: Internal server error. + content: + text/plain: + type: string + +components: + schemas: + StoreResponse: + type: object + properties: + messages: + type: array + items: + $ref: '#/components/schemas/WakuMessage' + cursor: + $ref: '#/components/schemas/HistoryCursor' + error_message: + type: string + required: + - messages + + HistoryCursor: + type: object + properties: + pubsub_topic: + type: string + sender_time: + type: string + store_time: + type: string + digest: + type: string + required: + - pubsub_topic + - sender_time + - store_time + - digest + + WakuMessage: + type: object + properties: + payload: + type: string + content_topic: + type: string + version: + type: integer + format: int32 + timestamp: + type: integer + format: int64 + ephemeral: + type: boolean + required: + - payload + - content_topic diff --git a/waku/v2/node/rest/store/types.nim b/waku/v2/node/rest/store/types.nim new file mode 100644 index 000000000..811cd5b6c --- /dev/null +++ b/waku/v2/node/rest/store/types.nim @@ -0,0 +1,375 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[sets, strformat, uri], + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import + ../../../protocol/waku_store/common as waku_store_common, + ../../../../common/base64, + ../../../utils/time, + ../../../protocol/waku_message/topics/content_topic, + ../../../protocol/waku_message/topics/pubsub_topic, + ../../../protocol/waku_message/message, + ../serdes + + +#### Types + +type + HistoryCursorRest* = object + pubsubTopic*: PubsubTopic + senderTime*: Timestamp + storeTime*: Timestamp + digest*: MessageDigest + + StoreRequestRest* = object + # inspired by https://github.com/waku-org/nwaku/blob/f95147f5b7edfd45f914586f2d41cd18fb0e0d18/waku/v2/protocol/waku_store/common.nim#L52 + pubsubTopic*: Option[PubsubTopic] + contentTopics*: seq[ContentTopic] + cursor*: Option[HistoryCursorRest] + startTime*: Option[Timestamp] + endTime*: Option[Timestamp] + pageSize*: uint64 + ascending*: bool + + StoreWakuMessage* = object + payload*: Base64String + contentTopic*: Option[ContentTopic] + version*: Option[uint32] + timestamp*: Option[Timestamp] + ephemeral*: Option[bool] + + StoreResponseRest* = object + # inspired by https://rfc.vac.dev/spec/16/#storeresponse + messages*: seq[StoreWakuMessage] + cursor*: Option[HistoryCursorRest] + # field that contains error information + errorMessage*: Option[string] + + +#### Type conversion + +# Converts a URL-encoded-base64 string into a 'MessageDigest' +proc parseMsgDigest*(input: Option[string]): + Result[Option[MessageDigest], string] = + + if not input.isSome() or input.get() == "": + return ok(none(MessageDigest)) + + let decodedUrl = decodeUrl(input.get()) + let base64Decoded = base64.decode(Base64String(decodedUrl)) + var messageDigest = MessageDigest() + + if not base64Decoded.isOk(): + return err(base64Decoded.error) + + let base64DecodedArr = base64Decoded.get() + # Next snippet inspired by "nwaku/waku/v2/protocol/waku_archive/archive.nim" + # TODO: Improve coherence of MessageDigest type + messageDigest = block: + var data: array[32, byte] + for i in 0..