From b3bb7a1113c6d584e30eecfd724175461f0ada9c Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Fri, 18 Aug 2023 16:38:24 +0530 Subject: [PATCH] fix(rln-relay): invalid start index being set results in invalid proofs (#1915) * fix(rln-relay): invalid proof usage * fix(rln-relay): use startIndex from first event in block * fix: latestIndex set after registerBatch --- waku/node/waku_node.nim | 2 +- .../group_manager/on_chain/group_manager.nim | 56 +++++++++++-------- waku/waku_rln_relay/rln/wrappers.nim | 2 +- waku/waku_rln_relay/rln_relay.nim | 3 +- 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index f26dd8287..48714a296 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -738,7 +738,7 @@ when defined(rln): let rlnRelayRes = await WakuRlnRelay.new(rlnConf, registrationHandler) if rlnRelayRes.isErr(): - raise newException(CatchableError, "failed to mount WakuRlnRelay: {rlnRelayRes.error}") + raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error) let rlnRelay = rlnRelayRes.get() let validator = generateRlnValidator(rlnRelay, spamHandler) let pb = PubSub(node.wakuRelay) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 0cf68f54f..b097597c2 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -69,40 +69,40 @@ template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: raise newException(ValueError, "OnchainGroupManager is not initialized") -method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = - initializedGuard(g) - - await g.registerBatch(@[idCommitment]) - method atomicBatch*(g: OnchainGroupManager, + start: MembershipIndex, idCommitments = newSeq[IDCommitment](), toRemoveIndices = newSeq[MembershipIndex]()): Future[void] {.async.} = initializedGuard(g) - let startIndex = g.latestIndex waku_rln_membership_insertion_duration_seconds.nanosecondTime: - let operationSuccess = g.rlnInstance.atomicWrite(some(startIndex), idCommitments, toRemoveIndices) + let operationSuccess = g.rlnInstance.atomicWrite(some(start), idCommitments, toRemoveIndices) if not operationSuccess: raise newException(ValueError, "atomic batch operation failed") if g.registerCb.isSome(): var membersSeq = newSeq[Membership]() for i in 0 ..< idCommitments.len(): - var index = g.latestIndex + MembershipIndex(i) - debug "registering member", idCommitment = idCommitments[i], index = index, latestIndex = g.latestIndex + var index = start + MembershipIndex(i) + debug "registering member", idCommitment = idCommitments[i], index = index let member = Membership(idCommitment: idCommitments[i], index: index) membersSeq.add(member) await g.registerCb.get()(membersSeq) g.validRootBuffer = g.slideRootQueue() - g.latestIndex += MembershipIndex(idCommitments.len()) +method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = + initializedGuard(g) + + await g.registerBatch(@[idCommitment]) + method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = initializedGuard(g) - await g.atomicBatch(idCommitments) + await g.atomicBatch(g.latestIndex, idCommitments) + g.latestIndex += MembershipIndex(idCommitments.len()) method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} = @@ -234,7 +234,7 @@ proc getBlockTable(g: OnchainGroupManager, let events = await g.getRawEvents(fromBlock, toBlock) if events.len == 0: - debug "no events found" + trace "no events found" return blockTable for event in events: @@ -255,20 +255,17 @@ proc handleEvents(g: OnchainGroupManager, for blockNumber, members in blockTable.pairs(): try: - await g.atomicBatch(idCommitments = members.mapIt(it[0].idCommitment), - toRemoveIndices = members.filterIt(it[1]).mapIt(it[0].index)) + let startIndex = blockTable[blockNumber].filterIt(not it[1])[0][0].index + let removalIndices = members.filterIt(it[1]).mapIt(it[0].index) + let idCommitments = members.mapIt(it[0].idCommitment) + await g.atomicBatch(start = startIndex, + idCommitments = idCommitments, + toRemoveIndices = removalIndices) + g.latestIndex = startIndex + MembershipIndex(idCommitments.len()) except CatchableError: error "failed to insert members into the tree", error=getCurrentExceptionMsg() raise newException(ValueError, "failed to insert members into the tree") trace "new members added to the Merkle tree", commitments=members.mapIt(it[0].idCommitment.inHex()) - g.latestProcessedBlock = some(blockNumber) - let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata( - lastProcessedBlock: blockNumber)) - if metadataSetRes.isErr(): - # this is not a fatal error, hence we don't raise an exception - warn "failed to persist rln metadata", error=metadataSetRes.error() - else: - info "rln metadata persisted", lastProcessedBlock = blockNumber return @@ -292,10 +289,21 @@ proc getAndHandleEvents(g: OnchainGroupManager, await g.handleEvents(blockTable) await g.handleRemovedEvents(blockTable) + let latestProcessedBlock = if toBlock.isSome(): toBlock.get() + else: fromBlock + g.latestProcessedBlock = some(latestProcessedBlock) + let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata( + lastProcessedBlock: latestProcessedBlock)) + if metadataSetRes.isErr(): + # this is not a fatal error, hence we don't raise an exception + warn "failed to persist rln metadata", error=metadataSetRes.error() + else: + debug "rln metadata persisted", blockNumber = latestProcessedBlock + proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = let latestBlock = blockheader.number.uint - debug "block received", blockNumber = latestBlock + trace "block received", blockNumber = latestBlock # get logs from the last block try: asyncSpawn g.getAndHandleEvents(latestBlock) @@ -327,7 +335,7 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = var fromBlock = if g.latestProcessedBlock.isSome(): info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get() - g.latestProcessedBlock.get() + g.latestProcessedBlock.get() + 1 else: info "starting onchain sync from scratch" BlockNumber(0) diff --git a/waku/waku_rln_relay/rln/wrappers.nim b/waku/waku_rln_relay/rln/wrappers.nim index 9fbd24ba4..1b9c53b53 100644 --- a/waku/waku_rln_relay/rln/wrappers.nim +++ b/waku/waku_rln_relay/rln/wrappers.nim @@ -89,7 +89,7 @@ proc createRLNInstanceLocal(d = MerkleTreeDepth, cache_capacity: 15_000, mode: "high_throughput", compression: false, - flush_interval: 12_000, + flush_interval: 500, path: if tree_path != "": tree_path else: DefaultRlnTreePath ) ) diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 3dd3b5631..b6e5cd299 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -95,6 +95,7 @@ method stop*(rlnPeer: WakuRLNRelay) {.async.} = ## Throws an error if it cannot stop the rln-relay protocol # stop the group sync, and flush data to tree db + info "stopping rln-relay" await rlnPeer.groupManager.stop() proc hasDuplicate*(rlnPeer: WakuRLNRelay, @@ -217,7 +218,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, let rootValidationRes = rlnPeer.groupManager.validateRoot(proof.merkleRoot) if not rootValidationRes: - debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.groupManager.validRoots.mapIt(it.inHex()) + debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot.inHex(), validRoots=rlnPeer.groupManager.validRoots.mapIt(it.inHex()) waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"]) return MessageValidationResult.Invalid