From 21604e6bfdbf7872f70de0432bc63109306d37e6 Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Fri, 8 Sep 2023 18:24:27 +0530 Subject: [PATCH] fix(rln-relay): missed roots during sync (#2015) --- .../test_rln_group_manager_onchain.nim | 18 ++++++ .../group_manager/on_chain/group_manager.nim | 64 ++++++------------- 2 files changed, 39 insertions(+), 43 deletions(-) diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index 600b3e1b7..a0ac49516 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -193,6 +193,8 @@ suite "Onchain group manager": manager.initialized manager.rlnContractDeployedBlockNumber > 0 + await manager.stop() + asyncTest "should error on initialization when loaded metadata does not match": let manager = await setup() await manager.init() @@ -220,12 +222,14 @@ suite "Onchain group manager": await manager.init() await manager.startGroupSync() + await manager.stop() asyncTest "startGroupSync: should guard against uninitialized state": let manager = await setup() expect(ValueError): await manager.startGroupSync() + await manager.stop() asyncTest "startGroupSync: should sync to the state of the group": let manager = await setup() @@ -262,6 +266,7 @@ suite "Onchain group manager": check: merkleRootBefore != merkleRootAfter + await manager.stop() asyncTest "startGroupSync: should fetch history correctly": let manager = await setup() @@ -303,6 +308,7 @@ suite "Onchain group manager": check: merkleRootBefore != merkleRootAfter manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize + await manager.stop() asyncTest "register: should guard against uninitialized state": let manager = await setup() @@ -310,6 +316,7 @@ suite "Onchain group manager": expect(ValueError): await manager.register(dummyCommitment) + await manager.stop() asyncTest "register: should register successfully": let manager = await setup() @@ -329,6 +336,7 @@ suite "Onchain group manager": check: merkleRootAfter.inHex() != merkleRootBefore.inHex() manager.latestIndex == 1 + await manager.stop() asyncTest "register: callback is called": let manager = await setup() @@ -354,6 +362,7 @@ suite "Onchain group manager": check: manager.rlnInstance.getMetadata().get().validRoots == manager.validRoots.toSeq() + await manager.stop() asyncTest "withdraw: should guard against uninitialized state": let manager = await setup() @@ -361,6 +370,7 @@ suite "Onchain group manager": expect(ValueError): await manager.withdraw(idSecretHash) + await manager.stop() asyncTest "validateRoot: should validate good root": let manager = await setup() @@ -402,6 +412,7 @@ suite "Onchain group manager": check: validated + await manager.stop() asyncTest "validateRoot: should reject bad root": let manager = await setup() @@ -432,6 +443,7 @@ suite "Onchain group manager": check: validated == false + await manager.stop() asyncTest "verifyProof: should verify valid proof": let manager = await setup() @@ -474,6 +486,7 @@ suite "Onchain group manager": check: verifiedRes.get() + await manager.stop() asyncTest "verifyProof: should reject invalid proof": let manager = await setup() @@ -510,6 +523,7 @@ suite "Onchain group manager": check: verifiedRes.get() == false + await manager.stop() asyncTest "backfillRootQueue: should backfill roots in event of chain reorg": let manager = await setup() @@ -554,6 +568,7 @@ suite "Onchain group manager": manager.validRoots.len() == credentialCount - 1 manager.validRootBuffer.len() == 0 manager.validRoots[credentialCount - 2] == expectedLastRoot + await manager.stop() asyncTest "isReady should return false if ethRpc is none": var manager = await setup() @@ -563,6 +578,7 @@ suite "Onchain group manager": check: (await manager.isReady()) == false + await manager.stop() asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed": var manager = await setup() @@ -570,6 +586,7 @@ suite "Onchain group manager": check: (await manager.isReady()) == false + await manager.stop() asyncTest "isReady should return true if ethRpc is ready": var manager = await setup() @@ -579,6 +596,7 @@ suite "Onchain group manager": check: (await manager.isReady()) == true + await manager.stop() ################################ diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index f5375f006..b82c298fd 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -232,30 +232,20 @@ proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Member proc getRawEvents(g: OnchainGroupManager, fromBlock: BlockNumber, - toBlock: Option[BlockNumber] = none(BlockNumber)): Future[JsonNode] {.async.} = + toBlock: BlockNumber): Future[JsonNode] {.async.} = initializedGuard(g) let ethRpc = g.ethRpc.get() let rlnContract = g.rlnContract.get() - var normalizedToBlock: BlockNumber - if toBlock.isSome(): - var value = toBlock.get() - if value == 0: - # set to latest block - value = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) - normalizedToBlock = value - else: - normalizedToBlock = fromBlock - let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), - toBlock = some(normalizedToBlock.blockId())) + toBlock = some(toBlock.blockId())) return events proc getBlockTable(g: OnchainGroupManager, - fromBlock: BlockNumber, - toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} = + fromBlock: BlockNumber, + toBlock: BlockNumber): Future[BlockTable] {.async.} = initializedGuard(g) var blockTable = default(BlockTable) @@ -311,23 +301,14 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): Future proc getAndHandleEvents(g: OnchainGroupManager, fromBlock: BlockNumber, - toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} = + toBlock: BlockNumber): Future[void] {.async.} = initializedGuard(g) - proc getLatestBlockNumber(): BlockNumber = - if toBlock.isSome(): - # if toBlock = 0, that implies the latest block - # which is the case when we are syncing block-by-block - # therefore, toBlock = fromBlock + 1 - # if toBlock != 0, then we are chunking blocks - # therefore, toBlock = fromBlock + blockChunkSize (which is handled) - return max(fromBlock + 1, toBlock.get()) - return fromBlock let blockTable = await g.getBlockTable(fromBlock, toBlock) await g.handleEvents(blockTable) await g.handleRemovedEvents(blockTable) - g.latestProcessedBlock = getLatestBlockNumber() + g.latestProcessedBlock = toBlock let metadataSetRes = g.setMetadata() if metadataSetRes.isErr(): # this is not a fatal error, hence we don't raise an exception @@ -337,11 +318,13 @@ proc getAndHandleEvents(g: OnchainGroupManager, proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = - let latestBlock = blockheader.number.uint + let latestBlock = BlockNumber(blockheader.number) trace "block received", blockNumber = latestBlock # get logs from the last block try: - asyncSpawn g.getAndHandleEvents(latestBlock) + # inc by 1 to prevent double processing + let fromBlock = g.latestProcessedBlock + 1 + asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock) except CatchableError: warn "failed to handle log: ", error=getCurrentExceptionMsg() return newHeadCallback @@ -368,28 +351,23 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = let blockChunkSize = 2_000 var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber: - info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock g.latestProcessedBlock + 1 else: - info "starting onchain sync from deployed block number", deployedBlockNumber = g.rlnContractDeployedBlockNumber g.rlnContractDeployedBlockNumber - let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) try: # we always want to sync from last processed block => latest - if fromBlock == BlockNumber(0) or - fromBlock + BlockNumber(blockChunkSize) < latestBlock: - # chunk events - while true: - let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) - let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) - info "chunking events", fromBlock = fromBlock, toBlock = toBlock - await g.getAndHandleEvents(fromBlock, some(toBlock)) - fromBlock = toBlock + 1 - if fromBlock >= currentLatestBlock: - break - else: - await g.getAndHandleEvents(fromBlock, some(BlockNumber(0))) + # chunk events + while true: + let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + if fromBlock >= currentLatestBlock: + break + + let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) + debug "fetching events", fromBlock = fromBlock, toBlock = toBlock + await g.getAndHandleEvents(fromBlock, toBlock) + fromBlock = toBlock + 1 + except CatchableError: raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())