mirror of https://github.com/waku-org/nwaku.git
chore(rln-relay): add isReady check (#1989)
* chore(rln-relay): add isReady check * fix(rln-relay): multiple parameters for checking if node is in sync * fix: set latesthead in newHeadCallback * fix: explicit rpc call * fix: unhandled exception
This commit is contained in:
parent
483f40c8f7
commit
5638bd06bb
|
@ -555,6 +555,31 @@ suite "Onchain group manager":
|
|||
manager.validRootBuffer.len() == 0
|
||||
manager.validRoots[credentialCount - 2] == expectedLastRoot
|
||||
|
||||
asyncTest "isReady should return false if ethRpc is none":
|
||||
var manager = await setup()
|
||||
await manager.init()
|
||||
|
||||
manager.ethRpc = none(Web3)
|
||||
|
||||
check:
|
||||
(await manager.isReady()) == false
|
||||
|
||||
asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
|
||||
var manager = await setup()
|
||||
await manager.init()
|
||||
|
||||
check:
|
||||
(await manager.isReady()) == false
|
||||
|
||||
asyncTest "isReady should return true if ethRpc is ready":
|
||||
var manager = await setup()
|
||||
await manager.init()
|
||||
# node can only be ready after group sync is done
|
||||
await manager.startGroupSync()
|
||||
|
||||
check:
|
||||
(await manager.isReady()) == true
|
||||
|
||||
|
||||
################################
|
||||
## Terminating/removing Ganache
|
||||
|
|
|
@ -732,8 +732,8 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message
|
|||
when defined(rln):
|
||||
proc mountRlnRelay*(node: WakuNode,
|
||||
rlnConf: WakuRlnConfig,
|
||||
spamHandler: Option[SpamHandler] = none(SpamHandler),
|
||||
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)) {.async.} =
|
||||
spamHandler = none(SpamHandler),
|
||||
registrationHandler = none(RegistrationHandler)) {.async.} =
|
||||
info "mounting rln relay"
|
||||
|
||||
if node.wakuRelay.isNil():
|
||||
|
@ -903,3 +903,11 @@ proc stop*(node: WakuNode) {.async.} =
|
|||
await node.wakuRlnRelay.stop()
|
||||
|
||||
node.started = false
|
||||
|
||||
proc isReady*(node: WakuNode): Future[bool] {.async.} =
|
||||
when defined(rln):
|
||||
if node.wakuRlnRelay == nil:
|
||||
return false
|
||||
return await node.wakuRlnRelay.isReady()
|
||||
## TODO: add other protocol `isReady` checks
|
||||
return true
|
||||
|
|
|
@ -162,3 +162,6 @@ method generateProof*(g: GroupManager,
|
|||
if proofGenRes.isErr():
|
||||
return err("proof generation failed: " & $proofGenRes.error())
|
||||
return ok(proofGenRes.value())
|
||||
|
||||
method isReady*(g: GroupManager): Future[bool] {.base,gcsafe.} =
|
||||
raise newException(CatchableError, "isReady proc for " & $g.type & " is not implemented yet")
|
||||
|
|
|
@ -313,13 +313,21 @@ proc getAndHandleEvents(g: OnchainGroupManager,
|
|||
fromBlock: BlockNumber,
|
||||
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
|
||||
initializedGuard(g)
|
||||
proc getLatestBlockNumber(): BlockNumber =
|
||||
if toBlock.isSome():
|
||||
# if toBlock = 0, that implies the latest block
|
||||
# which is the case when we are syncing block-by-block
|
||||
# therefore, toBlock = fromBlock + 1
|
||||
# if toBlock != 0, then we are chunking blocks
|
||||
# therefore, toBlock = fromBlock + blockChunkSize (which is handled)
|
||||
return max(fromBlock + 1, toBlock.get())
|
||||
return fromBlock
|
||||
|
||||
let blockTable = await g.getBlockTable(fromBlock, toBlock)
|
||||
let blockTable = await g.getBlockTable(fromBlock, toBlock)
|
||||
await g.handleEvents(blockTable)
|
||||
await g.handleRemovedEvents(blockTable)
|
||||
|
||||
g.latestProcessedBlock = if toBlock.isSome(): toBlock.get()
|
||||
else: fromBlock
|
||||
g.latestProcessedBlock = getLatestBlockNumber()
|
||||
let metadataSetRes = g.setMetadata()
|
||||
if metadataSetRes.isErr():
|
||||
# this is not a fatal error, hence we don't raise an exception
|
||||
|
@ -473,7 +481,6 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||
let metadataGetRes = g.rlnInstance.getMetadata()
|
||||
if metadataGetRes.isErr():
|
||||
warn "could not initialize with persisted rln metadata"
|
||||
g.latestProcessedBlock = BlockNumber(0)
|
||||
else:
|
||||
let metadata = metadataGetRes.get()
|
||||
if metadata.chainId != uint64(g.chainId.get()):
|
||||
|
@ -500,6 +507,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||
raise newException(ValueError,
|
||||
"could not get the deployed block number: " & getCurrentExceptionMsg())
|
||||
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
|
||||
g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
|
||||
|
||||
ethRpc.ondisconnect = proc() =
|
||||
error "Ethereum client disconnected"
|
||||
|
@ -528,3 +536,34 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||
error "failed to flush to the tree db"
|
||||
|
||||
g.initialized = false
|
||||
|
||||
proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
|
||||
let ethRpc = g.ethRpc.get()
|
||||
|
||||
try:
|
||||
let syncing = await ethRpc.provider.eth_syncing()
|
||||
return syncing.getBool()
|
||||
except CatchableError:
|
||||
error "failed to get the syncing status", error = getCurrentExceptionMsg()
|
||||
return false
|
||||
|
||||
method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
|
||||
initializedGuard(g)
|
||||
|
||||
if g.ethRpc.isNone():
|
||||
return false
|
||||
|
||||
var currentBlock: BlockNumber
|
||||
try:
|
||||
currentBlock = cast[BlockNumber](await g.ethRpc
|
||||
.get()
|
||||
.provider
|
||||
.eth_blockNumber())
|
||||
except CatchableError:
|
||||
error "failed to get the current block number", error = getCurrentExceptionMsg()
|
||||
return false
|
||||
|
||||
if g.latestProcessedBlock < currentBlock:
|
||||
return false
|
||||
|
||||
return not (await g.isSyncing())
|
|
@ -109,3 +109,9 @@ method stop*(g: StaticGroupManager): Future[void] =
|
|||
var retFut = newFuture[void]("StaticGroupManager.stop")
|
||||
retFut.complete()
|
||||
return retFut
|
||||
|
||||
method isReady*(g: StaticGroupManager): Future[bool] {.gcsafe.} =
|
||||
initializedGuard(g)
|
||||
var retFut = newFuture[bool]("StaticGroupManager.isReady")
|
||||
retFut.complete(true)
|
||||
return retFut
|
||||
|
|
|
@ -394,6 +394,18 @@ proc mount(conf: WakuRlnConfig,
|
|||
|
||||
return WakuRLNRelay(groupManager: groupManager)
|
||||
|
||||
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async.} =
|
||||
## returns true if the rln-relay protocol is ready to relay messages
|
||||
## returns false otherwise
|
||||
|
||||
# could be nil during startup
|
||||
if rlnPeer.groupManager == nil:
|
||||
return false
|
||||
try:
|
||||
return await rlnPeer.groupManager.isReady()
|
||||
except CatchableError:
|
||||
error "could not check if the rln-relay protocol is ready", err = getCurrentExceptionMsg()
|
||||
return false
|
||||
|
||||
proc new*(T: type WakuRlnRelay,
|
||||
conf: WakuRlnConfig,
|
||||
|
|
Loading…
Reference in New Issue