mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
fix: Do not allow invalid pubsub topic subscription via relay REST api (#3559)
* Check input pubsub topics for REST /relay/v1/subscriptions endpoint
This commit is contained in:
parent
8c73de095b
commit
d74d29e2f1
@ -44,6 +44,7 @@ suite "Waku v2 Rest API - Relay":
|
|||||||
assert false, "Failed to mount relay"
|
assert false, "Failed to mount relay"
|
||||||
|
|
||||||
var restPort = Port(0)
|
var restPort = Port(0)
|
||||||
|
|
||||||
let restAddress = parseIpAddress("0.0.0.0")
|
let restAddress = parseIpAddress("0.0.0.0")
|
||||||
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
||||||
|
|
||||||
@ -61,8 +62,23 @@ suite "Waku v2 Rest API - Relay":
|
|||||||
|
|
||||||
let shards = @[$shard0, $shard1, $shard2]
|
let shards = @[$shard0, $shard1, $shard2]
|
||||||
|
|
||||||
# When
|
let invalidTopic = "/test/2/this/is/a/content/topic/1"
|
||||||
|
|
||||||
|
var containsIncorrect = shards
|
||||||
|
containsIncorrect.add(invalidTopic)
|
||||||
|
|
||||||
|
# When contains incorrect pubsub topics, subscribe shall fail
|
||||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||||
|
let errorResponse = await client.relayPostSubscriptionsV1(containsIncorrect)
|
||||||
|
|
||||||
|
# Then
|
||||||
|
check:
|
||||||
|
errorResponse.status == 400
|
||||||
|
$errorResponse.contentType == $MIMETYPE_TEXT
|
||||||
|
errorResponse.data ==
|
||||||
|
"Invalid pubsub topic(s): @[\"/test/2/this/is/a/content/topic/1\"]"
|
||||||
|
|
||||||
|
# when all pubsub topics are correct, subscribe shall succeed
|
||||||
let response = await client.relayPostSubscriptionsV1(shards)
|
let response = await client.relayPostSubscriptionsV1(shards)
|
||||||
|
|
||||||
# Then
|
# Then
|
||||||
|
|||||||
@ -129,9 +129,10 @@ proc getShardsGetter(node: WakuNode): GetShards =
|
|||||||
# fetch pubsubTopics subscribed to relay and convert them to shards
|
# fetch pubsubTopics subscribed to relay and convert them to shards
|
||||||
if node.wakuRelay.isNil():
|
if node.wakuRelay.isNil():
|
||||||
return @[]
|
return @[]
|
||||||
let subTopics = node.wakuRelay.subscribedTopics()
|
let subscribedTopics = node.wakuRelay.subscribedTopics()
|
||||||
let relayShards = topicsToRelayShards(subTopics).valueOr:
|
let relayShards = topicsToRelayShards(subscribedTopics).valueOr:
|
||||||
error "could not convert relay topics to shards", error = $error
|
error "could not convert relay topics to shards",
|
||||||
|
error = $error, topics = subscribedTopics
|
||||||
return @[]
|
return @[]
|
||||||
if relayShards.isSome():
|
if relayShards.isSome():
|
||||||
let shards = relayShards.get().shardIds
|
let shards = relayShards.get().shardIds
|
||||||
|
|||||||
@ -41,6 +41,15 @@ const ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1* = "/relay/v1/auto/subscriptions"
|
|||||||
const ROUTE_RELAY_AUTO_MESSAGESV1* = "/relay/v1/auto/messages/{contentTopic}"
|
const ROUTE_RELAY_AUTO_MESSAGESV1* = "/relay/v1/auto/messages/{contentTopic}"
|
||||||
const ROUTE_RELAY_AUTO_MESSAGESV1_NO_TOPIC* = "/relay/v1/auto/messages"
|
const ROUTE_RELAY_AUTO_MESSAGESV1_NO_TOPIC* = "/relay/v1/auto/messages"
|
||||||
|
|
||||||
|
proc validatePubSubTopics(topics: seq[PubsubTopic]): Result[void, RestApiResponse] =
|
||||||
|
let badPubSubTopics = topics.filterIt(RelayShard.parseStaticSharding(it).isErr())
|
||||||
|
if badPubSubTopics.len > 0:
|
||||||
|
error "Invalid pubsub topic(s)", PubSubTopics = $badPubSubTopics
|
||||||
|
return
|
||||||
|
err(RestApiResponse.badRequest("Invalid pubsub topic(s): " & $badPubSubTopics))
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
proc installRelayApiHandlers*(
|
proc installRelayApiHandlers*(
|
||||||
router: var RestRouter, node: WakuNode, cache: MessageCache
|
router: var RestRouter, node: WakuNode, cache: MessageCache
|
||||||
) =
|
) =
|
||||||
@ -61,6 +70,9 @@ proc installRelayApiHandlers*(
|
|||||||
let req: seq[PubsubTopic] = decodeRequestBody[seq[PubsubTopic]](contentBody).valueOr:
|
let req: seq[PubsubTopic] = decodeRequestBody[seq[PubsubTopic]](contentBody).valueOr:
|
||||||
return error
|
return error
|
||||||
|
|
||||||
|
validatePubSubTopics(req).isOkOr:
|
||||||
|
return error
|
||||||
|
|
||||||
# Only subscribe to topics for which we have no subscribed topic handlers yet
|
# Only subscribe to topics for which we have no subscribed topic handlers yet
|
||||||
let newTopics = req.filterIt(not cache.isPubsubSubscribed(it))
|
let newTopics = req.filterIt(not cache.isPubsubSubscribed(it))
|
||||||
|
|
||||||
@ -87,6 +99,9 @@ proc installRelayApiHandlers*(
|
|||||||
let req: seq[PubsubTopic] = decodeRequestBody[seq[PubsubTopic]](contentBody).valueOr:
|
let req: seq[PubsubTopic] = decodeRequestBody[seq[PubsubTopic]](contentBody).valueOr:
|
||||||
return error
|
return error
|
||||||
|
|
||||||
|
validatePubSubTopics(req).isOkOr:
|
||||||
|
return error
|
||||||
|
|
||||||
# Unsubscribe all handlers from requested topics
|
# Unsubscribe all handlers from requested topics
|
||||||
for pubsubTopic in req:
|
for pubsubTopic in req:
|
||||||
cache.pubsubUnsubscribe(pubsubTopic)
|
cache.pubsubUnsubscribe(pubsubTopic)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user