From bc9454db5e0258eaa964f36c37d7204244ece9a5 Mon Sep 17 00:00:00 2001 From: Tanya S <120410716+stubbsta@users.noreply.github.com> Date: Thu, 12 Mar 2026 22:27:50 +0200 Subject: [PATCH] Chore: Simplify on chain group manager error handling (#3678) --- tests/node/test_wakunode_legacy_lightpush.nim | 7 +- tests/node/test_wakunode_lightpush.nim | 7 +- .../test_rln_group_manager_onchain.nim | 99 ++--- tests/waku_rln_relay/test_waku_rln_relay.nim | 26 +- .../test_wakunode_rln_relay.nim | 49 +-- tests/wakunode_rest/test_rest_relay.nim | 39 +- .../rln_keystore_generator.nim | 4 +- .../group_manager/group_manager_base.nim | 4 +- .../group_manager/on_chain/group_manager.nim | 358 +++++++++--------- .../group_manager/on_chain/retry_wrapper.nim | 41 +- waku/waku_rln_relay/rln_relay.nim | 4 +- 11 files changed, 288 insertions(+), 350 deletions(-) diff --git a/tests/node/test_wakunode_legacy_lightpush.nim b/tests/node/test_wakunode_legacy_lightpush.nim index 902464bcd..68c6cacde 100644 --- a/tests/node/test_wakunode_legacy_lightpush.nim +++ b/tests/node/test_wakunode_legacy_lightpush.nim @@ -134,11 +134,8 @@ suite "RLN Proofs as a Lightpush Service": let manager1 = cast[OnchainGroupManager](server.wakuRlnRelay.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated1 = waitFor manager1.updateRoots() info "Updated root for node1", rootUpdated1 diff --git a/tests/node/test_wakunode_lightpush.nim b/tests/node/test_wakunode_lightpush.nim index 66b87b85e..b407327e3 100644 --- a/tests/node/test_wakunode_lightpush.nim +++ b/tests/node/test_wakunode_lightpush.nim @@ -137,11 +137,8 @@ suite "RLN Proofs as a Lightpush Service": let manager1 = cast[OnchainGroupManager](server.wakuRlnRelay.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated1 = waitFor manager1.updateRoots() info "Updated root for node1", rootUpdated1 diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index aac900911..29da94129 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -74,10 +74,11 @@ suite "Onchain group manager": raiseAssert "Expected error when keystore file doesn't exist" test "trackRootChanges: should guard against uninitialized state": - try: - discard manager.trackRootChanges() - except CatchableError: - check getCurrentExceptionMsg().len == 38 + let initializedResult = waitFor manager.trackRootChanges() + + check: + initializedResult.isErr() + initializedResult.error == "OnchainGroupManager is not initialized" test "trackRootChanges: should sync to the state of the group": let credentials = generateCredentials() @@ -86,10 +87,8 @@ suite "Onchain group manager": let merkleRootBefore = waitFor manager.fetchMerkleRoot() - try: - waitFor manager.register(credentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + (waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error discard waitFor withTimeout(trackRootChanges(manager), 15.seconds) @@ -110,13 +109,11 @@ suite "Onchain group manager": let merkleRootBefore = waitFor manager.fetchMerkleRoot() - try: - for i in 0 ..< credentials.len(): - info "Registering credential", index = i, credential = credentials[i] - waitFor manager.register(credentials[i], UserMessageLimit(20)) - discard waitFor manager.updateRoots() - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + for i in 0 ..< credentials.len(): + info "Registering credential", index = i, credential = credentials[i] + (waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr: + assert false, "Failed to register credential " & $i & ": " & error + discard waitFor manager.updateRoots() let merkleRootAfter = waitFor manager.fetchMerkleRoot() @@ -127,16 +124,15 @@ suite "Onchain group manager": test "register: should guard against uninitialized state": let dummyCommitment = default(IDCommitment) - try: - waitFor manager.register( - RateCommitment( - idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20) - ) + let res = waitFor manager.register( + RateCommitment( + idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20) ) - except CatchableError: - assert true - except Exception: - assert false, "exception raised: " & getCurrentExceptionMsg() + ) + + check: + res.isErr() + res.error == "OnchainGroupManager is not initialized" test "register: should register successfully": # TODO :- similar to ```trackRootChanges: should fetch history correctly``` @@ -146,11 +142,8 @@ suite "Onchain group manager": let idCredentials = generateCredentials() let merkleRootBefore = waitFor manager.fetchMerkleRoot() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let merkleRootAfter = waitFor manager.fetchMerkleRoot() @@ -177,26 +170,25 @@ suite "Onchain group manager": manager.onRegister(callback) - try: + ( waitFor manager.register( RateCommitment( idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20) ) ) - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + ).isOkOr: + assert false, "error returned when calling register: " & error waitFor fut test "withdraw: should guard against uninitialized state": let idSecretHash = generateCredentials().idSecretHash - try: - waitFor manager.withdraw(idSecretHash) - except CatchableError: - assert true - except Exception: - assert false, "exception raised: " & getCurrentExceptionMsg() + let res = waitFor manager.withdraw(idSecretHash) + + check: + res.isErr() + res.error == "OnchainGroupManager is not initialized" test "validateRoot: should validate good root": let idCredentials = generateCredentials() @@ -217,10 +209,8 @@ suite "Onchain group manager": (waitFor manager.init()).isOkOr: raiseAssert $error - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "error returned : " & getCurrentExceptionMsg() waitFor fut @@ -299,10 +289,8 @@ suite "Onchain group manager": manager.onRegister(callback) - try: - waitFor manager.register(credentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + (waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error waitFor fut let rootUpdated = waitFor manager.updateRoots() @@ -337,11 +325,8 @@ suite "Onchain group manager": let idCredential = generateCredentials() - try: - waitFor manager.register(idCredential, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredential, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let messageBytes = "Hello".toBytes() @@ -395,14 +380,12 @@ suite "Onchain group manager": return callback - try: - manager.onRegister(generateCallback(futures, credentials)) + manager.onRegister(generateCallback(futures, credentials)) - for i in 0 ..< credentials.len(): - waitFor manager.register(credentials[i], UserMessageLimit(20)) - discard waitFor manager.updateRoots() - except Exception, CatchableError: - assert false, "exception raised: " & getCurrentExceptionMsg() + for i in 0 ..< credentials.len(): + (waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr: + assert false, "Failed to register credential " & $i & ": " & error + discard waitFor manager.updateRoots() waitFor allFutures(futures) diff --git a/tests/waku_rln_relay/test_waku_rln_relay.nim b/tests/waku_rln_relay/test_waku_rln_relay.nim index d9fe0d890..e41b79608 100644 --- a/tests/waku_rln_relay/test_waku_rln_relay.nim +++ b/tests/waku_rln_relay/test_waku_rln_relay.nim @@ -242,11 +242,8 @@ suite "Waku rln relay": let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager) let idCredentials = generateCredentials() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let epoch1 = wakuRlnRelay.getCurrentEpoch() @@ -301,11 +298,8 @@ suite "Waku rln relay": let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager) let idCredentials = generateCredentials() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error # usually it's 20 seconds but we set it to 1 for testing purposes which make the test faster wakuRlnRelay.rlnMaxTimestampGap = 1 @@ -353,11 +347,9 @@ suite "Waku rln relay": let manager1 = cast[OnchainGroupManager](wakuRlnRelay1.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + "error returned when calling register: " & error let index2 = MembershipIndex(6) let rlnConf2 = getWakuRlnConfig(manager = manager, index = index2) @@ -369,11 +361,9 @@ suite "Waku rln relay": let manager2 = cast[OnchainGroupManager](wakuRlnRelay2.groupManager) let idCredentials2 = generateCredentials() - try: - waitFor manager2.register(idCredentials2, UserMessageLimit(20)) - except Exception, CatchableError: + (waitFor manager2.register(idCredentials2, UserMessageLimit(20))).isOkOr: assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + "error returned when calling register: " & error # get the current epoch time let epoch = wakuRlnRelay1.getCurrentEpoch() diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index fcf97a671..79a4d6711 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -58,11 +58,8 @@ procSuite "WakuNode - RLN relay": let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated1 = waitFor manager1.updateRoots() info "Updated root for node1", rootUpdated1 @@ -172,11 +169,8 @@ procSuite "WakuNode - RLN relay": let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated1 = waitFor manager1.updateRoots() info "Updated root for node", node = 1, rootUpdated = rootUpdated1 @@ -192,11 +186,8 @@ procSuite "WakuNode - RLN relay": let manager2 = cast[OnchainGroupManager](node2.wakuRlnRelay.groupManager) let idCredentials2 = generateCredentials() - try: - waitFor manager2.register(idCredentials2, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager2.register(idCredentials2, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated2 = waitFor manager2.updateRoots() info "Updated root for node", node = 2, rootUpdated = rootUpdated2 @@ -212,11 +203,8 @@ procSuite "WakuNode - RLN relay": let manager3 = cast[OnchainGroupManager](node3.wakuRlnRelay.groupManager) let idCredentials3 = generateCredentials() - try: - waitFor manager3.register(idCredentials3, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager3.register(idCredentials3, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated3 = waitFor manager3.updateRoots() info "Updated root for node", node = 3, rootUpdated = rootUpdated3 @@ -333,11 +321,8 @@ procSuite "WakuNode - RLN relay": let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated1 = waitFor manager1.updateRoots() info "Updated root for node1", rootUpdated1 @@ -448,11 +433,8 @@ procSuite "WakuNode - RLN relay": let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated1 = waitFor manager1.updateRoots() info "Updated root for node1", rootUpdated1 @@ -620,11 +602,8 @@ procSuite "WakuNode - RLN relay": let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager) let idCredentials1 = generateCredentials() - try: - waitFor manager1.register(idCredentials1, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr: + assert false, "error returned when calling register: " & error let rootUpdated1 = waitFor manager1.updateRoots() info "Updated root for node1", rootUpdated1 diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index f16e5c4f4..efdd597ba 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -42,8 +42,8 @@ suite "Waku v2 Rest API - Relay": var manager {.threadVar.}: OnchainGroupManager setup: - anvilProc = runAnvil() - manager = waitFor setupOnchainGroupManager() + anvilProc = runAnvil(stateFile = some(DEFAULT_ANVIL_STATE_PATH)) + manager = waitFor setupOnchainGroupManager(deployContracts = false) teardown: stopAnvil(anvilProc) @@ -268,11 +268,8 @@ suite "Waku v2 Rest API - Relay": let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager) let idCredentials = generateCredentials() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "Failed to register identity credentials" & getCurrentExceptionMsg() let rootUpdated = waitFor manager.updateRoots() info "Updated root for node", rootUpdated @@ -545,11 +542,8 @@ suite "Waku v2 Rest API - Relay": let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager) let idCredentials = generateCredentials() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "Failed to register identity credentials" & getCurrentExceptionMsg() let rootUpdated = waitFor manager.updateRoots() info "Updated root for node", rootUpdated @@ -617,11 +611,8 @@ suite "Waku v2 Rest API - Relay": let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager) let idCredentials = generateCredentials() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "Failed to register identity credentials" & getCurrentExceptionMsg() let rootUpdated = waitFor manager.updateRoots() info "Updated root for node", rootUpdated @@ -679,11 +670,8 @@ suite "Waku v2 Rest API - Relay": let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager) let idCredentials = generateCredentials() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "Failed to register identity credentials" & getCurrentExceptionMsg() let rootUpdated = waitFor manager.updateRoots() info "Updated root for node", rootUpdated @@ -754,11 +742,8 @@ suite "Waku v2 Rest API - Relay": let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager) let idCredentials = generateCredentials() - try: - waitFor manager.register(idCredentials, UserMessageLimit(20)) - except Exception, CatchableError: - assert false, - "exception raised when calling register: " & getCurrentExceptionMsg() + (waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr: + assert false, "Failed to register identity credentials" & getCurrentExceptionMsg() let rootUpdated = waitFor manager.updateRoots() info "Updated root for node", rootUpdated diff --git a/tools/rln_keystore_generator/rln_keystore_generator.nim b/tools/rln_keystore_generator/rln_keystore_generator.nim index 85df37982..503e8d58e 100644 --- a/tools/rln_keystore_generator/rln_keystore_generator.nim +++ b/tools/rln_keystore_generator/rln_keystore_generator.nim @@ -73,7 +73,9 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) = # 4. register on-chain try: - waitFor groupManager.register(credential, conf.userMessageLimit) + (waitFor groupManager.register(credential, conf.userMessageLimit)).isOkOr: + error "Failed to register on-chain", error = error + quit(QuitFailure) except Exception, CatchableError: error "failure while registering credentials on-chain", error = getCurrentExceptionMsg() diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index de2962e42..9c088d4c5 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -144,6 +144,4 @@ method generateProof*( return err("generateProof is not implemented") method isReady*(g: GroupManager): Future[bool] {.base, async.} = - raise newException( - CatchableError, "isReady proc for " & $g.type & " is not implemented yet" - ) + return true diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 2ce7d4423..2e4882891 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -50,109 +50,85 @@ type proc fetchMerkleProofElements*( g: OnchainGroupManager ): Future[Result[seq[byte], string]] {.async.} = - try: - let membershipIndex = g.membershipIndex.get() - let index40 = stuint(membershipIndex, 40) + let membershipIndex = g.membershipIndex.get() + let index40 = stuint(membershipIndex, 40) - let methodSig = "getMerkleProof(uint40)" - var paddedParam = newSeq[byte](32) - let indexBytes = index40.toBytesBE() - for i in 0 ..< min(indexBytes.len, paddedParam.len): - paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i] + let methodSig = "getMerkleProof(uint40)" + var paddedParam = newSeq[byte](32) + let indexBytes = index40.toBytesBE() + for i in 0 ..< min(indexBytes.len, paddedParam.len): + paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i] - let response = await sendEthCallWithParams( - ethRpc = g.ethRpc.get(), - functionSignature = methodSig, - params = paddedParam, - fromAddress = g.ethRpc.get().defaultAccount, - toAddress = fromHex(Address, g.ethContractAddress), - chainId = g.chainId, - ) + let response = await sendEthCallWithParams( + ethRpc = g.ethRpc.get(), + functionSignature = methodSig, + params = paddedParam, + fromAddress = g.ethRpc.get().defaultAccount, + toAddress = fromHex(Address, g.ethContractAddress), + chainId = g.chainId, + ) - return response - except CatchableError: - error "Failed to fetch Merkle proof elements", error = getCurrentExceptionMsg() - return err("Failed to fetch merkle proof elements: " & getCurrentExceptionMsg()) + return response proc fetchMerkleRoot*( g: OnchainGroupManager ): Future[Result[UInt256, string]] {.async.} = - try: - let merkleRoot = await sendEthCallWithoutParams( - ethRpc = g.ethRpc.get(), - functionSignature = "root()", - fromAddress = g.ethRpc.get().defaultAccount, - toAddress = fromHex(Address, g.ethContractAddress), - chainId = g.chainId, - ) - return merkleRoot - except CatchableError: - error "Failed to fetch Merkle root", error = getCurrentExceptionMsg() - return err("Failed to fetch merkle root: " & getCurrentExceptionMsg()) + let merkleRoot = await sendEthCallWithoutParams( + ethRpc = g.ethRpc.get(), + functionSignature = "root()", + fromAddress = g.ethRpc.get().defaultAccount, + toAddress = fromHex(Address, g.ethContractAddress), + chainId = g.chainId, + ) + return merkleRoot proc fetchNextFreeIndex*( g: OnchainGroupManager ): Future[Result[UInt256, string]] {.async.} = - try: - let nextFreeIndex = await sendEthCallWithoutParams( - ethRpc = g.ethRpc.get(), - functionSignature = "nextFreeIndex()", - fromAddress = g.ethRpc.get().defaultAccount, - toAddress = fromHex(Address, g.ethContractAddress), - chainId = g.chainId, - ) - return nextFreeIndex - except CatchableError: - error "Failed to fetch next free index", error = getCurrentExceptionMsg() - return err("Failed to fetch next free index: " & getCurrentExceptionMsg()) + let nextFreeIndex = await sendEthCallWithoutParams( + ethRpc = g.ethRpc.get(), + functionSignature = "nextFreeIndex()", + fromAddress = g.ethRpc.get().defaultAccount, + toAddress = fromHex(Address, g.ethContractAddress), + chainId = g.chainId, + ) + return nextFreeIndex proc fetchMembershipStatus*( g: OnchainGroupManager, idCommitment: IDCommitment ): Future[Result[bool, string]] {.async.} = - try: - let params = idCommitment.reversed() - let responseBytes = ( - await sendEthCallWithParams( - ethRpc = g.ethRpc.get(), - functionSignature = "isInMembershipSet(uint256)", - params = params, - fromAddress = g.ethRpc.get().defaultAccount, - toAddress = fromHex(Address, g.ethContractAddress), - chainId = g.chainId, - ) - ).valueOr: - return err("Failed to check membership: " & error) - - return ok(responseBytes.len == 32 and responseBytes[^1] == 1'u8) - except CatchableError: - error "Failed to fetch membership set membership", error = getCurrentExceptionMsg() - return err("Failed to fetch membership set membership: " & getCurrentExceptionMsg()) - -proc fetchMaxMembershipRateLimit*( - g: OnchainGroupManager -): Future[Result[UInt256, string]] {.async.} = - try: - let maxMembershipRateLimit = await sendEthCallWithoutParams( + let params = idCommitment.reversed() + let responseBytes = ( + await sendEthCallWithParams( ethRpc = g.ethRpc.get(), - functionSignature = "maxMembershipRateLimit()", + functionSignature = "isInMembershipSet(uint256)", + params = params, fromAddress = g.ethRpc.get().defaultAccount, toAddress = fromHex(Address, g.ethContractAddress), chainId = g.chainId, ) - return maxMembershipRateLimit - except CatchableError: - error "Failed to fetch max membership rate limit", error = getCurrentExceptionMsg() - return err("Failed to fetch max membership rate limit: " & getCurrentExceptionMsg()) + ).valueOr: + return err("Failed to check membership: " & error) -template initializedGuard(g: OnchainGroupManager): untyped = + return ok(responseBytes.len == 32 and responseBytes[^1] == 1'u8) + +proc fetchMaxMembershipRateLimit*( + g: OnchainGroupManager +): Future[Result[UInt256, string]] {.async.} = + let maxMembershipRateLimit = await sendEthCallWithoutParams( + ethRpc = g.ethRpc.get(), + functionSignature = "maxMembershipRateLimit()", + fromAddress = g.ethRpc.get().defaultAccount, + toAddress = fromHex(Address, g.ethContractAddress), + chainId = g.chainId, + ) + + return maxMembershipRateLimit + +proc checkInitialized(g: OnchainGroupManager): Result[void, string] = if not g.initialized: - raise newException(CatchableError, "OnchainGroupManager is not initialized") - -template retryWrapper( - g: OnchainGroupManager, res: auto, errStr: string, body: untyped -): auto = - retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): - body + return err("OnchainGroupManager is not initialized") + return ok() proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} = let rootRes = (await g.fetchMerkleRoot()).valueOr: @@ -172,40 +148,37 @@ proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} = return false -proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError]).} = - try: - initializedGuard(g) - const rpcDelay = 5.seconds +proc trackRootChanges*(g: OnchainGroupManager): Future[Result[void, string]] {.async.} = + ?checkInitialized(g) - while true: - await sleepAsync(rpcDelay) - let rootUpdated = await g.updateRoots() + const rpcDelay = 5.seconds - if rootUpdated: - ## The membership set on-chain has changed (some new members have joined or some members have left) - if g.membershipIndex.isSome(): - ## A membership index exists only if the node has registered with RLN. - ## Non-registered nodes cannot have Merkle proof elements. - let proofResult = await g.fetchMerkleProofElements() - if proofResult.isErr(): - error "Failed to fetch Merkle proof", error = proofResult.error - else: - g.merkleProofCache = proofResult.get() + while true: + await sleepAsync(rpcDelay) + let rootUpdated = await g.updateRoots() - let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr: - error "Failed to fetch next free index", error = error - raise - newException(CatchableError, "Failed to fetch next free index: " & error) + if rootUpdated: + ## The membership set on-chain has changed (some new members have joined or some members have left) + if g.membershipIndex.isSome(): + ## A membership index exists only if the node has registered with RLN. + ## Non-registered nodes cannot have Merkle proof elements. + let proofResult = await g.fetchMerkleProofElements() + if proofResult.isErr(): + error "Failed to fetch Merkle proof", error = proofResult.error + else: + g.merkleProofCache = proofResult.get() - let memberCount = cast[int64](nextFreeIndex) - waku_rln_number_registered_memberships.set(float64(memberCount)) - except CatchableError: - error "Fatal error in trackRootChanges", error = getCurrentExceptionMsg() + let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr: + error "Failed to fetch next free index", error = error + return err("Failed to fetch next free index: " & error) + + let memberCount = cast[int64](nextFreeIndex) + waku_rln_number_registered_memberships.set(float64(memberCount)) method register*( g: OnchainGroupManager, rateCommitment: RateCommitment -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) +): Future[Result[void, string]] {.async.} = + ?checkInitialized(g) try: let leaf = rateCommitment.toLeaf().get() @@ -214,33 +187,40 @@ method register*( info "registering member via callback", rateCommitment = leaf, index = idx await g.registerCb.get()(@[Membership(rateCommitment: leaf, index: idx)]) g.latestIndex.inc() - except CatchableError: - raise newException(ValueError, getCurrentExceptionMsg()) + except Exception as e: + return err("Failed to call register callback: " & e.msg) + + return ok() method register*( g: OnchainGroupManager, identityCredential: IdentityCredential, userMessageLimit: UserMessageLimit, -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) +): Future[Result[void, string]] {.async.} = + ?checkInitialized(g) let ethRpc = g.ethRpc.get() let wakuRlnContract = g.wakuRlnContract.get() - var gasPrice: int - g.retryWrapper(gasPrice, "Failed to get gas price"): - let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice()) - ## Multiply by 2 to speed up the transaction - ## Check for overflow when casting to int - if fetchedGasPrice > uint64(high(int) div 2): - warn "Gas price overflow detected, capping at maximum int value", - fetchedGasPrice = fetchedGasPrice, maxInt = high(int) - high(int) - else: - let calculatedGasPrice = int(fetchedGasPrice) * 2 - debug "Gas price calculated", - fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice - calculatedGasPrice + let gasPrice = ( + await retryWrapper( + RetryStrategy.new(), + "Failed to get gas price", + proc(): Future[int] {.async.} = + let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice()) + if fetchedGasPrice > uint64(high(int) div 2): + warn "Gas price overflow detected, capping at maximum int value", + fetchedGasPrice = fetchedGasPrice, maxInt = high(int) + return high(int) + else: + let calculatedGasPrice = int(fetchedGasPrice) * 2 + debug "Gas price calculated", + fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice + return calculatedGasPrice, + ) + ).valueOr: + return err("Failed to get gas price: " & error) + let idCommitmentHex = identityCredential.idCommitment.inHex() debug "identityCredential idCommitmentHex", idCommitment = idCommitmentHex let idCommitment = identityCredential.idCommitment.toUInt256() @@ -249,27 +229,37 @@ method register*( idCommitment = idCommitment, userMessageLimit = userMessageLimit, idCommitmentsToErase = idCommitmentsToErase - var txHash: TxHash - g.retryWrapper(txHash, "Failed to register the member"): - await wakuRlnContract - .register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase) - .send(gasPrice = gasPrice) + let txHash = ( + await retryWrapper( + RetryStrategy.new(), + "Failed to register the member", + proc(): Future[TxHash] {.async.} = + return await wakuRlnContract + .register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase) + .send(gasPrice = gasPrice), + ) + ).valueOr: + return err("Failed to register member: " & error) # wait for the transaction to be mined - var tsReceipt: ReceiptObject - g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"): - await ethRpc.getMinedTransactionReceipt(txHash) + let tsReceipt = ( + await retryWrapper( + RetryStrategy.new(), + "Failed to get the transaction receipt", + proc(): Future[ReceiptObject] {.async.} = + return await ethRpc.getMinedTransactionReceipt(txHash), + ) + ).valueOr: + return err("Failed to get transaction receipt: " & error) debug "registration transaction mined", txHash = txHash g.registrationTxHash = some(txHash) # the receipt topic holds the hash of signature of the raised events debug "ts receipt", receipt = tsReceipt[] if tsReceipt.status.isNone(): - raise newException(ValueError, "Transaction failed: status is None") + return err("Transaction failed: status is None") if tsReceipt.status.get() != 1.Quantity: - raise newException( - ValueError, "Transaction failed with status: " & $tsReceipt.status.get() - ) + return err("Transaction failed with status: " & $tsReceipt.status.get()) ## Search through all transaction logs to find the MembershipRegistered event let expectedEventSignature = cast[FixedBytes[32]](keccak.keccak256.digest( @@ -283,9 +273,7 @@ method register*( break if membershipRegisteredLog.isNone(): - raise newException( - ValueError, "register: MembershipRegistered event not found in transaction logs" - ) + return err("register: MembershipRegistered event not found in transaction logs") let registrationLog = membershipRegisteredLog.get() @@ -309,20 +297,28 @@ method register*( if g.registerCb.isSome(): let member = Membership(rateCommitment: rateCommitment, index: g.latestIndex) - await g.registerCb.get()(@[member]) + try: + await g.registerCb.get()(@[member]) + except Exception as e: + return err("Failed to call register callback: " & e.msg) g.latestIndex.inc() - return + return ok() method withdraw*( g: OnchainGroupManager, idCommitment: IDCommitment -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) # TODO: after slashing is enabled on the contract +): Future[Result[void, string]] {.async.} = + checkInitialized(g).isOkOr: + return err(error) + return ok() method withdrawBatch*( g: OnchainGroupManager, idCommitments: seq[IDCommitment] -): Future[void] {.async: (raises: [Exception]).} = - initializedGuard(g) +): Future[Result[void, string]] {.async.} = + checkInitialized(g).isOkOr: + return err(error) + + return ok() proc getRootFromProofAndIndex( g: OnchainGroupManager, elements: seq[byte], bits: seq[byte] @@ -354,7 +350,7 @@ method generateProof*( epoch: Epoch, messageId: MessageId, rlnIdentifier = DefaultRlnIdentifier, -): GroupManagerResult[RateLimitProof] {.gcsafe, raises: [].} = +): GroupManagerResult[RateLimitProof] {.gcsafe.} = ## Generates an RLN proof using the cached Merkle proof and custom witness # Ensure identity credentials and membership index are set if g.idCredentials.isNone(): @@ -452,7 +448,7 @@ method generateProof*( method verifyProof*( g: OnchainGroupManager, input: seq[byte], proof: RateLimitProof -): GroupManagerResult[bool] {.gcsafe, raises: [].} = +): GroupManagerResult[bool] {.gcsafe.} = ## -- Verifies an RLN rate-limit proof against the set of valid Merkle roots -- var normalizedProof = proof @@ -492,25 +488,31 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = proc establishConnection( g: OnchainGroupManager ): Future[GroupManagerResult[Web3]] {.async.} = - var ethRpc: Web3 + let ethRpc = ( + await retryWrapper( + RetryStrategy.new(), + "Failed to connect to the Ethereum client", + proc(): Future[Web3] {.async.} = + var innerEthRpc: Web3 + var connected = false + for clientUrl in g.ethClientUrls: + ## We give a chance to the user to provide multiple clients + ## and we try to connect to each of them + try: + innerEthRpc = await newWeb3(clientUrl) + connected = true + break + except CatchableError: + error "failed connect Eth client", error = getCurrentExceptionMsg() - g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"): - var innerEthRpc: Web3 - var connected = false - for clientUrl in g.ethClientUrls: - ## We give a chance to the user to provide multiple clients - ## and we try to connect to each of them - try: - innerEthRpc = await newWeb3(clientUrl) - connected = true - break - except CatchableError: - error "failed connect Eth client", error = getCurrentExceptionMsg() + ## this exception is handled by the retrywrapper + if not connected: + raise newException(CatchableError, "all failed") - if not connected: - raise newException(CatchableError, "all failed") - - innerEthRpc + return innerEthRpc, + ) + ).valueOr: + return err("Failed to establish Ethereum connection: " & error) return ok(ethRpc) @@ -519,9 +521,15 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} let ethRpc: Web3 = (await establishConnection(g)).valueOr: return err("failed to connect to Ethereum clients: " & $error) - var fetchedChainId: UInt256 - g.retryWrapper(fetchedChainId, "Failed to get the chain id"): - await ethRpc.provider.eth_chainId() + let fetchedChainId = ( + await retryWrapper( + RetryStrategy.new(), + "Failed to get the chain id", + proc(): Future[UInt256] {.async.} = + return await ethRpc.provider.eth_chainId(), + ) + ).valueOr: + return err("Failed to get chain id: " & error) # Set the chain id if g.chainId == 0: @@ -595,8 +603,10 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} proc onDisconnect() {.async.} = error "Ethereum client disconnected" - var newEthRpc: Web3 = (await g.establishConnection()).valueOr: - g.onFatalErrorAction("failed to connect to Ethereum clients onDisconnect") + let newEthRpc: Web3 = (await g.establishConnection()).valueOr: + error "Fatal: failed to reconnect to Ethereum clients after disconnect", + error = error + g.onFatalErrorAction("failed to reconnect to Ethereum clients: " & error) return newEthRpc.ondisconnect = ethRpc.ondisconnect @@ -616,12 +626,14 @@ method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} = g.initialized = false method isReady*(g: OnchainGroupManager): Future[bool] {.async.} = - initializedGuard(g) + checkInitialized(g).isOkOr: + return false if g.ethRpc.isNone(): + error "Ethereum RPC client is not configured" return false if g.wakuRlnContract.isNone(): + error "Waku RLN contract is not configured" return false - return true diff --git a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim index df8716279..97bc0c435 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim @@ -1,36 +1,31 @@ -import ../../../common/error_handling import chronos import results +const + DefaultRetryDelay* = 4000.millis + DefaultRetryCount* = 15'u + type RetryStrategy* = object - shouldRetry*: bool retryDelay*: Duration retryCount*: uint proc new*(T: type RetryStrategy): RetryStrategy = - return RetryStrategy(shouldRetry: true, retryDelay: 4000.millis, retryCount: 15) + return RetryStrategy(retryDelay: DefaultRetryDelay, retryCount: DefaultRetryCount) -template retryWrapper*( - res: auto, - retryStrategy: RetryStrategy, - errStr: string, - errCallback: OnFatalErrorHandler, - body: untyped, -): auto = - if errCallback == nil: - raise newException(CatchableError, "Ensure that the errCallback is set") +proc retryWrapper*[T]( + retryStrategy: RetryStrategy, errStr: string, body: proc(): Future[T] {.async.} +): Future[Result[T, string]] {.async.} = var retryCount = retryStrategy.retryCount - var shouldRetry = retryStrategy.shouldRetry - var exceptionMessage = "" + var lastError = "" - while shouldRetry and retryCount > 0: + while retryCount > 0: try: - res = body - shouldRetry = false - except: + let value = await body() + return ok(value) + except CatchableError as e: retryCount -= 1 - exceptionMessage = getCurrentExceptionMsg() - await sleepAsync(retryStrategy.retryDelay) - if shouldRetry: - errCallback(errStr & ": " & exceptionMessage) - return + lastError = e.msg + if retryCount > 0: + await sleepAsync(retryStrategy.retryDelay) + + return err(errStr & ": " & lastError) diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 5c893e2a2..8559dcd66 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -68,7 +68,7 @@ type WakuRLNRelay* = ref object of RootObj onFatalErrorAction*: OnFatalErrorHandler nonceManager*: NonceManager epochMonitorFuture*: Future[void] - rootChangesFuture*: Future[void] + rootChangesFuture*: Future[Result[void, string]] brokerCtx*: BrokerContext proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch = @@ -467,7 +467,7 @@ proc mount( return ok(wakuRlnRelay) -proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} = +proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async.} = ## returns true if the rln-relay protocol is ready to relay messages ## returns false otherwise