feat(rest): Add HTTP REST API (#727). Add Relay REST API

This commit is contained in:
Lorenzo Delgado 2022-06-10 13:30:51 +02:00 committed by Lorenzo Delgado
parent d2feb7c763
commit cf1a7c3074
11 changed files with 1065 additions and 4 deletions

View File

@ -13,6 +13,9 @@ import
./v2/test_rest_serdes,
./v2/test_rest_debug_api_serdes,
./v2/test_rest_debug_api,
./v2/test_rest_relay_api_serdes,
./v2/test_rest_relay_api_topic_cache,
./v2/test_rest_relay_api,
./v2/test_peer_manager,
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
./v2/test_waku_bridge,

View File

@ -0,0 +1,249 @@
{.used.}
import
std/sequtils,
stew/byteutils,
stew/shims/net,
chronicles,
testutils/unittests,
presto,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub
import
../../waku/v2/node/wakunode2,
../../waku/v2/node/rest/[server, client, utils],
../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache]
proc testWakuNode(): WakuNode =
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(9000)
WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
WakuMessage(
payload: payload,
contentTopic: contentTopic,
version: 1,
timestamp: 2022
)
suite "REST API - Relay":
asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
node.mountRelay()
let restPort = Port(8546)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(
restAddress,
restPort,
none(string),
none(RestServerConf)
)
let topicCache = TopicCache.init()
installRelayPostSubscriptionsV1Handler(restServer.router, node, topicCache)
restServer.start()
let pubSubTopics = @[
PubSubTopicString("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3")
]
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let requestBody = RelayPostSubscriptionsRequest(pubSubTopics)
let response = await client.relayPostSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
topicCache.isSubscribed("pubsub-topic-1")
topicCache.isSubscribed("pubsub-topic-2")
topicCache.isSubscribed("pubsub-topic-3")
check:
# Node should be subscribed to default + new topics
PubSub(node.wakuRelay).topics.len == 1 + pubSubTopics.len
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions":
# Given
let node = testWakuNode()
await node.start()
node.mountRelay()
let restPort = Port(8546)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(
restAddress,
restPort,
none(string),
none(RestServerConf)
)
let topicCache = TopicCache.init()
topicCache.subscribe("pubsub-topic-1")
topicCache.subscribe("pubsub-topic-2")
topicCache.subscribe("pubsub-topic-3")
topicCache.subscribe("pubsub-topic-x")
installRelayDeleteSubscriptionsV1Handler(restServer.router, node, topicCache)
restServer.start()
let pubSubTopics = @[
PubSubTopicString("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3"),
PubSubTopicString("pubsub-topic-y")
]
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let requestBody = RelayDeleteSubscriptionsRequest(pubSubTopics)
let response = await client.relayDeleteSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not topicCache.isSubscribed("pubsub-topic-1")
not topicCache.isSubscribed("pubsub-topic-2")
not topicCache.isSubscribed("pubsub-topic-3")
topicCache.isSubscribed("pubsub-topic-x")
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}":
# Given
let node = testWakuNode()
await node.start()
node.mountRelay()
let restPort = Port(8546)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(
restAddress,
restPort,
none(string),
none(RestServerConf)
)
let pubSubTopic = "/waku/2/default-waku/proto"
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]
let topicCache = TopicCache.init()
topicCache.subscribe(pubSubTopic)
for msg in messages:
topicCache.addMessage(pubSubTopic, msg)
installRelayGetMessagesV1Handler(restServer.router, node, topicCache)
restServer.start()
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.relayGetMessagesV1(pubSubTopic)
# Then
check:
response.status == 200
response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: RelayWakuMessage) -> bool:
msg.payload == "TEST-1" and
string(msg.contentTopic.get()) == "content-topic-x" and
msg.version.get() == Natural(1) and
msg.timestamp.get() == int64(2022)
check:
topicCache.isSubscribed(pubSubTopic)
topicCache.getMessages(pubSubTopic).tryGet().len == 0
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe":
# Given
let node = testWakuNode()
await node.start()
node.mountRelay()
# RPC server setup
let restPort = Port(8546)
let restAddress = ValidIpAddress.init("0.0.0.0")
let restServer = RestServerRef.init(
restAddress,
restPort,
none(string),
none(RestServerConf)
)
let topicCache = TopicCache.init()
installRelayApiHandlers(restServer.router, node, topicCache)
restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")
# At this stage the node is only subscribed to the default topic
require(PubSub(node.wakuRelay).topics.len == 1)
# When
let newTopics = @[
PubSubTopicString("pubsub-topic-1"),
PubSubTopicString("pubsub-topic-2"),
PubSubTopicString("pubsub-topic-3")
]
discard await client.relayPostSubscriptionsV1(newTopics)
let response = await client.relayPostMessagesV1(defaultTopic, RelayWakuMessage(
payload: "TEST-PAYLOAD",
contentTopic: some(ContentTopicString(defaultContentTopic)),
timestamp: some(int64(2022))
))
# Then
check:
response.status == 200
response.contentType == $MIMETYPE_TEXT
response.data == "OK"
# TODO: Check for the message to be published to the topic
await restServer.stop()
await restServer.closeWait()
await node.stop()

