refactor(rln): decouple rln types from waku message type

This commit is contained in:
Lorenzo Delgado 2022-11-22 18:29:43 +01:00 committed by GitHub
parent 5ccbbf1316
commit 1237698484
5 changed files with 125 additions and 79 deletions

View File

@ -232,14 +232,19 @@ proc publish(c: Chat, line: string) =
debug "could not append rate limit proof to the message", success=success debug "could not append rate limit proof to the message", success=success
else: else:
debug "rate limit proof is appended to the message", success=success debug "rate limit proof is appended to the message", success=success
# TODO move it to log after doogfooding let decodeRes = RateLimitProof.init(message.proof)
let msgEpoch = fromEpoch(message.proof.epoch) if decodeRes.isErr():
if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == fromEpoch(message.proof.epoch): error "could not decode RLN proof"
let proof = decodeRes.get()
# TODO move it to log after dogfooding
let msgEpoch = fromEpoch(proof.epoch)
if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == fromEpoch(proof.epoch):
echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!" echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!"
else: else:
echo "--rln epoch: ", msgEpoch echo "--rln epoch: ", msgEpoch
# update the last epoch # update the last epoch
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch c.node.wakuRlnRelay.lastEpoch = proof.epoch
if not c.node.wakuLightPush.isNil(): if not c.node.wakuLightPush.isNil():
# Attempt lightpush # Attempt lightpush
asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message) asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message)
@ -252,7 +257,7 @@ proc publish(c: Chat, line: string) =
var message = WakuMessage(payload: chat2pb.buffer, var message = WakuMessage(payload: chat2pb.buffer,
contentTopic: c.contentTopic, version: 0, timestamp: getNanosecondTime(time)) contentTopic: c.contentTopic, version: 0, timestamp: getNanosecondTime(time))
when defined(rln): when defined(rln):
if not isNil(c.node.wakuRlnRelay): if not isNil(c.node.wakuRlnRelay):
# for future version when we support more than one rln protected content topic, # for future version when we support more than one rln protected content topic,
# we should check the message content topic as well # we should check the message content topic as well
let success = c.node.wakuRlnRelay.appendRLNProof(message, float64(time)) let success = c.node.wakuRlnRelay.appendRLNProof(message, float64(time))
@ -260,14 +265,19 @@ proc publish(c: Chat, line: string) =
debug "could not append rate limit proof to the message", success=success debug "could not append rate limit proof to the message", success=success
else: else:
debug "rate limit proof is appended to the message", success=success debug "rate limit proof is appended to the message", success=success
# TODO move it to log after doogfooding let decodeRes = RateLimitProof.init(message.proof)
let msgEpoch = fromEpoch(message.proof.epoch) if decodeRes.isErr():
error "could not decode the RLN proof"
let proof = decodeRes.get()
# TODO move it to log after dogfooding
let msgEpoch = fromEpoch(proof.epoch)
if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == msgEpoch: if fromEpoch(c.node.wakuRlnRelay.lastEpoch) == msgEpoch:
echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!" echo "--rln epoch: ", msgEpoch, " ⚠️ message rate violation! you are spamming the network!"
else: else:
echo "--rln epoch: ", msgEpoch echo "--rln epoch: ", msgEpoch
# update the last epoch # update the last epoch
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch c.node.wakuRlnRelay.lastEpoch = proof.epoch
if not c.node.wakuLightPush.isNil(): if not c.node.wakuLightPush.isNil():
# Attempt lightpush # Attempt lightpush

View File

