mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
feat: Rest API interface for legacy (v1) filter service. (#1851)
* Added Rest API interface for legacy (v1) filter service with tests.
This commit is contained in:
parent
831a093f25
commit
08ff667227
3
.gitignore
vendored
3
.gitignore
vendored
@ -38,6 +38,9 @@ node_modules/
|
|||||||
# Ignore Jetbrains IDE files
|
# Ignore Jetbrains IDE files
|
||||||
.idea/
|
.idea/
|
||||||
|
|
||||||
|
# ignore vscode files
|
||||||
|
.vscode/
|
||||||
|
|
||||||
# RLN / keystore
|
# RLN / keystore
|
||||||
rlnKeystore.json
|
rlnKeystore.json
|
||||||
*.tar.gz
|
*.tar.gz
|
||||||
|
|||||||
@ -45,6 +45,7 @@ import
|
|||||||
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
|
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
|
||||||
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
|
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
|
||||||
../../waku/v2/node/rest/relay/topic_cache,
|
../../waku/v2/node/rest/relay/topic_cache,
|
||||||
|
../../waku/v2/node/rest/filter/handlers as rest_filter_api,
|
||||||
../../waku/v2/node/rest/store/handlers as rest_store_api,
|
../../waku/v2/node/rest/store/handlers as rest_store_api,
|
||||||
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
|
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
|
||||||
../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api,
|
../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api,
|
||||||
@ -566,6 +567,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
|
|||||||
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
|
||||||
installRelayApiHandlers(server.router, app.node, relayCache)
|
installRelayApiHandlers(server.router, app.node, relayCache)
|
||||||
|
|
||||||
|
## Filter REST API
|
||||||
|
if conf.filter:
|
||||||
|
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
|
||||||
|
installFilterApiHandlers(server.router, app.node, filterCache)
|
||||||
|
|
||||||
## Store REST API
|
## Store REST API
|
||||||
installStoreApiHandlers(server.router, app.node)
|
installStoreApiHandlers(server.router, app.node)
|
||||||
|
|
||||||
|
|||||||
191
tests/v2/wakunode_rest/test_rest_filter.nim
Normal file
191
tests/v2/wakunode_rest/test_rest_filter.nim
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/sequtils,
|
||||||
|
stew/byteutils,
|
||||||
|
stew/shims/net,
|
||||||
|
testutils/unittests,
|
||||||
|
presto, presto/client as presto_client,
|
||||||
|
libp2p/crypto/crypto
|
||||||
|
import
|
||||||
|
../../waku/v2/node/message_cache,
|
||||||
|
../../waku/common/base64,
|
||||||
|
../../waku/v2/waku_core,
|
||||||
|
../../waku/v2/waku_node,
|
||||||
|
../../waku/v2/node/peer_manager,
|
||||||
|
../../waku/v2/waku_filter,
|
||||||
|
../../waku/v2/node/rest/server,
|
||||||
|
../../waku/v2/node/rest/client,
|
||||||
|
../../waku/v2/node/rest/responses,
|
||||||
|
../../waku/v2/node/rest/filter/types,
|
||||||
|
../../waku/v2/node/rest/filter/handlers as filter_api,
|
||||||
|
../../waku/v2/node/rest/filter/client as filter_api_client,
|
||||||
|
../../waku/v2/waku_relay,
|
||||||
|
../testlib/wakucore,
|
||||||
|
../testlib/wakunode
|
||||||
|
|
||||||
|
|
||||||
|
proc testWakuNode(): WakuNode =
|
||||||
|
let
|
||||||
|
privkey = generateSecp256k1Key()
|
||||||
|
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||||
|
extIp = ValidIpAddress.init("127.0.0.1")
|
||||||
|
port = Port(0)
|
||||||
|
|
||||||
|
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
|
||||||
|
|
||||||
|
|
||||||
|
type RestFilterTest = object
|
||||||
|
node1: WakuNode
|
||||||
|
node2: WakuNode
|
||||||
|
restServer: RestServerRef
|
||||||
|
messageCache: filter_api.MessageCache
|
||||||
|
client: RestClientRef
|
||||||
|
|
||||||
|
|
||||||
|
proc setupRestFilter(): Future[RestFilterTest] {.async.} =
|
||||||
|
result.node1 = testWakuNode()
|
||||||
|
result.node2 = testWakuNode()
|
||||||
|
|
||||||
|
await allFutures(result.node1.start(), result.node2.start())
|
||||||
|
|
||||||
|
await result.node1.mountFilter()
|
||||||
|
await result.node2.mountFilterClient()
|
||||||
|
|
||||||
|
result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)
|
||||||
|
|
||||||
|
let restPort = Port(58011)
|
||||||
|
let restAddress = ValidIpAddress.init("0.0.0.0")
|
||||||
|
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()
|
||||||
|
|
||||||
|
result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity)
|
||||||
|
|
||||||
|
installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
|
||||||
|
installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
|
||||||
|
installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache)
|
||||||
|
|
||||||
|
result.restServer.start()
|
||||||
|
|
||||||
|
result.client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
proc shutdown(self: RestFilterTest) {.async.} =
|
||||||
|
await self.restServer.stop()
|
||||||
|
await self.restServer.closeWait()
|
||||||
|
await allFutures(self.node1.stop(), self.node2.stop())
|
||||||
|
|
||||||
|
|
||||||
|
suite "Waku v2 Rest API - Filter":
|
||||||
|
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
|
||||||
|
# Given
|
||||||
|
let restFilterTest: RestFilterTest = await setupRestFilter()
|
||||||
|
|
||||||
|
# When
|
||||||
|
let contentFilters = @[DefaultContentTopic
|
||||||
|
,ContentTopic("2")
|
||||||
|
,ContentTopic("3")
|
||||||
|
,ContentTopic("4")
|
||||||
|
]
|
||||||
|
|
||||||
|
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
|
||||||
|
pubsubTopic: DefaultPubsubTopic)
|
||||||
|
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
$response.contentType == $MIMETYPE_TEXT
|
||||||
|
response.data == "OK"
|
||||||
|
|
||||||
|
check:
|
||||||
|
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
|
||||||
|
restFilterTest.messageCache.isSubscribed("2")
|
||||||
|
restFilterTest.messageCache.isSubscribed("3")
|
||||||
|
restFilterTest.messageCache.isSubscribed("4")
|
||||||
|
|
||||||
|
# When - error case
|
||||||
|
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "")
|
||||||
|
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
|
||||||
|
|
||||||
|
check:
|
||||||
|
badResponse.status == 400
|
||||||
|
$badResponse.contentType == $MIMETYPE_TEXT
|
||||||
|
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"
|
||||||
|
|
||||||
|
|
||||||
|
await restFilterTest.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
|
||||||
|
# Given
|
||||||
|
let
|
||||||
|
restFilterTest: RestFilterTest = await setupRestFilter()
|
||||||
|
|
||||||
|
# When
|
||||||
|
restFilterTest.messageCache.subscribe("1")
|
||||||
|
restFilterTest.messageCache.subscribe("2")
|
||||||
|
restFilterTest.messageCache.subscribe("3")
|
||||||
|
restFilterTest.messageCache.subscribe("4")
|
||||||
|
|
||||||
|
let contentFilters = @[ContentTopic("1")
|
||||||
|
,ContentTopic("2")
|
||||||
|
,ContentTopic("3")
|
||||||
|
# ,ContentTopic("4") # Keep this subscription for check
|
||||||
|
]
|
||||||
|
|
||||||
|
# When
|
||||||
|
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
|
||||||
|
pubsubTopic: DefaultPubsubTopic)
|
||||||
|
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
$response.contentType == $MIMETYPE_TEXT
|
||||||
|
response.data == "OK"
|
||||||
|
|
||||||
|
check:
|
||||||
|
not restFilterTest.messageCache.isSubscribed("1")
|
||||||
|
not restFilterTest.messageCache.isSubscribed("2")
|
||||||
|
not restFilterTest.messageCache.isSubscribed("3")
|
||||||
|
restFilterTest.messageCache.isSubscribed("4")
|
||||||
|
|
||||||
|
await restFilterTest.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
|
||||||
|
# Given
|
||||||
|
|
||||||
|
let
|
||||||
|
restFilterTest = await setupRestFilter()
|
||||||
|
|
||||||
|
let pubSubTopic = "/waku/2/default-waku/proto"
|
||||||
|
let contentTopic = ContentTopic( "content-topic-x" )
|
||||||
|
|
||||||
|
let messages = @[
|
||||||
|
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
|
||||||
|
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
|
||||||
|
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
|
||||||
|
]
|
||||||
|
|
||||||
|
restFilterTest.messageCache.subscribe(contentTopic)
|
||||||
|
for msg in messages:
|
||||||
|
restFilterTest.messageCache.addMessage(contentTopic, msg)
|
||||||
|
|
||||||
|
# When
|
||||||
|
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
$response.contentType == $MIMETYPE_JSON
|
||||||
|
response.data.len == 3
|
||||||
|
response.data.all do (msg: FilterWakuMessage) -> bool:
|
||||||
|
msg.payload == base64.encode("TEST-1") and
|
||||||
|
msg.contentTopic.get().string == "content-topic-x" and
|
||||||
|
msg.version.get() == 2 and
|
||||||
|
msg.timestamp.get() != Timestamp(0)
|
||||||
|
|
||||||
|
await restFilterTest.shutdown()
|
||||||
68
waku/v2/node/rest/filter/client.nim
Normal file
68
waku/v2/node/rest/filter/client.nim
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/sets,
|
||||||
|
stew/byteutils,
|
||||||
|
chronicles,
|
||||||
|
json_serialization,
|
||||||
|
json_serialization/std/options,
|
||||||
|
presto/[route, client, common]
|
||||||
|
import
|
||||||
|
../../../waku_core,
|
||||||
|
../serdes,
|
||||||
|
../responses,
|
||||||
|
./types
|
||||||
|
|
||||||
|
export types
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "waku node rest client"
|
||||||
|
|
||||||
|
proc encodeBytes*(value: FilterSubscriptionsRequest,
|
||||||
|
contentType: string): RestResult[seq[byte]] =
|
||||||
|
if MediaType.init(contentType) != MIMETYPE_JSON:
|
||||||
|
error "Unsupported contentType value", contentType = contentType
|
||||||
|
return err("Unsupported contentType")
|
||||||
|
|
||||||
|
let encoded = ?encodeIntoJsonBytes(value)
|
||||||
|
return ok(encoded)
|
||||||
|
|
||||||
|
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
|
||||||
|
contentType: Opt[ContentTypeData]): RestResult[string] =
|
||||||
|
if MediaType.init($contentType) != MIMETYPE_TEXT:
|
||||||
|
error "Unsupported contentType value", contentType = contentType
|
||||||
|
return err("Unsupported contentType")
|
||||||
|
|
||||||
|
var res: string
|
||||||
|
if len(value) > 0:
|
||||||
|
res = newString(len(value))
|
||||||
|
copyMem(addr res[0], unsafeAddr value[0], len(value))
|
||||||
|
return ok(res)
|
||||||
|
|
||||||
|
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
|
||||||
|
proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest):
|
||||||
|
RestResponse[string]
|
||||||
|
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.}
|
||||||
|
|
||||||
|
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
|
||||||
|
proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest):
|
||||||
|
RestResponse[string]
|
||||||
|
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}
|
||||||
|
|
||||||
|
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
|
||||||
|
data: openArray[byte],
|
||||||
|
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
|
||||||
|
if MediaType.init($contentType) != MIMETYPE_JSON:
|
||||||
|
error "Unsupported response contentType value", contentType = contentType
|
||||||
|
return err("Unsupported response contentType")
|
||||||
|
|
||||||
|
let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
|
||||||
|
return ok(decoded)
|
||||||
|
|
||||||
|
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
|
||||||
|
proc filterGetMessagesV1*(contentTopic: string):
|
||||||
|
RestResponse[FilterGetMessagesResponse]
|
||||||
|
{.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}
|
||||||
155
waku/v2/node/rest/filter/handlers.nim
Normal file
155
waku/v2/node/rest/filter/handlers.nim
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/sequtils,
|
||||||
|
stew/byteutils,
|
||||||
|
chronicles,
|
||||||
|
json_serialization,
|
||||||
|
json_serialization/std/options,
|
||||||
|
presto/route,
|
||||||
|
presto/common
|
||||||
|
import
|
||||||
|
../../../waku_core,
|
||||||
|
../../../waku_filter,
|
||||||
|
../../../waku_filter/client,
|
||||||
|
../../message_cache,
|
||||||
|
../../peer_manager,
|
||||||
|
../../waku_node,
|
||||||
|
../serdes,
|
||||||
|
../responses,
|
||||||
|
./types
|
||||||
|
|
||||||
|
export types
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "waku node rest filter_api"
|
||||||
|
|
||||||
|
const futTimeoutForSubscriptionProcessing* = 5.seconds
|
||||||
|
|
||||||
|
#### Request handlers
|
||||||
|
|
||||||
|
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions"
|
||||||
|
|
||||||
|
const filterMessageCacheDefaultCapacity* = 30
|
||||||
|
|
||||||
|
type
|
||||||
|
MessageCache* = message_cache.MessageCache[ContentTopic]
|
||||||
|
|
||||||
|
func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] =
|
||||||
|
if contentBody.isNone():
|
||||||
|
return err(RestApiResponse.badRequest("Missing content body"))
|
||||||
|
|
||||||
|
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
|
||||||
|
if reqBodyContentType != MIMETYPE_JSON:
|
||||||
|
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))
|
||||||
|
|
||||||
|
let reqBodyData = contentBody.get().data
|
||||||
|
|
||||||
|
let requestResult = decodeFromJsonBytes(T, reqBodyData)
|
||||||
|
if requestResult.isErr():
|
||||||
|
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
|
||||||
|
$requestResult.error))
|
||||||
|
|
||||||
|
return ok(requestResult.get())
|
||||||
|
|
||||||
|
proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter,
|
||||||
|
node: WakuNode,
|
||||||
|
cache: MessageCache) =
|
||||||
|
let pushHandler: FilterPushHandler =
|
||||||
|
proc(pubsubTopic: PubsubTopic,
|
||||||
|
msg: WakuMessage) {.async, gcsafe, closure.} =
|
||||||
|
cache.addMessage(msg.contentTopic, msg)
|
||||||
|
|
||||||
|
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||||
|
## Subscribes a node to a list of contentTopics of a pubsubTopic
|
||||||
|
# debug "post_waku_v2_filter_v1_subscriptions"
|
||||||
|
|
||||||
|
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody)
|
||||||
|
|
||||||
|
if decodedBody.isErr():
|
||||||
|
return decodedBody.error
|
||||||
|
|
||||||
|
let req: FilterSubscriptionsRequest = decodedBody.value()
|
||||||
|
|
||||||
|
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
|
||||||
|
|
||||||
|
if peerOpt.isNone():
|
||||||
|
return RestApiResponse.internalServerError("No suitable remote filter peers")
|
||||||
|
|
||||||
|
let subFut = node.filterSubscribe(req.pubsubTopic,
|
||||||
|
req.contentFilters,
|
||||||
|
pushHandler,
|
||||||
|
peerOpt.get())
|
||||||
|
|
||||||
|
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
|
||||||
|
error "Failed to subscribe to contentFilters do to timeout!"
|
||||||
|
return RestApiResponse.internalServerError("Failed to subscribe to contentFilters")
|
||||||
|
|
||||||
|
# Successfully subscribed to all content filters
|
||||||
|
for cTopic in req.contentFilters:
|
||||||
|
cache.subscribe(cTopic)
|
||||||
|
|
||||||
|
return RestApiResponse.ok()
|
||||||
|
|
||||||
|
proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter,
|
||||||
|
node: WakuNode,
|
||||||
|
cache: MessageCache) =
|
||||||
|
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||||
|
## Subscribes a node to a list of contentTopics of a PubSub topic
|
||||||
|
# debug "delete_waku_v2_filter_v1_subscriptions"
|
||||||
|
|
||||||
|
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody)
|
||||||
|
|
||||||
|
if decodedBody.isErr():
|
||||||
|
return decodedBody.error
|
||||||
|
|
||||||
|
let req: FilterSubscriptionsRequest = decodedBody.value()
|
||||||
|
|
||||||
|
let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters)
|
||||||
|
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
|
||||||
|
error "Failed to unsubscribe from contentFilters due to timeout!"
|
||||||
|
return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters")
|
||||||
|
|
||||||
|
for cTopic in req.contentFilters:
|
||||||
|
cache.unsubscribe(cTopic)
|
||||||
|
|
||||||
|
# Successfully unsubscribed from all requested contentTopics
|
||||||
|
return RestApiResponse.ok()
|
||||||
|
|
||||||
|
const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}"
|
||||||
|
|
||||||
|
proc installFilterGetMessagesV1Handler*(router: var RestRouter,
|
||||||
|
node: WakuNode,
|
||||||
|
cache: MessageCache) =
|
||||||
|
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse:
|
||||||
|
## Returns all WakuMessages received on a specified content topic since the
|
||||||
|
## last time this method was called
|
||||||
|
## TODO: ability to specify a return message limit
|
||||||
|
# debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
|
||||||
|
|
||||||
|
if contentTopic.isErr():
|
||||||
|
return RestApiResponse.badRequest("Missing contentTopic")
|
||||||
|
|
||||||
|
let contentTopic = contentTopic.get()
|
||||||
|
|
||||||
|
let msgRes = cache.getMessages(contentTopic, clear=true)
|
||||||
|
if msgRes.isErr():
|
||||||
|
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)
|
||||||
|
|
||||||
|
let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage))
|
||||||
|
let resp = RestApiResponse.jsonResponse(data, status=Http200)
|
||||||
|
if resp.isErr():
|
||||||
|
error "An error ocurred while building the json respose: ", error=resp.error
|
||||||
|
return RestApiResponse.internalServerError("An error ocurred while building the json respose")
|
||||||
|
|
||||||
|
return resp.get()
|
||||||
|
|
||||||
|
proc installFilterApiHandlers*(router: var RestRouter,
|
||||||
|
node: WakuNode,
|
||||||
|
cache: MessageCache) =
|
||||||
|
installFilterPostSubscriptionsV1Handler(router, node, cache)
|
||||||
|
installFilterDeleteSubscriptionsV1Handler(router, node, cache)
|
||||||
|
installFilterGetMessagesV1Handler(router, node, cache)
|
||||||
@ -21,7 +21,7 @@ paths:
|
|||||||
content:
|
content:
|
||||||
application/json:
|
application/json:
|
||||||
schema:
|
schema:
|
||||||
$ref: '#/components/schemas/FilterPostSubscriptionsRequest'
|
$ref: '#/components/schemas/FilterSubscriptionsRequest'
|
||||||
responses:
|
responses:
|
||||||
'200':
|
'200':
|
||||||
description: OK
|
description: OK
|
||||||
@ -45,7 +45,7 @@ paths:
|
|||||||
content:
|
content:
|
||||||
application/json:
|
application/json:
|
||||||
schema:
|
schema:
|
||||||
$ref: '#/components/schemas/FilterDeleteSubscriptionsRequest'
|
$ref: '#/components/schemas/FilterSubscriptionsRequest'
|
||||||
responses:
|
responses:
|
||||||
'200':
|
'200':
|
||||||
description: OK
|
description: OK
|
||||||
@ -61,7 +61,8 @@ paths:
|
|||||||
'5XX':
|
'5XX':
|
||||||
description: Unexpected error.
|
description: Unexpected error.
|
||||||
|
|
||||||
/filter/v1/messages/{topic}:
|
# TODO: Review the path of this endpoint due maybe query for list of contentTopics matching
|
||||||
|
/filter/v1/messages/{contentTopic}:
|
||||||
get: # get_waku_v2_filter_v1_messages
|
get: # get_waku_v2_filter_v1_messages
|
||||||
summary: Get the latest messages on the polled content topic
|
summary: Get the latest messages on the polled content topic
|
||||||
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
|
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
|
||||||
@ -70,11 +71,11 @@ paths:
|
|||||||
- filter
|
- filter
|
||||||
parameters:
|
parameters:
|
||||||
- in: path
|
- in: path
|
||||||
name: topic # Note the name is the same as in the path
|
name: contentTopic # Note the name is the same as in the path
|
||||||
required: true
|
required: true
|
||||||
schema:
|
schema:
|
||||||
type: string
|
type: string
|
||||||
description: The user ID
|
description: Content topic of message
|
||||||
responses:
|
responses:
|
||||||
'200':
|
'200':
|
||||||
description: The latest messages on the polled topic.
|
description: The latest messages on the polled topic.
|
||||||
@ -112,19 +113,7 @@ components:
|
|||||||
required:
|
required:
|
||||||
- payload
|
- payload
|
||||||
|
|
||||||
FilterPostSubscriptionsRequest:
|
FilterSubscriptionsRequest:
|
||||||
type: object
|
|
||||||
properties:
|
|
||||||
contentFilters:
|
|
||||||
type: array
|
|
||||||
items:
|
|
||||||
$ref: '#/components/schemas/ContentTopic'
|
|
||||||
pubsubTopic:
|
|
||||||
$ref: "#/components/schemas/PubSubTopic"
|
|
||||||
required:
|
|
||||||
- contentFilters
|
|
||||||
|
|
||||||
FilterDeleteSubscriptionsRequest:
|
|
||||||
type: object
|
type: object
|
||||||
properties:
|
properties:
|
||||||
contentFilters:
|
contentFilters:
|
||||||
|
|||||||
151
waku/v2/node/rest/filter/types.nim
Normal file
151
waku/v2/node/rest/filter/types.nim
Normal file
@ -0,0 +1,151 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[sets, strformat],
|
||||||
|
chronicles,
|
||||||
|
json_serialization,
|
||||||
|
json_serialization/std/options,
|
||||||
|
presto/[route, client, common]
|
||||||
|
import
|
||||||
|
../../../../common/base64,
|
||||||
|
../../../waku_core,
|
||||||
|
../serdes
|
||||||
|
|
||||||
|
#### Types
|
||||||
|
|
||||||
|
type FilterWakuMessage* = object
|
||||||
|
payload*: Base64String
|
||||||
|
contentTopic*: Option[ContentTopic]
|
||||||
|
version*: Option[Natural]
|
||||||
|
timestamp*: Option[int64]
|
||||||
|
|
||||||
|
type FilterGetMessagesResponse* = seq[FilterWakuMessage]
|
||||||
|
|
||||||
|
type FilterSubscriptionsRequest* = object
|
||||||
|
pubsubTopic*: PubSubTopic
|
||||||
|
contentFilters*: seq[ContentTopic]
|
||||||
|
|
||||||
|
#### Type conversion
|
||||||
|
|
||||||
|
proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage =
|
||||||
|
FilterWakuMessage(
|
||||||
|
payload: base64.encode(msg.payload),
|
||||||
|
contentTopic: some(msg.contentTopic),
|
||||||
|
version: some(Natural(msg.version)),
|
||||||
|
timestamp: some(msg.timestamp)
|
||||||
|
)
|
||||||
|
|
||||||
|
proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, string] =
|
||||||
|
let
|
||||||
|
payload = ?msg.payload.decode()
|
||||||
|
contentTopic = msg.contentTopic.get(DefaultContentTopic)
|
||||||
|
version = uint32(msg.version.get(version))
|
||||||
|
timestamp = msg.timestamp.get(0)
|
||||||
|
|
||||||
|
ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp))
|
||||||
|
|
||||||
|
#### Serialization and deserialization
|
||||||
|
|
||||||
|
proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
|
||||||
|
{.raises: [IOError].} =
|
||||||
|
writer.writeValue(string(value))
|
||||||
|
|
||||||
|
proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage)
|
||||||
|
{.raises: [IOError].} =
|
||||||
|
writer.beginRecord()
|
||||||
|
writer.writeField("payload", value.payload)
|
||||||
|
if value.contentTopic.isSome:
|
||||||
|
writer.writeField("contentTopic", value.contentTopic)
|
||||||
|
if value.version.isSome:
|
||||||
|
writer.writeField("version", value.version)
|
||||||
|
if value.timestamp.isSome:
|
||||||
|
writer.writeField("timestamp", value.timestamp)
|
||||||
|
writer.endRecord()
|
||||||
|
|
||||||
|
proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest)
|
||||||
|
{.raises: [IOError].} =
|
||||||
|
writer.beginRecord()
|
||||||
|
writer.writeField("pubsubTopic", value.pubsubTopic)
|
||||||
|
writer.writeField("contentFilters", value.contentFilters)
|
||||||
|
writer.endRecord()
|
||||||
|
|
||||||
|
proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
|
||||||
|
{.raises: [SerializationError, IOError].} =
|
||||||
|
value = Base64String(reader.readValue(string))
|
||||||
|
|
||||||
|
proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage)
|
||||||
|
{.raises: [SerializationError, IOError].} =
|
||||||
|
var
|
||||||
|
payload = none(Base64String)
|
||||||
|
contentTopic = none(ContentTopic)
|
||||||
|
version = none(Natural)
|
||||||
|
timestamp = none(int64)
|
||||||
|
|
||||||
|
var keys = initHashSet[string]()
|
||||||
|
for fieldName in readObjectFields(reader):
|
||||||
|
# Check for reapeated keys
|
||||||
|
if keys.containsOrIncl(fieldName):
|
||||||
|
let err = try: fmt"Multiple `{fieldName}` fields found"
|
||||||
|
except CatchableError: "Multiple fields with the same name found"
|
||||||
|
reader.raiseUnexpectedField(err, "FilterWakuMessage")
|
||||||
|
|
||||||
|
case fieldName
|
||||||
|
of "payload":
|
||||||
|
payload = some(reader.readValue(Base64String))
|
||||||
|
of "contentTopic":
|
||||||
|
contentTopic = some(reader.readValue(ContentTopic))
|
||||||
|
of "version":
|
||||||
|
version = some(reader.readValue(Natural))
|
||||||
|
of "timestamp":
|
||||||
|
timestamp = some(reader.readValue(int64))
|
||||||
|
else:
|
||||||
|
unrecognizedFieldWarning()
|
||||||
|
|
||||||
|
if payload.isNone():
|
||||||
|
reader.raiseUnexpectedValue("Field `payload` is missing")
|
||||||
|
|
||||||
|
value = FilterWakuMessage(
|
||||||
|
payload: payload.get(),
|
||||||
|
contentTopic: contentTopic,
|
||||||
|
version: version,
|
||||||
|
timestamp: timestamp
|
||||||
|
)
|
||||||
|
|
||||||
|
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest)
|
||||||
|
{.raises: [SerializationError, IOError].} =
|
||||||
|
var
|
||||||
|
pubsubTopic = none(PubsubTopic)
|
||||||
|
contentFilters = none(seq[ContentTopic])
|
||||||
|
|
||||||
|
var keys = initHashSet[string]()
|
||||||
|
for fieldName in readObjectFields(reader):
|
||||||
|
# Check for reapeated keys
|
||||||
|
if keys.containsOrIncl(fieldName):
|
||||||
|
let err = try: fmt"Multiple `{fieldName}` fields found"
|
||||||
|
except CatchableError: "Multiple fields with the same name found"
|
||||||
|
reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest")
|
||||||
|
|
||||||
|
case fieldName
|
||||||
|
of "pubsubTopic":
|
||||||
|
pubsubTopic = some(reader.readValue(PubsubTopic))
|
||||||
|
of "contentFilters":
|
||||||
|
contentFilters = some(reader.readValue(seq[ContentTopic]))
|
||||||
|
else:
|
||||||
|
unrecognizedFieldWarning()
|
||||||
|
|
||||||
|
if pubsubTopic.isNone():
|
||||||
|
reader.raiseUnexpectedValue("Field `pubsubTopic` is missing")
|
||||||
|
|
||||||
|
if contentFilters.isNone():
|
||||||
|
reader.raiseUnexpectedValue("Field `contentFilters` is missing")
|
||||||
|
|
||||||
|
if contentFilters.get().len() == 0:
|
||||||
|
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
|
||||||
|
|
||||||
|
value = FilterSubscriptionsRequest(
|
||||||
|
pubsubTopic: pubsubTopic.get(),
|
||||||
|
contentFilters: contentFilters.get()
|
||||||
|
)
|
||||||
Loading…
x
Reference in New Issue
Block a user