diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index e61ffb956..2c062a873 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -118,27 +118,37 @@ method atomicBatch*( rateCommitments = newSeq[RawRateCommitment](), toRemoveIndices = newSeq[MembershipIndex](), ): Future[void] {.async: (raises: [Exception]), base.} = + echo "-------------- atomicBatch 1" initializedGuard(g) + echo "-------------- atomicBatch 2" waku_rln_membership_insertion_duration_seconds.nanosecondTime: let operationSuccess = g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices) + echo "-------------- atomicBatch 3" if not operationSuccess: + echo "-------------- atomicBatch " raise newException(CatchableError, "atomic batch operation failed") # TODO: when slashing is enabled, we need to track slashed members waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) + echo "-------------- atomicBatch 5" if g.registerCb.isSome(): + echo "-------------- atomicBatch 6" var membersSeq = newSeq[Membership]() + echo "-------------- atomicBatch rateCommitments.len: ", rateCommitments.len for i in 0 ..< rateCommitments.len: var index = start + MembershipIndex(i) debug "registering member to callback", rateCommitment = rateCommitments[i], index = index let member = Membership(rateCommitment: rateCommitments[i], index: index) membersSeq.add(member) + echo "-------------- atomicBatch 7" await g.registerCb.get()(membersSeq) + echo "-------------- atomicBatch 8" g.validRootBuffer = g.slideRootQueue() + echo "-------------- atomicBatch 9" method register*( g: OnchainGroupManager, rateCommitment: RateCommitment @@ -404,6 +414,7 @@ proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) = proc getNewBlockCallback(g: OnchainGroupManager): proc = let ethRpc = g.ethRpc.get() proc wrappedCb(): Future[bool] {.async, gcsafe.} = + echo "------------ calling wrappedCb" var latestBlock: BlockNumber g.retryWrapper(latestBlock, "Failed to get the latest block number"): cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) @@ -430,10 +441,14 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc = proc startListeningToEvents( g: OnchainGroupManager ): Future[void] {.async: (raises: [Exception]).} = + echo "---------- startListeningToEvents 1" initializedGuard(g) + echo "---------- startListeningToEvents 2" let ethRpc = g.ethRpc.get() + echo "---------- startListeningToEvents 3" let newBlockCallback = g.getNewBlockCallback() + echo "---------- startListeningToEvents 4" g.runInInterval(newBlockCallback, DefaultBlockPollRate) proc batchAwaitBlockHandlingFuture( @@ -454,6 +469,7 @@ proc startOnchainSync( ): Future[void] {.async: (raises: [Exception]).} = initializedGuard(g) + echo "---------- startOnchainSync 1" let ethRpc = g.ethRpc.get() # static block chunk size @@ -463,6 +479,7 @@ proc startOnchainSync( # max number of futures to run concurrently let maxFutures = 10 + echo "---------- startOnchainSync 2" var fromBlock: BlockNumber = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber: info "syncing from last processed block", blockNumber = g.latestProcessedBlock @@ -472,34 +489,47 @@ proc startOnchainSync( blockNumber = g.rlnContractDeployedBlockNumber g.rlnContractDeployedBlockNumber + echo "---------- startOnchainSync 3" var futs = newSeq[Future[bool]]() var currentLatestBlock: BlockNumber g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + echo "---------- startOnchainSync 4" try: # we always want to sync from last processed block => latest # chunk events while true: + echo "---------- startOnchainSync fromBlock", fromBlock + echo "---------- startOnchainSync currentLatestBlock", currentLatestBlock # if the fromBlock is less than 2k blocks behind the current block # then fetch the new toBlock if fromBlock >= currentLatestBlock: + echo "---------- startOnchainSync 5" break if fromBlock + blockChunkSize > currentLatestBlock: + echo "---------- startOnchainSync 6" g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) let toBlock = min(fromBlock + blockChunkSize, currentLatestBlock) debug "fetching events", fromBlock = fromBlock, toBlock = toBlock + echo "---------- startOnchainSync 7" await sleepAsync(rpcDelay) + echo "---------- startOnchainSync 8" futs.add(g.getAndHandleEvents(fromBlock, toBlock)) + echo "---------- startOnchainSync 9" if futs.len >= maxFutures or toBlock == currentLatestBlock: + echo "---------- startOnchainSync 10" await g.batchAwaitBlockHandlingFuture(futs) + echo "---------- startOnchainSync 11" g.setMetadata(lastProcessedBlock = some(toBlock)).isOkOr: error "failed to persist rln metadata", error = $error futs = newSeq[Future[bool]]() + echo "---------- startOnchainSync 12" fromBlock = toBlock + 1 + echo "---------- startOnchainSync 13" except CatchableError: raise newException( CatchableError, @@ -508,6 +538,7 @@ proc startOnchainSync( # listen to blockheaders and contract events try: + echo "---------- startOnchainSync " await g.startListeningToEvents() except CatchableError: raise newException(