fix(rln-relay): graceful shutdown with non-zero exit code (#2429)

* fix(rln-relay): graceful shutdown with non-zero exit code

* fix: missed args

* fix: exception str

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* fix: remove old comment

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Aaryamann Challani 2024-02-15 16:55:08 +05:30 committed by GitHub
parent 1151d50f55
commit 22026b7e89
12 changed files with 99 additions and 66 deletions
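In short, the change defines one fatal-error callback during node setup and threads it into the RLN relay and the message archive, so an unrecoverable internal error (a lost database connection, a failed Ethereum client reconnect) logs the cause and exits with a non-zero code instead of leaving the node running in a broken state. A minimal, self-contained sketch of the pattern — the component proc is hypothetical; only the handler body and the OnFatalErrorHandler signature mirror the diff below:

```nim
import chronicles

type OnFatalErrorHandler = proc(errMsg: string) {.gcsafe, closure, raises: [].}

# Hypothetical stand-in for the RLN relay or the archive driver.
proc runComponent(name: string, onFatalError: OnFatalErrorHandler) =
  # A component calls the handler when it hits an error it cannot recover from.
  onFatalError(name & ": connection lost and could not be re-established")

when isMainModule:
  # Defined once in setupProtocols and shared by every long-running component.
  var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
    error "Unrecoverable error occurred", error = msg
    quit(QuitFailure) # non-zero exit code, visible to systemd/docker/k8s restart policies

  runComponent("archive", onFatalErrorAction)
```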


@ -460,8 +460,13 @@ proc setupProtocols(node: WakuNode,
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
if conf.rlnRelay:
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)
if conf.rlnRelay:
when defined(rln_v2):
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
@ -472,6 +477,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
onFatalErrorAction: onFatalErrorAction,
)
else:
let rlnConf = WakuRlnConfig(
@ -482,6 +488,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
onFatalErrorAction: onFatalErrorAction,
)
try:
@ -490,18 +497,12 @@ proc setupProtocols(node: WakuNode,
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
if conf.store:
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)
# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections,
onErrAction)
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)


@ -62,7 +62,7 @@ proc configureStore(node: WakuNode,
Future[Result[void, string]] {.async.} =
## This snippet is extracted/duplicated from the app.nim file
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
# error "Unrecoverable error occurred", error = msg
@ -74,7 +74,7 @@ proc configureStore(node: WakuNode,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onErrAction)
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)


@ -0,0 +1,2 @@
type
OnFatalErrorHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}
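This new common module is the single definition every component now imports. Because the signature is {.gcsafe, closure, raises: [].}, the callback can be stored in config or driver objects and invoked from async code without widening any raises list. A small, hypothetical sketch of a component holding and guarding the handler (the driver type and proc are illustrative, not from the diff; the type is redeclared so the snippet is self-contained):

```nim
type
  OnFatalErrorHandler = proc(errMsg: string) {.gcsafe, closure, raises: [].}

  SomeDriver = ref object
    onFatalErrorAction: OnFatalErrorHandler  # set by whoever constructs the driver

proc reportFatal(d: SomeDriver, msg: string) {.raises: [CatchableError].} =
  # Fall back to raising when no handler was supplied, mirroring how the
  # updated retry_wrapper treats a nil errCallback further down this diff.
  if d.onFatalErrorAction.isNil():
    raise newException(CatchableError, msg)
  d.onFatalErrorAction(msg)
```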


@ -979,7 +979,7 @@ proc mountRlnRelay*(node: WakuNode,
raise newException(CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay")
let rlnRelayRes = waitFor WakuRlnRelay.new(rlnConf,
registrationHandler)
registrationHandler)
if rlnRelayRes.isErr():
raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
let rlnRelay = rlnRelayRes.get()


@ -9,6 +9,7 @@ import
chronos
import
../waku_core,
../common/error_handling,
./common
const DefaultPageSize*: uint = 25
@ -16,7 +17,6 @@ const DefaultPageSize*: uint = 25
type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)


@ -12,6 +12,7 @@ import
../driver,
../../common/databases/dburl,
../../common/databases/db_sqlite,
../../common/error_handling,
./sqlite_driver,
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
./queue_driver
@ -29,13 +30,13 @@ proc new*(T: type ArchiveDriver,
vacuum: bool,
migrate: bool,
maxNumConn: int,
onErrAction: OnErrHandler):
onFatalErrorAction: OnFatalErrorHandler):
Result[T, string] =
## url - string that defines the database
## vacuum - if true, a cleanup operation will be applied to the database
## migrate - if true, the database schema will be updated
## maxNumConn - defines the maximum number of connections to handle simultaneously (Postgres)
## onErrAction - called if, e.g., the connection with db got lost
## onFatalErrorAction - called if, e.g., the connection with db got lost
let dbUrlValidationRes = dburl.validateDbUrl(url)
if dbUrlValidationRes.isErr():
@ -85,7 +86,7 @@ proc new*(T: type ArchiveDriver,
when defined(postgres):
let res = PostgresDriver.new(dbUrl = url,
maxConnections = maxNumConn,
onErrAction = onErrAction)
onFatalErrorAction = onFatalErrorAction)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)


@ -11,6 +11,7 @@ import
chronos,
chronicles
import
../../../common/error_handling,
../../../waku_core,
../../common,
../../driver,
@ -89,7 +90,7 @@ const DefaultMaxNumConns = 50
proc new*(T: type PostgresDriver,
dbUrl: string,
maxConnections = DefaultMaxNumConns,
onErrAction: OnErrHandler = nil):
onFatalErrorAction: OnFatalErrorHandler = nil):
ArchiveDriverResult[T] =
## Very simplistic split of max connections
@ -101,11 +102,11 @@ proc new*(T: type PostgresDriver,
let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
return err("error creating write conn pool PgAsyncPool")
if not isNil(onErrAction):
asyncSpawn checkConnectivity(readConnPool, onErrAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)
if not isNil(onErrAction):
asyncSpawn checkConnectivity(writeConnPool, onErrAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)
return ok(PostgresDriver(writeConnPool: writeConnPool,
readConnPool: readConnPool))


@ -8,7 +8,8 @@ import
stew/results
import
../../driver,
../../../common/databases/db_postgres
../../../common/databases/db_postgres,
../../../common/error_handling
## Simple query to validate that the postgres is working and attending requests
const HealthCheckQuery = "SELECT version();"
@ -17,7 +18,7 @@ const MaxNumTrials = 20
const TrialInterval = 1.seconds
proc checkConnectivity*(connPool: PgAsyncPool,
onErrAction: OnErrHandler) {.async.} =
onFatalErrorAction: OnFatalErrorHandler) {.async.} =
while true:
@ -29,7 +30,7 @@ proc checkConnectivity*(connPool: PgAsyncPool,
block errorBlock:
## Force close all the opened connections. No need to close gracefully.
(await connPool.resetConnPool()).isOkOr:
onErrAction("checkConnectivity resetConnPool error: " & error)
onFatalErrorAction("checkConnectivity resetConnPool error: " & error)
var numTrial = 0
while numTrial < MaxNumTrials:
@ -42,6 +43,6 @@ proc checkConnectivity*(connPool: PgAsyncPool,
numTrial.inc()
## The connection couldn't be resumed. Let's inform the upper layers.
onErrAction("postgres health check error: " & error)
onFatalErrorAction("postgres health check error: " & error)
await sleepAsync(CheckConnectivityInterval)
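Read together, the health-check hunks implement a watchdog: run a cheap query (SELECT version();) on an interval, reset the pools when it fails, retry a bounded number of times, and only then hand the error to the fatal handler. A condensed sketch of that shape under simplified assumptions — ping stands in for the real query-and-reset logic, and the constants mirror but do not reuse the driver's:

```nim
import chronos

proc watchConnectivity*(ping: proc(): Future[bool] {.gcsafe.},
                        onFatalErrorAction: proc(msg: string) {.gcsafe, raises: [].},
                        checkInterval = 30.seconds,
                        maxNumTrials = 20) {.async.} =
  while true:
    if not await ping():
      # Connection looks broken: retry a bounded number of times before giving up.
      var recovered = false
      for _ in 0 ..< maxNumTrials:
        await sleepAsync(1.seconds)
        if await ping():
          recovered = true
          break
      if not recovered:
        # The connection couldn't be resumed. Inform the upper layers and stop.
        onFatalErrorAction("postgres health check error: connectivity not recovered")
        return
    await sleepAsync(checkInterval)
```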


@ -1,4 +1,5 @@
import
../../common/error_handling,
../protocol_types,
../protocol_metrics,
../constants,
@ -44,6 +45,7 @@ type
initialized*: bool
latestIndex*: MembershipIndex
validRoots*: Deque[MerkleNode]
onFatalErrorAction*: OnFatalErrorHandler
when defined(rln_v2):
userMessageLimit*: Option[UserMessageLimit]


@ -113,6 +113,10 @@ template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(CatchableError, "OnchainGroupManager is not initialized")
template retryWrapper(g: OnchainGroupManager, res: auto, errStr: string, body: untyped): auto =
retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction):
body
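The new two-argument template is just sugar: it forwards the group manager's stored handler into the existing retryWrapper, which is why every call site below drops the explicit RetryStrategy.new(). Roughly, the two forms are equivalent:

```nim
# Shorthand used throughout this file after the change:
g.retryWrapper(gasPrice, "Failed to get gas price"):
  int(await ethRpc.provider.eth_gasPrice()) * 2

# ...which expands to the explicit form, forwarding the stored fatal-error handler:
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price", g.onFatalErrorAction):
  int(await ethRpc.provider.eth_gasPrice()) * 2
```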
proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
try:
@ -234,19 +238,19 @@ when defined(rln_v2):
let membershipFee = g.membershipFee.get()
var gasPrice: int
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
g.retryWrapper(gasPrice, "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = identityCredential.idCommitment.toUInt256()
var txHash: TxHash
let storageIndex = g.usingStorageIndex.get()
debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex, userMessageLimit = userMessageLimit
retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
g.retryWrapper(txHash, "Failed to register the member"):
await registryContract.register(storageIndex, idCommitment, u256(userMessageLimit)).send(gasPrice = gasPrice)
# wait for the transaction to be mined
var tsReceipt: ReceiptObject
retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
await ethRpc.getMinedTransactionReceipt(txHash)
debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash)
@ -282,19 +286,19 @@ else:
let membershipFee = g.membershipFee.get()
var gasPrice: int
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
g.retryWrapper(gasPrice, "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = credentials.idCommitment.toUInt256()
var txHash: TxHash
let storageIndex = g.usingStorageIndex.get()
debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex
retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
g.retryWrapper(txHash, "Failed to register the member"):
await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice)
# wait for the transaction to be mined
var tsReceipt: ReceiptObject
retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
await ethRpc.getMinedTransactionReceipt(txHash)
debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash)
@ -393,7 +397,7 @@ proc getRawEvents(g: OnchainGroupManager,
let rlnContract = g.rlnContract.get()
var events: JsonNode
retryWrapper(events, RetryStrategy.new(), "Failed to get the events"):
g.retryWrapper(events, "Failed to get the events"):
await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()),
toBlock = some(toBlock.blockId()))
@ -486,7 +490,7 @@ proc getAndHandleEvents(g: OnchainGroupManager,
return true
proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) =
g.blockFetchingActive = false
proc runIntervalLoop() {.async, gcsafe.} =
@ -494,10 +498,13 @@ proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
while g.blockFetchingActive:
var retCb: bool
retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"):
g.retryWrapper(retCb, "Failed to run the interval block fetching loop"):
await cb()
await sleepAsync(interval)
# using asyncSpawn is OK here since
# we make use of the error handling provided by
# OnFatalErrorHandler
asyncSpawn runIntervalLoop()
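The comment above is the crux of why asyncSpawn is acceptable here: the spawned future is never awaited, so errors must be resolved inside the loop itself — which the g.retryWrapper call does by routing them to onFatalErrorAction. A stripped-down, hypothetical illustration of that shape:

```nim
import chronos

proc startLoop(onFatalErrorAction: proc(msg: string) {.gcsafe, raises: [].}) =
  proc loop() {.async.} =
    while true:
      try:
        # stand-in for the real work, e.g. fetching and handling new blocks
        await sleepAsync(1.seconds)
      except CatchableError:
        # nothing above asyncSpawn observes this failure, so report it here;
        # in this PR the handler logs and quits, terminating the node cleanly
        onFatalErrorAction("interval loop failed: " & getCurrentExceptionMsg())
        return
  # safe to detach: errors end in the fatal handler instead of being lost
  asyncSpawn loop()
```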
@ -505,7 +512,7 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc =
let ethRpc = g.ethRpc.get()
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
var latestBlock: BlockNumber
retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
g.retryWrapper(latestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if latestBlock <= g.latestProcessedBlock:
@ -514,7 +521,7 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc =
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"):
g.retryWrapper(handleBlockRes, "Failed to handle new block"):
await g.getAndHandleEvents(fromBlock, latestBlock)
return true
return wrappedCb
@ -548,7 +555,7 @@ proc startOnchainSync(g: OnchainGroupManager):
# chunk events
while true:
var currentLatestBlock: BlockNumber
retryWrapper(currentLatestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if fromBlock >= currentLatestBlock:
break
@ -556,7 +563,7 @@ proc startOnchainSync(g: OnchainGroupManager):
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"):
g.retryWrapper(handleBlockRes, "Failed to handle old blocks"):
await g.getAndHandleEvents(fromBlock, toBlock)
fromBlock = toBlock + 1
@ -588,11 +595,11 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
method init*(g: OnchainGroupManager): Future[void] {.async.} =
var ethRpc: Web3
# check if the Ethereum client is reachable
retryWrapper(ethRpc, RetryStrategy.new(), "Failed to connect to the Ethereum client"):
g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
await newWeb3(g.ethClientUrl)
# Set the chain id
var chainId: Quantity
retryWrapper(chainId, RetryStrategy.new(), "Failed to get the chain id"):
g.retryWrapper(chainId, "Failed to get the chain id"):
await ethRpc.provider.eth_chainId()
g.chainId = some(chainId)
@ -609,12 +616,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
# get the current storage index
var usingStorageIndex: Uint16
retryWrapper(usingStorageIndex, RetryStrategy.new(), "Failed to get the storage index"):
g.retryWrapper(usingStorageIndex, "Failed to get the storage index"):
await registryContract.usingStorageIndex().call()
g.usingStorageIndex = some(usingStorageIndex)
var rlnContractAddress: Address
retryWrapper(rlnContractAddress, RetryStrategy.new(), "Failed to get the rln contract address"):
g.retryWrapper(rlnContractAddress, "Failed to get the rln contract address"):
await registryContract.storages(usingStorageIndex).call()
let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress)
@ -670,12 +677,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
# check if the contract exists by calling a static function
var membershipFee: Uint256
retryWrapper(membershipFee, RetryStrategy.new(), "Failed to get the membership deposit"):
g.retryWrapper(membershipFee, "Failed to get the membership deposit"):
await rlnContract.MEMBERSHIP_DEPOSIT().call()
g.membershipFee = some(membershipFee)
var deployedBlockNumber: Uint256
retryWrapper(deployedBlockNumber, RetryStrategy.new(), "Failed to get the deployed block number"):
g.retryWrapper(deployedBlockNumber, "Failed to get the deployed block number"):
await rlnContract.deployedBlockNumber().call()
debug "using rln storage", deployedBlockNumber, rlnContractAddress
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
@ -686,15 +693,16 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
var newEthRpc: Web3
retryWrapper(newEthRpc, RetryStrategy.new(), "Failed to reconnect with the Ethereum client"):
g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
await newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc)
try:
asyncSpawn g.startOnchainSync()
except CatchableError:
error "failed to restart group sync", error = getCurrentExceptionMsg()
await g.startOnchainSync()
except CatchableError, Exception:
g.onFatalErrorAction("failed to restart group sync" & ": " & getCurrentExceptionMsg())
ethRpc.ondisconnect = proc() =
asyncSpawn onDisconnect()
@ -719,7 +727,7 @@ proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
let ethRpc = g.ethRpc.get()
var syncing: JsonNode
retryWrapper(syncing, RetryStrategy.new(), "Failed to get the syncing status"):
g.retryWrapper(syncing, "Failed to get the syncing status"):
await ethRpc.provider.eth_syncing()
return syncing.getBool()
@ -731,7 +739,7 @@ method isReady*(g: OnchainGroupManager):
return false
var currentBlock: BlockNumber
retryWrapper(currentBlock, RetryStrategy.new(), "Failed to get the current block number"):
g.retryWrapper(currentBlock, "Failed to get the current block number"):
cast[BlockNumber](await g.ethRpc
.get()
.provider


@ -1,6 +1,7 @@
import
../../../common/error_handling
import
chronos
type RetryStrategy* = object
shouldRetry*: bool
@ -18,6 +19,7 @@ proc new*(T: type RetryStrategy): RetryStrategy =
template retryWrapper*(res: auto,
retryStrategy: RetryStrategy,
errStr: string,
errCallback: OnFatalErrorHandler = nil,
body: untyped): auto =
var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry
@ -32,4 +34,8 @@ template retryWrapper*(res: auto,
exceptionMessage = getCurrentExceptionMsg()
await sleepAsync(retryStrategy.retryDelay)
if shouldRetry:
raise newException(CatchableError, errStr & ": " & exceptionMessage)
if errCallback == nil:
raise newException(CatchableError, errStr & " errCallback == nil: " & exceptionMessage)
else:
errCallback(errStr & ": " & exceptionMessage)
return
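Once retries are exhausted, the template now branches on errCallback: nil keeps the old behaviour and raises a CatchableError, while a non-nil callback is invoked with the error string and the surrounding proc returns. A caller-side sketch of what each mode means — fetchBlockNumber is a hypothetical stand-in, and the call must sit inside an async proc because the template awaits between retries:

```nim
var blockNumber: int

# errCallback = nil              -> after the configured retries this raises
#                                   CatchableError(errStr & " errCallback == nil: " & msg)
# errCallback = onFatalErrorAction -> the handler receives errStr & ": " & msg and the
#                                   caller returns; in this PR the handler logs and
#                                   calls quit(QuitFailure)
retryWrapper(blockNumber, RetryStrategy.new(), "Failed to get block number",
             onFatalErrorAction):
  await fetchBlockNumber()
```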


@ -25,6 +25,7 @@ when defined(rln_v2):
import ./nonce_manager
import
../common/error_handling,
../waku_relay, # for WakuRelayHandler
../waku_core,
../waku_keystore,
@ -33,16 +34,18 @@ import
logScope:
topics = "waku rln_relay"
type WakuRlnConfig* = object
rlnRelayDynamic*: bool
rlnRelayCredIndex*: Option[uint]
rlnRelayEthContractAddress*: string
rlnRelayEthClientAddress*: string
rlnRelayCredPath*: string
rlnRelayCredPassword*: string
rlnRelayTreePath*: string
when defined(rln_v2):
rlnRelayUserMessageLimit*: uint64
type
WakuRlnConfig* = object
rlnRelayDynamic*: bool
rlnRelayCredIndex*: Option[uint]
rlnRelayEthContractAddress*: string
rlnRelayEthClientAddress*: string
rlnRelayCredPath*: string
rlnRelayCredPassword*: string
rlnRelayTreePath*: string
onFatalErrorAction*: OnFatalErrorHandler
when defined(rln_v2):
rlnRelayUserMessageLimit*: uint64
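Every caller building the config now has to supply the handler; the setupProtocols hunk at the top of this diff is the canonical example. A construction sketch with placeholder values (the rln_v2-only message-limit field omitted):

```nim
import std/options

let rlnConf = WakuRlnConfig(
  rlnRelayDynamic: true,
  rlnRelayCredIndex: some(0'u),
  rlnRelayEthContractAddress: "0x0000000000000000000000000000000000000000",  # placeholder
  rlnRelayEthClientAddress: "ws://localhost:8540",                           # placeholder
  rlnRelayCredPath: "rlnKeystore.json",
  rlnRelayCredPassword: "password",                                          # placeholder
  rlnRelayTreePath: "rln_tree",
  onFatalErrorAction: onFatalErrorAction)
```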
proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[(
seq[RawMembershipCredentials], string
@ -84,10 +87,11 @@ type WakuRLNRelay* = ref object of RootObj
nullifierLog*: OrderedTable[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
groupManager*: GroupManager
onFatalErrorAction*: OnFatalErrorHandler
when defined(rln_v2):
nonceManager: NonceManager
method stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
## stops the rln-relay protocol
## Throws an error if it cannot stop the rln-relay protocol
@ -370,6 +374,7 @@ proc mount(conf: WakuRlnConfig,
): Future[WakuRlnRelay] {.async: (raises: [Exception]).} =
var
groupManager: GroupManager
wakuRlnRelay: WakuRLNRelay
# create an RLN instance
let rlnInstanceRes = createRLNInstance(tree_path = conf.rlnRelayTreePath)
if rlnInstanceRes.isErr():
@ -383,7 +388,8 @@ proc mount(conf: WakuRlnConfig,
groupManager = StaticGroupManager(groupSize: StaticGroupSize,
groupKeys: parsedGroupKeysRes.get(),
membershipIndex: conf.rlnRelayCredIndex,
rlnInstance: rlnInstance)
rlnInstance: rlnInstance,
onFatalErrorAction: conf.onFatalErrorAction)
# we don't persist credentials in static mode since they exist in ./constants.nim
else:
# dynamic setup
@ -398,7 +404,9 @@ proc mount(conf: WakuRlnConfig,
registrationHandler: registrationHandler,
keystorePath: rlnRelayCredPath,
keystorePassword: rlnRelayCredPassword,
membershipIndex: conf.rlnRelayCredIndex)
membershipIndex: conf.rlnRelayCredIndex,
onFatalErrorAction: conf.onFatalErrorAction)
# Initialize the groupManager
await groupManager.init()
# Start the group sync
@ -406,9 +414,12 @@ proc mount(conf: WakuRlnConfig,
when defined(rln_v2):
return WakuRLNRelay(groupManager: groupManager,
nonceManager: NonceManager.init(conf.rlnRelayUserMessageLimit))
nonceManager: NonceManager.init(conf.rlnRelayUserMessageLimit),
onFatalErrorAction: conf.onFatalErrorAction)
else:
return WakuRLNRelay(groupManager: groupManager)
return WakuRLNRelay(groupManager: groupManager,
onFatalErrorAction: conf.onFatalErrorAction)
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
## returns true if the rln-relay protocol is ready to relay messages