diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 6237b287b..118e6c5ab 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -597,6 +597,19 @@ proc startListeningToEvents( let newBlockCallback = g.getNewBlockCallback() g.runInInterval(newBlockCallback, DefaultBlockPollRate) +proc batchAwaitBlockHandlingFuture( + g: OnchainGroupManager, futs: seq[Future[bool]] +): Future[void] {.async: (raises: [Exception]).} = + for fut in futs: + try: + var handleBlockRes: bool + g.retryWrapper(handleBlockRes, "Failed to handle block"): + await fut + except CatchableError: + raise newException( + CatchableError, "could not fetch events from block: " & getCurrentExceptionMsg() + ) + proc startOnchainSync( g: OnchainGroupManager ): Future[void] {.async: (raises: [Exception]).} = @@ -606,6 +619,10 @@ proc startOnchainSync( # static block chunk size let blockChunkSize = 2_000 + # delay between rpc calls to not overload the rate limit + let rpcDelay = 200.milliseconds + # max number of futures to run concurrently + let maxFutures = 10 var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber: @@ -616,25 +633,36 @@ proc startOnchainSync( blockNumber = g.rlnContractDeployedBlockNumber g.rlnContractDeployedBlockNumber + var futs = newSeq[Future[bool]]() + var currentLatestBlock: BlockNumber + g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): + cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + try: # we always want to sync from last processed block => latest # chunk events while true: - var currentLatestBlock: BlockNumber - g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): - cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + # if the fromBlock is less than 2k blocks behind the current block + # then fetch the new toBlock if fromBlock >= currentLatestBlock: break + + if fromBlock + blockChunkSize.uint > currentLatestBlock.uint: + g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): + cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) debug "fetching events", fromBlock = fromBlock, toBlock = toBlock - var handleBlockRes: bool - g.retryWrapper(handleBlockRes, "Failed to handle old blocks"): - await g.getAndHandleEvents(fromBlock, toBlock) + await sleepAsync(rpcDelay) + futs.add(g.getAndHandleEvents(fromBlock, toBlock)) + if futs.len >= maxFutures or toBlock == currentLatestBlock: + await g.batchAwaitBlockHandlingFuture(futs) + futs = newSeq[Future[bool]]() fromBlock = toBlock + 1 except CatchableError: raise newException( - ValueError, + CatchableError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg(), )