mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-11 21:04:11 +00:00
Release BeconSync from touching finality
This commit is contained in:
parent
70a1f768f7
commit
7796636020
@ -110,7 +110,6 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
|
||||
|
||||
# Update sync header (if any)
|
||||
com.syncReqNewHead(header)
|
||||
com.reqBeaconSyncTargetCB(header, update.finalizedBlockHash)
|
||||
|
||||
return simpleFCU(PayloadExecutionStatus.syncing)
|
||||
|
||||
|
@ -264,6 +264,10 @@ proc delayPayloadImport*(ben: BeaconEngineRef, header: Header): PayloadStatusV1
|
||||
# at a later time.
|
||||
ben.put(blockHash, header)
|
||||
|
||||
info "delayPayloadImport requested sync to new head",
|
||||
number = header.number,
|
||||
hash = blockHash.short
|
||||
|
||||
# Although we don't want to trigger a sync, if there is one already in
|
||||
# progress, try to extend it with the current payload request to relieve
|
||||
# some strain from the forkchoice update.
|
||||
|
@ -42,9 +42,6 @@ type
|
||||
SyncReqNewHeadCB* = proc(header: Header) {.gcsafe, raises: [].}
|
||||
## Update head for syncing
|
||||
|
||||
ReqBeaconSyncTargetCB* = proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].}
|
||||
## Ditto (for beacon sync)
|
||||
|
||||
NotifyBadBlockCB* = proc(invalid, origin: Header) {.gcsafe, raises: [].}
|
||||
## Notify engine-API of encountered bad block
|
||||
|
||||
@ -76,13 +73,9 @@ type
|
||||
## Call back function for the sync processor. This function stages
|
||||
## the arguent header to a private aerea for subsequent processing.
|
||||
|
||||
reqBeaconSyncTargetCB: ReqBeaconSyncTargetCB
|
||||
## Call back function for a sync processor that returns the canonical
|
||||
## header.
|
||||
|
||||
notifyBadBlock: NotifyBadBlockCB
|
||||
## Allow synchronizer to inform engine-API of bad encountered during sync
|
||||
## progress
|
||||
## Allow synchronizer to inform engine-API of bad block encountered
|
||||
## during sync progress.
|
||||
|
||||
startOfHistory: Hash32
|
||||
## This setting is needed for resuming blockwise syncying after
|
||||
@ -334,11 +327,6 @@ proc syncReqNewHead*(com: CommonRef; header: Header)
|
||||
if not com.syncReqNewHead.isNil:
|
||||
com.syncReqNewHead(header)
|
||||
|
||||
proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header; finHash: Hash32) =
|
||||
## Used by RPC updater
|
||||
if not com.reqBeaconSyncTargetCB.isNil:
|
||||
com.reqBeaconSyncTargetCB(header, finHash)
|
||||
|
||||
proc notifyBadBlock*(com: CommonRef; invalid, origin: Header)
|
||||
{.gcsafe, raises: [].} =
|
||||
|
||||
@ -447,10 +435,6 @@ func `syncReqNewHead=`*(com: CommonRef; cb: SyncReqNewHeadCB) =
|
||||
## Activate or reset a call back handler for syncing.
|
||||
com.syncReqNewHead = cb
|
||||
|
||||
func `reqBeaconSyncTarget=`*(com: CommonRef; cb: ReqBeaconSyncTargetCB) =
|
||||
## Activate or reset a call back handler for syncing.
|
||||
com.reqBeaconSyncTargetCB = cb
|
||||
|
||||
func `notifyBadBlock=`*(com: CommonRef; cb: NotifyBadBlockCB) =
|
||||
## Activate or reset a call back handler for bad block notification.
|
||||
com.notifyBadBlock = cb
|
||||
|
@ -158,8 +158,9 @@ proc validateBlock(c: ForkedChainRef,
|
||||
if updateCursor:
|
||||
c.updateCursor(blk, move(res.value))
|
||||
|
||||
let blockHash = blk.header.blockHash
|
||||
for i, tx in blk.transactions:
|
||||
c.txRecords[rlpHash(tx)] = (blk.header.blockHash, uint64(i))
|
||||
c.txRecords[rlpHash(tx)] = (blockHash, uint64(i))
|
||||
|
||||
ok()
|
||||
|
||||
@ -317,6 +318,19 @@ func canonicalChain(c: ForkedChainRef,
|
||||
|
||||
err("Block hash not in canonical chain")
|
||||
|
||||
func calculateNewBase(c: ForkedChainRef,
|
||||
headHash: Hash32,
|
||||
targetNumber: BlockNumber): BaseDesc =
|
||||
shouldNotKeyError:
|
||||
var prevHash = headHash
|
||||
while prevHash != c.baseHash:
|
||||
var header = c.blocks[prevHash].blk.header
|
||||
if header.number == targetNumber:
|
||||
return BaseDesc(hash: prevHash, header: move(header))
|
||||
prevHash = header.parentHash
|
||||
|
||||
doAssert(false, "Unreachable code")
|
||||
|
||||
func calculateNewBase(c: ForkedChainRef,
|
||||
finalizedHeader: Header,
|
||||
headHash: Hash32,
|
||||
@ -331,15 +345,7 @@ func calculateNewBase(c: ForkedChainRef,
|
||||
if targetNumber <= c.baseHeader.number + c.baseDistance:
|
||||
return BaseDesc(hash: c.baseHash, header: c.baseHeader)
|
||||
|
||||
shouldNotKeyError:
|
||||
var prevHash = headHash
|
||||
while prevHash != c.baseHash:
|
||||
var header = c.blocks[prevHash].blk.header
|
||||
if header.number == targetNumber:
|
||||
return BaseDesc(hash: prevHash, header: move(header))
|
||||
prevHash = header.parentHash
|
||||
|
||||
doAssert(false, "Unreachable code")
|
||||
c.calculateNewBase(headHash, targetNumber)
|
||||
|
||||
func trimCanonicalChain(c: ForkedChainRef,
|
||||
head: CanonicalDesc,
|
||||
@ -493,6 +499,34 @@ proc importBlock*(c: ForkedChainRef, blk: Block): Result[void, string] =
|
||||
c.replaySegment(header.parentHash)
|
||||
c.validateBlock(c.cursorHeader, blk)
|
||||
|
||||
proc importBlockBlindly*(c: ForkedChainRef, blk: Block): Result[void, string] =
|
||||
?c.importBlock(blk)
|
||||
|
||||
if blk.header.number <= c.baseHeader.number:
|
||||
return ok()
|
||||
|
||||
let distanceFromBase = blk.header.number - c.baseHeader.number
|
||||
# finalizerThreshold is baseDistance + 25% of baseDistancce capped at 32.
|
||||
let finalizerThreshold = c.baseDistance + max(1'u64, min(c.baseDistance div 4'u64, 32'u64))
|
||||
if distanceFromBase < finalizerThreshold:
|
||||
return ok()
|
||||
|
||||
# Move the base forward `baseDistance` blocks,
|
||||
# and keep finalizerThreshold-baseDistance blocks in memory.
|
||||
let targetNumber = c.baseHeader.number + c.baseDistance
|
||||
let newBase = c.calculateNewBase(c.cursorHash, targetNumber)
|
||||
|
||||
c.replaySegment(newBase.hash)
|
||||
c.writeBaggage(newBase.hash)
|
||||
c.stagingTx.commit()
|
||||
c.stagingTx = nil
|
||||
|
||||
# Update base forward to newBase.
|
||||
c.updateBase(newBase.hash, newBase.header, c.cursorHash)
|
||||
c.db.persistent(newBase.header.number).isOkOr:
|
||||
return err("Failed to save state: " & $$error)
|
||||
ok()
|
||||
|
||||
proc forkChoice*(c: ForkedChainRef,
|
||||
headHash: Hash32,
|
||||
finalizedHash: Hash32): Result[void, string] =
|
||||
|
@ -14,7 +14,7 @@
|
||||
{.push gcsafe, raises: [].}
|
||||
|
||||
import
|
||||
std/[algorithm, sequtils],
|
||||
std/[sequtils],
|
||||
chronicles,
|
||||
eth/[common, rlp],
|
||||
stew/byteutils,
|
||||
|
@ -172,10 +172,6 @@ proc runPeer*(
|
||||
buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
|
||||
buddy.only.nMultiLoop.inc # statistics/debugging
|
||||
|
||||
# Update consensus header target when needed. It comes with a finalised
|
||||
# header hash where we need to complete the block number.
|
||||
await buddy.headerStagedUpdateTarget info
|
||||
|
||||
if not await buddy.napUnlessSomethingToFetch():
|
||||
#
|
||||
# Layout of a triple of linked header chains (see `README.md`)
|
||||
|
@ -276,10 +276,11 @@ proc blocksStagedImport*(
|
||||
block importLoop:
|
||||
for n in 0 ..< nBlocks:
|
||||
let nBn = qItem.data.blocks[n].header.number
|
||||
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
|
||||
ctx.pool.chain.importBlockBlindly(qItem.data.blocks[n]).isOkOr:
|
||||
warn info & ": import block error", n, iv,
|
||||
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
|
||||
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n), `error`=error
|
||||
ctx.chain.com.notifyBadBlock(qItem.data.blocks[n].header, ctx.chain.latestHeader())
|
||||
# Restore what is left over below
|
||||
maxImport = ctx.chain.latestNumber()
|
||||
break importLoop
|
||||
@ -295,30 +296,6 @@ proc blocksStagedImport*(
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
# Occasionally mark the chain finalized
|
||||
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
|
||||
let
|
||||
nthHash = qItem.data.getNthHash(n)
|
||||
finHash = if nBn < ctx.layout.final: nthHash
|
||||
else: ctx.layout.finalHash
|
||||
|
||||
doAssert nBn == ctx.chain.latestNumber()
|
||||
ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr:
|
||||
warn info & ": fork choice error", n, iv,
|
||||
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
|
||||
F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash,
|
||||
finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error
|
||||
# Restore what is left over below
|
||||
maxImport = ctx.chain.latestNumber()
|
||||
break importLoop
|
||||
|
||||
# Allow pseudo/async thread switch.
|
||||
try: await sleepAsync asyncThreadSwitchTimeSlot
|
||||
except CancelledError: discard
|
||||
if not ctx.daemon:
|
||||
maxImport = ctx.chain.latestNumber()
|
||||
break importLoop
|
||||
|
||||
# Import probably incomplete, so a partial roll back may be needed
|
||||
if maxImport < iv.maxPt:
|
||||
ctx.blocksUnprocCommit(0, maxImport+1, qItem.data.blocks[^1].header.number)
|
||||
|
@ -100,9 +100,6 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
# If there was a manual import after a previous sync, then saved state
|
||||
# might be outdated.
|
||||
if rc.isOk and
|
||||
# The base number is the least record of the FCU chains/tree. So the
|
||||
# finalised entry must not be smaller.
|
||||
ctx.chain.baseNumber() <= rc.value.final and
|
||||
# If the latest FCU number is not larger than the head, there is nothing
|
||||
# to do (might also happen after a manual import.)
|
||||
latest < rc.value.head:
|
||||
@ -134,14 +131,6 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool =
|
||||
couplerHash: latestHash,
|
||||
dangling: latest,
|
||||
danglingParent: latestParent,
|
||||
# There is no need to record a separate finalised head `F` as its only
|
||||
# use is to serve as second argument in `forkChoice()` when committing
|
||||
# a batch of imported blocks. Currently, there are no blocks to fetch
|
||||
# and import. The system must wait for instructions and update the fields
|
||||
# `final` and `head` while the latter will be increased so that import
|
||||
# can start.
|
||||
final: latest,
|
||||
finalHash: latestHash,
|
||||
head: latest,
|
||||
headHash: latestHash,
|
||||
headLocked: false)
|
||||
|
@ -52,51 +52,6 @@ proc fetchAndCheck(
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc headerStagedUpdateTarget*(
|
||||
buddy: BeaconBuddyRef;
|
||||
info: static[string];
|
||||
) {.async: (raises: []).} =
|
||||
## Fetch finalised beacon header if there is an update available
|
||||
let
|
||||
ctx = buddy.ctx
|
||||
peer = buddy.peer
|
||||
if not ctx.layout.headLocked and
|
||||
ctx.target.final == 0 and
|
||||
ctx.target.finalHash != zeroHash32 and
|
||||
not ctx.target.locked:
|
||||
const iv = BnRange.new(1u,1u) # dummy interval
|
||||
|
||||
ctx.target.locked = true
|
||||
let rc = await buddy.headersFetchReversed(iv, ctx.target.finalHash, info)
|
||||
ctx.target.locked = false
|
||||
|
||||
if rc.isOk:
|
||||
let hash = rlp.encode(rc.value[0]).keccak256
|
||||
if hash != ctx.target.finalHash:
|
||||
# Oops
|
||||
buddy.ctrl.zombie = true
|
||||
trace info & ": finalised header hash mismatch", peer, hash,
|
||||
expected=ctx.target.finalHash
|
||||
else:
|
||||
let final = rc.value[0].number
|
||||
if final < ctx.chain.baseNumber():
|
||||
trace info & ": finalised number too low", peer,
|
||||
B=ctx.chain.baseNumber.bnStr, finalised=final.bnStr,
|
||||
delta=(ctx.chain.baseNumber - final)
|
||||
ctx.target.reset
|
||||
else:
|
||||
ctx.target.final = final
|
||||
|
||||
# Activate running (unless done yet)
|
||||
if ctx.hibernate:
|
||||
ctx.hibernate = false
|
||||
trace info & ": activated syncer", peer,
|
||||
finalised=final.bnStr, head=ctx.layout.head.bnStr
|
||||
|
||||
# Update, so it can be followed nicely
|
||||
ctx.updateMetrics()
|
||||
|
||||
|
||||
proc headersStagedCollect*(
|
||||
buddy: BeaconBuddyRef;
|
||||
info: static[string];
|
||||
|
@ -58,21 +58,17 @@ when enableTicker:
|
||||
nBuddies: ctx.pool.nBuddies)
|
||||
|
||||
|
||||
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB =
|
||||
proc updateBeaconHeaderCB(ctx: BeaconCtxRef): SyncReqNewHeadCB =
|
||||
## Update beacon header. This function is intended as a call back function
|
||||
## for the RPC module.
|
||||
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} =
|
||||
return proc(h: Header) {.gcsafe, raises: [].} =
|
||||
|
||||
# Check whether there is an update running (otherwise take next upate)
|
||||
if not ctx.target.locked and # ignore if currently updating
|
||||
ctx.target.final == 0 and # ignore if complete already
|
||||
f != zeroHash32 and # finalised hash is set
|
||||
ctx.layout.head < h.number and # update is advancing
|
||||
ctx.target.consHead.number < h.number: # .. ditto
|
||||
|
||||
ctx.target.consHead = h
|
||||
ctx.target.final = BlockNumber(0)
|
||||
ctx.target.finalHash = f
|
||||
ctx.target.changed = true
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -130,11 +126,11 @@ proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
|
||||
proc setupRpcMagic*(ctx: BeaconCtxRef) =
|
||||
## Helper for `setup()`: Enable external pivot update via RPC
|
||||
ctx.pool.chain.com.reqBeaconSyncTarget = ctx.updateBeaconHeaderCB
|
||||
ctx.pool.chain.com.syncReqNewHead = ctx.updateBeaconHeaderCB
|
||||
|
||||
proc destroyRpcMagic*(ctx: BeaconCtxRef) =
|
||||
## Helper for `release()`
|
||||
ctx.pool.chain.com.reqBeaconSyncTarget = ReqBeaconSyncTargetCB(nil)
|
||||
ctx.pool.chain.com.syncReqNewHead = SyncReqNewHeadCB(nil)
|
||||
|
||||
# ---------
|
||||
|
||||
|
@ -69,8 +69,6 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) =
|
||||
couplerHash: ctx.layout.couplerHash,
|
||||
dangling: target,
|
||||
danglingParent: ctx.target.consHead.parentHash,
|
||||
final: ctx.target.final,
|
||||
finalHash: ctx.target.finalHash,
|
||||
head: target,
|
||||
headHash: rlpHeader.keccak256,
|
||||
headLocked: true)
|
||||
@ -126,8 +124,6 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) =
|
||||
couplerHash: ctx.layout.headHash,
|
||||
dangling: ctx.layout.head, # `D`
|
||||
danglingParent: ctx.dbPeekParentHash(ctx.layout.head).expect "Hash32",
|
||||
final: ctx.layout.final, # `F`
|
||||
finalHash: ctx.layout.finalHash,
|
||||
head: ctx.layout.head, # `H`
|
||||
headHash: ctx.layout.headHash,
|
||||
headLocked: ctx.layout.headLocked)
|
||||
@ -171,8 +167,7 @@ proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) =
|
||||
|
||||
# Check whether there is something to do regarding beacon node change
|
||||
if not ctx.layout.headLocked and # there was an active import request
|
||||
ctx.target.changed and # and there is a new target from CL
|
||||
ctx.target.final != 0: # .. ditto
|
||||
ctx.target.changed: # and there is a new target from CL
|
||||
ctx.target.changed = false
|
||||
ctx.updateTargetChange info
|
||||
|
||||
|
@ -23,16 +23,13 @@ declareGauge beacon_base, "" &
|
||||
|
||||
declareGauge beacon_latest, "" &
|
||||
"Block number of latest imported blocks"
|
||||
|
||||
|
||||
declareGauge beacon_coupler, "" &
|
||||
"Max block number for header chain starting at genesis"
|
||||
|
||||
declareGauge beacon_dangling, "" &
|
||||
"Starting/min block number for higher up headers chain"
|
||||
|
||||
declareGauge beacon_final, "" &
|
||||
"Max number of finalised block in higher up headers chain"
|
||||
|
||||
declareGauge beacon_head, "" &
|
||||
"Ending/max block number of higher up headers chain"
|
||||
|
||||
@ -62,7 +59,6 @@ template updateMetricsImpl(ctx: BeaconCtxRef) =
|
||||
metrics.set(beacon_latest, ctx.chain.latestNumber().int64)
|
||||
metrics.set(beacon_coupler, ctx.layout.coupler.int64)
|
||||
metrics.set(beacon_dangling, ctx.layout.dangling.int64)
|
||||
metrics.set(beacon_final, ctx.layout.final.int64)
|
||||
metrics.set(beacon_head, ctx.layout.head.int64)
|
||||
metrics.set(beacon_target, ctx.target.consHead.number.int64)
|
||||
|
||||
|
@ -121,10 +121,6 @@ const
|
||||
## entry block number is too high and so leaves a gap to the ledger state
|
||||
## block number.)
|
||||
|
||||
finaliserChainLengthMax* = 32
|
||||
## When importing with `importBlock()`, finalise after at most this many
|
||||
## invocations of `importBlock()`.
|
||||
|
||||
# ----------------------
|
||||
|
||||
static:
|
||||
|
@ -69,8 +69,6 @@ type
|
||||
locked*: bool ## Don't update while fetching header
|
||||
changed*: bool ## Tell that something has changed
|
||||
consHead*: Header ## Consensus head
|
||||
final*: BlockNumber ## Finalised block number
|
||||
finalHash*: Hash32 ## Finalised hash
|
||||
|
||||
SyncStateLayout* = object
|
||||
## Layout of a linked header chains defined by the triple `(C,D,H)` as
|
||||
@ -92,9 +90,6 @@ type
|
||||
dangling*: BlockNumber ## Left end `D` of linked chain `[D,H]`
|
||||
danglingParent*: Hash32 ## Parent hash of `D`
|
||||
|
||||
final*: BlockNumber ## Finalised block number `F`
|
||||
finalHash*: Hash32 ## Hash of `F`
|
||||
|
||||
head*: BlockNumber ## `H`, block num of some finalised block
|
||||
headHash*: Hash32 ## Hash of `H`
|
||||
headLocked*: bool ## No need to update `H` yet
|
||||
|
Loading…
x
Reference in New Issue
Block a user