feat: decouple merkle path and on-demand strategy (#3940)

* make validateRoots async

* add on-demand refreshRoots functionality

* Move max rootsrefresh time to constants

* make generateProof async and add ensureFreshMerkleProofPath

* Update to match code format and linting

* Use Wakumessage.new()

* Add trigger for client side only merkleproofcache updates

* full decoupling of updateRoots and merkleproofcache update

* Fix isNil check format

* Move moment check to top of roots and merkle path update procs

* Update PathCheckMinInterval

* Add tests for on demand merkle path updates

* Replace appendRLNProof and use message.toRLNSignal

* Fix linting

* Remove commented code

* Remove more old commented code

* Fix formatting and simplifications
This commit is contained in:
Tanya S 2026-06-12 12:22:34 +02:00 committed by GitHub
parent b7c2cee2c9
commit 22040b739f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 427 additions and 182 deletions

View File

@ -42,7 +42,7 @@ proc benchmark(
start_time = getTime()
for i in 1 .. messageLimit:
var generate_time = getTime()
let proof = manager.generateProof(data, epoch, MessageId(i.uint8)).valueOr:
let proof = (await manager.generateProof(data, epoch, MessageId(i.uint8))).valueOr:
raiseAssert $error
proofGenTimes.add(getTime() - generate_time)

View File

@ -192,9 +192,12 @@ proc publish(c: Chat, line: string) =
if not isNil(c.node.wakuRlnRelay):
# for future version when we support more than one rln protected content topic,
# we should check the message content topic as well
if c.node.wakuRlnRelay.appendRLNProof(message, float64(time)).isErr():
let proofRes =
waitFor c.node.wakuRlnRelay.generateRLNProof(message.toRLNSignal(), float64(time))
if proofRes.isErr():
info "could not append rate limit proof to the message"
else:
message.proof = proofRes.get()
info "rate limit proof is appended to the message"
let proof = RateLimitProof.init(message.proof).valueOr:
error "could not decode the RLN proof"

View File

@ -46,8 +46,12 @@ proc sendThruWaku*(
timestamp: getNanosecondTime(time),
)
(self.waku.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr:
return err("could not append rate limit proof to the message: " & $error)
message.proof = (
await self.waku.node.wakuRlnRelay.generateRLNProof(
message.toRLNSignal(), float64(time)
)
).valueOr:
return err("could not append rate limit proof to the message: " & error)
(await self.waku.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
return err("failed to publish message: " & $error)

View File

@ -169,7 +169,11 @@ proc installRelayApiHandlers*(
if not node.wakuRlnRelay.isNil():
# append the proof to the message
node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())).isOkOr:
message.proof = (
await node.wakuRlnRelay.generateRLNProof(
message.toRLNSignal(), float64(getTime().toUnix())
)
).valueOr:
return RestApiResponse.internalServerError(
"Failed to publish: error appending RLN proof to message: " & $error
)
@ -295,9 +299,13 @@ proc installRelayApiHandlers*(
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())).isOkOr:
message.proof = (
await node.wakuRlnRelay.generateRLNProof(
message.toRLNSignal(), float64(getTime().toUnix())
)
).valueOr:
return RestApiResponse.internalServerError(
"Failed to publish: error appending RLN proof to message: " & $error
"Failed to publish: error appending RLN proof to message: " & error
)
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:

View File

@ -13,7 +13,7 @@ import std/times, libp2p/peerid, stew/byteutils
proc checkAndGenerateRLNProof*(
rlnPeer: Option[WakuRLNRelay], message: WakuMessage
): Result[WakuMessage, string] =
): Future[Result[WakuMessage, string]] {.async.} =
# check if the message already has RLN proof
if message.proof.len > 0:
return ok(message)
@ -26,7 +26,10 @@ proc checkAndGenerateRLNProof*(
time = getTime().toUnix()
senderEpochTime = float64(time)
var msgWithProof = message
?(rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime))
msgWithProof.proof = (
await rlnPeer.get().generateRLNProof(msgWithProof.toRLNSignal, senderEpochTime)
).valueOr:
return err($error)
return ok(msgWithProof)
proc getNilPushHandler*(): PushMessageHandler =
@ -42,7 +45,7 @@ proc getRelayPushHandler*(
pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult] {.async.} =
# append RLN proof
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message).valueOr:
let msgWithProof = (await checkAndGenerateRLNProof(rlnPeer, message)).valueOr:
return lighpushErrorResult(LightPushErrorCode.OUT_OF_RLN_PROOF, error)
(await wakuRelay.validateMessage(pubSubTopic, msgWithProof)).isOkOr:

View File

