fix obtaining deposits after connection loss (#3943)

* fix obtaining deposits after connection loss

When an error occurs during Eth1 deposits import, the already imported
blocks are kept while the connection to the EL is re-established.
However, the corresponding merkleizer is not persisted, leading to any
future deposits no longer being properly imported. This is quite common
when syncing a fresh Nimbus instance against an already-synced Geth EL.
Fixed by persisting the head merkleizer together with the blocks.
This commit is contained in:
Etan Kissling 2022-08-09 23:32:34 +02:00 committed by GitHub
parent 06a5c67e62
commit 4ef621f926
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 20 deletions

View File

@ -20,8 +20,7 @@ import
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers],
./filepath
spec/[eth2_ssz_serialization, helpers]
logScope: topics = "lcdata"

View File

@ -94,6 +94,9 @@ type
blocksByHash: Table[BlockHash, Eth1Block]
headMerkleizer: DepositsMerkleizer
## Merkleizer state after applying all `blocks`
hasConsensusViolation: bool
## The local chain contradicts the observed consensus on the network
@ -257,6 +260,9 @@ template depositChainBlocks*(m: Eth1Monitor): Deque[Eth1Block] =
template finalizedDepositsMerkleizer(m: Eth1Monitor): auto =
m.depositsChain.finalizedDepositsMerkleizer
template headMerkleizer(m: Eth1Monitor): auto =
m.depositsChain.headMerkleizer
proc fixupWeb3Urls*(web3Url: var string) =
var normalizedUrl = toLowerAscii(web3Url)
if not (normalizedUrl.startsWith("https://") or
@ -645,7 +651,7 @@ when hasDepositRootChecks:
const
contractCallTimeout = 60.seconds
func fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block):
proc fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block):
Future[DepositContractDataStatus] {.async.} =
let
depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number)
@ -668,8 +674,8 @@ when hasDepositRootChecks:
result = DepositRootUnavailable
try:
let fetchedCount = bytes_to_uint64(array[8, byte](
awaitOrRaiseOnTimeout(rawCount, contractCallTimeout)))
let fetchedCount = bytes_to_uint64(
awaitOrRaiseOnTimeout(rawCount, contractCallTimeout).toArray)
if blk.voteData.deposit_count == 0:
blk.voteData.deposit_count = fetchedCount
elif blk.voteData.deposit_count != fetchedCount:
@ -1031,6 +1037,7 @@ proc safeCancel(fut: var Future[void]) =
func clear(chain: var Eth1Chain) =
chain.blocks.clear()
chain.blocksByHash.clear()
chain.headMerkleizer.reset()
chain.hasConsensusViolation = false
proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} =
@ -1114,10 +1121,9 @@ func earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber =
m.latestEth1BlockNumber - (2 * m.cfg.ETH1_FOLLOW_DISTANCE) - votedBlocksSafetyMargin
proc syncBlockRange(m: Eth1Monitor,
merkleizer: ref DepositsMerkleizer,
fromBlock, toBlock,
fullSyncFromBlock: Eth1BlockNumber) {.gcsafe, async.} =
doAssert m.depositsChain.blocks.len > 0 and m.dataProvider != nil
doAssert m.depositsChain.blocks.len > 0
var currentBlock = fromBlock
while currentBlock <= toBlock:
@ -1169,12 +1175,6 @@ proc syncBlockRange(m: Eth1Monitor,
for i in 0 ..< blocksWithDeposits.len:
let blk = blocksWithDeposits[i]
for deposit in blk.deposits:
merkleizer[].addChunk hash_tree_root(deposit).data
blk.voteData.deposit_count = merkleizer[].getChunkCount
blk.voteData.deposit_root = merkleizer[].getDepositsRoot
if blk.number > fullSyncFromBlock:
let lastBlock = m.depositsChain.blocks.peekLast
for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number:
@ -1186,6 +1186,11 @@ proc syncBlockRange(m: Eth1Monitor,
lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits))
eth1_synced_head.set blockWithoutDeposits.number.toGaugeValue
for deposit in blk.deposits:
m.headMerkleizer.addChunk hash_tree_root(deposit).data
blk.voteData.deposit_count = m.headMerkleizer.getChunkCount
blk.voteData.deposit_root = m.headMerkleizer.getDepositsRoot
m.depositsChain.addBlock blk
eth1_synced_head.set blk.number.toGaugeValue
@ -1224,7 +1229,7 @@ proc syncBlockRange(m: Eth1Monitor,
let depositContractState = DepositContractSnapshot(
eth1Block: blocksWithDeposits[^1].voteData.block_hash,
depositContractState: merkleizer[].toDepositContractState)
depositContractState: m.headMerkleizer.toDepositContractState)
m.depositsChain.db.putEth2FinalizedTo depositContractState
@ -1388,11 +1393,11 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
subscriptionErrorHandler)
let shouldProcessDeposits = not m.depositContractAddress.isZeroMemory
var scratchMerkleizer: ref DepositsMerkleizer
var eth1SyncedTo: Eth1BlockNumber
if shouldProcessDeposits and m.depositsChain.blocks.len == 0:
let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash))
m.dataProvider.getBlockByHash(
m.depositsChain.finalizedBlockHash.asBlockHash))
m.depositsChain.addBlock Eth1Block(
number: Eth1BlockNumber startBlock.number,
@ -1408,7 +1413,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
eth1_finalized_deposits.set(
m.depositsChain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue)
scratchMerkleizer = newClone(copy m.finalizedDepositsMerkleizer)
m.depositsChain.headMerkleizer = copy m.finalizedDepositsMerkleizer
debug "Starting Eth1 syncing", `from` = shortLog(m.depositsChain.blocks[0])
@ -1476,7 +1481,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
m.terminalBlockHash = some terminalBlockCandidate.hash
m.terminalBlockNumber = some terminalBlockCandidate.number
if shouldProcessDeposits and scratchMerkleizer != nil:
if shouldProcessDeposits:
if m.latestEth1BlockNumber <= m.cfg.ETH1_FOLLOW_DISTANCE:
continue
@ -1485,8 +1490,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
continue
let earliestBlockOfInterest = m.earliestBlockOfInterest()
await m.syncBlockRange(scratchMerkleizer,
eth1SyncedTo + 1,
await m.syncBlockRange(eth1SyncedTo + 1,
targetBlock,
earliestBlockOfInterest)
eth1SyncedTo = targetBlock