refactor(rest): deep code and tests reorganization

This commit is contained in:
Lorenzo Delgado 2023-02-13 15:22:24 +01:00 committed by GitHub
parent 9a40ad6400
commit 0a15ce48d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 400 additions and 276 deletions

View File

@ -10,8 +10,9 @@ import
import
../../waku/v2/node/waku_node,
../../waku/v2/node/rest/server,
../../waku/v2/node/rest/debug/debug_api,
../../waku/v2/node/rest/relay/[relay_api, topic_cache],
../../waku/v2/node/rest/debug/handlers as debug_api,
../../waku/v2/node/rest/relay/handlers as relay_api,
../../waku/v2/node/rest/relay/topic_cache,
./config
@ -19,14 +20,14 @@ logScope:
topics = "wakunode rest"
proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) =
proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) =
let serverResult = newRestHttpServer(address, port)
if serverResult.isErr():
notice "REST HTTP server could not be started", address = $address&":" & $port, reason = serverResult.error()
return
let server = serverResult.get()
## Debug REST API
installDebugApiHandlers(server.router, node)

View File

@ -47,11 +47,6 @@ import
./v2/test_peer_store_extended,
./v2/test_utils_peers,
./v2/test_message_cache,
./v2/test_rest_serdes,
./v2/test_rest_debug_api_serdes,
./v2/test_rest_debug_api,
./v2/test_rest_relay_api_serdes,
./v2/test_rest_relay_api,
./v2/test_peer_manager,
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
./v2/test_peer_storage,
@ -80,6 +75,14 @@ import
./v2/wakunode_jsonrpc/test_jsonrpc_relay,
./v2/wakunode_jsonrpc/test_jsonrpc_store
## Wakunode Rest API test suite
import
./v2/wakunode_rest/test_rest_debug,
./v2/wakunode_rest/test_rest_debug_serdes,
./v2/wakunode_rest/test_rest_relay,
./v2/wakunode_rest/test_rest_relay_serdes,
./v2/wakunode_rest/test_rest_serdes
## Apps

View File

@ -1,53 +0,0 @@
{.used.}
import
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/peerinfo,
libp2p/multiaddress,
libp2p/crypto/crypto
import
../../waku/v2/node/waku_node,
../../waku/v2/node/rest/[server, client, utils],
../../waku/v2/node/rest/debug/debug_api,
./testlib/waku2
proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
suite "REST API - Debug":
asyncTest "Get node info - GET /debug/v1/info":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(8546)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
installDebugApiHandlers(restServer.router, node)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.debugInfoV1()
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.listenAddresses == @[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId]
await restServer.stop()
await restServer.closeWait()
await node.stop()

View File

@ -0,0 +1,84 @@
{.used.}
import
stew/shims/net,
testutils/unittests,
presto,
presto/client as presto_client,
libp2p/peerinfo,
libp2p/multiaddress,
libp2p/crypto/crypto
import
../../waku/v2/node/waku_node,
../../waku/v2/node/rest/server,
../../waku/v2/node/rest/client,
../../waku/v2/node/rest/responses,
../../waku/v2/node/rest/debug/handlers as debug_api,
../../waku/v2/node/rest/debug/client as debug_api_client
proc testWakuNode(): WakuNode =
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(58000)
WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
suite "Waku v2 REST API - Debug":
asyncTest "Get node info - GET /debug/v1/info":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58001)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
installDebugApiHandlers(restServer.router, node)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.debugInfoV1()
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.listenAddresses == @[$node.switch.peerInfo.addrs[^1] & "/p2p/" & $node.switch.peerInfo.peerId]
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Get node version - GET /debug/v1/version":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(58002)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
installDebugApiHandlers(restServer.router, node)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.debugVersionV1()
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == waku_node.git_version
await restServer.stop()
await restServer.closeWait()
await node.stop()

View File