@ -12,7 +12,7 @@ import std/times, libp2p/peerid, stew/byteutils
proc checkAndGenerateRLNProof*(
rlnPeer: Option[WakuRLNRelay], message: WakuMessage
): Result[WakuMessage, string] =
): Future[Result[WakuMessage, string]] {.async.} =
# check if the message already has RLN proof
if message.proof.len > 0:
return ok(message)
@ -25,7 +25,10 @@ proc checkAndGenerateRLNProof*(
time = getTime().toUnix()
senderEpochTime = float64(time)
var msgWithProof = message
?(rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime))
msgWithProof.proof = (
await rlnPeer.get().generateRLNProof(msgWithProof.toRLNSignal(), senderEpochTime)
).valueOr:
return err($error)
return ok(msgWithProof)
proc getNilPushHandler*(): PushMessageHandler =
@ -41,7 +44,7 @@ proc getRelayPushHandler*(
pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
# append RLN proof
let msgWithProof = ?checkAndGenerateRLNProof(rlnPeer, message)
let msgWithProof = ?(await checkAndGenerateRLNProof(rlnPeer, message))
?(await wakuRelay.validateMessage(pubSubTopic, msgWithProof))

View File

@ -1,4 +1,4 @@
import stint
import stint, chronos
import ./protocol_types
@ -10,6 +10,15 @@ const AcceptableRootWindowSize* = 50
#Size if RLN contract root cache
const RlnContractRootCacheSize* = 5
# Minimum time between two consecutive root refreshes, to avoid refreshing the roots too often when receiving messages with old roots
# Using Linea block generation time as reference, which is around 2 seconds
const RootsRefreshMinInterval* = 2.seconds
# Minimum time between two consecutive merkle proof path freshness checks.
# Bounds how often the publish path queries chain when generating proofs at a high rate.
# Using Linea block generation time ~2s and AcceptableRootWindowSize=50, we give a generous safety margin within this
const PathCheckMinInterval* = 30.seconds
# RLN membership key and index files path
const RlnCredentialsFilename* = "rlnCredentials.txt"

View File

@ -121,9 +121,7 @@ method indexOfRoot*(
## returns the index of the root in the merkle tree and returns -1 if the root is not found
return g.validRoots.find(root)
method validateRoot*(
g: GroupManager, root: MerkleNode
): bool {.base, gcsafe, raises: [].} =
method validateRoot*(g: GroupManager, root: MerkleNode): Future[bool] {.base, async.} =
## validates the root against the valid roots queue
return g.indexOfRoot(root) >= 0
@ -139,7 +137,7 @@ method generateProof*(
epoch: Epoch,
messageId: MessageId,
rlnIdentifier = DefaultRlnIdentifier,
): GroupManagerResult[RateLimitProof] {.base, gcsafe, raises: [].} =
): Future[GroupManagerResult[RateLimitProof]] {.base, async.} =
## Dummy implementation for generateProof
return err("generateProof is not implemented")

View File

@ -43,6 +43,10 @@ type
registrationHandler*: Option[RegistrationHandler]
latestProcessedBlock*: BlockNumber
merkleProofCache*: seq[byte]
lastMerklePathCheckMoment*: Moment
proofPathRefreshInFlightFut*: Future[void]
lastRootsRefreshMoment*: Moment
rootsRefreshInFlightFut*: Future[void]
# The below code is not working with the latest web3 version due to chainId being null (specifically on linea-sepolia)
# TODO: find better solution than this custom sendEthCallWithoutParams call
@ -192,7 +196,7 @@ proc updateRecentRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
newRootsDequeOrder.add(UInt256ToField(u))
if newRootsDequeOrder.len == 0:
debug "no non-zero recent roots to add; skipping update"
trace "no non-zero recent roots to add; skipping update"
return false
# Determine overlap with existing tail so we only append truly new roots
@ -217,32 +221,83 @@ proc updateRecentRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
return true
proc trackRootChanges*(g: OnchainGroupManager): Future[Result[void, string]] {.async.} =
?checkInitialized(g)
proc updateMemberCount*(
g: OnchainGroupManager
): Future[Result[void, string]] {.async.} =
## Refreshes the registered-memberships metric from on-chain `nextFreeIndex`.
## Called whenever a root change is observed.
let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr:
return err("Failed to fetch next free index: " & error)
let memberCount = cast[int64](nextFreeIndex)
waku_rln_number_registered_memberships.set(float64(memberCount))
return ok()
const rpcDelay = 10.seconds
proc refreshRoots(g: OnchainGroupManager): Future[void] {.async.} =
## On-demand refresh of validRoots from the on-chain root cache.
## Throttled to at most one refresh per RootsRefreshMinInterval; concurrent
## callers outside the throttle window coalesce onto a single in-flight refresh.
if Moment.now() - g.lastRootsRefreshMoment < RootsRefreshMinInterval:
return
while true:
let rootUpdated = await g.updateRecentRoots()
if not g.rootsRefreshInFlightFut.isNil() and not g.rootsRefreshInFlightFut.finished():
await g.rootsRefreshInFlightFut
return
if rootUpdated:
## The membership set on-chain has changed (some new members have joined or some members have left)
if g.membershipIndex.isSome():
## A membership index exists only if the node has registered with RLN.
## Non-registered nodes cannot have Merkle proof elements.
let proofResult = await g.fetchMerkleProofElements()
if proofResult.isErr():
error "Failed to fetch Merkle proof", error = proofResult.error
else:
g.merkleProofCache = proofResult.get()
proc doRefresh(): Future[void] {.async.} =
if await g.updateRecentRoots():
# Best-effort metric refresh - if there's a failure, it will update with the next root change
discard await g.updateMemberCount()
g.lastRootsRefreshMoment = Moment.now()
let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr:
error "Failed to fetch next free index", error = error
return err("Failed to fetch next free index: " & error)
g.rootsRefreshInFlightFut = doRefresh()
await g.rootsRefreshInFlightFut
let memberCount = cast[int64](nextFreeIndex)
waku_rln_number_registered_memberships.set(float64(memberCount))
await sleepAsync(rpcDelay)
method validateRoot*(g: OnchainGroupManager, root: MerkleNode): Future[bool] {.async.} =
if g.indexOfRoot(root) >= 0:
return true
await g.refreshRoots()
return g.indexOfRoot(root) >= 0
proc ensureFreshMerkleProofPath*(
g: OnchainGroupManager
): Future[Result[void, string]] {.async.} =
## Keeps `merkleProofCache` fresh independently of the validRoots window
## used by the receive path. Refetches the path whenever the throttle
## (`PathCheckMinInterval`) expires; trusts the cached path otherwise.
## Guards against a missing membership index because `fetchMerkleProofElements`
## unwraps it.
if g.membershipIndex.isNone():
return err("membership index is not set")
if g.merkleProofCache.len > 0 and
Moment.now() - g.lastMerklePathCheckMoment < PathCheckMinInterval:
return ok()
if not g.proofPathRefreshInFlightFut.isNil() and
not g.proofPathRefreshInFlightFut.finished():
await g.proofPathRefreshInFlightFut
if g.merkleProofCache.len > 0:
return ok()
return err("merkle proof path refresh failed")
var fetchOk = false
proc doRefresh(): Future[void] {.async.} =
let pathBytes = (await g.fetchMerkleProofElements()).valueOr:
error "Failed to refresh merkle proof path", error = error
return
g.merkleProofCache = pathBytes
g.lastMerklePathCheckMoment = Moment.now()
fetchOk = true
# Best-effort metric refresh - if there's a failure, it will update with the next root change
discard await g.updateMemberCount()
g.proofPathRefreshInFlightFut = doRefresh()
await g.proofPathRefreshInFlightFut
if not fetchOk:
return err("merkle proof path refresh failed")
return ok()
method register*(
g: OnchainGroupManager, rateCommitment: RateCommitment
@ -419,7 +474,7 @@ method generateProof*(
epoch: Epoch,
messageId: MessageId,
rlnIdentifier = DefaultRlnIdentifier,
): GroupManagerResult[RateLimitProof] {.gcsafe.} =
): Future[GroupManagerResult[RateLimitProof]] {.async.} =
## Generates an RLN proof using the cached Merkle proof and custom witness
# Ensure identity credentials and membership index are set
if g.idCredentials.isNone():
@ -429,6 +484,9 @@ method generateProof*(
if g.userMessageLimit.isNone():
return err("user message limit is not set")
debug "Generating RLN proof"
?(await g.ensureFreshMerkleProofPath())
if (g.merkleProofCache.len mod 32) != 0:
return err("Invalid merkle proof cache length")

View File

@ -177,7 +177,7 @@ proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] =
proc validateMessage*(
rlnPeer: WakuRLNRelay, msg: WakuMessage
): MessageValidationResult =
): Future[MessageValidationResult] {.async.} =
## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e.,
## the `msg`'s epoch is within MaxEpochGap of the current epoch
## the `msg` has valid rate limit proof
@ -217,7 +217,7 @@ proc validateMessage*(
waku_rln_invalid_messages_total.inc(labelValues = ["timestamp_mismatch"])
return MessageValidationResult.Invalid
let rootValidationRes = rlnPeer.groupManager.validateRoot(proof.merkleRoot)
let rootValidationRes = await rlnPeer.groupManager.validateRoot(proof.merkleRoot)
if not rootValidationRes:
warn "invalid message: provided root does not belong to acceptable window of roots",
provided = proof.merkleRoot.inHex(),
@ -272,11 +272,11 @@ proc validateMessage*(
proc validateMessageAndUpdateLog*(
rlnPeer: WakuRLNRelay, msg: WakuMessage
): MessageValidationResult =
): Future[MessageValidationResult] {.async.} =
## validates the message and updates the log to prevent double messaging
## in future messages
let isValidMessage = rlnPeer.validateMessage(msg)
let isValidMessage = await rlnPeer.validateMessage(msg)
let msgProof = RateLimitProof.init(msg.proof).valueOr:
return MessageValidationResult.Invalid
@ -291,32 +291,16 @@ proc validateMessageAndUpdateLog*(
return isValidMessage
proc createRlnProof(
rlnPeer: WakuRLNRelay, msg: WakuMessage, senderEpochTime: float64
): RlnRelayResult[seq[byte]] =
## returns a new `RateLimitProof` for the supplied `msg`
## returns an error if it cannot create the proof
## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`)
let input = msg.toRLNSignal()
proc generateRLNProof*(
rlnPeer: WakuRLNRelay, input: seq[byte], senderEpochTime: float64
): Future[RlnRelayResult[seq[byte]]] {.async.} =
let epoch = rlnPeer.calcEpoch(senderEpochTime)
let nonce = rlnPeer.nonceManager.getNonce().valueOr:
return err("could not get new message id to generate an rln proof: " & $error)
let proof = rlnPeer.groupManager.generateProof(input, epoch, nonce).valueOr:
let proof = (await rlnPeer.groupManager.generateProof(input, epoch, nonce)).valueOr:
return err("could not generate rln-v2 proof: " & $error)
return ok(proof.encode().buffer)
proc appendRLNProof*(
rlnPeer: WakuRLNRelay, msg: var WakuMessage, senderEpochTime: float64
): RlnRelayResult[void] =
msg.proof = rlnPeer.createRlnProof(msg, senderEpochTime).valueOr:
return err($error)
return ok()
proc clearNullifierLog*(rlnPeer: WakuRlnRelay) =
# clear the first MaxEpochGap epochs of the nullifer log
# if more than MaxEpochGap epochs are in the log
@ -353,7 +337,7 @@ proc generateRlnValidator*(
return pubsub.ValidationResult.Reject
# validate the message and update log
let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(message)
let validationRes = await wakuRlnRelay.validateMessageAndUpdateLog(message)
let
proof = byteutils.toHex(msgProof.proof)
@ -455,11 +439,6 @@ proc mount(
brokerCtx: globalBrokerContext(),
)
# track root changes on smart contract merkle tree
if groupManager of OnchainGroupManager:
let onchainManager = cast[OnchainGroupManager](groupManager)
wakuRlnRelay.rootChangesFuture = onchainManager.trackRootChanges()
# Start epoch monitoring in the background
wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)
@ -468,8 +447,10 @@ proc mount(
proc(
msg: WakuMessage, senderEpochTime: float64
): Future[Result[RequestGenerateRlnProof, string]] {.async.} =
let proof = createRlnProof(wakuRlnRelay, msg, senderEpochTime).valueOr:
return err("Could not create RLN proof: " & $error)
let proof = (
await wakuRlnRelay.generateRLNProof(msg.toRLNSignal(), senderEpochTime)
).valueOr:
return err("Could not create RLN proof: " & error)
return ok(RequestGenerateRlnProof(proof: proof)),
).isOkOr:

View File

@ -277,34 +277,34 @@ suite "Waku RlnRelay - End to End - Static":
message151kibPlus =
WakuMessage(payload: @payload150kibPlus, contentTopic: contentTopic)
doAssert(
client.wakuRlnRelay
.appendRLNProof(
message1b, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 0)
)
.isOk()
)
doAssert(
client.wakuRlnRelay
.appendRLNProof(
message1kib, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 1)
)
.isOk()
)
doAssert(
client.wakuRlnRelay
.appendRLNProof(
message150kib, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 2)
)
.isOk()
)
doAssert(
client.wakuRlnRelay
.appendRLNProof(
message151kibPlus, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3)
)
.isOk()
)
message1b.proof = (
await client.wakuRlnRelay.generateRLNProof(
message1b.toRLNSignal(),
epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 0),
)
).valueOr:
raiseAssert "generateRLNProof failed: " & error
message1kib.proof = (
await client.wakuRlnRelay.generateRLNProof(
message1kib.toRLNSignal(),
epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 1),
)
).valueOr:
raiseAssert "generateRLNProof failed: " & error
message150kib.proof = (
await client.wakuRlnRelay.generateRLNProof(
message150kib.toRLNSignal(),
epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 2),
)
).valueOr:
raiseAssert "generateRLNProof failed: " & error
message151kibPlus.proof = (
await client.wakuRlnRelay.generateRLNProof(
message151kibPlus.toRLNSignal(),
epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3),
)
).valueOr:
raiseAssert "generateRLNProof failed: " & error
# When sending the 1B message
discard await client.publish(some(pubsubTopic), message1b)
@ -366,13 +366,13 @@ suite "Waku RlnRelay - End to End - Static":
var message151kibPlus =
WakuMessage(payload: @payload150kibPlus, contentTopic: contentTopic)
doAssert(
client.wakuRlnRelay
.appendRLNProof(
message151kibPlus, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3)
)
.isOk()
)
message151kibPlus.proof = (
await client.wakuRlnRelay.generateRLNProof(
message151kibPlus.toRLNSignal(),
epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3),
)
).valueOr:
raiseAssert "generateRLNProof failed: " & error
# When sending the 150KiB plus message
completionFut = newPushHandlerFuture() # Reset Future

