mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-04 11:05:38 +00:00
feat(relay): ordered validator execution (#1966)
* feat(relay): ordered validator execution * fix: make more readable * test: ignore accepts only * fix: idempotent .subscribe * fix(rln-relay): make validators private Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> * fix: include comments, unsubscribe behaviour * fix: compilation --------- Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
parent
97a7c9d04d
commit
debc5f19d9
@ -52,18 +52,16 @@ proc withinTimeWindow*(msg: WakuMessage): bool =
|
||||
proc addSignedTopicValidator*(w: WakuRelay, topic: PubsubTopic, publicTopicKey: SkPublicKey) =
|
||||
debug "adding validator to signed topic", topic=topic, publicTopicKey=publicTopicKey
|
||||
|
||||
proc validator(topic: string, message: messages.Message): Future[errors.ValidationResult] {.async.} =
|
||||
let msg = WakuMessage.decode(message.data)
|
||||
proc validator(topic: string, msg: WakuMessage): Future[errors.ValidationResult] {.async.} =
|
||||
var outcome = errors.ValidationResult.Reject
|
||||
|
||||
if msg.isOk():
|
||||
if msg.get.timestamp != 0:
|
||||
if msg.get.withinTimeWindow():
|
||||
let msgHash = SkMessage(topic.msgHash(msg.get))
|
||||
let recoveredSignature = SkSignature.fromRaw(msg.get.meta)
|
||||
if recoveredSignature.isOk():
|
||||
if recoveredSignature.get.verify(msgHash, publicTopicKey):
|
||||
outcome = errors.ValidationResult.Accept
|
||||
if msg.timestamp != 0:
|
||||
if msg.withinTimeWindow():
|
||||
let msgHash = SkMessage(topic.msgHash(msg))
|
||||
let recoveredSignature = SkSignature.fromRaw(msg.meta)
|
||||
if recoveredSignature.isOk():
|
||||
if recoveredSignature.get.verify(msgHash, publicTopicKey):
|
||||
outcome = errors.ValidationResult.Accept
|
||||
|
||||
waku_msg_validator_signed_outcome.inc(labelValues = [$outcome])
|
||||
return outcome
|
||||
|
@ -201,13 +201,9 @@ suite "Waku Relay":
|
||||
await sleepAsync(500.millis)
|
||||
|
||||
# Validator
|
||||
proc validator(topic: PubsubTopic, msg: Message): Future[ValidationResult] {.async.} =
|
||||
let msg = WakuMessage.decode(msg.data)
|
||||
if msg.isErr():
|
||||
return ValidationResult.Ignore
|
||||
|
||||
proc validator(topic: PubsubTopic, msg: WakuMessage): Future[ValidationResult] {.async.} =
|
||||
# only relay messages with contentTopic1
|
||||
if msg.value.contentTopic != contentTopic:
|
||||
if msg.contentTopic != contentTopic:
|
||||
return ValidationResult.Reject
|
||||
|
||||
return ValidationResult.Accept
|
||||
|
@ -158,18 +158,13 @@ suite "WakuNode - Relay":
|
||||
var completionFutValidatorRej = newFuture[bool]()
|
||||
|
||||
# set a topic validator for pubSubTopic
|
||||
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
|
||||
proc validator(topic: string, msg: WakuMessage): Future[ValidationResult] {.async.} =
|
||||
## the validator that only allows messages with contentTopic1 to be relayed
|
||||
check:
|
||||
topic == pubSubTopic
|
||||
|
||||
let msg = WakuMessage.decode(message.data)
|
||||
if msg.isErr():
|
||||
completionFutValidatorAcc.complete(false)
|
||||
return ValidationResult.Reject
|
||||
|
||||
# only relay messages with contentTopic1
|
||||
if msg.value.contentTopic != contentTopic1:
|
||||
if msg.contentTopic != contentTopic1:
|
||||
completionFutValidatorRej.complete(true)
|
||||
return ValidationResult.Reject
|
||||
|
||||
|
@ -710,61 +710,6 @@ suite "Waku rln relay":
|
||||
msgValidate3 == MessageValidationResult.Valid
|
||||
msgValidate4 == MessageValidationResult.Invalid
|
||||
|
||||
asyncTest "should validate invalid proofs if bandwidth is available":
|
||||
let index = MembershipIndex(5)
|
||||
|
||||
let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(index),
|
||||
rlnRelayBandwidthThreshold: 4,
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_3"))
|
||||
let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf)
|
||||
require:
|
||||
wakuRlnRelayRes.isOk()
|
||||
let wakuRlnRelay = wakuRlnRelayRes.get()
|
||||
|
||||
# get the current epoch time
|
||||
let time = epochTime()
|
||||
|
||||
# create some messages from the same peer and append rln proof to them, except wm4
|
||||
var
|
||||
# this one will pass through the bandwidth threshold
|
||||
wm1 = WakuMessage(payload: "Spam".toBytes())
|
||||
# this message, will be over the bandwidth threshold, hence has to be verified
|
||||
wm2 = WakuMessage(payload: "Valid message".toBytes())
|
||||
# this message will be over the bandwidth threshold, hence has to be verified, will be false (since no proof)
|
||||
wm3 = WakuMessage(payload: "Invalid message".toBytes())
|
||||
wm4 = WakuMessage(payload: "Spam message".toBytes())
|
||||
|
||||
let
|
||||
proofAdded1 = wakuRlnRelay.appendRLNProof(wm1, time)
|
||||
proofAdded2 = wakuRlnRelay.appendRLNProof(wm2, time+EpochUnitSeconds)
|
||||
proofAdded3 = wakuRlnRelay.appendRLNProof(wm4, time)
|
||||
|
||||
# ensure proofs are added
|
||||
require:
|
||||
proofAdded1
|
||||
proofAdded2
|
||||
proofAdded3
|
||||
|
||||
# validate messages
|
||||
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
|
||||
let
|
||||
# this should be no verification, Valid
|
||||
msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time))
|
||||
# this should be verification, Valid
|
||||
msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time))
|
||||
# this should be verification, Invalid
|
||||
msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time))
|
||||
# this should be verification, Spam
|
||||
msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time))
|
||||
|
||||
check:
|
||||
msgValidate1 == MessageValidationResult.Valid
|
||||
msgValidate2 == MessageValidationResult.Valid
|
||||
msgValidate3 == MessageValidationResult.Invalid
|
||||
msgValidate4 == MessageValidationResult.Spam
|
||||
|
||||
|
||||
test "toIDCommitment and toUInt256":
|
||||
# create an instance of rln
|
||||
let rlnInstance = createRLNInstanceWrapper()
|
||||
|
@ -749,7 +749,7 @@ when defined(rln):
|
||||
# register rln validator for all subscribed relay pubsub topics
|
||||
for pubsubTopic in node.wakuRelay.subscribedTopics:
|
||||
debug "Registering RLN validator for topic", pubsubTopic=pubsubTopic
|
||||
procCall GossipSub(node.wakuRelay).addValidator(pubsubTopic, validator)
|
||||
node.wakuRelay.addValidator(pubsubTopic, validator)
|
||||
node.wakuRlnRelay = rlnRelay
|
||||
|
||||
## Waku peer-exchange
|
||||
|
@ -8,8 +8,8 @@ else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/sequtils,
|
||||
stew/results,
|
||||
sequtils,
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
@ -122,7 +122,14 @@ const GossipsubParameters = GossipSubParams(
|
||||
type
|
||||
WakuRelayResult*[T] = Result[T, string]
|
||||
WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
|
||||
WakuValidatorHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[ValidationResult] {.gcsafe, raises: [Defect].}
|
||||
WakuRelay* = ref object of GossipSub
|
||||
# a map of PubsubTopic's => seq[WakuValidatorHandler] that are
|
||||
# called in order every time a message is received on a given pubsub topic
|
||||
wakuValidators: Table[PubsubTopic, seq[WakuValidatorHandler]]
|
||||
# a map that stores whether the ordered validator has been inserted
|
||||
# for a given PubsubTopic
|
||||
validatorInserted: Table[PubsubTopic, bool]
|
||||
|
||||
proc initProtocolHandler(w: WakuRelay) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
@ -167,9 +174,11 @@ proc new*(T: type WakuRelay, switch: Switch): WakuRelayResult[T] =
|
||||
|
||||
return ok(w)
|
||||
|
||||
method addValidator*(w: WakuRelay, topic: varargs[string], handler: ValidatorHandler) {.gcsafe.} =
|
||||
procCall GossipSub(w).addValidator(topic, handler)
|
||||
|
||||
proc addValidator*(w: WakuRelay,
|
||||
topic: varargs[string],
|
||||
handler: WakuValidatorHandler) {.gcsafe.} =
|
||||
for t in topic:
|
||||
w.wakuValidators.mgetOrPut(t, @[]).add(handler)
|
||||
|
||||
method start*(w: WakuRelay) {.async.} =
|
||||
debug "start"
|
||||
@ -179,21 +188,32 @@ method stop*(w: WakuRelay) {.async.} =
|
||||
debug "stop"
|
||||
await procCall GossipSub(w).stop()
|
||||
|
||||
# rejects messages that are not WakuMessage
|
||||
proc validator(pubsubTopic: string, message: messages.Message): Future[ValidationResult] {.async.} =
|
||||
# can be optimized by checking if the message is a WakuMessage without allocating memory
|
||||
# see nim-libp2p protobuf library
|
||||
let msg = WakuMessage.decode(message.data)
|
||||
if msg.isOk():
|
||||
return ValidationResult.Accept
|
||||
return ValidationResult.Reject
|
||||
|
||||
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
|
||||
GossipSub(w).topics.hasKey(topic)
|
||||
|
||||
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
|
||||
return toSeq(GossipSub(w).topics.keys())
|
||||
|
||||
proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} =
|
||||
# rejects messages that are not WakuMessage
|
||||
let wrappedValidator = proc(pubsubTopic: string,
|
||||
message: messages.Message): Future[ValidationResult] {.async.} =
|
||||
# can be optimized by checking if the message is a WakuMessage without allocating memory
|
||||
# see nim-libp2p protobuf library
|
||||
let msgRes = WakuMessage.decode(message.data)
|
||||
if msgRes.isErr():
|
||||
return ValidationResult.Reject
|
||||
let msg = msgRes.get()
|
||||
|
||||
# now sequentially validate the message
|
||||
if w.wakuValidators.hasKey(pubsubTopic):
|
||||
for validator in w.wakuValidators[pubsubTopic]:
|
||||
let validatorRes = await validator(pubsubTopic, msg)
|
||||
if validatorRes != ValidationResult.Accept:
|
||||
return validatorRes
|
||||
return ValidationResult.Accept
|
||||
return wrappedValidator
|
||||
|
||||
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
|
||||
debug "subscribe", pubsubTopic=pubsubTopic
|
||||
|
||||
@ -209,8 +229,10 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
|
||||
else:
|
||||
return handler(pubsubTopic, decMsg.get())
|
||||
|
||||
# add the default validator to the topic
|
||||
procCall GossipSub(w).addValidator(pubSubTopic, validator)
|
||||
# add the ordered validator to the topic
|
||||
if not w.validatorInserted.hasKey(pubSubTopic):
|
||||
procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator())
|
||||
w.validatorInserted[pubSubTopic] = true
|
||||
|
||||
# set this topic parameters for scoring
|
||||
w.topicParams[pubsubTopic] = TopicParameters
|
||||
@ -222,6 +244,7 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
|
||||
debug "unsubscribe", pubsubTopic=pubsubTopic
|
||||
|
||||
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
|
||||
w.validatorInserted.del(pubsubTopic)
|
||||
|
||||
proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} =
|
||||
trace "publish", pubsubTopic=pubsubTopic
|
||||
|
@ -22,6 +22,7 @@ import
|
||||
./protocol_types,
|
||||
./protocol_metrics
|
||||
import
|
||||
../waku_relay, # for WakuRelayHandler
|
||||
../waku_core,
|
||||
../waku_keystore,
|
||||
../utils/collector
|
||||
@ -79,7 +80,6 @@ type WakuRLNRelay* = ref object of RootObj
|
||||
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
|
||||
lastEpoch*: Epoch # the epoch of the last published rln message
|
||||
groupManager*: GroupManager
|
||||
messageBucket*: Option[TokenBucket]
|
||||
|
||||
method stop*(rlnPeer: WakuRLNRelay) {.async.} =
|
||||
## stops the rln-relay protocol
|
||||
@ -301,57 +301,44 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay,
|
||||
return true
|
||||
|
||||
proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
|
||||
spamHandler: Option[SpamHandler] = none(SpamHandler)): pubsub.ValidatorHandler =
|
||||
spamHandler: Option[SpamHandler] = none(SpamHandler)): WakuValidatorHandler =
|
||||
## this procedure is a thin wrapper for the pubsub addValidator method
|
||||
## it sets a validator for waku messages, acting in the registered pubsub topic
|
||||
## the message validation logic is according to https://rfc.vac.dev/spec/17/
|
||||
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
|
||||
proc validator(topic: string, message: WakuMessage): Future[pubsub.ValidationResult] {.async.} =
|
||||
trace "rln-relay topic validator is called"
|
||||
|
||||
## Check if enough tokens can be consumed from the message bucket
|
||||
try:
|
||||
if wakuRlnRelay.messageBucket.isSome() and
|
||||
wakuRlnRelay.messageBucket.get().tryConsume(message.data.len):
|
||||
let decodeRes = RateLimitProof.init(message.proof)
|
||||
|
||||
if decodeRes.isErr():
|
||||
return pubsub.ValidationResult.Reject
|
||||
|
||||
let msgProof = decodeRes.get()
|
||||
|
||||
# validate the message and update log
|
||||
let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(message)
|
||||
|
||||
let
|
||||
proof = toHex(msgProof.proof)
|
||||
epoch = fromEpoch(msgProof.epoch)
|
||||
root = inHex(msgProof.merkleRoot)
|
||||
shareX = inHex(msgProof.shareX)
|
||||
shareY = inHex(msgProof.shareY)
|
||||
nullifier = inHex(msgProof.nullifier)
|
||||
payload = string.fromBytes(message.payload)
|
||||
case validationRes:
|
||||
of Valid:
|
||||
trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||
return pubsub.ValidationResult.Accept
|
||||
else:
|
||||
trace "message bandwidth limit exceeded, running rate limit proof validation"
|
||||
except OverflowDefect: # not a problem
|
||||
trace "not enough bandwidth, running rate limit proof validation"
|
||||
|
||||
let decodeRes = WakuMessage.decode(message.data)
|
||||
if decodeRes.isOk():
|
||||
let wakumessage = decodeRes.value
|
||||
let decodeRes = RateLimitProof.init(wakumessage.proof)
|
||||
|
||||
if decodeRes.isErr():
|
||||
of Invalid:
|
||||
trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||
return pubsub.ValidationResult.Reject
|
||||
of Spam:
|
||||
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||
if spamHandler.isSome():
|
||||
let handler = spamHandler.get()
|
||||
handler(message)
|
||||
return pubsub.ValidationResult.Reject
|
||||
|
||||
let msgProof = decodeRes.get()
|
||||
|
||||
# validate the message and update log
|
||||
let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(wakumessage)
|
||||
|
||||
let
|
||||
proof = toHex(msgProof.proof)
|
||||
epoch = fromEpoch(msgProof.epoch)
|
||||
root = inHex(msgProof.merkleRoot)
|
||||
shareX = inHex(msgProof.shareX)
|
||||
shareY = inHex(msgProof.shareY)
|
||||
nullifier = inHex(msgProof.nullifier)
|
||||
payload = string.fromBytes(wakumessage.payload)
|
||||
case validationRes:
|
||||
of Valid:
|
||||
trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||
return pubsub.ValidationResult.Accept
|
||||
of Invalid:
|
||||
trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||
return pubsub.ValidationResult.Reject
|
||||
of Spam:
|
||||
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
|
||||
if spamHandler.isSome():
|
||||
let handler = spamHandler.get()
|
||||
handler(wakumessage)
|
||||
return pubsub.ValidationResult.Reject
|
||||
return validator
|
||||
|
||||
proc mount(conf: WakuRlnConfig,
|
||||
@ -393,12 +380,7 @@ proc mount(conf: WakuRlnConfig,
|
||||
# Start the group sync
|
||||
await groupManager.startGroupSync()
|
||||
|
||||
let messageBucket = if conf.rlnRelayBandwidthThreshold > 0:
|
||||
some(TokenBucket.new(conf.rlnRelayBandwidthThreshold))
|
||||
else: none(TokenBucket)
|
||||
|
||||
return WakuRLNRelay(groupManager: groupManager,
|
||||
messageBucket: messageBucket)
|
||||
return WakuRLNRelay(groupManager: groupManager)
|
||||
|
||||
|
||||
proc new*(T: type WakuRlnRelay,
|
||||
|
Loading…
x
Reference in New Issue
Block a user