View File

@ -0,0 +1,49 @@
{.used.}
import std/typetraits
import chronicles,
unittest2,
stew/[results, byteutils],
json_serialization
import
../../waku/v2/node/rest/serdes,
../../waku/v2/node/rest/relay/api_types
suite "Relay API - serialization":
suite "RelayWakuMessage - decode":
test "optional fields are not provided":
# Given
let jsonBytes = toBytes("""{ "payload": "MESSAGE" }""")
# When
let res = decodeFromJsonBytes(RelayWakuMessage, jsonBytes, requireAllFields = true)
# Then
require(res.isOk)
let value = res.get()
check:
value.payload == "MESSAGE"
value.contentTopic.isNone
value.version.isNone
value.timestamp.isNone
suite "RelayWakuMessage - encode":
test "optional fields are none":
# Given
let data = RelayWakuMessage(
payload: "MESSAGE",
contentTopic: none(ContentTopicString),
version: none(Natural),
timestamp: none(int64)
)
# When
let res = encodeIntoJsonBytes(data)
# Then
require(res.isOk)
let value = res.get()
check:
value == toBytes("""{"payload":"MESSAGE"}""")

View File

@ -0,0 +1,163 @@
{.used.}
import
std/tables,
stew/byteutils,
stew/shims/net,
chronicles,
testutils/unittests,
presto,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/pubsub
import
../../waku/v2/protocol/waku_message,
../../waku/v2/node/rest/relay/topic_cache
proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
WakuMessage(
payload: payload,
contentTopic: contentTopic,
version: 1,
timestamp: 2022
)
suite "TopicCache":
test "subscribe to topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
## When
cache.subscribe(testTopic)
## Then
check:
cache.isSubscribed(testTopic)
test "unsubscribe from topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
# Init cache content
cache.subscribe(testTopic)
## When
cache.unsubscribe(testTopic)
## Then
check:
not cache.isSubscribed(testTopic)
test "get messages of a subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
# Init cache content
cache.subscribe(testTopic)
cache.addMessage(testTopic, testMessage)
## When
let res = cache.getMessages(testTopic)
## Then
check:
res.isOk()
res.get() == @[testMessage]
test "get messages with clean flag shoud clear the messages cache":
## Given
let testTopic = "test-pubsub-topic"
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
# Init cache content
cache.subscribe(testTopic)
cache.addMessage(testTopic, testMessage)
## When
var res = cache.getMessages(testTopic, clear=true)
require(res.isOk())
res = cache.getMessages(testTopic)
## Then
check:
res.isOk()
res.get().len == 0
test "get messages of a non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let cache = TopicCache.init()
## When
let res = cache.getMessages(testTopic)
## Then
check:
res.isErr()
res.error() == "Not subscribed to topic"
test "add messages to subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
cache.subscribe(testTopic)
## When
cache.addMessage(testTopic, testMessage)
## Then
let messages = cache.getMessages(testTopic).tryGet()
check:
messages == @[testMessage]
test "add messages to non-subscribed topic":
## Given
let testTopic = "test-pubsub-topic"
let testMessage = fakeWakuMessage()
let cache = TopicCache.init()
## When
cache.addMessage(testTopic, testMessage)
## Then
let res = cache.getMessages(testTopic)
check:
res.isErr()
res.error() == "Not subscribed to topic"
test "add messages beyond the capacity":
## Given
let testTopic = "test-pubsub-topic"
let testMessages = @[
fakeWakuMessage(toBytes("MSG-1")),
fakeWakuMessage(toBytes("MSG-2")),
fakeWakuMessage(toBytes("MSG-3"))
]
let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2))
cache.subscribe(testTopic)
## When
for msg in testMessages:
cache.addMessage(testTopic, msg)
## Then
let messages = cache.getMessages(testTopic).tryGet()
check:
messages == testMessages[1..2]