@ -854,14 +854,22 @@ suite "Waku rln relay":
for index, x in shareX3.mpairs: shareX3[index] = 3 for index, x in shareX3.mpairs: shareX3[index] = 3
let shareY3 = shareX3 let shareY3 = shareX3
## TODO: when zerokit rln is integrated, RateLimitProof should be initialized passing a rlnIdentifier too (now implicitely set to 0) proc encodeAndGetBuf(proof: RateLimitProof): seq[byte] =
return proof.encode().buffer
let let
wm1 = WakuMessage(proof: RateLimitProof(epoch: epoch, wm1 = WakuMessage(proof: RateLimitProof(epoch: epoch,
nullifier: nullifier1, shareX: shareX1, shareY: shareY1)) nullifier: nullifier1,
shareX: shareX1,
shareY: shareY1).encodeAndGetBuf())
wm2 = WakuMessage(proof: RateLimitProof(epoch: epoch, wm2 = WakuMessage(proof: RateLimitProof(epoch: epoch,
nullifier: nullifier2, shareX: shareX2, shareY: shareY2)) nullifier: nullifier2,
shareX: shareX2,
shareY: shareY2).encodeAndGetBuf())
wm3 = WakuMessage(proof: RateLimitProof(epoch: epoch, wm3 = WakuMessage(proof: RateLimitProof(epoch: epoch,
nullifier: nullifier3, shareX: shareX3, shareY: shareY3)) nullifier: nullifier3,
shareX: shareX3,
shareY: shareY3).encodeAndGetBuf())
# check whether hasDuplicate correctly finds records with the same nullifiers but different secret shares # check whether hasDuplicate correctly finds records with the same nullifiers but different secret shares
# no duplicate for wm1 should be found, since the log is empty # no duplicate for wm1 should be found, since the log is empty

View File

