mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-27 15:16:05 +00:00
deploy: cf1a7c30746c82a2df03e8bda1291e674ec20d4e
This commit is contained in:
parent
38736b0de4
commit
85e2d43582
@ -13,6 +13,9 @@ import
|
||||
./v2/test_rest_serdes,
|
||||
./v2/test_rest_debug_api_serdes,
|
||||
./v2/test_rest_debug_api,
|
||||
./v2/test_rest_relay_api_serdes,
|
||||
./v2/test_rest_relay_api_topic_cache,
|
||||
./v2/test_rest_relay_api,
|
||||
./v2/test_peer_manager,
|
||||
./v2/test_web3, # TODO remove it when rln-relay tests get finalized
|
||||
./v2/test_waku_bridge,
|
||||
|
249
tests/v2/test_rest_relay_api.nim
Normal file
249
tests/v2/test_rest_relay_api.nim
Normal file
@ -0,0 +1,249 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/sequtils,
|
||||
stew/byteutils,
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
testutils/unittests,
|
||||
presto,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/pubsub
|
||||
import
|
||||
../../waku/v2/node/wakunode2,
|
||||
../../waku/v2/node/rest/[server, client, utils],
|
||||
../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache]
|
||||
|
||||
|
||||
proc testWakuNode(): WakuNode =
|
||||
let
|
||||
rng = crypto.newRng()
|
||||
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
|
||||
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||
extIp = ValidIpAddress.init("127.0.0.1")
|
||||
port = Port(9000)
|
||||
|
||||
WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
|
||||
|
||||
proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
|
||||
WakuMessage(
|
||||
payload: payload,
|
||||
contentTopic: contentTopic,
|
||||
version: 1,
|
||||
timestamp: 2022
|
||||
)
|
||||
|
||||
|
||||
suite "REST API - Relay":
|
||||
asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
node.mountRelay()
|
||||
|
||||
let restPort = Port(8546)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(
|
||||
restAddress,
|
||||
restPort,
|
||||
none(string),
|
||||
none(RestServerConf)
|
||||
)
|
||||
|
||||
let topicCache = TopicCache.init()
|
||||
|
||||
installRelayPostSubscriptionsV1Handler(restServer.router, node, topicCache)
|
||||
restServer.start()
|
||||
|
||||
let pubSubTopics = @[
|
||||
PubSubTopicString("pubsub-topic-1"),
|
||||
PubSubTopicString("pubsub-topic-2"),
|
||||
PubSubTopicString("pubsub-topic-3")
|
||||
]
|
||||
|
||||
# When
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
let requestBody = RelayPostSubscriptionsRequest(pubSubTopics)
|
||||
let response = await client.relayPostSubscriptionsV1(requestBody)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response.status == 200
|
||||
response.contentType == $MIMETYPE_TEXT
|
||||
response.data == "OK"
|
||||
|
||||
check:
|
||||
topicCache.isSubscribed("pubsub-topic-1")
|
||||
topicCache.isSubscribed("pubsub-topic-2")
|
||||
topicCache.isSubscribed("pubsub-topic-3")
|
||||
|
||||
check:
|
||||
# Node should be subscribed to default + new topics
|
||||
PubSub(node.wakuRelay).topics.len == 1 + pubSubTopics.len
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
node.mountRelay()
|
||||
|
||||
let restPort = Port(8546)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(
|
||||
restAddress,
|
||||
restPort,
|
||||
none(string),
|
||||
none(RestServerConf)
|
||||
)
|
||||
|
||||
let topicCache = TopicCache.init()
|
||||
topicCache.subscribe("pubsub-topic-1")
|
||||
topicCache.subscribe("pubsub-topic-2")
|
||||
topicCache.subscribe("pubsub-topic-3")
|
||||
topicCache.subscribe("pubsub-topic-x")
|
||||
|
||||
installRelayDeleteSubscriptionsV1Handler(restServer.router, node, topicCache)
|
||||
restServer.start()
|
||||
|
||||
let pubSubTopics = @[
|
||||
PubSubTopicString("pubsub-topic-1"),
|
||||
PubSubTopicString("pubsub-topic-2"),
|
||||
PubSubTopicString("pubsub-topic-3"),
|
||||
PubSubTopicString("pubsub-topic-y")
|
||||
]
|
||||
|
||||
# When
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
let requestBody = RelayDeleteSubscriptionsRequest(pubSubTopics)
|
||||
let response = await client.relayDeleteSubscriptionsV1(requestBody)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response.status == 200
|
||||
response.contentType == $MIMETYPE_TEXT
|
||||
response.data == "OK"
|
||||
|
||||
check:
|
||||
not topicCache.isSubscribed("pubsub-topic-1")
|
||||
not topicCache.isSubscribed("pubsub-topic-2")
|
||||
not topicCache.isSubscribed("pubsub-topic-3")
|
||||
topicCache.isSubscribed("pubsub-topic-x")
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
|
||||
asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
node.mountRelay()
|
||||
|
||||
let restPort = Port(8546)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(
|
||||
restAddress,
|
||||
restPort,
|
||||
none(string),
|
||||
none(RestServerConf)
|
||||
)
|
||||
|
||||
let pubSubTopic = "/waku/2/default-waku/proto"
|
||||
let messages = @[
|
||||
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
|
||||
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
|
||||
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
|
||||
]
|
||||
|
||||
let topicCache = TopicCache.init()
|
||||
|
||||
topicCache.subscribe(pubSubTopic)
|
||||
for msg in messages:
|
||||
topicCache.addMessage(pubSubTopic, msg)
|
||||
|
||||
installRelayGetMessagesV1Handler(restServer.router, node, topicCache)
|
||||
restServer.start()
|
||||
|
||||
# When
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
let response = await client.relayGetMessagesV1(pubSubTopic)
|
||||
|
||||
# Then
|
||||
check:
|
||||
response.status == 200
|
||||
response.contentType == $MIMETYPE_JSON
|
||||
response.data.len == 3
|
||||
response.data.all do (msg: RelayWakuMessage) -> bool:
|
||||
msg.payload == "TEST-1" and
|
||||
string(msg.contentTopic.get()) == "content-topic-x" and
|
||||
msg.version.get() == Natural(1) and
|
||||
msg.timestamp.get() == int64(2022)
|
||||
|
||||
|
||||
check:
|
||||
topicCache.isSubscribed(pubSubTopic)
|
||||
topicCache.getMessages(pubSubTopic).tryGet().len == 0
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
||||
|
||||
asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}":
|
||||
## "Relay API: publish and subscribe/unsubscribe":
|
||||
# Given
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
node.mountRelay()
|
||||
|
||||
# RPC server setup
|
||||
let restPort = Port(8546)
|
||||
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||
let restServer = RestServerRef.init(
|
||||
restAddress,
|
||||
restPort,
|
||||
none(string),
|
||||
none(RestServerConf)
|
||||
)
|
||||
|
||||
let topicCache = TopicCache.init()
|
||||
|
||||
installRelayApiHandlers(restServer.router, node, topicCache)
|
||||
restServer.start()
|
||||
|
||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
# At this stage the node is only subscribed to the default topic
|
||||
require(PubSub(node.wakuRelay).topics.len == 1)
|
||||
|
||||
|
||||
# When
|
||||
let newTopics = @[
|
||||
PubSubTopicString("pubsub-topic-1"),
|
||||
PubSubTopicString("pubsub-topic-2"),
|
||||
PubSubTopicString("pubsub-topic-3")
|
||||
]
|
||||
discard await client.relayPostSubscriptionsV1(newTopics)
|
||||
|
||||
let response = await client.relayPostMessagesV1(defaultTopic, RelayWakuMessage(
|
||||
payload: "TEST-PAYLOAD",
|
||||
contentTopic: some(ContentTopicString(defaultContentTopic)),
|
||||
timestamp: some(int64(2022))
|
||||
))
|
||||
|
||||
# Then
|
||||
check:
|
||||
response.status == 200
|
||||
response.contentType == $MIMETYPE_TEXT
|
||||
response.data == "OK"
|
||||
|
||||
# TODO: Check for the message to be published to the topic
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
await node.stop()
|
49
tests/v2/test_rest_relay_api_serdes.nim
Normal file
49
tests/v2/test_rest_relay_api_serdes.nim
Normal file
@ -0,0 +1,49 @@
|
||||
{.used.}
|
||||
|
||||
import std/typetraits
|
||||
import chronicles,
|
||||
unittest2,
|
||||
stew/[results, byteutils],
|
||||
json_serialization
|
||||
import
|
||||
../../waku/v2/node/rest/serdes,
|
||||
../../waku/v2/node/rest/relay/api_types
|
||||
|
||||
|
||||
suite "Relay API - serialization":
|
||||
|
||||
suite "RelayWakuMessage - decode":
|
||||
test "optional fields are not provided":
|
||||
# Given
|
||||
let jsonBytes = toBytes("""{ "payload": "MESSAGE" }""")
|
||||
|
||||
# When
|
||||
let res = decodeFromJsonBytes(RelayWakuMessage, jsonBytes, requireAllFields = true)
|
||||
|
||||
# Then
|
||||
require(res.isOk)
|
||||
let value = res.get()
|
||||
check:
|
||||
value.payload == "MESSAGE"
|
||||
value.contentTopic.isNone
|
||||
value.version.isNone
|
||||
value.timestamp.isNone
|
||||
|
||||
suite "RelayWakuMessage - encode":
|
||||
test "optional fields are none":
|
||||
# Given
|
||||
let data = RelayWakuMessage(
|
||||
payload: "MESSAGE",
|
||||
contentTopic: none(ContentTopicString),
|
||||
version: none(Natural),
|
||||
timestamp: none(int64)
|
||||
)
|
||||
|
||||
# When
|
||||
let res = encodeIntoJsonBytes(data)
|
||||
|
||||
# Then
|
||||
require(res.isOk)
|
||||
let value = res.get()
|
||||
check:
|
||||
value == toBytes("""{"payload":"MESSAGE"}""")
|
163
tests/v2/test_rest_relay_api_topic_cache.nim
Normal file
163
tests/v2/test_rest_relay_api_topic_cache.nim
Normal file
@ -0,0 +1,163 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/tables,
|
||||
stew/byteutils,
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
testutils/unittests,
|
||||
presto,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/pubsub
|
||||
import
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/node/rest/relay/topic_cache
|
||||
|
||||
|
||||
proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage =
|
||||
WakuMessage(
|
||||
payload: payload,
|
||||
contentTopic: contentTopic,
|
||||
version: 1,
|
||||
timestamp: 2022
|
||||
)
|
||||
|
||||
|
||||
suite "TopicCache":
|
||||
test "subscribe to topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let cache = TopicCache.init()
|
||||
|
||||
## When
|
||||
cache.subscribe(testTopic)
|
||||
|
||||
## Then
|
||||
check:
|
||||
cache.isSubscribed(testTopic)
|
||||
|
||||
|
||||
test "unsubscribe from topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let cache = TopicCache.init()
|
||||
|
||||
# Init cache content
|
||||
cache.subscribe(testTopic)
|
||||
|
||||
## When
|
||||
cache.unsubscribe(testTopic)
|
||||
|
||||
## Then
|
||||
check:
|
||||
not cache.isSubscribed(testTopic)
|
||||
|
||||
|
||||
test "get messages of a subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
|
||||
# Init cache content
|
||||
cache.subscribe(testTopic)
|
||||
cache.addMessage(testTopic, testMessage)
|
||||
|
||||
## When
|
||||
let res = cache.getMessages(testTopic)
|
||||
|
||||
## Then
|
||||
check:
|
||||
res.isOk()
|
||||
res.get() == @[testMessage]
|
||||
|
||||
|
||||
test "get messages with clean flag shoud clear the messages cache":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
|
||||
# Init cache content
|
||||
cache.subscribe(testTopic)
|
||||
cache.addMessage(testTopic, testMessage)
|
||||
|
||||
## When
|
||||
var res = cache.getMessages(testTopic, clear=true)
|
||||
require(res.isOk())
|
||||
|
||||
res = cache.getMessages(testTopic)
|
||||
|
||||
## Then
|
||||
check:
|
||||
res.isOk()
|
||||
res.get().len == 0
|
||||
|
||||
|
||||
test "get messages of a non-subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let cache = TopicCache.init()
|
||||
|
||||
## When
|
||||
let res = cache.getMessages(testTopic)
|
||||
|
||||
## Then
|
||||
check:
|
||||
res.isErr()
|
||||
res.error() == "Not subscribed to topic"
|
||||
|
||||
|
||||
test "add messages to subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
|
||||
cache.subscribe(testTopic)
|
||||
|
||||
## When
|
||||
cache.addMessage(testTopic, testMessage)
|
||||
|
||||
## Then
|
||||
let messages = cache.getMessages(testTopic).tryGet()
|
||||
check:
|
||||
messages == @[testMessage]
|
||||
|
||||
|
||||
test "add messages to non-subscribed topic":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testMessage = fakeWakuMessage()
|
||||
let cache = TopicCache.init()
|
||||
|
||||
## When
|
||||
cache.addMessage(testTopic, testMessage)
|
||||
|
||||
## Then
|
||||
let res = cache.getMessages(testTopic)
|
||||
check:
|
||||
res.isErr()
|
||||
res.error() == "Not subscribed to topic"
|
||||
|
||||
|
||||
test "add messages beyond the capacity":
|
||||
## Given
|
||||
let testTopic = "test-pubsub-topic"
|
||||
let testMessages = @[
|
||||
fakeWakuMessage(toBytes("MSG-1")),
|
||||
fakeWakuMessage(toBytes("MSG-2")),
|
||||
fakeWakuMessage(toBytes("MSG-3"))
|
||||
]
|
||||
|
||||
let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2))
|
||||
cache.subscribe(testTopic)
|
||||
|
||||
## When
|
||||
for msg in testMessages:
|
||||
cache.addMessage(testTopic, msg)
|
||||
|
||||
## Then
|
||||
let messages = cache.getMessages(testTopic).tryGet()
|
||||
check:
|
||||
messages == testMessages[1..2]
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az462-196:
|
||||
# Libtool was configured on host fv-az504-494:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
121
waku/v2/node/rest/relay/api_types.nim
Normal file
121
waku/v2/node/rest/relay/api_types.nim
Normal file
@ -0,0 +1,121 @@
|
||||
{.push raises: [ Defect ].}
|
||||
|
||||
import
|
||||
std/[sets, strformat],
|
||||
stew/byteutils,
|
||||
chronicles,
|
||||
json_serialization,
|
||||
json_serialization/std/options,
|
||||
presto/[route, client, common]
|
||||
import ".."/serdes
|
||||
import ../../wakunode2
|
||||
|
||||
|
||||
#### Types
|
||||
|
||||
type
|
||||
PubSubTopicString* = distinct string
|
||||
ContentTopicString* = distinct string
|
||||
|
||||
type RelayWakuMessage* = object
|
||||
payload*: string
|
||||
contentTopic*: Option[ContentTopicString]
|
||||
version*: Option[Natural]
|
||||
timestamp*: Option[int64]
|
||||
|
||||
|
||||
type
|
||||
RelayGetMessagesResponse* = seq[RelayWakuMessage]
|
||||
RelayPostMessagesRequest* = RelayWakuMessage
|
||||
|
||||
type
|
||||
RelayPostSubscriptionsRequest* = seq[PubSubTopicString]
|
||||
RelayDeleteSubscriptionsRequest* = seq[PubSubTopicString]
|
||||
|
||||
|
||||
#### Type conversion
|
||||
|
||||
proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage =
|
||||
RelayWakuMessage(
|
||||
payload: string.fromBytes(msg.payload),
|
||||
contentTopic: some(ContentTopicString(msg.contentTopic)),
|
||||
version: some(Natural(msg.version)),
|
||||
timestamp: some(msg.timestamp)
|
||||
)
|
||||
|
||||
proc toWakuMessage*(msg: RelayWakuMessage, version = 0): WakuMessage =
|
||||
const defaultContentTopic = ContentTopicString("/waku/2/default-content/proto")
|
||||
WakuMessage(
|
||||
payload: msg.payload.toBytes(),
|
||||
contentTopic: ContentTopic(msg.contentTopic.get(defaultContentTopic)),
|
||||
version: uint32(msg.version.get(version)),
|
||||
timestamp: msg.timestamp.get(0)
|
||||
)
|
||||
|
||||
#### Serialization and deserialization
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson], value: PubSubTopicString)
|
||||
{.raises: [IOError, Defect].} =
|
||||
writer.writeValue(string(value))
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson], value: ContentTopicString)
|
||||
{.raises: [IOError, Defect].} =
|
||||
writer.writeValue(string(value))
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage)
|
||||
{.raises: [IOError, Defect].} =
|
||||
writer.beginRecord()
|
||||
writer.writeField("payload", value.payload)
|
||||
if value.contentTopic.isSome:
|
||||
writer.writeField("contentTopic", value.contentTopic)
|
||||
if value.version.isSome:
|
||||
writer.writeField("version", value.version)
|
||||
if value.timestamp.isSome:
|
||||
writer.writeField("timestamp", value.timestamp)
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson], value: var PubSubTopicString)
|
||||
{.raises: [SerializationError, IOError, Defect].} =
|
||||
value = PubSubTopicString(reader.readValue(string))
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson], value: var ContentTopicString)
|
||||
{.raises: [SerializationError, IOError, Defect].} =
|
||||
value = ContentTopicString(reader.readValue(string))
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage)
|
||||
{.raises: [SerializationError, IOError, Defect].} =
|
||||
var
|
||||
payload = none(string)
|
||||
contentTopic = none(ContentTopicString)
|
||||
version = none(Natural)
|
||||
timestamp = none(int64)
|
||||
|
||||
var keys = initHashSet[string]()
|
||||
for fieldName in readObjectFields(reader):
|
||||
# Check for reapeated keys
|
||||
if keys.containsOrIncl(fieldName):
|
||||
let err = try: fmt"Multiple `{fieldName}` fields found"
|
||||
except: "Multiple fields with the same name found"
|
||||
reader.raiseUnexpectedField(err, "RelayWakuMessage")
|
||||
|
||||
case fieldName
|
||||
of "payload":
|
||||
payload = some(reader.readValue(string))
|
||||
of "contentTopic":
|
||||
contentTopic = some(reader.readValue(ContentTopicString))
|
||||
of "version":
|
||||
version = some(reader.readValue(Natural))
|
||||
of "timestamp":
|
||||
timestamp = some(reader.readValue(int64))
|
||||
else:
|
||||
unrecognizedFieldWarning()
|
||||
|
||||
if payload.isNone():
|
||||
reader.raiseUnexpectedValue("Field `payload` is missing")
|
||||
|
||||
value = RelayWakuMessage(
|
||||
payload: payload.get(),
|
||||
contentTopic: contentTopic,
|
||||
version: version,
|
||||
timestamp: timestamp
|
||||
)
|
200
waku/v2/node/rest/relay/relay_api.nim
Normal file
200
waku/v2/node/rest/relay/relay_api.nim
Normal file
@ -0,0 +1,200 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[sets, sequtils],
|
||||
stew/byteutils,
|
||||
chronicles,
|
||||
json_serialization,
|
||||
json_serialization/std/options,
|
||||
presto/[route, client, common]
|
||||
import ".."/[serdes, utils]
|
||||
import ../../wakunode2
|
||||
import "."/[api_types, topic_cache]
|
||||
|
||||
logScope: topics = "rest_api_relay"
|
||||
|
||||
|
||||
##### Topic cache
|
||||
|
||||
const futTimeout* = 5.seconds # Max time to wait for futures
|
||||
|
||||
|
||||
#### Request handlers
|
||||
|
||||
const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions"
|
||||
|
||||
proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
|
||||
|
||||
router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||
# ## Subscribes a node to a list of PubSub topics
|
||||
# debug "post_waku_v2_relay_v1_subscriptions"
|
||||
|
||||
# Check the request body
|
||||
if contentBody.isNone():
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let reqBodyContentType = MediaType.init(contentBody.get().contentType)
|
||||
if reqBodyContentType != MIMETYPE_JSON:
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let reqBodyData = contentBody.get().data
|
||||
let reqResult = decodeFromJsonBytes(RelayPostSubscriptionsRequest, reqBodyData)
|
||||
if reqResult.isErr():
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let req: RelayPostSubscriptionsRequest = reqResult.get()
|
||||
|
||||
for topic in req:
|
||||
if topicCache.isSubscribed(string(topic)):
|
||||
# Only subscribe to topics for which we have no subscribed topic handlers yet
|
||||
continue
|
||||
|
||||
topicCache.subscribe(string(topic))
|
||||
node.subscribe(string(topic), topicCache.messageHandler())
|
||||
|
||||
return RestApiResponse.ok()
|
||||
|
||||
|
||||
proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
|
||||
router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||
# ## Subscribes a node to a list of PubSub topics
|
||||
# debug "delete_waku_v2_relay_v1_subscriptions"
|
||||
|
||||
# Check the request body
|
||||
if contentBody.isNone():
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let reqBodyContentType = MediaType.init(contentBody.get().contentType)
|
||||
if reqBodyContentType != MIMETYPE_JSON:
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let reqBodyData = contentBody.get().data
|
||||
let reqResult = decodeFromJsonBytes(RelayDeleteSubscriptionsRequest, reqBodyData)
|
||||
if reqResult.isErr():
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let req: RelayDeleteSubscriptionsRequest = reqResult.get()
|
||||
|
||||
# Unsubscribe all handlers from requested topics
|
||||
for topic in req:
|
||||
node.unsubscribeAll(string(topic))
|
||||
topicCache.unsubscribe(string(topic))
|
||||
|
||||
# Successfully unsubscribed from all requested topics
|
||||
return RestApiResponse.ok()
|
||||
|
||||
|
||||
const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}"
|
||||
|
||||
proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
|
||||
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse:
|
||||
# ## Returns all WakuMessages received on a PubSub topic since the
|
||||
# ## last time this method was called
|
||||
# ## TODO: ability to specify a return message limit
|
||||
# debug "get_waku_v2_relay_v1_messages", topic=topic
|
||||
|
||||
if topic.isErr():
|
||||
return RestApiResponse.badRequest()
|
||||
let pubSubTopic = topic.get()
|
||||
|
||||
let messages = topicCache.getMessages(pubSubTopic, clear=true)
|
||||
if messages.isErr():
|
||||
debug "Not subscribed to topic", topic=pubSubTopic
|
||||
return RestApiResponse.notFound()
|
||||
|
||||
let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage))
|
||||
let resp = RestApiResponse.jsonResponse(data, status=Http200)
|
||||
if resp.isErr():
|
||||
debug "An error ocurred while building the json respose", error=resp.error()
|
||||
return RestApiResponse.internalServerError()
|
||||
|
||||
return resp.get()
|
||||
|
||||
proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) =
|
||||
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||
|
||||
if topic.isErr():
|
||||
return RestApiResponse.badRequest()
|
||||
let pubSubTopic = topic.get()
|
||||
|
||||
# Check the request body
|
||||
if contentBody.isNone():
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let reqBodyContentType = MediaType.init(contentBody.get().contentType)
|
||||
if reqBodyContentType != MIMETYPE_JSON:
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let reqBodyData = contentBody.get().data
|
||||
let reqResult = decodeFromJsonBytes(RelayPostMessagesRequest, reqBodyData)
|
||||
if reqResult.isErr():
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
let message: RelayPostMessagesRequest = reqResult.get()
|
||||
|
||||
if not (waitFor node.publish(pubSubTopic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)):
|
||||
error "Failed to publish message to topic", topic=pubSubTopic
|
||||
return RestApiResponse.internalServerError()
|
||||
|
||||
return RestApiResponse.ok()
|
||||
|
||||
|
||||
proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) =
|
||||
installRelayGetMessagesV1Handler(router, node, topicCache)
|
||||
installRelayPostMessagesV1Handler(router, node)
|
||||
installRelayPostSubscriptionsV1Handler(router, node, topicCache)
|
||||
installRelayDeleteSubscriptionsV1Handler(router, node, topicCache)
|
||||
|
||||
|
||||
#### Client
|
||||
|
||||
proc encodeBytes*(value: seq[PubSubTopicString],
|
||||
contentType: string): RestResult[seq[byte]] =
|
||||
if MediaType.init(contentType) != MIMETYPE_JSON:
|
||||
error "Unsupported contentType value", contentType = contentType
|
||||
return err("Unsupported contentType")
|
||||
|
||||
let encoded = ?encodeIntoJsonBytes(value)
|
||||
return ok(encoded)
|
||||
|
||||
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
|
||||
contentType: string): RestResult[string] =
|
||||
if MediaType.init(contentType) != MIMETYPE_TEXT:
|
||||
error "Unsupported contentType value", contentType = contentType
|
||||
return err("Unsupported contentType")
|
||||
|
||||
var res: string
|
||||
if len(value) > 0:
|
||||
res = newString(len(value))
|
||||
copyMem(addr res[0], unsafeAddr value[0], len(value))
|
||||
return ok(res)
|
||||
|
||||
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
|
||||
proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.}
|
||||
|
||||
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
|
||||
proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.}
|
||||
|
||||
|
||||
proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: string): RestResult[RelayGetMessagesResponse] =
|
||||
if MediaType.init(contentType) != MIMETYPE_JSON:
|
||||
error "Unsupported respose contentType value", contentType = contentType
|
||||
return err("Unsupported response contentType")
|
||||
|
||||
let decoded = ?decodeFromJsonBytes(RelayGetMessagesResponse, data)
|
||||
return ok(decoded)
|
||||
|
||||
proc encodeBytes*(value: RelayPostMessagesRequest,
|
||||
contentType: string): RestResult[seq[byte]] =
|
||||
if MediaType.init(contentType) != MIMETYPE_JSON:
|
||||
error "Unsupported contentType value", contentType = contentType
|
||||
return err("Unsupported contentType")
|
||||
|
||||
let encoded = ?encodeIntoJsonBytes(value)
|
||||
return ok(encoded)
|
||||
|
||||
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
|
||||
proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.}
|
||||
|
||||
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
|
||||
proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.}
|
147
waku/v2/node/rest/relay/relay_api.yaml
Normal file
147
waku/v2/node/rest/relay/relay_api.yaml
Normal file
@ -0,0 +1,147 @@
|
||||
openapi: 3.0.3
|
||||
info:
|
||||
title: Waku V2 node REST API
|
||||
version: 1.0.0
|
||||
contact:
|
||||
name: VAC Team
|
||||
url: https://forum.vac.dev/
|
||||
|
||||
tags:
|
||||
- name: relay
|
||||
description: Relay REST API for WakuV2 node
|
||||
|
||||
paths:
|
||||
/relay/v1/messages/{topic}: # Note the plural in messages
|
||||
get: # get_waku_v2_relay_v1_messages
|
||||
summary: Get the latest messages on the polled topic
|
||||
description: Get a list of messages that were received on a subscribed PubSub topic after the last time this method was called.
|
||||
operationId: getMessagesByTopic
|
||||
tags:
|
||||
- relay
|
||||
parameters:
|
||||
- in: path
|
||||
name: topic # Note the name is the same as in the path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
description: The user ID
|
||||
responses:
|
||||
'200':
|
||||
description: The latest messages on the polled topic.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RelayGetMessagesResponse'
|
||||
# TODO: Review the possible errors of this endpoint
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
|
||||
post: # post_waku_v2_relay_v1_message
|
||||
summary: Publish a message to be relayed
|
||||
description: Publishes a message to be relayed on a PubSub topic.
|
||||
operationId: postMessagesToTopic
|
||||
tags:
|
||||
- relay
|
||||
parameters:
|
||||
- in: path
|
||||
name: topic # Note the name is the same as in the path
|
||||
description: The messages content topic
|
||||
required: true
|
||||
schema:
|
||||
$ref: '#/components/schemas/RelayPostMessagesRequest'
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RelayPostMessagesRequest'
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
# TODO: Review the possible errors of this endpoint
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
|
||||
/relay/v1/subscriptions:
|
||||
post: # post_waku_v2_relay_v1_subscriptions
|
||||
summary: Subscribe a node to an array of topics
|
||||
description: Subscribe a node to an array of PubSub topics.
|
||||
operationId: postSubscriptions
|
||||
tags:
|
||||
- relay
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RelayPostSubscriptionsRequest'
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
# TODO: Review the possible errors of this endpoint
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
|
||||
delete: # delete_waku_v2_relay_v1_subscriptions
|
||||
summary: Unsubscribe a node from an array of topics
|
||||
description: Unsubscribe a node from an array of PubSub topics.
|
||||
operationId: deleteSubscriptions
|
||||
tags:
|
||||
- relay
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/RelayDeleteSubscriptionsRequest'
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
# TODO: Review the possible errors of this endpoint
|
||||
'5XX':
|
||||
description: Unexpected error.
|
||||
|
||||
components:
|
||||
schemas:
|
||||
PubSubTopic:
|
||||
type: string
|
||||
ContentTopic:
|
||||
type: string
|
||||
|
||||
RelayWakuMessage:
|
||||
type: object
|
||||
properties:
|
||||
payload:
|
||||
type: string
|
||||
contentTopic:
|
||||
$ref: '#/components/schemas/ContentTopic'
|
||||
version:
|
||||
type: number
|
||||
timestamp:
|
||||
type: number
|
||||
required:
|
||||
- payload
|
||||
|
||||
RelayGetMessagesResponse:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/RelayWakuMessage'
|
||||
|
||||
RelayPostMessagesRequest:
|
||||
$ref: '#/components/schemas/RelayWakuMessage'
|
||||
|
||||
RelayPostSubscriptionsRequest:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/PubSubTopic'
|
||||
|
||||
RelayDeleteSubscriptionsRequest:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/PubSubTopic'
|
||||
|
108
waku/v2/node/rest/relay/topic_cache.nim
Normal file
108
waku/v2/node/rest/relay/topic_cache.nim
Normal file
@ -0,0 +1,108 @@
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils],
|
||||
stew/results,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/protocols/pubsub
|
||||
import
|
||||
../../../protocol/waku_message
|
||||
|
||||
logScope: topics = "rest_api_relay_topiccache"
|
||||
|
||||
const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable
|
||||
|
||||
|
||||
type PubSubTopicString = string
|
||||
|
||||
type TopicCacheResult*[T] = Result[T, cstring]
|
||||
|
||||
type TopicCacheMessageHandler* = Topichandler
|
||||
|
||||
|
||||
type TopicCacheConfig* = object
|
||||
capacity*: int
|
||||
|
||||
proc default*(T: type TopicCacheConfig): T =
|
||||
TopicCacheConfig(
|
||||
capacity: DEFAULT_TOPICCACHE_CAPACITY
|
||||
)
|
||||
|
||||
|
||||
type TopicCache* = ref object
|
||||
conf: TopicCacheConfig
|
||||
table: Table[PubSubTopicString, seq[WakuMessage]]
|
||||
|
||||
func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T =
|
||||
TopicCache(
|
||||
conf: conf,
|
||||
table: initTable[PubSubTopicString, seq[WakuMessage]]()
|
||||
)
|
||||
|
||||
|
||||
proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool =
|
||||
t.table.hasKey(topic)
|
||||
|
||||
proc subscribe*(t: TopicCache, topic: PubSubTopicString) =
|
||||
if t.isSubscribed(topic):
|
||||
return
|
||||
t.table[topic] = @[]
|
||||
|
||||
proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
t.table.del(topic)
|
||||
|
||||
|
||||
proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
|
||||
# Make a copy of msgs for this topic to modify
|
||||
var messages = t.table.getOrDefault(topic, @[])
|
||||
|
||||
if messages.len >= t.conf.capacity:
|
||||
debug "Topic cache capacity reached", topic=topic
|
||||
# Message cache on this topic exceeds maximum. Delete oldest.
|
||||
# TODO: this may become a bottle neck if called as the norm rather than
|
||||
# exception when adding messages. Performance profile needed.
|
||||
messages.delete(0,0)
|
||||
|
||||
messages.add(msg)
|
||||
|
||||
# Replace indexed entry with copy
|
||||
t.table[topic] = messages
|
||||
|
||||
proc clearMessages*(t: TopicCache, topic: PubSubTopicString) =
|
||||
if not t.isSubscribed(topic):
|
||||
return
|
||||
t.table[topic] = @[]
|
||||
|
||||
proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] =
|
||||
if not t.isSubscribed(topic):
|
||||
return err("Not subscribed to topic")
|
||||
|
||||
let messages = t.table.getOrDefault(topic, @[])
|
||||
if clear:
|
||||
t.clearMessages(topic)
|
||||
|
||||
ok(messages)
|
||||
|
||||
|
||||
proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler =
|
||||
|
||||
proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} =
|
||||
trace "Topic handler triggered", topic=topic
|
||||
|
||||
# Add message to current cache
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isErr():
|
||||
debug "WakuMessage received but failed to decode", msg=msg, topic=topic
|
||||
# TODO: handle message decode failure
|
||||
return
|
||||
|
||||
trace "WakuMessage received", msg=msg, topic=topic
|
||||
cache.addMessage(PubSubTopicString(topic), msg.get())
|
||||
|
||||
handler
|
@ -69,4 +69,13 @@ proc encodeIntoJsonBytes*(value: auto): SerdesResult[seq[byte]] =
|
||||
# TODO: Do better error reporting here
|
||||
return err("unable to serialize data")
|
||||
|
||||
ok(encoded)
|
||||
ok(encoded)
|
||||
|
||||
|
||||
#### helpers
|
||||
|
||||
proc encodeString*(value: string): RestResult[string] =
|
||||
ok(value)
|
||||
|
||||
proc decodeString*(t: typedesc[string], value: string): RestResult[string] =
|
||||
ok(value)
|
||||
|
@ -13,10 +13,11 @@ import
|
||||
|
||||
proc getRouter(allowedOrigin: Option[string]): RestRouter =
|
||||
# TODO: Review this `validate` method. Check in nim-presto what is this used for.
|
||||
proc validate(key: string, value: string): int =
|
||||
proc validate(pattern: string, value: string): int =
|
||||
## This is rough validation procedure which should be simple and fast,
|
||||
## because it will be used for query routing.
|
||||
1
|
||||
if pattern.startsWith("{") and pattern.endsWith("}"): 0
|
||||
else: 1
|
||||
|
||||
RestRouter.init(validate, allowedOrigin = allowedOrigin)
|
||||
|
||||
|
@ -9,10 +9,21 @@ import "."/serdes
|
||||
|
||||
|
||||
const MIMETYPE_JSON* = MediaType.init("application/json")
|
||||
const MIMETYPE_TEXT* = MediaType.init("text/plain")
|
||||
|
||||
proc jsonResponse*(t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200): SerdesResult[RestApiResponse] =
|
||||
let encoded = ?encodeIntoJsonBytes(data)
|
||||
ok(RestApiResponse.response(encoded, status, $MIMETYPE_JSON))
|
||||
|
||||
proc internalServerError*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.error(Http500)
|
||||
RestApiResponse.error(Http500)
|
||||
|
||||
proc ok*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.response("OK", status=Http200, contentType="text/plain")
|
||||
|
||||
proc badRequest*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.error(Http400)
|
||||
|
||||
|
||||
proc notFound*(t: typedesc[RestApiResponse]): RestApiResponse =
|
||||
RestApiResponse.error(Http404)
|
Loading…
x
Reference in New Issue
Block a user