View File

@ -0,0 +1,121 @@
{.push raises: [ Defect ].}
import
std/[sets, strformat],
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import ".."/serdes
import ../../wakunode2
#### Types
type
PubSubTopicString* = distinct string
ContentTopicString* = distinct string
type RelayWakuMessage* = object
payload*: string
contentTopic*: Option[ContentTopicString]
version*: Option[Natural]
timestamp*: Option[int64]
type
RelayGetMessagesResponse* = seq[RelayWakuMessage]
RelayPostMessagesRequest* = RelayWakuMessage
type
RelayPostSubscriptionsRequest* = seq[PubSubTopicString]
RelayDeleteSubscriptionsRequest* = seq[PubSubTopicString]
#### Type conversion
proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
RelayWakuMessage(
payload: string.fromBytes(msg.payload),
contentTopic: some(ContentTopicString(msg.contentTopic)),
version: some(Natural(msg.version)),
timestamp: some(msg.timestamp)
)
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): WakuMessage =
const defaultContentTopic = ContentTopicString("/waku/2/default-content/proto")
WakuMessage(
payload: msg.payload.toBytes(),
contentTopic: ContentTopic(msg.contentTopic.get(defaultContentTopic)),
version: uint32(msg.version.get(version)),
timestamp: msg.timestamp.get(0)
)
#### Serialization and deserialization
proc writeValue*(writer: var JsonWriter[RestJson], value: PubSubTopicString)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], value: ContentTopicString)
{.raises: [IOError, Defect].} =
writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
{.raises: [IOError, Defect].} =
writer.beginRecord()
writer.writeField("payload", value.payload)
if value.contentTopic.isSome:
writer.writeField("contentTopic", value.contentTopic)
if value.version.isSome:
writer.writeField("version", value.version)
if value.timestamp.isSome:
writer.writeField("timestamp", value.timestamp)
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson], value: var PubSubTopicString)
{.raises: [SerializationError, IOError, Defect].} =
value = PubSubTopicString(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var ContentTopicString)
{.raises: [SerializationError, IOError, Defect].} =
value = ContentTopicString(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
{.raises: [SerializationError, IOError, Defect].} =
var
payload = none(string)
contentTopic = none(ContentTopicString)
version = none(Natural)
timestamp = none(int64)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "RelayWakuMessage")
case fieldName
of "payload":
payload = some(reader.readValue(string))
of "contentTopic":
contentTopic = some(reader.readValue(ContentTopicString))
of "version":
version = some(reader.readValue(Natural))
of "timestamp":
timestamp = some(reader.readValue(int64))
else:
unrecognizedFieldWarning()
if payload.isNone():
reader.raiseUnexpectedValue("Field `payload` is missing")
value = RelayWakuMessage(
payload: payload.get(),
contentTopic: contentTopic,
version: version,
timestamp: timestamp
)

View File

