mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
Rework retry_wrapper from template to proc
This commit is contained in:
parent
9b6b33f0e5
commit
879c79161f
@ -148,12 +148,6 @@ template initializedGuard(g: OnchainGroupManager): untyped =
|
||||
if not g.initialized:
|
||||
raise newException(CatchableError, "OnchainGroupManager is not initialized")
|
||||
|
||||
template retryWrapper(
|
||||
g: OnchainGroupManager, res: auto, errStr: string, body: untyped
|
||||
): auto =
|
||||
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
|
||||
body
|
||||
|
||||
proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
|
||||
let rootRes = (await g.fetchMerkleRoot()).valueOr:
|
||||
return false
|
||||
@ -172,7 +166,7 @@ proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
|
||||
|
||||
return false
|
||||
|
||||
proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError]).} =
|
||||
proc trackRootChanges*(g: OnchainGroupManager): Future[Result[void, string]] {.async.} =
|
||||
try:
|
||||
initializedGuard(g)
|
||||
const rpcDelay = 5.seconds
|
||||
@ -194,18 +188,21 @@ proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError
|
||||
|
||||
let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr:
|
||||
error "Failed to fetch next free index", error = error
|
||||
raise
|
||||
newException(CatchableError, "Failed to fetch next free index: " & error)
|
||||
return err("Failed to fetch next free index: " & error)
|
||||
|
||||
let memberCount = cast[int64](nextFreeIndex)
|
||||
waku_rln_number_registered_memberships.set(float64(memberCount))
|
||||
except CatchableError:
|
||||
error "Fatal error in trackRootChanges", error = getCurrentExceptionMsg()
|
||||
return err("Fatal error in trackRootChanges" & getCurrentExceptionMsg())
|
||||
|
||||
method register*(
|
||||
g: OnchainGroupManager, rateCommitment: RateCommitment
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
try:
|
||||
initializedGuard(g)
|
||||
except CatchableError as e:
|
||||
return err("Not initialized: " & e.msg)
|
||||
|
||||
try:
|
||||
let leaf = rateCommitment.toLeaf().get()
|
||||
@ -214,33 +211,41 @@ method register*(
|
||||
info "registering member via callback", rateCommitment = leaf, index = idx
|
||||
await g.registerCb.get()(@[Membership(rateCommitment: leaf, index: idx)])
|
||||
g.latestIndex.inc()
|
||||
except CatchableError:
|
||||
raise newException(ValueError, getCurrentExceptionMsg())
|
||||
except Exception as e:
|
||||
return err("Failed to call register callback: " & e.msg)
|
||||
|
||||
method register*(
|
||||
g: OnchainGroupManager,
|
||||
identityCredential: IdentityCredential,
|
||||
userMessageLimit: UserMessageLimit,
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
try:
|
||||
initializedGuard(g)
|
||||
except CatchableError as e:
|
||||
return err("Not initialized: " & e.msg)
|
||||
|
||||
let ethRpc = g.ethRpc.get()
|
||||
let wakuRlnContract = g.wakuRlnContract.get()
|
||||
|
||||
var gasPrice: int
|
||||
g.retryWrapper(gasPrice, "Failed to get gas price"):
|
||||
let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice())
|
||||
## Multiply by 2 to speed up the transaction
|
||||
## Check for overflow when casting to int
|
||||
if fetchedGasPrice > uint64(high(int) div 2):
|
||||
warn "Gas price overflow detected, capping at maximum int value",
|
||||
fetchedGasPrice = fetchedGasPrice, maxInt = high(int)
|
||||
high(int)
|
||||
else:
|
||||
let calculatedGasPrice = int(fetchedGasPrice) * 2
|
||||
debug "Gas price calculated",
|
||||
fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice
|
||||
calculatedGasPrice
|
||||
let gasPrice = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to get gas price",
|
||||
proc(): Future[int] {.async.} =
|
||||
let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice())
|
||||
if fetchedGasPrice > uint64(high(int) div 2):
|
||||
warn "Gas price overflow detected, capping at maximum int value",
|
||||
fetchedGasPrice = fetchedGasPrice, maxInt = high(int)
|
||||
return high(int)
|
||||
else:
|
||||
let calculatedGasPrice = int(fetchedGasPrice) * 2
|
||||
debug "Gas price calculated",
|
||||
fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice
|
||||
return calculatedGasPrice,
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to get gas price: " & error)
|
||||
|
||||
let idCommitmentHex = identityCredential.idCommitment.inHex()
|
||||
debug "identityCredential idCommitmentHex", idCommitment = idCommitmentHex
|
||||
let idCommitment = identityCredential.idCommitment.toUInt256()
|
||||
@ -249,16 +254,28 @@ method register*(
|
||||
idCommitment = idCommitment,
|
||||
userMessageLimit = userMessageLimit,
|
||||
idCommitmentsToErase = idCommitmentsToErase
|
||||
var txHash: TxHash
|
||||
g.retryWrapper(txHash, "Failed to register the member"):
|
||||
await wakuRlnContract
|
||||
.register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase)
|
||||
.send(gasPrice = gasPrice)
|
||||
let txHash = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to register the member",
|
||||
proc(): Future[TxHash] {.async.} =
|
||||
return await wakuRlnContract
|
||||
.register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase)
|
||||
.send(gasPrice = gasPrice),
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to register member: " & error)
|
||||
|
||||
# wait for the transaction to be mined
|
||||
var tsReceipt: ReceiptObject
|
||||
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
|
||||
await ethRpc.getMinedTransactionReceipt(txHash)
|
||||
let tsReceipt = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to get the transaction receipt",
|
||||
proc(): Future[ReceiptObject] {.async.} =
|
||||
return await ethRpc.getMinedTransactionReceipt(txHash),
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to get transaction receipt: " & error)
|
||||
debug "registration transaction mined", txHash = txHash
|
||||
g.registrationTxHash = some(txHash)
|
||||
# the receipt topic holds the hash of signature of the raised events
|
||||
@ -309,10 +326,13 @@ method register*(
|
||||
|
||||
if g.registerCb.isSome():
|
||||
let member = Membership(rateCommitment: rateCommitment, index: g.latestIndex)
|
||||
await g.registerCb.get()(@[member])
|
||||
try:
|
||||
await g.registerCb.get()(@[member])
|
||||
except Exception as e:
|
||||
return err("Failed to call register callback: " & e.msg)
|
||||
g.latestIndex.inc()
|
||||
|
||||
return
|
||||
return ok()
|
||||
|
||||
method withdraw*(
|
||||
g: OnchainGroupManager, idCommitment: IDCommitment
|
||||
@ -492,25 +512,30 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
|
||||
proc establishConnection(
|
||||
g: OnchainGroupManager
|
||||
): Future[GroupManagerResult[Web3]] {.async.} =
|
||||
var ethRpc: Web3
|
||||
let ethRpc = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to connect to the Ethereum client",
|
||||
proc(): Future[Web3] {.async.} =
|
||||
var innerEthRpc: Web3
|
||||
var connected = false
|
||||
for clientUrl in g.ethClientUrls:
|
||||
## We give a chance to the user to provide multiple clients
|
||||
## and we try to connect to each of them
|
||||
try:
|
||||
innerEthRpc = await newWeb3(clientUrl)
|
||||
connected = true
|
||||
break
|
||||
except CatchableError:
|
||||
error "failed connect Eth client", error = getCurrentExceptionMsg()
|
||||
|
||||
g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
|
||||
var innerEthRpc: Web3
|
||||
var connected = false
|
||||
for clientUrl in g.ethClientUrls:
|
||||
## We give a chance to the user to provide multiple clients
|
||||
## and we try to connect to each of them
|
||||
try:
|
||||
innerEthRpc = await newWeb3(clientUrl)
|
||||
connected = true
|
||||
break
|
||||
except CatchableError:
|
||||
error "failed connect Eth client", error = getCurrentExceptionMsg()
|
||||
if not connected:
|
||||
raise newException(CatchableError, "all failed")
|
||||
|
||||
if not connected:
|
||||
raise newException(CatchableError, "all failed")
|
||||
|
||||
innerEthRpc
|
||||
return innerEthRpc,
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to establish Ethereum connection: " & error)
|
||||
|
||||
return ok(ethRpc)
|
||||
|
||||
@ -519,9 +544,15 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
|
||||
let ethRpc: Web3 = (await establishConnection(g)).valueOr:
|
||||
return err("failed to connect to Ethereum clients: " & $error)
|
||||
|
||||
var fetchedChainId: UInt256
|
||||
g.retryWrapper(fetchedChainId, "Failed to get the chain id"):
|
||||
await ethRpc.provider.eth_chainId()
|
||||
let fetchedChainId = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to get the chain id",
|
||||
proc(): Future[UInt256] {.async.} =
|
||||
return await ethRpc.provider.eth_chainId(),
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to get chain id: " & error)
|
||||
|
||||
# Set the chain id
|
||||
if g.chainId == 0:
|
||||
@ -595,8 +626,10 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
|
||||
proc onDisconnect() {.async.} =
|
||||
error "Ethereum client disconnected"
|
||||
|
||||
var newEthRpc: Web3 = (await g.establishConnection()).valueOr:
|
||||
g.onFatalErrorAction("failed to connect to Ethereum clients onDisconnect")
|
||||
let newEthRpc: Web3 = (await g.establishConnection()).valueOr:
|
||||
error "Fatal: failed to reconnect to Ethereum clients after disconnect",
|
||||
error = error
|
||||
g.onFatalErrorAction("failed to reconnect to Ethereum clients: " & error)
|
||||
return
|
||||
|
||||
newEthRpc.ondisconnect = ethRpc.ondisconnect
|
||||
|
||||
@ -64,7 +64,7 @@ type WakuRLNRelay* = ref object of RootObj
|
||||
onFatalErrorAction*: OnFatalErrorHandler
|
||||
nonceManager*: NonceManager
|
||||
epochMonitorFuture*: Future[void]
|
||||
rootChangesFuture*: Future[void]
|
||||
rootChangesFuture*: Future[Result[void, string]]
|
||||
|
||||
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
|
||||
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user