@ -235,8 +235,9 @@ procSuite "WakuNode - RLN relay":
memKeys = node1.wakuRlnRelay.membershipKeyPair, memKeys = node1.wakuRlnRelay.membershipKeyPair,
memIndex = MembershipIndex(1), memIndex = MembershipIndex(1),
epoch = epoch) epoch = epoch)
doAssert(rateLimitProofRes.isOk()) require:
let rateLimitProof = rateLimitProofRes.value rateLimitProofRes.isOk()
let rateLimitProof = rateLimitProofRes.get().encode().buffer
let message = WakuMessage(payload: @payload, let message = WakuMessage(payload: @payload,
contentTopic: contentTopic, contentTopic: contentTopic,

View File

@ -15,13 +15,9 @@ import
../../common/protobuf, ../../common/protobuf,
../utils/time ../utils/time
when defined(rln):
import
./waku_rln_relay/protocol_types
const MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default const MaxWakuMessageSize* = 1024 * 1024 # In bytes. Corresponds to PubSub default
type type
PubsubTopic* = string PubsubTopic* = string
ContentTopic* = string ContentTopic* = string
@ -41,7 +37,7 @@ type WakuMessage* = object
# this field will be used in the rln-relay protocol # this field will be used in the rln-relay protocol
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
when defined(rln): when defined(rln):
proof*: RateLimitProof proof*: seq[byte]
# The ephemeral field indicates if the message should # The ephemeral field indicates if the message should
# be stored. bools and uints are # be stored. bools and uints are
# equivalent in serialization of the protobuf # equivalent in serialization of the protobuf
@ -58,7 +54,7 @@ proc encode*(message: WakuMessage): ProtoBuffer =
buf.write3(3, message.version) buf.write3(3, message.version)
buf.write3(10, zint64(message.timestamp)) buf.write3(10, zint64(message.timestamp))
when defined(rln): when defined(rln):
buf.write3(21, message.proof.encode()) buf.write3(21, message.proof)
buf.write3(31, uint64(message.ephemeral)) buf.write3(31, uint64(message.ephemeral))
buf.finish3() buf.finish3()
@ -76,11 +72,11 @@ proc decode*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
discard ?pb.getField(10, timestamp) discard ?pb.getField(10, timestamp)
msg.timestamp = Timestamp(timestamp) msg.timestamp = Timestamp(timestamp)
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec # Experimental: this is part of https://rfc.vac.dev/spec/17/ spec
when defined(rln): when defined(rln):
var proofBytes: seq[byte] var proofBytes: seq[byte]
discard ?pb.getField(21, proofBytes) if ?pb.getField(21, proofBytes):
msg.proof = ?RateLimitProof.init(proofBytes) msg.proof = proofBytes
var ephemeral: uint var ephemeral: uint
if ?pb.getField(31, ephemeral): if ?pb.getField(31, ephemeral):

View File

@ -583,20 +583,29 @@ proc hasDuplicate*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool
## otherwise, returns false ## otherwise, returns false
## Returns an error if it cannot check for duplicates ## Returns an error if it cannot check for duplicates
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return err("failed to decode the RLN proof")
let proof = decodeRes.get()
# extract the proof metadata of the supplied `msg` # extract the proof metadata of the supplied `msg`
let proofMD = ProofMetadata(nullifier: msg.proof.nullifier, let proofMD = ProofMetadata(
shareX: msg.proof.shareX, shareY: msg.proof.shareY) nullifier: proof.nullifier,
shareX: proof.shareX,
shareY: proof.shareY
)
# check if the epoch exists # check if the epoch exists
if not rlnPeer.nullifierLog.hasKey(msg.proof.epoch): if not rlnPeer.nullifierLog.hasKey(proof.epoch):
return ok(false) return ok(false)
try: try:
if rlnPeer.nullifierLog[msg.proof.epoch].contains(proofMD): if rlnPeer.nullifierLog[proof.epoch].contains(proofMD):
# there is an identical record, ignore rhe mag # there is an identical record, ignore rhe mag
return ok(false) return ok(false)
# check for a message with the same nullifier but different secret shares # check for a message with the same nullifier but different secret shares
let matched = rlnPeer.nullifierLog[msg.proof.epoch].filterIt(( let matched = rlnPeer.nullifierLog[proof.epoch].filterIt((
it.nullifier == proofMD.nullifier) and ((it.shareX != proofMD.shareX) or it.nullifier == proofMD.nullifier) and ((it.shareX != proofMD.shareX) or
(it.shareY != proofMD.shareY))) (it.shareY != proofMD.shareY)))
@ -615,21 +624,31 @@ proc updateLog*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] =
## saves it in the `nullifierLog` of the `rlnPeer` ## saves it in the `nullifierLog` of the `rlnPeer`
## Returns an error if it cannot update the log ## Returns an error if it cannot update the log
let proofMD = ProofMetadata(nullifier: msg.proof.nullifier, let decodeRes = RateLimitProof.init(msg.proof)
shareX: msg.proof.shareX, shareY: msg.proof.shareY) if decodeRes.isErr():
return err("failed to decode the RLN proof")
let proof = decodeRes.get()
# extract the proof metadata of the supplied `msg`
let proofMD = ProofMetadata(
nullifier: proof.nullifier,
shareX: proof.shareX,
shareY: proof.shareY
)
debug "proof metadata", proofMD = proofMD debug "proof metadata", proofMD = proofMD
# check if the epoch exists # check if the epoch exists
if not rlnPeer.nullifierLog.hasKey(msg.proof.epoch): if not rlnPeer.nullifierLog.hasKey(proof.epoch):
rlnPeer.nullifierLog[msg.proof.epoch] = @[proofMD] rlnPeer.nullifierLog[proof.epoch] = @[proofMD]
return ok(true) return ok(true)
try: try:
# check if an identical record exists # check if an identical record exists
if rlnPeer.nullifierLog[msg.proof.epoch].contains(proofMD): if rlnPeer.nullifierLog[proof.epoch].contains(proofMD):
return ok(true) return ok(true)
# add proofMD to the log # add proofMD to the log
rlnPeer.nullifierLog[msg.proof.epoch].add(proofMD) rlnPeer.nullifierLog[proof.epoch].add(proofMD)
return ok(true) return ok(true)
except KeyError as e: except KeyError as e:
return err("the epoch was not found") return err("the epoch was not found")
@ -680,6 +699,11 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
## the `msg` does not violate the rate limit ## the `msg` does not violate the rate limit
## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) ## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds)
## if `timeOption` is supplied, then the current epoch is calculated based on that ## if `timeOption` is supplied, then the current epoch is calculated based on that
let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr():
return MessageValidationResult.Invalid
let proof = decodeRes.get()
# track message count for metrics # track message count for metrics
waku_rln_messages_total.inc() waku_rln_messages_total.inc()
@ -695,7 +719,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
debug "current epoch", currentEpoch = fromEpoch(epoch) debug "current epoch", currentEpoch = fromEpoch(epoch)
let let
msgEpoch = msg.proof.epoch msgEpoch = proof.epoch
# calculate the gaps # calculate the gaps
gap = absDiff(epoch, msgEpoch) gap = absDiff(epoch, msgEpoch)
@ -711,8 +735,8 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
return MessageValidationResult.Invalid return MessageValidationResult.Invalid
## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247 ## TODO: FIXME after resolving this issue https://github.com/status-im/nwaku/issues/1247
if not rlnPeer.validateRoot(msg.proof.merkleRoot): if not rlnPeer.validateRoot(proof.merkleRoot):
debug "invalid message: provided root does not belong to acceptable window of roots", provided=msg.proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex()) debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex())
waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"]) waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"])
# return MessageValidationResult.Invalid # return MessageValidationResult.Invalid
@ -723,7 +747,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
waku_rln_proof_verification_total.inc() waku_rln_proof_verification_total.inc()
waku_rln_proof_verification_duration_seconds.nanosecondTime: waku_rln_proof_verification_duration_seconds.nanosecondTime:
let proofVerificationRes = rlnPeer.rlnInstance.proofVerify(input, msg.proof) let proofVerificationRes = rlnPeer.rlnInstance.proofVerify(input, proof)
if proofVerificationRes.isErr(): if proofVerificationRes.isErr():
waku_rln_errors_total.inc(labelValues=["proof_verification"]) waku_rln_errors_total.inc(labelValues=["proof_verification"])
@ -749,7 +773,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage,
# it will never error out # it will never error out
discard rlnPeer.updateLog(msg) discard rlnPeer.updateLog(msg)
debug "message is valid", payload = string.fromBytes(msg.payload) debug "message is valid", payload = string.fromBytes(msg.payload)
let rootIndex = rlnPeer.validMerkleRoots.find(msg.proof.merkleRoot) let rootIndex = rlnPeer.validMerkleRoots.find(proof.merkleRoot)
waku_rln_valid_messages_total.observe(rootIndex.toFloat()) waku_rln_valid_messages_total.observe(rootIndex.toFloat())
return MessageValidationResult.Valid return MessageValidationResult.Valid
@ -775,10 +799,10 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage,
memIndex = rlnPeer.membershipIndex, memIndex = rlnPeer.membershipIndex,
epoch = calcEpoch(senderEpochTime)) epoch = calcEpoch(senderEpochTime))
if proof.isErr: if proof.isErr():
return false return false
msg.proof = proof.value msg.proof = proof.value.encode().buffer
return true return true
proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResult[void] = proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResult[void] =
@ -955,10 +979,10 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi
## the message validation logic is according to https://rfc.vac.dev/spec/17/ ## the message validation logic is according to https://rfc.vac.dev/spec/17/
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called" trace "rln-relay topic validator is called"
let msg = WakuMessage.decode(message.data) let decodeRes = WakuMessage.decode(message.data)
if msg.isOk(): if decodeRes.isOk():
let let
wakumessage = msg.value() wakumessage = decodeRes.value
payload = string.fromBytes(wakumessage.payload) payload = string.fromBytes(wakumessage.payload)
# check the contentTopic # check the contentTopic
@ -966,15 +990,22 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi
trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload
return pubsub.ValidationResult.Accept return pubsub.ValidationResult.Accept
let decodeRes = RateLimitProof.init(wakumessage.proof)
if decodeRes.isErr():
return pubsub.ValidationResult.Reject
let msgProof = decodeRes.get()
# validate the message # validate the message
let let
validationRes = node.wakuRlnRelay.validateMessage(wakumessage) validationRes = node.wakuRlnRelay.validateMessage(wakumessage)
proof = toHex(wakumessage.proof.proof) proof = toHex(msgProof.proof)
epoch = fromEpoch(wakumessage.proof.epoch) epoch = fromEpoch(msgProof.epoch)
root = inHex(wakumessage.proof.merkleRoot) root = inHex(msgProof.merkleRoot)
shareX = inHex(wakumessage.proof.shareX) shareX = inHex(msgProof.shareX)
shareY = inHex(wakumessage.proof.shareY) shareY = inHex(msgProof.shareY)
nullifier = inHex(wakumessage.proof.nullifier) nullifier = inHex(msgProof.nullifier)
case validationRes: case validationRes:
of Valid: of Valid:
debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
@ -987,8 +1018,8 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi
of Spam: of Spam:
debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
if spamHandler.isSome: if spamHandler.isSome():
let handler = spamHandler.get let handler = spamHandler.get()
handler(wakumessage) handler(wakumessage)
return pubsub.ValidationResult.Reject return pubsub.ValidationResult.Reject
# set a validator for the supplied pubsubTopic # set a validator for the supplied pubsubTopic