mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
fix(rln-relay): sync from deployed block number (#1955)
* fix(rln-relay): sync from deployed block number * fix(rln-relay): remove option usage for ints * fix: unnecessary decl
This commit is contained in:
parent
04df6d1fa7
commit
bd3be21929
@ -191,6 +191,7 @@ suite "Onchain group manager":
|
|||||||
manager.rlnContract.isSome()
|
manager.rlnContract.isSome()
|
||||||
manager.membershipFee.isSome()
|
manager.membershipFee.isSome()
|
||||||
manager.initialized
|
manager.initialized
|
||||||
|
manager.rlnContractDeployedBlockNumber > 0
|
||||||
|
|
||||||
asyncTest "should error on initialization when loaded metadata does not match":
|
asyncTest "should error on initialization when loaded metadata does not match":
|
||||||
let manager = await setup()
|
let manager = await setup()
|
||||||
|
|||||||
@ -39,6 +39,7 @@ contract(RlnStorage):
|
|||||||
proc MEMBERSHIP_DEPOSIT(): Uint256 {.pure.}
|
proc MEMBERSHIP_DEPOSIT(): Uint256 {.pure.}
|
||||||
proc members(idCommitment: Uint256): Uint256 {.view.}
|
proc members(idCommitment: Uint256): Uint256 {.view.}
|
||||||
proc idCommitmentIndex(): Uint256 {.view.}
|
proc idCommitmentIndex(): Uint256 {.view.}
|
||||||
|
proc deployedBlockNumber(): Uint256 {.view.}
|
||||||
|
|
||||||
type
|
type
|
||||||
RegistryContractWithSender = Sender[WakuRlnRegistry]
|
RegistryContractWithSender = Sender[WakuRlnRegistry]
|
||||||
@ -49,10 +50,11 @@ type
|
|||||||
ethContractAddress*: string
|
ethContractAddress*: string
|
||||||
ethRpc*: Option[Web3]
|
ethRpc*: Option[Web3]
|
||||||
rlnContract*: Option[RlnContractWithSender]
|
rlnContract*: Option[RlnContractWithSender]
|
||||||
|
rlnContractDeployedBlockNumber*: BlockNumber
|
||||||
registryContract*: Option[RegistryContractWithSender]
|
registryContract*: Option[RegistryContractWithSender]
|
||||||
usingStorageIndex: Option[Uint16]
|
usingStorageIndex: Option[Uint16]
|
||||||
membershipFee*: Option[Uint256]
|
membershipFee*: Option[Uint256]
|
||||||
latestProcessedBlock*: Option[BlockNumber]
|
latestProcessedBlock*: BlockNumber
|
||||||
registrationTxHash*: Option[TxHash]
|
registrationTxHash*: Option[TxHash]
|
||||||
chainId*: Option[Quantity]
|
chainId*: Option[Quantity]
|
||||||
keystorePath*: Option[string]
|
keystorePath*: Option[string]
|
||||||
@ -68,20 +70,15 @@ type
|
|||||||
const DefaultKeyStorePath* = "rlnKeystore.json"
|
const DefaultKeyStorePath* = "rlnKeystore.json"
|
||||||
const DefaultKeyStorePassword* = "password"
|
const DefaultKeyStorePassword* = "password"
|
||||||
|
|
||||||
const DecayFactor* = 1.2
|
|
||||||
const DefaultChunkSize* = 1000
|
|
||||||
|
|
||||||
template initializedGuard(g: OnchainGroupManager): untyped =
|
template initializedGuard(g: OnchainGroupManager): untyped =
|
||||||
if not g.initialized:
|
if not g.initialized:
|
||||||
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
||||||
|
|
||||||
|
|
||||||
proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
|
proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
|
||||||
if g.latestProcessedBlock.isNone():
|
|
||||||
return err("latest processed block is not set")
|
|
||||||
try:
|
try:
|
||||||
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
|
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
|
||||||
lastProcessedBlock: g.latestProcessedBlock.get(),
|
lastProcessedBlock: g.latestProcessedBlock,
|
||||||
chainId: uint64(g.chainId.get()),
|
chainId: uint64(g.chainId.get()),
|
||||||
contractAddress: g.ethContractAddress,
|
contractAddress: g.ethContractAddress,
|
||||||
validRoots: g.validRoots.toSeq()))
|
validRoots: g.validRoots.toSeq()))
|
||||||
@ -314,15 +311,14 @@ proc getAndHandleEvents(g: OnchainGroupManager,
|
|||||||
await g.handleEvents(blockTable)
|
await g.handleEvents(blockTable)
|
||||||
await g.handleRemovedEvents(blockTable)
|
await g.handleRemovedEvents(blockTable)
|
||||||
|
|
||||||
let latestProcessedBlock = if toBlock.isSome(): toBlock.get()
|
g.latestProcessedBlock = if toBlock.isSome(): toBlock.get()
|
||||||
else: fromBlock
|
else: fromBlock
|
||||||
g.latestProcessedBlock = some(latestProcessedBlock)
|
|
||||||
let metadataSetRes = g.setMetadata()
|
let metadataSetRes = g.setMetadata()
|
||||||
if metadataSetRes.isErr():
|
if metadataSetRes.isErr():
|
||||||
# this is not a fatal error, hence we don't raise an exception
|
# this is not a fatal error, hence we don't raise an exception
|
||||||
warn "failed to persist rln metadata", error=metadataSetRes.error()
|
warn "failed to persist rln metadata", error=metadataSetRes.error()
|
||||||
else:
|
else:
|
||||||
trace "rln metadata persisted", blockNumber = latestProcessedBlock
|
trace "rln metadata persisted", blockNumber = g.latestProcessedBlock
|
||||||
|
|
||||||
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
|
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
|
||||||
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
|
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
|
||||||
@ -353,16 +349,15 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
|
|
||||||
let ethRpc = g.ethRpc.get()
|
let ethRpc = g.ethRpc.get()
|
||||||
|
|
||||||
# the block chunk size decays exponentially with the number of blocks
|
# static block chunk size
|
||||||
# the minimum chunk size is 100
|
let blockChunkSize = 2_000
|
||||||
var blockChunkSize = 1_000_000
|
|
||||||
|
|
||||||
var fromBlock = if g.latestProcessedBlock.isSome():
|
var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
|
||||||
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
|
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock
|
||||||
g.latestProcessedBlock.get() + 1
|
g.latestProcessedBlock + 1
|
||||||
else:
|
else:
|
||||||
info "starting onchain sync from scratch"
|
info "starting onchain sync from deployed block number", deployedBlockNumber = g.rlnContractDeployedBlockNumber
|
||||||
BlockNumber(0)
|
g.rlnContractDeployedBlockNumber
|
||||||
|
|
||||||
let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||||
try:
|
try:
|
||||||
@ -378,8 +373,6 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
fromBlock = toBlock + 1
|
fromBlock = toBlock + 1
|
||||||
if fromBlock >= currentLatestBlock:
|
if fromBlock >= currentLatestBlock:
|
||||||
break
|
break
|
||||||
let newChunkSize = float(blockChunkSize) / DecayFactor
|
|
||||||
blockChunkSize = max(int(newChunkSize), DefaultChunkSize)
|
|
||||||
else:
|
else:
|
||||||
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
|
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
@ -457,7 +450,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
let metadataGetRes = g.rlnInstance.getMetadata()
|
let metadataGetRes = g.rlnInstance.getMetadata()
|
||||||
if metadataGetRes.isErr():
|
if metadataGetRes.isErr():
|
||||||
warn "could not initialize with persisted rln metadata"
|
warn "could not initialize with persisted rln metadata"
|
||||||
g.latestProcessedBlock = some(BlockNumber(0))
|
g.latestProcessedBlock = BlockNumber(0)
|
||||||
else:
|
else:
|
||||||
let metadata = metadataGetRes.get()
|
let metadata = metadataGetRes.get()
|
||||||
if metadata.chainId != uint64(g.chainId.get()):
|
if metadata.chainId != uint64(g.chainId.get()):
|
||||||
@ -465,7 +458,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
|
|
||||||
if metadata.contractAddress != g.ethContractAddress.toLower():
|
if metadata.contractAddress != g.ethContractAddress.toLower():
|
||||||
raise newException(ValueError, "persisted data: contract address mismatch")
|
raise newException(ValueError, "persisted data: contract address mismatch")
|
||||||
g.latestProcessedBlock = some(metadata.lastProcessedBlock)
|
g.latestProcessedBlock = metadata.lastProcessedBlock
|
||||||
g.validRoots = metadata.validRoots.toDeque()
|
g.validRoots = metadata.validRoots.toDeque()
|
||||||
|
|
||||||
# check if the contract exists by calling a static function
|
# check if the contract exists by calling a static function
|
||||||
@ -477,10 +470,25 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
"could not get the membership deposit: " & getCurrentExceptionMsg())
|
"could not get the membership deposit: " & getCurrentExceptionMsg())
|
||||||
g.membershipFee = some(membershipFee)
|
g.membershipFee = some(membershipFee)
|
||||||
|
|
||||||
|
var deployedBlockNumber: Uint256
|
||||||
|
try:
|
||||||
|
deployedBlockNumber = await rlnContract.deployedBlockNumber().call()
|
||||||
|
except CatchableError:
|
||||||
|
raise newException(ValueError,
|
||||||
|
"could not get the deployed block number: " & getCurrentExceptionMsg())
|
||||||
|
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
|
||||||
|
|
||||||
ethRpc.ondisconnect = proc() =
|
ethRpc.ondisconnect = proc() =
|
||||||
error "Ethereum client disconnected"
|
error "Ethereum client disconnected"
|
||||||
let fromBlock = g.latestProcessedBlock.get()
|
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
|
||||||
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
|
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
|
||||||
|
try:
|
||||||
|
let newEthRpc = waitFor newWeb3(g.ethClientUrl)
|
||||||
|
newEthRpc.ondisconnect = ethRpc.ondisconnect
|
||||||
|
g.ethRpc = some(newEthRpc)
|
||||||
|
except CatchableError:
|
||||||
|
error "failed to reconnect with the Ethereum client", error = getCurrentExceptionMsg()
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
asyncSpawn g.startOnchainSync()
|
asyncSpawn g.startOnchainSync()
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user