chore(rln-relay): verify proofs based on bandwidth usage (#1844)

* chore(rln-relay): Verify proofs based on bandwidth usage

* fix(rln-relay): make default threshold 0, for backwards compat

* fix(rln-relay): add unit test for bandwidth cutoff

* fix(rln-relay): rlnRelayBandwidthTreshold option
This commit is contained in:
Aaryamann Challani 2023-07-07 17:28:37 +05:30 committed by GitHub
parent 29614e2e52
commit 3fe4522a7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 99 additions and 7 deletions

View File

@ -383,7 +383,8 @@ proc setupProtocols(node: WakuNode,
rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress, rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress,
rlnRelayCredPath: conf.rlnRelayCredPath, rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword, rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword,
rlnRelayTreePath: conf.rlnRelayTreePath rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayBandwidthThreshold: conf.rlnRelayBandwidthThreshold
) )
try: try:

View File

@ -211,6 +211,11 @@ type
defaultValue: "" defaultValue: ""
name: "rln-relay-tree-path" }: string name: "rln-relay-tree-path" }: string
rlnRelayBandwidthThreshold* {.
desc: "Message rate in bytes/sec after which verification of proofs should happen",
defaultValue: 0 # to maintain backwards compatibility
name: "rln-relay-bandwidth-threshold" }: int
staticnodes* {. staticnodes* {.
desc: "Peer multiaddr to directly connect with. Argument may be repeated." desc: "Peer multiaddr to directly connect with. Argument may be repeated."
name: "staticnode" }: seq[string] name: "staticnode" }: seq[string]

View File

@ -662,7 +662,8 @@ suite "Waku rln relay":
let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: RlnRelayPubsubTopic, rlnRelayPubsubTopic: RlnRelayPubsubTopic,
rlnRelayContentTopic: RlnRelayContentTopic, rlnRelayContentTopic: RlnRelayContentTopic,
rlnRelayCredIndex: index.uint) rlnRelayCredIndex: index.uint,
rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_2"))
let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf) let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf)
require: require:
wakuRlnRelayRes.isOk() wakuRlnRelayRes.isOk()
@ -708,6 +709,63 @@ suite "Waku rln relay":
msgValidate2 == MessageValidationResult.Spam msgValidate2 == MessageValidationResult.Spam
msgValidate3 == MessageValidationResult.Valid msgValidate3 == MessageValidationResult.Valid
msgValidate4 == MessageValidationResult.Invalid msgValidate4 == MessageValidationResult.Invalid
asyncTest "should validate invalid proofs if bandwidth is available":
let index = MembershipIndex(5)
let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayPubsubTopic: RlnRelayPubsubTopic,
rlnRelayContentTopic: RlnRelayContentTopic,
rlnRelayCredIndex: index.uint,
rlnRelayBandwidthThreshold: 4,
rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_3"))
let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf)
require:
wakuRlnRelayRes.isOk()
let wakuRlnRelay = wakuRlnRelayRes.get()
# get the current epoch time
let time = epochTime()
# create some messages from the same peer and append rln proof to them, except wm4
var
# this one will pass through the bandwidth threshold
wm1 = WakuMessage(payload: "Spam".toBytes())
# this message, will be over the bandwidth threshold, hence has to be verified
wm2 = WakuMessage(payload: "Valid message".toBytes())
# this message will be over the bandwidth threshold, hence has to be verified, will be false (since no proof)
wm3 = WakuMessage(payload: "Invalid message".toBytes())
wm4 = WakuMessage(payload: "Spam message".toBytes())
let
proofAdded1 = wakuRlnRelay.appendRLNProof(wm1, time)
proofAdded2 = wakuRlnRelay.appendRLNProof(wm2, time+EpochUnitSeconds)
proofAdded3 = wakuRlnRelay.appendRLNProof(wm4, time)
# ensure proofs are added
require:
proofAdded1
proofAdded2
proofAdded3
# validate messages
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
let
# this should be no verification, Valid
msgValidate1 = wakuRlnRelay.validateMessage(wm1, some(time))
# this should be verification, Valid
msgValidate2 = wakuRlnRelay.validateMessage(wm2, some(time))
# this should be verification, Invalid
msgValidate3 = wakuRlnRelay.validateMessage(wm3, some(time))
# this should be verification, Spam
msgValidate4 = wakuRlnRelay.validateMessage(wm4, some(time))
check:
msgValidate1 == MessageValidationResult.Valid
msgValidate2 == MessageValidationResult.Valid
msgValidate3 == MessageValidationResult.Invalid
msgValidate4 == MessageValidationResult.Spam
test "toIDCommitment and toUInt256": test "toIDCommitment and toUInt256":
# create an instance of rln # create an instance of rln

View File

