mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-12 23:16:39 +00:00
* feat: new rest api based on the current store json-rpc api and following the same structure as the current relay rest api. * feat: the store api attend GET requests to retrieve historical messages * feat: unit tests. * feat: allow return message to rest-client in case error (4XX or 5XX) * chore: always allow to call the store api endpoints (only rest) without explicit storenode (#1575) * feat: always mounting the current node as storenode client
This commit is contained in:
parent
9dadc1f5a0
commit
b2acb54d6a
@ -437,9 +437,9 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
||||
executeMessageRetentionPolicy(node)
|
||||
startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
|
||||
|
||||
mountStoreClient(node)
|
||||
if conf.storenode != "":
|
||||
try:
|
||||
mountStoreClient(node)
|
||||
let storenode = parseRemotePeerInfo(conf.storenode)
|
||||
node.peerManager.addServicePeer(storenode, WakuStoreCodec)
|
||||
except CatchableError:
|
||||
|
@ -13,6 +13,7 @@ import
|
||||
../../waku/v2/node/rest/debug/handlers as debug_api,
|
||||
../../waku/v2/node/rest/relay/handlers as relay_api,
|
||||
../../waku/v2/node/rest/relay/topic_cache,
|
||||
../../waku/v2/node/rest/store/handlers as store_api,
|
||||
./config
|
||||
|
||||
|
||||
@ -36,5 +37,8 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf:
|
||||
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
||||
installRelayApiHandlers(server.router, node, relayCache)
|
||||
|
||||
## Store REST API
|
||||
installStoreApiHandlers(server.router, node)
|
||||
|
||||
server.start()
|
||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||
|
@ -13,14 +13,29 @@ This API is divided in different _namespaces_ which group a set of resources:
|
||||
| `/admin` | Privileged access to the internal operations of the node. |
|
||||
| `/private` | Provides functionality to encrypt/decrypt `WakuMessage` payloads using either symmetric or asymmetric cryptography. This allows backwards compatibility with Waku v1 nodes. |
|
||||
|
||||
The full HTTP REST API documentation can be found here: [TBD]()
|
||||
|
||||
### API Specification
|
||||
|
||||
The HTTP REST API has been designed following the OpenAPI 3.0.3 standard specification format. The OpenAPI specification file can be found here: [TBD]()
|
||||
The HTTP REST API has been designed following the OpenAPI 3.0.3 standard specification format.
|
||||
The OpenAPI specification files can be found here:
|
||||
|
||||
| Namespace | OpenAPI file |
|
||||
------------|--------------
|
||||
| `/debug` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/debug/openapi.yaml) |
|
||||
| `/relay` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/relay/openapi.yaml) |
|
||||
| `/store` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/store/openapi.yaml) |
|
||||
| `/filter` | [openapi.yaml](https://github.com/waku-org/nwaku/blob/master/waku/v2/node/rest/filter/openapi.yaml) |
|
||||
|
||||
The OpenAPI files can be analysed online with [Redocly](https://redocly.github.io/redoc/)
|
||||
|
||||
Check the [OpenAPI Tools](https://openapi.tools/) site for the right tool for you (e.g. REST API client generator)
|
||||
|
||||
A particular OpenAPI spec can be easily imported into [Postman](https://www.postman.com/downloads/)
|
||||
1. Open Postman.
|
||||
2. Click on File -> Import...
|
||||
2. Load the openapi.yaml of interest, stored in your computer.
|
||||
3. Then, requests can be made from within the 'Collections' section.
|
||||
|
||||
|
||||
### Usage example
|
||||
|
||||
@ -40,16 +55,4 @@ curl http://localhost:8645/debug/v1/info -s | jq
|
||||
|
||||
|
||||
### Node configuration
|
||||
|
||||
A subset of the node configuration can be used to modify the behaviour of the HTTP REST API. These are the relevant command line options:
|
||||
|
||||
| CLI option | Description | Default value |
|
||||
|------------|-------------|---------------|
|
||||
|`--rest` | Enable Waku REST HTTP server. | `false` |
|
||||
|`--rest-address` | Listening address of the REST HTTP server. | `127.0.0.1` |
|
||||
|`--rest-port` | Listening port of the REST HTTP server. | `8645` |
|
||||
|`--rest-relay-cache-capacity` | Capacity of the Relay REST API message cache. | `30` |
|
||||
|`--rest-admin` | Enable access to REST HTTP Admin API. | `false` |
|
||||
|`--rest-private` | Enable access to REST HTTP Private API. | `false` |
|
||||
|
||||
Note that these command line options have their counterpart option in the node configuration file.
|
||||
Find details [here](https://github.com/waku-org/nwaku/tree/master/docs/operators/how-to/configure-rest-api.md)
|
||||
|
23
docs/operators/how-to/configure-rest-api.md
Normal file
23
docs/operators/how-to/configure-rest-api.md
Normal file
@ -0,0 +1,23 @@
|
||||
|
||||
# Configure a REST API node
|
||||
|
||||
A subset of the node configuration can be used to modify the behaviour of the HTTP REST API.
|
||||
|
||||
These are the relevant command line options:
|
||||
|
||||
| CLI option | Description | Default value |
|
||||
|------------|-------------|---------------|
|
||||
|`--rest` | Enable Waku REST HTTP server. | `false` |
|
||||
|`--rest-address` | Listening address of the REST HTTP server. | `127.0.0.1` |
|
||||
|`--rest-port` | Listening port of the REST HTTP server. | `8645` |
|
||||
|`--rest-relay-cache-capacity` | Capacity of the Relay REST API message cache. | `30` |
|
||||
|`--rest-admin` | Enable access to REST HTTP Admin API. | `false` |
|
||||
|`--rest-private` | Enable access to REST HTTP Private API. | `false` |
|
||||
|
||||
Note that these command line options have their counterpart option in the node configuration file.
|
||||
|
||||
Example:
|
||||
|
||||
```shell
|
||||
wakunode2 --rest=true
|
||||
```
|
@ -124,7 +124,7 @@ The following options are available:
|
||||
|
||||
## Configuration use cases
|
||||
|
||||
This an index of tutorials explaining how to configure your nwaku node for different use cases.
|
||||
This is an index of tutorials explaining how to configure your nwaku node for different use cases.
|
||||
|
||||
1. [Connect to other peers](./connect.md)
|
||||
2. [Configure a domain name](./configure-domain.md)
|
||||
@ -133,3 +133,4 @@ This an index of tutorials explaining how to configure your nwaku node for diffe
|
||||
5. [Generate and configure a node key](./configure-key.md)
|
||||
6. [Configure websocket transport](./configure-websocket.md)
|
||||
7. [Run nwaku with rate limiting enabled](./run-with-rln.md)
|
||||
8. [Configure a REST API node](./configure-rest-api.md)
|
||||
|
@ -78,7 +78,8 @@ import
|
||||
./v2/wakunode_rest/test_rest_debug_serdes,
|
||||
./v2/wakunode_rest/test_rest_relay,
|
||||
./v2/wakunode_rest/test_rest_relay_serdes,
|
||||
./v2/wakunode_rest/test_rest_serdes
|
||||
./v2/wakunode_rest/test_rest_serdes,
|
||||
./v2/wakunode_rest/test_rest_store
|
||||
|
||||
|
||||
## Apps
|
||||
|
522
tests/v2/wakunode_rest/test_rest_store.nim
Normal file
522
tests/v2/wakunode_rest/test_rest_store.nim
Normal file
@ -0,0 +1,522 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, times],
|
||||
stew/shims/net as stewNet,
|
||||
chronicles,
|
||||
testutils/unittests,
|
||||
eth/keys,
|
||||
presto, presto/client as presto_client,
|
||||
libp2p/crypto/crypto
|
||||
import
|
||||
../../../waku/v2/node/peer_manager,
|
||||
../../../waku/v2/node/waku_node,
|
||||
../../waku/v2/node/rest/server,
|
||||
../../waku/v2/node/rest/client,
|
||||
../../waku/v2/node/rest/responses,
|
||||
../../../waku/v2/node/rest/store/handlers as store_api,
|
||||
../../../waku/v2/node/rest/store/client as store_api_client,
|
||||
../../../waku/v2/node/rest/store/types,
|
||||
../../../waku/v2/protocol/waku_message,
|
||||
../../../waku/v2/protocol/waku_archive,
|
||||
../../../waku/v2/protocol/waku_archive/driver/queue_driver,
|
||||
../../../waku/v2/protocol/waku_store as waku_store,
|
||||
../../../waku/v2/utils/peers,
|
||||
../../../waku/v2/utils/time,
|
||||
../../v2/testlib/common,
|
||||
../../v2/testlib/wakucore,
|
||||
../../v2/testlib/wakunode
|
||||
|
||||
logScope:
|
||||
topics = "waku node rest store_api test"
|
||||
|
||||
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
|
||||
let
|
||||
digest = waku_archive.computeDigest(message)
|
||||
receivedTime = if message.timestamp > 0: message.timestamp
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
store.put(pubsubTopic, message, digest, receivedTime)
|
||||
|
||||
# Creates a new WakuNode
|
||||
proc testWakuNode(): WakuNode =
|
||||
let
|
||||
privkey = generateSecp256k1Key()
|
||||
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||
extIp = ValidIpAddress.init("127.0.0.1")
|
||||
port = Port(0)
|
||||
|
||||
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
|
||||
|
||||
################################################################################
|
||||
# Beginning of the tests
|
||||
################################################################################
|
||||
procSuite "Waku v2 Rest API - Store":
|
||||
|
||||
asyncTest "MessageDigest <-> string conversions":
|
||||
# Validate MessageDigest conversion from a WakuMessage obj
|
||||
let wakuMsg = WakuMessage(
|
||||
contentTopic: "Test content topic",
|
||||
payload: @[byte('H'), byte('i'), byte('!')]
|
||||
)
|
||||
|
||||
let messageDigest = waku_store.computeDigest(wakuMsg)
|
||||
let restMsgDigest = some(messageDigest.toRestStringMessageDigest())
|
||||
let parsedMsgDigest = restMsgDigest.parseMsgDigest().value
|
||||
|
||||
check:
|
||||
messageDigest == parsedMsgDigest.get()
|
||||
|
||||
# Random validation. Obtained the raw values manually
|
||||
let expected = some("ZjNhM2Q2NDkwMTE0MjMzNDg0MzJlMDdiZGI3NzIwYTc%3D")
|
||||
let msgDigest = expected.parseMsgDigest().value
|
||||
check:
|
||||
expected.get() == msgDigest.get().toRestStringMessageDigest()
|
||||
|
||||
asyncTest "Filter by start and end time":
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
|
||||
let restPort = Port(58011)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||
|
||||
installStoreApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
# WakuStore setup
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
||||
await node.mountStore()
|
||||
node.mountStoreClient()
|
||||
|
||||
let key = generateEcdsaKey()
|
||||
var peerSwitch = newStandardSwitch(some(key))
|
||||
await peerSwitch.start()
|
||||
|
||||
peerSwitch.mount(node.wakuStore)
|
||||
|
||||
# Now prime it with some history before tests
|
||||
let msgList = @[
|
||||
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0),
|
||||
fakeWakuMessage(@[byte 1], ts=1),
|
||||
fakeWakuMessage(@[byte 1, byte 2], ts=2),
|
||||
fakeWakuMessage(@[byte 1], ts=3),
|
||||
fakeWakuMessage(@[byte 1], ts=4),
|
||||
fakeWakuMessage(@[byte 1], ts=5),
|
||||
fakeWakuMessage(@[byte 1], ts=6),
|
||||
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("c2"), ts=9)
|
||||
]
|
||||
for msg in msgList:
|
||||
require driver.put(DefaultPubsubTopic, msg).isOk()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
|
||||
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
|
||||
let fullAddr = $remotePeerInfo.addrs[0] &
|
||||
"/p2p/" & $remotePeerInfo.peerId
|
||||
|
||||
# Apply filter by start and end timestamps
|
||||
var response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(fullAddr),
|
||||
encodeUrl(DefaultPubsubTopic),
|
||||
"", # empty content topics. Don't filter by this field
|
||||
"3", # start time
|
||||
"6", # end time
|
||||
"", # sender time
|
||||
"", # store time
|
||||
"", # base64-encoded digest
|
||||
"", # empty implies default page size
|
||||
"true" # ascending
|
||||
)
|
||||
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 4
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asyncTest "Store node history response - forward pagination":
|
||||
# Test adapted from the analogous present at waku_store/test_wakunode_store.nim
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
|
||||
let restPort = Port(58011)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||
|
||||
installStoreApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
# WakuStore setup
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
||||
await node.mountStore()
|
||||
node.mountStoreClient()
|
||||
|
||||
let key = generateEcdsaKey()
|
||||
var peerSwitch = newStandardSwitch(some(key))
|
||||
await peerSwitch.start()
|
||||
|
||||
peerSwitch.mount(node.wakuStore)
|
||||
|
||||
# Now prime it with some history before tests
|
||||
let timeOrigin = common.now()
|
||||
let msgList = @[
|
||||
fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 01], ts=ts(10, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 02], ts=ts(20, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 03], ts=ts(30, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 04], ts=ts(40, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 05], ts=ts(50, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 06], ts=ts(60, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 07], ts=ts(70, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 08], ts=ts(80, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin))
|
||||
]
|
||||
for msg in msgList:
|
||||
require driver.put(DefaultPubsubTopic, msg).isOk()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
|
||||
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
|
||||
let fullAddr = $remotePeerInfo.addrs[0] &
|
||||
"/p2p/" & $remotePeerInfo.peerId
|
||||
|
||||
var pages = newSeq[seq[WakuMessage]](2)
|
||||
|
||||
# Fields that compose a HistoryCursor object
|
||||
var reqPubsubTopic = DefaultPubsubTopic
|
||||
var reqSenderTime = Timestamp(0)
|
||||
var reqStoreTime = Timestamp(0)
|
||||
var reqDigest = waku_store.MessageDigest()
|
||||
|
||||
for i in 0..<2:
|
||||
let response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(fullAddr),
|
||||
encodeUrl(reqPubsubTopic),
|
||||
"", # content topics. Empty ignores the field.
|
||||
"", # start time. Empty ignores the field.
|
||||
"", # end time. Empty ignores the field.
|
||||
encodeUrl($reqSenderTime), # sender time
|
||||
encodeUrl($reqStoreTime), # store time
|
||||
reqDigest.toRestStringMessageDigest(), # base64-encoded digest. Empty ignores the field.
|
||||
"7", # page size. Empty implies default page size.
|
||||
"true" # ascending
|
||||
)
|
||||
|
||||
var wakuMessages = newSeq[WakuMessage](0)
|
||||
for j in 0..<response.data.messages.len:
|
||||
wakuMessages.add(response.data.messages[j].toWakuMessage())
|
||||
|
||||
pages[i] = wakuMessages
|
||||
|
||||
# populate the cursor for next page
|
||||
if response.data.cursor.isSome():
|
||||
reqPubsubTopic = response.data.cursor.get().pubsubTopic
|
||||
reqDigest = response.data.cursor.get().digest
|
||||
reqSenderTime = response.data.cursor.get().senderTime
|
||||
reqStoreTime = response.data.cursor.get().storeTime
|
||||
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
|
||||
check:
|
||||
pages[0] == msgList[0..6]
|
||||
pages[1] == msgList[7..9]
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asyncTest "query a node and retrieve historical messages filtered by pubsub topic":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
|
||||
let restPort = Port(58011)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||
|
||||
installStoreApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
# WakuStore setup
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
||||
await node.mountStore()
|
||||
node.mountStoreClient()
|
||||
|
||||
let key = generateEcdsaKey()
|
||||
var peerSwitch = newStandardSwitch(some(key))
|
||||
await peerSwitch.start()
|
||||
|
||||
peerSwitch.mount(node.wakuStore)
|
||||
|
||||
# Now prime it with some history before tests
|
||||
let msgList = @[
|
||||
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("2"), ts=0),
|
||||
fakeWakuMessage(@[byte 1], ts=1),
|
||||
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9)
|
||||
]
|
||||
for msg in msgList:
|
||||
require driver.put(DefaultPubsubTopic, msg).isOk()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
|
||||
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
|
||||
let fullAddr = $remotePeerInfo.addrs[0] &
|
||||
"/p2p/" & $remotePeerInfo.peerId
|
||||
|
||||
# Filtering by a known pubsub topic
|
||||
var response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl($fullAddr),
|
||||
encodeUrl(DefaultPubsubTopic))
|
||||
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 3
|
||||
|
||||
# Get all the messages by specifying an empty pubsub topic
|
||||
response = await client.getStoreMessagesV1(encodeUrl($fullAddr))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 3
|
||||
|
||||
# Receiving no messages by filtering with a random pubsub topic
|
||||
response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl($fullAddr),
|
||||
encodeUrl("random pubsub topic"))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 0
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asyncTest "retrieve historical messages from a provided store node address":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
|
||||
let restPort = Port(58011)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||
|
||||
installStoreApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
# WakuStore setup
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
||||
await node.mountStore()
|
||||
node.mountStoreClient()
|
||||
|
||||
let key = generateEcdsaKey()
|
||||
var peerSwitch = newStandardSwitch(some(key))
|
||||
await peerSwitch.start()
|
||||
|
||||
peerSwitch.mount(node.wakuStore)
|
||||
|
||||
# Now prime it with some history before tests
|
||||
let msgList = @[
|
||||
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0),
|
||||
fakeWakuMessage(@[byte 1], ts=1),
|
||||
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
|
||||
]
|
||||
for msg in msgList:
|
||||
require driver.put(DefaultPubsubTopic, msg).isOk()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
|
||||
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
|
||||
let fullAddr = $remotePeerInfo.addrs[0] &
|
||||
"/p2p/" & $remotePeerInfo.peerId
|
||||
|
||||
# Filtering by a known pubsub topic.
|
||||
# We also pass the store-node address in the request.
|
||||
var response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(fullAddr),
|
||||
encodeUrl(DefaultPubsubTopic))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 3
|
||||
|
||||
# Get all the messages by specifying an empty pubsub topic
|
||||
# We also pass the store-node address in the request.
|
||||
response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(fullAddr),
|
||||
encodeUrl(""))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 3
|
||||
|
||||
# Receiving no messages by filtering with a random pubsub topic
|
||||
# We also pass the store-node address in the request.
|
||||
response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(fullAddr),
|
||||
encodeUrl("random pubsub topic"))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 0
|
||||
|
||||
# Receiving 400 response if setting wrong store-node address
|
||||
response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl("incorrect multi address format"),
|
||||
encodeUrl("random pubsub topic"))
|
||||
check:
|
||||
response.status == 400
|
||||
$response.contentType == $MIMETYPE_TEXT
|
||||
response.data.messages.len == 0
|
||||
response.data.error_message.get ==
|
||||
"Failed parsing remote peer info [multiaddress: Invalid MultiAddress, must start with `/`]"
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asyncTest "filter historical messages by content topic":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
|
||||
let restPort = Port(58011)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||
|
||||
installStoreApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
# WakuStore setup
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
||||
await node.mountStore()
|
||||
node.mountStoreClient()
|
||||
|
||||
let key = generateEcdsaKey()
|
||||
var peerSwitch = newStandardSwitch(some(key))
|
||||
await peerSwitch.start()
|
||||
|
||||
peerSwitch.mount(node.wakuStore)
|
||||
|
||||
# Now prime it with some history before tests
|
||||
let msgList = @[
|
||||
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0),
|
||||
fakeWakuMessage(@[byte 1], ts=1),
|
||||
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
|
||||
]
|
||||
for msg in msgList:
|
||||
require driver.put(DefaultPubsubTopic, msg).isOk()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
|
||||
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
|
||||
let fullAddr = $remotePeerInfo.addrs[0] &
|
||||
"/p2p/" & $remotePeerInfo.peerId
|
||||
|
||||
# Filtering by content topic
|
||||
let response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(fullAddr),
|
||||
encodeUrl(DefaultPubsubTopic),
|
||||
encodeUrl("ct1,ct2"))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 2
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asyncTest "precondition failed":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
|
||||
let restPort = Port(58011)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||
|
||||
installStoreApiHandlers(restServer.router, node)
|
||||
restServer.start()
|
||||
|
||||
# WakuStore setup
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
||||
await node.mountStore()
|
||||
node.mountStoreClient()
|
||||
|
||||
let key = generateEcdsaKey()
|
||||
var peerSwitch = newStandardSwitch(some(key))
|
||||
await peerSwitch.start()
|
||||
|
||||
peerSwitch.mount(node.wakuStore)
|
||||
|
||||
# Now prime it with some history before tests
|
||||
let msgList = @[
|
||||
fakeWakuMessage(@[byte 0], contentTopic=ContentTopic("ct1"), ts=0),
|
||||
fakeWakuMessage(@[byte 1], ts=1),
|
||||
fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9)
|
||||
]
|
||||
for msg in msgList:
|
||||
require driver.put(DefaultPubsubTopic, msg).isOk()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
|
||||
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
|
||||
let fullAddr = $remotePeerInfo.addrs[0] &
|
||||
"/p2p/" & $remotePeerInfo.peerId
|
||||
|
||||
# Sending no peer-store node address
|
||||
var response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(""),
|
||||
encodeUrl(DefaultPubsubTopic))
|
||||
check:
|
||||
response.status == 412
|
||||
$response.contentType == $MIMETYPE_TEXT
|
||||
response.data.messages.len == 0
|
||||
response.data.error_message.get == "Missing known store-peer node"
|
||||
|
||||
# Now add the storenode from "config"
|
||||
node.peerManager.addServicePeer(remotePeerInfo,
|
||||
WakuStoreCodec)
|
||||
|
||||
# Sending no peer-store node address
|
||||
response =
|
||||
await client.getStoreMessagesV1(
|
||||
encodeUrl(""),
|
||||
encodeUrl(DefaultPubsubTopic))
|
||||
check:
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.messages.len == 3
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
@ -19,14 +19,25 @@ const MIMETYPE_TEXT* = MediaType.init("text/plain")
|
||||
proc ok*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.response("OK", Http200, $MIMETYPE_TEXT)
|
||||
|
||||
proc internalServerError*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.error(Http500)
|
||||
proc internalServerError*(t: typedesc[RestApiResponse],
|
||||
msg: string = ""):
|
||||
RestApiResponse =
|
||||
RestApiResponse.error(Http500, msg, $MIMETYPE_TEXT)
|
||||
|
||||
proc badRequest*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.error(Http400)
|
||||
proc badRequest*(t: typedesc[RestApiResponse],
|
||||
msg: string = ""):
|
||||
RestApiResponse =
|
||||
RestApiResponse.error(Http400, msg, $MIMETYPE_TEXT)
|
||||
|
||||
proc notFound*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.error(Http404)
|
||||
proc notFound*(t: typedesc[RestApiResponse],
|
||||
msg: string = ""):
|
||||
RestApiResponse =
|
||||
RestApiResponse.error(Http404, msg, $MIMETYPE_TEXT)
|
||||
|
||||
proc preconditionFailed*(t: typedesc[RestApiResponse],
|
||||
msg: string = ""):
|
||||
RestApiResponse =
|
||||
RestApiResponse.error(Http412, msg, $MIMETYPE_TEXT)
|
||||
|
||||
|
||||
proc jsonResponse*(t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200): SerdesResult[RestApiResponse] =
|
||||
|
72
waku/v2/node/rest/store/client.nim
Normal file
72
waku/v2/node/rest/store/client.nim
Normal file
@ -0,0 +1,72 @@
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
chronicles,
|
||||
json_serialization,
|
||||
json_serialization/std/options,
|
||||
presto/[route, client]
|
||||
import
|
||||
../../../protocol/waku_store/common,
|
||||
../serdes,
|
||||
../responses,
|
||||
./types
|
||||
|
||||
export types
|
||||
|
||||
|
||||
logScope:
|
||||
topics = "waku node rest store_api"
|
||||
|
||||
|
||||
proc decodeBytes*(t: typedesc[StoreResponseRest],
|
||||
data: openArray[byte],
|
||||
contentType: Opt[ContentTypeData]):
|
||||
|
||||
RestResult[StoreResponseRest] =
|
||||
|
||||
if MediaType.init($contentType) == MIMETYPE_JSON:
|
||||
let decoded = ?decodeFromJsonBytes(StoreResponseRest, data)
|
||||
return ok(decoded)
|
||||
|
||||
if MediaType.init($contentType) == MIMETYPE_TEXT:
|
||||
var res: string
|
||||
if len(data) > 0:
|
||||
res = newString(len(data))
|
||||
copyMem(addr res[0], unsafeAddr data[0], len(data))
|
||||
|
||||
return ok(StoreResponseRest(
|
||||
messages: newSeq[StoreWakuMessage](0),
|
||||
cursor: none(HistoryCursorRest),
|
||||
# field that contain error information
|
||||
errorMessage: some(res)
|
||||
))
|
||||
|
||||
# If everything goes wrong
|
||||
return err(cstring("Unsupported contentType " & $contentType))
|
||||
|
||||
|
||||
proc getStoreMessagesV1*(
|
||||
# URL-encoded reference to the store-node
|
||||
peerAddr: string = "",
|
||||
pubsubTopic: string = "",
|
||||
# URL-encoded comma-separated list of content topics
|
||||
contentTopics: string = "",
|
||||
startTime: string = "",
|
||||
endTime: string = "",
|
||||
|
||||
# Optional cursor fields
|
||||
senderTime: string = "",
|
||||
storeTime: string = "",
|
||||
digest: string = "", # base64-encoded digest
|
||||
|
||||
pageSize: string = "",
|
||||
ascending: string = ""
|
||||
):
|
||||
RestResponse[StoreResponseRest]
|
||||
|
||||
{.rest,
|
||||
endpoint: "/store/v1/messages",
|
||||
meth: HttpMethod.MethodGet.}
|
248
waku/v2/node/rest/store/handlers.nim
Normal file
248
waku/v2/node/rest/store/handlers.nim
Normal file
@ -0,0 +1,248 @@
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/strformat,
|
||||
stew/results,
|
||||
chronicles,
|
||||
json_serialization,
|
||||
presto/route
|
||||
import
|
||||
../../../../common/base64,
|
||||
../../../protocol/waku_message/topics/content_topic,
|
||||
../../../protocol/waku_store/common,
|
||||
../../../utils/time,
|
||||
../../waku_node,
|
||||
../../peer_manager,
|
||||
../responses,
|
||||
../serdes,
|
||||
./types
|
||||
|
||||
export types
|
||||
|
||||
logScope:
|
||||
topics = "waku node rest store_api"
|
||||
|
||||
const futTimeout* = 5.seconds # Max time to wait for futures
|
||||
|
||||
# Queries the store-node with the query parameters and
|
||||
# returns a RestApiResponse that is sent back to the api client.
|
||||
proc performHistoryQuery(selfNode: WakuNode,
|
||||
histQuery: HistoryQuery,
|
||||
storePeer: RemotePeerInfo):
|
||||
|
||||
Future[RestApiResponse] {.async.} =
|
||||
|
||||
let queryFut = selfNode.query(histQuery, storePeer)
|
||||
if not await queryFut.withTimeout(futTimeout):
|
||||
const msg = "No history response received (timeout)"
|
||||
error msg
|
||||
return RestApiResponse.internalServerError(msg)
|
||||
|
||||
let res = queryFut.read()
|
||||
if res.isErr():
|
||||
const msg = "Error occurred in queryFut.read()"
|
||||
error msg, error=res.error
|
||||
return RestApiResponse.internalServerError(
|
||||
fmt("{msg} [{res.error}]"))
|
||||
|
||||
let storeResp = res.value.toStoreResponseRest()
|
||||
let resp = RestApiResponse.jsonResponse(storeResp, status=Http200)
|
||||
if resp.isErr():
|
||||
const msg = "Error building the json respose"
|
||||
error msg, error=resp.error
|
||||
return RestApiResponse.internalServerError(
|
||||
fmt("{msg} [{resp.error}]"))
|
||||
|
||||
return resp.get()
|
||||
|
||||
# Converts a string time representation into an Option[Timestamp].
|
||||
# Only positive time is considered a valid Timestamp in the request
|
||||
proc parseTime(input: Option[string]):
|
||||
Result[Option[Timestamp], string] =
|
||||
if input.isSome() and input.get() != "":
|
||||
try:
|
||||
let time = parseInt(input.get())
|
||||
if time > 0:
|
||||
return ok(some(Timestamp(time)))
|
||||
except ValueError:
|
||||
return err("Problem parsing time [" &
|
||||
getCurrentExceptionMsg() & "]")
|
||||
|
||||
return ok(none(Timestamp))
|
||||
|
||||
# Generates a history query cursor as per the given params
|
||||
proc parseCursor(parsedPubsubTopic: Option[string],
|
||||
senderTime: Option[string],
|
||||
storeTime: Option[string],
|
||||
digest: Option[string]):
|
||||
Result[Option[HistoryCursor], string] =
|
||||
|
||||
# Parse sender time
|
||||
let parsedSenderTime = parseTime(senderTime)
|
||||
if not parsedSenderTime.isOk():
|
||||
return err(parsedSenderTime.error)
|
||||
|
||||
# Parse store time
|
||||
let parsedStoreTime = parseTime(storeTime)
|
||||
if not parsedStoreTime.isOk():
|
||||
return err(parsedStoreTime.error)
|
||||
|
||||
# Parse message digest
|
||||
let parsedMsgDigest = parseMsgDigest(digest)
|
||||
if not parsedMsgDigest.isOk():
|
||||
return err(parsedMsgDigest.error)
|
||||
|
||||
# Parse cursor information
|
||||
if parsedPubsubTopic.isSome() and
|
||||
parsedSenderTime.value.isSome() and
|
||||
parsedStoreTime.value.isSome() and
|
||||
parsedMsgDigest.value.isSome():
|
||||
|
||||
return ok(some(
|
||||
HistoryCursor(
|
||||
pubsubTopic: parsedPubsubTopic.get(),
|
||||
senderTime: parsedSenderTime.value.get(),
|
||||
storeTime: parsedStoreTime.value.get(),
|
||||
digest: parsedMsgDigest.value.get())
|
||||
))
|
||||
else:
|
||||
return ok(none(HistoryCursor))
|
||||
|
||||
# Creates a HistoryQuery from the given params
|
||||
proc createHistoryQuery(pubsubTopic: Option[string],
|
||||
contentTopics: Option[string],
|
||||
senderTime: Option[string],
|
||||
storeTime: Option[string],
|
||||
digest: Option[string],
|
||||
startTime: Option[string],
|
||||
endTime: Option[string],
|
||||
pageSize: Option[string],
|
||||
ascending: Option[string]):
|
||||
|
||||
Result[HistoryQuery, string] =
|
||||
|
||||
# Parse pubsubTopic parameter
|
||||
var parsedPubsubTopic = none(string)
|
||||
if pubsubTopic.isSome():
|
||||
let decodedPubsubTopic = decodeUrl(pubsubTopic.get())
|
||||
if decodedPubsubTopic != "":
|
||||
parsedPubsubTopic = some(decodedPubsubTopic)
|
||||
|
||||
# Parse the content topics
|
||||
var parsedContentTopics = newSeq[ContentTopic](0)
|
||||
if contentTopics.isSome():
|
||||
let ctList = decodeUrl(contentTopics.get())
|
||||
if ctList != "":
|
||||
for ct in ctList.split(','):
|
||||
parsedContentTopics.add(ct)
|
||||
|
||||
# Parse cursor information
|
||||
let parsedCursor = ? parseCursor(parsedPubsubTopic,
|
||||
senderTime,
|
||||
storeTime,
|
||||
digest)
|
||||
|
||||
# Parse page size field
|
||||
var parsedPagedSize = DefaultPageSize
|
||||
if pageSize.isSome() and pageSize.get() != "":
|
||||
try:
|
||||
parsedPagedSize = uint64(parseInt(pageSize.get()))
|
||||
except CatchableError:
|
||||
return err("Problem parsing page size [" &
|
||||
getCurrentExceptionMsg() & "]")
|
||||
|
||||
# Parse start time
|
||||
let parsedStartTime = ? parseTime(startTime)
|
||||
|
||||
# Parse end time
|
||||
let parsedEndTime = ? parseTime(endTime)
|
||||
|
||||
# Parse ascending field
|
||||
var parsedAscending = true
|
||||
if ascending.isSome() and ascending.get() != "":
|
||||
parsedAscending = ascending.get() == "true"
|
||||
|
||||
return ok(
|
||||
HistoryQuery(pubsubTopic: parsedPubsubTopic,
|
||||
contentTopics: parsedContentTopics,
|
||||
startTime: parsedStartTime,
|
||||
endTime: parsedEndTime,
|
||||
ascending: parsedAscending,
|
||||
pageSize: parsedPagedSize,
|
||||
cursor: parsedCursor
|
||||
))
|
||||
|
||||
# Simple type conversion. The "Option[Result[string, cstring]]"
|
||||
# type is used by the nim-presto library.
|
||||
proc toOpt(self: Option[Result[string, cstring]]): Option[string] =
|
||||
if not self.isSome() or self.get().value == "":
|
||||
return none(string)
|
||||
if self.isSome() and self.get().value != "":
|
||||
return some(self.get().value)
|
||||
|
||||
|
||||
# Subscribes the rest handler to attend "/store/v1/messages" requests
|
||||
proc installStoreV1Handler(router: var RestRouter,
|
||||
node: WakuNode) =
|
||||
|
||||
# Handles the store-query request according to the passed parameters
|
||||
router.api(MethodGet,
|
||||
"/store/v1/messages") do (
|
||||
peerAddr: Option[string],
|
||||
pubsubTopic: Option[string],
|
||||
contentTopics: Option[string],
|
||||
senderTime: Option[string],
|
||||
storeTime: Option[string],
|
||||
digest: Option[string],
|
||||
startTime: Option[string],
|
||||
endTime: Option[string],
|
||||
pageSize: Option[string],
|
||||
ascending: Option[string]
|
||||
) -> RestApiResponse:
|
||||
|
||||
debug "REST-GET /store/v1/messages ", peer_addr = $peerAddr
|
||||
|
||||
# All the GET parameters are URL-encoded (https://en.wikipedia.org/wiki/URL_encoding)
|
||||
# Example:
|
||||
# /store/v1/messages?peerAddr=%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\&pubsubTopic=my-waku-topic
|
||||
|
||||
# Parse the peer address parameter
|
||||
var parsedPeerAddr = parseUrlPeerAddr(peerAddr.toOpt())
|
||||
if not parsedPeerAddr.isOk():
|
||||
return RestApiResponse.badRequest(parsedPeerAddr.error)
|
||||
|
||||
var peerOpt = none(RemotePeerInfo)
|
||||
if parsedPeerAddr.value.isSome():
|
||||
peerOpt = parsedPeerAddr.value
|
||||
else:
|
||||
# The user didn't specify any store peer address.
|
||||
peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
return RestApiResponse.preconditionFailed("Missing known store-peer node")
|
||||
|
||||
# Parse the rest of the parameters and create a HistoryQuery
|
||||
let histQuery = createHistoryQuery(
|
||||
pubsubTopic.toOpt(),
|
||||
contentTopics.toOpt(),
|
||||
senderTime.toOpt(),
|
||||
storeTime.toOpt(),
|
||||
digest.toOpt(),
|
||||
startTime.toOpt(),
|
||||
endTime.toOpt(),
|
||||
pageSize.toOpt(),
|
||||
ascending.toOpt()
|
||||
)
|
||||
|
||||
if not histQuery.isOk():
|
||||
return RestApiResponse.badRequest(histQuery.error)
|
||||
|
||||
return await node.performHistoryQuery(histQuery.value,
|
||||
peerOpt.get())
|
||||
|
||||
# Registers the Api Handlers
|
||||
proc installStoreApiHandlers*(router: var RestRouter,
|
||||
node: WakuNode) =
|
||||
installStoreV1Handler(router, node)
|
203
waku/v2/node/rest/store/openapi.yaml
Normal file
203
waku/v2/node/rest/store/openapi.yaml
Normal file
@ -0,0 +1,203 @@
|
||||
openapi: 3.0.3
|
||||
info:
|
||||
title: Waku V2 node REST API
|
||||
version: 1.0.0
|
||||
contact:
|
||||
name: VAC Team
|
||||
url: https://forum.vac.dev/
|
||||
|
||||
tags:
|
||||
- name: store
|
||||
description: Store REST API for WakuV2 node
|
||||
|
||||
paths:
|
||||
/store/v1/messages:
|
||||
get:
|
||||
summary: Gets message history
|
||||
description: >
|
||||
Retrieves WakuV2 message history. The returned history
|
||||
can be potentially filtered by optional request parameters.
|
||||
operationId: getMessageHistory
|
||||
tags:
|
||||
- store
|
||||
parameters:
|
||||
- name: peerAddr
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
required: true
|
||||
description: >
|
||||
P2P fully qualified peer multiaddress
|
||||
in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded.
|
||||
example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN'
|
||||
|
||||
- name: pubsubTopic
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
The pubsub topic on which a WakuMessage is published.
|
||||
If left empty, no filtering is applied.
|
||||
It is also intended for pagination purposes.
|
||||
It should be a URL-encoded string.
|
||||
example: 'my%20pubsub%20topic'
|
||||
|
||||
- name: contentTopics
|
||||
in: query
|
||||
schema: string
|
||||
description: >
|
||||
Comma-separated list of content topics. When specified,
|
||||
only WakuMessages that are linked to any of the given
|
||||
content topics will be delivered in the get response.
|
||||
It should be a URL-encoded-comma-separated string.
|
||||
example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic'
|
||||
|
||||
- name: startTime
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
The inclusive lower bound on the timestamp of
|
||||
queried WakuMessages. This field holds the
|
||||
Unix epoch time in nanoseconds as a 64-bits
|
||||
integer value.
|
||||
example: '1680590945000000000'
|
||||
|
||||
- name: endTime
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
The inclusive upper bound on the timestamp of
|
||||
queried WakuMessages. This field holds the
|
||||
Unix epoch time in nanoseconds as a 64-bits
|
||||
integer value.
|
||||
example: '1680590945000000000'
|
||||
|
||||
- name: senderTime
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
Cursor field intended for pagination purposes.
|
||||
Represents the Unix time in nanoseconds at which a message was generated.
|
||||
It could be empty for retrieving the first page,
|
||||
and will be returned from the GET response so that
|
||||
it can be part of the next page request.
|
||||
example: '1680590947000000000'
|
||||
|
||||
- name: storeTime
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
Cursor field intended for pagination purposes.
|
||||
Represents the Unix time in nanoseconds at which a message was stored.
|
||||
It could be empty for retrieving the first page,
|
||||
and will be returned from the GET response so that
|
||||
it can be part of the next page request.
|
||||
example: '1680590945000000000'
|
||||
|
||||
- name: digest
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
Cursor field intended for pagination purposes.
|
||||
URL-base64-encoded string computed as a hash of the
|
||||
a message content topic plus a message payload.
|
||||
It could be empty for retrieving the first page,
|
||||
and will be returned from the GET response so that
|
||||
it can be part of the next page request.
|
||||
example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D'
|
||||
|
||||
- name: pageSize
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
Number of messages to retrieve per page
|
||||
example: '5'
|
||||
|
||||
- name: ascending
|
||||
in: query
|
||||
schema:
|
||||
type: string
|
||||
description: >
|
||||
"true" for paging forward, "false" for paging backward
|
||||
example: "true"
|
||||
|
||||
responses:
|
||||
'200':
|
||||
description: WakuV2 message history.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/StoreResponse'
|
||||
'400':
|
||||
description: Bad request error.
|
||||
content:
|
||||
text/plain:
|
||||
type: string
|
||||
'412':
|
||||
description: Precondition failed.
|
||||
content:
|
||||
text/plain:
|
||||
type: string
|
||||
'500':
|
||||
description: Internal server error.
|
||||
content:
|
||||
text/plain:
|
||||
type: string
|
||||
|
||||
components:
|
||||
schemas:
|
||||
StoreResponse:
|
||||
type: object
|
||||
properties:
|
||||
messages:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/WakuMessage'
|
||||
cursor:
|
||||
$ref: '#/components/schemas/HistoryCursor'
|
||||
error_message:
|
||||
type: string
|
||||
required:
|
||||
- messages
|
||||
|
||||
HistoryCursor:
|
||||
type: object
|
||||
properties:
|
||||
pubsub_topic:
|
||||
type: string
|
||||
sender_time:
|
||||
type: string
|
||||
store_time:
|
||||
type: string
|
||||
digest:
|
||||
type: string
|
||||
required:
|
||||
- pubsub_topic
|
||||
- sender_time
|
||||
- store_time
|
||||
- digest
|
||||
|
||||
WakuMessage:
|
||||
type: object
|
||||
properties:
|
||||
payload:
|
||||
type: string
|
||||
content_topic:
|
||||
type: string
|
||||
version:
|
||||
type: integer
|
||||
format: int32
|
||||
timestamp:
|
||||
type: integer
|
||||
format: int64
|
||||
ephemeral:
|
||||
type: boolean
|
||||
required:
|
||||
- payload
|
||||
- content_topic
|
375
waku/v2/node/rest/store/types.nim
Normal file
375
waku/v2/node/rest/store/types.nim
Normal file
@ -0,0 +1,375 @@
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[sets, strformat, uri],
|
||||
stew/byteutils,
|
||||
chronicles,
|
||||
json_serialization,
|
||||
json_serialization/std/options,
|
||||
presto/[route, client, common]
|
||||
import
|
||||
../../../protocol/waku_store/common as waku_store_common,
|
||||
../../../../common/base64,
|
||||
../../../utils/time,
|
||||
../../../protocol/waku_message/topics/content_topic,
|
||||
../../../protocol/waku_message/topics/pubsub_topic,
|
||||
../../../protocol/waku_message/message,
|
||||
../serdes
|
||||
|
||||
|
||||
#### Types
|
||||
|
||||
type
|
||||
HistoryCursorRest* = object
|
||||
pubsubTopic*: PubsubTopic
|
||||
senderTime*: Timestamp
|
||||
storeTime*: Timestamp
|
||||
digest*: MessageDigest
|
||||
|
||||
StoreRequestRest* = object
|
||||
# inspired by https://github.com/waku-org/nwaku/blob/f95147f5b7edfd45f914586f2d41cd18fb0e0d18/waku/v2/protocol/waku_store/common.nim#L52
|
||||
pubsubTopic*: Option[PubsubTopic]
|
||||
contentTopics*: seq[ContentTopic]
|
||||
cursor*: Option[HistoryCursorRest]
|
||||
startTime*: Option[Timestamp]
|
||||
endTime*: Option[Timestamp]
|
||||
pageSize*: uint64
|
||||
ascending*: bool
|
||||
|
||||
StoreWakuMessage* = object
|
||||
payload*: Base64String
|
||||
contentTopic*: Option[ContentTopic]
|
||||
version*: Option[uint32]
|
||||
timestamp*: Option[Timestamp]
|
||||
ephemeral*: Option[bool]
|
||||
|
||||
StoreResponseRest* = object
|
||||
# inspired by https://rfc.vac.dev/spec/16/#storeresponse
|
||||
messages*: seq[StoreWakuMessage]
|
||||
cursor*: Option[HistoryCursorRest]
|
||||
# field that contains error information
|
||||
errorMessage*: Option[string]
|
||||
|
||||
|
||||
#### Type conversion
|
||||
|
||||
# Converts a URL-encoded-base64 string into a 'MessageDigest'
|
||||
proc parseMsgDigest*(input: Option[string]):
|
||||
Result[Option[MessageDigest], string] =
|
||||
|
||||
if not input.isSome() or input.get() == "":
|
||||
return ok(none(MessageDigest))
|
||||
|
||||
let decodedUrl = decodeUrl(input.get())
|
||||
let base64Decoded = base64.decode(Base64String(decodedUrl))
|
||||
var messageDigest = MessageDigest()
|
||||
|
||||
if not base64Decoded.isOk():
|
||||
return err(base64Decoded.error)
|
||||
|
||||
let base64DecodedArr = base64Decoded.get()
|
||||
# Next snippet inspired by "nwaku/waku/v2/protocol/waku_archive/archive.nim"
|
||||
# TODO: Improve coherence of MessageDigest type
|
||||
messageDigest = block:
|
||||
var data: array[32, byte]
|
||||
for i in 0..<min(base64DecodedArr.len, 32):
|
||||
data[i] = base64DecodedArr[i]
|
||||
|
||||
MessageDigest(data: data)
|
||||
|
||||
return ok(some(messageDigest))
|
||||
|
||||
# Converts a given MessageDigest object into a suitable
|
||||
# Base64-URL-encoded string suitable to be transmitted in a Rest
|
||||
# request-response. The MessageDigest is first base64 encoded
|
||||
# and this result is URL-encoded.
|
||||
proc toRestStringMessageDigest*(self: MessageDigest): string =
|
||||
let base64Encoded = base64.encode(self.data)
|
||||
encodeUrl($base64Encoded)
|
||||
|
||||
proc toWakuMessage*(message: StoreWakuMessage): WakuMessage =
|
||||
WakuMessage(
|
||||
payload: base64.decode(message.payload).get(),
|
||||
contentTopic: message.contentTopic.get(),
|
||||
version: message.version.get(),
|
||||
timestamp: message.timestamp.get(),
|
||||
ephemeral: message.ephemeral.get()
|
||||
)
|
||||
|
||||
# Converts a 'HistoryResponse' object to an 'StoreResponseRest'
|
||||
# that can be serialized to a json object.
|
||||
proc toStoreResponseRest*(histResp: HistoryResponse): StoreResponseRest =
|
||||
|
||||
proc toStoreWakuMessage(message: WakuMessage): StoreWakuMessage =
|
||||
StoreWakuMessage(
|
||||
payload: base64.encode(message.payload),
|
||||
contentTopic: some(message.contentTopic),
|
||||
version: some(message.version),
|
||||
timestamp: some(message.timestamp),
|
||||
ephemeral: some(message.ephemeral)
|
||||
)
|
||||
|
||||
var storeWakuMsgs: seq[StoreWakuMessage]
|
||||
for m in histResp.messages:
|
||||
storeWakuMsgs.add(m.toStoreWakuMessage())
|
||||
|
||||
var cursor = none(HistoryCursorRest)
|
||||
if histResp.cursor.isSome:
|
||||
cursor = some(HistoryCursorRest(
|
||||
pubsubTopic: histResp.cursor.get().pubsubTopic,
|
||||
senderTime: histResp.cursor.get().senderTime,
|
||||
storeTime: histResp.cursor.get().storeTime,
|
||||
digest: histResp.cursor.get().digest
|
||||
))
|
||||
|
||||
StoreResponseRest(
|
||||
messages: storeWakuMsgs,
|
||||
cursor: cursor
|
||||
)
|
||||
|
||||
## Beginning of Base64String serde
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson],
|
||||
value: Base64String)
|
||||
{.raises: [IOError].} =
|
||||
writer.writeValue(string(value))
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson],
|
||||
value: var Base64String)
|
||||
{.raises: [SerializationError, IOError].} =
|
||||
value = Base64String(reader.readValue(string))
|
||||
|
||||
## End of Base64String serde
|
||||
|
||||
## Beginning of StoreWakuMessage serde
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson],
|
||||
value: StoreWakuMessage)
|
||||
{.raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
writer.writeField("payload", $value.payload)
|
||||
if value.contentTopic.isSome:
|
||||
writer.writeField("content_topic", value.contentTopic)
|
||||
if value.version.isSome:
|
||||
writer.writeField("version", value.version)
|
||||
if value.timestamp.isSome:
|
||||
writer.writeField("timestamp", value.timestamp)
|
||||
if value.ephemeral.isSome:
|
||||
writer.writeField("ephemeral", value.ephemeral)
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson],
|
||||
value: var StoreWakuMessage)
|
||||
{.raises: [SerializationError, IOError].} =
|
||||
var
|
||||
payload = none(Base64String)
|
||||
contentTopic = none(ContentTopic)
|
||||
version = none(uint32)
|
||||
timestamp = none(Timestamp)
|
||||
ephemeral = none(bool)
|
||||
|
||||
var keys = initHashSet[string]()
|
||||
for fieldName in readObjectFields(reader):
|
||||
# Check for reapeated keys
|
||||
if keys.containsOrIncl(fieldName):
|
||||
let err = try: fmt"Multiple `{fieldName}` fields found"
|
||||
except CatchableError: "Multiple fields with the same name found"
|
||||
reader.raiseUnexpectedField(err, "StoreWakuMessage")
|
||||
|
||||
case fieldName
|
||||
of "payload":
|
||||
payload = some(reader.readValue(Base64String))
|
||||
of "content_topic":
|
||||
contentTopic = some(reader.readValue(ContentTopic))
|
||||
of "version":
|
||||
version = some(reader.readValue(uint32))
|
||||
of "timestamp":
|
||||
timestamp = some(reader.readValue(Timestamp))
|
||||
of "ephemeral":
|
||||
ephemeral = some(reader.readValue(bool))
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
|
||||
if payload.isNone():
|
||||
reader.raiseUnexpectedValue("Field `payload` is missing")
|
||||
|
||||
value = StoreWakuMessage(
|
||||
payload: payload.get(),
|
||||
contentTopic: contentTopic,
|
||||
version: version,
|
||||
timestamp: timestamp,
|
||||
ephemeral: ephemeral
|
||||
)
|
||||
|
||||
## End of StoreWakuMessage serde
|
||||
|
||||
## Beginning of MessageDigest serde
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson],
|
||||
value: MessageDigest)
|
||||
{.raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
writer.writeField("data", base64.encode(value.data))
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson],
|
||||
value: var MessageDigest)
|
||||
{.raises: [SerializationError, IOError].} =
|
||||
var
|
||||
data = none(seq[byte])
|
||||
|
||||
for fieldName in readObjectFields(reader):
|
||||
case fieldName
|
||||
of "data":
|
||||
if data.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `data` fields found", "MessageDigest")
|
||||
let decoded = base64.decode(reader.readValue(Base64String))
|
||||
if not decoded.isOk():
|
||||
reader.raiseUnexpectedField("Failed decoding data", "MessageDigest")
|
||||
data = some(decoded.get())
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
|
||||
if data.isNone():
|
||||
reader.raiseUnexpectedValue("Field `data` is missing")
|
||||
|
||||
for i in 0..<32:
|
||||
value.data[i] = data.get()[i]
|
||||
|
||||
## End of MessageDigest serde
|
||||
|
||||
## Beginning of HistoryCursorRest serde
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson],
|
||||
value: HistoryCursorRest)
|
||||
{.raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
writer.writeField("pubsub_topic", value.pubsubTopic)
|
||||
writer.writeField("sender_time", value.senderTime)
|
||||
writer.writeField("store_time", value.storeTime)
|
||||
writer.writeField("digest", value.digest)
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson],
|
||||
value: var HistoryCursorRest)
|
||||
{.raises: [SerializationError, IOError].} =
|
||||
var
|
||||
pubsubTopic = none(PubsubTopic)
|
||||
senderTime = none(Timestamp)
|
||||
storeTime = none(Timestamp)
|
||||
digest = none(MessageDigest)
|
||||
|
||||
for fieldName in readObjectFields(reader):
|
||||
case fieldName
|
||||
of "pubsub_topic":
|
||||
if pubsubTopic.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `pubsub_topic` fields found", "HistoryCursorRest")
|
||||
pubsubTopic = some(reader.readValue(PubsubTopic))
|
||||
of "sender_time":
|
||||
if senderTime.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `sender_time` fields found", "HistoryCursorRest")
|
||||
senderTime = some(reader.readValue(Timestamp))
|
||||
of "store_time":
|
||||
if storeTime.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `store_time` fields found", "HistoryCursorRest")
|
||||
storeTime = some(reader.readValue(Timestamp))
|
||||
of "digest":
|
||||
if digest.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `digest` fields found", "HistoryCursorRest")
|
||||
digest = some(reader.readValue(MessageDigest))
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
|
||||
if pubsubTopic.isNone():
|
||||
reader.raiseUnexpectedValue("Field `pubsub_topic` is missing")
|
||||
|
||||
if senderTime.isNone():
|
||||
reader.raiseUnexpectedValue("Field `sender_time` is missing")
|
||||
|
||||
if storeTime.isNone():
|
||||
reader.raiseUnexpectedValue("Field `store_time` is missing")
|
||||
|
||||
if digest.isNone():
|
||||
reader.raiseUnexpectedValue("Field `digest` is missing")
|
||||
|
||||
value = HistoryCursorRest(
|
||||
pubsubTopic: pubsubTopic.get(),
|
||||
senderTime: senderTime.get(),
|
||||
storeTime: storeTime.get(),
|
||||
digest: digest.get()
|
||||
)
|
||||
|
||||
## End of HistoryCursorRest serde
|
||||
|
||||
## Beginning of StoreResponseRest serde
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson],
|
||||
value: StoreResponseRest)
|
||||
{.raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
writer.writeField("messages", value.messages)
|
||||
if value.cursor.isSome:
|
||||
writer.writeField("cursor", value.cursor)
|
||||
if value.errorMessage.isSome:
|
||||
writer.writeField("error_message", value.errorMessage)
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson],
|
||||
value: var StoreResponseRest)
|
||||
{.raises: [SerializationError, IOError].} =
|
||||
var
|
||||
messages = none(seq[StoreWakuMessage])
|
||||
cursor = none(HistoryCursorRest)
|
||||
errorMessage = none(string)
|
||||
|
||||
for fieldName in readObjectFields(reader):
|
||||
case fieldName
|
||||
of "messages":
|
||||
if messages.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `messages` fields found", "StoreResponseRest")
|
||||
messages = some(reader.readValue(seq[StoreWakuMessage]))
|
||||
of "cursor":
|
||||
if cursor.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `cursor` fields found", "StoreResponseRest")
|
||||
cursor = some(reader.readValue(HistoryCursorRest))
|
||||
of "error_message":
|
||||
if errorMessage.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `error_message` fields found", "StoreResponseRest")
|
||||
errorMessage = some(reader.readValue(string))
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
|
||||
if messages.isNone():
|
||||
reader.raiseUnexpectedValue("Field `messages` is missing")
|
||||
|
||||
value = StoreResponseRest(
|
||||
messages: messages.get(),
|
||||
cursor: cursor,
|
||||
errorMessage: errorMessage
|
||||
)
|
||||
|
||||
## End of StoreResponseRest serde
|
||||
|
||||
## Beginning of StoreRequestRest serde
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson],
|
||||
value: StoreRequestRest)
|
||||
{.raises: [IOError].} =
|
||||
|
||||
writer.beginRecord()
|
||||
if value.pubsubTopic.isSome:
|
||||
writer.writeField("pubsub_topic", value.pubsubTopic)
|
||||
writer.writeField("content_topics", value.contentTopics)
|
||||
if value.startTime.isSome:
|
||||
writer.writeField("start_time", value.startTime)
|
||||
if value.endTime.isSome:
|
||||
writer.writeField("end_time", value.endTime)
|
||||
writer.writeField("page_size", value.pageSize)
|
||||
writer.writeField("ascending", value.ascending)
|
||||
writer.endRecord()
|
||||
|
||||
## End of StoreRequestRest serde
|
||||
|
@ -5,7 +5,7 @@ else:
|
||||
|
||||
# Collection of utilities related to Waku peers
|
||||
import
|
||||
std/[options, sequtils, strutils],
|
||||
std/[options, sequtils, strutils, uri],
|
||||
chronos,
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
@ -142,6 +142,21 @@ proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, Va
|
||||
|
||||
return RemotePeerInfo.init(peerIdStr, @[wireAddr])
|
||||
|
||||
# Checks whether the peerAddr parameter represents a valid p2p multiaddress.
|
||||
# The param must be in the format `(ip4|ip6)/tcp/p2p/$peerId` but URL-encoded
|
||||
proc parseUrlPeerAddr*(peerAddr: Option[string]):
|
||||
Result[Option[RemotePeerInfo], string] =
|
||||
|
||||
if not peerAddr.isSome() or peerAddr.get() == "":
|
||||
return ok(none(RemotePeerInfo))
|
||||
|
||||
try:
|
||||
let parsedAddr = decodeUrl(peerAddr.get())
|
||||
return ok(some(parseRemotePeerInfo(parsedAddr)))
|
||||
except Exception:
|
||||
return err("Failed parsing remote peer info [" &
|
||||
getCurrentExceptionMsg() & "]")
|
||||
|
||||
## Converts an ENR to dialable RemotePeerInfo
|
||||
proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
|
||||
let typedR = ? enr.toTypedRecord
|
||||
|
Loading…
x
Reference in New Issue
Block a user