diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 85356bfb3..2b722ff51 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -13,6 +13,9 @@ import ./v2/test_rest_serdes, ./v2/test_rest_debug_api_serdes, ./v2/test_rest_debug_api, + ./v2/test_rest_relay_api_serdes, + ./v2/test_rest_relay_api_topic_cache, + ./v2/test_rest_relay_api, ./v2/test_peer_manager, ./v2/test_web3, # TODO remove it when rln-relay tests get finalized ./v2/test_waku_bridge, diff --git a/tests/v2/test_rest_relay_api.nim b/tests/v2/test_rest_relay_api.nim new file mode 100644 index 000000000..df24b08b8 --- /dev/null +++ b/tests/v2/test_rest_relay_api.nim @@ -0,0 +1,249 @@ +{.used.} + +import + std/sequtils, + stew/byteutils, + stew/shims/net, + chronicles, + testutils/unittests, + presto, + libp2p/crypto/crypto, + libp2p/protocols/pubsub/pubsub +import + ../../waku/v2/node/wakunode2, + ../../waku/v2/node/rest/[server, client, utils], + ../../waku/v2/node/rest/relay/[api_types, relay_api, topic_cache] + + +proc testWakuNode(): WakuNode = + let + rng = crypto.newRng() + privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(9000) + + WakuNode.new(privkey, bindIp, port, some(extIp), some(port)) + +proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage = + WakuMessage( + payload: payload, + contentTopic: contentTopic, + version: 1, + timestamp: 2022 + ) + + +suite "REST API - Relay": + asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions": + # Given + let node = testWakuNode() + await node.start() + node.mountRelay() + + let restPort = Port(8546) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init( + restAddress, + restPort, + none(string), + none(RestServerConf) + ) + + let topicCache = TopicCache.init() + + installRelayPostSubscriptionsV1Handler(restServer.router, node, topicCache) + restServer.start() + + let pubSubTopics = @[ + PubSubTopicString("pubsub-topic-1"), + PubSubTopicString("pubsub-topic-2"), + PubSubTopicString("pubsub-topic-3") + ] + + # When + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + let requestBody = RelayPostSubscriptionsRequest(pubSubTopics) + let response = await client.relayPostSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + topicCache.isSubscribed("pubsub-topic-1") + topicCache.isSubscribed("pubsub-topic-2") + topicCache.isSubscribed("pubsub-topic-3") + + check: + # Node should be subscribed to default + new topics + PubSub(node.wakuRelay).topics.len == 1 + pubSubTopics.len + + await restServer.stop() + await restServer.closeWait() + await node.stop() + + asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions": + # Given + let node = testWakuNode() + await node.start() + node.mountRelay() + + let restPort = Port(8546) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init( + restAddress, + restPort, + none(string), + none(RestServerConf) + ) + + let topicCache = TopicCache.init() + topicCache.subscribe("pubsub-topic-1") + topicCache.subscribe("pubsub-topic-2") + topicCache.subscribe("pubsub-topic-3") + topicCache.subscribe("pubsub-topic-x") + + installRelayDeleteSubscriptionsV1Handler(restServer.router, node, topicCache) + restServer.start() + + let pubSubTopics = @[ + PubSubTopicString("pubsub-topic-1"), + PubSubTopicString("pubsub-topic-2"), + PubSubTopicString("pubsub-topic-3"), + PubSubTopicString("pubsub-topic-y") + ] + + # When + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + let requestBody = RelayDeleteSubscriptionsRequest(pubSubTopics) + let response = await client.relayDeleteSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + not topicCache.isSubscribed("pubsub-topic-1") + not topicCache.isSubscribed("pubsub-topic-2") + not topicCache.isSubscribed("pubsub-topic-3") + topicCache.isSubscribed("pubsub-topic-x") + + await restServer.stop() + await restServer.closeWait() + await node.stop() + + + asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}": + # Given + let node = testWakuNode() + await node.start() + node.mountRelay() + + let restPort = Port(8546) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init( + restAddress, + restPort, + none(string), + none(RestServerConf) + ) + + let pubSubTopic = "/waku/2/default-waku/proto" + let messages = @[ + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), + ] + + let topicCache = TopicCache.init() + + topicCache.subscribe(pubSubTopic) + for msg in messages: + topicCache.addMessage(pubSubTopic, msg) + + installRelayGetMessagesV1Handler(restServer.router, node, topicCache) + restServer.start() + + # When + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + let response = await client.relayGetMessagesV1(pubSubTopic) + + # Then + check: + response.status == 200 + response.contentType == $MIMETYPE_JSON + response.data.len == 3 + response.data.all do (msg: RelayWakuMessage) -> bool: + msg.payload == "TEST-1" and + string(msg.contentTopic.get()) == "content-topic-x" and + msg.version.get() == Natural(1) and + msg.timestamp.get() == int64(2022) + + + check: + topicCache.isSubscribed(pubSubTopic) + topicCache.getMessages(pubSubTopic).tryGet().len == 0 + + await restServer.stop() + await restServer.closeWait() + await node.stop() + + asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}": + ## "Relay API: publish and subscribe/unsubscribe": + # Given + let node = testWakuNode() + await node.start() + node.mountRelay() + + # RPC server setup + let restPort = Port(8546) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init( + restAddress, + restPort, + none(string), + none(RestServerConf) + ) + + let topicCache = TopicCache.init() + + installRelayApiHandlers(restServer.router, node, topicCache) + restServer.start() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + const defaultContentTopic = ContentTopic("/waku/2/default-content/proto") + + # At this stage the node is only subscribed to the default topic + require(PubSub(node.wakuRelay).topics.len == 1) + + + # When + let newTopics = @[ + PubSubTopicString("pubsub-topic-1"), + PubSubTopicString("pubsub-topic-2"), + PubSubTopicString("pubsub-topic-3") + ] + discard await client.relayPostSubscriptionsV1(newTopics) + + let response = await client.relayPostMessagesV1(defaultTopic, RelayWakuMessage( + payload: "TEST-PAYLOAD", + contentTopic: some(ContentTopicString(defaultContentTopic)), + timestamp: some(int64(2022)) + )) + + # Then + check: + response.status == 200 + response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + # TODO: Check for the message to be published to the topic + + await restServer.stop() + await restServer.closeWait() + await node.stop() diff --git a/tests/v2/test_rest_relay_api_serdes.nim b/tests/v2/test_rest_relay_api_serdes.nim new file mode 100644 index 000000000..3da9eabc2 --- /dev/null +++ b/tests/v2/test_rest_relay_api_serdes.nim @@ -0,0 +1,49 @@ +{.used.} + +import std/typetraits +import chronicles, + unittest2, + stew/[results, byteutils], + json_serialization +import + ../../waku/v2/node/rest/serdes, + ../../waku/v2/node/rest/relay/api_types + + +suite "Relay API - serialization": + + suite "RelayWakuMessage - decode": + test "optional fields are not provided": + # Given + let jsonBytes = toBytes("""{ "payload": "MESSAGE" }""") + + # When + let res = decodeFromJsonBytes(RelayWakuMessage, jsonBytes, requireAllFields = true) + + # Then + require(res.isOk) + let value = res.get() + check: + value.payload == "MESSAGE" + value.contentTopic.isNone + value.version.isNone + value.timestamp.isNone + + suite "RelayWakuMessage - encode": + test "optional fields are none": + # Given + let data = RelayWakuMessage( + payload: "MESSAGE", + contentTopic: none(ContentTopicString), + version: none(Natural), + timestamp: none(int64) + ) + + # When + let res = encodeIntoJsonBytes(data) + + # Then + require(res.isOk) + let value = res.get() + check: + value == toBytes("""{"payload":"MESSAGE"}""") diff --git a/tests/v2/test_rest_relay_api_topic_cache.nim b/tests/v2/test_rest_relay_api_topic_cache.nim new file mode 100644 index 000000000..081dc79c9 --- /dev/null +++ b/tests/v2/test_rest_relay_api_topic_cache.nim @@ -0,0 +1,163 @@ +{.used.} + +import + std/tables, + stew/byteutils, + stew/shims/net, + chronicles, + testutils/unittests, + presto, + libp2p/crypto/crypto, + libp2p/protocols/pubsub/pubsub +import + ../../waku/v2/protocol/waku_message, + ../../waku/v2/node/rest/relay/topic_cache + + +proc fakeWakuMessage(payload = toBytes("TEST"), contentTopic = "test"): WakuMessage = + WakuMessage( + payload: payload, + contentTopic: contentTopic, + version: 1, + timestamp: 2022 + ) + + +suite "TopicCache": + test "subscribe to topic": + ## Given + let testTopic = "test-pubsub-topic" + let cache = TopicCache.init() + + ## When + cache.subscribe(testTopic) + + ## Then + check: + cache.isSubscribed(testTopic) + + + test "unsubscribe from topic": + ## Given + let testTopic = "test-pubsub-topic" + let cache = TopicCache.init() + + # Init cache content + cache.subscribe(testTopic) + + ## When + cache.unsubscribe(testTopic) + + ## Then + check: + not cache.isSubscribed(testTopic) + + + test "get messages of a subscribed topic": + ## Given + let testTopic = "test-pubsub-topic" + let testMessage = fakeWakuMessage() + let cache = TopicCache.init() + + # Init cache content + cache.subscribe(testTopic) + cache.addMessage(testTopic, testMessage) + + ## When + let res = cache.getMessages(testTopic) + + ## Then + check: + res.isOk() + res.get() == @[testMessage] + + + test "get messages with clean flag shoud clear the messages cache": + ## Given + let testTopic = "test-pubsub-topic" + let testMessage = fakeWakuMessage() + let cache = TopicCache.init() + + # Init cache content + cache.subscribe(testTopic) + cache.addMessage(testTopic, testMessage) + + ## When + var res = cache.getMessages(testTopic, clear=true) + require(res.isOk()) + + res = cache.getMessages(testTopic) + + ## Then + check: + res.isOk() + res.get().len == 0 + + + test "get messages of a non-subscribed topic": + ## Given + let testTopic = "test-pubsub-topic" + let cache = TopicCache.init() + + ## When + let res = cache.getMessages(testTopic) + + ## Then + check: + res.isErr() + res.error() == "Not subscribed to topic" + + + test "add messages to subscribed topic": + ## Given + let testTopic = "test-pubsub-topic" + let testMessage = fakeWakuMessage() + let cache = TopicCache.init() + + cache.subscribe(testTopic) + + ## When + cache.addMessage(testTopic, testMessage) + + ## Then + let messages = cache.getMessages(testTopic).tryGet() + check: + messages == @[testMessage] + + + test "add messages to non-subscribed topic": + ## Given + let testTopic = "test-pubsub-topic" + let testMessage = fakeWakuMessage() + let cache = TopicCache.init() + + ## When + cache.addMessage(testTopic, testMessage) + + ## Then + let res = cache.getMessages(testTopic) + check: + res.isErr() + res.error() == "Not subscribed to topic" + + + test "add messages beyond the capacity": + ## Given + let testTopic = "test-pubsub-topic" + let testMessages = @[ + fakeWakuMessage(toBytes("MSG-1")), + fakeWakuMessage(toBytes("MSG-2")), + fakeWakuMessage(toBytes("MSG-3")) + ] + + let cache = TopicCache.init(conf=TopicCacheConfig(capacity: 2)) + cache.subscribe(testTopic) + + ## When + for msg in testMessages: + cache.addMessage(testTopic, msg) + + ## Then + let messages = cache.getMessages(testTopic).tryGet() + check: + messages == testMessages[1..2] diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 98ea8103e..fd1c0073a 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az462-196: +# Libtool was configured on host fv-az504-494: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/rest/relay/api_types.nim b/waku/v2/node/rest/relay/api_types.nim new file mode 100644 index 000000000..c925159c3 --- /dev/null +++ b/waku/v2/node/rest/relay/api_types.nim @@ -0,0 +1,121 @@ +{.push raises: [ Defect ].} + +import + std/[sets, strformat], + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import ".."/serdes +import ../../wakunode2 + + +#### Types + +type + PubSubTopicString* = distinct string + ContentTopicString* = distinct string + +type RelayWakuMessage* = object + payload*: string + contentTopic*: Option[ContentTopicString] + version*: Option[Natural] + timestamp*: Option[int64] + + +type + RelayGetMessagesResponse* = seq[RelayWakuMessage] + RelayPostMessagesRequest* = RelayWakuMessage + +type + RelayPostSubscriptionsRequest* = seq[PubSubTopicString] + RelayDeleteSubscriptionsRequest* = seq[PubSubTopicString] + + +#### Type conversion + +proc toRelayWakuMessage*(msg: WakuMessage): RelayWakuMessage = + RelayWakuMessage( + payload: string.fromBytes(msg.payload), + contentTopic: some(ContentTopicString(msg.contentTopic)), + version: some(Natural(msg.version)), + timestamp: some(msg.timestamp) + ) + +proc toWakuMessage*(msg: RelayWakuMessage, version = 0): WakuMessage = + const defaultContentTopic = ContentTopicString("/waku/2/default-content/proto") + WakuMessage( + payload: msg.payload.toBytes(), + contentTopic: ContentTopic(msg.contentTopic.get(defaultContentTopic)), + version: uint32(msg.version.get(version)), + timestamp: msg.timestamp.get(0) + ) + +#### Serialization and deserialization + +proc writeValue*(writer: var JsonWriter[RestJson], value: PubSubTopicString) + {.raises: [IOError, Defect].} = + writer.writeValue(string(value)) + +proc writeValue*(writer: var JsonWriter[RestJson], value: ContentTopicString) + {.raises: [IOError, Defect].} = + writer.writeValue(string(value)) + +proc writeValue*(writer: var JsonWriter[RestJson], value: RelayWakuMessage) + {.raises: [IOError, Defect].} = + writer.beginRecord() + writer.writeField("payload", value.payload) + if value.contentTopic.isSome: + writer.writeField("contentTopic", value.contentTopic) + if value.version.isSome: + writer.writeField("version", value.version) + if value.timestamp.isSome: + writer.writeField("timestamp", value.timestamp) + writer.endRecord() + +proc readValue*(reader: var JsonReader[RestJson], value: var PubSubTopicString) + {.raises: [SerializationError, IOError, Defect].} = + value = PubSubTopicString(reader.readValue(string)) + +proc readValue*(reader: var JsonReader[RestJson], value: var ContentTopicString) + {.raises: [SerializationError, IOError, Defect].} = + value = ContentTopicString(reader.readValue(string)) + +proc readValue*(reader: var JsonReader[RestJson], value: var RelayWakuMessage) + {.raises: [SerializationError, IOError, Defect].} = + var + payload = none(string) + contentTopic = none(ContentTopicString) + version = none(Natural) + timestamp = none(int64) + + var keys = initHashSet[string]() + for fieldName in readObjectFields(reader): + # Check for reapeated keys + if keys.containsOrIncl(fieldName): + let err = try: fmt"Multiple `{fieldName}` fields found" + except: "Multiple fields with the same name found" + reader.raiseUnexpectedField(err, "RelayWakuMessage") + + case fieldName + of "payload": + payload = some(reader.readValue(string)) + of "contentTopic": + contentTopic = some(reader.readValue(ContentTopicString)) + of "version": + version = some(reader.readValue(Natural)) + of "timestamp": + timestamp = some(reader.readValue(int64)) + else: + unrecognizedFieldWarning() + + if payload.isNone(): + reader.raiseUnexpectedValue("Field `payload` is missing") + + value = RelayWakuMessage( + payload: payload.get(), + contentTopic: contentTopic, + version: version, + timestamp: timestamp + ) diff --git a/waku/v2/node/rest/relay/relay_api.nim b/waku/v2/node/rest/relay/relay_api.nim new file mode 100644 index 000000000..c0b9ead1c --- /dev/null +++ b/waku/v2/node/rest/relay/relay_api.nim @@ -0,0 +1,200 @@ +{.push raises: [Defect].} + +import + std/[sets, sequtils], + stew/byteutils, + chronicles, + json_serialization, + json_serialization/std/options, + presto/[route, client, common] +import ".."/[serdes, utils] +import ../../wakunode2 +import "."/[api_types, topic_cache] + +logScope: topics = "rest_api_relay" + + +##### Topic cache + +const futTimeout* = 5.seconds # Max time to wait for futures + + +#### Request handlers + +const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions" + +proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) = + + router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + # ## Subscribes a node to a list of PubSub topics + # debug "post_waku_v2_relay_v1_subscriptions" + + # Check the request body + if contentBody.isNone(): + return RestApiResponse.badRequest() + + let reqBodyContentType = MediaType.init(contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return RestApiResponse.badRequest() + + let reqBodyData = contentBody.get().data + let reqResult = decodeFromJsonBytes(RelayPostSubscriptionsRequest, reqBodyData) + if reqResult.isErr(): + return RestApiResponse.badRequest() + + let req: RelayPostSubscriptionsRequest = reqResult.get() + + for topic in req: + if topicCache.isSubscribed(string(topic)): + # Only subscribe to topics for which we have no subscribed topic handlers yet + continue + + topicCache.subscribe(string(topic)) + node.subscribe(string(topic), topicCache.messageHandler()) + + return RestApiResponse.ok() + + +proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) = + router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + # ## Subscribes a node to a list of PubSub topics + # debug "delete_waku_v2_relay_v1_subscriptions" + + # Check the request body + if contentBody.isNone(): + return RestApiResponse.badRequest() + + let reqBodyContentType = MediaType.init(contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return RestApiResponse.badRequest() + + let reqBodyData = contentBody.get().data + let reqResult = decodeFromJsonBytes(RelayDeleteSubscriptionsRequest, reqBodyData) + if reqResult.isErr(): + return RestApiResponse.badRequest() + + let req: RelayDeleteSubscriptionsRequest = reqResult.get() + + # Unsubscribe all handlers from requested topics + for topic in req: + node.unsubscribeAll(string(topic)) + topicCache.unsubscribe(string(topic)) + + # Successfully unsubscribed from all requested topics + return RestApiResponse.ok() + + +const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}" + +proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) = + router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse: + # ## Returns all WakuMessages received on a PubSub topic since the + # ## last time this method was called + # ## TODO: ability to specify a return message limit + # debug "get_waku_v2_relay_v1_messages", topic=topic + + if topic.isErr(): + return RestApiResponse.badRequest() + let pubSubTopic = topic.get() + + let messages = topicCache.getMessages(pubSubTopic, clear=true) + if messages.isErr(): + debug "Not subscribed to topic", topic=pubSubTopic + return RestApiResponse.notFound() + + let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage)) + let resp = RestApiResponse.jsonResponse(data, status=Http200) + if resp.isErr(): + debug "An error ocurred while building the json respose", error=resp.error() + return RestApiResponse.internalServerError() + + return resp.get() + +proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) = + router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse: + + if topic.isErr(): + return RestApiResponse.badRequest() + let pubSubTopic = topic.get() + + # Check the request body + if contentBody.isNone(): + return RestApiResponse.badRequest() + + let reqBodyContentType = MediaType.init(contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return RestApiResponse.badRequest() + + let reqBodyData = contentBody.get().data + let reqResult = decodeFromJsonBytes(RelayPostMessagesRequest, reqBodyData) + if reqResult.isErr(): + return RestApiResponse.badRequest() + + let message: RelayPostMessagesRequest = reqResult.get() + + if not (waitFor node.publish(pubSubTopic, message.toWakuMessage(version = 0)).withTimeout(futTimeout)): + error "Failed to publish message to topic", topic=pubSubTopic + return RestApiResponse.internalServerError() + + return RestApiResponse.ok() + + +proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, topicCache: TopicCache) = + installRelayGetMessagesV1Handler(router, node, topicCache) + installRelayPostMessagesV1Handler(router, node) + installRelayPostSubscriptionsV1Handler(router, node, topicCache) + installRelayDeleteSubscriptionsV1Handler(router, node, topicCache) + + +#### Client + +proc encodeBytes*(value: seq[PubSubTopicString], + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +proc decodeBytes*(t: typedesc[string], value: openarray[byte], + contentType: string): RestResult[string] = + if MediaType.init(contentType) != MIMETYPE_TEXT: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + var res: string + if len(value) > 0: + res = newString(len(value)) + copyMem(addr res[0], unsafeAddr value[0], len(value)) + return ok(res) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.} + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.} + + +proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: string): RestResult[RelayGetMessagesResponse] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported respose contentType value", contentType = contentType + return err("Unsupported response contentType") + + let decoded = ?decodeFromJsonBytes(RelayGetMessagesResponse, data) + return ok(decoded) + +proc encodeBytes*(value: RelayPostMessagesRequest, + contentType: string): RestResult[seq[byte]] = + if MediaType.init(contentType) != MIMETYPE_JSON: + error "Unsupported contentType value", contentType = contentType + return err("Unsupported contentType") + + let encoded = ?encodeIntoJsonBytes(value) + return ok(encoded) + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.} + +# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) +proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.} \ No newline at end of file diff --git a/waku/v2/node/rest/relay/relay_api.yaml b/waku/v2/node/rest/relay/relay_api.yaml new file mode 100644 index 000000000..f84b4e7d4 --- /dev/null +++ b/waku/v2/node/rest/relay/relay_api.yaml @@ -0,0 +1,147 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: relay + description: Relay REST API for WakuV2 node + +paths: + /relay/v1/messages/{topic}: # Note the plural in messages + get: # get_waku_v2_relay_v1_messages + summary: Get the latest messages on the polled topic + description: Get a list of messages that were received on a subscribed PubSub topic after the last time this method was called. + operationId: getMessagesByTopic + tags: + - relay + parameters: + - in: path + name: topic # Note the name is the same as in the path + required: true + schema: + type: string + description: The user ID + responses: + '200': + description: The latest messages on the polled topic. + content: + application/json: + schema: + $ref: '#/components/schemas/RelayGetMessagesResponse' + # TODO: Review the possible errors of this endpoint + '5XX': + description: Unexpected error. + + post: # post_waku_v2_relay_v1_message + summary: Publish a message to be relayed + description: Publishes a message to be relayed on a PubSub topic. + operationId: postMessagesToTopic + tags: + - relay + parameters: + - in: path + name: topic # Note the name is the same as in the path + description: The messages content topic + required: true + schema: + $ref: '#/components/schemas/RelayPostMessagesRequest' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RelayPostMessagesRequest' + responses: + '200': + description: OK + # TODO: Review the possible errors of this endpoint + '5XX': + description: Unexpected error. + + /relay/v1/subscriptions: + post: # post_waku_v2_relay_v1_subscriptions + summary: Subscribe a node to an array of topics + description: Subscribe a node to an array of PubSub topics. + operationId: postSubscriptions + tags: + - relay + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RelayPostSubscriptionsRequest' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + # TODO: Review the possible errors of this endpoint + '5XX': + description: Unexpected error. + + delete: # delete_waku_v2_relay_v1_subscriptions + summary: Unsubscribe a node from an array of topics + description: Unsubscribe a node from an array of PubSub topics. + operationId: deleteSubscriptions + tags: + - relay + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/RelayDeleteSubscriptionsRequest' + responses: + '200': + description: OK + content: + text/plain: + schema: + type: string + # TODO: Review the possible errors of this endpoint + '5XX': + description: Unexpected error. + +components: + schemas: + PubSubTopic: + type: string + ContentTopic: + type: string + + RelayWakuMessage: + type: object + properties: + payload: + type: string + contentTopic: + $ref: '#/components/schemas/ContentTopic' + version: + type: number + timestamp: + type: number + required: + - payload + + RelayGetMessagesResponse: + type: array + items: + $ref: '#/components/schemas/RelayWakuMessage' + + RelayPostMessagesRequest: + $ref: '#/components/schemas/RelayWakuMessage' + + RelayPostSubscriptionsRequest: + type: array + items: + $ref: '#/components/schemas/PubSubTopic' + + RelayDeleteSubscriptionsRequest: + type: array + items: + $ref: '#/components/schemas/PubSubTopic' + \ No newline at end of file diff --git a/waku/v2/node/rest/relay/topic_cache.nim b/waku/v2/node/rest/relay/topic_cache.nim new file mode 100644 index 000000000..b224d0850 --- /dev/null +++ b/waku/v2/node/rest/relay/topic_cache.nim @@ -0,0 +1,108 @@ +{.push raises: [Defect].} + +import + std/[tables, sequtils], + stew/results, + chronicles, + chronos, + libp2p/protocols/pubsub +import + ../../../protocol/waku_message + +logScope: topics = "rest_api_relay_topiccache" + +const DEFAULT_TOPICCACHE_CAPACITY* = 30 # Max number of messages cached per topic @TODO make this configurable + + +type PubSubTopicString = string + +type TopicCacheResult*[T] = Result[T, cstring] + +type TopicCacheMessageHandler* = Topichandler + + +type TopicCacheConfig* = object + capacity*: int + +proc default*(T: type TopicCacheConfig): T = + TopicCacheConfig( + capacity: DEFAULT_TOPICCACHE_CAPACITY + ) + + +type TopicCache* = ref object + conf: TopicCacheConfig + table: Table[PubSubTopicString, seq[WakuMessage]] + +func init*(T: type TopicCache, conf=TopicCacheConfig.default()): T = + TopicCache( + conf: conf, + table: initTable[PubSubTopicString, seq[WakuMessage]]() + ) + + +proc isSubscribed*(t: TopicCache, topic: PubSubTopicString): bool = + t.table.hasKey(topic) + +proc subscribe*(t: TopicCache, topic: PubSubTopicString) = + if t.isSubscribed(topic): + return + t.table[topic] = @[] + +proc unsubscribe*(t: TopicCache, topic: PubSubTopicString) = + if not t.isSubscribed(topic): + return + t.table.del(topic) + + +proc addMessage*(t: TopicCache, topic: PubSubTopicString, msg: WakuMessage) = + if not t.isSubscribed(topic): + return + + # Make a copy of msgs for this topic to modify + var messages = t.table.getOrDefault(topic, @[]) + + if messages.len >= t.conf.capacity: + debug "Topic cache capacity reached", topic=topic + # Message cache on this topic exceeds maximum. Delete oldest. + # TODO: this may become a bottle neck if called as the norm rather than + # exception when adding messages. Performance profile needed. + messages.delete(0,0) + + messages.add(msg) + + # Replace indexed entry with copy + t.table[topic] = messages + +proc clearMessages*(t: TopicCache, topic: PubSubTopicString) = + if not t.isSubscribed(topic): + return + t.table[topic] = @[] + +proc getMessages*(t: TopicCache, topic: PubSubTopicString, clear=false): TopicCacheResult[seq[WakuMessage]] = + if not t.isSubscribed(topic): + return err("Not subscribed to topic") + + let messages = t.table.getOrDefault(topic, @[]) + if clear: + t.clearMessages(topic) + + ok(messages) + + +proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler = + + proc handler(topic: string, data: seq[byte]): Future[void] {.async, raises: [Defect].} = + trace "Topic handler triggered", topic=topic + + # Add message to current cache + let msg = WakuMessage.init(data) + if msg.isErr(): + debug "WakuMessage received but failed to decode", msg=msg, topic=topic + # TODO: handle message decode failure + return + + trace "WakuMessage received", msg=msg, topic=topic + cache.addMessage(PubSubTopicString(topic), msg.get()) + + handler \ No newline at end of file diff --git a/waku/v2/node/rest/serdes.nim b/waku/v2/node/rest/serdes.nim index fa90b58dc..e87b9b21b 100644 --- a/waku/v2/node/rest/serdes.nim +++ b/waku/v2/node/rest/serdes.nim @@ -69,4 +69,13 @@ proc encodeIntoJsonBytes*(value: auto): SerdesResult[seq[byte]] = # TODO: Do better error reporting here return err("unable to serialize data") - ok(encoded) \ No newline at end of file + ok(encoded) + + +#### helpers + +proc encodeString*(value: string): RestResult[string] = + ok(value) + +proc decodeString*(t: typedesc[string], value: string): RestResult[string] = + ok(value) diff --git a/waku/v2/node/rest/server.nim b/waku/v2/node/rest/server.nim index 8d633c9f4..19e642f09 100644 --- a/waku/v2/node/rest/server.nim +++ b/waku/v2/node/rest/server.nim @@ -13,10 +13,11 @@ import proc getRouter(allowedOrigin: Option[string]): RestRouter = # TODO: Review this `validate` method. Check in nim-presto what is this used for. - proc validate(key: string, value: string): int = + proc validate(pattern: string, value: string): int = ## This is rough validation procedure which should be simple and fast, ## because it will be used for query routing. - 1 + if pattern.startsWith("{") and pattern.endsWith("}"): 0 + else: 1 RestRouter.init(validate, allowedOrigin = allowedOrigin) diff --git a/waku/v2/node/rest/utils.nim b/waku/v2/node/rest/utils.nim index 2e787d718..baaee7bbd 100644 --- a/waku/v2/node/rest/utils.nim +++ b/waku/v2/node/rest/utils.nim @@ -9,10 +9,21 @@ import "."/serdes const MIMETYPE_JSON* = MediaType.init("application/json") +const MIMETYPE_TEXT* = MediaType.init("text/plain") proc jsonResponse*(t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200): SerdesResult[RestApiResponse] = let encoded = ?encodeIntoJsonBytes(data) ok(RestApiResponse.response(encoded, status, $MIMETYPE_JSON)) proc internalServerError*(t: typedesc[RestApiResponse]): RestApiResponse = - RestApiResponse.error(Http500) \ No newline at end of file + RestApiResponse.error(Http500) + +proc ok*(t: typedesc[RestApiResponse]): RestApiResponse = + RestApiResponse.response("OK", status=Http200, contentType="text/plain") + +proc badRequest*(t: typedesc[RestApiResponse]): RestApiResponse = + RestApiResponse.error(Http400) + + +proc notFound*(t: typedesc[RestApiResponse]): RestApiResponse = + RestApiResponse.error(Http404) \ No newline at end of file