fix(rln-relay): graceful retries on rpc calls (#2250)

* fix(rln-relay): graceful retries on rpc calls

* fix: missing file
This commit is contained in:
Aaryamann Challani 2023-12-11 14:59:16 +05:30 committed by GitHub
parent 77c5ba7669
commit 15c1f974df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 60 deletions

View File

@ -18,7 +18,8 @@ import
../../../waku_keystore, ../../../waku_keystore,
../../rln, ../../rln,
../../conversion_utils, ../../conversion_utils,
../group_manager_base ../group_manager_base,
./retry_wrapper
from strutils import parseHexInt from strutils import parseHexInt
@ -142,20 +143,21 @@ method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential
let registryContract = g.registryContract.get() let registryContract = g.registryContract.get()
let membershipFee = g.membershipFee.get() let membershipFee = g.membershipFee.get()
let gasPrice = int(await ethRpc.provider.eth_gasPrice()) * 2 var gasPrice: int
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = identityCredentials.idCommitment.toUInt256() let idCommitment = identityCredentials.idCommitment.toUInt256()
var txHash: TxHash var txHash: TxHash
try: # send the registration transaction and check if any error occurs
let storageIndex = g.usingStorageIndex.get() let storageIndex = g.usingStorageIndex.get()
debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex
txHash = await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice) retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
except CatchableError: await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice)
error "error while registering the member", msg = getCurrentExceptionMsg()
raise newException(CatchableError, "could not register the member: " & getCurrentExceptionMsg())
# wait for the transaction to be mined # wait for the transaction to be mined
let tsReceipt = await ethRpc.getMinedTransactionReceipt(txHash) var tsReceipt: ReceiptObject
retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
await ethRpc.getMinedTransactionReceipt(txHash)
debug "registration transaction mined", txHash = txHash debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash) g.registrationTxHash = some(txHash)
# the receipt topic holds the hash of signature of the raised events # the receipt topic holds the hash of signature of the raised events
@ -241,7 +243,9 @@ proc getRawEvents(g: OnchainGroupManager,
let ethRpc = g.ethRpc.get() let ethRpc = g.ethRpc.get()
let rlnContract = g.rlnContract.get() let rlnContract = g.rlnContract.get()
let events = await rlnContract.getJsonLogs(MemberRegistered, var events: JsonNode
retryWrapper(events, RetryStrategy.new(), "Failed to get the events"):
await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()), fromBlock = some(fromBlock.blockId()),
toBlock = some(toBlock.blockId())) toBlock = some(toBlock.blockId()))
return events return events
@ -340,10 +344,9 @@ proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} =
let ethRpc = g.ethRpc.get() let ethRpc = g.ethRpc.get()
let newHeadCallback = g.getNewHeadCallback() let newHeadCallback = g.getNewHeadCallback()
try: var blockHeaderSub: Subscription
discard await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback) retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"):
except CatchableError: await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)
raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg())
proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
initializedGuard(g) initializedGuard(g)
@ -364,7 +367,9 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
# we always want to sync from last processed block => latest # we always want to sync from last processed block => latest
# chunk events # chunk events
while true: while true:
let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) var currentLatestBlock: BlockNumber
retryWrapper(currentLatestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if fromBlock >= currentLatestBlock: if fromBlock >= currentLatestBlock:
break break
@ -400,14 +405,12 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
method init*(g: OnchainGroupManager): Future[void] {.async.} = method init*(g: OnchainGroupManager): Future[void] {.async.} =
var ethRpc: Web3 var ethRpc: Web3
# check if the Ethereum client is reachable # check if the Ethereum client is reachable
try: retryWrapper(ethRpc, RetryStrategy.new(), "Failed to connect to the Ethereum client"):
ethRpc = await newWeb3(g.ethClientUrl) await newWeb3(g.ethClientUrl)
except CatchableError:
let errMsg = "could not connect to the Ethereum client: " & getCurrentExceptionMsg()
raise newException(ValueError, errMsg)
# Set the chain id # Set the chain id
let chainId = await ethRpc.provider.eth_chainId() var chainId: Quantity
retryWrapper(chainId, RetryStrategy.new(), "Failed to get the chain id"):
await ethRpc.provider.eth_chainId()
g.chainId = some(chainId) g.chainId = some(chainId)
if g.ethPrivateKey.isSome(): if g.ethPrivateKey.isSome():
@ -422,9 +425,14 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
let registryContract = ethRpc.contractSender(WakuRlnRegistry, registryAddress) let registryContract = ethRpc.contractSender(WakuRlnRegistry, registryAddress)
# get the current storage index # get the current storage index
let usingStorageIndex = await registryContract.usingStorageIndex().call() var usingStorageIndex: Uint16
retryWrapper(usingStorageIndex, RetryStrategy.new(), "Failed to get the storage index"):
await registryContract.usingStorageIndex().call()
g.usingStorageIndex = some(usingStorageIndex) g.usingStorageIndex = some(usingStorageIndex)
let rlnContractAddress = await registryContract.storages(usingStorageIndex).call() var rlnContractAddress: Address
retryWrapper(rlnContractAddress, RetryStrategy.new(), "Failed to get the rln contract address"):
await registryContract.storages(usingStorageIndex).call()
let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress) let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress)
g.ethRpc = some(ethRpc) g.ethRpc = some(ethRpc)
@ -477,39 +485,36 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
# check if the contract exists by calling a static function # check if the contract exists by calling a static function
var membershipFee: Uint256 var membershipFee: Uint256
try: retryWrapper(membershipFee, RetryStrategy.new(), "Failed to get the membership deposit"):
membershipFee = await rlnContract.MEMBERSHIP_DEPOSIT().call() await rlnContract.MEMBERSHIP_DEPOSIT().call()
except CatchableError:
raise newException(ValueError,
"could not get the membership deposit: " & getCurrentExceptionMsg())
g.membershipFee = some(membershipFee) g.membershipFee = some(membershipFee)
var deployedBlockNumber: Uint256 var deployedBlockNumber: Uint256
try: retryWrapper(deployedBlockNumber, RetryStrategy.new(), "Failed to get the deployed block number"):
deployedBlockNumber = await rlnContract.deployedBlockNumber().call() await rlnContract.deployedBlockNumber().call()
debug "using rln storage", deployedBlockNumber, rlnContractAddress debug "using rln storage", deployedBlockNumber, rlnContractAddress
except CatchableError:
raise newException(ValueError,
"could not get the deployed block number: " & getCurrentExceptionMsg())
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
ethRpc.ondisconnect = proc() = proc onDisconnect() {.async.} =
error "Ethereum client disconnected" error "Ethereum client disconnected"
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
try: var newEthRpc: Web3
let newEthRpc = waitFor newWeb3(g.ethClientUrl) retryWrapper(newEthRpc, RetryStrategy.new(), "Failed to reconnect with the Ethereum client"):
await newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc) g.ethRpc = some(newEthRpc)
except CatchableError:
error "failed to reconnect with the Ethereum client", error = getCurrentExceptionMsg()
return
try: try:
asyncSpawn g.startOnchainSync() asyncSpawn g.startOnchainSync()
except CatchableError: except CatchableError:
error "failed to restart group sync", error = getCurrentExceptionMsg() error "failed to restart group sync", error = getCurrentExceptionMsg()
ethRpc.ondisconnect = proc() =
asyncCheck onDisconnect()
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true g.initialized = true
@ -526,12 +531,10 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} =
proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
let ethRpc = g.ethRpc.get() let ethRpc = g.ethRpc.get()
try: var syncing: JsonNode
let syncing = await ethRpc.provider.eth_syncing() retryWrapper(syncing, RetryStrategy.new(), "Failed to get the syncing status"):
await ethRpc.provider.eth_syncing()
return syncing.getBool() return syncing.getBool()
except CatchableError:
error "failed to get the syncing status", error = getCurrentExceptionMsg()
return false
method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
initializedGuard(g) initializedGuard(g)
@ -540,14 +543,11 @@ method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
return false return false
var currentBlock: BlockNumber var currentBlock: BlockNumber
try: retryWrapper(currentBlock, RetryStrategy.new(), "Failed to get the current block number"):
currentBlock = cast[BlockNumber](await g.ethRpc cast[BlockNumber](await g.ethRpc
.get() .get()
.provider .provider
.eth_blockNumber()) .eth_blockNumber())
except CatchableError:
error "failed to get the current block number", error = getCurrentExceptionMsg()
return false
if g.latestProcessedBlock < currentBlock: if g.latestProcessedBlock < currentBlock:
return false return false

View File

@ -0,0 +1,32 @@
import
chronos
type RetryStrategy* = object
shouldRetry*: bool
retryDelay*: Duration
retryCount*: uint
proc new*(T: type RetryStrategy): RetryStrategy =
return RetryStrategy(
shouldRetry: true,
retryDelay: 1000.millis,
retryCount: 3
)
template retryWrapper*(res: auto,
retryStrategy: RetryStrategy,
errStr: string,
body: untyped): auto =
var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry
while shouldRetry and retryCount > 0:
try:
res = body
shouldRetry = false
except:
retryCount -= 1
await sleepAsync(retryStrategy.retryDelay)
if shouldRetry:
raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg())