Backfill only up to MIN_EPOCHS_FOR_BLOCK_REQUESTS blocks (#4421)

When backfilling, we only need to download blocks from the most recent
MIN_EPOCHS_FOR_BLOCK_REQUESTS epochs - older blocks cannot reliably be
fetched from the network and do not have to be served to others.

This change affects only trusted-node-synced clients - genesis sync
continues to work as before, because it must construct the state by
applying every block from genesis.

Those wishing to complete a backfill should do so with era files
instead.
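For intuition, here is a minimal standalone sketch of the horizon computation introduced below - plain uint64 in place of the Slot/Epoch types, with SLOTS_PER_EPOCH = 32 and the spec's mainnet MIN_EPOCHS_FOR_BLOCK_REQUESTS = 33024 (roughly five months of blocks) assumed for the example:

const
  SLOTS_PER_EPOCH = 32'u64
  MIN_EPOCHS_FOR_BLOCK_REQUESTS = 33024'u64 # mainnet spec value, ~5 months

func horizonSlot(headSlot: uint64): uint64 =
  ## Oldest slot we still backfill; everything older is left to era files.
  let headEpoch = headSlot div SLOTS_PER_EPOCH
  if headEpoch > MIN_EPOCHS_FOR_BLOCK_REQUESTS:
    (headEpoch - MIN_EPOCHS_FOR_BLOCK_REQUESTS) * SLOTS_PER_EPOCH
  else:
    0'u64 # GENESIS_SLOT

when isMainModule:
  # Head at slot 5_000_000 (epoch 156_250): backfill stops at epoch
  # 123_226, i.e. slot 3_943_232.
  doAssert horizonSlot(5_000_000'u64) == 3_943_232'u64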
Jacek Sieka, 2022-12-23 08:42:55 +01:00, committed by GitHub
parent 2ac7609259
commit 75c7195bfd
8 changed files with 89 additions and 54 deletions


@@ -410,6 +410,13 @@ proc addBackfillBlock*(
       debug "Block does not match expected backfill root"
       return err(VerifierError.MissingParent) # MissingChild really, but ..
 
+  if blck.slot < dag.horizon:
+    # This can happen as the horizon keeps moving - we'll discard it as
+    # duplicate since it would have duplicated an existing block had we been
+    # interested
+    debug "Block past horizon, dropping", horizon = dag.horizon
+    return err(VerifierError.Duplicate)
+
   checkSignature()
 
   let sigVerifyTick = Moment.now


@@ -362,6 +362,14 @@ template frontfill*(dagParam: ChainDAGRef): Opt[BlockId] =
   else:
     dag.genesis
 
+func horizon*(dag: ChainDAGRef): Slot =
+  ## The sync horizon that we target during backfill - ie we will not backfill
+  ## blocks older than this from the network
+  if dag.head.slot.epoch > dag.cfg.MIN_EPOCHS_FOR_BLOCK_REQUESTS:
+    start_slot(dag.head.slot.epoch - dag.cfg.MIN_EPOCHS_FOR_BLOCK_REQUESTS)
+  else:
+    GENESIS_SLOT
+
 template epoch*(e: EpochRef): Epoch = e.key.epoch
 
 func shortLog*(v: EpochKey): string =


@@ -1295,8 +1295,13 @@ proc getBlockRange*(
     head = shortLog(dag.head.root), requestedCount, startSlot, skipStep, headSlot
 
   if startSlot < dag.backfill.slot:
-    notice "Got request for pre-backfill slot",
-      startSlot, backfillSlot = dag.backfill.slot
+    if startSlot < dag.horizon:
+      # We will not backfill these
+      debug "Got request for pre-horizon slot",
+        startSlot, backfillSlot = dag.backfill.slot
+    else:
+      notice "Got request for pre-backfill slot",
+        startSlot, backfillSlot = dag.backfill.slot
     return output.len
 
   if headSlot <= startSlot or requestedCount == 0:


@@ -244,7 +244,6 @@ proc newExecutionPayload*(
     executionPayload: bellatrix.ExecutionPayload | capella.ExecutionPayload):
     Future[Opt[PayloadExecutionStatus]] {.async.} =
   if eth1Monitor.isNil:
-    warn "newPayload: attempting to process execution payload without Eth1Monitor. Ensure --web3-url setting is correct and JWT is configured."
     return Opt.none PayloadExecutionStatus
 
   debug "newPayload: inserting block into execution engine",
@@ -320,6 +319,9 @@ proc getExecutionValidity(
   if not blck.message.is_execution_block:
     return NewPayloadStatus.valid # vacuously
 
+  if eth1Monitor.isNil:
+    return NewPayloadStatus.noResponse
+
   try:
     # Minimize window for Eth1 monitor to shut down connection
     await eth1Monitor.ensureDataProvider()
@@ -381,8 +383,10 @@ proc storeBlock*(
     # `processBlock` (indirectly). `validator_duties` does call `storeBlock`
     # directly, so is exposed to this, but only cares about whether there is
     # an error or not.
-    return err((
-      VerifierError.MissingParent, ProcessingStatus.notCompleted))
+    if self[].consensusManager.eth1Monitor.isNil:
+      warn "Attempting to process execution payload without execution client. Ensure --web3-url setting is correct and JWT is configured."
+    return err((VerifierError.MissingParent, ProcessingStatus.notCompleted))
 
   # Client software MUST validate blockHash value as being equivalent to
   # Keccak256(RLP(ExecutionBlockHeader))


@@ -277,10 +277,7 @@ proc initFullNode(
       dag.backfill.slot
 
   func getFrontfillSlot(): Slot =
-    if dag.frontfill.isSome():
-      dag.frontfill.get().slot
-    else:
-      GENESIS_SLOT
+    max(dag.frontfill.get(BlockId()).slot, dag.horizon)
 
   let
     quarantine = newClone(
@@ -451,10 +448,7 @@ proc init*(T: type BeaconNode,
   let optJwtSecret = rng[].loadJwtSecret(config, allowCreate = false)
 
   if config.web3Urls.len() == 0:
-    if cfg.BELLATRIX_FORK_EPOCH == FAR_FUTURE_EPOCH:
-      notice "Running without execution client - validator features partially disabled (see https://nimbus.guide/eth1.html)"
-    else:
-      notice "Running without execution client - validator features disabled (see https://nimbus.guide/eth1.html)"
+    notice "Running without execution client - validator features disabled (see https://nimbus.guide/eth1.html)"
 
   var eth1Monitor: Eth1Monitor
@@ -1633,8 +1627,6 @@ proc start*(node: BeaconNode) {.raises: [Defect, CatchableError].} =
 
   if node.eth1Monitor != nil:
     node.eth1Monitor.start()
-  else:
-    notice "Running without execution chain monitor, block producation partially disabled"
 
   node.run()


@@ -183,10 +183,19 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
     return
 
 proc remainingSlots(man: SyncManager): uint64 =
+  let
+    first = man.getFirstSlot()
+    last = man.getLastSlot()
   if man.direction == SyncQueueKind.Forward:
-    man.getLastSlot() - man.getFirstSlot()
+    if last > first:
+      man.getLastSlot() - man.getFirstSlot()
+    else:
+      0'u64
   else:
-    man.getFirstSlot() - man.getLastSlot()
+    if first > last:
+      man.getFirstSlot() - man.getLastSlot()
+    else:
+      0'u64
 
 proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
   logScope:
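The guard above, and the similar ones later in this file and in sync_queue, are all saturating subtractions on uint64: with a horizon that keeps moving, the endpoints can cross, and an unchecked `a - b` would wrap around to a huge slot count. A hypothetical helper capturing the pattern the commit inlines:

func saturatingSub(a, b: uint64): uint64 =
  ## Like `a - b`, but clamps at zero instead of wrapping around.
  if a >= b: a - b else: 0'u64

doAssert saturatingSub(10'u64, 3'u64) == 7'u64
doAssert saturatingSub(3'u64, 10'u64) == 0'u64 # plain `-` gives 2^64 - 7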
@@ -297,9 +306,8 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
       peer.updateScore(PeerScoreUseless)
       return
 
-  if man.direction == SyncQueueKind.Forward:
-    # Wall clock keeps ticking, so we need to update the queue
-    man.queue.updateLastSlot(man.getLastSlot())
+  # Wall clock keeps ticking, so we need to update the queue
+  man.queue.updateLastSlot(man.getLastSlot())
 
   man.workers[index].status = SyncWorkerStatus.Requesting
   let req = man.queue.pop(peerSlot, peer)
@@ -575,15 +583,27 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
         progress =
           case man.queue.kind
           of SyncQueueKind.Forward:
-            man.queue.outSlot - pivot
+            if man.queue.outSlot >= pivot:
+              man.queue.outSlot - pivot
+            else:
+              0'u64
           of SyncQueueKind.Backward:
-            pivot - man.queue.outSlot
+            if pivot >= man.queue.outSlot:
+              pivot - man.queue.outSlot
+            else:
+              0'u64
         total =
           case man.queue.kind
           of SyncQueueKind.Forward:
-            man.queue.finalSlot + 1'u64 - pivot
+            if man.queue.finalSlot >= pivot:
+              man.queue.finalSlot + 1'u64 - pivot
+            else:
+              0'u64
           of SyncQueueKind.Backward:
-            pivot + 1'u64 - man.queue.finalSlot
+            if pivot >= man.queue.finalSlot:
+              pivot + 1'u64 - man.queue.finalSlot
+            else:
+              0'u64
         remaining = total - progress
         done =
           if total > 0:


@@ -281,17 +281,7 @@ proc makePending*[T](sq: SyncQueue[T], req: var SyncRequest[T]) =
 
 proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
   ## Update last slot stored in queue ``sq`` with value ``last``.
-  case sq.kind
-  of SyncQueueKind.Forward:
-    doAssert(sq.finalSlot <= last,
-             "Last slot could not be lower then stored one " &
-             $sq.finalSlot & " <= " & $last)
-    sq.finalSlot = last
-  of SyncQueueKind.Backward:
-    doAssert(sq.finalSlot >= last,
-             "Last slot could not be higher then stored one " &
-             $sq.finalSlot & " >= " & $last)
-    sq.finalSlot = last
+  sq.finalSlot = last
 
 proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) =
   ## Wakeup one or all blocked waiters.
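The removed asserts assumed the target slot only ever moves in the queue's own sync direction. With this commit, a backward (backfill) queue's final slot is the horizon, which advances together with the wall clock, so "wrong-direction" updates are now expected. A toy timeline with hypothetical values, showing what the old assert would have rejected:

var finalSlot = 1000'u64  # backfill target when the queue was created
let newHorizon = 1032'u64 # one epoch later, the horizon moved forward
# Old backward-queue code: doAssert(finalSlot >= newHorizon) - fails here.
finalSlot = newHorizon    # the new updateLastSlot simply takes the update
doAssert finalSlot == 1032'u64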
@@ -793,7 +783,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
       if safeSlot > failSlot:
         let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
         # It's quite common peers give us fewer blocks than we ask for
-        info "Gap in block range response, rewinding", request = req,
+        debug "Gap in block range response, rewinding", request = req,
           rewind_to_slot = rewindSlot, rewind_fail_slot = failSlot,
           finalized_slot = safeSlot, blocks_count = len(item.data),
           blocks_map = getShortMap(req, item.data)
@@ -974,22 +964,31 @@ proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
   ## Returns number of slots left in queue ``sq``.
   case sq.kind
   of SyncQueueKind.Forward:
-    sq.finalSlot + 1'u64 - sq.outSlot
+    if sq.finalSlot >= sq.outSlot:
+      sq.finalSlot + 1'u64 - sq.outSlot
+    else:
+      0'u64
   of SyncQueueKind.Backward:
-    sq.outSlot + 1'u64 - sq.finalSlot
+    if sq.outSlot >= sq.finalSlot:
+      sq.outSlot + 1'u64 - sq.finalSlot
+    else:
+      0'u64
 
 proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
   ## Returns total number of slots in queue ``sq``.
   case sq.kind
   of SyncQueueKind.Forward:
-    sq.finalSlot + 1'u64 - sq.startSlot
+    if sq.finalSlot >= sq.startSlot:
+      sq.finalSlot + 1'u64 - sq.startSlot
+    else:
+      0'u64
   of SyncQueueKind.Backward:
-    sq.startSlot + 1'u64 - sq.finalSlot
+    if sq.startSlot >= sq.finalSlot:
+      sq.startSlot + 1'u64 - sq.finalSlot
+    else:
+      0'u64
 
 proc progress*[T](sq: SyncQueue[T]): uint64 =
-  ## How many slots we've synced so far
-  case sq.kind
-  of SyncQueueKind.Forward:
-    sq.outSlot - sq.startSlot
-  of SyncQueueKind.Backward:
-    sq.startSlot - sq.outSlot
+  ## How many useful slots we've synced so far, adjusting for how much has
+  ## become obsolete by time movements
+  sq.total - sq.len
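To see why `sq.total - sq.len` is a safe replacement for the direction-dependent subtraction, here is a toy model of a forward queue - MiniQueue is a hypothetical stand-in for the generic SyncQueue[T]:

type MiniQueue = object
  startSlot, outSlot, finalSlot: uint64

func len(q: MiniQueue): uint64 =
  ## Slots left to sync, clamped at zero.
  if q.finalSlot >= q.outSlot: q.finalSlot + 1 - q.outSlot
  else: 0'u64

func total(q: MiniQueue): uint64 =
  ## Total slots covered by the queue, clamped at zero.
  if q.finalSlot >= q.startSlot: q.finalSlot + 1 - q.startSlot
  else: 0'u64

func progress(q: MiniQueue): uint64 =
  ## Slots synced so far; len <= total whenever outSlot >= startSlot,
  ## so this cannot underflow even while finalSlot moves around.
  q.total - q.len

# Synced slots 100..149 of a 100..200 queue: 50 slots of progress.
doAssert MiniQueue(startSlot: 100, outSlot: 150, finalSlot: 200).progress == 50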


@@ -209,18 +209,18 @@ proc doTrustedNodeSync*(
     validatorMonitor = newClone(ValidatorMonitor.init(false, false))
     dag = ChainDAGRef.init(cfg, db, validatorMonitor, {}, eraPath = eraDir)
     backfillSlot = dag.backfill.slot
-    frontfill = dag.frontfill.valueOr(BlockId())
+    horizon = max(dag.horizon, dag.frontfill.valueOr(BlockId()).slot)
 
-  let canReindex = if backfillSlot <= frontfill.slot:
-    info "Database backfilled"
+  let canReindex = if backfillSlot <= horizon:
+    info "Database backfilled", backfill = dag.backfill, horizon
     true
   elif backfill:
     # +1 because we need to download the frontfill slot for the frontfill match
     # detection to kick in, in addBackfillBlock
-    let missingSlots = dag.backfill.slot - frontfill.slot + 1
+    let missingSlots = dag.backfill.slot - horizon + 1
 
     notice "Downloading historical blocks - you can interrupt this process at any time and it automatically be completed when you start the beacon node",
-      backfillSlot, frontfill, missingSlots
+      backfillSlot, horizon, missingSlots
 
     var # Same averaging as SyncManager
       syncCount = 0
@@ -260,7 +260,7 @@ proc doTrustedNodeSync*(
         syncCount += 1
 
         let
-          remaining = dag.backfill.slot - frontfill.slot
+          remaining = dag.backfill.slot - horizon
           slotsPerSec = speed(stamp, newStamp)
         avgSyncSpeed = avgSyncSpeed + (slotsPerSec - avgSyncSpeed) / float(syncCount)
@@ -314,9 +314,9 @@ proc doTrustedNodeSync*(
     notice "Backfilling incomplete - blocks will be downloaded when starting the node", msg = exc.msg
     false
   else:
-    let missingSlots = dag.backfill.slot - frontfill.slot
+    let missingSlots = dag.backfill.slot - horizon
     notice "Database initialized, historical blocks will be backfilled when starting the node",
-      missingSlots
+      missingSlots, backfill = dag.backfill, horizon
     false