@ -1,16 +1,16 @@
{.used.}
import
stew/results,
stew/results,
stew/byteutils,
testutils/unittests,
json_serialization
import
import
../../waku/v2/node/rest/serdes,
../../waku/v2/node/rest/debug/api_types
../../waku/v2/node/rest/debug/types
suite "Debug API - serialization":
suite "Waku v2 REST API - Debug - serialization":
suite "DebugWakuInfo - decode":
test "optional field is not provided":
@ -21,11 +21,11 @@ suite "Debug API - serialization":
let res = decodeFromJsonBytes(DebugWakuInfo, jsonBytes, requireAllFields = true)
# Then
require(res.isOk)
require(res.isOk())
let value = res.get()
check:
value.listenAddresses == @["123"]
value.enrUri.isNone
value.enrUri.isNone()
suite "DebugWakuInfo - encode":
test "optional field is none":
@ -36,7 +36,7 @@ suite "Debug API - serialization":
let res = encodeIntoJsonBytes(data)
# Then
require(res.isOk)
require(res.isOk())
let value = res.get()
check:
value == toBytes("""{"listenAddresses":["GO"]}""")

View File

@ -8,13 +8,19 @@ import
presto, presto/client as presto_client,
libp2p/crypto/crypto
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/waku_node,
../../waku/v2/node/rest/[server, client, base64, utils],
../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache],
../../waku/v2/node/rest/server,
../../waku/v2/node/rest/client,
../../waku/v2/node/rest/base64,
../../waku/v2/node/rest/responses,
../../waku/v2/node/rest/relay/types,
../../waku/v2/node/rest/relay/handlers as relay_api,
../../waku/v2/node/rest/relay/client as relay_api_client,
../../waku/v2/node/rest/relay/topic_cache,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_relay,
../../waku/v2/utils/time,
./testlib/waku2
../testlib/waku2
proc testWakuNode(): WakuNode =
@ -22,19 +28,19 @@ proc testWakuNode(): WakuNode =
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
port = Port(0)
WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
suite "REST API - Relay":
suite "Waku v2 Rest API - Relay":
asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
let restPort = Port(8546)
let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
@ -79,7 +85,7 @@ suite "REST API - Relay":
await node.start()
await node.mountRelay()
let restPort = Port(8546)
let restPort = Port(58012)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
@ -127,7 +133,7 @@ suite "REST API - Relay":
await node.start()
await node.mountRelay()
let restPort = Port(8546)
let restPort = Port(58013)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
@ -179,7 +185,7 @@ suite "REST API - Relay":
await node.mountRelay()
# RPC server setup
let restPort = Port(8546)
let restPort = Port(58014)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()

View File

@ -8,11 +8,12 @@ import
import
../../waku/v2/node/rest/serdes,
../../waku/v2/node/rest/base64,
../../waku/v2/node/rest/relay/api_types,
../../waku/v2/node/rest/relay/types,
../../waku/v2/protocol/waku_message
suite "Relay API - serialization":
suite "Waku v2 Rest API - Relay - serialization":
suite "RelayWakuMessage - decode":
test "optional fields are not provided":

View File

@ -1,18 +1,18 @@
{.used.}
import std/typetraits
import chronicles,
unittest2,
import
stew/[results, byteutils],
chronicles,
unittest2,
json_serialization
import
import
../../waku/v2/node/rest/serdes,
../../waku/v2/node/rest/debug/api_types
../../waku/v2/node/rest/debug/types
# TODO: Decouple this test suite from the `debug_api` module by defining
# private custom types for this test suite module
suite "Serdes":
suite "Waku v2 Rest API - Serdes":
suite "decode":
test "decodeFromJsonString - use the corresponding readValue template":
@ -28,7 +28,7 @@ suite "Serdes":
check:
value.listenAddresses == @["123"]
value.enrUri.isNone
test "decodeFromJsonBytes - use the corresponding readValue template":
# Given
let jsonBytes = toBytes("""{ "listenAddresses":["123"] }""")

View File