View File

@ -77,7 +77,10 @@ proc sendRlnMessage*(
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
var message = WakuMessage(payload: payload, contentTopic: contentTopic)
doAssert(client.wakuRlnRelay.appendRLNProof(message, epochTime()).isOk())
message.proof = (
await client.wakuRlnRelay.generateRLNProof(message.toRLNSignal(), epochTime())
).valueOr:
raiseAssert "generateRLNProof failed: " & error
discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted
@ -89,14 +92,14 @@ proc sendRlnMessageWithInvalidProof*(
completionFuture: Future[bool],
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
let extraBytes: seq[byte] = @[byte(1), 2, 3]
let rateLimitProofRes = await client.wakuRlnRelay.groupManager.generateProof(
concat(payload, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
client.wakuRlnRelay.getCurrentEpoch(),
messageId = MessageId(0),
)
let
extraBytes: seq[byte] = @[byte(1), 2, 3]
rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof(
concat(payload, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
client.wakuRlnRelay.getCurrentEpoch(),
messageId = MessageId(0),
)
rateLimitProof = rateLimitProofRes.get().encode().buffer
message =
WakuMessage(payload: @payload, contentTopic: contentTopic, proof: rateLimitProof)

View File

@ -13,7 +13,7 @@ proc createRLNInstanceWrapper*(): RLNResult =
proc unsafeAppendRLNProof*(
rlnPeer: WakuRLNRelay, msg: var WakuMessage, epoch: Epoch, messageId: MessageId
): RlnRelayResult[void] =
## Test helper derived from `appendRLNProof`.
## Test helper derived from the publish-path proof flow.
## - Skips nonce validation to intentionally allow generating "bad" message IDs for tests.
## - Forces a real-time on-chain Merkle root refresh via `updateRoots()` and fetches Merkle
## proof elements, updating `merkleProofCache` (bypasses `trackRootsChanges`).
@ -29,7 +29,7 @@ proc unsafeAppendRLNProof*(
error "Failed to fetch Merkle proof", error = proofResult.error
manager.merkleProofCache = proofResult.get()
let proof = manager.generateProof(msg.toRLNSignal(), epoch, messageId).valueOr:
let proof = (waitFor manager.generateProof(msg.toRLNSignal(), epoch, messageId)).valueOr:
return err("could not generate rln-v2 proof: " & $error)
msg.proof = proof.encode().buffer

View File

@ -20,6 +20,7 @@ import
logos_delivery/waku/[
waku_rln_relay,
waku_rln_relay/protocol_types,
waku_rln_relay/protocol_metrics,
waku_rln_relay/constants,
waku_rln_relay/rln,
waku_rln_relay/conversion_utils,
@ -73,36 +74,27 @@ suite "Onchain group manager":
(waitFor manager.init()).isErrOr:
raiseAssert "Expected error when keystore file doesn't exist"
test "trackRootChanges: should guard against uninitialized state":
let initializedResult = waitFor manager.trackRootChanges()
check:
initializedResult.isErr()
initializedResult.error == "OnchainGroupManager is not initialized"
test "trackRootChanges: should sync to the state of the group":
let credentials = generateCredentials()
test "updateMemberCount: reflects on-chain registered member count":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let merkleRootBefore = waitFor manager.fetchMerkleRoot()
# No members registered yet; metric should reflect nextFreeIndex == 0.
(waitFor manager.updateMemberCount()).isOkOr:
raiseAssert "updateMemberCount failed (initial): " & error
check waku_rln_number_registered_memberships.value() == 0.0
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
assert false, "error returned when calling register: " & error
const credentialCount = 3
let credentials = generateCredentials(credentialCount)
for i in 0 ..< credentials.len:
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
assert false, "register failed for credential " & $i & ": " & error
discard waitFor withTimeout(trackRootChanges(manager), 15.seconds)
(waitFor manager.updateMemberCount()).isOkOr:
raiseAssert "updateMemberCount failed (after registrations): " & error
check waku_rln_number_registered_memberships.value() == float64(credentialCount)
let merkleRootAfter = waitFor manager.fetchMerkleRoot()
check:
merkleRootBefore != merkleRootAfter
test "trackRootChanges: should fetch history correctly: fetch single root()":
test "updateRoots: appends new on-chain root to validRoots after registration":
# basic check for the soon to be deprecated root contract function, is replaced by getRecentRoots()
# TODO: We can't use `trackRootChanges()` directly in this test because its current implementation
# relies on a busy loop rather than event-based monitoring. but that busy loop fetch root every 5 seconds
# so we can't use it in this test.
const credentialCount = 6
let credentials = generateCredentials(credentialCount)
(waitFor manager.init()).isOkOr:
@ -111,7 +103,7 @@ suite "Onchain group manager":
let merkleRootBefore = (waitFor manager.fetchMerkleRoot()).valueOr:
raiseAssert "Failed to fetch merkle root before: " & error
for i in 0 ..< credentials.len():
for i in 0 ..< credentials.len:
info "Registering credential", index = i, credential = credentials[i]
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
assert false, "Failed to register credential " & $i & ": " & error
@ -122,14 +114,11 @@ suite "Onchain group manager":
check:
merkleRootBefore != merkleRootAfter
manager.validRoots.len() == credentialCount
manager.validRoots.len == credentialCount
test "trackRootChanges: should fetch history correctly: fetch root cache":
test "updateRecentRoots: appends new on-chain roots from contract cache":
# Verify that the group_manager list of valid roots is updated correctly from the recent roots
# cache as new credentials are registered.
# TODO: We can't use `trackRootChanges()` directly in this test because its current implementation
# relies on a busy loop rather than event-based monitoring. but that busy loop fetch root every 5 seconds
# so we can't use it in this test.
const credentialCount = RlnContractRootCacheSize
let credentials = generateCredentials(credentialCount)
@ -142,9 +131,9 @@ suite "Onchain group manager":
check:
merkleRootCacheBefore.len == RlnContractRootCacheSize * 32
merkleRootCacheBefore.allIt(it == 0'u8)
manager.validRoots.len() == 0
manager.validRoots.len == 0
for i in 0 ..< credentials.len():
for i in 0 ..< credentials.len:
info "Registering credential", index = i, credential = credentials[i]
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
assert false, "Failed to register credential " & $i & ": " & error
@ -156,10 +145,10 @@ suite "Onchain group manager":
check:
merkleRootCacheAfter.len == RlnContractRootCacheSize * 32
not merkleRootCacheAfter.allIt(it == 0'u8)
manager.validRoots.len() == credentialCount
manager.validRoots.len == credentialCount
manager.validRoots.items().toSeq().allIt(it != default(MerkleNode))
test "trackRootChanges: oldest roots are evicted once the window is exceeded":
test "updateRecentRoots: oldest roots are evicted once the window is exceeded":
const
initialCount = AcceptableRootWindowSize - RlnContractRootCacheSize
additionalCount = RlnContractRootCacheSize + 1
@ -174,12 +163,12 @@ suite "Onchain group manager":
assert false, "Failed to register credential " & $i & ": " & error
discard waitFor manager.updateRecentRoots()
check manager.validRoots.len() >= 3
check manager.validRoots.len >= 3
let firstThreeBefore =
@[manager.validRoots[0], manager.validRoots[1], manager.validRoots[2]]
# Register the remaining credentials, pushing the deque past AcceptableRootWindowSize.
for i in initialCount ..< credentials.len():
for i in initialCount ..< credentials.len:
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
assert false, "Failed to register credential " & $i & ": " & error
discard waitFor manager.updateRecentRoots()
@ -189,7 +178,7 @@ suite "Onchain group manager":
# AcceptableRootWindowSize + 1 registrations evicts exactly the single oldest root,
# so only the first of the original three is gone; the other two remain.
check:
manager.validRoots.len() == AcceptableRootWindowSize
manager.validRoots.len == AcceptableRootWindowSize
firstThreeBefore[0] notin rootsAfter
firstThreeBefore[1] in rootsAfter
firstThreeBefore[2] in rootsAfter
@ -208,7 +197,6 @@ suite "Onchain group manager":
res.error == "OnchainGroupManager is not initialized"
test "register: should register successfully":
# TODO :- similar to ```trackRootChanges: should fetch history correctly```
(waitFor manager.init()).isOkOr:
raiseAssert $error
@ -299,7 +287,7 @@ suite "Onchain group manager":
let epoch = default(Epoch)
info "epoch in bytes", epochHex = epoch.inHex()
let validProofRes = manager.generateProof(
let validProofRes = waitFor manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(1)
)
@ -307,7 +295,7 @@ suite "Onchain group manager":
validProofRes.isOk()
let validProof = validProofRes.get()
let validated = manager.validateRoot(validProof.merkleRoot)
let validated = waitFor manager.validateRoot(validProof.merkleRoot)
check:
validated
@ -329,13 +317,16 @@ suite "Onchain group manager":
# chunk[0] becomes the MSB after reversal in group_manager; must be < 0x30
for i in 0 ..< 20:
manager.merkleProofCache[i * 32] = 0
# Pin the freshness throttle so ensureFreshMerkleProofPath does NOT refetch
# and overwrite the intentionally-corrupted cache we just planted.
manager.lastMerklePathCheckMoment = Moment.now()
let messageBytes = "Hello".toBytes()
let epoch = default(Epoch)
info "epoch in bytes", epochHex = epoch.inHex()
let validProofRes = manager.generateProof(
let validProofRes = waitFor manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(1)
)
@ -343,11 +334,189 @@ suite "Onchain group manager":
validProofRes.isOk()
let validProof = validProofRes.get()
let validated = manager.validateRoot(validProof.merkleRoot)
let validated = waitFor manager.validateRoot(validProof.merkleRoot)
check:
validated == false
test "validateRoot: fast-paths without refresh when root is in window":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let credentials = generateCredentials()
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
assert false, "register failed: " & error
discard waitFor manager.updateRecentRoots()
check manager.validRoots.len > 0
let knownRoot = manager.validRoots[0]
let preRefreshTs = manager.lastRootsRefreshMoment
let validated = waitFor manager.validateRoot(knownRoot)
check:
validated
# No refresh should have been triggered on a fast-path hit.
manager.lastRootsRefreshMoment == preRefreshTs
test "validateRoot: triggers on-demand refresh when root is not in window":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let credentials = generateCredentials()
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
assert false, "register failed: " & error
# validRoots is intentionally not pre-populated — the refresh must happen
# inside validateRoot to bring the on-chain root into the window.
check:
manager.validRoots.len == 0
let onChainRootU256 = (waitFor manager.fetchMerkleRoot()).valueOr:
raiseAssert "failed to fetch root: " & error
let onChainRoot = UInt256ToField(onChainRootU256)
let validated = waitFor manager.validateRoot(onChainRoot)
check:
validated
manager.validRoots.len > 0
test "validateRoot: throttles refreshes to RootsRefreshMinInterval":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let credentials = generateCredentials()
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
assert false, "register failed: " & error
# First miss: an unknown root forces refreshRoots to run end-to-end.
var badRoot1: MerkleNode
badRoot1[0] = 0x42
discard waitFor manager.validateRoot(badRoot1)
let firstRefreshTs = manager.lastRootsRefreshMoment
# Second miss within RootsRefreshMinInterval must be throttled out;
# lastRootsRefreshMoment stays pinned to the previous refresh timestamp.
var badRoot2: MerkleNode
badRoot2[0] = 0x43
discard waitFor manager.validateRoot(badRoot2)
check:
manager.lastRootsRefreshMoment == firstRefreshTs
test "generateProof: fast-paths without refresh inside throttle window":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let credentials = generateCredentials()
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
assert false, "register failed: " & error
# Prime cache and pin the throttle so the publish-path freshness check
# short-circuits on the cached value.
manager.merkleProofCache = (waitFor manager.fetchMerkleProofElements()).valueOr:
raiseAssert "failed to fetch initial path: " & error
manager.lastMerklePathCheckMoment = Moment.now()
manager.proofPathRefreshInFlightFut = nil
let primedCache = manager.merkleProofCache
let proofRes = waitFor manager.generateProof(
data = "hello".toBytes(), epoch = default(Epoch), messageId = MessageId(1)
)
check:
proofRes.isOk()
# Hot path: no refresh future created, cache untouched.
manager.proofPathRefreshInFlightFut == nil
manager.merkleProofCache == primedCache
test "generateProof: refetches path when cache is empty":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let credentials = generateCredentials()
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
assert false, "register failed: " & error
# No path yet; generateProof must run the freshness check, see the empty
# cache, and refetch.
manager.merkleProofCache = @[]
manager.proofPathRefreshInFlightFut = nil
let proofRes = waitFor manager.generateProof(
data = "hello".toBytes(), epoch = default(Epoch), messageId = MessageId(1)
)
check:
proofRes.isOk()
manager.merkleProofCache.len > 0
manager.proofPathRefreshInFlightFut != nil
test "ensureFreshMerkleProofPath: errors when membership index is not set":
(waitFor manager.init()).isOkOr:
raiseAssert $error
# No registration → no membership index. Guard must error rather than
# crashing inside fetchMerkleProofElements (which unwraps the Option).
check:
manager.membershipIndex.isNone()
let res = waitFor manager.ensureFreshMerkleProofPath()
check:
res.isErr()
res.error == "membership index is not set"
test "ensureFreshMerkleProofPath: refetches when throttle window has expired":
(waitFor manager.init()).isOkOr:
raiseAssert $error
let credentials = generateCredentials()
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
assert false, "register failed: " & error
# Prime cache with a non-empty value and an old throttle timestamp, so
# the cache fast-path does NOT trigger and we exercise the refetch branch.
manager.merkleProofCache = (waitFor manager.fetchMerkleProofElements()).valueOr:
raiseAssert "failed to prime path: " & error
manager.lastMerklePathCheckMoment = Moment.now() - PathCheckMinInterval - 1.seconds
manager.proofPathRefreshInFlightFut = nil
let preCheckTs = manager.lastMerklePathCheckMoment
let res = waitFor manager.ensureFreshMerkleProofPath()
check:
res.isOk()
manager.merkleProofCache.len > 0
manager.proofPathRefreshInFlightFut != nil
# lastMerklePathCheckMoment was bumped to "now" by the refetch.
manager.lastMerklePathCheckMoment > preCheckTs
test "ensureFreshMerkleProofPath: refresh bumps the member-count metric":
(waitFor manager.init()).isOkOr:
raiseAssert $error
const credentialCount = 4
let credentials = generateCredentials(credentialCount)
for i in 0 ..< credentials.len:
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
assert false, "register failed for credential " & $i & ": " & error
# Force a refetch by emptying the cache; the doRefresh closure should
# invoke updateMemberCount on the success path.
manager.merkleProofCache = @[]
manager.proofPathRefreshInFlightFut = nil
waku_rln_number_registered_memberships.set(0.0) # baseline
let res = waitFor manager.ensureFreshMerkleProofPath()
check:
res.isOk()
manager.merkleProofCache.len > 0
waku_rln_number_registered_memberships.value() == float64(credentialCount)
test "verifyProof: should verify valid proof":
let credentials = generateCredentials()
(waitFor manager.init()).isOkOr:
@ -384,8 +553,10 @@ suite "Onchain group manager":
info "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let validProof = manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(0)
let validProof = (
waitFor manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(0)
)
).valueOr:
raiseAssert $error
@ -414,12 +585,15 @@ suite "Onchain group manager":
# chunk[0] becomes the MSB after reversal in group_manager; must be < 0x30
for i in 0 ..< 20:
manager.merkleProofCache[i * 32] = 0
# Pin the freshness throttle so ensureFreshMerkleProofPath does NOT refetch
# and overwrite the intentionally-corrupted cache we just planted.
manager.lastMerklePathCheckMoment = Moment.now()
let epoch = default(Epoch)
info "epoch in bytes", epochHex = epoch.inHex()
# generate proof
let invalidProofRes = manager.generateProof(
let invalidProofRes = waitFor manager.generateProof(
data = messageBytes, epoch = epoch, messageId = MessageId(0)
)
@ -442,7 +616,7 @@ suite "Onchain group manager":
type TestBackfillFuts = array[0 .. credentialCount - 1, Future[void]]
var futures: TestBackfillFuts
for i in 0 ..< futures.len():
for i in 0 ..< futures.len:
futures[i] = newFuture[void]()
proc generateCallback(
@ -461,7 +635,7 @@ suite "Onchain group manager":
manager.onRegister(generateCallback(futures, credentials))
for i in 0 ..< credentials.len():
for i in 0 ..< credentials.len:
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
assert false, "Failed to register credential " & $i & ": " & error
discard waitFor manager.updateRecentRoots()
@ -469,7 +643,7 @@ suite "Onchain group manager":
waitFor allFutures(futures)
check:
manager.validRoots.len() == credentialCount
manager.validRoots.len == credentialCount
test "isReady should return false if ethRpc is none":
(waitFor manager.init()).isOkOr:

View File

@ -269,13 +269,13 @@ suite "Waku rln relay":
# Validate messages
let
msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1)
msgValidate1 = await wakuRlnRelay.validateMessageAndUpdateLog(wm1)
# wm2 is within the same epoch as wm1 → should be spam
msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2)
msgValidate2 = await wakuRlnRelay.validateMessageAndUpdateLog(wm2)
# wm3 is in the next epoch → should be valid
msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3)
msgValidate3 = await wakuRlnRelay.validateMessageAndUpdateLog(wm3)
# wm4 has no RLN proof → should be invalid
msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4)
msgValidate4 = await wakuRlnRelay.validateMessageAndUpdateLog(wm4)
check:
msgValidate1 == MessageValidationResult.Valid
@ -323,12 +323,12 @@ suite "Waku rln relay":
raiseAssert $error
# validate the first message because it's timestamp is the same as the generated timestamp
let msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1)
let msgValidate1 = await wakuRlnRelay.validateMessageAndUpdateLog(wm1)
# wait for 2 seconds to make the timestamp different from generated timestamp
await sleepAsync(2.seconds)
let msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2)
let msgValidate2 = await wakuRlnRelay.validateMessageAndUpdateLog(wm2)
check:
msgValidate1 == MessageValidationResult.Valid
@ -378,8 +378,8 @@ suite "Waku rln relay":
raiseAssert $error
let
msgValidate1 = wakuRlnRelay1.validateMessageAndUpdateLog(wm1)
msgValidate2 = wakuRlnRelay1.validateMessageAndUpdateLog(wm2)
msgValidate1 = await wakuRlnRelay1.validateMessageAndUpdateLog(wm1)
msgValidate2 = await wakuRlnRelay1.validateMessageAndUpdateLog(wm2)
check:
msgValidate1 == MessageValidationResult.Valid

View File

@ -54,9 +54,10 @@ proc sendRlnMessage*(
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
var message = WakuMessage(payload: payload, contentTopic: contentTopic)
let appendResult = client.wakuRlnRelay.appendRLNProof(message, epochTime())
# Assignment required or crashess
assertResultOk(appendResult)
message.proof = (
await client.wakuRlnRelay.generateRLNProof(message.toRLNSignal(), epochTime())
).valueOr:
raiseAssert "generateRLNProof failed: " & error
discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted
@ -68,13 +69,13 @@ proc sendRlnMessageWithInvalidProof*(
completionFuture: Future[bool],
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
let extraBytes: seq[byte] = @[byte(1), 2, 3]
let rateLimitProofRes = await client.wakuRlnRelay.groupManager.generateProof(
concat(payload, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
client.wakuRlnRelay.getCurrentEpoch(),
)
let
extraBytes: seq[byte] = @[byte(1), 2, 3]
rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof(
concat(payload, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
client.wakuRlnRelay.getCurrentEpoch(),
)
rateLimitProof = rateLimitProofRes.get().encode().buffer
message =
WakuMessage(payload: @payload, contentTopic: contentTopic, proof: rateLimitProof)