feat: Rest API interface for legacy (v1) filter service. (#1851)

* Added Rest API interface for legacy (v1) filter service with tests.
This commit is contained in:
NagyZoltanPeter 2023-08-04 11:34:22 +02:00 committed by GitHub
parent 42d7105530
commit cdc5423a86
7 changed files with 581 additions and 18 deletions

3
.gitignore vendored
View File

@ -38,6 +38,9 @@ node_modules/
# Ignore Jetbrains IDE files
.idea/
# ignore vscode files
.vscode/
# RLN / keystore
rlnKeystore.json
*.tar.gz

View File

@ -45,6 +45,7 @@ import
../../waku/v2/node/rest/debug/handlers as rest_debug_api,
../../waku/v2/node/rest/relay/handlers as rest_relay_api,
../../waku/v2/node/rest/relay/topic_cache,
../../waku/v2/node/rest/filter/handlers as rest_filter_api,
../../waku/v2/node/rest/store/handlers as rest_store_api,
../../waku/v2/node/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/v2/node/jsonrpc/debug/handlers as rpc_debug_api,
@ -566,6 +567,11 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, app.node, relayCache)
## Filter REST API
if conf.filter:
let filterCache = rest_filter_api.MessageCache.init(capacity=rest_filter_api.filterMessageCacheDefaultCapacity)
installFilterApiHandlers(server.router, app.node, filterCache)
## Store REST API
installStoreApiHandlers(server.router, app.node)

View File

@ -0,0 +1,191 @@
{.used.}
import
std/sequtils,
stew/byteutils,
stew/shims/net,
testutils/unittests,
presto, presto/client as presto_client,
libp2p/crypto/crypto
import
../../waku/v2/node/message_cache,
../../waku/common/base64,
../../waku/v2/waku_core,
../../waku/v2/waku_node,
../../waku/v2/node/peer_manager,
../../waku/v2/waku_filter,
../../waku/v2/node/rest/server,
../../waku/v2/node/rest/client,
../../waku/v2/node/rest/responses,
../../waku/v2/node/rest/filter/types,
../../waku/v2/node/rest/filter/handlers as filter_api,
../../waku/v2/node/rest/filter/client as filter_api_client,
../../waku/v2/waku_relay,
../testlib/wakucore,
../testlib/wakunode
proc testWakuNode(): WakuNode =
let
privkey = generateSecp256k1Key()
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
port = Port(0)
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
type RestFilterTest = object
node1: WakuNode
node2: WakuNode
restServer: RestServerRef
messageCache: filter_api.MessageCache
client: RestClientRef
proc setupRestFilter(): Future[RestFilterTest] {.async.} =
result.node1 = testWakuNode()
result.node2 = testWakuNode()
await allFutures(result.node1.start(), result.node2.start())
await result.node1.mountFilter()
await result.node2.mountFilterClient()
result.node2.peerManager.addServicePeer(result.node1.peerInfo.toRemotePeerInfo(), WakuFilterCodec)
let restPort = Port(58011)
let restAddress = ValidIpAddress.init("0.0.0.0")
result.restServer = RestServerRef.init(restAddress, restPort).tryGet()
result.messageCache = filter_api.MessageCache.init(capacity=filter_api.filterMessageCacheDefaultCapacity)
installFilterPostSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterDeleteSubscriptionsV1Handler(result.restServer.router, result.node2, result.messageCache)
installFilterGetMessagesV1Handler(result.restServer.router, result.node2, result.messageCache)
result.restServer.start()
result.client = newRestHttpClient(initTAddress(restAddress, restPort))
return result
proc shutdown(self: RestFilterTest) {.async.} =
await self.restServer.stop()
await self.restServer.closeWait()
await allFutures(self.node1.stop(), self.node2.stop())
suite "Waku v2 Rest API - Filter":
asyncTest "Subscribe a node to an array of topics - POST /filter/v1/subscriptions":
# Given
let restFilterTest: RestFilterTest = await setupRestFilter()
# When
let contentFilters = @[DefaultContentTopic
,ContentTopic("2")
,ContentTopic("3")
,ContentTopic("4")
]
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
let response = await restFilterTest.client.filterPostSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
restFilterTest.messageCache.isSubscribed(DefaultContentTopic)
restFilterTest.messageCache.isSubscribed("2")
restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
# When - error case
let badRequestBody = FilterSubscriptionsRequest(contentFilters: @[], pubsubTopic: "")
let badResponse = await restFilterTest.client.filterPostSubscriptionsV1(badRequestBody)
check:
badResponse.status == 400
$badResponse.contentType == $MIMETYPE_TEXT
badResponse.data == "Invalid content body, could not decode. Unable to deserialize data"
await restFilterTest.shutdown()
asyncTest "Unsubscribe a node from an array of topics - DELETE /filter/v1/subscriptions":
# Given
let
restFilterTest: RestFilterTest = await setupRestFilter()
# When
restFilterTest.messageCache.subscribe("1")
restFilterTest.messageCache.subscribe("2")
restFilterTest.messageCache.subscribe("3")
restFilterTest.messageCache.subscribe("4")
let contentFilters = @[ContentTopic("1")
,ContentTopic("2")
,ContentTopic("3")
# ,ContentTopic("4") # Keep this subscription for check
]
# When
let requestBody = FilterSubscriptionsRequest(contentFilters: contentFilters,
pubsubTopic: DefaultPubsubTopic)
let response = await restFilterTest.client.filterDeleteSubscriptionsV1(requestBody)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
check:
not restFilterTest.messageCache.isSubscribed("1")
not restFilterTest.messageCache.isSubscribed("2")
not restFilterTest.messageCache.isSubscribed("3")
restFilterTest.messageCache.isSubscribed("4")
await restFilterTest.shutdown()
asyncTest "Get the latest messages for topic - GET /filter/v1/messages/{contentTopic}":
# Given
let
restFilterTest = await setupRestFilter()
let pubSubTopic = "/waku/2/default-waku/proto"
let contentTopic = ContentTopic( "content-topic-x" )
let messages = @[
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")),
]
restFilterTest.messageCache.subscribe(contentTopic)
for msg in messages:
restFilterTest.messageCache.addMessage(contentTopic, msg)
# When
let response = await restFilterTest.client.filterGetMessagesV1(contentTopic)
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.len == 3
response.data.all do (msg: FilterWakuMessage) -> bool:
msg.payload == base64.encode("TEST-1") and
msg.contentTopic.get().string == "content-topic-x" and
msg.version.get() == 2 and
msg.timestamp.get() != Timestamp(0)
await restFilterTest.shutdown()

