mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
fix: bad HttpCode conversion, add missing lightpush v3 rest api tests (#3389)
* Fix bad HttpCode conversion, add missing lightpush v3 rest api tests
This commit is contained in:
parent
ab8a30d3d6
commit
8394c15a1a
@ -99,6 +99,7 @@ import
|
|||||||
./wakunode_rest/test_rest_relay_serdes,
|
./wakunode_rest/test_rest_relay_serdes,
|
||||||
./wakunode_rest/test_rest_serdes,
|
./wakunode_rest/test_rest_serdes,
|
||||||
./wakunode_rest/test_rest_filter,
|
./wakunode_rest/test_rest_filter,
|
||||||
|
./wakunode_rest/test_rest_lightpush,
|
||||||
./wakunode_rest/test_rest_lightpush_legacy,
|
./wakunode_rest/test_rest_lightpush_legacy,
|
||||||
./wakunode_rest/test_rest_admin,
|
./wakunode_rest/test_rest_admin,
|
||||||
./wakunode_rest/test_rest_cors,
|
./wakunode_rest/test_rest_cors,
|
||||||
|
|||||||
282
tests/wakunode_rest/test_rest_lightpush.nim
Normal file
282
tests/wakunode_rest/test_rest_lightpush.nim
Normal file
@ -0,0 +1,282 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/sequtils,
|
||||||
|
stew/byteutils,
|
||||||
|
stew/shims/net,
|
||||||
|
testutils/unittests,
|
||||||
|
presto,
|
||||||
|
presto/client as presto_client,
|
||||||
|
libp2p/crypto/crypto
|
||||||
|
|
||||||
|
import
|
||||||
|
waku/[
|
||||||
|
waku_api/message_cache,
|
||||||
|
waku_core,
|
||||||
|
waku_node,
|
||||||
|
node/peer_manager,
|
||||||
|
waku_lightpush/common,
|
||||||
|
waku_api/rest/server,
|
||||||
|
waku_api/rest/client,
|
||||||
|
waku_api/rest/responses,
|
||||||
|
waku_api/rest/lightpush/types,
|
||||||
|
waku_api/rest/lightpush/handlers as lightpush_api,
|
||||||
|
waku_api/rest/lightpush/client as lightpush_api_client,
|
||||||
|
waku_relay,
|
||||||
|
common/rate_limit/setting,
|
||||||
|
],
|
||||||
|
../testlib/wakucore,
|
||||||
|
../testlib/wakunode
|
||||||
|
|
||||||
|
proc testWakuNode(): WakuNode =
|
||||||
|
let
|
||||||
|
privkey = generateSecp256k1Key()
|
||||||
|
bindIp = parseIpAddress("0.0.0.0")
|
||||||
|
extIp = parseIpAddress("127.0.0.1")
|
||||||
|
port = Port(0)
|
||||||
|
|
||||||
|
return newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
|
||||||
|
|
||||||
|
type RestLightPushTest = object
|
||||||
|
serviceNode: WakuNode
|
||||||
|
pushNode: WakuNode
|
||||||
|
consumerNode: WakuNode
|
||||||
|
restServer: WakuRestServerRef
|
||||||
|
restClient: RestClientRef
|
||||||
|
|
||||||
|
proc init(
|
||||||
|
T: type RestLightPushTest, rateLimit: RateLimitSetting = (0, 0.millis)
|
||||||
|
): Future[T] {.async.} =
|
||||||
|
var testSetup = RestLightPushTest()
|
||||||
|
testSetup.serviceNode = testWakuNode()
|
||||||
|
testSetup.pushNode = testWakuNode()
|
||||||
|
testSetup.consumerNode = testWakuNode()
|
||||||
|
|
||||||
|
await allFutures(
|
||||||
|
testSetup.serviceNode.start(),
|
||||||
|
testSetup.pushNode.start(),
|
||||||
|
testSetup.consumerNode.start(),
|
||||||
|
)
|
||||||
|
|
||||||
|
await testSetup.consumerNode.mountRelay()
|
||||||
|
await testSetup.serviceNode.mountRelay()
|
||||||
|
await testSetup.serviceNode.mountLightPush(rateLimit)
|
||||||
|
testSetup.pushNode.mountLightPushClient()
|
||||||
|
|
||||||
|
testSetup.serviceNode.peerManager.addServicePeer(
|
||||||
|
testSetup.consumerNode.peerInfo.toRemotePeerInfo(), WakuRelayCodec
|
||||||
|
)
|
||||||
|
|
||||||
|
await testSetup.serviceNode.connectToNodes(
|
||||||
|
@[testSetup.consumerNode.peerInfo.toRemotePeerInfo()]
|
||||||
|
)
|
||||||
|
|
||||||
|
testSetup.pushNode.peerManager.addServicePeer(
|
||||||
|
testSetup.serviceNode.peerInfo.toRemotePeerInfo(), WakuLightPushCodec
|
||||||
|
)
|
||||||
|
|
||||||
|
var restPort = Port(0)
|
||||||
|
let restAddress = parseIpAddress("127.0.0.1")
|
||||||
|
testSetup.restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
||||||
|
restPort = testSetup.restServer.httpServer.address.port
|
||||||
|
# update with bound port for restClient use
|
||||||
|
|
||||||
|
installLightPushRequestHandler(testSetup.restServer.router, testSetup.pushNode)
|
||||||
|
|
||||||
|
testSetup.restServer.start()
|
||||||
|
|
||||||
|
testSetup.restClient = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||||
|
|
||||||
|
return testSetup
|
||||||
|
|
||||||
|
proc shutdown(self: RestLightPushTest) {.async.} =
|
||||||
|
await self.restServer.stop()
|
||||||
|
await self.restServer.closeWait()
|
||||||
|
await allFutures(
|
||||||
|
self.serviceNode.stop(), self.pushNode.stop(), self.consumerNode.stop()
|
||||||
|
)
|
||||||
|
|
||||||
|
suite "Waku v2 Rest API - lightpush":
|
||||||
|
asyncTest "Push message with proof":
|
||||||
|
let restLightPushTest = await RestLightPushTest.init()
|
||||||
|
|
||||||
|
let message: RelayWakuMessage = fakeWakuMessage(
|
||||||
|
contentTopic = DefaultContentTopic,
|
||||||
|
payload = toBytes("TEST-1"),
|
||||||
|
proof = toBytes("proof-test"),
|
||||||
|
)
|
||||||
|
.toRelayWakuMessage()
|
||||||
|
|
||||||
|
check message.proof.isSome()
|
||||||
|
|
||||||
|
let requestBody =
|
||||||
|
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
|
||||||
|
|
||||||
|
let response =
|
||||||
|
await restLightPushTest.restClient.sendPushRequest(body = requestBody)
|
||||||
|
|
||||||
|
## Validate that the push request failed because the node is not
|
||||||
|
## connected to other node but, doesn't fail because of not properly
|
||||||
|
## handling the proof message attribute within the REST request.
|
||||||
|
check:
|
||||||
|
response.status == 505
|
||||||
|
response.data.statusDesc == some("No peers for topic, skipping publish")
|
||||||
|
response.data.relayPeerCount == none[uint32]()
|
||||||
|
|
||||||
|
asyncTest "Push message request":
|
||||||
|
# Given
|
||||||
|
let restLightPushTest = await RestLightPushTest.init()
|
||||||
|
|
||||||
|
restLightPushTest.consumerNode.subscribe(
|
||||||
|
(kind: PubsubSub, topic: DefaultPubsubTopic)
|
||||||
|
)
|
||||||
|
restLightPushTest.serviceNode.subscribe(
|
||||||
|
(kind: PubsubSub, topic: DefaultPubsubTopic)
|
||||||
|
)
|
||||||
|
require:
|
||||||
|
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
|
||||||
|
|
||||||
|
# When
|
||||||
|
let message: RelayWakuMessage = fakeWakuMessage(
|
||||||
|
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
|
||||||
|
)
|
||||||
|
.toRelayWakuMessage()
|
||||||
|
|
||||||
|
let requestBody =
|
||||||
|
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
|
||||||
|
let response = await restLightPushTest.restClient.sendPushRequest(requestBody)
|
||||||
|
|
||||||
|
echo "response", $response
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
response.data.relayPeerCount == some(1.uint32)
|
||||||
|
|
||||||
|
await restLightPushTest.shutdown()
|
||||||
|
|
||||||
|
asyncTest "Push message bad-request":
|
||||||
|
# Given
|
||||||
|
let restLightPushTest = await RestLightPushTest.init()
|
||||||
|
|
||||||
|
restLightPushTest.serviceNode.subscribe(
|
||||||
|
(kind: PubsubSub, topic: DefaultPubsubTopic)
|
||||||
|
)
|
||||||
|
require:
|
||||||
|
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
|
||||||
|
|
||||||
|
# When
|
||||||
|
let badMessage1: RelayWakuMessage = fakeWakuMessage(
|
||||||
|
contentTopic = DefaultContentTopic, payload = toBytes("")
|
||||||
|
)
|
||||||
|
.toRelayWakuMessage()
|
||||||
|
let badRequestBody1 =
|
||||||
|
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: badMessage1)
|
||||||
|
|
||||||
|
let badMessage2: RelayWakuMessage =
|
||||||
|
fakeWakuMessage(contentTopic = "", payload = toBytes("Sthg")).toRelayWakuMessage()
|
||||||
|
let badRequestBody2 =
|
||||||
|
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: badMessage2)
|
||||||
|
|
||||||
|
let badRequestBody3 =
|
||||||
|
PushRequest(pubsubTopic: none(PubsubTopic), message: badMessage2)
|
||||||
|
|
||||||
|
# var response: RestResponse[PushResponse]
|
||||||
|
|
||||||
|
var response = await restLightPushTest.restClient.sendPushRequest(badRequestBody1)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 400
|
||||||
|
response.data.statusDesc.isSome()
|
||||||
|
response.data.statusDesc.get().startsWith("Invalid push request")
|
||||||
|
|
||||||
|
# when
|
||||||
|
response = await restLightPushTest.restClient.sendPushRequest(badRequestBody2)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 400
|
||||||
|
response.data.statusDesc.isSome()
|
||||||
|
response.data.statusDesc.get().startsWith("Invalid push request")
|
||||||
|
|
||||||
|
# when
|
||||||
|
response = await restLightPushTest.restClient.sendPushRequest(badRequestBody3)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.data.statusDesc.isSome()
|
||||||
|
response.data.statusDesc.get().startsWith("Invalid push request")
|
||||||
|
|
||||||
|
await restLightPushTest.shutdown()
|
||||||
|
|
||||||
|
asyncTest "Request rate limit push message":
|
||||||
|
# Given
|
||||||
|
let budgetCap = 3
|
||||||
|
let tokenPeriod = 500.millis
|
||||||
|
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))
|
||||||
|
|
||||||
|
restLightPushTest.consumerNode.subscribe(
|
||||||
|
(kind: PubsubSub, topic: DefaultPubsubTopic)
|
||||||
|
)
|
||||||
|
restLightPushTest.serviceNode.subscribe(
|
||||||
|
(kind: PubsubSub, topic: DefaultPubsubTopic)
|
||||||
|
)
|
||||||
|
require:
|
||||||
|
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
|
||||||
|
|
||||||
|
# When
|
||||||
|
let pushProc = proc() {.async.} =
|
||||||
|
let message: RelayWakuMessage = fakeWakuMessage(
|
||||||
|
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
|
||||||
|
)
|
||||||
|
.toRelayWakuMessage()
|
||||||
|
|
||||||
|
let requestBody =
|
||||||
|
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
|
||||||
|
let response = await restLightPushTest.restClient.sendPushRequest(requestBody)
|
||||||
|
|
||||||
|
echo "response", $response
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 200
|
||||||
|
response.data.relayPeerCount == some(1.uint32)
|
||||||
|
|
||||||
|
let pushRejectedProc = proc() {.async.} =
|
||||||
|
let message: RelayWakuMessage = fakeWakuMessage(
|
||||||
|
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
|
||||||
|
)
|
||||||
|
.toRelayWakuMessage()
|
||||||
|
|
||||||
|
let requestBody =
|
||||||
|
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
|
||||||
|
let response = await restLightPushTest.restClient.sendPushRequest(requestBody)
|
||||||
|
|
||||||
|
echo "response", $response
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
response.status == 429
|
||||||
|
response.data.statusDesc.isSome() # Ensure error status description is present
|
||||||
|
response.data.statusDesc.get().startsWith(
|
||||||
|
"Request rejected due to too many requests"
|
||||||
|
) # Check specific error message
|
||||||
|
|
||||||
|
await pushProc()
|
||||||
|
await pushProc()
|
||||||
|
await pushProc()
|
||||||
|
await pushRejectedProc()
|
||||||
|
|
||||||
|
await sleepAsync(tokenPeriod)
|
||||||
|
|
||||||
|
for runCnt in 0 ..< 3:
|
||||||
|
let startTime = Moment.now()
|
||||||
|
for sendCnt in 0 ..< budgetCap:
|
||||||
|
await pushProc()
|
||||||
|
|
||||||
|
let endTime = Moment.now()
|
||||||
|
let elapsed: Duration = (endTime - startTime)
|
||||||
|
await sleepAsync(tokenPeriod - elapsed + 10.millis)
|
||||||
|
|
||||||
|
await restLightPushTest.shutdown()
|
||||||
@ -274,28 +274,3 @@ suite "Waku v2 Rest API - lightpush":
|
|||||||
await sleepAsync(tokenPeriod - elapsed + 10.millis)
|
await sleepAsync(tokenPeriod - elapsed + 10.millis)
|
||||||
|
|
||||||
await restLightPushTest.shutdown()
|
await restLightPushTest.shutdown()
|
||||||
|
|
||||||
## TODO: Re-work this test when lightpush protocol change is done: https://github.com/waku-org/pm/issues/93
|
|
||||||
## This test is similar when no available peer exists for publish. Currently it is returning success,
|
|
||||||
## that makes this test not useful.
|
|
||||||
# asyncTest "Push message request service not available":
|
|
||||||
# # Given
|
|
||||||
# let restLightPushTest = await RestLightPushTest.init()
|
|
||||||
|
|
||||||
# # When
|
|
||||||
# let message : RelayWakuMessage = fakeWakuMessage(contentTopic = DefaultContentTopic,
|
|
||||||
# payload = toBytes("TEST-1")).toRelayWakuMessage()
|
|
||||||
|
|
||||||
# let requestBody = PushRequest(pubsubTopic: some("NoExistTopic"),
|
|
||||||
# message: message)
|
|
||||||
# let response = await restLightPushTest.client.sendPushRequest(requestBody)
|
|
||||||
|
|
||||||
# echo "response", $response
|
|
||||||
|
|
||||||
# # Then
|
|
||||||
# check:
|
|
||||||
# response.status == 503
|
|
||||||
# $response.contentType == $MIMETYPE_TEXT
|
|
||||||
# response.data == "Failed to request a message push: Can not publish to any peers"
|
|
||||||
|
|
||||||
# await restLightPushTest.shutdown()
|
|
||||||
|
|||||||
@ -26,18 +26,15 @@ logScope:
|
|||||||
|
|
||||||
const FutTimeoutForPushRequestProcessing* = 5.seconds
|
const FutTimeoutForPushRequestProcessing* = 5.seconds
|
||||||
|
|
||||||
const NoPeerNoDiscoError =
|
const NoPeerNoDiscoError = "No suitable service peer & no discovery method"
|
||||||
RestApiResponse.serviceUnavailable("No suitable service peer & no discovery method")
|
const NoPeerNoneFoundError = "No suitable service peer & none discovered"
|
||||||
|
|
||||||
const NoPeerNoneFoundError =
|
|
||||||
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")
|
|
||||||
|
|
||||||
proc useSelfHostedLightPush(node: WakuNode): bool =
|
proc useSelfHostedLightPush(node: WakuNode): bool =
|
||||||
return node.wakuLightPush != nil and node.wakuLightPushClient == nil
|
return node.wakuLightPush != nil and node.wakuLightPushClient == nil
|
||||||
|
|
||||||
proc convertErrorKindToHttpStatus(statusCode: LightpushStatusCode): HttpCode =
|
proc convertErrorKindToHttpStatus(statusCode: LightpushStatusCode): HttpCode =
|
||||||
## Lightpush status codes are matching HTTP status codes by design
|
## Lightpush status codes are matching HTTP status codes by design
|
||||||
return HttpCode(statusCode.int32)
|
return toHttpCode(statusCode.int).get(Http500)
|
||||||
|
|
||||||
proc makeRestResponse(response: WakuLightPushResult): RestApiResponse =
|
proc makeRestResponse(response: WakuLightPushResult): RestApiResponse =
|
||||||
var httpStatus: HttpCode = Http200
|
var httpStatus: HttpCode = Http200
|
||||||
@ -72,10 +69,11 @@ proc installLightPushRequestHandler*(
|
|||||||
debug "post", ROUTE_LIGHTPUSH, contentBody
|
debug "post", ROUTE_LIGHTPUSH, contentBody
|
||||||
|
|
||||||
let req: PushRequest = decodeRequestBody[PushRequest](contentBody).valueOr:
|
let req: PushRequest = decodeRequestBody[PushRequest](contentBody).valueOr:
|
||||||
return RestApiResponse.badRequest("Invalid push request: " & $error)
|
return
|
||||||
|
makeRestResponse(lightpushResultBadRequest("Invalid push request! " & $error))
|
||||||
|
|
||||||
let msg = req.message.toWakuMessage().valueOr:
|
let msg = req.message.toWakuMessage().valueOr:
|
||||||
return RestApiResponse.badRequest("Invalid message: " & $error)
|
return makeRestResponse(lightpushResultBadRequest("Invalid message! " & $error))
|
||||||
|
|
||||||
var toPeer = none(RemotePeerInfo)
|
var toPeer = none(RemotePeerInfo)
|
||||||
if useSelfHostedLightPush(node):
|
if useSelfHostedLightPush(node):
|
||||||
@ -83,19 +81,23 @@ proc installLightPushRequestHandler*(
|
|||||||
else:
|
else:
|
||||||
let aPeer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
let aPeer = node.peerManager.selectPeer(WakuLightPushCodec).valueOr:
|
||||||
let handler = discHandler.valueOr:
|
let handler = discHandler.valueOr:
|
||||||
return NoPeerNoDiscoError
|
return makeRestResponse(lightpushResultServiceUnavailable(NoPeerNoDiscoError))
|
||||||
|
|
||||||
let peerOp = (await handler()).valueOr:
|
let peerOp = (await handler()).valueOr:
|
||||||
return RestApiResponse.internalServerError("No value in peerOp: " & $error)
|
return makeRestResponse(
|
||||||
|
lightpushResultInternalError("No value in peerOp: " & $error)
|
||||||
|
)
|
||||||
|
|
||||||
peerOp.valueOr:
|
peerOp.valueOr:
|
||||||
return NoPeerNoneFoundError
|
return
|
||||||
|
makeRestResponse(lightpushResultServiceUnavailable(NoPeerNoneFoundError))
|
||||||
toPeer = some(aPeer)
|
toPeer = some(aPeer)
|
||||||
|
|
||||||
let subFut = node.lightpushPublish(req.pubsubTopic, msg, toPeer)
|
let subFut = node.lightpushPublish(req.pubsubTopic, msg, toPeer)
|
||||||
|
|
||||||
if not await subFut.withTimeout(FutTimeoutForPushRequestProcessing):
|
if not await subFut.withTimeout(FutTimeoutForPushRequestProcessing):
|
||||||
error "Failed to request a message push due to timeout!"
|
error "Failed to request a message push due to timeout!"
|
||||||
return RestApiResponse.serviceUnavailable("Push request timed out")
|
return
|
||||||
|
makeRestResponse(lightpushResultServiceUnavailable("Push request timed out"))
|
||||||
|
|
||||||
return makeRestResponse(subFut.value())
|
return makeRestResponse(subFut.value())
|
||||||
|
|||||||
@ -42,6 +42,12 @@ func lightpushSuccessResult*(relayPeerCount: uint32): WakuLightPushResult =
|
|||||||
func lightpushResultInternalError*(msg: string): WakuLightPushResult =
|
func lightpushResultInternalError*(msg: string): WakuLightPushResult =
|
||||||
return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, some(msg)))
|
return err((LightpushStatusCode.INTERNAL_SERVER_ERROR, some(msg)))
|
||||||
|
|
||||||
|
func lightpushResultBadRequest*(msg: string): WakuLightPushResult =
|
||||||
|
return err((LightpushStatusCode.BAD_REQUEST, some(msg)))
|
||||||
|
|
||||||
|
func lightpushResultServiceUnavailable*(msg: string): WakuLightPushResult =
|
||||||
|
return err((LightpushStatusCode.SERVICE_NOT_AVAILABLE, some(msg)))
|
||||||
|
|
||||||
func lighpushErrorResult*(
|
func lighpushErrorResult*(
|
||||||
statusCode: LightpushStatusCode, desc: Option[string]
|
statusCode: LightpushStatusCode, desc: Option[string]
|
||||||
): WakuLightPushResult =
|
): WakuLightPushResult =
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user