From 5cb440d2b8c38a73d67fe5bfd42e121aec72f394 Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Wed, 28 Jun 2023 18:00:45 +0530 Subject: [PATCH] fix(rln-relay): chunk event fetching (#1830) --- .../group_manager/on_chain/group_manager.nim | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim index 1013aa511..8f484eca0 100644 --- a/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -62,6 +62,8 @@ type const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePassword* = "password" +const BlockChunkSize* = 100'u64 + template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: raise newException(ValueError, "OnchainGroupManager is not initialized") @@ -316,16 +318,32 @@ proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} = proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = initializedGuard(g) - let fromBlock = if g.latestProcessedBlock.isSome(): + let ethRpc = g.ethRpc.get() + + var fromBlock = if g.latestProcessedBlock.isSome(): info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get() g.latestProcessedBlock.get() else: info "starting onchain sync from scratch" + # chunk size is 1000 blocks BlockNumber(0) + let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) try: # we always want to sync from last processed block => latest - await g.getAndHandleEvents(fromBlock, some(BlockNumber(0))) + if fromBlock == BlockNumber(0) or + fromBlock + BlockNumber(BlockChunkSize) < latestBlock: + # chunk events + while true: + let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) + let toBlock = min(fromBlock + BlockNumber(BlockChunkSize), currentLatestBlock) + info "chunking events", fromBlock = fromBlock, toBlock = toBlock + await g.getAndHandleEvents(fromBlock, some(toBlock)) + fromBlock = toBlock + 1 + if fromBlock >= currentLatestBlock: + break + else: + await g.getAndHandleEvents(fromBlock, some(BlockNumber(0))) except CatchableError: raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())