View File

@ -0,0 +1,68 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sets,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../waku_core,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest client"
proc encodeBytes*(value: FilterSubscriptionsRequest,
contentType: string): RestResult[seq[byte]] =
if MediaType.init(contentType) != MIMETYPE_JSON:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
let encoded = ?encodeIntoJsonBytes(value)
return ok(encoded)
proc decodeBytes*(t: typedesc[string], value: openarray[byte],
contentType: Opt[ContentTypeData]): RestResult[string] =
if MediaType.init($contentType) != MIMETYPE_TEXT:
error "Unsupported contentType value", contentType = contentType
return err("Unsupported contentType")
var res: string
if len(value) > 0:
res = newString(len(value))
copyMem(addr res[0], unsafeAddr value[0], len(value))
return ok(res)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterPostSubscriptionsV1*(body: FilterSubscriptionsRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodPost.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterDeleteSubscriptionsV1*(body: FilterSubscriptionsRequest):
RestResponse[string]
{.rest, endpoint: "/filter/v1/subscriptions", meth: HttpMethod.MethodDelete.}
proc decodeBytes*(t: typedesc[FilterGetMessagesResponse],
data: openArray[byte],
contentType: Opt[ContentTypeData]): RestResult[FilterGetMessagesResponse] =
if MediaType.init($contentType) != MIMETYPE_JSON:
error "Unsupported response contentType value", contentType = contentType
return err("Unsupported response contentType")
let decoded = ?decodeFromJsonBytes(FilterGetMessagesResponse, data)
return ok(decoded)
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc filterGetMessagesV1*(contentTopic: string):
RestResponse[FilterGetMessagesResponse]
{.rest, endpoint: "/filter/v1/messages/{contentTopic}", meth: HttpMethod.MethodGet.}

View File

@ -0,0 +1,155 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/sequtils,
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
presto/route,
presto/common
import
../../../waku_core,
../../../waku_filter,
../../../waku_filter/client,
../../message_cache,
../../peer_manager,
../../waku_node,
../serdes,
../responses,
./types
export types
logScope:
topics = "waku node rest filter_api"
const futTimeoutForSubscriptionProcessing* = 5.seconds
#### Request handlers
const ROUTE_FILTER_SUBSCRIPTIONSV1* = "/filter/v1/subscriptions"
const filterMessageCacheDefaultCapacity* = 30
type
MessageCache* = message_cache.MessageCache[ContentTopic]
func decodeRequestBody[T](contentBody: Option[ContentBody]) : Result[T, RestApiResponse] =
if contentBody.isNone():
return err(RestApiResponse.badRequest("Missing content body"))
let reqBodyContentType = MediaType.init($contentBody.get().contentType)
if reqBodyContentType != MIMETYPE_JSON:
return err(RestApiResponse.badRequest("Wrong Content-Type, expected application/json"))
let reqBodyData = contentBody.get().data
let requestResult = decodeFromJsonBytes(T, reqBodyData)
if requestResult.isErr():
return err(RestApiResponse.badRequest("Invalid content body, could not decode. " &
$requestResult.error))
return ok(requestResult.get())
proc installFilterPostSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
let pushHandler: FilterPushHandler =
proc(pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async, gcsafe, closure.} =
cache.addMessage(msg.contentTopic, msg)
router.api(MethodPost, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a pubsubTopic
# debug "post_waku_v2_filter_v1_subscriptions"
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterSubscriptionsRequest = decodedBody.value()
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
return RestApiResponse.internalServerError("No suitable remote filter peers")
let subFut = node.filterSubscribe(req.pubsubTopic,
req.contentFilters,
pushHandler,
peerOpt.get())
if not await subFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to subscribe to contentFilters do to timeout!"
return RestApiResponse.internalServerError("Failed to subscribe to contentFilters")
# Successfully subscribed to all content filters
for cTopic in req.contentFilters:
cache.subscribe(cTopic)
return RestApiResponse.ok()
proc installFilterDeleteSubscriptionsV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodDelete, ROUTE_FILTER_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse:
## Subscribes a node to a list of contentTopics of a PubSub topic
# debug "delete_waku_v2_filter_v1_subscriptions"
let decodedBody = decodeRequestBody[FilterSubscriptionsRequest](contentBody)
if decodedBody.isErr():
return decodedBody.error
let req: FilterSubscriptionsRequest = decodedBody.value()
let unsubFut = node.unsubscribe(req.pubsubTopic, req.contentFilters)
if not await unsubFut.withTimeout(futTimeoutForSubscriptionProcessing):
error "Failed to unsubscribe from contentFilters due to timeout!"
return RestApiResponse.internalServerError("Failed to unsubscribe from contentFilters")
for cTopic in req.contentFilters:
cache.unsubscribe(cTopic)
# Successfully unsubscribed from all requested contentTopics
return RestApiResponse.ok()
const ROUTE_RELAY_MESSAGESV1* = "/filter/v1/messages/{contentTopic}"
proc installFilterGetMessagesV1Handler*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (contentTopic: string) -> RestApiResponse:
## Returns all WakuMessages received on a specified content topic since the
## last time this method was called
## TODO: ability to specify a return message limit
# debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
if contentTopic.isErr():
return RestApiResponse.badRequest("Missing contentTopic")
let contentTopic = contentTopic.get()
let msgRes = cache.getMessages(contentTopic, clear=true)
if msgRes.isErr():
return RestApiResponse.badRequest("Not subscribed to topic: " & contentTopic)
let data = FilterGetMessagesResponse(msgRes.get().map(toFilterWakuMessage))
let resp = RestApiResponse.jsonResponse(data, status=Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error=resp.error
return RestApiResponse.internalServerError("An error ocurred while building the json respose")
return resp.get()
proc installFilterApiHandlers*(router: var RestRouter,
node: WakuNode,
cache: MessageCache) =
installFilterPostSubscriptionsV1Handler(router, node, cache)
installFilterDeleteSubscriptionsV1Handler(router, node, cache)
installFilterGetMessagesV1Handler(router, node, cache)

View File

@ -21,7 +21,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterPostSubscriptionsRequest'
$ref: '#/components/schemas/FilterSubscriptionsRequest'
responses:
'200':
description: OK
@ -45,7 +45,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterDeleteSubscriptionsRequest'
$ref: '#/components/schemas/FilterSubscriptionsRequest'
responses:
'200':
description: OK
@ -61,7 +61,8 @@ paths:
'5XX':
description: Unexpected error.
/filter/v1/messages/{topic}:
# TODO: Review the path of this endpoint due maybe query for list of contentTopics matching
/filter/v1/messages/{contentTopic}:
get: # get_waku_v2_filter_v1_messages
summary: Get the latest messages on the polled content topic
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
@ -70,11 +71,11 @@ paths:
- filter
parameters:
- in: path
name: topic # Note the name is the same as in the path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: The user ID
description: Content topic of message
responses:
'200':
description: The latest messages on the polled topic.
@ -112,19 +113,7 @@ components:
required:
- payload
FilterPostSubscriptionsRequest:
type: object
properties:
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
- contentFilters
FilterDeleteSubscriptionsRequest:
FilterSubscriptionsRequest:
type: object
properties:
contentFilters:

View File

@ -0,0 +1,151 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[sets, strformat],
chronicles,
json_serialization,
json_serialization/std/options,
presto/[route, client, common]
import
../../../../common/base64,
../../../waku_core,
../serdes
#### Types
type FilterWakuMessage* = object
payload*: Base64String
contentTopic*: Option[ContentTopic]
version*: Option[Natural]
timestamp*: Option[int64]
type FilterGetMessagesResponse* = seq[FilterWakuMessage]
type FilterSubscriptionsRequest* = object
pubsubTopic*: PubSubTopic
contentFilters*: seq[ContentTopic]
#### Type conversion
proc toFilterWakuMessage*(msg: WakuMessage): FilterWakuMessage =
FilterWakuMessage(
payload: base64.encode(msg.payload),
contentTopic: some(msg.contentTopic),
version: some(Natural(msg.version)),
timestamp: some(msg.timestamp)
)
proc toWakuMessage*(msg: FilterWakuMessage, version = 0): Result[WakuMessage, string] =
let
payload = ?msg.payload.decode()
contentTopic = msg.contentTopic.get(DefaultContentTopic)
version = uint32(msg.version.get(version))
timestamp = msg.timestamp.get(0)
ok(WakuMessage(payload: payload, contentTopic: contentTopic, version: version, timestamp: timestamp))
#### Serialization and deserialization
proc writeValue*(writer: var JsonWriter[RestJson], value: Base64String)
{.raises: [IOError].} =
writer.writeValue(string(value))
proc writeValue*(writer: var JsonWriter[RestJson], value: FilterWakuMessage)
{.raises: [IOError].} =
writer.beginRecord()
writer.writeField("payload", value.payload)
if value.contentTopic.isSome:
writer.writeField("contentTopic", value.contentTopic)
if value.version.isSome:
writer.writeField("version", value.version)
if value.timestamp.isSome:
writer.writeField("timestamp", value.timestamp)
writer.endRecord()
proc writeValue*(writer: var JsonWriter[RestJson], value: FilterSubscriptionsRequest)
{.raises: [IOError].} =
writer.beginRecord()
writer.writeField("pubsubTopic", value.pubsubTopic)
writer.writeField("contentFilters", value.contentFilters)
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson], value: var Base64String)
{.raises: [SerializationError, IOError].} =
value = Base64String(reader.readValue(string))
proc readValue*(reader: var JsonReader[RestJson], value: var FilterWakuMessage)
{.raises: [SerializationError, IOError].} =
var
payload = none(Base64String)
contentTopic = none(ContentTopic)
version = none(Natural)
timestamp = none(int64)
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterWakuMessage")
case fieldName
of "payload":
payload = some(reader.readValue(Base64String))
of "contentTopic":
contentTopic = some(reader.readValue(ContentTopic))
of "version":
version = some(reader.readValue(Natural))
of "timestamp":
timestamp = some(reader.readValue(int64))
else:
unrecognizedFieldWarning()
if payload.isNone():
reader.raiseUnexpectedValue("Field `payload` is missing")
value = FilterWakuMessage(
payload: payload.get(),
contentTopic: contentTopic,
version: version,
timestamp: timestamp
)
proc readValue*(reader: var JsonReader[RestJson], value: var FilterSubscriptionsRequest)
{.raises: [SerializationError, IOError].} =
var
pubsubTopic = none(PubsubTopic)
contentFilters = none(seq[ContentTopic])
var keys = initHashSet[string]()
for fieldName in readObjectFields(reader):
# Check for reapeated keys
if keys.containsOrIncl(fieldName):
let err = try: fmt"Multiple `{fieldName}` fields found"
except CatchableError: "Multiple fields with the same name found"
reader.raiseUnexpectedField(err, "FilterSubscriptionsRequest")
case fieldName
of "pubsubTopic":
pubsubTopic = some(reader.readValue(PubsubTopic))
of "contentFilters":
contentFilters = some(reader.readValue(seq[ContentTopic]))
else:
unrecognizedFieldWarning()
if pubsubTopic.isNone():
reader.raiseUnexpectedValue("Field `pubsubTopic` is missing")
if contentFilters.isNone():
reader.raiseUnexpectedValue("Field `contentFilters` is missing")
if contentFilters.get().len() == 0:
reader.raiseUnexpectedValue("Field `contentFilters` is empty")
value = FilterSubscriptionsRequest(
pubsubTopic: pubsubTopic.get(),
contentFilters: contentFilters.get()
)