chore: improve POST /relay/v1/auto/messages/{topic} error handling (#2339)

This commit is contained in:
gabrielmer 2024-01-18 13:49:13 +01:00 committed by GitHub
parent 8a9fad2905
commit f841454edc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 163 additions and 65 deletions

View File

@ -208,11 +208,16 @@ proc publish(c: Chat, line: string) =
# update the last epoch
c.node.wakuRlnRelay.lastEpoch = proof.epoch
if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else:
asyncSpawn c.node.publish(some(DefaultPubsubTopic), message)
try:
if not c.node.wakuLightPush.isNil():
# Attempt lightpush
(waitFor c.node.lightpushPublish(some(DefaultPubsubTopic), message)).isOkOr:
error "failed to publish lightpush message", error = error
else:
(waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
error "failed to publish message", error = error
except CatchableError:
error "caught error publishing message: ", error = getCurrentExceptionMsg()
# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =

View File

@ -96,7 +96,8 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)
(await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)).isOkOr:
error "failed to publish message", error = error
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):

View File

@ -110,8 +110,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(some(pubSubTopic), message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
let res = await node.publish(some(pubSubTopic), message)
if res.isOk:
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
else:
error "failed to publish message", error = res.error
await sleepAsync(5000)
when isMainModule:

View File

@ -70,7 +70,9 @@ suite "WakuNode":
node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error
await sleepAsync(2000.millis)
check:

View File

@ -55,7 +55,8 @@ suite "WakuNode - Lightpush":
await sleepAsync(100.millis)
## When
await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
assert res.isOk(), $res.error
## Then
check await completionFutRelay.withTimeout(5.seconds)

View File

@ -95,7 +95,8 @@ suite "WakuNode - Relay":
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error
## Then
check:
@ -176,11 +177,15 @@ suite "WakuNode - Relay":
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node1.publish(some(pubSubTopic), message1)
var res = await node1.publish(some(pubSubTopic), message1)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
# message2 never gets relayed because of the validator
await node1.publish(some(pubSubTopic), message2)
res = await node1.publish(some(pubSubTopic), message2)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
check:
@ -257,7 +262,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
@ -298,7 +305,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
@ -343,7 +352,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
check:
@ -383,7 +394,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
check:
@ -423,7 +436,9 @@ suite "WakuNode - Relay":
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(500.millis)
await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some(pubSubTopic), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)

View File

@ -91,7 +91,7 @@ procSuite "WakuNode - RLN relay":
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
## verifies the rate limit proof of the message and relays the message to node3
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(some(DefaultPubsubTopic), message)
discard await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)
@ -165,8 +165,8 @@ procSuite "WakuNode - RLN relay":
# publish 3 messages from node[0] (last 2 are spam, window is 10 secs)
# publish 3 messages from node[1] (last 2 are spam, window is 10 secs)
for msg in messages1: await nodes[0].publish(some(pubsubTopics[0]), msg)
for msg in messages2: await nodes[1].publish(some(pubsubTopics[1]), msg)
for msg in messages1: discard await nodes[0].publish(some(pubsubTopics[0]), msg)
for msg in messages2: discard await nodes[1].publish(some(pubsubTopics[1]), msg)
# wait for gossip to propagate
await sleepAsync(5000.millis)
@ -266,7 +266,7 @@ procSuite "WakuNode - RLN relay":
## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3
## never gets called
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
await node1.publish(some(DefaultPubsubTopic), message)
discard await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)
check:
@ -378,10 +378,10 @@ procSuite "WakuNode - RLN relay":
## node2 should detect either of wm1 or wm2 as spam and not relay it
## node2 should relay wm3 to node3
## node2 should not relay wm4 because it has no valid rln proof
await node1.publish(some(DefaultPubsubTopic), wm1)
await node1.publish(some(DefaultPubsubTopic), wm2)
await node1.publish(some(DefaultPubsubTopic), wm3)
await node1.publish(some(DefaultPubsubTopic), wm4)
discard await node1.publish(some(DefaultPubsubTopic), wm1)
discard await node1.publish(some(DefaultPubsubTopic), wm2)
discard await node1.publish(some(DefaultPubsubTopic), wm3)
discard await node1.publish(some(DefaultPubsubTopic), wm4)
await sleepAsync(2000.millis)
let
@ -474,9 +474,9 @@ procSuite "WakuNode - RLN relay":
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
await node1.publish(some(DefaultPubsubTopic), wm1)
await node1.publish(some(DefaultPubsubTopic), wm2)
await node1.publish(some(DefaultPubsubTopic), wm3)
discard await node1.publish(some(DefaultPubsubTopic), wm1)
discard await node1.publish(some(DefaultPubsubTopic), wm2)
discard await node1.publish(some(DefaultPubsubTopic), wm3)
let
res1 = await completionFut1.withTimeout(10.seconds)

