mirror of https://github.com/waku-org/nwaku.git
feat(rln-relay): resume onchain sync from persisted tree db (#1805)
* feat(rln-relay): resume onchain sync from persisted tree db * chore(rln-relay): bump zerokit
This commit is contained in:
parent
f8e270fbe3
commit
bbded9eea7
|
@ -552,7 +552,8 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
|
||||||
rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey,
|
rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey,
|
||||||
rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress,
|
rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress,
|
||||||
rlnRelayCredPath: conf.rlnRelayCredPath,
|
rlnRelayCredPath: conf.rlnRelayCredPath,
|
||||||
rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword
|
rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword,
|
||||||
|
rlnRelayTreePath: conf.rlnRelayTreePath
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 2793fe0e24f7b48813fe3b6e9a0e6e5ee4a4c8ce
|
Subproject commit d87077083f3d1b4dc02ac56f735b212ba9b033f4
|
|
@ -259,6 +259,13 @@ proc handleEvents(g: OnchainGroupManager,
|
||||||
raise newException(ValueError, "failed to insert members into the tree")
|
raise newException(ValueError, "failed to insert members into the tree")
|
||||||
trace "new members added to the Merkle tree", commitments=members.mapIt(it[0].idCommitment.inHex())
|
trace "new members added to the Merkle tree", commitments=members.mapIt(it[0].idCommitment.inHex())
|
||||||
g.latestProcessedBlock = some(blockNumber)
|
g.latestProcessedBlock = some(blockNumber)
|
||||||
|
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
|
||||||
|
lastProcessedBlock: blockNumber))
|
||||||
|
if metadataSetRes.isErr():
|
||||||
|
# this is not a fatal error, hence we don't raise an exception
|
||||||
|
warn "failed to persist rln metadata", error=metadataSetRes.error()
|
||||||
|
else:
|
||||||
|
info "rln metadata persisted", lastProcessedBlock = blockNumber
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -306,11 +313,19 @@ proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} =
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg())
|
raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
proc startOnchainSync(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNumber(0)): Future[void] {.async.} =
|
proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
||||||
initializedGuard(g)
|
initializedGuard(g)
|
||||||
|
|
||||||
|
let fromBlock = if g.latestProcessedBlock.isSome():
|
||||||
|
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
|
||||||
|
g.latestProcessedBlock.get()
|
||||||
|
else:
|
||||||
|
info "starting onchain sync from scratch"
|
||||||
|
BlockNumber(0)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await g.getAndHandleEvents(fromBlock, some(fromBlock))
|
# we always want to sync from last processed block => latest
|
||||||
|
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())
|
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
@ -445,12 +460,20 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
|
||||||
g.idCredentials = some(parsedCreds[g.keystoreIndex].identityCredential)
|
g.idCredentials = some(parsedCreds[g.keystoreIndex].identityCredential)
|
||||||
g.membershipIndex = some(parsedCreds[g.keystoreIndex].membershipGroups[g.membershipGroupIndex].treeIndex)
|
g.membershipIndex = some(parsedCreds[g.keystoreIndex].membershipGroups[g.membershipGroupIndex].treeIndex)
|
||||||
|
|
||||||
|
let metadataGetRes = g.rlnInstance.getMetadata()
|
||||||
|
if metadataGetRes.isErr():
|
||||||
|
warn "could not initialize with persisted rln metadata"
|
||||||
|
g.latestProcessedBlock = some(BlockNumber(0))
|
||||||
|
else:
|
||||||
|
let metadata = metadataGetRes.get()
|
||||||
|
g.latestProcessedBlock = some(metadata.lastProcessedBlock)
|
||||||
|
|
||||||
ethRpc.ondisconnect = proc() =
|
ethRpc.ondisconnect = proc() =
|
||||||
error "Ethereum client disconnected"
|
error "Ethereum client disconnected"
|
||||||
let fromBlock = g.latestProcessedBlock.get()
|
let fromBlock = g.latestProcessedBlock.get()
|
||||||
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
|
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
|
||||||
try:
|
try:
|
||||||
asyncSpawn g.startOnchainSync(fromBlock)
|
asyncSpawn g.startOnchainSync()
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
error "failed to restart group sync", error = getCurrentExceptionMsg()
|
error "failed to restart group sync", error = getCurrentExceptionMsg()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue