mirror of https://github.com/waku-org/nwaku.git
fix(rln-relay): missed roots during sync (#2015)
This commit is contained in:
parent
5d976df9a6
commit
21604e6bfd
|
@ -193,6 +193,8 @@ suite "Onchain group manager":
|
|||
manager.initialized
|
||||
manager.rlnContractDeployedBlockNumber > 0
|
||||
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "should error on initialization when loaded metadata does not match":
|
||||
let manager = await setup()
|
||||
await manager.init()
|
||||
|
@ -220,12 +222,14 @@ suite "Onchain group manager":
|
|||
|
||||
await manager.init()
|
||||
await manager.startGroupSync()
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "startGroupSync: should guard against uninitialized state":
|
||||
let manager = await setup()
|
||||
|
||||
expect(ValueError):
|
||||
await manager.startGroupSync()
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "startGroupSync: should sync to the state of the group":
|
||||
let manager = await setup()
|
||||
|
@ -262,6 +266,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
merkleRootBefore != merkleRootAfter
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "startGroupSync: should fetch history correctly":
|
||||
let manager = await setup()
|
||||
|
@ -303,6 +308,7 @@ suite "Onchain group manager":
|
|||
check:
|
||||
merkleRootBefore != merkleRootAfter
|
||||
manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "register: should guard against uninitialized state":
|
||||
let manager = await setup()
|
||||
|
@ -310,6 +316,7 @@ suite "Onchain group manager":
|
|||
|
||||
expect(ValueError):
|
||||
await manager.register(dummyCommitment)
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "register: should register successfully":
|
||||
let manager = await setup()
|
||||
|
@ -329,6 +336,7 @@ suite "Onchain group manager":
|
|||
check:
|
||||
merkleRootAfter.inHex() != merkleRootBefore.inHex()
|
||||
manager.latestIndex == 1
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "register: callback is called":
|
||||
let manager = await setup()
|
||||
|
@ -354,6 +362,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
manager.rlnInstance.getMetadata().get().validRoots == manager.validRoots.toSeq()
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "withdraw: should guard against uninitialized state":
|
||||
let manager = await setup()
|
||||
|
@ -361,6 +370,7 @@ suite "Onchain group manager":
|
|||
|
||||
expect(ValueError):
|
||||
await manager.withdraw(idSecretHash)
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "validateRoot: should validate good root":
|
||||
let manager = await setup()
|
||||
|
@ -402,6 +412,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
validated
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "validateRoot: should reject bad root":
|
||||
let manager = await setup()
|
||||
|
@ -432,6 +443,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
validated == false
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "verifyProof: should verify valid proof":
|
||||
let manager = await setup()
|
||||
|
@ -474,6 +486,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
verifiedRes.get()
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "verifyProof: should reject invalid proof":
|
||||
let manager = await setup()
|
||||
|
@ -510,6 +523,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
verifiedRes.get() == false
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
|
||||
let manager = await setup()
|
||||
|
@ -554,6 +568,7 @@ suite "Onchain group manager":
|
|||
manager.validRoots.len() == credentialCount - 1
|
||||
manager.validRootBuffer.len() == 0
|
||||
manager.validRoots[credentialCount - 2] == expectedLastRoot
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "isReady should return false if ethRpc is none":
|
||||
var manager = await setup()
|
||||
|
@ -563,6 +578,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
(await manager.isReady()) == false
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
|
||||
var manager = await setup()
|
||||
|
@ -570,6 +586,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
(await manager.isReady()) == false
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "isReady should return true if ethRpc is ready":
|
||||
var manager = await setup()
|
||||
|
@ -579,6 +596,7 @@ suite "Onchain group manager":
|
|||
|
||||
check:
|
||||
(await manager.isReady()) == true
|
||||
await manager.stop()
|
||||
|
||||
|
||||
################################
|
||||
|
|
|
@ -232,30 +232,20 @@ proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Member
|
|||
|
||||
proc getRawEvents(g: OnchainGroupManager,
|
||||
fromBlock: BlockNumber,
|
||||
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[JsonNode] {.async.} =
|
||||
toBlock: BlockNumber): Future[JsonNode] {.async.} =
|
||||
initializedGuard(g)
|
||||
|
||||
let ethRpc = g.ethRpc.get()
|
||||
let rlnContract = g.rlnContract.get()
|
||||
|
||||
var normalizedToBlock: BlockNumber
|
||||
if toBlock.isSome():
|
||||
var value = toBlock.get()
|
||||
if value == 0:
|
||||
# set to latest block
|
||||
value = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||
normalizedToBlock = value
|
||||
else:
|
||||
normalizedToBlock = fromBlock
|
||||
|
||||
let events = await rlnContract.getJsonLogs(MemberRegistered,
|
||||
fromBlock = some(fromBlock.blockId()),
|
||||
toBlock = some(normalizedToBlock.blockId()))
|
||||
toBlock = some(toBlock.blockId()))
|
||||
return events
|
||||
|
||||
proc getBlockTable(g: OnchainGroupManager,
|
||||
fromBlock: BlockNumber,
|
||||
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
|
||||
fromBlock: BlockNumber,
|
||||
toBlock: BlockNumber): Future[BlockTable] {.async.} =
|
||||
initializedGuard(g)
|
||||
|
||||
var blockTable = default(BlockTable)
|
||||
|
@ -311,23 +301,14 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): Future
|
|||
|
||||
proc getAndHandleEvents(g: OnchainGroupManager,
|
||||
fromBlock: BlockNumber,
|
||||
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
|
||||
toBlock: BlockNumber): Future[void] {.async.} =
|
||||
initializedGuard(g)
|
||||
proc getLatestBlockNumber(): BlockNumber =
|
||||
if toBlock.isSome():
|
||||
# if toBlock = 0, that implies the latest block
|
||||
# which is the case when we are syncing block-by-block
|
||||
# therefore, toBlock = fromBlock + 1
|
||||
# if toBlock != 0, then we are chunking blocks
|
||||
# therefore, toBlock = fromBlock + blockChunkSize (which is handled)
|
||||
return max(fromBlock + 1, toBlock.get())
|
||||
return fromBlock
|
||||
|
||||
let blockTable = await g.getBlockTable(fromBlock, toBlock)
|
||||
await g.handleEvents(blockTable)
|
||||
await g.handleRemovedEvents(blockTable)
|
||||
|
||||
g.latestProcessedBlock = getLatestBlockNumber()
|
||||
g.latestProcessedBlock = toBlock
|
||||
let metadataSetRes = g.setMetadata()
|
||||
if metadataSetRes.isErr():
|
||||
# this is not a fatal error, hence we don't raise an exception
|
||||
|
@ -337,11 +318,13 @@ proc getAndHandleEvents(g: OnchainGroupManager,
|
|||
|
||||
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
|
||||
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
|
||||
let latestBlock = blockheader.number.uint
|
||||
let latestBlock = BlockNumber(blockheader.number)
|
||||
trace "block received", blockNumber = latestBlock
|
||||
# get logs from the last block
|
||||
try:
|
||||
asyncSpawn g.getAndHandleEvents(latestBlock)
|
||||
# inc by 1 to prevent double processing
|
||||
let fromBlock = g.latestProcessedBlock + 1
|
||||
asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock)
|
||||
except CatchableError:
|
||||
warn "failed to handle log: ", error=getCurrentExceptionMsg()
|
||||
return newHeadCallback
|
||||
|
@ -368,28 +351,23 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
|||
let blockChunkSize = 2_000
|
||||
|
||||
var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
|
||||
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock
|
||||
g.latestProcessedBlock + 1
|
||||
else:
|
||||
info "starting onchain sync from deployed block number", deployedBlockNumber = g.rlnContractDeployedBlockNumber
|
||||
g.rlnContractDeployedBlockNumber
|
||||
|
||||
let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||
try:
|
||||
# we always want to sync from last processed block => latest
|
||||
if fromBlock == BlockNumber(0) or
|
||||
fromBlock + BlockNumber(blockChunkSize) < latestBlock:
|
||||
# chunk events
|
||||
while true:
|
||||
let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
|
||||
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
|
||||
info "chunking events", fromBlock = fromBlock, toBlock = toBlock
|
||||
await g.getAndHandleEvents(fromBlock, some(toBlock))
|
||||
fromBlock = toBlock + 1
|
||||
if fromBlock >= currentLatestBlock:
|
||||
break
|
||||
else:
|
||||
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
|
||||
# chunk events
|
||||
while true:
|
||||
let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||
if fromBlock >= currentLatestBlock:
|
||||
break
|
||||
|
||||
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
|
||||
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
|
||||
await g.getAndHandleEvents(fromBlock, toBlock)
|
||||
fromBlock = toBlock + 1
|
||||
|
||||
except CatchableError:
|
||||
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())
|
||||
|
||||
|
|
Loading…
Reference in New Issue