View File

@ -74,7 +74,7 @@ suite "WakuNode2 - Validators":
# Include signature
msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[i].publish(some(spamProtectedTopic), msg)
discard await nodes[i].publish(some(spamProtectedTopic), msg)
# Wait for gossip
await sleepAsync(2.seconds)
@ -146,7 +146,7 @@ suite "WakuNode2 - Validators":
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[i].publish(some(spamProtectedTopic), msg)
discard await nodes[i].publish(some(spamProtectedTopic), msg)
# Each node sends 5 messages that are not signed (total = 25)
for i in 0..<5:
@ -154,7 +154,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages that dont contain timestamp (total = 25)
for i in 0..<5:
@ -162,7 +162,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: 0, ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages way BEFORE than the current timestmap (total = 25)
for i in 0..<5:
@ -171,7 +171,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: beforeTimestamp, ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Each node sends 5 messages way LATER than the current timestmap (total = 25)
for i in 0..<5:
@ -180,7 +180,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: afterTimestamp, ephemeral: true)
await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[i].publish(some(spamProtectedTopic), unsignedMessage)
# Wait for gossip
await sleepAsync(4.seconds)
@ -255,7 +255,7 @@ suite "WakuNode2 - Validators":
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[0].publish(some(spamProtectedTopic), unsignedMessage)
discard await nodes[0].publish(some(spamProtectedTopic), unsignedMessage)
# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50:
@ -264,7 +264,7 @@ suite "WakuNode2 - Validators":
version: 2, timestamp: now(), ephemeral: true)
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[0].publish(some(spamProtectedTopic), msg)
discard await nodes[0].publish(some(spamProtectedTopic), msg)
# Wait for gossip
await sleepAsync(2.seconds)

View File

@ -198,7 +198,7 @@ suite "Waku v2 JSON-RPC API - Relay":
## When
for msg in messages:
await srcNode.publish(some(pubSubTopic), msg)
discard await srcNode.publish(some(pubSubTopic), msg)
await sleepAsync(200.millis)
@ -261,7 +261,7 @@ suite "Waku v2 JSON-RPC API - Relay":
## When
for msg in messages:
await srcNode.publish(none(PubsubTopic), msg)
discard await srcNode.publish(none(PubsubTopic), msg)
await sleepAsync(200.millis)

View File

@ -40,10 +40,12 @@ suite "Waku v2 Rest API - Relay":
await node.start()
await node.mountRelay()
let restPort = Port(58011)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
@ -88,10 +90,12 @@ suite "Waku v2 Rest API - Relay":
"pubsub-topic-x",
])
let restPort = Port(58012)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let cache = MessageCache.init()
cache.pubsubSubscribe("pubsub-topic-1")
cache.pubsubSubscribe("pubsub-topic-2")
@ -140,10 +144,12 @@ suite "Waku v2 Rest API - Relay":
await node.start()
await node.mountRelay()
let restPort = Port(58013)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let pubSubTopic = "/waku/2/default-waku/proto"
var messages = @[
@ -203,10 +209,12 @@ suite "Waku v2 Rest API - Relay":
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))
# RPC server setup
let restPort = Port(58014)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
@ -243,10 +251,12 @@ suite "Waku v2 Rest API - Relay":
await node.start()
await node.mountRelay()
let restPort = Port(58011)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
@ -289,10 +299,12 @@ suite "Waku v2 Rest API - Relay":
await node.start()
await node.mountRelay()
let restPort = Port(58012)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let contentTopics = @[
ContentTopic("/waku/2/default-content1/proto"),
ContentTopic("/waku/2/default-content2/proto"),
@ -335,10 +347,12 @@ suite "Waku v2 Rest API - Relay":
await node.start()
await node.mountRelay()
let restPort = Port(58013)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let contentTopic = DefaultContentTopic
var messages = @[
@ -397,10 +411,12 @@ suite "Waku v2 Rest API - Relay":
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))
# RPC server setup
let restPort = Port(58014)
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
@ -424,6 +440,46 @@ suite "Waku v2 Rest API - Relay":
$response.contentType == $MIMETYPE_TEXT
response.data == "OK"
await restServer.stop()
await restServer.closeWait()
await node.stop()
asyncTest "Post a message to an invalid content topic - POST /relay/v1/auto/messages/{topic}":
## "Relay API: publish and subscribe/unsubscribe":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))
# RPC server setup
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()
restPort = restServer.server.address.port # update with bound port for client use
let cache = MessageCache.init()
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
# When
let response = await client.relayPostAutoMessagesV1(RelayWakuMessage(
payload: base64.encode("TEST-PAYLOAD"),
contentTopic: some("invalidContentTopic"),
timestamp: some(int64(2022))
))
# Then
check:
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data == "Failed to publish. Autosharding error: invalid format: topic must start with slash"
await restServer.stop()
await restServer.closeWait()
await node.stop()

