mirror of https://github.com/waku-org/nwaku.git
REST store: get msgs from self node when store is mounted and no peerAddr is passed (#2387)
A node that handles REST-Store requests normally acts as a Store-client and therefore it retrieved the messages from another Store-node. With these changes, we allow a node with Store mounted, to retrieve its messages. In other words, the node can act as a Store-server of its messages. * test_rest_store.nim: add a new test to validate that the self-node can retrieve its messages to the REST client. * rest/store/client.nim: add new proc to allow making a GET store request without peerAddr. * rest/store/handle.nim: add logic to handle requests that don't provide peerAddr but the self/local node has Store mounted. In this case, the self/local node will retrieve its locally stored messages. * waku_store/self_req_handler.nim: logic to handle "store" requests allowing the REST-store node to act as a Store-server node. The 'self_req_handler.nim' helps to bypass the store protocol and directly retrieve the messages from the local/self node. I added this logic in a separate file from 'protocol.nim' because it doesn't participate in any libp2p communication. * waku_store/protocol.nim: make 'queryHandler' attribute public so that it can be used from the 'self_req_handler.nim' module.
This commit is contained in:
parent
c55ca06756
commit
3e65cc18f6
|
@ -534,3 +534,67 @@ procSuite "Waku v2 Rest API - Store":
|
|||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
|
||||
asyncTest "retrieve historical messages from a self-store-node":
|
||||
## This test aims to validate the correct message retrieval for a store-node which exposes
|
||||
## a REST server.
|
||||
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
|
||||
let restPort = Port(58014)
|
||||
let restAddress = parseIpAddress("0.0.0.0")
|
||||
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||
|
||||
installStoreApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
# WakuStore setup
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
let mountArchiveRes = node.mountArchive(driver)
|
||||
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
||||
|
||||
await node.mountStore()
|
||||
|
||||
# Now prime it with some history before tests
|
||||
let msgList = @[
|
||||
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0),
|
||||
fakeWakuMessage(@[byte 1], ts=1),
|
||||
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
|
||||
]
|
||||
for msg in msgList:
|
||||
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
|
||||
# Filtering by a known pubsub topic.
|
||||
var response =
|
||||
await client.getStoreMessagesV1(
|
||||
none[string](),
|
||||
encodeUrl(DefaultPubsubTopic))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 3
|
||||
|
||||
# Get all the messages by specifying an empty pubsub topic
|
||||
response =
|
||||
await client.getStoreMessagesV1(
|
||||
none[string](),
|
||||
encodeUrl(""))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 3
|
||||
|
||||
# Receiving no messages by filtering with a random pubsub topic
|
||||
response =
|
||||
await client.getStoreMessagesV1(
|
||||
none[string](),
|
||||
encodeUrl("random pubsub topic"))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 0
|
||||
|
|
|
@ -70,3 +70,26 @@ proc getStoreMessagesV1*(
|
|||
{.rest,
|
||||
endpoint: "/store/v1/messages",
|
||||
meth: HttpMethod.MethodGet.}
|
||||
|
||||
proc getStoreMessagesV1*(
|
||||
# URL-encoded reference to the store-node
|
||||
peerAddr: Option[string],
|
||||
pubsubTopic: string = "",
|
||||
# URL-encoded comma-separated list of content topics
|
||||
contentTopics: string = "",
|
||||
startTime: string = "",
|
||||
endTime: string = "",
|
||||
|
||||
# Optional cursor fields
|
||||
senderTime: string = "",
|
||||
storeTime: string = "",
|
||||
digest: string = "", # base64-encoded digest
|
||||
|
||||
pageSize: string = "",
|
||||
ascending: string = ""
|
||||
):
|
||||
RestResponse[StoreResponseRest]
|
||||
|
||||
{.rest,
|
||||
endpoint: "/store/v1/messages",
|
||||
meth: HttpMethod.MethodGet.}
|
|
@ -13,6 +13,7 @@ import
|
|||
import
|
||||
../../../waku_core,
|
||||
../../../waku_store/common,
|
||||
../../../waku_store/self_req_handler,
|
||||
../../../waku_node,
|
||||
../../../node/peer_manager,
|
||||
../../../common/paging,
|
||||
|
@ -187,6 +188,24 @@ proc toOpt(self: Option[Result[string, cstring]]): Option[string] =
|
|||
if self.isSome() and self.get().value != "":
|
||||
return some(self.get().value)
|
||||
|
||||
proc retrieveMsgsFromSelfNode(self: WakuNode, histQuery: HistoryQuery):
|
||||
Future[RestApiResponse] {.async.} =
|
||||
## Performs a "store" request to the local node (self node.)
|
||||
## Notice that this doesn't follow the regular store libp2p channel because a node
|
||||
## it is not allowed to libp2p-dial a node to itself, by default.
|
||||
##
|
||||
|
||||
let selfResp = (await self.wakuStore.handleSelfStoreRequest(histQuery)).valueOr:
|
||||
return RestApiResponse.internalServerError($error)
|
||||
|
||||
let storeResp = selfResp.toStoreResponseRest()
|
||||
let resp = RestApiResponse.jsonResponse(storeResp, status=Http200).valueOr:
|
||||
const msg = "Error building the json respose"
|
||||
error msg, error=error
|
||||
return RestApiResponse.internalServerError(fmt("{msg} [{error}]"))
|
||||
|
||||
return resp
|
||||
|
||||
# Subscribes the rest handler to attend "/store/v1/messages" requests
|
||||
proc installStoreApiHandlers*(
|
||||
router: var RestRouter,
|
||||
|
@ -215,22 +234,6 @@ proc installStoreApiHandlers*(
|
|||
# Example:
|
||||
# /store/v1/messages?peerAddr=%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\&pubsubTopic=my-waku-topic
|
||||
|
||||
# Parse the peer address parameter
|
||||
let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr:
|
||||
return RestApiResponse.badRequest(error)
|
||||
|
||||
let peerAddr = parsedPeerAddr.valueOr:
|
||||
node.peerManager.selectPeer(WakuStoreCodec).valueOr:
|
||||
let handler = discHandler.valueOr:
|
||||
return NoPeerNoDiscError
|
||||
|
||||
let peerOp = (await handler()).valueOr:
|
||||
return RestApiResponse.internalServerError($error)
|
||||
|
||||
peerOp.valueOr:
|
||||
return RestApiResponse.preconditionFailed(
|
||||
"No suitable service peer & none discovered")
|
||||
|
||||
# Parse the rest of the parameters and create a HistoryQuery
|
||||
let histQuery = createHistoryQuery(
|
||||
pubsubTopic.toOpt(),
|
||||
|
@ -247,4 +250,26 @@ proc installStoreApiHandlers*(
|
|||
if not histQuery.isOk():
|
||||
return RestApiResponse.badRequest(histQuery.error)
|
||||
|
||||
if peerAddr.isNone() and not node.wakuStore.isNil():
|
||||
## The user didn't specify a peer address and self-node is configured as a store node.
|
||||
## In this case we assume that the user is willing to retrieve the messages stored by
|
||||
## the local/self store node.
|
||||
return await node.retrieveMsgsFromSelfNode(histQuery.get())
|
||||
|
||||
# Parse the peer address parameter
|
||||
let parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt()).valueOr:
|
||||
return RestApiResponse.badRequest(error)
|
||||
|
||||
let peerAddr = parsedPeerAddr.valueOr:
|
||||
node.peerManager.selectPeer(WakuStoreCodec).valueOr:
|
||||
let handler = discHandler.valueOr:
|
||||
return NoPeerNoDiscError
|
||||
|
||||
let peerOp = (await handler()).valueOr:
|
||||
return RestApiResponse.internalServerError($error)
|
||||
|
||||
peerOp.valueOr:
|
||||
return RestApiResponse.preconditionFailed(
|
||||
"No suitable service peer & none discovered")
|
||||
|
||||
return await node.performHistoryQuery(histQuery.value, peerAddr)
|
|
@ -40,7 +40,7 @@ type
|
|||
WakuStore* = ref object of LPProtocol
|
||||
peerManager: PeerManager
|
||||
rng: ref rand.HmacDrbgContext
|
||||
queryHandler: HistoryQueryHandler
|
||||
queryHandler*: HistoryQueryHandler
|
||||
|
||||
## Protocol
|
||||
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
|
||||
##
|
||||
## This file is aimed to attend the requests that come directly
|
||||
## from the 'self' node. It is expected to attend the store requests that
|
||||
## come from REST-store endpoint when those requests don't indicate
|
||||
## any store-peer address.
|
||||
##
|
||||
## Notice that the REST-store requests normally assume that the REST
|
||||
## server is acting as a store-client. In this module, we allow that
|
||||
## such REST-store node can act as store-server as well by retrieving
|
||||
## its own stored messages. The typical use case for that is when
|
||||
## using `nwaku-compose`, which spawn a Waku node connected to a local
|
||||
## database, and the user is interested in retrieving the messages
|
||||
## stored by that local store node.
|
||||
##
|
||||
|
||||
import
|
||||
stew/results,
|
||||
chronos,
|
||||
chronicles
|
||||
import
|
||||
./protocol,
|
||||
./common
|
||||
|
||||
proc handleSelfStoreRequest*(self: WakuStore, histQuery: HistoryQuery):
|
||||
Future[WakuStoreResult[HistoryResponse]] {.async.} =
|
||||
## Handles the store requests made by the node to itself.
|
||||
## Normally used in REST-store requests
|
||||
|
||||
try:
|
||||
let resp: HistoryResponse = (await self.queryHandler(histQuery)).valueOr:
|
||||
return err("error in handleSelfStoreRequest: " & $error)
|
||||
|
||||
return WakuStoreResult[HistoryResponse].ok(resp)
|
||||
|
||||
except Exception:
|
||||
return err("exception in handleSelfStoreRequest: " & getCurrentExceptionMsg())
|
||||
|
Loading…
Reference in New Issue