Minor post-merge cleanups (#3945)

https://github.com/status-im/nimbus-eth2/pull/3944

The use of nested `awaitWithRetries` calls would have
resulted in an unexpected number of retries (3x3).
We now use regular `await` in outer layer to avoid the problem.

https://github.com/status-im/nimbus-eth2/pull/3943

The new code has an invariant that the `headMerkleizer` field in
the `Eth1Chain` is always kept in sync with the blocks stored in
the chain.

This invariant is now enforced better by doing the necessary merkleizer updates
in the `Eth1Chain.addBlock` function, in the `Eth1Chain.init` function and in the
`Eth1Chain.reset` function.
This commit is contained in:
zah 2022-08-10 15:31:10 +03:00 committed by GitHub
parent ede83b1805
commit d64c17ffc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 91 deletions

View File

@ -70,10 +70,19 @@ type
GenesisStateRef = ref phase0.BeaconState
Eth1Block* = ref object
hash*: Eth2Digest
number*: Eth1BlockNumber
timestamp*: Eth1BlockTimestamp
## Basic properties of the block
## These must be initialized in the constructor
deposits*: seq[DepositData]
voteData*: Eth1Data
## Deposits inside this particular block
depositRoot*: Eth2Digest
depositCount*: uint64
## Global deposits count and hash tree root of the entire sequence
## These are computed when the block is added to the chain (see `addBlock`)
when hasGenesisDetection:
activeValidatorsCount*: uint64
@ -222,16 +231,16 @@ when hasGenesisDetection:
proc createGenesisState(m: Eth1Monitor, eth1Block: Eth1Block): GenesisStateRef =
notice "Generating genesis state",
blockNum = eth1Block.number,
blockHash = eth1Block.voteData.block_hash,
blockHash = eth1Block.hash,
blockTimestamp = eth1Block.timestamp,
totalDeposits = eth1Block.voteData.deposit_count,
totalDeposits = eth1Block.depositCount,
activeValidators = eth1Block.activeValidatorsCount
var deposits = m.allGenesisDepositsUpTo(eth1Block.voteData.deposit_count)
var deposits = m.allGenesisDepositsUpTo(eth1Block.depositCount)
result = newClone(initialize_beacon_state_from_eth1(
m.cfg,
eth1Block.voteData.block_hash,
eth1Block.hash,
eth1Block.timestamp.uint64,
deposits, {}))
@ -355,7 +364,7 @@ func asEngineExecutionPayload*(executionPayload: bellatrix.ExecutionPayload):
func shortLog*(b: Eth1Block): string =
try:
&"{b.number}:{shortLog b.voteData.block_hash}(deposits = {b.voteData.deposit_count})"
&"{b.number}:{shortLog b.hash}(deposits = {b.depositCount})"
except ValueError as exc: raiseAssert exc.msg
template findBlock(chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block =
@ -364,12 +373,9 @@ template findBlock(chain: Eth1Chain, eth1Data: Eth1Data): Eth1Block =
func makeSuccessorWithoutDeposits(existingBlock: Eth1Block,
successor: BlockObject): Eth1Block =
result = Eth1Block(
hash: successor.hash.asEth2Digest,
number: Eth1BlockNumber successor.number,
timestamp: Eth1BlockTimestamp successor.timestamp,
voteData: Eth1Data(
block_hash: successor.hash.asEth2Digest,
deposit_count: existingBlock.voteData.deposit_count,
deposit_root: existingBlock.voteData.deposit_root))
timestamp: Eth1BlockTimestamp successor.timestamp)
when hasGenesisDetection:
result.activeValidatorsCount = existingBlock.activeValidatorsCount
@ -382,14 +388,30 @@ func latestCandidateBlock(chain: Eth1Chain, periodStart: uint64): Eth1Block =
proc popFirst(chain: var Eth1Chain) =
let removed = chain.blocks.popFirst
chain.blocksByHash.del removed.voteData.block_hash.asBlockHash
chain.blocksByHash.del removed.hash.asBlockHash
eth1_chain_len.set chain.blocks.len.int64
func getDepositsRoot*(m: DepositsMerkleizer): Eth2Digest =
mixInLength(m.getFinalHash, int m.totalChunks)
proc addBlock*(chain: var Eth1Chain, newBlock: Eth1Block) =
for deposit in newBlock.deposits:
chain.headMerkleizer.addChunk hash_tree_root(deposit).data
newBlock.depositCount = chain.headMerkleizer.getChunkCount
newBlock.depositRoot = chain.headMerkleizer.getDepositsRoot
chain.blocks.addLast newBlock
chain.blocksByHash[newBlock.voteData.block_hash.asBlockHash] = newBlock
chain.blocksByHash[newBlock.hash.asBlockHash] = newBlock
eth1_chain_len.set chain.blocks.len.int64
func toVoteData(blk: Eth1Block): Eth1Data =
Eth1Data(
deposit_root: blk.depositRoot,
deposit_count: blk.depositCount,
block_hash: blk.hash)
func hash*(x: Eth1Data): Hash =
hash(x.block_hash)
@ -580,6 +602,12 @@ template readJsonField(j: JsonNode, fieldName: string, ValueType: type): untyped
template init[N: static int](T: type DynamicBytes[N, N]): T =
T newSeq[byte](N)
proc fetchTimestampWithRetries(blkParam: Eth1Block, p: Web3DataProviderRef) {.async.} =
let blk = blkParam
let web3block = awaitWithRetries(
p.getBlockByHash(blk.hash.asBlockHash))
blk.timestamp = Eth1BlockTimestamp web3block.timestamp
func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {.
raises: [Defect, CatchableError].} =
if depositsList.kind != JArray:
@ -596,8 +624,12 @@ func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {.
if lastEth1Block == nil or lastEth1Block.number != blockNumber:
lastEth1Block = Eth1Block(
number: blockNumber,
voteData: Eth1Data(block_hash: blockHash.asEth2Digest))
hash: blockHash.asEth2Digest,
number: blockNumber
# The `timestamp` is set in `syncBlockRange` immediately
# after calling this function, because we don't want to
# make this function `async`
)
result.add lastEth1Block
@ -628,11 +660,6 @@ func depositEventsToBlocks(depositsList: JsonNode): seq[Eth1Block] {.
amount: bytes_to_uint64(amount.toArray),
signature: ValidatorSig.init(signature.toArray))
proc fetchTimestamp(p: Web3DataProviderRef, blk: Eth1Block) {.async.} =
let web3block = awaitWithRetries(
p.getBlockByHash(blk.voteData.block_hash.asBlockHash))
blk.timestamp = Eth1BlockTimestamp web3block.timestamp
type
DepositContractDataStatus = enum
Fetched
@ -660,10 +687,10 @@ when hasDepositRootChecks:
try:
let fetchedRoot = asEth2Digest(
awaitOrRaiseOnTimeout(depositRoot, contractCallTimeout))
if blk.voteData.deposit_root.isZero:
blk.voteData.deposit_root = fetchedRoot
if blk.depositRoot.isZero:
blk.depositRoot = fetchedRoot
result = Fetched
elif blk.voteData.deposit_root == fetchedRoot:
elif blk.depositRoot == fetchedRoot:
result = VerifiedCorrect
else:
result = DepositRootIncorrect
@ -676,9 +703,9 @@ when hasDepositRootChecks:
try:
let fetchedCount = bytes_to_uint64(
awaitOrRaiseOnTimeout(rawCount, contractCallTimeout).toArray)
if blk.voteData.deposit_count == 0:
blk.voteData.deposit_count = fetchedCount
elif blk.voteData.deposit_count != fetchedCount:
if blk.depositCount == 0:
blk.depositCount = fetchedCount
elif blk.depositCount != fetchedCount:
result = DepositCountIncorrect
except CatchableError as err:
debug "Failed to fetch deposits count",
@ -694,23 +721,13 @@ proc onBlockHeaders(p: Web3DataProviderRef,
p.blockHeadersSubscription = awaitWithRetries(
p.web3.subscribeForBlockHeaders(blockHeaderHandler, errorHandler))
func getDepositsRoot*(m: DepositsMerkleizer): Eth2Digest =
mixInLength(m.getFinalHash, int m.totalChunks)
func eth1DataFromMerkleizer(eth1Block: Eth2Digest,
merkleizer: DepositsMerkleizer): Eth1Data =
Eth1Data(
block_hash: eth1Block,
deposit_count: merkleizer.getChunkCount,
deposit_root: merkleizer.getDepositsRoot)
proc pruneOldBlocks(chain: var Eth1Chain, depositIndex: uint64) =
let initialChunks = chain.finalizedDepositsMerkleizer.getChunkCount
var lastBlock: Eth1Block
while chain.blocks.len > 0:
let blk = chain.blocks.peekFirst
if blk.voteData.deposit_count >= depositIndex:
if blk.depositCount >= depositIndex:
break
else:
for deposit in blk.deposits:
@ -719,17 +736,17 @@ proc pruneOldBlocks(chain: var Eth1Chain, depositIndex: uint64) =
lastBlock = blk
if chain.finalizedDepositsMerkleizer.getChunkCount > initialChunks:
chain.finalizedBlockHash = lastBlock.voteData.block_hash
chain.finalizedBlockHash = lastBlock.hash
chain.db.putEth2FinalizedTo DepositContractSnapshot(
eth1Block: lastBlock.voteData.block_hash,
eth1Block: lastBlock.hash,
depositContractState: chain.finalizedDepositsMerkleizer.toDepositContractState)
eth1_finalized_head.set lastBlock.number.toGaugeValue
eth1_finalized_deposits.set lastBlock.voteData.deposit_count.toGaugeValue
eth1_finalized_deposits.set lastBlock.depositCount.toGaugeValue
debug "Eth1 blocks pruned",
newTailBlock = lastBlock.voteData.block_hash,
depositsCount = lastBlock.voteData.deposit_count
newTailBlock = lastBlock.hash,
depositsCount = lastBlock.depositCount
func advanceMerkleizer(chain: Eth1Chain,
merkleizer: var DepositsMerkleizer,
@ -737,12 +754,12 @@ func advanceMerkleizer(chain: Eth1Chain,
if chain.blocks.len == 0:
return depositIndex == merkleizer.getChunkCount
if chain.blocks.peekLast.voteData.deposit_count < depositIndex:
if chain.blocks.peekLast.depositCount < depositIndex:
return false
let
firstBlock = chain.blocks[0]
depositsInLastPrunedBlock = firstBlock.voteData.deposit_count -
depositsInLastPrunedBlock = firstBlock.depositCount -
firstBlock.deposits.lenu64
# advanceMerkleizer should always be called shortly after prunning the chain
@ -764,10 +781,10 @@ func getDepositsRange(chain: Eth1Chain, first, last: uint64): seq[DepositData] =
# in the Eth1Chain. This should hold true at the single call site right
# now, but we need to guard the pre-conditions better.
for blk in chain.blocks:
if blk.voteData.deposit_count <= first:
if blk.depositCount <= first:
continue
let firstDepositIdxInBlk = blk.voteData.deposit_count - blk.deposits.lenu64
let firstDepositIdxInBlk = blk.depositCount - blk.deposits.lenu64
if firstDepositIdxInBlk >= last:
return
@ -781,7 +798,7 @@ func lowerBound(chain: Eth1Chain, depositCount: uint64): Eth1Block =
# future, but the `algorithm` module currently requires an
# `openArray`, which the `deques` module can't provide yet.
for eth1Block in chain.blocks:
if eth1Block.voteData.deposit_count > depositCount:
if eth1Block.depositCount > depositCount:
return
result = eth1Block
@ -789,28 +806,30 @@ proc trackFinalizedState(chain: var Eth1Chain,
finalizedEth1Data: Eth1Data,
finalizedStateDepositIndex: uint64,
blockProposalExpected = false): bool =
# Returns true if the Eth1Monitor is synced to the finalization point
## This function will return true if the Eth1Monitor is synced
## to the finalization point.
if chain.blocks.len == 0:
debug "Eth1 chain not initialized"
return false
let latest = chain.blocks.peekLast
if latest.voteData.deposit_count < finalizedEth1Data.deposit_count:
if latest.depositCount < finalizedEth1Data.deposit_count:
if blockProposalExpected:
error "The Eth1 chain is not synced",
ourDepositsCount = latest.voteData.deposit_count,
ourDepositsCount = latest.depositCount,
targetDepositsCount = finalizedEth1Data.deposit_count
return false
let matchingBlock = chain.lowerBound(finalizedEth1Data.deposit_count)
result = if matchingBlock != nil:
if matchingBlock.voteData.deposit_root == finalizedEth1Data.deposit_root:
if matchingBlock.depositRoot == finalizedEth1Data.deposit_root:
true
else:
error "Corrupted deposits history detected",
ourDepositsCount = matchingBlock.voteData.deposit_count,
ourDepositsCount = matchingBlock.depositCount,
taretDepositsCount = finalizedEth1Data.deposit_count,
ourDepositsRoot = matchingBlock.voteData.deposit_root,
ourDepositsRoot = matchingBlock.depositRoot,
targetDepositsRoot = finalizedEth1Data.deposit_root
chain.hasConsensusViolation = true
false
@ -846,7 +865,7 @@ proc getBlockProposalData*(chain: var Eth1Chain,
for vote in getStateField(state, eth1_data_votes):
let eth1Block = chain.findBlock(vote)
if eth1Block != nil and
eth1Block.voteData.deposit_root == vote.deposit_root and
eth1Block.depositRoot == vote.deposit_root and
vote.deposit_count >= getStateField(state, eth1_data).deposit_count and
is_candidate_block(chain.cfg, eth1Block, periodStart):
otherVotesCountTable.inc vote
@ -882,7 +901,7 @@ proc getBlockProposalData*(chain: var Eth1Chain,
result.vote = getStateField(state, eth1_data)
else:
debug "No acceptable eth1 votes. Voting for latest candidate"
result.vote = latestBlock.voteData
result.vote = latestBlock.toVoteData
if pendingDepositsCount > 0:
if hasLatestDeposits:
@ -968,7 +987,8 @@ proc init*(T: type Eth1Chain, cfg: RuntimeConfig, db: BeaconChainDB): T =
T(db: db,
cfg: cfg,
finalizedBlockHash: finalizedDeposits.eth1Block,
finalizedDepositsMerkleizer: m)
finalizedDepositsMerkleizer: m,
headMerkleizer: copy m)
proc createInitialDepositSnapshot*(
depositContractAddress: Eth1Address,
@ -1037,7 +1057,7 @@ proc safeCancel(fut: var Future[void]) =
func clear(chain: var Eth1Chain) =
chain.blocks.clear()
chain.blocksByHash.clear()
chain.headMerkleizer.reset()
chain.headMerkleizer = copy chain.finalizedDepositsMerkleizer
chain.hasConsensusViolation = false
proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} =
@ -1174,7 +1194,7 @@ proc syncBlockRange(m: Eth1Monitor,
for i in 0 ..< blocksWithDeposits.len:
let blk = blocksWithDeposits[i]
awaitWithRetries m.dataProvider.fetchTimestamp(blk)
await blk.fetchTimestampWithRetries(m.dataProvider)
if blk.number > fullSyncFromBlock:
let lastBlock = m.depositsChain.blocks.peekLast
@ -1187,11 +1207,6 @@ proc syncBlockRange(m: Eth1Monitor,
lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits))
eth1_synced_head.set blockWithoutDeposits.number.toGaugeValue
for deposit in blk.deposits:
m.headMerkleizer.addChunk hash_tree_root(deposit).data
blk.voteData.deposit_count = m.headMerkleizer.getChunkCount
blk.voteData.deposit_root = m.headMerkleizer.getDepositsRoot
m.depositsChain.addBlock blk
eth1_synced_head.set blk.number.toGaugeValue
@ -1207,8 +1222,8 @@ proc syncBlockRange(m: Eth1Monitor,
when hasDepositRootChecks:
debug "Deposit contract state verified",
status = $status,
ourCount = lastBlock.voteData.deposit_count,
ourRoot = lastBlock.voteData.deposit_root
ourCount = lastBlock.depositCount,
ourRoot = lastBlock.depositRoot
case status
of DepositRootIncorrect, DepositCountIncorrect:
@ -1219,7 +1234,7 @@ proc syncBlockRange(m: Eth1Monitor,
info "Eth1 sync progress",
blockNumber = lastBlock.number,
depositsProcessed = lastBlock.voteData.deposit_count
depositsProcessed = lastBlock.depositCount
when hasGenesisDetection:
if blocksWithDeposits.len > 0:
@ -1229,7 +1244,7 @@ proc syncBlockRange(m: Eth1Monitor,
blk.activeValidatorsCount = m.genesisValidators.lenu64
let depositContractState = DepositContractSnapshot(
eth1Block: blocksWithDeposits[^1].voteData.block_hash,
eth1Block: blocksWithDeposits[^1].hash,
depositContractState: m.headMerkleizer.toDepositContractState)
m.depositsChain.db.putEth2FinalizedTo depositContractState
@ -1249,14 +1264,14 @@ proc syncBlockRange(m: Eth1Monitor,
m.depositsChain.addBlock lastBlock.makeSuccessorWithoutDeposits(web3Block)
else:
awaitWithRetries m.dataProvider.fetchTimestamp(lastBlock)
await lastBlock.fetchTimestampWithRetries(m.dataProvider)
var genesisBlockIdx = m.depositsChain.blocks.len - 1
if m.isAfterMinGenesisTime(m.depositsChain.blocks[genesisBlockIdx]):
for i in 1 ..< blocksWithDeposits.len:
let idx = (m.depositsChain.blocks.len - 1) - i
let blk = m.depositsChain.blocks[idx]
awaitWithRetries m.dataProvider.fetchTimestamp(blk)
await blk.fetchTimestampWithRetries(m.dataProvider)
if m.isGenesisCandidate(blk):
genesisBlockIdx = idx
else:
@ -1278,7 +1293,7 @@ proc syncBlockRange(m: Eth1Monitor,
if genesisBlockIdx > 0:
let genesisParent = m.depositsChain.blocks[genesisBlockIdx - 1]
if genesisParent.timestamp == 0:
awaitWithRetries m.dataProvider.fetchTimestamp(genesisParent)
await genesisParent.fetchTimestampWithRetries(m.dataProvider)
if m.hasEnoughValidators(genesisParent) and
genesisBlock.number - genesisParent.number > 1:
genesisBlock = awaitWithRetries(
@ -1330,7 +1345,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
matchingBlockAtNewProvider = awaitWithRetries(
m.dataProvider.getBlockByNumber lastKnownBlock.number)
lastKnownBlock.voteData.block_hash.asBlockHash != matchingBlockAtNewProvider.hash)
lastKnownBlock.hash.asBlockHash != matchingBlockAtNewProvider.hash)
if needsReset:
m.depositsChain.clear()
@ -1401,11 +1416,9 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
m.depositsChain.finalizedBlockHash.asBlockHash))
m.depositsChain.addBlock Eth1Block(
hash: m.depositsChain.finalizedBlockHash,
number: Eth1BlockNumber startBlock.number,
timestamp: Eth1BlockTimestamp startBlock.timestamp,
voteData: eth1DataFromMerkleizer(
m.depositsChain.finalizedBlockHash,
m.depositsChain.finalizedDepositsMerkleizer))
timestamp: Eth1BlockTimestamp startBlock.timestamp)
eth1SyncedTo = Eth1BlockNumber startBlock.number
@ -1414,8 +1427,6 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
eth1_finalized_deposits.set(
m.depositsChain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue)
m.depositsChain.headMerkleizer = copy m.finalizedDepositsMerkleizer
debug "Starting Eth1 syncing", `from` = shortLog(m.depositsChain.blocks[0])
while true:
@ -1596,7 +1607,6 @@ when hasGenesisDetection:
var
startBlock = startBlock
endBlock = endBlock
depositData = startBlock.voteData
activeValidatorsCountDuringRange = startBlock.activeValidatorsCount
while startBlock.number + 1 < endBlock.number:
@ -1610,10 +1620,9 @@ when hasGenesisDetection:
candidateBlock = awaitWithRetries(
m.dataProvider.getBlockByNumber(candidateNumber))
var candidateAsEth1Block = Eth1Block(number: candidateBlock.number.uint64,
timestamp: candidateBlock.timestamp.uint64,
voteData: depositData)
candidateAsEth1Block.voteData.block_hash = candidateBlock.hash.asEth2Digest
var candidateAsEth1Block = Eth1Block(hash: candidateBlock.hash.asEth2Digest,
number: candidateBlock.number.uint64,
timestamp: candidateBlock.timestamp.uint64)
let candidateGenesisTime = genesis_time_from_eth1_timestamp(
m.cfg, candidateBlock.timestamp.uint64)

View File

@ -101,10 +101,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
eth1Chain.addBlock Eth1Block(
number: Eth1BlockNumber 1,
timestamp: Eth1BlockTimestamp genesisTime,
voteData: Eth1Data(
deposit_root: merkleizer.getDepositsRoot,
deposit_count: merkleizer.getChunkCount))
timestamp: Eth1BlockTimestamp genesisTime)
let replayState = assignClone(dag.headState)
@ -385,10 +382,9 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
inc eth1BlockNum
var eth1Block = Eth1Block(
hash: makeFakeHash(eth1BlockNum),
number: Eth1BlockNumber eth1BlockNum,
timestamp: Eth1BlockTimestamp nextBlockTime,
voteData: Eth1Data(
block_hash: makeFakeHash(eth1BlockNum)))
timestamp: Eth1BlockTimestamp nextBlockTime)
let newDeposits = int clamp(gauss(r, 5.0, 8.0), 0.0, 1000.0)
for i in 0 ..< newDeposits:
@ -397,8 +393,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
eth1Block.deposits.add d
merkleizer.addChunk hash_tree_root(d).data
eth1Block.voteData.deposit_root = merkleizer.getDepositsRoot
eth1Block.voteData.deposit_count = merkleizer.getChunkCount
eth1Block.depositRoot = merkleizer.getDepositsRoot
eth1Block.depositCount = merkleizer.getChunkCount
eth1Chain.addBlock eth1Block
lastEth1BlockAt = nextBlockTime