View File

@ -316,20 +316,22 @@ proc publish*(
node: WakuNode,
pubsubTopicOp: Option[PubsubTopic],
message: WakuMessage
) {.async, gcsafe.} =
) : Future[Result[void, string]] {.async, gcsafe.} =
## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard.
## `WakuMessage` should contain a `contentTopic` field for light node functionality.
## It is also used to determine the shard.
if node.wakuRelay.isNil():
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
let msg = "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
error "publish error", msg=msg
# TODO: Improve error handling
return
return err(msg)
let pubsubTopic = pubsubTopicOp.valueOr:
getShard(message.contentTopic).valueOr:
error "Autosharding error", error=error
return
let msg = "Autosharding error: " & error
error "publish error", msg=msg
return err(msg)
#TODO instead of discard return error when 0 peers received the message
discard await node.wakuRelay.publish(pubsubTopic, message)
@ -339,6 +341,8 @@ proc publish*(
pubsubTopic=pubsubTopic,
hash=pubsubTopic.computeMessageHash(message).to0xHex(),
publishTime=getNowInNanosecondTime()
return ok()
proc startRelay*(node: WakuNode) {.async.} =
## Setup and start relay protocol
@ -942,22 +946,25 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message
return await node.wakuLightpushClient.publish($pubsub, message, peer)
# TODO: Move to application module (e.g., wakunode2.nim)
proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage): Future[void] {.async, gcsafe,
proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message: WakuMessage): Future[WakuLightPushResult[void]] {.async, gcsafe,
deprecated: "Use 'node.lightpushPublish()' instead".} =
if node.wakuLightpushClient.isNil():
error "failed to publish message", error="waku lightpush client is nil"
return
let msg = "waku lightpush client is nil"
error "failed to publish message", msg=msg
return err(msg)
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
error "failed to publish message", error="no suitable remote peers"
return
let msg = "no suitable remote peers"
error "failed to publish message", msg=msg
return err(msg)
let publishRes = await node.lightpushPublish(pubsubTopic, message, peer=peerOpt.get())
if publishRes.isOk():
return
error "failed to publish message", error=publishRes.error
if publishRes.isErr():
error "failed to publish message", error=publishRes.error
return publishRes
## Waku RLN Relay

View File

@ -236,9 +236,14 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", contentTopic=message.contentTopic, rln=not node.wakuRlnRelay.isNil()
if not (waitFor node.publish(none(PubSubTopic), message).withTimeout(futTimeout)):
error "Failed to publish message to topic", contentTopic=message.contentTopic
return RestApiResponse.internalServerError("Failed to publish: timedout")
var publishFut = node.publish(none(PubSubTopic), message)
if not await publishFut.withTimeout(futTimeout):
return RestApiResponse.internalServerError("Failed to publish: timedout")
var res = publishFut.read()
if res.isErr():
return RestApiResponse.badRequest("Failed to publish. " & res.error)
return RestApiResponse.ok()