@ -0,0 +1,49 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client]
import
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest debug_api"
proc decodeBytes*(t: typedesc[DebugWakuInfo], data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[DebugWakuInfo] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported respose contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(DebugWakuInfo, data)
return ok(decoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc debugInfoV1*(): RestResponse[DebugWakuInfo] {.rest, endpoint: "/debug/v1/info", meth: HttpMethod.MethodGet.}
proc decodeBytes*(t: typedesc[string], value: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc debugVersionV1*(): RestResponse[string] {.rest, endpoint: "/debug/v1/version", meth: HttpMethod.MethodGet.}

View File

@ -1,48 +0,0 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client]
import "."/api_types
import ".."/[serdes, utils]
import ../../waku_node
logScope:
topics = "waku node rest debug_api"
#### Server request handlers
const ROUTE_DEBUG_INFOV1* = "/debug/v1/info"
proc installDebugInfoV1Handler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_DEBUG_INFOV1) do () -> RestApiResponse:
let info = node.info().toDebugWakuInfo()
let resp = RestApiResponse.jsonResponse(info, status=Http200)
if resp.isErr():
debug "An error occurred while building the json respose", error=resp.error()
return RestApiResponse.internalServerError()
return resp.get()
proc installDebugApiHandlers*(router: var RestRouter, node: WakuNode) =
installDebugInfoV1Handler(router, node)
#### Client
proc decodeBytes*(t: typedesc[DebugWakuInfo], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[DebugWakuInfo] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported respose contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(DebugWakuInfo, data)
return ok(decoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc debugInfoV1*(): RestResponse[DebugWakuInfo] {.rest, endpoint: "/debug/v1/info", meth: HttpMethod.MethodGet.}

View File

@ -0,0 +1,45 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronicles,
json_serialization,
presto/route
import
../../../../waku/v2/node/waku_node,
../responses,
../serdes,
./types
export types
logScope:
topics = "waku node rest debug_api"
const ROUTE_DEBUG_INFOV1* = "/debug/v1/info"
proc installDebugInfoV1Handler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_DEBUG_INFOV1) do () -> RestApiResponse:
let info = node.info().toDebugWakuInfo()
let resp = RestApiResponse.jsonResponse(info, status=Http200)
if resp.isErr():
debug "An error occurred while building the json respose", error=resp.error
return RestApiResponse.internalServerError()
return resp.get()
const ROUTE_DEBUG_VERSIONV1* = "/debug/v1/version"
proc installDebugVersionV1Handler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_DEBUG_VERSIONV1) do () -> RestApiResponse:
return RestApiResponse.textResponse(git_version, status=Http200)
proc installDebugApiHandlers*(router: var RestRouter, node: WakuNode) =
installDebugInfoV1Handler(router, node)
installDebugVersionV1Handler(router, node)

View File

@ -1,6 +1,6 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
@ -25,6 +25,27 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/WakuInfo'
'4XX':
description: Bad request error.
'5XX':
description: Unexpected error.
/debug/v1/version:
get:
summary: Get node version
description: Retrieve the Waku v2 node version.
operationId: getNodeVersion
tags:
- debug
responses:
'200':
description: The version of a Waku v2 node.
content:
text/plain:
schema:
type: string
'4XX':
description: Bad request error.
'5XX':
description: Unexpected error.

View File

@ -7,8 +7,9 @@ import
chronicles,
json_serialization,
json_serialization/std/options
import ".."/serdes
import ../../waku_node
import
../../../../waku/v2/node/waku_node,
../serdes
#### Types
@ -20,7 +21,7 @@ type
#### Type conversion
proc toDebugWakuInfo*(nodeInfo: WakuInfo): DebugWakuInfo =
proc toDebugWakuInfo*(nodeInfo: WakuInfo): DebugWakuInfo =
DebugWakuInfo(
listenAddresses: nodeInfo.listenAddresses,
enrUri: some(nodeInfo.enrUri)
@ -30,7 +31,7 @@ proc toDebugWakuInfo*(nodeInfo: WakuInfo): DebugWakuInfo =
#### Serialization and deserialization
proc writeValue*(writer: var JsonWriter[RestJson], value: DebugWakuInfo)
{.raises: [IOError, Defect].} =
{.raises: [IOError].} =
writer.beginRecord()
writer.writeField("listenAddresses", value.listenAddresses)
if value.enrUri.isSome:
@ -38,7 +39,7 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: DebugWakuInfo)
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson], value: var DebugWakuInfo)
{.raises: [SerializationError, IOError, Defect].} =
{.raises: [SerializationError, IOError].} =
var
listenAddresses: Option[seq[string]]
enrUri: Option[string]
@ -62,4 +63,4 @@ proc readValue*(reader: var JsonReader[RestJson], value: var DebugWakuInfo)
value = DebugWakuInfo(
listenAddresses: listenAddresses.get,
enrUri: enrUri
)
)

View File

