mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-04 11:05:38 +00:00
chore(rln-relay): clean up nullifier table every MaxEpochGap (#1994)
This commit is contained in:
parent
ea31b531b5
commit
483f40c8f7
@ -402,7 +402,6 @@ proc setupProtocols(node: WakuNode,
|
||||
rlnRelayCredPath: conf.rlnRelayCredPath,
|
||||
rlnRelayCredPassword: conf.rlnRelayCredPassword,
|
||||
rlnRelayTreePath: conf.rlnRelayTreePath,
|
||||
rlnRelayBandwidthThreshold: conf.rlnRelayBandwidthThreshold
|
||||
)
|
||||
|
||||
try:
|
||||
|
@ -7,17 +7,12 @@ import
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/peerid,
|
||||
libp2p/multiaddress,
|
||||
libp2p/switch,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
eth/keys
|
||||
libp2p/protocols/pubsub/pubsub
|
||||
import
|
||||
../../../waku/waku_core,
|
||||
../../../waku/waku_node,
|
||||
../../../waku/waku_rln_relay,
|
||||
../../../waku/waku_keystore,
|
||||
../testlib/wakucore,
|
||||
../testlib/wakunode
|
||||
|
||||
@ -206,7 +201,6 @@ procSuite "WakuNode - RLN relay":
|
||||
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(1.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
))
|
||||
|
||||
await node1.start()
|
||||
@ -217,7 +211,6 @@ procSuite "WakuNode - RLN relay":
|
||||
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(2.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
))
|
||||
|
||||
await node2.start()
|
||||
@ -228,7 +221,6 @@ procSuite "WakuNode - RLN relay":
|
||||
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(3.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
))
|
||||
|
||||
await node3.start()
|
||||
@ -308,7 +300,6 @@ procSuite "WakuNode - RLN relay":
|
||||
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(1.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
))
|
||||
|
||||
await node1.start()
|
||||
@ -320,7 +311,6 @@ procSuite "WakuNode - RLN relay":
|
||||
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(2.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
))
|
||||
|
||||
await node2.start()
|
||||
@ -332,7 +322,6 @@ procSuite "WakuNode - RLN relay":
|
||||
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(3.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"),
|
||||
rlnRelayBandwidthThreshold: 0,
|
||||
))
|
||||
|
||||
await node3.start()
|
||||
@ -407,3 +396,98 @@ procSuite "WakuNode - RLN relay":
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
|
||||
asyncTest "clearNullifierLog: should clear epochs > MaxEpochGap":
|
||||
|
||||
let
|
||||
# publisher node
|
||||
nodeKey1 = generateSecp256k1Key()
|
||||
node1 = newTestWakuNode(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
# Relay node
|
||||
nodeKey2 = generateSecp256k1Key()
|
||||
node2 = newTestWakuNode(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
# Subscriber
|
||||
nodeKey3 = generateSecp256k1Key()
|
||||
node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
# set up 2 nodes
|
||||
# node1
|
||||
await node1.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(1.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_10"),
|
||||
))
|
||||
|
||||
await node1.start()
|
||||
|
||||
# node 2
|
||||
await node2.mountRelay(@[DefaultPubsubTopic])
|
||||
|
||||
# mount rlnrelay in off-chain mode
|
||||
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
|
||||
rlnRelayCredIndex: some(2.uint),
|
||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_11"),
|
||||
))
|
||||
|
||||
await node2.start()
|
||||
|
||||
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
# get the current epoch time
|
||||
let time = epochTime()
|
||||
# create some messages with rate limit proofs
|
||||
var
|
||||
wm1 = WakuMessage(payload: "message 1".toBytes(), contentTopic: contentTopic)
|
||||
proofAdded1 = node1.wakuRlnRelay.appendRLNProof(wm1, time)
|
||||
# another message in the same epoch as wm1, it will break the messaging rate limit
|
||||
wm2 = WakuMessage(payload: "message 2".toBytes(), contentTopic: contentTopic)
|
||||
proofAdded2 = node1.wakuRlnRelay.appendRLNProof(wm2, time + EpochUnitSeconds)
|
||||
# wm3 points to the next epoch
|
||||
wm3 = WakuMessage(payload: "message 3".toBytes(), contentTopic: contentTopic)
|
||||
proofAdded3 = node1.wakuRlnRelay.appendRLNProof(wm3, time + EpochUnitSeconds * 2)
|
||||
|
||||
# check proofs are added correctly
|
||||
check:
|
||||
proofAdded1
|
||||
proofAdded2
|
||||
proofAdded3
|
||||
|
||||
# relay handler for node2
|
||||
var completionFut1 = newFuture[bool]()
|
||||
var completionFut2 = newFuture[bool]()
|
||||
var completionFut3 = newFuture[bool]()
|
||||
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
||||
debug "The received topic:", topic
|
||||
if topic == DefaultPubsubTopic:
|
||||
if msg == wm1:
|
||||
completionFut1.complete(true)
|
||||
if msg == wm2:
|
||||
completionFut2.complete(true)
|
||||
if msg == wm3:
|
||||
completionFut3.complete(true)
|
||||
|
||||
# mount the relay handler for node2
|
||||
node2.subscribe(DefaultPubsubTopic, relayHandler)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
await node1.publish(DefaultPubsubTopic, wm1)
|
||||
await sleepAsync(10.seconds)
|
||||
await node1.publish(DefaultPubsubTopic, wm2)
|
||||
await sleepAsync(10.seconds)
|
||||
await node1.publish(DefaultPubsubTopic, wm3)
|
||||
|
||||
let
|
||||
res1 = await completionFut1.withTimeout(10.seconds)
|
||||
res2 = await completionFut2.withTimeout(10.seconds)
|
||||
res3 = await completionFut3.withTimeout(10.seconds)
|
||||
|
||||
check:
|
||||
(res1 and res2 and res3) == true # all 3 are valid
|
||||
node2.wakuRlnRelay.nullifierLog.len() == 1 # after clearing, only 1 is stored
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
@ -38,7 +38,6 @@ type WakuRlnConfig* = object
|
||||
rlnRelayCredPath*: string
|
||||
rlnRelayCredPassword*: string
|
||||
rlnRelayTreePath*: string
|
||||
rlnRelayBandwidthThreshold*: int
|
||||
|
||||
proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[(
|
||||
seq[RawMembershipCredentials], string
|
||||
@ -77,7 +76,7 @@ proc calcEpoch*(t: float64): Epoch =
|
||||
|
||||
type WakuRLNRelay* = ref object of RootObj
|
||||
# the log of nullifiers and Shamir shares of the past messages grouped per epoch
|
||||
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
|
||||
nullifierLog*: OrderedTable[Epoch, seq[ProofMetadata]]
|
||||
lastEpoch*: Epoch # the epoch of the last published rln message
|
||||
groupManager*: GroupManager
|
||||
|
||||
@ -300,13 +299,26 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay,
|
||||
msg.proof = proofGenRes.get().encode().buffer
|
||||
return true
|
||||
|
||||
proc clearNullifierLog(rlnPeer: WakuRlnRelay) =
|
||||
# clear the first MaxEpochGap epochs of the nullifer log
|
||||
# if more than MaxEpochGap epochs are in the log
|
||||
# note: the epochs are ordered ascendingly
|
||||
if rlnPeer.nullifierLog.len().uint < MaxEpochGap:
|
||||
return
|
||||
|
||||
trace "clearing epochs from the nullifier log", count = MaxEpochGap
|
||||
let epochsToClear = rlnPeer.nullifierLog.keys().toSeq()[0..<MaxEpochGap]
|
||||
for epoch in epochsToClear:
|
||||
rlnPeer.nullifierLog.del(epoch)
|
||||
|
||||
proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
|
||||
spamHandler: Option[SpamHandler] = none(SpamHandler)): WakuValidatorHandler =
|
||||
spamHandler = none(SpamHandler)): WakuValidatorHandler =
|
||||
## this procedure is a thin wrapper for the pubsub addValidator method
|
||||
## it sets a validator for waku messages, acting in the registered pubsub topic
|
||||
## the message validation logic is according to https://rfc.vac.dev/spec/17/
|
||||
proc validator(topic: string, message: WakuMessage): Future[pubsub.ValidationResult] {.async.} =
|
||||
trace "rln-relay topic validator is called"
|
||||
wakuRlnRelay.clearNullifierLog()
|
||||
|
||||
let decodeRes = RateLimitProof.init(message.proof)
|
||||
|
||||
@ -342,7 +354,7 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
|
||||
return validator
|
||||
|
||||
proc mount(conf: WakuRlnConfig,
|
||||
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)
|
||||
registrationHandler = none(RegistrationHandler)
|
||||
): Future[WakuRlnRelay] {.async.} =
|
||||
var
|
||||
groupManager: GroupManager
|
||||
|
Loading…
x
Reference in New Issue
Block a user