fix(rln-relay): graceful shutdown with non-zero exit code (#2429)

* fix(rln-relay): graceful shutdown with non-zero exit code

* fix: missed args

* fix: exception str

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* fix: remove old comment

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Aaryamann Challani 2024-02-15 16:55:08 +05:30 committed by GitHub
parent 8eeeb92c66
commit 1563ea8188
12 changed files with 99 additions and 66 deletions
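In sketch form, the pattern this change rolls out across the codebase (hedged; the `makeFatalHandler` helper below is illustrative, not part of the diff): each subsystem receives a single fatal-error callback that logs the unrecoverable condition and exits with a non-zero code, so supervisors such as systemd or Docker observe the failure instead of a silently wedged node.

```nim
import chronicles

type
  OnFatalErrorHandler = proc(errMsg: string) {.gcsafe, closure, raises: [].}

# Illustrative helper: build the callback once and hand it to every subsystem
# that can hit an unrecoverable error (archive driver, RLN group manager, ...).
proc makeFatalHandler(): OnFatalErrorHandler =
  proc(msg: string) {.gcsafe, closure, raises: [].} =
    error "Unrecoverable error occurred", error = msg
    quit(QuitFailure) # non-zero exit code: graceful but observable shutdown
```

Keeping the `quit` inside the application-supplied closure means libraries only report; exit policy stays with the app.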


@@ -460,8 +460,13 @@ proc setupProtocols(node: WakuNode,
   except CatchableError:
     return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
 
+  var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
+    ## Action to be taken when an internal error occurs during the node run.
+    ## e.g. the connection with the database is lost and not recovered.
+    error "Unrecoverable error occurred", error = msg
+    quit(QuitFailure)
   if conf.rlnRelay:
     when defined(rln_v2):
       let rlnConf = WakuRlnConfig(
         rlnRelayDynamic: conf.rlnRelayDynamic,
@@ -472,6 +477,7 @@ proc setupProtocols(node: WakuNode,
         rlnRelayCredPassword: conf.rlnRelayCredPassword,
         rlnRelayTreePath: conf.rlnRelayTreePath,
         rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
+        onFatalErrorAction: onFatalErrorAction,
       )
     else:
       let rlnConf = WakuRlnConfig(
@@ -482,6 +488,7 @@ proc setupProtocols(node: WakuNode,
         rlnRelayCredPath: conf.rlnRelayCredPath,
         rlnRelayCredPassword: conf.rlnRelayCredPassword,
         rlnRelayTreePath: conf.rlnRelayTreePath,
+        onFatalErrorAction: onFatalErrorAction,
       )
 
     try:
@@ -490,18 +497,12 @@ proc setupProtocols(node: WakuNode,
       return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
 
   if conf.store:
-    var onErrAction = proc(msg: string) {.gcsafe, closure.} =
-      ## Action to be taken when an internal error occurs during the node run.
-      ## e.g. the connection with the database is lost and not recovered.
-      error "Unrecoverable error occurred", error = msg
-      quit(QuitFailure)
-
     # Archive setup
     let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
                                              conf.storeMessageDbVacuum,
                                              conf.storeMessageDbMigration,
                                              conf.storeMaxNumDbConnections,
-                                             onErrAction)
+                                             onFatalErrorAction)
    if archiveDriverRes.isErr():
      return err("failed to setup archive driver: " & archiveDriverRes.error)


@@ -62,7 +62,7 @@ proc configureStore(node: WakuNode,
     Future[Result[void, string]] {.async.} =
   ## This snippet is extracted/duplicated from the app.nim file
-  var onErrAction = proc(msg: string) {.gcsafe, closure.} =
+  var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
     ## Action to be taken when an internal error occurs during the node run.
     ## e.g. the connection with the database is lost and not recovered.
     # error "Unrecoverable error occurred", error = msg
@@ -74,7 +74,7 @@ proc configureStore(node: WakuNode,
                                            storeVacuum,
                                            storeDbMigration,
                                            storeMaxNumDbConnections,
-                                           onErrAction)
+                                           onFatalErrorAction)
   if archiveDriverRes.isErr():
     return err("failed to setup archive driver: " & archiveDriverRes.error)


@@ -0,0 +1,2 @@
+type
+  OnFatalErrorHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}
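The whole new `common/error_handling` module is the two lines above. A hedged usage sketch (the `watchSomething` consumer is hypothetical): components accept the handler and escalate instead of deciding process-exit policy themselves.

```nim
type
  OnFatalErrorHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}

# Hypothetical consumer: report upward once recovery has failed.
proc watchSomething(onFatalErrorAction: OnFatalErrorHandler) =
  let recovered = false # stand-in for a real recovery attempt
  if not recovered and not isNil(onFatalErrorAction):
    onFatalErrorAction("connection lost and not recovered")
```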


@@ -979,7 +979,7 @@ proc mountRlnRelay*(node: WakuNode,
     raise newException(CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay")
 
   let rlnRelayRes = waitFor WakuRlnRelay.new(rlnConf,
                                              registrationHandler)
   if rlnRelayRes.isErr():
     raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
   let rlnRelay = rlnRelayRes.get()


@@ -9,6 +9,7 @@ import
   chronos
 import
   ../waku_core,
+  ../common/error_handling,
   ./common
 
 const DefaultPageSize*: uint = 25
@@ -16,7 +17,6 @@ const DefaultPageSize*: uint = 25
 type
   ArchiveDriverResult*[T] = Result[T, string]
   ArchiveDriver* = ref object of RootObj
-  OnErrHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}
 
 type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)


@@ -12,6 +12,7 @@ import
   ../driver,
   ../../common/databases/dburl,
   ../../common/databases/db_sqlite,
+  ../../common/error_handling,
   ./sqlite_driver,
   ./sqlite_driver/migrations as archive_driver_sqlite_migrations,
   ./queue_driver
@@ -29,13 +30,13 @@ proc new*(T: type ArchiveDriver,
           vacuum: bool,
           migrate: bool,
           maxNumConn: int,
-          onErrAction: OnErrHandler):
+          onFatalErrorAction: OnFatalErrorHandler):
           Result[T, string] =
   ## url - string that defines the database
   ## vacuum - if true, a cleanup operation will be applied to the database
   ## migrate - if true, the database schema will be updated
   ## maxNumConn - defines the maximum number of connections to handle simultaneously (Postgres)
-  ## onErrAction - called if, e.g., the connection with db got lost
+  ## onFatalErrorAction - called if, e.g., the connection with db got lost
 
   let dbUrlValidationRes = dburl.validateDbUrl(url)
   if dbUrlValidationRes.isErr():
@@ -85,7 +86,7 @@ proc new*(T: type ArchiveDriver,
     when defined(postgres):
       let res = PostgresDriver.new(dbUrl = url,
                                    maxConnections = maxNumConn,
-                                   onErrAction = onErrAction)
+                                   onFatalErrorAction = onFatalErrorAction)
       if res.isErr():
         return err("failed to init postgres archive driver: " & res.error)


@@ -11,6 +11,7 @@ import
   chronos,
   chronicles
 import
+  ../../../common/error_handling,
   ../../../waku_core,
   ../../common,
   ../../driver,
@@ -89,7 +90,7 @@ const DefaultMaxNumConns = 50
 proc new*(T: type PostgresDriver,
           dbUrl: string,
           maxConnections = DefaultMaxNumConns,
-          onErrAction: OnErrHandler = nil):
+          onFatalErrorAction: OnFatalErrorHandler = nil):
           ArchiveDriverResult[T] =
 
   ## Very simplistic split of max connections
@@ -101,11 +102,11 @@ proc new*(T: type PostgresDriver,
   let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
     return err("error creating write conn pool PgAsyncPool")
 
-  if not isNil(onErrAction):
-    asyncSpawn checkConnectivity(readConnPool, onErrAction)
+  if not isNil(onFatalErrorAction):
+    asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)
 
-  if not isNil(onErrAction):
-    asyncSpawn checkConnectivity(writeConnPool, onErrAction)
+  if not isNil(onFatalErrorAction):
+    asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)
 
   return ok(PostgresDriver(writeConnPool: writeConnPool,
                            readConnPool: readConnPool))


@@ -8,7 +8,8 @@ import
   stew/results
 import
   ../../driver,
-  ../../../common/databases/db_postgres
+  ../../../common/databases/db_postgres,
+  ../../../common/error_handling
 
 ## Simple query to validate that the postgres is working and attending requests
 const HealthCheckQuery = "SELECT version();"
@@ -17,7 +18,7 @@ const MaxNumTrials = 20
 const TrialInterval = 1.seconds
 
 proc checkConnectivity*(connPool: PgAsyncPool,
-                        onErrAction: OnErrHandler) {.async.} =
+                        onFatalErrorAction: OnFatalErrorHandler) {.async.} =
   while true:
@@ -29,7 +30,7 @@ proc checkConnectivity*(connPool: PgAsyncPool,
       block errorBlock:
         ## Force close all the opened connections. No need to close gracefully.
         (await connPool.resetConnPool()).isOkOr:
-          onErrAction("checkConnectivity resetConnPool error: " & error)
+          onFatalErrorAction("checkConnectivity resetConnPool error: " & error)
 
         var numTrial = 0
         while numTrial < MaxNumTrials:
@@ -42,6 +43,6 @@ proc checkConnectivity*(connPool: PgAsyncPool,
           numTrial.inc()
 
         ## The connection couldn't be resumed. Let's inform the upper layers.
-        onErrAction("postgres health check error: " & error)
+        onFatalErrorAction("postgres health check error: " & error)
 
     await sleepAsync(CheckConnectivityInterval)
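The health check above is a retry-then-escalate loop. A minimal self-contained sketch of the same shape, assuming a hypothetical `probe` in place of the real `HealthCheckQuery` round-trip:

```nim
import chronos

const MaxNumTrials = 20
const TrialInterval = 1.seconds

type
  OnFatalErrorHandler = proc(errMsg: string) {.gcsafe, closure, raises: [].}

proc monitorConnectivity(probe: proc(): Future[bool] {.gcsafe.},
                         onFatalErrorAction: OnFatalErrorHandler) {.async.} =
  var numTrial = 0
  while numTrial < MaxNumTrials:
    if await probe():
      return # connectivity resumed, nothing to escalate
    numTrial.inc()
    await sleepAsync(TrialInterval)
  # The connection couldn't be resumed. Inform the upper layers.
  onFatalErrorAction("health check error: connection not recovered")
```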


@@ -1,4 +1,5 @@
 import
+  ../../common/error_handling,
   ../protocol_types,
   ../protocol_metrics,
   ../constants,
@@ -44,6 +45,7 @@ type
     initialized*: bool
     latestIndex*: MembershipIndex
     validRoots*: Deque[MerkleNode]
+    onFatalErrorAction*: OnFatalErrorHandler
     when defined(rln_v2):
       userMessageLimit*: Option[UserMessageLimit]


@@ -113,6 +113,10 @@ template initializedGuard(g: OnchainGroupManager): untyped =
   if not g.initialized:
     raise newException(CatchableError, "OnchainGroupManager is not initialized")
 
+template retryWrapper(g: OnchainGroupManager, res: auto, errStr: string, body: untyped): auto =
+  retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
+    body
+
 proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
   try:
@@ -234,19 +238,19 @@ when defined(rln_v2):
     let membershipFee = g.membershipFee.get()
 
     var gasPrice: int
-    retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
+    g.retryWrapper(gasPrice, "Failed to get gas price"):
       int(await ethRpc.provider.eth_gasPrice()) * 2
     let idCommitment = identityCredential.idCommitment.toUInt256()
 
     var txHash: TxHash
     let storageIndex = g.usingStorageIndex.get()
     debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex, userMessageLimit = userMessageLimit
-    retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
+    g.retryWrapper(txHash, "Failed to register the member"):
       await registryContract.register(storageIndex, idCommitment, u256(userMessageLimit)).send(gasPrice = gasPrice)
 
     # wait for the transaction to be mined
     var tsReceipt: ReceiptObject
-    retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
+    g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
       await ethRpc.getMinedTransactionReceipt(txHash)
     debug "registration transaction mined", txHash = txHash
     g.registrationTxHash = some(txHash)
@@ -282,19 +286,19 @@ else:
     let membershipFee = g.membershipFee.get()
 
     var gasPrice: int
-    retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
+    g.retryWrapper(gasPrice, "Failed to get gas price"):
       int(await ethRpc.provider.eth_gasPrice()) * 2
     let idCommitment = credentials.idCommitment.toUInt256()
 
     var txHash: TxHash
     let storageIndex = g.usingStorageIndex.get()
     debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex
-    retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
+    g.retryWrapper(txHash, "Failed to register the member"):
       await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice)
 
     # wait for the transaction to be mined
     var tsReceipt: ReceiptObject
-    retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
+    g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
       await ethRpc.getMinedTransactionReceipt(txHash)
     debug "registration transaction mined", txHash = txHash
     g.registrationTxHash = some(txHash)
@@ -393,7 +397,7 @@ proc getRawEvents(g: OnchainGroupManager,
   let rlnContract = g.rlnContract.get()
 
   var events: JsonNode
-  retryWrapper(events, RetryStrategy.new(), "Failed to get the events"):
+  g.retryWrapper(events, "Failed to get the events"):
     await rlnContract.getJsonLogs(MemberRegistered,
                                   fromBlock = some(fromBlock.blockId()),
                                   toBlock = some(toBlock.blockId()))
@@ -486,7 +490,7 @@ proc getAndHandleEvents(g: OnchainGroupManager,
   return true
 
-proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
+proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) =
   g.blockFetchingActive = false
 
   proc runIntervalLoop() {.async, gcsafe.} =
@@ -494,10 +498,13 @@ proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
     while g.blockFetchingActive:
       var retCb: bool
-      retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"):
+      g.retryWrapper(retCb, "Failed to run the interval block fetching loop"):
        await cb()
       await sleepAsync(interval)
 
+  # using asyncSpawn is OK here since
+  # we make use of the error handling provided by
+  # OnFatalErrorHandler
   asyncSpawn runIntervalLoop()
@@ -505,7 +512,7 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc =
   let ethRpc = g.ethRpc.get()
   proc wrappedCb(): Future[bool] {.async, gcsafe.} =
     var latestBlock: BlockNumber
-    retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
+    g.retryWrapper(latestBlock, "Failed to get the latest block number"):
       cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
 
     if latestBlock <= g.latestProcessedBlock:
@@ -514,7 +521,7 @@
     # inc by 1 to prevent double processing
     let fromBlock = g.latestProcessedBlock + 1
     var handleBlockRes: bool
-    retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"):
+    g.retryWrapper(handleBlockRes, "Failed to handle new block"):
       await g.getAndHandleEvents(fromBlock, latestBlock)
     return true
   return wrappedCb
@@ -548,7 +555,7 @@ proc startOnchainSync(g: OnchainGroupManager):
   # chunk events
   while true:
     var currentLatestBlock: BlockNumber
-    retryWrapper(currentLatestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
+    g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
       cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
     if fromBlock >= currentLatestBlock:
       break
@@ -556,7 +563,7 @@
     let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
     debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
     var handleBlockRes: bool
-    retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"):
+    g.retryWrapper(handleBlockRes, "Failed to handle old blocks"):
       await g.getAndHandleEvents(fromBlock, toBlock)
     fromBlock = toBlock + 1
@@ -588,11 +595,11 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
 method init*(g: OnchainGroupManager): Future[void] {.async.} =
   var ethRpc: Web3
   # check if the Ethereum client is reachable
-  retryWrapper(ethRpc, RetryStrategy.new(), "Failed to connect to the Ethereum client"):
+  g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
     await newWeb3(g.ethClientUrl)
 
   # Set the chain id
   var chainId: Quantity
-  retryWrapper(chainId, RetryStrategy.new(), "Failed to get the chain id"):
+  g.retryWrapper(chainId, "Failed to get the chain id"):
     await ethRpc.provider.eth_chainId()
   g.chainId = some(chainId)
@@ -609,12 +616,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
   # get the current storage index
   var usingStorageIndex: Uint16
-  retryWrapper(usingStorageIndex, RetryStrategy.new(), "Failed to get the storage index"):
+  g.retryWrapper(usingStorageIndex, "Failed to get the storage index"):
     await registryContract.usingStorageIndex().call()
 
   g.usingStorageIndex = some(usingStorageIndex)
   var rlnContractAddress: Address
-  retryWrapper(rlnContractAddress, RetryStrategy.new(), "Failed to get the rln contract address"):
+  g.retryWrapper(rlnContractAddress, "Failed to get the rln contract address"):
     await registryContract.storages(usingStorageIndex).call()
   let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress)
@@ -670,12 +677,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
   # check if the contract exists by calling a static function
   var membershipFee: Uint256
-  retryWrapper(membershipFee, RetryStrategy.new(), "Failed to get the membership deposit"):
+  g.retryWrapper(membershipFee, "Failed to get the membership deposit"):
     await rlnContract.MEMBERSHIP_DEPOSIT().call()
   g.membershipFee = some(membershipFee)
 
   var deployedBlockNumber: Uint256
-  retryWrapper(deployedBlockNumber, RetryStrategy.new(), "Failed to get the deployed block number"):
+  g.retryWrapper(deployedBlockNumber, "Failed to get the deployed block number"):
     await rlnContract.deployedBlockNumber().call()
   debug "using rln storage", deployedBlockNumber, rlnContractAddress
   g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
@@ -686,15 +693,16 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
     let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
     info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
     var newEthRpc: Web3
-    retryWrapper(newEthRpc, RetryStrategy.new(), "Failed to reconnect with the Ethereum client"):
+    g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
      await newWeb3(g.ethClientUrl)
     newEthRpc.ondisconnect = ethRpc.ondisconnect
     g.ethRpc = some(newEthRpc)
 
     try:
-      asyncSpawn g.startOnchainSync()
-    except CatchableError:
-      error "failed to restart group sync", error = getCurrentExceptionMsg()
+      await g.startOnchainSync()
+    except CatchableError, Exception:
+      g.onFatalErrorAction("failed to restart group sync" & ": " & getCurrentExceptionMsg())
 
   ethRpc.ondisconnect = proc() =
     asyncSpawn onDisconnect()
@@ -719,7 +727,7 @@ proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
   let ethRpc = g.ethRpc.get()
 
   var syncing: JsonNode
-  retryWrapper(syncing, RetryStrategy.new(), "Failed to get the syncing status"):
+  g.retryWrapper(syncing, "Failed to get the syncing status"):
     await ethRpc.provider.eth_syncing()
   return syncing.getBool()
@@ -731,7 +739,7 @@ method isReady*(g: OnchainGroupManager):
     return false
 
   var currentBlock: BlockNumber
-  retryWrapper(currentBlock, RetryStrategy.new(), "Failed to get the current block number"):
+  g.retryWrapper(currentBlock, "Failed to get the current block number"):
     cast[BlockNumber](await g.ethRpc
       .get()
      .provider
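For reference, the call-site rewrite above is mechanical: the new object-bound `retryWrapper` template only forwards to the existing wrapper, filling in a default strategy and the manager's callback. A call such as `g.retryWrapper(gasPrice, "Failed to get gas price")` with a body expands to:

```nim
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price",
             g.onFatalErrorAction):
  body
```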


@@ -1,6 +1,7 @@
+import
+  ../../../common/error_handling
 import
   chronos
 
 type RetryStrategy* = object
   shouldRetry*: bool
@@ -18,6 +19,7 @@ proc new*(T: type RetryStrategy): RetryStrategy =
 template retryWrapper*(res: auto,
                        retryStrategy: RetryStrategy,
                        errStr: string,
+                       errCallback: OnFatalErrorHandler = nil,
                        body: untyped): auto =
   var retryCount = retryStrategy.retryCount
   var shouldRetry = retryStrategy.shouldRetry
@@ -32,4 +34,8 @@ template retryWrapper*(res: auto,
       exceptionMessage = getCurrentExceptionMsg()
       await sleepAsync(retryStrategy.retryDelay)
   if shouldRetry:
-    raise newException(CatchableError, errStr & ": " & exceptionMessage)
+    if errCallback == nil:
+      raise newException(CatchableError, errStr & " errCallback == nil: " & exceptionMessage)
+    else:
+      errCallback(errStr & ": " & exceptionMessage)
+      return
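Semantics of the new `errCallback` parameter, as a hedged sketch (`fetchLatestBlock` is a made-up flaky call, not this module's API): a nil callback keeps the old raise-on-exhaustion behaviour, while a non-nil callback is invoked once and the enclosing proc returns, which is what lets `asyncSpawn`ed loops fail loudly via `onFatalErrorAction` instead of dying silently.

```nim
# assumes this module's retryWrapper, RetryStrategy and OnFatalErrorHandler
# are in scope
import chronos

proc fetchLatestBlock(): Future[int] {.async.} =
  # hypothetical flaky RPC call, used only for illustration
  raise newException(ValueError, "connection refused")

proc demo(onFatal: OnFatalErrorHandler) {.async.} =
  var blockNum: int
  # onFatal == nil: raises CatchableError once retries are exhausted.
  # onFatal != nil: reports through the callback, then returns from demo.
  retryWrapper(blockNum, RetryStrategy.new(), "Failed to get block", onFatal):
    await fetchLatestBlock()
```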


@@ -25,6 +25,7 @@ when defined(rln_v2):
   import ./nonce_manager
 
 import
+  ../common/error_handling,
   ../waku_relay, # for WakuRelayHandler
   ../waku_core,
   ../waku_keystore,
@@ -33,16 +34,18 @@ import
 logScope:
   topics = "waku rln_relay"
 
-type WakuRlnConfig* = object
-  rlnRelayDynamic*: bool
-  rlnRelayCredIndex*: Option[uint]
-  rlnRelayEthContractAddress*: string
-  rlnRelayEthClientAddress*: string
-  rlnRelayCredPath*: string
-  rlnRelayCredPassword*: string
-  rlnRelayTreePath*: string
-  when defined(rln_v2):
-    rlnRelayUserMessageLimit*: uint64
+type
+  WakuRlnConfig* = object
+    rlnRelayDynamic*: bool
+    rlnRelayCredIndex*: Option[uint]
+    rlnRelayEthContractAddress*: string
+    rlnRelayEthClientAddress*: string
+    rlnRelayCredPath*: string
+    rlnRelayCredPassword*: string
+    rlnRelayTreePath*: string
+    onFatalErrorAction*: OnFatalErrorHandler
+    when defined(rln_v2):
+      rlnRelayUserMessageLimit*: uint64
 
 proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[(
   seq[RawMembershipCredentials], string
@@ -84,10 +87,11 @@ type WakuRLNRelay* = ref object of RootObj
   nullifierLog*: OrderedTable[Epoch, seq[ProofMetadata]]
   lastEpoch*: Epoch # the epoch of the last published rln message
   groupManager*: GroupManager
+  onFatalErrorAction*: OnFatalErrorHandler
   when defined(rln_v2):
     nonceManager: NonceManager
 
-method stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
+proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
   ## stops the rln-relay protocol
   ## Throws an error if it cannot stop the rln-relay protocol
@@ -370,6 +374,7 @@ proc mount(conf: WakuRlnConfig,
            ): Future[WakuRlnRelay] {.async: (raises: [Exception]).} =
   var
     groupManager: GroupManager
+    wakuRlnRelay: WakuRLNRelay
   # create an RLN instance
   let rlnInstanceRes = createRLNInstance(tree_path = conf.rlnRelayTreePath)
   if rlnInstanceRes.isErr():
@@ -383,7 +388,8 @@ proc mount(conf: WakuRlnConfig,
     groupManager = StaticGroupManager(groupSize: StaticGroupSize,
                                       groupKeys: parsedGroupKeysRes.get(),
                                       membershipIndex: conf.rlnRelayCredIndex,
-                                      rlnInstance: rlnInstance)
+                                      rlnInstance: rlnInstance,
+                                      onFatalErrorAction: conf.onFatalErrorAction)
     # we don't persist credentials in static mode since they exist in ./constants.nim
   else:
     # dynamic setup
@@ -398,7 +404,9 @@ proc mount(conf: WakuRlnConfig,
                                        registrationHandler: registrationHandler,
                                        keystorePath: rlnRelayCredPath,
                                        keystorePassword: rlnRelayCredPassword,
-                                       membershipIndex: conf.rlnRelayCredIndex)
+                                       membershipIndex: conf.rlnRelayCredIndex,
+                                       onFatalErrorAction: conf.onFatalErrorAction)
   # Initialize the groupManager
   await groupManager.init()
   # Start the group sync
@@ -406,9 +414,12 @@
   when defined(rln_v2):
     return WakuRLNRelay(groupManager: groupManager,
-                        nonceManager: NonceManager.init(conf.rlnRelayUserMessageLimit))
+                        nonceManager: NonceManager.init(conf.rlnRelayUserMessageLimit),
+                        onFatalErrorAction: conf.onFatalErrorAction)
   else:
-    return WakuRLNRelay(groupManager: groupManager)
+    return WakuRLNRelay(groupManager: groupManager,
+                        onFatalErrorAction: conf.onFatalErrorAction)
 
 proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
   ## returns true if the rln-relay protocol is ready to relay messages
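Putting it together, a hedged construction sketch (placeholder values, not a working configuration): the app builds one callback, stores it on `WakuRlnConfig`, and `mount` threads it into the group manager and the returned `WakuRLNRelay`, so a fatal error in block sync or registration surfaces as one logged line and a non-zero exit.

```nim
import chronicles

var onFatal = proc(msg: string) {.gcsafe, closure, raises: [].} =
  error "Unrecoverable error occurred", error = msg
  quit(QuitFailure)

let rlnConf = WakuRlnConfig(
  rlnRelayDynamic: true,
  rlnRelayEthClientAddress: "ws://localhost:8540",  # placeholder
  rlnRelayEthContractAddress: "0x0000000000000000000000000000000000000000",  # placeholder
  rlnRelayTreePath: "rln_tree",  # placeholder
  onFatalErrorAction: onFatal)
```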