mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore(api): validate rln message before sending (rest + rpc) (#1968)
This commit is contained in:
parent
40848871c5
commit
1b96999d55
@ -658,7 +658,7 @@ suite "Waku rln relay":
|
||||
# it is a duplicate
|
||||
result3.value == true
|
||||
|
||||
asyncTest "validateMessage test":
|
||||
asyncTest "validateMessageAndUpdateLog test":
|
||||
let index = MembershipIndex(5)
|
||||
|
||||
let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
|
||||
@ -695,13 +695,13 @@ suite "Waku rln relay":
|
||||
# validate messages
|
||||
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
|
||||
let
|
||||
msgValidate1 = wakuRlnRelay.validateMessage(wm1, some(time))
|
||||
msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time))
|
||||
# wm2 is published within the same Epoch as wm1 and should be found as spam
|
||||
msgValidate2 = wakuRlnRelay.validateMessage(wm2, some(time))
|
||||
msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time))
|
||||
# a valid message should be validated successfully
|
||||
msgValidate3 = wakuRlnRelay.validateMessage(wm3, some(time))
|
||||
msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time))
|
||||
# wm4 has no rln proof and should not be validated
|
||||
msgValidate4 = wakuRlnRelay.validateMessage(wm4, some(time))
|
||||
msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time))
|
||||
|
||||
|
||||
check:
|
||||
@ -750,13 +750,13 @@ suite "Waku rln relay":
|
||||
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
|
||||
let
|
||||
# this should be no verification, Valid
|
||||
msgValidate1 = wakuRlnRelay.validateMessage(wm1, some(time))
|
||||
msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time))
|
||||
# this should be verification, Valid
|
||||
msgValidate2 = wakuRlnRelay.validateMessage(wm2, some(time))
|
||||
msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time))
|
||||
# this should be verification, Invalid
|
||||
msgValidate3 = wakuRlnRelay.validateMessage(wm3, some(time))
|
||||
msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time))
|
||||
# this should be verification, Spam
|
||||
msgValidate4 = wakuRlnRelay.validateMessage(wm4, some(time))
|
||||
msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time))
|
||||
|
||||
check:
|
||||
msgValidate1 == MessageValidationResult.Valid
|
||||
@ -848,7 +848,7 @@ suite "Waku rln relay":
|
||||
|
||||
# getMembershipCredentials returns the credential in the keystore which matches
|
||||
# the query, in this case the query is =
|
||||
# chainId = "5" and
|
||||
# chainId = "5" and
|
||||
# address = "0x0123456789012345678901234567890123456789" and
|
||||
# treeIndex = 1
|
||||
let readKeystoreMembership = readKeystoreRes.get()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, sequtils],
|
||||
std/[options, sequtils, tempfiles],
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
@ -21,6 +21,10 @@ import
|
||||
../testlib/wakucore,
|
||||
../testlib/wakunode
|
||||
|
||||
when defined(rln):
|
||||
import
|
||||
../../../waku/waku_rln_relay
|
||||
|
||||
|
||||
proc newTestMessageCache(): relay_api.MessageCache =
|
||||
relay_api.MessageCache.init(capacity=30)
|
||||
@ -100,6 +104,15 @@ suite "Waku v2 JSON-RPC API - Relay":
|
||||
await srcNode.mountRelay(@[pubSubTopic])
|
||||
await dstNode.mountRelay(@[pubSubTopic])
|
||||
|
||||
when defined(rln):
|
||||
await srcNode.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: 1,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))
|
||||
|
||||
await dstNode.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: 2,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_2")))
|
||||
|
||||
await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
|
||||
@ -139,7 +152,12 @@ suite "Waku v2 JSON-RPC API - Relay":
|
||||
response == true
|
||||
await dstHandlerFut.withTimeout(chronos.seconds(5))
|
||||
|
||||
let (topic, msg) = dstHandlerFut.read()
|
||||
var (topic, msg) = dstHandlerFut.read()
|
||||
|
||||
# proof is injected under the hood, we compare just the message
|
||||
when defined(rln):
|
||||
msg.proof = @[]
|
||||
|
||||
check:
|
||||
topic == pubSubTopic
|
||||
msg == message
|
||||
@ -171,7 +189,7 @@ suite "Waku v2 JSON-RPC API - Relay":
|
||||
|
||||
# RPC server (destination node)
|
||||
let
|
||||
rpcPort = Port(8548)
|
||||
rpcPort = Port(8549)
|
||||
ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort)
|
||||
server = newRpcHttpServer([ta])
|
||||
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/sequtils,
|
||||
std/[sequtils,tempfiles],
|
||||
stew/byteutils,
|
||||
stew/shims/net,
|
||||
testutils/unittests,
|
||||
@ -22,6 +22,9 @@ import
|
||||
../testlib/wakucore,
|
||||
../testlib/wakunode
|
||||
|
||||
when defined(rln):
|
||||
import
|
||||
../../../waku/waku_rln_relay
|
||||
|
||||
proc testWakuNode(): WakuNode =
|
||||
let
|
||||
@ -183,6 +186,10 @@ suite "Waku v2 Rest API - Relay":
|
||||
let node = testWakuNode()
|
||||
await node.start()
|
||||
await node.mountRelay()
|
||||
when defined(rln):
|
||||
await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: 1,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))
|
||||
|
||||
# RPC server setup
|
||||
let restPort = Port(58014)
|
||||
|
||||
@ -22,7 +22,8 @@ from std/times import toUnix
|
||||
|
||||
when defined(rln):
|
||||
import
|
||||
../../../waku_rln_relay
|
||||
../../../waku_rln_relay,
|
||||
../../../waku_rln_relay/rln/wrappers
|
||||
|
||||
logScope:
|
||||
topics = "waku node jsonrpc relay_api"
|
||||
@ -77,9 +78,9 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
|
||||
|
||||
return true
|
||||
|
||||
server.rpc("post_waku_v2_relay_v1_message") do (topic: PubsubTopic, msg: WakuMessageRPC) -> bool:
|
||||
server.rpc("post_waku_v2_relay_v1_message") do (pubsubTopic: PubsubTopic, msg: WakuMessageRPC) -> bool:
|
||||
## Publishes a WakuMessage to a PubSub topic
|
||||
debug "post_waku_v2_relay_v1_message"
|
||||
debug "post_waku_v2_relay_v1_message", pubsubTopic=pubsubTopic
|
||||
|
||||
let payloadRes = base64.decode(msg.payload)
|
||||
if payloadRes.isErr():
|
||||
@ -93,18 +94,38 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
|
||||
timestamp: msg.timestamp.get(Timestamp(0)),
|
||||
ephemeral: msg.ephemeral.get(false)
|
||||
)
|
||||
|
||||
|
||||
# ensure the node is subscribed to the pubsubTopic. otherwise it risks publishing
|
||||
# to a topic with no connected peers
|
||||
if pubsubTopic notin node.wakuRelay.subscribedTopics():
|
||||
raise newException(ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic)
|
||||
|
||||
# if RLN is mounted, append the proof to the message
|
||||
when defined(rln):
|
||||
if not node.wakuRlnRelay.isNil():
|
||||
let success = node.wakuRlnRelay.appendRLNProof(message,
|
||||
# append the proof to the message
|
||||
let success = node.wakuRlnRelay.appendRLNProof(message,
|
||||
float64(getTime().toUnix()))
|
||||
if not success:
|
||||
raise newException(ValueError, "Failed to append RLN proof to message")
|
||||
|
||||
let publishFut = node.publish(topic, message)
|
||||
raise newException(ValueError, "Failed to publish: error appending RLN proof to message")
|
||||
# validate the message before sending it
|
||||
let result = node.wakuRlnRelay.validateMessage(message)
|
||||
if result == MessageValidationResult.Invalid:
|
||||
raise newException(ValueError, "Failed to publish: invalid RLN proof")
|
||||
elif result == MessageValidationResult.Spam:
|
||||
raise newException(ValueError, "Failed to publish: limit exceeded, try again later")
|
||||
elif result == MessageValidationResult.Valid:
|
||||
debug "RLN proof validated successfully", pubSubTopic=pubSubTopic
|
||||
else:
|
||||
raise newException(ValueError, "Failed to publish: unknown RLN proof validation result")
|
||||
else:
|
||||
raise newException(ValueError, "Failed to publish: RLN enabled but not mounted")
|
||||
|
||||
# if we reach here its either a non-RLN message or a RLN message with a valid proof
|
||||
debug "Publishing message", pubSubTopic=pubSubTopic, rln=defined(rln)
|
||||
let publishFut = node.publish(pubsubTopic, message)
|
||||
if not await publishFut.withTimeout(futTimeout):
|
||||
raise newException(ValueError, "Failed to publish to topic " & topic)
|
||||
raise newException(ValueError, "Failed to publish: timed out")
|
||||
|
||||
return true
|
||||
|
||||
|
||||
@ -13,11 +13,20 @@ import
|
||||
presto/common
|
||||
import
|
||||
../../waku_node,
|
||||
../../../waku_relay/protocol,
|
||||
../serdes,
|
||||
../responses,
|
||||
./types,
|
||||
./topic_cache
|
||||
|
||||
from std/times import getTime
|
||||
from std/times import toUnix
|
||||
|
||||
when defined(rln):
|
||||
import
|
||||
../../../waku_rln_relay,
|
||||
../../../waku_rln_relay/rln/wrappers
|
||||
|
||||
export types
|
||||
|
||||
|
||||
@ -127,6 +136,11 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode)
|
||||
return RestApiResponse.badRequest()
|
||||
let pubSubTopic = topic.get()
|
||||
|
||||
# ensure the node is subscribed to the topic. otherwise it risks publishing
|
||||
# to a topic with no connected peers
|
||||
if pubSubTopic notin node.wakuRelay.subscribedTopics():
|
||||
return RestApiResponse.badRequest("Failed to publish: Node not subscribed to topic: " & pubsubTopic)
|
||||
|
||||
# Check the request body
|
||||
if contentBody.isNone():
|
||||
return RestApiResponse.badRequest()
|
||||
@ -144,9 +158,35 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode)
|
||||
if resMessage.isErr():
|
||||
return RestApiResponse.badRequest()
|
||||
|
||||
var message = resMessage.get()
|
||||
|
||||
# if RLN is mounted, append the proof to the message
|
||||
when defined(rln):
|
||||
if not node.wakuRlnRelay.isNil():
|
||||
# append the proof to the message
|
||||
let success = node.wakuRlnRelay.appendRLNProof(message,
|
||||
float64(getTime().toUnix()))
|
||||
if not success:
|
||||
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")
|
||||
|
||||
# validate the message before sending it
|
||||
let result = node.wakuRlnRelay.validateMessage(message)
|
||||
if result == MessageValidationResult.Invalid:
|
||||
return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof")
|
||||
elif result == MessageValidationResult.Spam:
|
||||
return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later")
|
||||
elif result == MessageValidationResult.Valid:
|
||||
debug "RLN proof validated successfully", pubSubTopic=pubSubTopic
|
||||
else:
|
||||
return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result")
|
||||
else:
|
||||
return RestApiResponse.internalServerError("Failed to publish: RLN enabled but not mounted")
|
||||
|
||||
# if we reach here its either a non-RLN message or a RLN message with a valid proof
|
||||
debug "Publishing message", pubSubTopic=pubSubTopic, rln=defined(rln)
|
||||
if not (waitFor node.publish(pubSubTopic, resMessage.value).withTimeout(futTimeout)):
|
||||
error "Failed to publish message to topic", topic=pubSubTopic
|
||||
return RestApiResponse.internalServerError()
|
||||
error "Failed to publish message to topic", pubSubTopic=pubSubTopic
|
||||
return RestApiResponse.internalServerError("Failed to publish: timedout")
|
||||
|
||||
return RestApiResponse.ok()
|
||||
|
||||
|
||||
@ -191,9 +191,8 @@ proc validator(pubsubTopic: string, message: messages.Message): Future[Validatio
|
||||
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
|
||||
GossipSub(w).topics.hasKey(topic)
|
||||
|
||||
iterator subscribedTopics*(w: WakuRelay): lent PubsubTopic =
|
||||
for topic in GossipSub(w).topics.keys():
|
||||
yield topic
|
||||
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
|
||||
return toSeq(GossipSub(w).topics.keys())
|
||||
|
||||
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
|
||||
debug "subscribe", pubsubTopic=pubsubTopic
|
||||
@ -202,7 +201,7 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
|
||||
let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} =
|
||||
let decMsg = WakuMessage.decode(data)
|
||||
if decMsg.isErr():
|
||||
# fine if triggerSelf enabled, since validators are bypassed
|
||||
# fine if triggerSelf enabled, since validators are bypassed
|
||||
error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error
|
||||
let fut = newFuture[void]()
|
||||
fut.complete()
|
||||
|
||||
@ -181,7 +181,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay,
|
||||
# track message count for metrics
|
||||
waku_rln_messages_total.inc()
|
||||
|
||||
# checks if the `msg`'s epoch is far from the current epoch
|
||||
# checks if the `msg`'s epoch is far from the current epoch
|
||||
# it corresponds to the validation of rln external nullifier
|
||||
var epoch: Epoch
|
||||
if timeOption.isSome():
|
||||
@ -190,20 +190,19 @@ proc validateMessage*(rlnPeer: WakuRLNRelay,
|
||||
# get current rln epoch
|
||||
epoch = getCurrentEpoch()
|
||||
|
||||
debug "current epoch", currentEpoch = fromEpoch(epoch)
|
||||
let
|
||||
msgEpoch = proof.epoch
|
||||
# calculate the gaps
|
||||
gap = absDiff(epoch, msgEpoch)
|
||||
|
||||
debug "message epoch", msgEpoch = fromEpoch(msgEpoch)
|
||||
debug "epoch info", currentEpoch = fromEpoch(epoch), msgEpoch = fromEpoch(msgEpoch)
|
||||
|
||||
# validate the epoch
|
||||
if gap > MaxEpochGap:
|
||||
# message's epoch is too old or too ahead
|
||||
# accept messages whose epoch is within +-MaxEpochGap from the current epoch
|
||||
warn "invalid message: epoch gap exceeds a threshold", gap = gap,
|
||||
payload = string.fromBytes(msg.payload), msgEpoch = fromEpoch(proof.epoch)
|
||||
payloadLen = msg.payload.len, msgEpoch = fromEpoch(proof.epoch)
|
||||
waku_rln_invalid_messages_total.inc(labelValues=["invalid_epoch"])
|
||||
return MessageValidationResult.Invalid
|
||||
|
||||
@ -224,11 +223,11 @@ proc validateMessage*(rlnPeer: WakuRLNRelay,
|
||||
|
||||
if proofVerificationRes.isErr():
|
||||
waku_rln_errors_total.inc(labelValues=["proof_verification"])
|
||||
warn "invalid message: proof verification failed", payload = string.fromBytes(msg.payload)
|
||||
warn "invalid message: proof verification failed", payloadLen = msg.payload.len
|
||||
return MessageValidationResult.Invalid
|
||||
if not proofVerificationRes.value():
|
||||
# invalid proof
|
||||
debug "invalid message: invalid proof", payload = string.fromBytes(msg.payload)
|
||||
debug "invalid message: invalid proof", payloadLen = msg.payload.len
|
||||
waku_rln_invalid_messages_total.inc(labelValues=["invalid_proof"])
|
||||
return MessageValidationResult.Invalid
|
||||
|
||||
@ -241,19 +240,39 @@ proc validateMessage*(rlnPeer: WakuRLNRelay,
|
||||
if hasDup.isErr():
|
||||
waku_rln_errors_total.inc(labelValues=["duplicate_check"])
|
||||
elif hasDup.value == true:
|
||||
debug "invalid message: message is spam", payload = string.fromBytes(msg.payload)
|
||||
debug "invalid message: message is spam", payloadLen = msg.payload.len
|
||||
waku_rln_spam_messages_total.inc()
|
||||
return MessageValidationResult.Spam
|
||||
|
||||
# insert the message to the log
|
||||
# the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e.,
|
||||
# it will never error out
|
||||
discard rlnPeer.updateLog(proofMetadataRes.get())
|
||||
debug "message is valid", payload = string.fromBytes(msg.payload)
|
||||
debug "message is valid", payloadLen = msg.payload.len
|
||||
let rootIndex = rlnPeer.groupManager.indexOfRoot(proof.merkleRoot)
|
||||
waku_rln_valid_messages_total.observe(rootIndex.toFloat())
|
||||
return MessageValidationResult.Valid
|
||||
|
||||
proc validateMessageAndUpdateLog*(
|
||||
rlnPeer: WakuRLNRelay,
|
||||
msg: WakuMessage,
|
||||
timeOption = none(float64)): MessageValidationResult =
|
||||
## validates the message and updates the log to prevent double messaging
|
||||
## in future messages
|
||||
|
||||
let result = rlnPeer.validateMessage(msg, timeOption)
|
||||
|
||||
let decodeRes = RateLimitProof.init(msg.proof)
|
||||
if decodeRes.isErr():
|
||||
return MessageValidationResult.Invalid
|
||||
|
||||
let msgProof = decodeRes.get()
|
||||
let proofMetadataRes = msgProof.extractMetadata()
|
||||
|
||||
if proofMetadataRes.isErr():
|
||||
return MessageValidationResult.Invalid
|
||||
|
||||
# insert the message to the log (never errors)
|
||||
discard rlnPeer.updateLog(proofMetadataRes.get())
|
||||
|
||||
return result
|
||||
|
||||
proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] =
|
||||
## it is a utility proc that prepares the `data` parameter of the proof generation procedure i.e., `proofGen` that resides in the current module
|
||||
## it extracts the `contentTopic` and the `payload` of the supplied `wakumessage` and serializes them into a byte sequence
|
||||
@ -309,9 +328,10 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
|
||||
|
||||
let msgProof = decodeRes.get()
|
||||
|
||||
# validate the message
|
||||
# validate the message and update log
|
||||
let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(wakumessage)
|
||||
|
||||
let
|
||||
validationRes = wakuRlnRelay.validateMessage(wakumessage)
|
||||
proof = toHex(msgProof.proof)
|
||||
epoch = fromEpoch(msgProof.epoch)
|
||||
root = inHex(msgProof.merkleRoot)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user