@ -0,0 +1,75 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sets,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../../waku/v2/protocol/waku_message,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest client"
proc encodeBytes*(value: seq[PubSubTopic],
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[RelayGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported respose contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(RelayGetMessagesResponse, data)
return ok(decoded)
proc encodeBytes*(value: RelayPostMessagesRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.}

View File

@ -4,21 +4,24 @@ else:
{.push raises: [].}
import
std/[sets, sequtils],
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../waku_node,
../../../protocol/waku_message,
presto/route,
presto/common
import
../../../../waku/v2/node/waku_node,
../serdes,
../utils,
./api_types,
../responses,
./types,
./topic_cache
logScope:
export types
logScope:
topics = "waku node rest relay_api"
@ -31,7 +34,7 @@ const futTimeout* = 5.seconds # Max time to wait for futures
const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions"
proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics
@ -40,7 +43,7 @@ proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuN
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
@ -53,17 +56,17 @@ proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuN
let req: RelayPostSubscriptionsRequest = reqResult.get()
for topic in req:
if topicCache.isSubscribed(string(topic)):
if cache.isSubscribed(string(topic)):
# Only subscribe to topics for which we have no subscribed topic handlers yet
continue
topicCache.subscribe(string(topic))
node.subscribe(string(topic), topicCache.messageHandler())
cache.subscribe(string(topic))
node.subscribe(string(topic), cache.messageHandler())
return RestApiResponse.ok()
proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics
# debug "delete_waku_v2_relay_v1_subscriptions"
@ -71,7 +74,7 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
@ -86,16 +89,16 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak
# Unsubscribe all handlers from requested topics
for topic in req:
node.unsubscribeAll(string(topic))
topicCache.unsubscribe(string(topic))
cache.unsubscribe(string(topic))
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
return RestApiResponse.ok()
const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}"
proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse:
proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse:
# ## Returns all WakuMessages received on a PubSub topic since the
# ## last time this method was called
# ## TODO: ability to specify a return message limit
@ -105,22 +108,22 @@ proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, t
return RestApiResponse.badRequest()
let pubSubTopic = topic.get()
let messages = topicCache.getMessages(pubSubTopic, clear=true)
let messages = cache.getMessages(pubSubTopic, clear=true)
if messages.isErr():
debug "Not subscribed to topic", topic=pubSubTopic
return RestApiResponse.notFound()
return RestApiResponse.notFound()
let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
debug "An error ocurred while building the json respose", error=resp.error()
debug "An error ocurred while building the json respose", error=resp.error
return RestApiResponse.internalServerError()
return resp.get()
proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) =
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
if topic.isErr():
return RestApiResponse.badRequest()
let pubSubTopic = topic.get()
@ -128,7 +131,7 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode)
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
@ -143,68 +146,14 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode)
return RestApiResponse.badRequest()
if not (waitFor node.publish(pubSubTopic, resMessage.value).withTimeout(futTimeout)):
error "Failed to publish message to topic", topic=pubSubTopic
error "Failed to publish message to topic", topic=pubSubTopic
return RestApiResponse.internalServerError()
return RestApiResponse.ok()
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
installRelayGetMessagesV1Handler(router, node, topicCache)
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: TopicCache) =
installRelayPostSubscriptionsV1Handler(router, node, cache)
installRelayDeleteSubscriptionsV1Handler(router, node, cache)
installRelayGetMessagesV1Handler(router, node, cache)
installRelayPostMessagesV1Handler(router, node)
installRelayPostSubscriptionsV1Handler(router, node, topicCache)
installRelayDeleteSubscriptionsV1Handler(router, node, topicCache)
#### Client
proc encodeBytes*(value: seq[PubSubTopic],
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[RelayGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported respose contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(RelayGetMessagesResponse, data)
return ok(decoded)
proc encodeBytes*(value: RelayPostMessagesRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.}

View File

@ -4,16 +4,12 @@ else:
{.push raises: [].}
import
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
chronicles
import
../../../protocol/waku_message,
../../message_cache
logScope:
topics = "waku node rest relay_api"
../../../../waku/v2/protocol/waku_relay,
../../../../waku/v2/protocol/waku_message,
../../../../waku/v2/node/message_cache
export message_cache
@ -27,21 +23,11 @@ type TopicCache* = MessageCache[PubSubTopic]
##### Message handler
type TopicCacheMessageHandler* = Topichandler
type TopicCacheMessageHandler* = SubscriptionHandler
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
let handler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.async, closure.} =
trace "PubsubTopic handler triggered", pubsubTopic=pubsubTopic
let handler = proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} =
cache.addMessage(PubSubTopic(pubsubTopic), msg)
# Add message to current cache
let msg = WakuMessage.decode(data)
if msg.isErr():
debug "WakuMessage received but failed to decode", msg=msg, pubsubTopic=pubsubTopic
# TODO: handle message decode failure
return
trace "WakuMessage received", msg=msg, pubsubTopic=pubsubTopic
cache.addMessage(PubSubTopic(pubsubTopic), msg.get())
handler
handler

