mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-03-15 05:23:09 +00:00
Chore: Simplify on chain group manager error handling (#3678)
This commit is contained in:
parent
03249df715
commit
bc9454db5e
@ -134,11 +134,8 @@ suite "RLN Proofs as a Lightpush Service":
|
||||
let manager1 = cast[OnchainGroupManager](server.wakuRlnRelay.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated1 = waitFor manager1.updateRoots()
|
||||
info "Updated root for node1", rootUpdated1
|
||||
|
||||
@ -137,11 +137,8 @@ suite "RLN Proofs as a Lightpush Service":
|
||||
let manager1 = cast[OnchainGroupManager](server.wakuRlnRelay.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated1 = waitFor manager1.updateRoots()
|
||||
info "Updated root for node1", rootUpdated1
|
||||
|
||||
@ -74,10 +74,11 @@ suite "Onchain group manager":
|
||||
raiseAssert "Expected error when keystore file doesn't exist"
|
||||
|
||||
test "trackRootChanges: should guard against uninitialized state":
|
||||
try:
|
||||
discard manager.trackRootChanges()
|
||||
except CatchableError:
|
||||
check getCurrentExceptionMsg().len == 38
|
||||
let initializedResult = waitFor manager.trackRootChanges()
|
||||
|
||||
check:
|
||||
initializedResult.isErr()
|
||||
initializedResult.error == "OnchainGroupManager is not initialized"
|
||||
|
||||
test "trackRootChanges: should sync to the state of the group":
|
||||
let credentials = generateCredentials()
|
||||
@ -86,10 +87,8 @@ suite "Onchain group manager":
|
||||
|
||||
let merkleRootBefore = waitFor manager.fetchMerkleRoot()
|
||||
|
||||
try:
|
||||
waitFor manager.register(credentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
discard waitFor withTimeout(trackRootChanges(manager), 15.seconds)
|
||||
|
||||
@ -110,13 +109,11 @@ suite "Onchain group manager":
|
||||
|
||||
let merkleRootBefore = waitFor manager.fetchMerkleRoot()
|
||||
|
||||
try:
|
||||
for i in 0 ..< credentials.len():
|
||||
info "Registering credential", index = i, credential = credentials[i]
|
||||
waitFor manager.register(credentials[i], UserMessageLimit(20))
|
||||
discard waitFor manager.updateRoots()
|
||||
except Exception, CatchableError:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
for i in 0 ..< credentials.len():
|
||||
info "Registering credential", index = i, credential = credentials[i]
|
||||
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register credential " & $i & ": " & error
|
||||
discard waitFor manager.updateRoots()
|
||||
|
||||
let merkleRootAfter = waitFor manager.fetchMerkleRoot()
|
||||
|
||||
@ -127,16 +124,15 @@ suite "Onchain group manager":
|
||||
test "register: should guard against uninitialized state":
|
||||
let dummyCommitment = default(IDCommitment)
|
||||
|
||||
try:
|
||||
waitFor manager.register(
|
||||
RateCommitment(
|
||||
idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20)
|
||||
)
|
||||
let res = waitFor manager.register(
|
||||
RateCommitment(
|
||||
idCommitment: dummyCommitment, userMessageLimit: UserMessageLimit(20)
|
||||
)
|
||||
except CatchableError:
|
||||
assert true
|
||||
except Exception:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
)
|
||||
|
||||
check:
|
||||
res.isErr()
|
||||
res.error == "OnchainGroupManager is not initialized"
|
||||
|
||||
test "register: should register successfully":
|
||||
# TODO :- similar to ```trackRootChanges: should fetch history correctly```
|
||||
@ -146,11 +142,8 @@ suite "Onchain group manager":
|
||||
let idCredentials = generateCredentials()
|
||||
let merkleRootBefore = waitFor manager.fetchMerkleRoot()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let merkleRootAfter = waitFor manager.fetchMerkleRoot()
|
||||
|
||||
@ -177,26 +170,25 @@ suite "Onchain group manager":
|
||||
|
||||
manager.onRegister(callback)
|
||||
|
||||
try:
|
||||
(
|
||||
waitFor manager.register(
|
||||
RateCommitment(
|
||||
idCommitment: idCommitment, userMessageLimit: UserMessageLimit(20)
|
||||
)
|
||||
)
|
||||
except Exception, CatchableError:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
waitFor fut
|
||||
|
||||
test "withdraw: should guard against uninitialized state":
|
||||
let idSecretHash = generateCredentials().idSecretHash
|
||||
|
||||
try:
|
||||
waitFor manager.withdraw(idSecretHash)
|
||||
except CatchableError:
|
||||
assert true
|
||||
except Exception:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
let res = waitFor manager.withdraw(idSecretHash)
|
||||
|
||||
check:
|
||||
res.isErr()
|
||||
res.error == "OnchainGroupManager is not initialized"
|
||||
|
||||
test "validateRoot: should validate good root":
|
||||
let idCredentials = generateCredentials()
|
||||
@ -217,10 +209,8 @@ suite "Onchain group manager":
|
||||
(waitFor manager.init()).isOkOr:
|
||||
raiseAssert $error
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned : " & getCurrentExceptionMsg()
|
||||
|
||||
waitFor fut
|
||||
|
||||
@ -299,10 +289,8 @@ suite "Onchain group manager":
|
||||
|
||||
manager.onRegister(callback)
|
||||
|
||||
try:
|
||||
waitFor manager.register(credentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
waitFor fut
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
@ -337,11 +325,8 @@ suite "Onchain group manager":
|
||||
|
||||
let idCredential = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredential, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredential, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let messageBytes = "Hello".toBytes()
|
||||
|
||||
@ -395,14 +380,12 @@ suite "Onchain group manager":
|
||||
|
||||
return callback
|
||||
|
||||
try:
|
||||
manager.onRegister(generateCallback(futures, credentials))
|
||||
manager.onRegister(generateCallback(futures, credentials))
|
||||
|
||||
for i in 0 ..< credentials.len():
|
||||
waitFor manager.register(credentials[i], UserMessageLimit(20))
|
||||
discard waitFor manager.updateRoots()
|
||||
except Exception, CatchableError:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
for i in 0 ..< credentials.len():
|
||||
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register credential " & $i & ": " & error
|
||||
discard waitFor manager.updateRoots()
|
||||
|
||||
waitFor allFutures(futures)
|
||||
|
||||
|
||||
@ -242,11 +242,8 @@ suite "Waku rln relay":
|
||||
let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let epoch1 = wakuRlnRelay.getCurrentEpoch()
|
||||
|
||||
@ -301,11 +298,8 @@ suite "Waku rln relay":
|
||||
let manager = cast[OnchainGroupManager](wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
# usually it's 20 seconds but we set it to 1 for testing purposes which make the test faster
|
||||
wakuRlnRelay.rlnMaxTimestampGap = 1
|
||||
@ -353,11 +347,9 @@ suite "Waku rln relay":
|
||||
let manager1 = cast[OnchainGroupManager](wakuRlnRelay1.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
"error returned when calling register: " & error
|
||||
|
||||
let index2 = MembershipIndex(6)
|
||||
let rlnConf2 = getWakuRlnConfig(manager = manager, index = index2)
|
||||
@ -369,11 +361,9 @@ suite "Waku rln relay":
|
||||
let manager2 = cast[OnchainGroupManager](wakuRlnRelay2.groupManager)
|
||||
let idCredentials2 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager2.register(idCredentials2, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
(waitFor manager2.register(idCredentials2, UserMessageLimit(20))).isOkOr:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
"error returned when calling register: " & error
|
||||
|
||||
# get the current epoch time
|
||||
let epoch = wakuRlnRelay1.getCurrentEpoch()
|
||||
|
||||
@ -58,11 +58,8 @@ procSuite "WakuNode - RLN relay":
|
||||
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated1 = waitFor manager1.updateRoots()
|
||||
info "Updated root for node1", rootUpdated1
|
||||
@ -172,11 +169,8 @@ procSuite "WakuNode - RLN relay":
|
||||
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated1 = waitFor manager1.updateRoots()
|
||||
info "Updated root for node", node = 1, rootUpdated = rootUpdated1
|
||||
@ -192,11 +186,8 @@ procSuite "WakuNode - RLN relay":
|
||||
let manager2 = cast[OnchainGroupManager](node2.wakuRlnRelay.groupManager)
|
||||
let idCredentials2 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager2.register(idCredentials2, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager2.register(idCredentials2, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated2 = waitFor manager2.updateRoots()
|
||||
info "Updated root for node", node = 2, rootUpdated = rootUpdated2
|
||||
@ -212,11 +203,8 @@ procSuite "WakuNode - RLN relay":
|
||||
let manager3 = cast[OnchainGroupManager](node3.wakuRlnRelay.groupManager)
|
||||
let idCredentials3 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager3.register(idCredentials3, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager3.register(idCredentials3, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated3 = waitFor manager3.updateRoots()
|
||||
info "Updated root for node", node = 3, rootUpdated = rootUpdated3
|
||||
@ -333,11 +321,8 @@ procSuite "WakuNode - RLN relay":
|
||||
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated1 = waitFor manager1.updateRoots()
|
||||
info "Updated root for node1", rootUpdated1
|
||||
@ -448,11 +433,8 @@ procSuite "WakuNode - RLN relay":
|
||||
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated1 = waitFor manager1.updateRoots()
|
||||
info "Updated root for node1", rootUpdated1
|
||||
@ -620,11 +602,8 @@ procSuite "WakuNode - RLN relay":
|
||||
let manager1 = cast[OnchainGroupManager](node1.wakuRlnRelay.groupManager)
|
||||
let idCredentials1 = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager1.register(idCredentials1, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager1.register(idCredentials1, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "error returned when calling register: " & error
|
||||
|
||||
let rootUpdated1 = waitFor manager1.updateRoots()
|
||||
info "Updated root for node1", rootUpdated1
|
||||
|
||||
@ -42,8 +42,8 @@ suite "Waku v2 Rest API - Relay":
|
||||
var manager {.threadVar.}: OnchainGroupManager
|
||||
|
||||
setup:
|
||||
anvilProc = runAnvil()
|
||||
manager = waitFor setupOnchainGroupManager()
|
||||
anvilProc = runAnvil(stateFile = some(DEFAULT_ANVIL_STATE_PATH))
|
||||
manager = waitFor setupOnchainGroupManager(deployContracts = false)
|
||||
|
||||
teardown:
|
||||
stopAnvil(anvilProc)
|
||||
@ -268,11 +268,8 @@ suite "Waku v2 Rest API - Relay":
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register identity credentials" & getCurrentExceptionMsg()
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
info "Updated root for node", rootUpdated
|
||||
@ -545,11 +542,8 @@ suite "Waku v2 Rest API - Relay":
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register identity credentials" & getCurrentExceptionMsg()
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
info "Updated root for node", rootUpdated
|
||||
@ -617,11 +611,8 @@ suite "Waku v2 Rest API - Relay":
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register identity credentials" & getCurrentExceptionMsg()
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
info "Updated root for node", rootUpdated
|
||||
@ -679,11 +670,8 @@ suite "Waku v2 Rest API - Relay":
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register identity credentials" & getCurrentExceptionMsg()
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
info "Updated root for node", rootUpdated
|
||||
@ -754,11 +742,8 @@ suite "Waku v2 Rest API - Relay":
|
||||
let manager = cast[OnchainGroupManager](node.wakuRlnRelay.groupManager)
|
||||
let idCredentials = generateCredentials()
|
||||
|
||||
try:
|
||||
waitFor manager.register(idCredentials, UserMessageLimit(20))
|
||||
except Exception, CatchableError:
|
||||
assert false,
|
||||
"exception raised when calling register: " & getCurrentExceptionMsg()
|
||||
(waitFor manager.register(idCredentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register identity credentials" & getCurrentExceptionMsg()
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
info "Updated root for node", rootUpdated
|
||||
|
||||
@ -73,7 +73,9 @@ proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
|
||||
|
||||
# 4. register on-chain
|
||||
try:
|
||||
waitFor groupManager.register(credential, conf.userMessageLimit)
|
||||
(waitFor groupManager.register(credential, conf.userMessageLimit)).isOkOr:
|
||||
error "Failed to register on-chain", error = error
|
||||
quit(QuitFailure)
|
||||
except Exception, CatchableError:
|
||||
error "failure while registering credentials on-chain",
|
||||
error = getCurrentExceptionMsg()
|
||||
|
||||
@ -144,6 +144,4 @@ method generateProof*(
|
||||
return err("generateProof is not implemented")
|
||||
|
||||
method isReady*(g: GroupManager): Future[bool] {.base, async.} =
|
||||
raise newException(
|
||||
CatchableError, "isReady proc for " & $g.type & " is not implemented yet"
|
||||
)
|
||||
return true
|
||||
|
||||
@ -50,109 +50,85 @@ type
|
||||
proc fetchMerkleProofElements*(
|
||||
g: OnchainGroupManager
|
||||
): Future[Result[seq[byte], string]] {.async.} =
|
||||
try:
|
||||
let membershipIndex = g.membershipIndex.get()
|
||||
let index40 = stuint(membershipIndex, 40)
|
||||
let membershipIndex = g.membershipIndex.get()
|
||||
let index40 = stuint(membershipIndex, 40)
|
||||
|
||||
let methodSig = "getMerkleProof(uint40)"
|
||||
var paddedParam = newSeq[byte](32)
|
||||
let indexBytes = index40.toBytesBE()
|
||||
for i in 0 ..< min(indexBytes.len, paddedParam.len):
|
||||
paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i]
|
||||
let methodSig = "getMerkleProof(uint40)"
|
||||
var paddedParam = newSeq[byte](32)
|
||||
let indexBytes = index40.toBytesBE()
|
||||
for i in 0 ..< min(indexBytes.len, paddedParam.len):
|
||||
paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i]
|
||||
|
||||
let response = await sendEthCallWithParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = methodSig,
|
||||
params = paddedParam,
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
let response = await sendEthCallWithParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = methodSig,
|
||||
params = paddedParam,
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
|
||||
return response
|
||||
except CatchableError:
|
||||
error "Failed to fetch Merkle proof elements", error = getCurrentExceptionMsg()
|
||||
return err("Failed to fetch merkle proof elements: " & getCurrentExceptionMsg())
|
||||
return response
|
||||
|
||||
proc fetchMerkleRoot*(
|
||||
g: OnchainGroupManager
|
||||
): Future[Result[UInt256, string]] {.async.} =
|
||||
try:
|
||||
let merkleRoot = await sendEthCallWithoutParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "root()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return merkleRoot
|
||||
except CatchableError:
|
||||
error "Failed to fetch Merkle root", error = getCurrentExceptionMsg()
|
||||
return err("Failed to fetch merkle root: " & getCurrentExceptionMsg())
|
||||
let merkleRoot = await sendEthCallWithoutParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "root()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return merkleRoot
|
||||
|
||||
proc fetchNextFreeIndex*(
|
||||
g: OnchainGroupManager
|
||||
): Future[Result[UInt256, string]] {.async.} =
|
||||
try:
|
||||
let nextFreeIndex = await sendEthCallWithoutParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "nextFreeIndex()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return nextFreeIndex
|
||||
except CatchableError:
|
||||
error "Failed to fetch next free index", error = getCurrentExceptionMsg()
|
||||
return err("Failed to fetch next free index: " & getCurrentExceptionMsg())
|
||||
let nextFreeIndex = await sendEthCallWithoutParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "nextFreeIndex()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return nextFreeIndex
|
||||
|
||||
proc fetchMembershipStatus*(
|
||||
g: OnchainGroupManager, idCommitment: IDCommitment
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
try:
|
||||
let params = idCommitment.reversed()
|
||||
let responseBytes = (
|
||||
await sendEthCallWithParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "isInMembershipSet(uint256)",
|
||||
params = params,
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to check membership: " & error)
|
||||
|
||||
return ok(responseBytes.len == 32 and responseBytes[^1] == 1'u8)
|
||||
except CatchableError:
|
||||
error "Failed to fetch membership set membership", error = getCurrentExceptionMsg()
|
||||
return err("Failed to fetch membership set membership: " & getCurrentExceptionMsg())
|
||||
|
||||
proc fetchMaxMembershipRateLimit*(
|
||||
g: OnchainGroupManager
|
||||
): Future[Result[UInt256, string]] {.async.} =
|
||||
try:
|
||||
let maxMembershipRateLimit = await sendEthCallWithoutParams(
|
||||
let params = idCommitment.reversed()
|
||||
let responseBytes = (
|
||||
await sendEthCallWithParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "maxMembershipRateLimit()",
|
||||
functionSignature = "isInMembershipSet(uint256)",
|
||||
params = params,
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return maxMembershipRateLimit
|
||||
except CatchableError:
|
||||
error "Failed to fetch max membership rate limit", error = getCurrentExceptionMsg()
|
||||
return err("Failed to fetch max membership rate limit: " & getCurrentExceptionMsg())
|
||||
).valueOr:
|
||||
return err("Failed to check membership: " & error)
|
||||
|
||||
template initializedGuard(g: OnchainGroupManager): untyped =
|
||||
return ok(responseBytes.len == 32 and responseBytes[^1] == 1'u8)
|
||||
|
||||
proc fetchMaxMembershipRateLimit*(
|
||||
g: OnchainGroupManager
|
||||
): Future[Result[UInt256, string]] {.async.} =
|
||||
let maxMembershipRateLimit = await sendEthCallWithoutParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "maxMembershipRateLimit()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
|
||||
return maxMembershipRateLimit
|
||||
|
||||
proc checkInitialized(g: OnchainGroupManager): Result[void, string] =
|
||||
if not g.initialized:
|
||||
raise newException(CatchableError, "OnchainGroupManager is not initialized")
|
||||
|
||||
template retryWrapper(
|
||||
g: OnchainGroupManager, res: auto, errStr: string, body: untyped
|
||||
): auto =
|
||||
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
|
||||
body
|
||||
return err("OnchainGroupManager is not initialized")
|
||||
return ok()
|
||||
|
||||
proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
|
||||
let rootRes = (await g.fetchMerkleRoot()).valueOr:
|
||||
@ -172,40 +148,37 @@ proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
|
||||
|
||||
return false
|
||||
|
||||
proc trackRootChanges*(g: OnchainGroupManager) {.async: (raises: [CatchableError]).} =
|
||||
try:
|
||||
initializedGuard(g)
|
||||
const rpcDelay = 5.seconds
|
||||
proc trackRootChanges*(g: OnchainGroupManager): Future[Result[void, string]] {.async.} =
|
||||
?checkInitialized(g)
|
||||
|
||||
while true:
|
||||
await sleepAsync(rpcDelay)
|
||||
let rootUpdated = await g.updateRoots()
|
||||
const rpcDelay = 5.seconds
|
||||
|
||||
if rootUpdated:
|
||||
## The membership set on-chain has changed (some new members have joined or some members have left)
|
||||
if g.membershipIndex.isSome():
|
||||
## A membership index exists only if the node has registered with RLN.
|
||||
## Non-registered nodes cannot have Merkle proof elements.
|
||||
let proofResult = await g.fetchMerkleProofElements()
|
||||
if proofResult.isErr():
|
||||
error "Failed to fetch Merkle proof", error = proofResult.error
|
||||
else:
|
||||
g.merkleProofCache = proofResult.get()
|
||||
while true:
|
||||
await sleepAsync(rpcDelay)
|
||||
let rootUpdated = await g.updateRoots()
|
||||
|
||||
let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr:
|
||||
error "Failed to fetch next free index", error = error
|
||||
raise
|
||||
newException(CatchableError, "Failed to fetch next free index: " & error)
|
||||
if rootUpdated:
|
||||
## The membership set on-chain has changed (some new members have joined or some members have left)
|
||||
if g.membershipIndex.isSome():
|
||||
## A membership index exists only if the node has registered with RLN.
|
||||
## Non-registered nodes cannot have Merkle proof elements.
|
||||
let proofResult = await g.fetchMerkleProofElements()
|
||||
if proofResult.isErr():
|
||||
error "Failed to fetch Merkle proof", error = proofResult.error
|
||||
else:
|
||||
g.merkleProofCache = proofResult.get()
|
||||
|
||||
let memberCount = cast[int64](nextFreeIndex)
|
||||
waku_rln_number_registered_memberships.set(float64(memberCount))
|
||||
except CatchableError:
|
||||
error "Fatal error in trackRootChanges", error = getCurrentExceptionMsg()
|
||||
let nextFreeIndex = (await g.fetchNextFreeIndex()).valueOr:
|
||||
error "Failed to fetch next free index", error = error
|
||||
return err("Failed to fetch next free index: " & error)
|
||||
|
||||
let memberCount = cast[int64](nextFreeIndex)
|
||||
waku_rln_number_registered_memberships.set(float64(memberCount))
|
||||
|
||||
method register*(
|
||||
g: OnchainGroupManager, rateCommitment: RateCommitment
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkInitialized(g)
|
||||
|
||||
try:
|
||||
let leaf = rateCommitment.toLeaf().get()
|
||||
@ -214,33 +187,40 @@ method register*(
|
||||
info "registering member via callback", rateCommitment = leaf, index = idx
|
||||
await g.registerCb.get()(@[Membership(rateCommitment: leaf, index: idx)])
|
||||
g.latestIndex.inc()
|
||||
except CatchableError:
|
||||
raise newException(ValueError, getCurrentExceptionMsg())
|
||||
except Exception as e:
|
||||
return err("Failed to call register callback: " & e.msg)
|
||||
|
||||
return ok()
|
||||
|
||||
method register*(
|
||||
g: OnchainGroupManager,
|
||||
identityCredential: IdentityCredential,
|
||||
userMessageLimit: UserMessageLimit,
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkInitialized(g)
|
||||
|
||||
let ethRpc = g.ethRpc.get()
|
||||
let wakuRlnContract = g.wakuRlnContract.get()
|
||||
|
||||
var gasPrice: int
|
||||
g.retryWrapper(gasPrice, "Failed to get gas price"):
|
||||
let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice())
|
||||
## Multiply by 2 to speed up the transaction
|
||||
## Check for overflow when casting to int
|
||||
if fetchedGasPrice > uint64(high(int) div 2):
|
||||
warn "Gas price overflow detected, capping at maximum int value",
|
||||
fetchedGasPrice = fetchedGasPrice, maxInt = high(int)
|
||||
high(int)
|
||||
else:
|
||||
let calculatedGasPrice = int(fetchedGasPrice) * 2
|
||||
debug "Gas price calculated",
|
||||
fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice
|
||||
calculatedGasPrice
|
||||
let gasPrice = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to get gas price",
|
||||
proc(): Future[int] {.async.} =
|
||||
let fetchedGasPrice = uint64(await ethRpc.provider.eth_gasPrice())
|
||||
if fetchedGasPrice > uint64(high(int) div 2):
|
||||
warn "Gas price overflow detected, capping at maximum int value",
|
||||
fetchedGasPrice = fetchedGasPrice, maxInt = high(int)
|
||||
return high(int)
|
||||
else:
|
||||
let calculatedGasPrice = int(fetchedGasPrice) * 2
|
||||
debug "Gas price calculated",
|
||||
fetchedGasPrice = fetchedGasPrice, gasPrice = calculatedGasPrice
|
||||
return calculatedGasPrice,
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to get gas price: " & error)
|
||||
|
||||
let idCommitmentHex = identityCredential.idCommitment.inHex()
|
||||
debug "identityCredential idCommitmentHex", idCommitment = idCommitmentHex
|
||||
let idCommitment = identityCredential.idCommitment.toUInt256()
|
||||
@ -249,27 +229,37 @@ method register*(
|
||||
idCommitment = idCommitment,
|
||||
userMessageLimit = userMessageLimit,
|
||||
idCommitmentsToErase = idCommitmentsToErase
|
||||
var txHash: TxHash
|
||||
g.retryWrapper(txHash, "Failed to register the member"):
|
||||
await wakuRlnContract
|
||||
.register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase)
|
||||
.send(gasPrice = gasPrice)
|
||||
let txHash = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to register the member",
|
||||
proc(): Future[TxHash] {.async.} =
|
||||
return await wakuRlnContract
|
||||
.register(idCommitment, userMessageLimit.stuint(32), idCommitmentsToErase)
|
||||
.send(gasPrice = gasPrice),
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to register member: " & error)
|
||||
|
||||
# wait for the transaction to be mined
|
||||
var tsReceipt: ReceiptObject
|
||||
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
|
||||
await ethRpc.getMinedTransactionReceipt(txHash)
|
||||
let tsReceipt = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to get the transaction receipt",
|
||||
proc(): Future[ReceiptObject] {.async.} =
|
||||
return await ethRpc.getMinedTransactionReceipt(txHash),
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to get transaction receipt: " & error)
|
||||
debug "registration transaction mined", txHash = txHash
|
||||
g.registrationTxHash = some(txHash)
|
||||
# the receipt topic holds the hash of signature of the raised events
|
||||
debug "ts receipt", receipt = tsReceipt[]
|
||||
|
||||
if tsReceipt.status.isNone():
|
||||
raise newException(ValueError, "Transaction failed: status is None")
|
||||
return err("Transaction failed: status is None")
|
||||
if tsReceipt.status.get() != 1.Quantity:
|
||||
raise newException(
|
||||
ValueError, "Transaction failed with status: " & $tsReceipt.status.get()
|
||||
)
|
||||
return err("Transaction failed with status: " & $tsReceipt.status.get())
|
||||
|
||||
## Search through all transaction logs to find the MembershipRegistered event
|
||||
let expectedEventSignature = cast[FixedBytes[32]](keccak.keccak256.digest(
|
||||
@ -283,9 +273,7 @@ method register*(
|
||||
break
|
||||
|
||||
if membershipRegisteredLog.isNone():
|
||||
raise newException(
|
||||
ValueError, "register: MembershipRegistered event not found in transaction logs"
|
||||
)
|
||||
return err("register: MembershipRegistered event not found in transaction logs")
|
||||
|
||||
let registrationLog = membershipRegisteredLog.get()
|
||||
|
||||
@ -309,20 +297,28 @@ method register*(
|
||||
|
||||
if g.registerCb.isSome():
|
||||
let member = Membership(rateCommitment: rateCommitment, index: g.latestIndex)
|
||||
await g.registerCb.get()(@[member])
|
||||
try:
|
||||
await g.registerCb.get()(@[member])
|
||||
except Exception as e:
|
||||
return err("Failed to call register callback: " & e.msg)
|
||||
g.latestIndex.inc()
|
||||
|
||||
return
|
||||
return ok()
|
||||
|
||||
method withdraw*(
|
||||
g: OnchainGroupManager, idCommitment: IDCommitment
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g) # TODO: after slashing is enabled on the contract
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
checkInitialized(g).isOkOr:
|
||||
return err(error)
|
||||
return ok()
|
||||
|
||||
method withdrawBatch*(
|
||||
g: OnchainGroupManager, idCommitments: seq[IDCommitment]
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
checkInitialized(g).isOkOr:
|
||||
return err(error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc getRootFromProofAndIndex(
|
||||
g: OnchainGroupManager, elements: seq[byte], bits: seq[byte]
|
||||
@ -354,7 +350,7 @@ method generateProof*(
|
||||
epoch: Epoch,
|
||||
messageId: MessageId,
|
||||
rlnIdentifier = DefaultRlnIdentifier,
|
||||
): GroupManagerResult[RateLimitProof] {.gcsafe, raises: [].} =
|
||||
): GroupManagerResult[RateLimitProof] {.gcsafe.} =
|
||||
## Generates an RLN proof using the cached Merkle proof and custom witness
|
||||
# Ensure identity credentials and membership index are set
|
||||
if g.idCredentials.isNone():
|
||||
@ -452,7 +448,7 @@ method generateProof*(
|
||||
|
||||
method verifyProof*(
|
||||
g: OnchainGroupManager, input: seq[byte], proof: RateLimitProof
|
||||
): GroupManagerResult[bool] {.gcsafe, raises: [].} =
|
||||
): GroupManagerResult[bool] {.gcsafe.} =
|
||||
## -- Verifies an RLN rate-limit proof against the set of valid Merkle roots --
|
||||
|
||||
var normalizedProof = proof
|
||||
@ -492,25 +488,31 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
|
||||
proc establishConnection(
|
||||
g: OnchainGroupManager
|
||||
): Future[GroupManagerResult[Web3]] {.async.} =
|
||||
var ethRpc: Web3
|
||||
let ethRpc = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to connect to the Ethereum client",
|
||||
proc(): Future[Web3] {.async.} =
|
||||
var innerEthRpc: Web3
|
||||
var connected = false
|
||||
for clientUrl in g.ethClientUrls:
|
||||
## We give a chance to the user to provide multiple clients
|
||||
## and we try to connect to each of them
|
||||
try:
|
||||
innerEthRpc = await newWeb3(clientUrl)
|
||||
connected = true
|
||||
break
|
||||
except CatchableError:
|
||||
error "failed connect Eth client", error = getCurrentExceptionMsg()
|
||||
|
||||
g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
|
||||
var innerEthRpc: Web3
|
||||
var connected = false
|
||||
for clientUrl in g.ethClientUrls:
|
||||
## We give a chance to the user to provide multiple clients
|
||||
## and we try to connect to each of them
|
||||
try:
|
||||
innerEthRpc = await newWeb3(clientUrl)
|
||||
connected = true
|
||||
break
|
||||
except CatchableError:
|
||||
error "failed connect Eth client", error = getCurrentExceptionMsg()
|
||||
## this exception is handled by the retrywrapper
|
||||
if not connected:
|
||||
raise newException(CatchableError, "all failed")
|
||||
|
||||
if not connected:
|
||||
raise newException(CatchableError, "all failed")
|
||||
|
||||
innerEthRpc
|
||||
return innerEthRpc,
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to establish Ethereum connection: " & error)
|
||||
|
||||
return ok(ethRpc)
|
||||
|
||||
@ -519,9 +521,15 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
|
||||
let ethRpc: Web3 = (await establishConnection(g)).valueOr:
|
||||
return err("failed to connect to Ethereum clients: " & $error)
|
||||
|
||||
var fetchedChainId: UInt256
|
||||
g.retryWrapper(fetchedChainId, "Failed to get the chain id"):
|
||||
await ethRpc.provider.eth_chainId()
|
||||
let fetchedChainId = (
|
||||
await retryWrapper(
|
||||
RetryStrategy.new(),
|
||||
"Failed to get the chain id",
|
||||
proc(): Future[UInt256] {.async.} =
|
||||
return await ethRpc.provider.eth_chainId(),
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to get chain id: " & error)
|
||||
|
||||
# Set the chain id
|
||||
if g.chainId == 0:
|
||||
@ -595,8 +603,10 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.}
|
||||
proc onDisconnect() {.async.} =
|
||||
error "Ethereum client disconnected"
|
||||
|
||||
var newEthRpc: Web3 = (await g.establishConnection()).valueOr:
|
||||
g.onFatalErrorAction("failed to connect to Ethereum clients onDisconnect")
|
||||
let newEthRpc: Web3 = (await g.establishConnection()).valueOr:
|
||||
error "Fatal: failed to reconnect to Ethereum clients after disconnect",
|
||||
error = error
|
||||
g.onFatalErrorAction("failed to reconnect to Ethereum clients: " & error)
|
||||
return
|
||||
|
||||
newEthRpc.ondisconnect = ethRpc.ondisconnect
|
||||
@ -616,12 +626,14 @@ method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} =
|
||||
g.initialized = false
|
||||
|
||||
method isReady*(g: OnchainGroupManager): Future[bool] {.async.} =
|
||||
initializedGuard(g)
|
||||
checkInitialized(g).isOkOr:
|
||||
return false
|
||||
|
||||
if g.ethRpc.isNone():
|
||||
error "Ethereum RPC client is not configured"
|
||||
return false
|
||||
|
||||
if g.wakuRlnContract.isNone():
|
||||
error "Waku RLN contract is not configured"
|
||||
return false
|
||||
|
||||
return true
|
||||
|
||||
@ -1,36 +1,31 @@
|
||||
import ../../../common/error_handling
|
||||
import chronos
|
||||
import results
|
||||
|
||||
const
|
||||
DefaultRetryDelay* = 4000.millis
|
||||
DefaultRetryCount* = 15'u
|
||||
|
||||
type RetryStrategy* = object
|
||||
shouldRetry*: bool
|
||||
retryDelay*: Duration
|
||||
retryCount*: uint
|
||||
|
||||
proc new*(T: type RetryStrategy): RetryStrategy =
|
||||
return RetryStrategy(shouldRetry: true, retryDelay: 4000.millis, retryCount: 15)
|
||||
return RetryStrategy(retryDelay: DefaultRetryDelay, retryCount: DefaultRetryCount)
|
||||
|
||||
template retryWrapper*(
|
||||
res: auto,
|
||||
retryStrategy: RetryStrategy,
|
||||
errStr: string,
|
||||
errCallback: OnFatalErrorHandler,
|
||||
body: untyped,
|
||||
): auto =
|
||||
if errCallback == nil:
|
||||
raise newException(CatchableError, "Ensure that the errCallback is set")
|
||||
proc retryWrapper*[T](
|
||||
retryStrategy: RetryStrategy, errStr: string, body: proc(): Future[T] {.async.}
|
||||
): Future[Result[T, string]] {.async.} =
|
||||
var retryCount = retryStrategy.retryCount
|
||||
var shouldRetry = retryStrategy.shouldRetry
|
||||
var exceptionMessage = ""
|
||||
var lastError = ""
|
||||
|
||||
while shouldRetry and retryCount > 0:
|
||||
while retryCount > 0:
|
||||
try:
|
||||
res = body
|
||||
shouldRetry = false
|
||||
except:
|
||||
let value = await body()
|
||||
return ok(value)
|
||||
except CatchableError as e:
|
||||
retryCount -= 1
|
||||
exceptionMessage = getCurrentExceptionMsg()
|
||||
await sleepAsync(retryStrategy.retryDelay)
|
||||
if shouldRetry:
|
||||
errCallback(errStr & ": " & exceptionMessage)
|
||||
return
|
||||
lastError = e.msg
|
||||
if retryCount > 0:
|
||||
await sleepAsync(retryStrategy.retryDelay)
|
||||
|
||||
return err(errStr & ": " & lastError)
|
||||
|
||||
@ -68,7 +68,7 @@ type WakuRLNRelay* = ref object of RootObj
|
||||
onFatalErrorAction*: OnFatalErrorHandler
|
||||
nonceManager*: NonceManager
|
||||
epochMonitorFuture*: Future[void]
|
||||
rootChangesFuture*: Future[void]
|
||||
rootChangesFuture*: Future[Result[void, string]]
|
||||
brokerCtx*: BrokerContext
|
||||
|
||||
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
|
||||
@ -467,7 +467,7 @@ proc mount(
|
||||
|
||||
return ok(wakuRlnRelay)
|
||||
|
||||
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
|
||||
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async.} =
|
||||
## returns true if the rln-relay protocol is ready to relay messages
|
||||
## returns false otherwise
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user