From f9bd39ea662632b2a18b09327ecfec02e5b0b16d Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Mon, 11 Dec 2023 14:59:16 +0530 Subject: [PATCH] fix(rln-relay): graceful retries on rpc calls (#2250) * fix(rln-relay): graceful retries on rpc calls * fix: missing file --- .../group_manager/on_chain/group_manager.nim | 120 +++++++++--------- .../group_manager/on_chain/retry_wrapper.nim | 32 +++++ 2 files changed, 92 insertions(+), 60 deletions(-) create mode 100644 waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 2cfaf1b74..0e0f05aea 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -18,7 +18,8 @@ import ../../../waku_keystore, ../../rln, ../../conversion_utils, - ../group_manager_base + ../group_manager_base, + ./retry_wrapper from strutils import parseHexInt @@ -142,20 +143,21 @@ method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential let registryContract = g.registryContract.get() let membershipFee = g.membershipFee.get() - let gasPrice = int(await ethRpc.provider.eth_gasPrice()) * 2 + var gasPrice: int + retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"): + int(await ethRpc.provider.eth_gasPrice()) * 2 let idCommitment = identityCredentials.idCommitment.toUInt256() var txHash: TxHash - try: # send the registration transaction and check if any error occurs - let storageIndex = g.usingStorageIndex.get() - debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex - txHash = await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice) - except CatchableError: - error "error while registering the member", msg = getCurrentExceptionMsg() - raise newException(CatchableError, "could not register the member: " & getCurrentExceptionMsg()) + let storageIndex = g.usingStorageIndex.get() + debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex + retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"): + await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice) # wait for the transaction to be mined - let tsReceipt = await ethRpc.getMinedTransactionReceipt(txHash) + var tsReceipt: ReceiptObject + retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"): + await ethRpc.getMinedTransactionReceipt(txHash) debug "registration transaction mined", txHash = txHash g.registrationTxHash = some(txHash) # the receipt topic holds the hash of signature of the raised events @@ -241,9 +243,11 @@ proc getRawEvents(g: OnchainGroupManager, let ethRpc = g.ethRpc.get() let rlnContract = g.rlnContract.get() - let events = await rlnContract.getJsonLogs(MemberRegistered, - fromBlock = some(fromBlock.blockId()), - toBlock = some(toBlock.blockId())) + var events: JsonNode + retryWrapper(events, RetryStrategy.new(), "Failed to get the events"): + await rlnContract.getJsonLogs(MemberRegistered, + fromBlock = some(fromBlock.blockId()), + toBlock = some(toBlock.blockId())) return events proc getBlockTable(g: OnchainGroupManager, @@ -340,10 +344,9 @@ proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} = let ethRpc = g.ethRpc.get() let newHeadCallback = g.getNewHeadCallback() - try: - discard await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback) - except CatchableError: - raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg()) + var blockHeaderSub: Subscription + retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"): + await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback) proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = initializedGuard(g) @@ -364,7 +367,9 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = # we always want to sync from last processed block => latest # chunk events while true: - let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + var currentLatestBlock: BlockNumber + retryWrapper(currentLatestBlock, RetryStrategy.new(), "Failed to get the latest block number"): + cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) if fromBlock >= currentLatestBlock: break @@ -400,14 +405,12 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = method init*(g: OnchainGroupManager): Future[void] {.async.} = var ethRpc: Web3 # check if the Ethereum client is reachable - try: - ethRpc = await newWeb3(g.ethClientUrl) - except CatchableError: - let errMsg = "could not connect to the Ethereum client: " & getCurrentExceptionMsg() - raise newException(ValueError, errMsg) - + retryWrapper(ethRpc, RetryStrategy.new(), "Failed to connect to the Ethereum client"): + await newWeb3(g.ethClientUrl) # Set the chain id - let chainId = await ethRpc.provider.eth_chainId() + var chainId: Quantity + retryWrapper(chainId, RetryStrategy.new(), "Failed to get the chain id"): + await ethRpc.provider.eth_chainId() g.chainId = some(chainId) if g.ethPrivateKey.isSome(): @@ -422,9 +425,14 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = let registryContract = ethRpc.contractSender(WakuRlnRegistry, registryAddress) # get the current storage index - let usingStorageIndex = await registryContract.usingStorageIndex().call() + var usingStorageIndex: Uint16 + retryWrapper(usingStorageIndex, RetryStrategy.new(), "Failed to get the storage index"): + await registryContract.usingStorageIndex().call() + g.usingStorageIndex = some(usingStorageIndex) - let rlnContractAddress = await registryContract.storages(usingStorageIndex).call() + var rlnContractAddress: Address + retryWrapper(rlnContractAddress, RetryStrategy.new(), "Failed to get the rln contract address"): + await registryContract.storages(usingStorageIndex).call() let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress) g.ethRpc = some(ethRpc) @@ -477,39 +485,36 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = # check if the contract exists by calling a static function var membershipFee: Uint256 - try: - membershipFee = await rlnContract.MEMBERSHIP_DEPOSIT().call() - except CatchableError: - raise newException(ValueError, - "could not get the membership deposit: " & getCurrentExceptionMsg()) + retryWrapper(membershipFee, RetryStrategy.new(), "Failed to get the membership deposit"): + await rlnContract.MEMBERSHIP_DEPOSIT().call() g.membershipFee = some(membershipFee) var deployedBlockNumber: Uint256 - try: - deployedBlockNumber = await rlnContract.deployedBlockNumber().call() - debug "using rln storage", deployedBlockNumber, rlnContractAddress - except CatchableError: - raise newException(ValueError, - "could not get the deployed block number: " & getCurrentExceptionMsg()) + retryWrapper(deployedBlockNumber, RetryStrategy.new(), "Failed to get the deployed block number"): + await rlnContract.deployedBlockNumber().call() + debug "using rln storage", deployedBlockNumber, rlnContractAddress g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) - ethRpc.ondisconnect = proc() = + proc onDisconnect() {.async.} = error "Ethereum client disconnected" let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock - try: - let newEthRpc = waitFor newWeb3(g.ethClientUrl) - newEthRpc.ondisconnect = ethRpc.ondisconnect - g.ethRpc = some(newEthRpc) - except CatchableError: - error "failed to reconnect with the Ethereum client", error = getCurrentExceptionMsg() - return + var newEthRpc: Web3 + retryWrapper(newEthRpc, RetryStrategy.new(), "Failed to reconnect with the Ethereum client"): + await newWeb3(g.ethClientUrl) + newEthRpc.ondisconnect = ethRpc.ondisconnect + g.ethRpc = some(newEthRpc) + try: asyncSpawn g.startOnchainSync() except CatchableError: error "failed to restart group sync", error = getCurrentExceptionMsg() + ethRpc.ondisconnect = proc() = + asyncCheck onDisconnect() + + waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) g.initialized = true @@ -526,12 +531,10 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} = proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = let ethRpc = g.ethRpc.get() - try: - let syncing = await ethRpc.provider.eth_syncing() - return syncing.getBool() - except CatchableError: - error "failed to get the syncing status", error = getCurrentExceptionMsg() - return false + var syncing: JsonNode + retryWrapper(syncing, RetryStrategy.new(), "Failed to get the syncing status"): + await ethRpc.provider.eth_syncing() + return syncing.getBool() method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = initializedGuard(g) @@ -540,14 +543,11 @@ method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = return false var currentBlock: BlockNumber - try: - currentBlock = cast[BlockNumber](await g.ethRpc - .get() - .provider - .eth_blockNumber()) - except CatchableError: - error "failed to get the current block number", error = getCurrentExceptionMsg() - return false + retryWrapper(currentBlock, RetryStrategy.new(), "Failed to get the current block number"): + cast[BlockNumber](await g.ethRpc + .get() + .provider + .eth_blockNumber()) if g.latestProcessedBlock < currentBlock: return false diff --git a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim new file mode 100644 index 000000000..f4daa5ff2 --- /dev/null +++ b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim @@ -0,0 +1,32 @@ +import + chronos + + +type RetryStrategy* = object + shouldRetry*: bool + retryDelay*: Duration + retryCount*: uint + +proc new*(T: type RetryStrategy): RetryStrategy = + return RetryStrategy( + shouldRetry: true, + retryDelay: 1000.millis, + retryCount: 3 + ) + + +template retryWrapper*(res: auto, + retryStrategy: RetryStrategy, + errStr: string, + body: untyped): auto = + var retryCount = retryStrategy.retryCount + var shouldRetry = retryStrategy.shouldRetry + while shouldRetry and retryCount > 0: + try: + res = body + shouldRetry = false + except: + retryCount -= 1 + await sleepAsync(retryStrategy.retryDelay) + if shouldRetry: + raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg())