From 7aa50c8b090518259f5b1d6143c610427098ddb0 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 31 Jan 2024 17:43:59 +0100 Subject: [PATCH] REST store: get msgs from self node when store is mounted and no peerAddr is passed (#2387) A node that handles REST-Store requests normally acts as a Store-client and therefore it retrieved the messages from another Store-node. With these changes, we allow a node with Store mounted, to retrieve its messages. In other words, the node can act as a Store-server of its messages. * test_rest_store.nim: add a new test to validate that the self-node can retrieve its messages to the REST client. * rest/store/client.nim: add new proc to allow making a GET store request without peerAddr. * rest/store/handle.nim: add logic to handle requests that don't provide peerAddr but the self/local node has Store mounted. In this case, the self/local node will retrieve its locally stored messages. * waku_store/self_req_handler.nim: logic to handle "store" requests allowing the REST-store node to act as a Store-server node. The 'self_req_handler.nim' helps to bypass the store protocol and directly retrieve the messages from the local/self node. I added this logic in a separate file from 'protocol.nim' because it doesn't participate in any libp2p communication. * waku_store/protocol.nim: make 'queryHandler' attribute public so that it can be used from the 'self_req_handler.nim' module. --- tests/wakunode_rest/test_rest_store.nim | 64 +++++++++++++++++++++++++ waku/waku_api/rest/store/client.nim | 23 +++++++++ waku/waku_api/rest/store/handlers.nim | 57 +++++++++++++++------- waku/waku_store/protocol.nim | 2 +- waku/waku_store/self_req_handler.nim | 38 +++++++++++++++ 5 files changed, 167 insertions(+), 17 deletions(-) create mode 100644 waku/waku_store/self_req_handler.nim diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index d6063ebd5..24cbf48b0 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -534,3 +534,67 @@ procSuite "Waku v2 Rest API - Store": await restServer.stop() await restServer.closeWait() await node.stop() + + + asyncTest "retrieve historical messages from a self-store-node": + ## This test aims to validate the correct message retrieval for a store-node which exposes + ## a REST server. + + # Given + let node = testWakuNode() + await node.start() + + let restPort = Port(58014) + let restAddress = parseIpAddress("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + installStoreApiHandlers(restServer.router, node) + restServer.start() + + # WakuStore setup + let driver: ArchiveDriver = QueueDriver.new() + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + + await node.mountStore() + + # Now prime it with some history before tests + let msgList = @[ + fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0), + fakeWakuMessage(@[byte 1], ts=1), + fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9) + ] + for msg in msgList: + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + # Filtering by a known pubsub topic. + var response = + await client.getStoreMessagesV1( + none[string](), + encodeUrl(DefaultPubsubTopic)) + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.messages.len == 3 + + # Get all the messages by specifying an empty pubsub topic + response = + await client.getStoreMessagesV1( + none[string](), + encodeUrl("")) + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.messages.len == 3 + + # Receiving no messages by filtering with a random pubsub topic + response = + await client.getStoreMessagesV1( + none[string](), + encodeUrl("random pubsub topic")) + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.messages.len == 0 diff --git a/waku/waku_api/rest/store/client.nim b/waku/waku_api/rest/store/client.nim index 71babc8e1..3e5c9dd80 100644 --- a/waku/waku_api/rest/store/client.nim +++ b/waku/waku_api/rest/store/client.nim @@ -70,3 +70,26 @@ proc getStoreMessagesV1*( {.rest, endpoint: "/store/v1/messages", meth: HttpMethod.MethodGet.} + +proc getStoreMessagesV1*( + # URL-encoded reference to the store-node + peerAddr: Option[string], + pubsubTopic: string = "", + # URL-encoded comma-separated list of content topics + contentTopics: string = "", + startTime: string = "", + endTime: string = "", + + # Optional cursor fields + senderTime: string = "", + storeTime: string = "", + digest: string = "", # base64-encoded digest + + pageSize: string = "", + ascending: string = "" + ): + RestResponse[StoreResponseRest] + + {.rest, + endpoint: "/store/v1/messages", + meth: HttpMethod.MethodGet.} \ No newline at end of file diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index 57bcdaf61..8cdeef903 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -13,6 +13,7 @@ import import ../../../waku_core, ../../../waku_store/common, + ../../../waku_store/self_req_handler, ../../../waku_node, ../../../node/peer_manager, ../../../common/paging, @@ -187,6 +188,24 @@ proc toOpt(self: Option[Result[string, cstring]]): Option[string] = if self.isSome() and self.get().value != "": return some(self.get().value) +proc retrieveMsgsFromSelfNode(self: WakuNode, histQuery: HistoryQuery): + Future[RestApiResponse] {.async.} = + ## Performs a "store" request to the local node (self node.) + ## Notice that this doesn't follow the regular store libp2p channel because a node + ## it is not allowed to libp2p-dial a node to itself, by default. + ## + + let selfResp = (await self.wakuStore.handleSelfStoreRequest(histQuery)).valueOr: + return RestApiResponse.internalServerError($error) + + let storeResp = selfResp.toStoreResponseRest() + let resp = RestApiResponse.jsonResponse(storeResp, status=Http200).valueOr: + const msg = "Error building the json respose" + error msg, error=error + return RestApiResponse.internalServerError(fmt("{msg} [{error}]")) + + return resp + # Subscribes the rest handler to attend "/store/v1/messages" requests proc installStoreApiHandlers*( router: var RestRouter, @@ -215,22 +234,6 @@ proc installStoreApiHandlers*( # Example: # /store/v1/messages?peerAddr=%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\&pubsubTopic=my-waku-topic - # Parse the peer address parameter - let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr: - return RestApiResponse.badRequest(error) - - let peerAddr = parsedPeerAddr.valueOr: - node.peerManager.selectPeer(WakuStoreCodec).valueOr: - let handler = discHandler.valueOr: - return NoPeerNoDiscError - - let peerOp = (await handler()).valueOr: - return RestApiResponse.internalServerError($error) - - peerOp.valueOr: - return RestApiResponse.preconditionFailed( - "No suitable service peer & none discovered") - # Parse the rest of the parameters and create a HistoryQuery let histQuery = createHistoryQuery( pubsubTopic.toOpt(), @@ -247,4 +250,26 @@ proc installStoreApiHandlers*( if not histQuery.isOk(): return RestApiResponse.badRequest(histQuery.error) + if peerAddr.isNone() and not node.wakuStore.isNil(): + ## The user didn't specify a peer address and self-node is configured as a store node. + ## In this case we assume that the user is willing to retrieve the messages stored by + ## the local/self store node. + return await node.retrieveMsgsFromSelfNode(histQuery.get()) + + # Parse the peer address parameter + let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr: + return RestApiResponse.badRequest(error) + + let peerAddr = parsedPeerAddr.valueOr: + node.peerManager.selectPeer(WakuStoreCodec).valueOr: + let handler = discHandler.valueOr: + return NoPeerNoDiscError + + let peerOp = (await handler()).valueOr: + return RestApiResponse.internalServerError($error) + + peerOp.valueOr: + return RestApiResponse.preconditionFailed( + "No suitable service peer & none discovered") + return await node.performHistoryQuery(histQuery.value, peerAddr) \ No newline at end of file diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 692ec962b..4e085e3c8 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -40,7 +40,7 @@ type WakuStore* = ref object of LPProtocol peerManager: PeerManager rng: ref rand.HmacDrbgContext - queryHandler: HistoryQueryHandler + queryHandler*: HistoryQueryHandler ## Protocol diff --git a/waku/waku_store/self_req_handler.nim b/waku/waku_store/self_req_handler.nim new file mode 100644 index 000000000..9504109cc --- /dev/null +++ b/waku/waku_store/self_req_handler.nim @@ -0,0 +1,38 @@ + +## +## This file is aimed to attend the requests that come directly +## from the 'self' node. It is expected to attend the store requests that +## come from REST-store endpoint when those requests don't indicate +## any store-peer address. +## +## Notice that the REST-store requests normally assume that the REST +## server is acting as a store-client. In this module, we allow that +## such REST-store node can act as store-server as well by retrieving +## its own stored messages. The typical use case for that is when +## using `nwaku-compose`, which spawn a Waku node connected to a local +## database, and the user is interested in retrieving the messages +## stored by that local store node. +## + +import + stew/results, + chronos, + chronicles +import + ./protocol, + ./common + +proc handleSelfStoreRequest*(self: WakuStore, histQuery: HistoryQuery): + Future[WakuStoreResult[HistoryResponse]] {.async.} = + ## Handles the store requests made by the node to itself. + ## Normally used in REST-store requests + + try: + let resp: HistoryResponse = (await self.queryHandler(histQuery)).valueOr: + return err("error in handleSelfStoreRequest: " & $error) + + return WakuStoreResult[HistoryResponse].ok(resp) + + except Exception: + return err("exception in handleSelfStoreRequest: " & getCurrentExceptionMsg()) +