@ -0,0 +1,200 @@
{.push raises: [Defect].}
import
std/[sets, sequtils],
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import ".."/[serdes, utils]
import ../../wakunode2
import "."/[api_types, topic_cache]
logScope: topics = "rest_api_relay"
##### Topic cache
const futTimeout* = 5.seconds # Max time to wait for futures
#### Request handlers
const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions"
proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics
# debug "post_waku_v2_relay_v1_subscriptions"
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init(contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayPostSubscriptionsRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
let req: RelayPostSubscriptionsRequest = reqResult.get()
for topic in req:
if topicCache.isSubscribed(string(topic)):
# Only subscribe to topics for which we have no subscribed topic handlers yet
continue
topicCache.subscribe(string(topic))
node.subscribe(string(topic), topicCache.messageHandler())
return RestApiResponse.ok()
proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
# ## Subscribes a node to a list of PubSub topics
# debug "delete_waku_v2_relay_v1_subscriptions"
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init(contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayDeleteSubscriptionsRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
let req: RelayDeleteSubscriptionsRequest = reqResult.get()
# Unsubscribe all handlers from requested topics
for topic in req:
node.unsubscribeAll(string(topic))
topicCache.unsubscribe(string(topic))
# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}"
proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse:
# ## Returns all WakuMessages received on a PubSub topic since the
# ## last time this method was called
# ## TODO: ability to specify a return message limit
# debug "get_waku_v2_relay_v1_messages", topic=topic
if topic.isErr():
return RestApiResponse.badRequest()
let pubSubTopic = topic.get()
let messages = topicCache.getMessages(pubSubTopic, clear=true)
if messages.isErr():
debug "Not subscribed to topic", topic=pubSubTopic
return RestApiResponse.notFound()
let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
debug "An error ocurred while building the json respose", error=resp.error()
return RestApiResponse.internalServerError()
return resp.get()
proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) =
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
if topic.isErr():
return RestApiResponse.badRequest()
let pubSubTopic = topic.get()
# Check the request body
if contentBody.isNone():
return RestApiResponse.badRequest()
let reqBodyContentType = MediaType.init(contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return RestApiResponse.badRequest()
let reqBodyData = contentBody.get().data
let reqResult = decodeFromJsonBytes(RelayPostMessagesRequest, reqBodyData)
if reqResult.isErr():
return RestApiResponse.badRequest()
let message: RelayPostMessagesRequest = reqResult.get()
if not (waitFor node.publish(pubSubTopic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)):
error "Failed to publish message to topic", topic=pubSubTopic
return RestApiResponse.internalServerError()
return RestApiResponse.ok()
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
installRelayGetMessagesV1Handler(router, node, topicCache)
installRelayPostMessagesV1Handler(router, node)
installRelayPostSubscriptionsV1Handler(router, node, topicCache)
installRelayDeleteSubscriptionsV1Handler(router, node, topicCache)
#### Client
proc encodeBytes*(value: seq[PubSubTopicString],
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: string): RestResult[string] =
if MediaType.init(contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: string): RestResult[RelayGetMessagesResponse] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported respose contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(RelayGetMessagesResponse, data)
return ok(decoded)
proc encodeBytes*(value: RelayPostMessagesRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.}

View File

@ -0,0 +1,147 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: relay
description: Relay REST API for WakuV2 node
paths:
/relay/v1/messages/{topic}: # Note the plural in messages
get: # get_waku_v2_relay_v1_messages
summary: Get the latest messages on the polled topic
description: Get a list of messages that were received on a subscribed PubSub topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- relay
parameters:
- in: path
name: topic # Note the name is the same as in the path
required: true
schema:
type: string
description: The user ID
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/RelayGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
post: # post_waku_v2_relay_v1_message
summary: Publish a message to be relayed
description: Publishes a message to be relayed on a PubSub topic.
operationId: postMessagesToTopic
tags:
- relay
parameters:
- in: path
name: topic # Note the name is the same as in the path
description: The messages content topic
required: true
schema:
$ref: '#/components/schemas/RelayPostMessagesRequest'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayPostMessagesRequest'
responses:
'200':
description: OK
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
/relay/v1/subscriptions:
post: # post_waku_v2_relay_v1_subscriptions
summary: Subscribe a node to an array of topics
description: Subscribe a node to an array of PubSub topics.
operationId: postSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayPostSubscriptionsRequest'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
delete: # delete_waku_v2_relay_v1_subscriptions
summary: Unsubscribe a node from an array of topics
description: Unsubscribe a node from an array of PubSub topics.
operationId: deleteSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayDeleteSubscriptionsRequest'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
components:
schemas:
PubSubTopic:
type: string
ContentTopic:
type: string
RelayWakuMessage:
type: object
properties:
payload:
type: string
contentTopic:
$ref: '#/components/schemas/ContentTopic'
version:
type: number
timestamp:
type: number
required:
- payload
RelayGetMessagesResponse:
type: array
items:
$ref: '#/components/schemas/RelayWakuMessage'
RelayPostMessagesRequest:
$ref: '#/components/schemas/RelayWakuMessage'
RelayPostSubscriptionsRequest:
type: array
items:
$ref: '#/components/schemas/PubSubTopic'
RelayDeleteSubscriptionsRequest:
type: array
items:
$ref: '#/components/schemas/PubSubTopic'

View File

@ -0,0 +1,108 @@
{.push raises: [Defect].}
import
std/[tables, sequtils],
stew/results,
chronicles,
chronos,
libp2p/protocols/pubsub
import
../../../protocol/waku_message
logScope: topics = "rest_api_relay_topiccache"
const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable
type PubSubTopicString = string
type TopicCacheResult*[T] = Result[T, cstring]
type TopicCacheMessageHandler* = Topichandler
type TopicCacheConfig* = object
capacity*: int
proc default*(T: type TopicCacheConfig): T =
TopicCacheConfig(
capacity: DEFAULT_TOPICCACHE_CAPACITY
)
type TopicCache* = ref object
conf: TopicCacheConfig
table: Table[PubSubTopicString, seq[WakuMessage]]
func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T =
TopicCache(
conf: conf,
table: initTable[PubSubTopicString, seq[WakuMessage]]()
)
proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool =
t.table.hasKey(topic)
proc subscribe*(t: TopicCache, topic: PubSubTopicString) =
if t.isSubscribed(topic):
return
t.table[topic] = @[]
proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table.del(topic)
proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) =
if not t.isSubscribed(topic):
return
# Make a copy of msgs for this topic to modify
var messages = t.table.getOrDefault(topic, @[])
if messages.len >= t.conf.capacity:
debug "Topic cache capacity reached", topic=topic
# Message cache on this topic exceeds maximum. Delete oldest.
# TODO: this may become a bottle neck if called as the norm rather than
# exception when adding messages. Performance profile needed.
messages.delete(0,0)
messages.add(msg)
# Replace indexed entry with copy
t.table[topic] = messages
proc clearMessages*(t: TopicCache, topic: PubSubTopicString) =
if not t.isSubscribed(topic):
return
t.table[topic] = @[]
proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] =
if not t.isSubscribed(topic):
return err("Not subscribed to topic")
let messages = t.table.getOrDefault(topic, @[])
if clear:
t.clearMessages(topic)
ok(messages)
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} =
trace "Topic handler triggered", topic=topic
# Add message to current cache
let msg = WakuMessage.init(data)
if msg.isErr():
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
# TODO: handle message decode failure
return
trace "WakuMessage received", msg=msg, topic=topic
cache.addMessage(PubSubTopicString(topic), msg.get())
handler

