Merge dce45ba9e876d3ade9f8b6da06d9fa26634c875b into 96196ab8bc05f31b09dac2403f9d5de3bc05f31b

This commit is contained in:
Tanya S 2025-12-22 15:17:50 -08:00 committed by GitHub
commit 1fcc4c00d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 199 additions and 191 deletions

View File

@ -86,10 +86,9 @@ suite "Onchain group manager":
let merkleRootBefore = waitFor manager.fetchMerkleRoot() let merkleRootBefore = waitFor manager.fetchMerkleRoot()
try: (waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
waitFor manager.register(credentials, UserMessageLimit(20)) assert false,
except Exception, CatchableError: "error returned when calling register: " & error
assert false, "exception raised: " & getCurrentExceptionMsg()
discard waitFor withTimeout(trackRootChanges(manager), 15.seconds) discard waitFor withTimeout(trackRootChanges(manager), 15.seconds)
@ -110,13 +109,11 @@ suite "Onchain group manager":
let merkleRootBefore = waitFor manager.fetchMerkleRoot() let merkleRootBefore = waitFor manager.fetchMerkleRoot()
try: for i in 0 ..< credentials.len():
for i in 0 ..< credentials.len(): info "Registering credential", index = i, credential = credentials[i]
info "Registering credential", index = i, credential = credentials[i] (waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
waitFor manager.register(credentials[i], UserMessageLimit(20)) assert false, "Failed to register credential " & $i & ": " & error
discard waitFor manager.updateRoots() discard waitFor manager.updateRoots()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
let merkleRootAfter = waitFor manager.fetchMerkleRoot() let merkleRootAfter = waitFor manager.fetchMerkleRoot()
@ -127,16 +124,15 @@ suite "Onchain group manager":
test "register: should guard against uninitialized state": test "register: should guard against uninitialized state":
let dummyCommitment = default(IDCommitment) let dummyCommitment = default(IDCommitment)
try: let res = waitFor manager.register(
waitFor manager.register( RateCommitment(
RateCommitment( idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20)
idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20)
)
) )
except CatchableError: )
assert true
except Exception: check:
assert false, "exception raised: " & getCurrentExceptionMsg() res.isErr()
res.error == "Not initialized: OnchainGroupManager is not initialized"
test "register: should register successfully": test "register: should register successfully":
# TODO :- similar to ```trackRootChanges: should fetch history correctly``` # TODO :- similar to ```trackRootChanges: should fetch history correctly```
@ -146,11 +142,9 @@ suite "Onchain group manager":
let idCredentials = generateCredentials() let idCredentials = generateCredentials()
let merkleRootBefore = waitFor manager.fetchMerkleRoot() let merkleRootBefore = waitFor manager.fetchMerkleRoot()
try: (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
waitFor manager.register(idCredentials, UserMessageLimit(20))
except Exception, CatchableError:
assert false, assert false,
"exception raised when calling register: " & getCurrentExceptionMsg() "error returned when calling register: " & error
let merkleRootAfter = waitFor manager.fetchMerkleRoot() let merkleRootAfter = waitFor manager.fetchMerkleRoot()
@ -177,26 +171,24 @@ suite "Onchain group manager":
manager.onRegister(callback) manager.onRegister(callback)
try: (waitFor manager.register(
waitFor manager.register( RateCommitment(
RateCommitment( idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20)
idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20)
)
) )
except Exception, CatchableError: )).isOkOr:
assert false, "exception raised: " & getCurrentExceptionMsg() assert false,
"error returned when calling register: " & error
waitFor fut waitFor fut
test "withdraw: should guard against uninitialized state": test "withdraw: should guard against uninitialized state":
let idSecretHash = generateCredentials().idSecretHash let idSecretHash = generateCredentials().idSecretHash
try: let res = waitFor manager.withdraw(idSecretHash)
waitFor manager.withdraw(idSecretHash)
except CatchableError: check:
assert true res.isErr()
except Exception: res.error == "Not initialized: OnchainGroupManager is not initialized"
assert false, "exception raised: " & getCurrentExceptionMsg()
test "validateRoot: should validate good root": test "validateRoot: should validate good root":
let idCredentials = generateCredentials() let idCredentials = generateCredentials()
@ -217,10 +209,8 @@ suite "Onchain group manager":
(waitFor manager.init()).isOkOr: (waitFor manager.init()).isOkOr:
raiseAssert $error raiseAssert $error
try: (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
waitFor manager.register(idCredentials, UserMessageLimit(20)) assert false, "error returned : " & getCurrentExceptionMsg()
except Exception, CatchableError:
assert false, "exception raised: " & getCurrentExceptionMsg()
waitFor fut waitFor fut
@ -299,10 +289,10 @@ suite "Onchain group manager":
manager.onRegister(callback) manager.onRegister(callback)
try:
waitFor manager.register(credentials, UserMessageLimit(20)) (waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
except Exception, CatchableError: assert false,
assert false, "exception raised: " & getCurrentExceptionMsg() "error returned when calling register: " & error
waitFor fut waitFor fut
let rootUpdated = waitFor manager.updateRoots() let rootUpdated = waitFor manager.updateRoots()
@ -337,11 +327,9 @@ suite "Onchain group manager":
let idCredential = generateCredentials() let idCredential = generateCredentials()
try: (waitFor manager.register(idCredential, UserMessageLimit(20))).isOkOr:
waitFor manager.register(idCredential, UserMessageLimit(20)) assert false,
except Exception, CatchableError: "error returned when calling register: " & error
assert false,
"exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
let messageBytes = "Hello".toBytes() let messageBytes = "Hello".toBytes()
@ -395,14 +383,12 @@ suite "Onchain group manager":
return callback return callback
try: manager.onRegister(generateCallback(futures, credentials))
manager.onRegister(generateCallback(futures, credentials))
for i in 0 ..< credentials.len(): for i in 0 ..< credentials.len():
waitFor manager.register(credentials[i], UserMessageLimit(20)) (waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
discard waitFor manager.updateRoots() assert false, "Failed to register credential " & $i & ": " & error
except Exception, CatchableError: discard waitFor manager.updateRoots()
assert false, "exception raised: " & getCurrentExceptionMsg()
waitFor allFutures(futures) waitFor allFutures(futures)

View File

@ -239,11 +239,8 @@ suite "Waku rln relay":
let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager) let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager)
let idCredentials = generateCredentials() let idCredentials = generateCredentials()
try: (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
waitFor manager.register(idCredentials, UserMessageLimit(20)) assert false, "error returned when calling register: " & error
except Exception, CatchableError:
assert false,
"exception raised when calling register: " & getCurrentExceptionMsg()
let epoch1 = wakuRlnRelay.getCurrentEpoch() let epoch1 = wakuRlnRelay.getCurrentEpoch()
@ -296,11 +293,8 @@ suite "Waku rln relay":
let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager) let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager)
let idCredentials = generateCredentials() let idCredentials = generateCredentials()
try: (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
waitFor manager.register(idCredentials, UserMessageLimit(20)) assert false, "error returned when calling register: " & error
except Exception, CatchableError:
assert false,
"exception raised when calling register: " & getCurrentExceptionMsg()
# usually it's 20 seconds but we set it to 1 for testing purposes which make the test faster # usually it's 20 seconds but we set it to 1 for testing purposes which make the test faster
wakuRlnRelay.rlnMaxTimestampGap = 1 wakuRlnRelay.rlnMaxTimestampGap = 1
@ -346,11 +340,9 @@ suite "Waku rln relay":
let manager1 = cast[OnchainGroupManager](wakuRlnRelay1.groupManager) let manager1 = cast[OnchainGroupManager](wakuRlnRelay1.groupManager)
let idCredentials1 = generateCredentials() let idCredentials1 = generateCredentials()
try: (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
except Exception, CatchableError:
assert false, assert false,
"exception raised when calling register: " & getCurrentExceptionMsg() "error returned when calling register: " & error
let index2 = MembershipIndex(6) let index2 = MembershipIndex(6)
let rlnConf2 = getWakuRlnConfig(manager = manager, index = index2) let rlnConf2 = getWakuRlnConfig(manager = manager, index = index2)
@ -360,11 +352,9 @@ suite "Waku rln relay":
let manager2 = cast[OnchainGroupManager](wakuRlnRelay2.groupManager) let manager2 = cast[OnchainGroupManager](wakuRlnRelay2.groupManager)
let idCredentials2 = generateCredentials() let idCredentials2 = generateCredentials()
try: (waitFor manager2.register(idCredentials2, UserMessageLimit(20))).isOkOr:
waitFor manager2.register(idCredentials2, UserMessageLimit(20))
except Exception, CatchableError:
assert false, assert false,
"exception raised when calling register: " & getCurrentExceptionMsg() "error returned when calling register: " & error
# get the current epoch time # get the current epoch time
let epoch = wakuRlnRelay1.getCurrentEpoch() let epoch = wakuRlnRelay1.getCurrentEpoch()

View File

@ -65,11 +65,9 @@ procSuite "WakuNode - RLN relay":
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
let idCredentials1 = generateCredentials() let idCredentials1 = generateCredentials()
try: (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
except Exception, CatchableError:
assert false, assert false,
"exception raised when calling register: " & getCurrentExceptionMsg() "error returned when calling register: " & error
let rootUpdated1 = waitFor manager1.updateRoots() let rootUpdated1 = waitFor manager1.updateRoots()
info "Updated root for node1", rootUpdated1 info "Updated root for node1", rootUpdated1
@ -182,11 +180,9 @@ procSuite "WakuNode - RLN relay":
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager) let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
let idCredentials = generateCredentials() let idCredentials = generateCredentials()
try: (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
waitFor manager.register(idCredentials, UserMessageLimit(20))
except Exception, CatchableError:
assert false, assert false,
"exception raised when calling register: " & getCurrentExceptionMsg() "error returned when calling register: " & error
let rootUpdated = waitFor manager.updateRoots() let rootUpdated = waitFor manager.updateRoots()
info "Updated root for node", node = index + 1, rootUpdated = rootUpdated info "Updated root for node", node = index + 1, rootUpdated = rootUpdated
@ -301,11 +297,9 @@ procSuite "WakuNode - RLN relay":
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
let idCredentials1 = generateCredentials() let idCredentials1 = generateCredentials()
try: (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
except Exception, CatchableError:
assert false, assert false,
"exception raised when calling register: " & getCurrentExceptionMsg() "error returned when calling register: " & error
let rootUpdated1 = waitFor manager1.updateRoots() let rootUpdated1 = waitFor manager1.updateRoots()
info "Updated root for node1", rootUpdated1 info "Updated root for node1", rootUpdated1
@ -418,11 +412,9 @@ procSuite "WakuNode - RLN relay":
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
let idCredentials1 = generateCredentials() let idCredentials1 = generateCredentials()
try: (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
except Exception, CatchableError:
assert false, assert false,
"exception raised when calling register: " & getCurrentExceptionMsg() "error returned when calling register: " & error
let rootUpdated1 = waitFor manager1.updateRoots() let rootUpdated1 = waitFor manager1.updateRoots()
info "Updated root for node1", rootUpdated1 info "Updated root for node1", rootUpdated1
@ -585,11 +577,10 @@ procSuite "WakuNode - RLN relay":
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
let idCredentials1 = generateCredentials() let idCredentials1 = generateCredentials()
try: (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
waitFor manager1.register(idCredentials1, UserMessageLimit(20)) assert false,
except Exception, CatchableError: "error returned when calling register: " & error
assert false,
"exception raised when calling register: " & getCurrentExceptionMsg()
let rootUpdated1 = waitFor manager1.updateRoots() let rootUpdated1 = waitFor manager1.updateRoots()
info "Updated root for node1", rootUpdated1 info "Updated root for node1", rootUpdated1

View File

@ -73,7 +73,10 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
# 4. register on-chain # 4. register on-chain
try: try:
waitFor groupManager.register(credential, conf.userMessageLimit) let registerResult = waitFor groupManager.register(credential, conf.userMessageLimit)
if registerResult.isErr():
error "Failed to register on-chain", error = registerResult.error
quit(QuitFailure)
except Exception, CatchableError: except Exception, CatchableError:
error "failure while registering credentials on-chain", error "failure while registering credentials on-chain",
error = getCurrentExceptionMsg() error = getCurrentExceptionMsg()

View File

@ -148,12 +148,6 @@ template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized: if not g.initialized:
raise newException(CatchableError, "OnchainGroupManager is not initialized") raise newException(CatchableError, "OnchainGroupManager is not initialized")
template retryWrapper(
g: OnchainGroupManager, res: auto, errStr: string, body: untyped
): auto =
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
body
proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} = proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
let rootRes = (await g.fetchMerkleRoot()).valueOr: let rootRes = (await g.fetchMerkleRoot()).valueOr:
return false return false
@ -172,7 +166,7 @@ proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
return false return false
proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError]).} = proc trackRootChanges*(g: OnchainGroupManager): Future[Result[void, string]] {.async.} =
try: try:
initializedGuard(g) initializedGuard(g)
const rpcDelay = 5.seconds const rpcDelay = 5.seconds
@ -194,18 +188,21 @@ proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError
let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr: let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr:
error "Failed to fetch next free index", error = error error "Failed to fetch next free index", error = error
raise return err("Failed to fetch next free index: " & error)
newException(CatchableError, "Failed to fetch next free index: " & error)
let memberCount = cast[int64](nextFreeIndex) let memberCount = cast[int64](nextFreeIndex)
waku_rln_number_registered_memberships.set(float64(memberCount)) waku_rln_number_registered_memberships.set(float64(memberCount))
except CatchableError: except CatchableError:
error "Fatal error in trackRootChanges", error = getCurrentExceptionMsg() error "Fatal error in trackRootChanges", error = getCurrentExceptionMsg()
return err("Fatal error in trackRootChanges" & getCurrentExceptionMsg())
method register*( method register*(
g: OnchainGroupManager, rateCommitment: RateCommitment g: OnchainGroupManager, rateCommitment: RateCommitment
): Future[void] {.async: (raises: [Exception]).} = ): Future[Result[void, string]] {.async.} =
initializedGuard(g) try:
initializedGuard(g)
except CatchableError as e:
return err("Not initialized: " & e.msg)
try: try:
let leaf = rateCommitment.toLeaf().get() let leaf = rateCommitment.toLeaf().get()
@ -214,33 +211,43 @@ method register*(
info "registering member via callback", rateCommitment = leaf, index = idx info "registering member via callback", rateCommitment = leaf, index = idx
await g.registerCb.get()(@[Membership(rateCommitment: leaf, index: idx)]) await g.registerCb.get()(@[Membership(rateCommitment: leaf, index: idx)])
g.latestIndex.inc() g.latestIndex.inc()
except CatchableError: except Exception as e:
raise newException(ValueError, getCurrentExceptionMsg()) return err("Failed to call register callback: " & e.msg)
return ok()
method register*( method register*(
g: OnchainGroupManager, g: OnchainGroupManager,
identityCredential: IdentityCredential, identityCredential: IdentityCredential,
userMessageLimit: UserMessageLimit, userMessageLimit: UserMessageLimit,
): Future[void] {.async: (raises: [Exception]).} = ): Future[Result[void, string]] {.async.} =
initializedGuard(g) try:
initializedGuard(g)
except CatchableError as e:
return err("Not initialized: " & e.msg)
let ethRpc = g.ethRpc.get() let ethRpc = g.ethRpc.get()
let wakuRlnContract = g.wakuRlnContract.get() let wakuRlnContract = g.wakuRlnContract.get()
var gasPrice: int let gasPrice = (
g.retryWrapper(gasPrice, "Failed to get gas price"): await retryWrapper(
let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice()) RetryStrategy.new(),
## Multiply by 2 to speed up the transaction "Failed to get gas price",
## Check for overflow when casting to int proc(): Future[int] {.async.} =
if fetchedGasPrice > uint64(high(int) div 2): let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice())
warn "Gas price overflow detected, capping at maximum int value", if fetchedGasPrice > uint64(high(int) div 2):
fetchedGasPrice = fetchedGasPrice, maxInt = high(int) warn "Gas price overflow detected, capping at maximum int value",
high(int) fetchedGasPrice = fetchedGasPrice, maxInt = high(int)
else: return high(int)
let calculatedGasPrice = int(fetchedGasPrice) * 2 else:
debug "Gas price calculated", let calculatedGasPrice = int(fetchedGasPrice) * 2
fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice debug "Gas price calculated",
calculatedGasPrice fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice
return calculatedGasPrice,
)
).valueOr:
return err("Failed to get gas price: " & error)
let idCommitmentHex = identityCredential.idCommitment.inHex() let idCommitmentHex = identityCredential.idCommitment.inHex()
debug "identityCredential idCommitmentHex", idCommitment = idCommitmentHex debug "identityCredential idCommitmentHex", idCommitment = idCommitmentHex
let idCommitment = identityCredential.idCommitment.toUInt256() let idCommitment = identityCredential.idCommitment.toUInt256()
@ -249,27 +256,37 @@ method register*(
idCommitment = idCommitment, idCommitment = idCommitment,
userMessageLimit = userMessageLimit, userMessageLimit = userMessageLimit,
idCommitmentsToErase = idCommitmentsToErase idCommitmentsToErase = idCommitmentsToErase
var txHash: TxHash let txHash = (
g.retryWrapper(txHash, "Failed to register the member"): await retryWrapper(
await wakuRlnContract RetryStrategy.new(),
.register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase) "Failed to register the member",
.send(gasPrice = gasPrice) proc(): Future[TxHash] {.async.} =
return await wakuRlnContract
.register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase)
.send(gasPrice = gasPrice),
)
).valueOr:
return err("Failed to register member: " & error)
# wait for the transaction to be mined # wait for the transaction to be mined
var tsReceipt: ReceiptObject let tsReceipt = (
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"): await retryWrapper(
await ethRpc.getMinedTransactionReceipt(txHash) RetryStrategy.new(),
"Failed to get the transaction receipt",
proc(): Future[ReceiptObject] {.async.} =
return await ethRpc.getMinedTransactionReceipt(txHash),
)
).valueOr:
return err("Failed to get transaction receipt: " & error)
debug "registration transaction mined", txHash = txHash debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash) g.registrationTxHash = some(txHash)
# the receipt topic holds the hash of signature of the raised events # the receipt topic holds the hash of signature of the raised events
debug "ts receipt", receipt = tsReceipt[] debug "ts receipt", receipt = tsReceipt[]
if tsReceipt.status.isNone(): if tsReceipt.status.isNone():
raise newException(ValueError, "Transaction failed: status is None") return err("Transaction failed: status is None")
if tsReceipt.status.get() != 1.Quantity: if tsReceipt.status.get() != 1.Quantity:
raise newException( return err("Transaction failed with status: " & $tsReceipt.status.get())
ValueError, "Transaction failed with status: " & $tsReceipt.status.get()
)
## Search through all transaction logs to find the MembershipRegistered event ## Search through all transaction logs to find the MembershipRegistered event
let expectedEventSignature = cast[FixedBytes[32]](keccak.keccak256.digest( let expectedEventSignature = cast[FixedBytes[32]](keccak.keccak256.digest(
@ -283,9 +300,7 @@ method register*(
break break
if membershipRegisteredLog.isNone(): if membershipRegisteredLog.isNone():
raise newException( return err("register: MembershipRegistered event not found in transaction logs")
ValueError, "register: MembershipRegistered event not found in transaction logs"
)
let registrationLog = membershipRegisteredLog.get() let registrationLog = membershipRegisteredLog.get()
@ -309,20 +324,33 @@ method register*(
if g.registerCb.isSome(): if g.registerCb.isSome():
let member = Membership(rateCommitment: rateCommitment, index: g.latestIndex) let member = Membership(rateCommitment: rateCommitment, index: g.latestIndex)
await g.registerCb.get()(@[member]) try:
await g.registerCb.get()(@[member])
except Exception as e:
return err("Failed to call register callback: " & e.msg)
g.latestIndex.inc() g.latestIndex.inc()
return return ok()
method withdraw*( method withdraw*(
g: OnchainGroupManager, idCommitment: IDCommitment g: OnchainGroupManager, idCommitment: IDCommitment
): Future[void] {.async: (raises: [Exception]).} = ): Future[Result[void, string]] {.async.} =
initializedGuard(g) # TODO: after slashing is enabled on the contract try:
initializedGuard(g)
except CatchableError as e:
return err("Not initialized: " & e.msg)
return ok()
method withdrawBatch*( method withdrawBatch*(
g: OnchainGroupManager, idCommitments: seq[IDCommitment] g: OnchainGroupManager, idCommitments: seq[IDCommitment]
): Future[void] {.async: (raises: [Exception]).} = ): Future[Result[void, string]] {.async.} =
initializedGuard(g) try:
initializedGuard(g)
except CatchableError as e:
return err("Not initialized: " & e.msg)
return ok()
proc getRootFromProofAndIndex( proc getRootFromProofAndIndex(
g: OnchainGroupManager, elements: seq[byte], bits: seq[byte] g: OnchainGroupManager, elements: seq[byte], bits: seq[byte]
@ -492,25 +520,30 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
proc establishConnection( proc establishConnection(
g: OnchainGroupManager g: OnchainGroupManager
): Future[GroupManagerResult[Web3]] {.async.} = ): Future[GroupManagerResult[Web3]] {.async.} =
var ethRpc: Web3 let ethRpc = (
await retryWrapper(
RetryStrategy.new(),
"Failed to connect to the Ethereum client",
proc(): Future[Web3] {.async.} =
var innerEthRpc: Web3
var connected = false
for clientUrl in g.ethClientUrls:
## We give a chance to the user to provide multiple clients
## and we try to connect to each of them
try:
innerEthRpc = await newWeb3(clientUrl)
connected = true
break
except CatchableError:
error "failed connect Eth client", error = getCurrentExceptionMsg()
g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"): if not connected:
var innerEthRpc: Web3 raise newException(CatchableError, "all failed")
var connected = false
for clientUrl in g.ethClientUrls:
## We give a chance to the user to provide multiple clients
## and we try to connect to each of them
try:
innerEthRpc = await newWeb3(clientUrl)
connected = true
break
except CatchableError:
error "failed connect Eth client", error = getCurrentExceptionMsg()
if not connected: return innerEthRpc,
raise newException(CatchableError, "all failed") )
).valueOr:
innerEthRpc return err("Failed to establish Ethereum connection: " & error)
return ok(ethRpc) return ok(ethRpc)
@ -519,9 +552,15 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
let ethRpc: Web3 = (await establishConnection(g)).valueOr: let ethRpc: Web3 = (await establishConnection(g)).valueOr:
return err("failed to connect to Ethereum clients: " & $error) return err("failed to connect to Ethereum clients: " & $error)
var fetchedChainId: UInt256 let fetchedChainId = (
g.retryWrapper(fetchedChainId, "Failed to get the chain id"): await retryWrapper(
await ethRpc.provider.eth_chainId() RetryStrategy.new(),
"Failed to get the chain id",
proc(): Future[UInt256] {.async.} =
return await ethRpc.provider.eth_chainId(),
)
).valueOr:
return err("Failed to get chain id: " & error)
# Set the chain id # Set the chain id
if g.chainId == 0: if g.chainId == 0:
@ -595,8 +634,10 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
proc onDisconnect() {.async.} = proc onDisconnect() {.async.} =
error "Ethereum client disconnected" error "Ethereum client disconnected"
var newEthRpc: Web3 = (await g.establishConnection()).valueOr: let newEthRpc: Web3 = (await g.establishConnection()).valueOr:
g.onFatalErrorAction("failed to connect to Ethereum clients onDisconnect") error "Fatal: failed to reconnect to Ethereum clients after disconnect",
error = error
g.onFatalErrorAction("failed to reconnect to Ethereum clients: " & error)
return return
newEthRpc.ondisconnect = ethRpc.ondisconnect newEthRpc.ondisconnect = ethRpc.ondisconnect

View File

@ -1,36 +1,33 @@
import ../../../common/error_handling
import chronos import chronos
import results import results
const
DefaultRetryDelay* = 4000.millis
DefaultRetryCount* = 15'u
type RetryStrategy* = object type RetryStrategy* = object
shouldRetry*: bool
retryDelay*: Duration retryDelay*: Duration
retryCount*: uint retryCount*: uint
proc new*(T: type RetryStrategy): RetryStrategy = proc new*(T: type RetryStrategy): RetryStrategy =
return RetryStrategy(shouldRetry: true, retryDelay: 4000.millis, retryCount: 15) return RetryStrategy(retryDelay: DefaultRetryDelay, retryCount: DefaultRetryCount)
template retryWrapper*( proc retryWrapper*[T](
res: auto,
retryStrategy: RetryStrategy, retryStrategy: RetryStrategy,
errStr: string, errStr: string,
errCallback: OnFatalErrorHandler, body: proc(): Future[T] {.async.},
body: untyped, ): Future[Result[T, string]] {.async.} =
): auto =
if errCallback == nil:
raise newException(CatchableError, "Ensure that the errCallback is set")
var retryCount = retryStrategy.retryCount var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry var lastError = ""
var exceptionMessage = ""
while shouldRetry and retryCount > 0: while retryCount > 0:
try: try:
res = body let value = await body()
shouldRetry = false return ok(value)
except: except CatchableError as e:
retryCount -= 1 retryCount -= 1
exceptionMessage = getCurrentExceptionMsg() lastError = e.msg
await sleepAsync(retryStrategy.retryDelay) if retryCount > 0:
if shouldRetry: await sleepAsync(retryStrategy.retryDelay)
errCallback(errStr & ": " & exceptionMessage)
return return err(errStr & ": " & lastError)

View File

@ -64,7 +64,7 @@ type WakuRLNRelay* = ref object of RootObj
onFatalErrorAction*: OnFatalErrorHandler onFatalErrorAction*: OnFatalErrorHandler
nonceManager*: NonceManager nonceManager*: NonceManager
epochMonitorFuture*: Future[void] epochMonitorFuture*: Future[void]
rootChangesFuture*: Future[void] rootChangesFuture*: Future[Result[void, string]]
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch = proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
## gets time `t` as `flaot64` with subseconds resolution in the fractional part ## gets time `t` as `flaot64` with subseconds resolution in the fractional part