@ -142,6 +142,7 @@ procSuite "WakuNode - RLN relay":
rlnRelayContentTopic: contentTopic, rlnRelayContentTopic: contentTopic,
rlnRelayCredIndex: 1.uint, rlnRelayCredIndex: 1.uint,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"), rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"),
rlnRelayBandwidthThreshold: 0,
)) ))
await node1.start() await node1.start()
@ -154,6 +155,7 @@ procSuite "WakuNode - RLN relay":
rlnRelayContentTopic: contentTopic, rlnRelayContentTopic: contentTopic,
rlnRelayCredIndex: 2.uint, rlnRelayCredIndex: 2.uint,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"), rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"),
rlnRelayBandwidthThreshold: 0,
)) ))
await node2.start() await node2.start()
@ -166,6 +168,7 @@ procSuite "WakuNode - RLN relay":
rlnRelayContentTopic: contentTopic, rlnRelayContentTopic: contentTopic,
rlnRelayCredIndex: 3.uint, rlnRelayCredIndex: 3.uint,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"), rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"),
rlnRelayBandwidthThreshold: 0,
)) ))
await node3.start() await node3.start()
@ -248,6 +251,7 @@ procSuite "WakuNode - RLN relay":
rlnRelayContentTopic: contentTopic, rlnRelayContentTopic: contentTopic,
rlnRelayCredIndex: 1.uint, rlnRelayCredIndex: 1.uint,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"), rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"),
rlnRelayBandwidthThreshold: 0,
)) ))
await node1.start() await node1.start()
@ -261,6 +265,7 @@ procSuite "WakuNode - RLN relay":
rlnRelayContentTopic: contentTopic, rlnRelayContentTopic: contentTopic,
rlnRelayCredIndex: 2.uint, rlnRelayCredIndex: 2.uint,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"), rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"),
rlnRelayBandwidthThreshold: 0,
)) ))
await node2.start() await node2.start()
@ -274,6 +279,7 @@ procSuite "WakuNode - RLN relay":
rlnRelayContentTopic: contentTopic, rlnRelayContentTopic: contentTopic,
rlnRelayCredIndex: 3.uint, rlnRelayCredIndex: 3.uint,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"), rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"),
rlnRelayBandwidthThreshold: 0,
)) ))
await node3.start() await node3.start()

View File

@ -5,7 +5,7 @@ else:
import import
std/[algorithm, sequtils, strutils, tables, times, os, deques], std/[algorithm, sequtils, strutils, tables, times, os, deques],
chronicles, options, chronos, stint, chronicles, options, chronos, chronos/ratelimit, stint,
confutils, confutils,
web3, json, web3, json,
web3/ethtypes, web3/ethtypes,
@ -23,7 +23,8 @@ import
./protocol_metrics ./protocol_metrics
import import
../waku_core, ../waku_core,
../waku_keystore ../waku_keystore,
../utils/collector
logScope: logScope:
topics = "waku rln_relay" topics = "waku rln_relay"
@ -41,6 +42,7 @@ type WakuRlnConfig* = object
rlnRelayCredPath*: string rlnRelayCredPath*: string
rlnRelayCredentialsPassword*: string rlnRelayCredentialsPassword*: string
rlnRelayTreePath*: string rlnRelayTreePath*: string
rlnRelayBandwidthThreshold*: int
proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[( proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[(
seq[RawMembershipCredentials], string seq[RawMembershipCredentials], string
@ -86,6 +88,7 @@ type WakuRLNRelay* = ref object of RootObj
nullifierLog*: Table[Epoch, seq[ProofMetadata]] nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message lastEpoch*: Epoch # the epoch of the last published rln message
groupManager*: GroupManager groupManager*: GroupManager
messageBucket*: Option[TokenBucket]
proc hasDuplicate*(rlnPeer: WakuRLNRelay, proc hasDuplicate*(rlnPeer: WakuRLNRelay,
proofMetadata: ProofMetadata): RlnRelayResult[bool] = proofMetadata: ProofMetadata): RlnRelayResult[bool] =
@ -160,14 +163,16 @@ proc absDiff*(e1, e2: Epoch): uint64 =
else: else:
return epoch2 - epoch1 return epoch2 - epoch1
proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, proc validateMessage*(rlnPeer: WakuRLNRelay,
timeOption: Option[float64] = none(float64)): MessageValidationResult = msg: WakuMessage,
timeOption = none(float64)): MessageValidationResult =
## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e., ## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
## the `msg`'s epoch is within MaxEpochGap of the current epoch ## the `msg`'s epoch is within MaxEpochGap of the current epoch
## the `msg` has valid rate limit proof ## the `msg` has valid rate limit proof
## the `msg` does not violate the rate limit ## the `msg` does not violate the rate limit
## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) ## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds)
## if `timeOption` is supplied, then the current epoch is calculated based on that ## if `timeOption` is supplied, then the current epoch is calculated based on that
let decodeRes = RateLimitProof.init(msg.proof) let decodeRes = RateLimitProof.init(msg.proof)
if decodeRes.isErr(): if decodeRes.isErr():
return MessageValidationResult.Invalid return MessageValidationResult.Invalid
@ -286,6 +291,18 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
let contentTopic = wakuRlnRelay.contentTopic let contentTopic = wakuRlnRelay.contentTopic
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called" trace "rln-relay topic validator is called"
## Check if enough tokens can be consumed from the message bucket
try:
if wakuRlnRelay.messageBucket.isSome() and
wakuRlnRelay.messageBucket.get().tryConsume(message.data.len):
return pubsub.ValidationResult.Accept
else:
info "message bandwidth limit exceeded, running rate limit proof validation"
except OverflowDefect: # not a problem
debug "not enough bandwidth, running rate limit proof validation"
let decodeRes = WakuMessage.decode(message.data) let decodeRes = WakuMessage.decode(message.data)
if decodeRes.isOk(): if decodeRes.isOk():
let let
@ -377,9 +394,14 @@ proc mount(conf: WakuRlnConfig,
# Start the group sync # Start the group sync
await groupManager.startGroupSync() await groupManager.startGroupSync()
let messageBucket = if conf.rlnRelayBandwidthThreshold > 0:
some(TokenBucket.new(conf.rlnRelayBandwidthThreshold))
else: none(TokenBucket)
return WakuRLNRelay(pubsubTopic: conf.rlnRelayPubsubTopic, return WakuRLNRelay(pubsubTopic: conf.rlnRelayPubsubTopic,
contentTopic: conf.rlnRelayContentTopic, contentTopic: conf.rlnRelayContentTopic,
groupManager: groupManager) groupManager: groupManager,
messageBucket: messageBucket)
proc new*(T: type WakuRlnRelay, proc new*(T: type WakuRlnRelay,