View File

@ -56,11 +56,11 @@ proc toWakuMessage*(msg: RelayWakuMessage, version = 0): Result[WakuMessage, cst
#### Serialization and deserialization
proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
{.raises: [IOError, Defect].} =
{.raises: [IOError].} =
writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
{.raises: [IOError, Defect].} =
{.raises: [IOError].} =
writer.beginRecord()
writer.writeField("payload", value.payload)
if value.contentTopic.isSome:
@ -72,11 +72,11 @@ proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
{.raises: [SerializationError, IOError, Defect].} =
{.raises: [SerializationError, IOError].} =
value = Base64String(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
{.raises: [SerializationError, IOError, Defect].} =
{.raises: [SerializationError, IOError].} =
var
payload = none(Base64String)
contentTopic = none(ContentTopic)

View File

@ -3,30 +3,35 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/typetraits
import
chronicles,
import
std/typetraits,
stew/results,
chronicles,
presto/common
import "."/serdes
import
./serdes
const MIMETYPE_JSON* = MediaType.init("application/json")
const MIMETYPE_TEXT* = MediaType.init("text/plain")
proc jsonResponse*(t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200): SerdesResult[RestApiResponse] =
let encoded = ?encodeIntoJsonBytes(data)
ok(RestApiResponse.response(encoded, status, $MIMETYPE_JSON))
proc ok*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.response("OK", Http200, $MIMETYPE_TEXT)
proc internalServerError*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.error(Http500)
proc ok*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.response("OK", status=Http200, contentType="text/plain")
proc badRequest*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.error(Http400)
proc notFound*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.error(Http404)
RestApiResponse.error(Http404)
proc jsonResponse*(t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200): SerdesResult[RestApiResponse] =
let encoded = ?encodeIntoJsonBytes(data)
ok(RestApiResponse.response(encoded, status, $MIMETYPE_JSON))
proc textResponse*(t: typedesc[RestApiResponse], data: string, status: HttpCode = Http200): RestApiResponse =
RestApiResponse.response(data, status, $MIMETYPE_TEXT)

View File

@ -16,10 +16,10 @@ type RestServerResult*[T] = Result[T, cstring]
### Configuration
type RestServerConf* = object
type RestServerConf* = object
cacheSize*: Natural ## \
## The maximum number of recently accessed states that are kept in \
## memory. Speeds up requests obtaining information for consecutive
## memory. Speeds up requests obtaining information for consecutive
## slots or epochs.
cacheTtl*: Natural ## \
@ -65,8 +65,8 @@ proc init*(T: type RestServerRef,
HttpServerFlags.QueryCommaSeparatedArray,
HttpServerFlags.NotifyDisconnect
}
let
let
headersTimeout = if conf.requestTimeout == 0: chronos.InfiniteDuration
else: seconds(int64(conf.requestTimeout))
maxHeadersSize = conf.maxRequestHeadersSize * 1024
@ -77,8 +77,8 @@ proc init*(T: type RestServerRef,
var res: RestResult[RestServerRef]
try:
res = RestServerRef.new(
router,
address,
router,
address,
serverFlags = serverFlags,
httpHeadersTimeout = headersTimeout,
maxHeadersSize = maxHeadersSize,
@ -88,9 +88,8 @@ proc init*(T: type RestServerRef,
return err(cstring(ex.msg))
res
proc newRestHttpServer*(ip: ValidIpAddress, port: Port,
allowedOrigin=none(string),
conf=RestServerConf.default()): RestServerResult[RestServerRef] =
RestServerRef.init(ip, port, allowedOrigin, conf)