View File

@ -69,4 +69,13 @@ proc encodeIntoJsonBytes*(value: auto): SerdesResult[seq[byte]] =
# TODO: Do better error reporting here
return err("unable to serialize data")
ok(encoded)
ok(encoded)
#### helpers
proc encodeString*(value: string): RestResult[string] =
ok(value)
proc decodeString*(t: typedesc[string], value: string): RestResult[string] =
ok(value)

View File

@ -13,10 +13,11 @@ import
proc getRouter(allowedOrigin: Option[string]): RestRouter =
# TODO: Review this `validate` method. Check in nim-presto what is this used for.
proc validate(key: string, value: string): int =
proc validate(pattern: string, value: string): int =
## This is rough validation procedure which should be simple and fast,
## because it will be used for query routing.
1
if pattern.startsWith("{") and pattern.endsWith("}"): 0
else: 1
RestRouter.init(validate, allowedOrigin = allowedOrigin)

View File

@ -9,10 +9,21 @@ import "."/serdes
const MIMETYPE_JSON* = MediaType.init("application/json")
const MIMETYPE_TEXT* = MediaType.init("text/plain")
proc jsonResponse*(t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200): SerdesResult[RestApiResponse] =
let encoded = ?encodeIntoJsonBytes(data)
ok(RestApiResponse.response(encoded, status, $MIMETYPE_JSON))
proc internalServerError*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.error(Http500)
RestApiResponse.error(Http500)
proc ok*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.response("OK", status=Http200, contentType="text/plain")
proc badRequest*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.error(Http400)
proc notFound*(t: typedesc[RestApiResponse]): RestApiResponse =
RestApiResponse.error(Http404)