Check invalid ancestor in engine-api and beacon-sync (#2192)
This commit is contained in:
parent
293ce28e4d
commit
33ac53217f
|
@ -71,6 +71,11 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
|
|||
# reason.
|
||||
var header: common.BlockHeader
|
||||
if not db.getBlockHeader(blockHash, header):
|
||||
# If this block was previously invalidated, keep rejecting it here too
|
||||
let res = ben.checkInvalidAncestor(blockHash, blockHash)
|
||||
if res.isSome:
|
||||
return simpleFCU(res.get)
|
||||
|
||||
# If the head hash is unknown (was not given to us in a newPayload request),
|
||||
# we cannot resolve the header, so not much to do. This could be extended in
|
||||
# the future to resolve from the `eth` network, but it's an unexpected case
|
||||
|
|
|
@ -139,6 +139,11 @@ proc newPayload*(ben: BeaconEngineRef,
|
|||
number = header.blockNumber, hash = blockHash.short
|
||||
return validStatus(blockHash)
|
||||
|
||||
# If this block was rejected previously, keep rejecting it
|
||||
let res = ben.checkInvalidAncestor(blockHash, blockHash)
|
||||
if res.isSome:
|
||||
return res.get
|
||||
|
||||
# If the parent is missing, we - in theory - could trigger a sync, but that
|
||||
# would also entail a reorg. That is problematic if multiple sibling blocks
|
||||
# are being fed to us, and even moreso, if some semi-distant uncle shortens
|
||||
|
@ -147,26 +152,7 @@ proc newPayload*(ben: BeaconEngineRef,
|
|||
# update after legit payload executions.
|
||||
var parent: common.BlockHeader
|
||||
if not db.getBlockHeader(header.parentHash, parent):
|
||||
# Stash the block away for a potential forced forkchoice update to it
|
||||
# at a later time.
|
||||
ben.put(blockHash, header)
|
||||
|
||||
# Although we don't want to trigger a sync, if there is one already in
|
||||
# progress, try to extend it with the current payload request to relieve
|
||||
# some strain from the forkchoice update.
|
||||
#if err := api.eth.Downloader().BeaconExtend(api.eth.SyncMode(), block.Header()); err == nil {
|
||||
# log.Debug("Payload accepted for sync extension", "number", params.Number, "hash", params.BlockHash)
|
||||
# return beacon.PayloadStatusV1{Status: beacon.SYNCING}, nil
|
||||
|
||||
# Either no beacon sync was started yet, or it rejected the delivered
|
||||
# payload as non-integratable on top of the existing sync. We'll just
|
||||
# have to rely on the beacon client to forcefully update the head with
|
||||
# a forkchoice update request.
|
||||
warn "Ignoring payload with missing parent",
|
||||
number = header.blockNumber,
|
||||
hash = blockHash.short,
|
||||
parent = header.parentHash.short
|
||||
return acceptedStatus()
|
||||
return ben.delayPayloadImport(header)
|
||||
|
||||
# We have an existing parent, do some sanity checks to avoid the beacon client
|
||||
# triggering too early
|
||||
|
@ -185,6 +171,14 @@ proc newPayload*(ben: BeaconEngineRef,
|
|||
parent = parent.timestamp, header = header.timestamp
|
||||
return invalidStatus(parent.blockHash, "Invalid timestamp")
|
||||
|
||||
# Another corner case: if the node is in snap sync mode, but the CL client
|
||||
# tries to make it import a block. That should be denied as pushing something
|
||||
# into the database directly will conflict with the assumptions of snap sync
|
||||
# that it has an empty db that it can fill itself.
|
||||
when false:
|
||||
if api.eth.SyncMode() != downloader.FullSync:
|
||||
return api.delayPayloadImport(header)
|
||||
|
||||
if not db.haveBlockAndState(header.parentHash):
|
||||
ben.put(blockHash, header)
|
||||
warn "State not available, ignoring new payload",
|
||||
|
@ -198,6 +192,7 @@ proc newPayload*(ben: BeaconEngineRef,
|
|||
let body = blockBody(payload)
|
||||
let vres = ben.chain.insertBlockWithoutSetHead(header, body)
|
||||
if vres != ValidationResult.OK:
|
||||
ben.setInvalidAncestor(header, blockHash)
|
||||
let blockHash = latestValidHash(db, parent, ttd)
|
||||
return invalidStatus(blockHash, "Failed to insert block")
|
||||
|
||||
|
|
|
@ -69,6 +69,9 @@ proc validateBlockHash*(header: common.BlockHeader,
|
|||
template toValidHash*(x: common.Hash256): Option[Web3Hash] =
  ## Converts `x` with `w3Hash` and wraps the result in `some`.
  some(w3Hash(x))
|
||||
|
||||
proc simpleFCU*(status: PayloadStatusV1): ForkchoiceUpdatedResponse =
  ## Builds a forkchoice-updated response that carries the given, already
  ## constructed payload status. No other response field is set explicitly.
  return ForkchoiceUpdatedResponse(payloadStatus: status)
|
||||
|
||||
proc simpleFCU*(status: PayloadExecutionStatus): ForkchoiceUpdatedResponse =
  ## Builds a forkchoice-updated response from a bare execution status by
  ## first wrapping it in a `PayloadStatusV1` that only has `status` set.
  let payloadStatus = PayloadStatusV1(status: status)
  ForkchoiceUpdatedResponse(payloadStatus: payloadStatus)
|
||||
|
||||
|
|
|
@ -8,12 +8,14 @@
|
|||
# those terms.
|
||||
|
||||
import
|
||||
std/sequtils,
|
||||
std/[sequtils, tables],
|
||||
./web3_eth_conv,
|
||||
./payload_conv,
|
||||
chronicles,
|
||||
web3/execution_types,
|
||||
./merge_tracker,
|
||||
./payload_queue,
|
||||
./api_handler/api_utils,
|
||||
../db/core_db,
|
||||
../core/[tx_pool, casper, chain],
|
||||
../common/common
|
||||
|
@ -29,8 +31,43 @@ type
|
|||
queue : PayloadQueue
|
||||
chain : ChainRef
|
||||
|
||||
# The forkchoice update and new payload method require us to return the
|
||||
# latest valid hash in an invalid chain. To support that return, we need
|
||||
# to track historical bad blocks as well as bad tipsets in case a chain
|
||||
# is constantly built on it.
|
||||
#
|
||||
# There are a few important caveats in this mechanism:
|
||||
# - The bad block tracking is ephemeral, in-memory only. We must never
|
||||
# persist any bad block information to disk as a bug in Geth could end
|
||||
# up blocking a valid chain, even if a later Geth update would accept
|
||||
# it.
|
||||
# - Bad blocks will get forgotten after a certain threshold of import
|
||||
# attempts and will be retried. The rationale is that if the network
|
||||
# really-really-really tries to feed us a block, we should give it a
|
||||
# new chance, perhaps us being racy instead of the block being legit
|
||||
# bad (this happened in Geth at a point with import vs. pending race).
|
||||
# - Tracking all the blocks built on top of the bad one could be a bit
|
||||
# problematic, so we will only track the head chain segment of a bad
|
||||
# chain to allow discarding progressing bad chains and side chains,
|
||||
# without tracking too much bad data.
|
||||
|
||||
# Ephemeral cache to track invalid blocks and their hit count
|
||||
invalidBlocksHits: Table[common.Hash256, int]
|
||||
# Ephemeral cache to track invalid tipsets and their bad ancestor
|
||||
invalidTipsets : Table[common.Hash256, common.BlockHeader]
|
||||
|
||||
{.push gcsafe, raises:[].}
|
||||
|
||||
const
|
||||
# invalidBlockHitEviction is the number of times an invalid block can be
|
||||
# referenced in forkchoice update or new payload before it is attempted
|
||||
# to be reprocessed again.
|
||||
invalidBlockHitEviction = 128
|
||||
|
||||
# invalidTipsetsCap is the max number of recent block hashes tracked that
|
||||
# have led to some bad ancestor block. It's just an OOM protection.
|
||||
invalidTipsetsCap = 512
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -48,6 +85,13 @@ template wrapException(body: untyped): auto =
|
|||
except CatchableError as ex:
|
||||
err(ex.msg)
|
||||
|
||||
# setInvalidAncestor is a callback for the downloader to notify us if a bad block
|
||||
# is encountered during the async sync.
|
||||
proc setInvalidAncestor(ben: BeaconEngineRef,
                        invalid, origin: common.BlockHeader) =
  ## Records `invalid` as the bad ancestor of the tipset identified by the
  ## hash of `origin`, and bumps the hit counter kept for the hash of
  ## `invalid` (the counter starts from zero if it was not tracked yet).
  let
    originHash = origin.blockHash
    invalidHash = invalid.blockHash

  ben.invalidTipsets[originHash] = invalid
  inc ben.invalidBlocksHits.mgetOrPut(invalidHash, 0)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Constructors
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -55,13 +99,19 @@ template wrapException(body: untyped): auto =
|
|||
proc new*(_: type BeaconEngineRef,
|
||||
txPool: TxPoolRef,
|
||||
chain: ChainRef): BeaconEngineRef =
|
||||
BeaconEngineRef(
|
||||
let ben = BeaconEngineRef(
|
||||
txPool: txPool,
|
||||
merge : MergeTracker.init(txPool.com.db),
|
||||
queue : PayloadQueue(),
|
||||
chain : chain,
|
||||
)
|
||||
|
||||
txPool.com.notifyBadBlock = proc(invalid, origin: common.BlockHeader)
|
||||
{.gcsafe, raises: [].} =
|
||||
ben.setInvalidAncestor(invalid, origin)
|
||||
|
||||
ben
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public functions, setters
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -204,3 +254,87 @@ proc generatePayload*(ben: BeaconEngineRef,
|
|||
ok ExecutionPayloadAndBlobsBundle(
|
||||
executionPayload: executionPayload(bundle.blk),
|
||||
blobsBundle: blobsBundle)
|
||||
|
||||
proc setInvalidAncestor*(ben: BeaconEngineRef, header: common.BlockHeader, blockHash: common.Hash256) =
  ## Registers `header` in the invalid-tipset cache under `blockHash` and
  ## sets the hit counter for `blockHash` to one, overwriting any previous
  ## entries stored under that key.
  ben.invalidTipsets[blockHash] = header
  ben.invalidBlocksHits[blockHash] = 1
|
||||
|
||||
# checkInvalidAncestor checks whether the specified chain end links to a known
|
||||
# bad ancestor. If yes, it constructs the payload failure response to return.
|
||||
proc checkInvalidAncestor*(ben: BeaconEngineRef,
                           check, head: common.Hash256): Opt[PayloadStatusV1] =
  ## Looks up `check` in the invalid-tipset cache.
  ##
  ## Returns `none` when `check` is not tracked, or when the bad ancestor it
  ## links to has been hit `invalidBlockHitEviction` times; in the latter
  ## case the ancestor's hit counter and every tipset pointing at it are
  ## forgotten so the block can be reprocessed.
  ##
  ## Otherwise returns an invalid payload status. If `head` differs from
  ## `check`, `head` is additionally recorded as a tipset of the same bad
  ## ancestor (evicting older entries first when the cache is at capacity).

  # Copy the header out of the table right away. The table is mutated below
  # (`del` and `[]=`), which may move or reallocate its storage, so a pointer
  # obtained from `withValue` must not be dereferenced after that point.
  var invalid: common.BlockHeader
  ben.invalidTipsets.withValue(check, entry) do:
    invalid = entry[]
  do:
    # If the hash to check is unknown, there is nothing to reject
    return Opt.none(PayloadStatusV1)

  let badHash = invalid.blockHash

  # If the bad hash was hit too many times, evict it and try to reprocess in
  # the hopes that we have a data race that we can exit out of.
  inc ben.invalidBlocksHits.mgetOrPut(badHash, 0)
  if ben.invalidBlocksHits.getOrDefault(badHash) >= invalidBlockHitEviction:
    warn "Too many bad block import attempt, trying",
      number=invalid.blockNumber, hash=badHash.short

    ben.invalidBlocksHits.del(badHash)

    # Collect first, delete afterwards: the table must not be modified while
    # it is being iterated.
    var deleted = newSeq[common.Hash256]()
    for descendant, badHeader in ben.invalidTipsets:
      if badHeader.blockHash == badHash:
        deleted.add descendant

    for x in deleted:
      ben.invalidTipsets.del(x)

    return Opt.none(PayloadStatusV1)

  # Not too many failures yet, mark the head of the invalid chain as invalid
  if check != head:
    warn "Marked new chain head as invalid",
      hash=head, badnumber=invalid.blockNumber, badhash=badHash

    if ben.invalidTipsets.len >= invalidTipsetsCap:
      # Evict enough entries to end up strictly below the cap, so that the
      # insertion below does not push the table past it. This is always at
      # least one entry inside this branch.
      let size = ben.invalidTipsets.len - invalidTipsetsCap + 1
      var deleted = newSeqOfCap[common.Hash256](size)
      for key in ben.invalidTipsets.keys:
        deleted.add key
        if deleted.len >= size:
          break
      for x in deleted:
        ben.invalidTipsets.del(x)

    ben.invalidTipsets[head] = invalid

  var lastValid = invalid.parentHash

  # If the last valid hash is the terminal pow block, return 0x0 for latest valid hash
  var header: common.BlockHeader
  if ben.com.db.getBlockHeader(invalid.parentHash, header):
    if header.difficulty != 0.u256:
      lastValid = common.Hash256()

  return Opt.some invalidStatus(lastValid, "links to previously rejected block")
|
||||
|
||||
# delayPayloadImport stashes the given block away for import at a later time,
|
||||
# either via a forkchoice update or a sync extension. This method is meant to
|
||||
# be called by the newpayload command when the block seems to be ok, but some
|
||||
# prerequisite prevents it from being processed (e.g. no parent, or snap sync).
|
||||
proc delayPayloadImport*(ben: BeaconEngineRef, header: common.BlockHeader): PayloadStatusV1 =
  ## Defers the import of `header`. If the parent hash is found on a
  ## previously invalidated chain, the rejection produced by
  ## `checkInvalidAncestor` is returned instead. Otherwise the header is
  ## stored via `ben.put`, handed to `syncReqNewHead`, and a `syncing`
  ## status is returned.
  let
    hash = header.blockHash
    rejected = ben.checkInvalidAncestor(header.parentHash, hash)

  # The parent sits on a known-bad chain: this block is invalid as well.
  if rejected.isSome:
    return rejected.get

  # Keep the block around for a potential forced forkchoice update to it
  # at a later time.
  ben.put(hash, header)

  # We do not want to trigger a sync here, but if one is already running,
  # offer it the current payload to relieve the forkchoice update.
  ben.com.syncReqNewHead(header)

  PayloadStatusV1(status: PayloadExecutionStatus.syncing)
|
||||
|
|
|
@ -39,6 +39,9 @@ type
|
|||
SyncReqNewHeadCB* = proc(header: BlockHeader) {.gcsafe, raises: [].}
|
||||
## Update head for syncing
|
||||
|
||||
NotifyBadBlockCB* = proc(invalid, origin: BlockHeader) {.gcsafe, raises: [].}
|
||||
## Notify engine-API of encountered bad block
|
||||
|
||||
CommonRef* = ref object
|
||||
# all purpose storage
|
||||
db: CoreDbRef
|
||||
|
@ -79,6 +82,10 @@ type
|
|||
## `syncReqNewHead` is set.) although `shanghaiTime` is unavailable
|
||||
## or has not reached, yet.
|
||||
|
||||
notifyBadBlock: NotifyBadBlockCB
|
||||
## Allow synchronizer to inform engine-API of bad blocks encountered during sync
|
||||
## progress
|
||||
|
||||
startOfHistory: Hash256
|
||||
## This setting is needed for resuming blockwise syncing after
|
||||
## installing a snapshot pivot. The default value for this field is
|
||||
|
@ -424,6 +431,12 @@ proc syncReqNewHead*(com: CommonRef; header: BlockHeader)
|
|||
if not com.syncReqNewHead.isNil:
|
||||
com.syncReqNewHead(header)
|
||||
|
||||
proc notifyBadBlock*(com: CommonRef; invalid, origin: BlockHeader)
    {.gcsafe, raises: [].} =
  ## Forwards a bad-block notification to the handler stored in
  ## `com.notifyBadBlock`. Does nothing when no handler is installed.
  if com.notifyBadBlock.isNil:
    return

  com.notifyBadBlock(invalid, origin)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Getters
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -566,6 +579,10 @@ proc `syncReqRelaxV2=`*(com: CommonRef; val: bool) =
|
|||
if not com.syncReqNewHead.isNil:
|
||||
com.syncReqRelaxV2 = val
|
||||
|
||||
proc `notifyBadBlock=`*(com: CommonRef; cb: NotifyBadBlockCB) =
  ## Installs `cb` as the bad-block notification handler. Any previously
  ## stored handler is replaced by this assignment.
  com.notifyBadBlock = cb
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
|
|
@ -237,8 +237,12 @@ proc shiftSyncTarget*(ctx: BeaconCtxRef): Future[BlockHeader] {.async.} =
|
|||
|
||||
proc setSyncTarget*(ctx: BeaconCtxRef): Future[void] {.async.} =
|
||||
let header = await ctx.shiftSyncTarget()
|
||||
let job = makeGetBodyJob(header, setHead = true)
|
||||
ctx.pool.jobs.addLast(job)
|
||||
let res = ctx.pool.skeleton.setHead(header, force = true)
|
||||
if res.isOk:
|
||||
let job = makeGetBodyJob(header, setHead = true)
|
||||
ctx.pool.jobs.addLast(job)
|
||||
else:
|
||||
error "setSyncTarget.setHead", msg=res.error
|
||||
|
||||
proc fillBlocksGaps*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
|
||||
if last - least < MaxGetBlocks:
|
||||
|
@ -277,14 +281,18 @@ proc executeGetBodyJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.a
|
|||
|
||||
let b = await peer.getBlockBodies([job.getBodyJob.headerHash])
|
||||
if b.isNone:
|
||||
debug "executeGetBodyJob->getBodies none"
|
||||
debug "executeGetBodyJob->getBodies none",
|
||||
hash=job.getBodyJob.headerHash.short,
|
||||
number=job.getBodyJob.header.blockNumber
|
||||
# retry with other peer
|
||||
buddy.requeue job
|
||||
return
|
||||
|
||||
let bodies = b.get
|
||||
if bodies.blocks.len == 0:
|
||||
debug "executeGetBodyJob->getBodies isZero"
|
||||
debug "executeGetBodyJob->getBodies isZero",
|
||||
hash=job.getBodyJob.headerHash.short,
|
||||
number=job.getBodyJob.header.blockNumber
|
||||
# retry with other peer
|
||||
buddy.requeue job
|
||||
return
|
||||
|
|
|
@ -324,6 +324,13 @@ proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] =
|
|||
let header = maybeHeader.get
|
||||
let res = sk.insertBlock(header, true)
|
||||
if res.isErr:
|
||||
let maybeHead = sk.getHeader(subchain.head).valueOr:
|
||||
return err(error)
|
||||
|
||||
# In post-merge, notify the engine API of encountered bad chains
|
||||
if maybeHead.isSome:
|
||||
sk.com.notifyBadBlock(header, maybeHead.get)
|
||||
|
||||
debug "fillCanonicalChain putBlock", msg=res.error
|
||||
if maybeOldHead.isSome:
|
||||
let oldHead = maybeOldHead.get
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at
|
||||
# https://opensource.org/licenses/MIT).
|
||||
|
@ -84,6 +84,9 @@ func blockHeight*(sk: SkeletonRef): uint64 =
|
|||
func genesisHash*(sk: SkeletonRef): Hash256 =
  ## Genesis hash as reported by the `com` object of the skeleton's chain.
  result = sk.chain.com.genesisHash
|
||||
|
||||
func com*(sk: SkeletonRef): CommonRef =
  ## Shortcut for the `com` object held by the skeleton's chain.
  result = sk.chain.com
|
||||
|
||||
func len*(sk: SkeletonRef): int =
  ## Number of entries in `sk.progress.segments`.
  result = sk.progress.segments.len
|
||||
|
||||
|
|
Loading…
Reference in New Issue