Handle access to historical data for which there is no state (#3217)

With checkpoint sync in particular, and state pruning in the future,
loading states or state-dependent data may fail. This PR adjusts the
code to allow this to be handled gracefully.

In particular, the new availability assumption is that states are always
available for the finalized checkpoint and newer, but may fail for
anything older.

The `tail` remains the point where state loading de-facto fails, meaning
that between the tail and the finalized checkpoint, we can still get
historical data (but code should be prepared to handle this as an
error).

However, to harden the code against long replays, several operations
which are assumed to work only with non-final data (such as gossip
verification and validator duties) now limit their search horizon to
post-finalized data.

* harden several state-dependent operations by logging an error instead
of introducing a panic when state loading fails
* `withState` -> `withUpdatedState` to differentiate from the other
`withState`
* `updateStateData` can now fail if no state is found in database - it
is also hardened against excessively long replays
* `getEpochRef` can now fail when replay fails
* reject blocks with invalid target root - they would be ignored
previously
* fix recursion bug in `isProposed`
This commit is contained in:
Jacek Sieka 2022-01-05 19:38:04 +01:00 committed by GitHub
parent 7a0119ac32
commit 0a4728a241
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 474 additions and 239 deletions

View File

@ -134,7 +134,9 @@ proc init*(T: type AttestationPool, dag: ChainDAGRef,
epochRef.current_justified_checkpoint,
epochRef.finalized_checkpoint)
else:
epochRef = dag.getEpochRef(blckRef, blckRef.slot.epoch)
epochRef = dag.getEpochRef(blckRef, blckRef.slot.epoch, false).expect(
"Getting an EpochRef should always work for non-finalized blocks")
withBlck(dag.get(blckRef).data):
forkChoice.process_block(
dag, epochRef, blckRef, blck.message, blckRef.slot.toBeaconTime)

View File

@ -144,10 +144,14 @@ proc advanceClearanceState*(dag: ChainDAGRef) =
let startTick = Moment.now()
var cache = StateCache()
updateStateData(dag, dag.clearanceState, next, true, cache)
debug "Prepared clearance state for next block",
next, updateStateDur = Moment.now() - startTick
if not updateStateData(dag, dag.clearanceState, next, true, cache):
# The next head update will likely fail - something is very wrong here
error "Cannot advance to next slot, database corrupt?",
clearance = shortLog(dag.clearanceState.blck),
next = shortLog(next)
else:
debug "Prepared clearance state for next block",
next, updateStateDur = Moment.now() - startTick
proc addHeadBlock*(
dag: ChainDAGRef, verifier: var BatchVerifier,
@ -229,8 +233,17 @@ proc addHeadBlock*(
# by the time a new block reaches this point, the parent block will already
# have "established" itself in the network to some degree at least.
var cache = StateCache()
updateStateData(
dag, dag.clearanceState, parent.atSlot(signedBlock.message.slot), true, cache)
if not updateStateData(
dag, dag.clearanceState, parent.atSlot(signedBlock.message.slot), true,
cache):
# We should never end up here - the parent must be a block no older than and
# rooted in the finalized checkpoint, hence we should always be able to
# load its corresponding state
error "Unable to load clearance state for parent block, database corrupt?",
parent = shortLog(parent.atSlot(signedBlock.message.slot)),
clearance = shortLog(dag.clearanceState.blck)
return err(BlockError.MissingParent)
let stateDataTick = Moment.now()
# First, batch-verify all signatures in block

View File

@ -46,13 +46,19 @@ declareGauge beacon_processed_deposits_total, "Number of total deposits included
logScope: topics = "chaindag"
const
# When finality happens, we prune historical states from the database except
# for a snapshort every 32 epochs from which replays can happen - there's a
# balance here between making long replays and saving on disk space
EPOCHS_PER_STATE_SNAPSHOT = 32
proc putBlock*(
dag: ChainDAGRef, signedBlock: ForkyTrustedSignedBeaconBlock) =
dag.db.putBlock(signedBlock)
proc updateStateData*(
dag: ChainDAGRef, state: var StateData, bs: BlockSlot, save: bool,
cache: var StateCache) {.gcsafe.}
cache: var StateCache): bool {.gcsafe.}
template withStateVars*(
stateDataInternal: var StateData, body: untyped): untyped =
@ -66,20 +72,19 @@ template withStateVars*(
body
template withState*(
template withUpdatedState*(
dag: ChainDAGRef, stateData: var StateData, blockSlot: BlockSlot,
body: untyped): untyped =
okBody: untyped, failureBody: untyped): untyped =
## Helper template that updates stateData to a particular BlockSlot - usage of
## stateData is unsafe outside of block.
## TODO async transformations will lead to a race where stateData gets updated
## while waiting for future to complete - catch this here somehow?
## stateData is unsafe outside of block, or across `await` boundaries
block:
var cache {.inject.} = StateCache()
updateStateData(dag, stateData, blockSlot, false, cache)
withStateVars(stateData):
body
if updateStateData(dag, stateData, blockSlot, false, cache):
withStateVars(stateData):
okBody
else:
failureBody
func get_effective_balances(validators: openArray[Validator], epoch: Epoch):
seq[Gwei] =
@ -142,6 +147,8 @@ func init*(
)
epochStart = epoch.compute_start_slot_at_epoch()
doAssert epochRef.key.blck != nil, "epochAncestor should not fail for state block"
for i in 0'u64..<SLOTS_PER_EPOCH:
epochRef.beacon_proposers[i] = get_beacon_proposer_index(
state.data, cache, epochStart + i)
@ -236,22 +243,26 @@ func epochAncestor*(blck: BlockRef, epoch: Epoch): EpochKey =
while blck.slot.epoch >= epoch and not blck.parent.isNil:
blck = blck.parent
EpochKey(epoch: epoch, blck: blck)
if blck.slot.epoch > epoch:
EpochKey() # The searched-for epoch predates our tail block
else:
EpochKey(epoch: epoch, blck: blck)
func findEpochRef*(
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): EpochRef = # may return nil!
if epoch < dag.tail.slot.epoch:
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): Opt[EpochRef] =
# Look for an existing EpochRef in the cache
let ancestor = epochAncestor(blck, epoch)
if isNil(ancestor.blck):
# We can't compute EpochRef instances for states before the tail because
# we do not have them!
return
let ancestor = epochAncestor(blck, epoch)
doAssert ancestor.blck != nil
return err()
for i in 0..<dag.epochRefs.len:
if dag.epochRefs[i] != nil and dag.epochRefs[i].key == ancestor:
return dag.epochRefs[i]
return ok(dag.epochRefs[i])
return nil
err()
func loadStateCache(
dag: ChainDAGRef, cache: var StateCache, blck: BlockRef, epoch: Epoch) =
@ -260,15 +271,17 @@ func loadStateCache(
# functions
template load(e: Epoch) =
if e notin cache.shuffled_active_validator_indices:
let epochRef = dag.findEpochRef(blck, e)
if epochRef != nil:
cache.shuffled_active_validator_indices[epochRef.epoch] =
epochRef.shuffled_active_validator_indices
block:
let epoch = e
if epoch notin cache.shuffled_active_validator_indices:
let epochRef = dag.findEpochRef(blck, epoch)
if epochRef.isSome():
cache.shuffled_active_validator_indices[epoch] =
epochRef[].shuffled_active_validator_indices
for i, idx in epochRef.beacon_proposers:
cache.beacon_proposer_indices[
epochRef.epoch.compute_start_slot_at_epoch + i] = idx
for i, idx in epochRef[].beacon_proposers:
cache.beacon_proposer_indices[
epoch.compute_start_slot_at_epoch + i] = idx
load(epoch)
@ -444,7 +457,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
cur = cur.parentOrSlot()
if tmpState.blck == nil:
warn "No state found in head history, database corrupt?",
fatal "No state found in head history, database corrupt?",
genesisRef, tailRef, headRef, tailRoot, headRoot,
blocks = blocks.len()
# TODO Potentially we could recover from here instead of crashing - what
@ -520,7 +533,11 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
doAssert dag.updateFlags in [{}, {verifyFinalization}]
var cache: StateCache
dag.updateStateData(dag.headState, headRef.atSlot(), false, cache)
if not dag.updateStateData(dag.headState, headRef.atSlot(), false, cache):
fatal "Unable to load head state, database corrupt?",
head = shortLog(headRef)
quit 1
# The tail block is "implicitly" finalized as it was given either as a
# checkpoint block, or is the genesis, thus we use it as a lower bound when
@ -568,13 +585,15 @@ template genesisValidatorsRoot*(dag: ChainDAGRef): Eth2Digest =
func getEpochRef*(
dag: ChainDAGRef, state: StateData, cache: var StateCache): EpochRef =
## Get a cached `EpochRef` or construct one based on the given state - always
## returns an EpochRef instance
let
blck = state.blck
epoch = state.data.get_current_epoch()
var epochRef = dag.findEpochRef(blck, epoch)
if epochRef == nil:
epochRef = EpochRef.init(dag, state, cache)
if epochRef.isErr:
let res = EpochRef.init(dag, state, cache)
if epoch >= dag.finalizedHead.slot.epoch():
# Only cache epoch information for unfinalized blocks - earlier states
@ -594,13 +613,34 @@ func getEpochRef*(
if candidate.key.epoch < dag.epochRefs[oldest].epoch:
oldest = x
dag.epochRefs[oldest] = epochRef
dag.epochRefs[oldest] = res
res
else:
epochRef.get()
epochRef
proc getEpochRef*(
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
preFinalized: bool): Opt[EpochRef] =
## Return a cached EpochRef or construct one from the database, if possible -
## returns `none` on failure.
##
## When `preFinalized` is true, include epochs from before the finalized
## checkpoint in the search - this potentially can result in long processing
## times due to state replays.
##
## Requests for epochs >= dag.finalizedHead.slot.epoch always return an
## instance. One must be careful to avoid race conditions in `async` code
## where the finalized head might change during an `await`.
##
## Requests for epochs < dag.finalizedHead.slot.epoch may fail, either because
## the search was limited by the `preFinalized` flag or because state history
## has been pruned - none will be returned in this case.
if not preFinalized and epoch < dag.finalizedHead.slot.epoch:
return err()
proc getEpochRef*(dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): EpochRef =
let epochRef = dag.findEpochRef(blck, epoch)
if epochRef != nil:
if epochRef.isOk():
beacon_state_data_cache_hits.inc
return epochRef
@ -608,14 +648,19 @@ proc getEpochRef*(dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): EpochRef =
let
ancestor = epochAncestor(blck, epoch)
if isNil(ancestor.blck): # past the tail
return err()
dag.epochRefState.blck = BlockRef()
dag.withState(
dag.epochRefState, ancestor.blck.atEpochStart(ancestor.epoch)):
dag.getEpochRef(stateData, cache)
dag.withUpdatedState(
dag.epochRefState, ancestor.blck.atEpochStart(ancestor.epoch)) do:
ok(dag.getEpochRef(stateData, cache))
do:
err()
proc getFinalizedEpochRef*(dag: ChainDAGRef): EpochRef =
dag.getEpochRef(dag.finalizedHead.blck, dag.finalizedHead.slot.epoch)
dag.getEpochRef(
dag.finalizedHead.blck, dag.finalizedHead.slot.epoch, false).expect(
"getEpochRef for finalized head should always succeed")
func stateCheckpoint*(bs: BlockSlot): BlockSlot =
## The first ancestor BlockSlot that is a state checkpoint
@ -842,25 +887,30 @@ proc applyBlock(
proc updateStateData*(
dag: ChainDAGRef, state: var StateData, bs: BlockSlot, save: bool,
cache: var StateCache) =
cache: var StateCache): bool =
## Rewind or advance state such that it matches the given block and slot -
## this may include replaying from an earlier snapshot if blck is on a
## different branch or has advanced to a higher slot number than slot
## If slot is higher than blck.slot, replay will fill in with empty/non-block
## slots, else it is ignored
## If `bs.slot` is higher than `bs.blck.slot`, `updateStateData` will fill in
## with empty/non-block slots
# First, see if we're already at the requested block. If we are, also check
# that the state has not been advanced past the desired block - if it has,
# an earlier state must be loaded since there's no way to undo the slot
# transitions
if isNil(bs.blck):
info "Requesting state for unknown block, historical data not available?",
head = shortLog(dag.head), tail = shortLog(dag.tail)
return false
let
startTick = Moment.now()
current {.used.} = state.blck.atSlot(getStateField(state.data, slot))
var
ancestors: seq[BlockRef]
cur = bs
found = false
template exactMatch(state: StateData, bs: BlockSlot): bool =
@ -876,61 +926,60 @@ proc updateStateData*(
# Fast path: check all caches for an exact match - this is faster than
# advancing a state where there's epoch processing to do, by a wide margin -
# it also avoids `hash_tree_root` for slot processing
if exactMatch(state, cur):
if exactMatch(state, bs):
found = true
elif not save:
# When required to save states, we cannot rely on the caches because that
# would skip the extra processing that save does - not all information that
# goes into the database is cached
if exactMatch(dag.headState, cur):
if exactMatch(dag.headState, bs):
assign(state, dag.headState)
found = true
elif exactMatch(dag.clearanceState, cur):
elif exactMatch(dag.clearanceState, bs):
assign(state, dag.clearanceState)
found = true
elif exactMatch(dag.epochRefState, cur):
elif exactMatch(dag.epochRefState, bs):
assign(state, dag.epochRefState)
found = true
# First, run a quick check if we can simply apply a few blocks to an in-memory
# state - any in-memory state will be faster than loading from database.
# The limit here how many blocks we apply is somewhat arbitrary but two full
# epochs (might be more slots if there are skips) seems like a good enough
# first guess.
# This happens in particular during startup where we replay blocks
# sequentially to grab their votes.
const RewindBlockThreshold = 64
while not found and ancestors.len < RewindBlockThreshold:
if canAdvance(state, cur): # Typical case / fast path when there's no reorg
found = true
break
if not save: # See above
if canAdvance(dag.headState, cur):
assign(state, dag.headState)
if not found:
# No exact match found - see if any in-memory state can be used as a base
# onto which we can apply a few blocks - there's a tradeoff here between
# loading the state from disk and performing the block applications
var cur = bs
while ancestors.len < RewindBlockThreshold:
if isNil(cur.blck): # tail reached
break
if canAdvance(state, cur): # Typical case / fast path when there's no reorg
found = true
break
if canAdvance(dag.clearanceState, cur):
assign(state, dag.clearanceState)
found = true
break
if not save: # see above
if canAdvance(dag.headState, cur):
assign(state, dag.headState)
found = true
break
if canAdvance(dag.epochRefState, cur):
assign(state, dag.epochRefState)
found = true
break
if canAdvance(dag.clearanceState, cur):
assign(state, dag.clearanceState)
found = true
break
if cur.slot == cur.blck.slot:
# This is not an empty slot, so the block will need to be applied to
# eventually reach bs
ancestors.add(cur.blck)
if canAdvance(dag.epochRefState, cur):
assign(state, dag.epochRefState)
found = true
break
if cur.blck.parent == nil:
break
if cur.isProposed():
# This is not an empty slot, so the block will need to be applied to
# eventually reach bs
ancestors.add(cur.blck)
# Moving slot by slot helps find states that were advanced with empty slots
cur = cur.parentOrSlot()
# Move slot by slot to capture epoch boundary states
cur = cur.parentOrSlot()
if not found:
debug "UpdateStateData cache miss",
@ -940,29 +989,34 @@ proc updateStateData*(
# We'll now resort to loading the state from the database then reapplying
# blocks until we reach the desired point in time.
cur = bs
var cur = bs
ancestors.setLen(0)
# Look for a state in the database and load it - as long as it cannot be
# found, keep track of the blocks that are needed to reach it from the
# state that eventually will be found. Besides finding the state in the
# database we may also reach the input state provided to the function.
# It can also act as a viable starting point for the block replay later.
# state that eventually will be found.
# If we hit the tail, it means that we've reached a point for which we can
# no longer recreate history - this happens for example when starting from
# a checkpoint block
let startEpoch = bs.slot.epoch
while not canAdvance(state, cur) and not dag.getState(state, cur):
# There's no state saved for this particular BlockSlot combination, keep
# looking...
if cur.slot == cur.blck.slot:
# There's no state saved for this particular BlockSlot combination, and
# the state we have can't trivially be advanced (in case it was older than
# RewindBlockThreshold), keep looking..
if cur.isProposed():
# This is not an empty slot, so the block will need to be applied to
# eventually reach bs
ancestors.add(cur.blck)
if cur.slot == dag.tail.slot:
# If we've walked all the way to the tail and still not found a state,
# there's no hope finding one - the database likely has become corrupt
# and one will have to resync from start.
fatal "Cannot find state to load, the database is likely corrupt",
cur, bs, head = dag.head, tail = dag.tail
quit 1
if cur.slot == dag.tail.slot or
(cur.slot.epoch + EPOCHS_PER_STATE_SNAPSHOT * 2 < startEpoch):
# We've either walked two full state snapshot lengths or hit the tail
# and still can't find a matching state: this can happen when
# starting the node from an arbitrary finalized checkpoint and not
# backfilling the states
notice "Request for pruned historical state",
request = shortLog(bs), tail = shortLog(dag.tail), cur = shortLog(cur)
return false
# Move slot by slot to capture epoch boundary states
cur = cur.parentOrSlot()
@ -1038,6 +1092,8 @@ proc updateStateData*(
assignDur,
replayDur
true
proc delState(dag: ChainDAGRef, bs: BlockSlot) =
# Delete state state and mapping for a particular block+slot
if not isStateCheckpoint(bs):
@ -1074,7 +1130,7 @@ proc pruneBlocksDAG(dag: ChainDAGRef) =
while not cur.blck.isAncestorOf(dag.finalizedHead.blck):
dag.delState(cur) # TODO: should we move that disk I/O to `onSlotEnd`
if cur.blck.slot == cur.slot:
if cur.isProposed():
dag.blocks.excl(KeyedBlockRef.init(cur.blck))
dag.db.delBlock(cur.blck.root)
@ -1196,12 +1252,8 @@ proc pruneStateCachesDAG*(dag: ChainDAGRef) =
cur = dag.finalizedHead.stateCheckpoint.parentOrSlot
prev = dag.lastPrunePoint.stateCheckpoint.parentOrSlot
while cur.blck != nil and cur != prev:
# TODO This is a quick fix to prune some states from the database, but
# not all, pending a smarter storage - the downside of pruning these
# states is that certain rewinds will take longer
# After long periods of non-finalization, it can also take some time to
# release all these states!
if cur.slot.epoch mod 32 != 0 and cur.slot != dag.tail.slot:
if cur.slot.epoch mod EPOCHS_PER_STATE_SNAPSHOT != 0 and
cur.slot != dag.tail.slot:
dag.delState(cur)
cur = cur.parentOrSlot
let statePruneTick = Moment.now()
@ -1248,8 +1300,15 @@ proc updateHead*(
# Start off by making sure we have the right state - updateStateData will try
# to use existing in-memory states to make this smooth
var cache: StateCache
updateStateData(
dag, dag.headState, newHead.atSlot(), false, cache)
if not updateStateData(
dag, dag.headState, newHead.atSlot(), false, cache):
# Advancing the head state should never fail, given that the tail is
# implicitly finalised, the head is an ancestor of the tail and we always
# store the tail state in the database, as well as every epoch slot state in
# between
fatal "Unable to load head state during head update, database corrupt?",
lastHead = shortLog(lastHead)
quit 1
dag.db.putHeadBlock(newHead.root)
@ -1360,8 +1419,10 @@ proc updateHead*(
dag.headState.data, previous_justified_checkpoint).root.toGaugeValue)
let
epochRef = getEpochRef(dag, newHead, newHead.slot.epoch)
number_of_active_validators = epochRef.shuffled_active_validator_indices.lenu64().toGaugeValue
epochRef = getEpochRef(dag, dag.headState, cache)
number_of_active_validators =
epochRef.shuffled_active_validator_indices.lenu64().toGaugeValue
beacon_active_validators.set(number_of_active_validators)
beacon_current_active_validators.set(number_of_active_validators)
@ -1526,7 +1587,11 @@ proc preInit*(
proc getProposer*(
dag: ChainDAGRef, head: BlockRef, slot: Slot): Option[ValidatorIndex] =
let
epochRef = dag.getEpochRef(head, slot.compute_epoch_at_slot())
epochRef = block:
let tmp = dag.getEpochRef(head, slot.compute_epoch_at_slot(), false)
if tmp.isErr():
return none(ValidatorIndex)
tmp.get()
slotInEpoch = slot - slot.compute_epoch_at_slot().compute_start_slot_at_epoch()
let proposer = epochRef.beacon_proposers[slotInEpoch]

View File

@ -120,13 +120,21 @@ proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: BeaconTime):
return err ForkChoiceError(
kind: fcJustifiedNodeUnknown,
blockRoot: self.best_justified.root)
let ancestor = blck.atEpochStart(self.finalized.epoch)
if ancestor.blck.root == self.finalized.root:
let epochRef = dag.getEpochRef(blck, self.best_justified.epoch)
self.justified = BalanceCheckpoint(
checkpoint: Checkpoint(root: blck.root, epoch: epochRef.epoch),
balances: epochRef.effective_balances)
let epochRef = dag.getEpochRef(blck, self.best_justified.epoch, false)
if epochRef.isSome():
self.justified = BalanceCheckpoint(
checkpoint: Checkpoint(root: blck.root, epoch: epochRef[].epoch),
balances: epochRef[].effective_balances)
else:
# Shouldn't happen for justified data unless fork choice is out of sync
# with ChainDAG
warn "No `EpochRef` for justified epoch, skipping update - report bug",
justified = shortLog(self.justified.checkpoint),
best = shortLog(self.best_justified.epoch),
blck = shortLog(blck)
ok()
func process_attestation_queue(self: var ForkChoice) {.gcsafe.}
@ -266,15 +274,23 @@ proc process_state(self: var Checkpoints,
if ? should_update_justified_checkpoint(self, dag, epochRef):
let
justifiedBlck = blck.atEpochStart(state_justified_epoch)
justifiedEpochRef = dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)
self.justified =
BalanceCheckpoint(
checkpoint: Checkpoint(
root: justifiedBlck.blck.root,
epoch: justifiedEpochRef.epoch
),
balances: justifiedEpochRef.effective_balances)
justifiedEpochRef = dag.getEpochRef(
justifiedBlck.blck, state_justified_epoch, false)
if justifiedEpochRef.isOk():
self.justified =
BalanceCheckpoint(
checkpoint: Checkpoint(
root: justifiedBlck.blck.root,
epoch: state_justified_epoch
),
balances: justifiedEpochRef[].effective_balances)
else:
# Shouldn't happen, unless fork choice is out of sync with ChainDAG
warn "Skipping justified checkpoint update, no EpochRef - report bug",
epoch = epochRef.epoch,
justifiedBlck = shortLog(justifiedBlck),
state_justified = shortLog(epochRef.current_justified_checkpoint),
state_finalized = shortLog(epochRef.finalized_checkpoint)
if state_finalized_epoch > self.finalized.epoch:
self.finalized = epochRef.finalized_checkpoint
@ -288,15 +304,22 @@ proc process_state(self: var Checkpoints,
let
justifiedBlck = blck.atEpochStart(state_justified_epoch)
justifiedEpochRef = dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)
self.justified =
BalanceCheckpoint(
checkpoint: Checkpoint(
root: justifiedBlck.blck.root,
epoch: justifiedEpochRef.epoch
),
balances: justifiedEpochRef.effective_balances)
justifiedEpochRef = dag.getEpochRef(
justifiedBlck.blck, state_justified_epoch, false)
if justifiedEpochRef.isOk():
self.justified =
BalanceCheckpoint(
checkpoint: Checkpoint(
root: justifiedBlck.blck.root,
epoch: justifiedEpochRef[].epoch
),
balances: justifiedEpochRef[].effective_balances)
else:
warn "Skipping justified checkpoint update, no EpochRef - report bug",
epoch = epochRef.epoch,
justifiedBlck = shortLog(justifiedBlck),
state_justified = shortLog(epochRef.current_justified_checkpoint),
state_finalized = shortLog(epochRef.finalized_checkpoint)
ok()
func process_block*(self: var ForkChoiceBackend,

View File

@ -46,6 +46,7 @@ type
fcInconsistentTick
fcUnknownParent
fcPruningFromOutdatedFinalizedRoot
fcInvalidEpochRef
Index* = int
Delta* = int64
@ -56,7 +57,7 @@ type
of fcFinalizedNodeUnknown,
fcJustifiedNodeUnknown:
blockRoot*: Eth2Digest
of fcInconsistentTick:
of fcInconsistentTick, fcInvalidEpochRef:
discard
of fcInvalidNodeIndex,
fcInvalidJustifiedIndex,

View File

@ -258,10 +258,16 @@ proc checkForPotentialDoppelganger(
if attestation.data.slot.epoch <
self.doppelgangerDetection.broadcastStartEpoch:
let tgtBlck = self.dag.getRef(attestation.data.target.root)
doAssert not tgtBlck.isNil # because attestation is valid above
# We expect the EpochRef not to have been pruned between validating the
# attestation and checking for doppelgangers - this may need to be revisited
# when online pruning of states is implemented
let epochRef = self.dag.getEpochRef(
tgtBlck, attestation.data.target.epoch)
tgtBlck, attestation.data.target.epoch, true).expect(
"Target block EpochRef must be valid")
for validatorIndex in attesterIndices:
let validatorPubkey = epochRef.validatorKey(validatorIndex).get().toPubKey()
if self.doppelgangerDetectionEnabled and

View File

@ -103,7 +103,7 @@ func check_propagation_slot_range(
func check_beacon_and_target_block(
pool: var AttestationPool, data: AttestationData):
Result[BlockRef, ValidationError] =
Result[BlockSlot, ValidationError] =
# The block being voted for (data.beacon_block_root) passes validation - by
# extension, the target block must at that point also pass validation.
# The target block is returned.
@ -122,13 +122,18 @@ func check_beacon_and_target_block(
# attestation.data.beacon_block_root,
# compute_start_slot_at_epoch(attestation.data.target.epoch)) ==
# attestation.data.target.root
# the sanity of target.epoch has been checked by check_attestation_slot_target
let
target = get_ancestor(
blck, compute_start_slot_at_epoch(data.target.epoch), SLOTS_PER_EPOCH.int)
target = blck.atSlot(compute_start_slot_at_epoch(data.target.epoch))
if not (target.root == data.target.root):
return errIgnore(
"Attestation's target block not an ancestor of LMD vote block")
if isNil(target.blck):
# Shouldn't happen - we've checked that the target epoch is within range
# already
return errReject("Attestation target block not found")
if not (target.blck.root == data.target.root):
return errReject(
"Attestation target block not the correct ancestor of LMD vote block")
ok(target)
@ -190,8 +195,10 @@ template validateBeaconBlockBellatrix(
if signed_beacon_block.message.body.execution_payload !=
default(ExecutionPayload):
true
elif dag.getEpochRef(parent_ref, parent_ref.slot.epoch).merge_transition_complete:
# Should usually be inexpensive, but could require cache refilling
elif dag.getEpochRef(parent_ref, parent_ref.slot.epoch, true).expect(
"parent EpochRef doesn't fail").merge_transition_complete:
# Should usually be inexpensive, but could require cache refilling - the
# parent block can be no older than the latest finalized block
true
else:
# Somewhat more expensive fallback, with database I/O, but should be
@ -284,7 +291,8 @@ proc validateBeaconBlock*(
let
slotBlock = getBlockBySlot(dag, signed_beacon_block.message.slot)
if slotBlock.slot == signed_beacon_block.message.slot:
if slotBlock.isProposed() and
slotBlock.blck.slot == signed_beacon_block.message.slot:
let blck = dag.get(slotBlock.blck).data
if getForkedBlockField(blck, proposer_index) ==
signed_beacon_block.message.proposer_index and
@ -416,7 +424,13 @@ proc validateAttestation*(
# compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)) ==
# store.finalized_checkpoint.root
let
epochRef = pool.dag.getEpochRef(target, attestation.data.target.epoch)
epochRef = block:
let tmp = pool.dag.getEpochRef(target.blck, target.slot.epoch, false)
if isErr(tmp): # shouldn't happen since we verified target
warn "No EpochRef for attestation",
attestation = shortLog(attestation), target = shortLog(target)
return errIgnore("Attestation: no EpochRef")
tmp.get()
# [REJECT] The committee index is within the expected range -- i.e.
# data.index < get_committee_count_per_slot(state, data.target.epoch).
@ -587,7 +601,13 @@ proc validateAggregate*(
# aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
# aggregate.data.index, aggregate_and_proof.selection_proof) returns True.
let
epochRef = pool.dag.getEpochRef(target, aggregate.data.target.epoch)
epochRef = block:
let tmp = pool.dag.getEpochRef(target.blck, target.slot.epoch, false)
if tmp.isErr: # shouldn't happen since we verified target
warn "No EpochRef for attestation - report bug",
aggregate = shortLog(aggregate), target = shortLog(target)
return errIgnore("Aggregate: no EpochRef")
tmp.get()
# [REJECT] The committee index is within the expected range -- i.e.
# data.index < get_committee_count_per_slot(state, data.target.epoch).

View File

@ -774,9 +774,11 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
# We "know" the actions for the current and the next epoch
if node.isSynced(head):
node.actionTracker.updateActions(
node.dag.getEpochRef(head, slot.epoch))
node.dag.getEpochRef(head, slot.epoch, false).expect(
"Getting head EpochRef should never fail"))
node.actionTracker.updateActions(
node.dag.getEpochRef(head, slot.epoch + 1))
node.dag.getEpochRef(head, slot.epoch + 1, false).expect(
"Getting head EpochRef should never fail"))
if node.gossipState.card > 0 and targetGossipState.card == 0:
debug "Disabling topic subscriptions",
@ -855,8 +857,9 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# pessimistically with respect to the shuffling - this needs fixing
# at EpochRef level by not mixing balances and shufflings in the same
# place
let epochRef = node.dag.getEpochRef(node.dag.head, slot.epoch + 1)
node.actionTracker.updateActions(epochRef)
node.actionTracker.updateActions(node.dag.getEpochRef(
node.dag.head, slot.epoch + 1, false).expect(
"Getting head EpochRef should never fail"))
let
nextAttestationSlot = node.actionTracker.getNextAttestationSlot(slot)

View File

@ -185,3 +185,5 @@ const
"Bad request format"
InvalidAuthorization* =
"Invalid Authorization Header"
PrunedStateError* =
"Trying to access a pruned historical state"

View File

@ -209,9 +209,11 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) =
return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError)
res.get()
let proposalState = assignClone(node.dag.headState)
node.dag.withState(proposalState[], head.atSlot(wallSlot)):
node.dag.withUpdatedState(proposalState[], head.atSlot(wallSlot)) do:
return RestApiResponse.jsonResponse(
node.getBlockProposalEth1Data(stateData.data))
do:
return RestApiResponse.jsonError(Http400, PrunedStateError)
router.api(MethodGet, "/api/nimbus/v1/debug/chronos/futures") do (
) -> RestApiResponse:

View File

@ -50,7 +50,7 @@ func getCurrentSlot*(node: BeaconNode, slot: Slot):
func getCurrentBlock*(node: BeaconNode, slot: Slot):
Result[BlockRef, cstring] =
let bs = node.dag.getBlockBySlot(? node.getCurrentSlot(slot))
if bs.slot == bs.blck.slot:
if bs.isProposed():
ok(bs.blck)
else:
err("Block not found")
@ -73,13 +73,17 @@ proc getBlockSlot*(node: BeaconNode,
stateIdent: StateIdent): Result[BlockSlot, cstring] =
case stateIdent.kind
of StateQueryKind.Slot:
ok(node.dag.getBlockBySlot(? node.getCurrentSlot(stateIdent.slot)))
let bs = node.dag.getBlockBySlot(? node.getCurrentSlot(stateIdent.slot))
if not isNil(bs.blck):
ok(bs)
else:
err("State for given slot not found, history not available?")
of StateQueryKind.Root:
if stateIdent.root == getStateRoot(node.dag.headState.data):
ok(node.dag.headState.blck.atSlot())
else:
# We don't have a state root -> BlockSlot mapping
err("State not found")
err("State for given root not found")
of StateQueryKind.Named:
case stateIdent.value
of StateIdentType.Head:
@ -179,14 +183,13 @@ template withStateForBlockSlot*(nodeParam: BeaconNode,
else:
assignClone(node.dag.headState)
node.dag.updateStateData(stateToAdvance[], blockSlot, false, cache)
if node.dag.updateStateData(stateToAdvance[], blockSlot, false, cache):
if cachedState == nil and node.stateTtlCache != nil:
# This was not a cached state, we can cache it now
node.stateTtlCache.add(stateToAdvance)
if cachedState == nil and node.stateTtlCache != nil:
# This was not a cached state, we can cache it now
node.stateTtlCache.add(stateToAdvance)
withStateVars(stateToAdvance[]):
body
withStateVars(stateToAdvance[]):
body
proc toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex,
ValidatorIndexError] =

View File

@ -72,7 +72,12 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
let duties =
block:
var res: seq[RestAttesterDuty]
let epochRef = node.dag.getEpochRef(qhead, qepoch)
let epochRef = block:
let tmp = node.dag.getEpochRef(qhead, qepoch, true)
if isErr(tmp):
return RestApiResponse.jsonError(Http400, PrunedStateError)
tmp.get()
let committees_per_slot = get_committee_count_per_slot(epochRef)
for i in 0 ..< SLOTS_PER_EPOCH:
let slot = compute_start_slot_at_epoch(qepoch) + i
@ -125,7 +130,11 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
let duties =
block:
var res: seq[RestProposerDuty]
let epochRef = node.dag.getEpochRef(qhead, qepoch)
let epochRef = block:
let tmp = node.dag.getEpochRef(qhead, qepoch, true)
if isErr(tmp):
return RestApiResponse.jsonError(Http400, PrunedStateError)
tmp.get()
for i, bp in epochRef.beacon_proposers:
if i == 0 and qepoch == 0:
# Fix for https://github.com/status-im/nimbus-eth2/issues/2488
@ -259,7 +268,11 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
# in order to compute the sync committee for the epoch. See the following
# discussion for more details:
# https://github.com/status-im/nimbus-eth2/pull/3133#pullrequestreview-817184693
node.withStateForBlockSlot(node.dag.getBlockBySlot(earliestSlotInQSyncPeriod)):
let bs = node.dag.getBlockBySlot(earliestSlotInQSyncPeriod)
if bs.blck.isNil:
return RestApiResponse.jsonError(Http404, StateNotFoundError)
node.withStateForBlockSlot(bs):
let res = withState(stateData().data):
when stateFork >= BeaconStateFork.Altair:
produceResponse(indexList,
@ -413,7 +426,11 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
if res.isErr():
return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError)
res.get()
let epochRef = node.dag.getEpochRef(qhead, qslot.epoch)
let epochRef = block:
let tmp = node.dag.getEpochRef(qhead, qslot.epoch, true)
if isErr(tmp):
return RestApiResponse.jsonError(Http400, PrunedStateError)
tmp.get()
makeAttestationData(epochRef, qhead.atSlot(qslot), qindex)
return RestApiResponse.jsonResponse(adata)
@ -512,7 +529,12 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
template getAndCacheEpochRef(epochRefVar: var Option[EpochRef],
epoch: Epoch): EpochRef =
if epochRefVar.isNone:
epochRefVar = some node.dag.getEpochRef(head, epoch)
epochRefVar = block:
let epochRef = node.dag.getEpochRef(head, epoch, true)
if isErr(epochRef):
return RestApiResponse.jsonError(Http400, PrunedStateError)
some epochRef.get()
epochRefVar.get
for request in requests:

View File

@ -108,8 +108,10 @@ proc installNimbusApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
head = node.doChecksAndGetCurrentHead(wallSlot)
let proposalState = assignClone(node.dag.headState)
node.dag.withState(proposalState[], head.atSlot(wallSlot)):
node.dag.withUpdatedState(proposalState[], head.atSlot(wallSlot)):
return node.getBlockProposalEth1Data(stateData.data)
do:
raise (ref CatchableError)(msg: "Trying to access pruned state")
rpcServer.rpc("debug_getChronosFutures") do () -> seq[FutureInfo]:
when defined(chronosFutureTracking):

View File

@ -35,8 +35,10 @@ template withStateForStateId*(stateId: string, body: untyped): untyped =
body
else:
let rpcState = assignClone(node.dag.headState)
node.dag.withState(rpcState[], bs):
node.dag.withUpdatedState(rpcState[], bs) do:
body
do:
raise (ref CatchableError)(msg: "Trying to access pruned state")
proc parseRoot*(str: string): Eth2Digest {.raises: [Defect, ValueError].} =
Eth2Digest(data: hexToByteArray[32](str))

View File

@ -58,7 +58,12 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
debug "get_v1_validator_attestation_data", slot = slot
let
head = node.doChecksAndGetCurrentHead(slot)
epochRef = node.dag.getEpochRef(head, slot.epoch)
epochRef = block:
let tmp = node.dag.getEpochRef(head, slot.epoch, true)
if isErr(tmp):
raise (ref CatchableError)(msg: "Trying to access pruned state")
tmp.get()
return makeAttestationData(epochRef, head.atSlot(slot), committee_index)
rpcServer.rpc("get_v1_validator_aggregate_attestation") do (
@ -79,7 +84,13 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
debug "get_v1_validator_duties_attester", epoch = epoch
let
head = node.doChecksAndGetCurrentHead(epoch)
epochRef = node.dag.getEpochRef(head, epoch)
epochRef = block:
let tmp = node.dag.getEpochRef(head, epoch, true)
if isErr(tmp):
raise (ref CatchableError)(msg: "Trying to access pruned state")
tmp.get()
let
committees_per_slot = get_committee_count_per_slot(epochRef)
for i in 0 ..< SLOTS_PER_EPOCH:
let slot = compute_start_slot_at_epoch(epoch) + i
@ -102,7 +113,12 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
debug "get_v1_validator_duties_proposer", epoch = epoch
let
head = node.doChecksAndGetCurrentHead(epoch)
epochRef = node.dag.getEpochRef(head, epoch)
epochRef = block:
let tmp = node.dag.getEpochRef(head, epoch, true)
if isErr(tmp):
raise (ref CatchableError)(msg: "Trying to access pruned state")
tmp.get()
for i, bp in epochRef.beacon_proposers:
if bp.isSome():
result.add((public_key: epochRef.validatorKey(bp.get).get().toPubKey,
@ -141,7 +157,12 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
let
head = node.doChecksAndGetCurrentHead(epoch)
epochRef = node.dag.getEpochRef(head, epoch)
epochRef = block:
let tmp = node.dag.getEpochRef(head, epoch, true)
if isErr(tmp):
raise (ref CatchableError)(msg: "Trying to access pruned state")
tmp.get()
let
subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), slot, committee_index)

View File

@ -444,7 +444,7 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
let
proposalState = assignClone(node.dag.headState)
node.dag.withState(proposalState[], head.atSlot(slot - 1)):
node.dag.withUpdatedState(proposalState[], head.atSlot(slot - 1)) do:
# Advance to the given slot without calculating state root - we'll only
# need a state root _with_ the block applied
var info: ForkedEpochInfo
@ -479,6 +479,10 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
default(merge.ExecutionPayload),
noRollback, # Temporary state - no need for rollback
cache)
do:
warn "Cannot get proposal state - skipping block productioon, database corrupt?",
head = shortLog(head),
slot
proc proposeBlock(node: BeaconNode,
validator: AttachedValidator,
@ -597,7 +601,7 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
# finalized epoch.. also, it seems that posting very old attestations
# is risky from a slashing perspective. More work is needed here.
warn "Skipping attestation, head is too recent",
headSlot = shortLog(head.slot),
head = shortLog(head),
slot = shortLog(slot)
return
@ -607,21 +611,25 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
# attesting to a past state - we must then recreate the world as it looked
# like back then
notice "Attesting to a state in the past, falling behind?",
headSlot = shortLog(head.slot),
attestationHeadSlot = shortLog(attestationHead.slot),
attestationSlot = shortLog(slot)
attestationHead = shortLog(attestationHead),
head = shortLog(head)
trace "Checking attestations",
attestationHeadRoot = shortLog(attestationHead.blck.root),
attestationSlot = shortLog(slot)
attestationHead = shortLog(attestationHead),
head = shortLog(head)
# We need to run attestations exactly for the slot that we're attesting to.
# In case blocks went missing, this means advancing past the latest block
# using empty slots as fillers.
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/validator.md#validator-assignments
let
epochRef = node.dag.getEpochRef(
attestationHead.blck, slot.compute_epoch_at_slot())
epochRef = block:
let tmp = node.dag.getEpochRef(attestationHead.blck, slot.epoch, false)
if isErr(tmp):
warn "Cannot construct EpochRef for attestation head, report bug",
attestationHead = shortLog(attestationHead), slot
return
tmp.get()
committees_per_slot = get_committee_count_per_slot(epochRef)
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root =
@ -874,18 +882,20 @@ proc makeAggregateAndProof*(
selection_proof: slot_signature))
proc sendAggregatedAttestations(
node: BeaconNode, aggregationHead: BlockRef, aggregationSlot: Slot) {.async.} =
# The index is via a
# locally attested validator. Unlike in handleAttestations(...) there's a
# single one at most per slot (because that's how aggregation attestation
# works), so the machinery that has to handle looping across, basically a
# set of locally attached validators is in principle not necessary, but a
# way to organize this. Then the private key for that validator should be
# the corresponding one -- whatver they are, they match.
node: BeaconNode, head: BlockRef, slot: Slot) {.async.} =
# Aggregated attestations must be sent by members of the beacon committees for
# the given slot, for which `is_aggregator` returns `true.
let
epochRef = node.dag.getEpochRef(aggregationHead, aggregationSlot.epoch)
fork = node.dag.forkAtEpoch(aggregationSlot.epoch)
epochRef = block:
let tmp = node.dag.getEpochRef(head, slot.epoch, false)
if isErr(tmp): # Some unusual race condition perhaps?
warn "Cannot construct EpochRef for head, report bug",
head = shortLog(head), slot
return
tmp.get()
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root =
getStateField(node.dag.headState.data, genesis_validators_root)
committees_per_slot = get_committee_count_per_slot(epochRef)
@ -898,14 +908,14 @@ proc sendAggregatedAttestations(
for committee_index in 0'u64..<committees_per_slot:
let committee = get_beacon_committee(
epochRef, aggregationSlot, committee_index.CommitteeIndex)
epochRef, slot, committee_index.CommitteeIndex)
for index_in_committee, validatorIdx in committee:
let validator = node.getAttachedValidator(epochRef, validatorIdx)
if validator != nil:
# the validator index and private key pair.
slotSigs.add getSlotSig(validator, fork,
genesis_validators_root, aggregationSlot)
genesis_validators_root, slot)
slotSigsData.add (committee_index, validatorIdx, validator)
await allFutures(slotSigs)
@ -915,10 +925,10 @@ proc sendAggregatedAttestations(
if slotSig.isErr():
error "Unable to create slot signature using remote signer",
validator = shortLog(curr[0].v),
aggregation_slot = aggregationSlot, error_msg = slotSig.error()
slot, error_msg = slotSig.error()
continue
let aggregateAndProof =
makeAggregateAndProof(node.attestationPool[], epochRef, aggregationSlot,
makeAggregateAndProof(node.attestationPool[], epochRef, slot,
curr[0].committee_index.CommitteeIndex,
curr[0].validator_idx,
slotSig.get())
@ -1134,18 +1144,26 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
proc sendAttestation*(node: BeaconNode,
attestation: Attestation): Future[SendResult] {.async.} =
# REST/JSON-RPC API helper procedure.
let attestationBlock =
block:
let res = node.dag.getRef(attestation.data.beacon_block_root)
if isNil(res):
debug "Attempt to send attestation without corresponding block",
attestation = shortLog(attestation)
return SendResult.err(
"Attempt to send attestation without corresponding block")
res
let
epochRef = node.dag.getEpochRef(
attestationBlock, attestation.data.target.epoch)
blck =
block:
let res = node.dag.getRef(attestation.data.beacon_block_root)
if isNil(res):
notice "Attempt to send attestation for unknown block",
attestation = shortLog(attestation)
return SendResult.err(
"Attempt to send attestation for unknown block")
res
epochRef = block:
let tmp = node.dag.getEpochRef(
blck, attestation.data.target.epoch, false)
if tmp.isErr(): # Shouldn't happen
warn "Cannot construct EpochRef for attestation, skipping send - report bug",
blck = shortLog(blck),
attestation = shortLog(attestation)
return
tmp.get()
subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), attestation.data.slot,
attestation.data.index.CommitteeIndex)
@ -1293,7 +1311,14 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
# be getting the duties one slot at a time
for slot in wallSlot ..< wallSlot + SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS:
let
epochRef = node.dag.getEpochRef(head, slot.epoch)
epochRef = block:
let tmp = node.dag.getEpochRef(head, slot.epoch, false)
if tmp.isErr(): # Shouldn't happen
warn "Cannot construct EpochRef for duties - report bug",
head = shortLog(head), slot
return
tmp.get()
let
fork = node.dag.forkAtEpoch(slot.epoch)
committees_per_slot = get_committee_count_per_slot(epochRef)

View File

@ -238,7 +238,7 @@ proc cmdBench(conf: DbConf, cfg: RuntimeConfig) =
(ref merge.HashedBeaconState)())
withTimer(timers[tLoadState]):
dag.updateStateData(
doAssert dag.updateStateData(
stateData[], blockRefs[^1].atSlot(blockRefs[^1].slot - 1), false, cache)
template processBlocks(blocks: auto) =
@ -486,10 +486,11 @@ proc cmdRewindState(conf: DbConf, cfg: RuntimeConfig) =
return
let tmpState = assignClone(dag.headState)
dag.withState(tmpState[], blckRef.atSlot(Slot(conf.slot))):
dag.withUpdatedState(tmpState[], blckRef.atSlot(Slot(conf.slot))) do:
echo "Writing state..."
withState(stateData.data):
dump("./", state)
do: raiseAssert "withUpdatedState failed"
func atCanonicalSlot(blck: BlockRef, slot: Slot): BlockSlot =
if slot == 0:
@ -527,8 +528,9 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
var e2s = E2Store.open(".", name, firstSlot).get()
defer: e2s.close()
dag.withState(tmpState[], canonical):
dag.withUpdatedState(tmpState[], canonical) do:
e2s.appendRecord(stateData.data.phase0Data.data).get()
do: raiseAssert "withUpdatedState failed"
var
ancestors: seq[BlockRef]
@ -589,7 +591,7 @@ proc cmdValidatorPerf(conf: DbConf, cfg: RuntimeConfig) =
blockRefs[^1].slot.epoch, " - ", blockRefs[0].slot.epoch
let state = newClone(dag.headState)
dag.updateStateData(
doAssert dag.updateStateData(
state[], blockRefs[^1].atSlot(blockRefs[^1].slot - 1), false, cache)
proc processEpoch() =
@ -831,7 +833,7 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
start.epoch, " - ", ends.epoch
let state = newClone(dag.headState)
dag.updateStateData(
doAssert dag.updateStateData(
state[], blockRefs[^1].atSlot(if start > 0: start - 1 else: 0.Slot),
false, cache)

View File

@ -112,7 +112,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
let
attestationHead = dag.head.atSlot(slot)
dag.withState(tmpState[], attestationHead):
dag.withUpdatedState(tmpState[], attestationHead) do:
let committees_per_slot =
get_committee_count_per_slot(stateData.data, slot.epoch, cache)
@ -138,6 +138,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
aggregation_bits: aggregation_bits,
signature: sig.toValidatorSig()
), [validatorIdx], sig, data.slot.toBeaconTime)
do:
raiseAssert "withUpdatedState failed"
proc handleSyncCommitteeActions(slot: Slot) =
type
@ -301,7 +303,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
if rand(r, 1.0) > blockRatio:
return
dag.withState(tmpState[], dag.head.atSlot(slot)):
dag.withUpdatedState(tmpState[], dag.head.atSlot(slot)) do:
let
newBlock = getNewBlock[phase0.SignedBeaconBlock](stateData, slot, cache)
added = dag.addHeadBlock(verifier, newBlock) do (
@ -316,12 +318,14 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
attPool.prune()
do:
raiseAssert "withUpdatedState failed"
proc proposeAltairBlock(slot: Slot) =
if rand(r, 1.0) > blockRatio:
return
dag.withState(tmpState[], dag.head.atSlot(slot)):
dag.withUpdatedState(tmpState[], dag.head.atSlot(slot)) do:
let
newBlock = getNewBlock[altair.SignedBeaconBlock](stateData, slot, cache)
added = dag.addHeadBlock(verifier, newBlock) do (
@ -336,12 +340,14 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
attPool.prune()
do:
raiseAssert "withUpdatedState failed"
proc proposeBellatrixBlock(slot: Slot) =
if rand(r, 1.0) > blockRatio:
return
dag.withState(tmpState[], dag.head.atSlot(slot)):
dag.withUpdatedState(tmpState[], dag.head.atSlot(slot)) do:
let
newBlock = getNewBlock[merge.SignedBeaconBlock](stateData, slot, cache)
added = dag.addHeadBlock(verifier, newBlock) do (
@ -356,6 +362,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG()
attPool.prune()
do:
raiseAssert "withUpdatedState failed"
var
lastEth1BlockAt = genesisTime
@ -424,7 +432,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
if replay:
withTimer(timers[tReplay]):
var cache = StateCache()
dag.updateStateData(
doAssert dag.updateStateData(
replayState[], dag.head.atSlot(Slot(slots)), false, cache)
echo "Done!"

View File

@ -166,7 +166,7 @@ proc stepOnBlock(
signedBlock: ForkySignedBeaconBlock,
time: BeaconTime): Result[BlockRef, BlockError] =
# 1. Move state to proper slot.
dag.updateStateData(
doAssert dag.updateStateData(
state,
dag.head.atSlot(time.slotOrZero),
save = false,
@ -203,7 +203,9 @@ proc stepOnAttestation(
att: Attestation,
time: BeaconTime): FcResult[void] =
let epochRef =
dag.getEpochRef(dag.head, time.slotOrZero.compute_epoch_at_slot())
dag.getEpochRef(
dag.head, time.slotOrZero().compute_epoch_at_slot(),
false).expect("no pruning in test")
let attesters = epochRef.get_attesting_indices(att.data, att.aggregation_bits)
let status = fkChoice[].on_attestation(

View File

@ -46,6 +46,15 @@ suite "ChainDAG helpers":
ancestor.blck.epochAncestor(cur.slot.epoch) == ancestor
ancestor.blck.epochAncestor(ancestor.blck.slot.epoch) != ancestor
let
farEpoch = Epoch(42)
farTail = BlockRef(
bid: BlockId(slot: farEpoch.compute_start_slot_at_epoch() + 5))
check:
not isNil(epochAncestor(farTail, farEpoch).blck)
isNil(epochAncestor(farTail, farEpoch - 1).blck)
suite "Block pool processing" & preset():
setup:
var
@ -95,15 +104,15 @@ suite "Block pool processing" & preset():
b2Add[].root == b2Get.get().refs.root
dag.heads.len == 1
dag.heads[0] == b2Add[]
not er.isNil
not er.isErr()
# Same epoch - same epochRef
er == dag.findEpochRef(b2Add[], b2Add[].slot.epoch)
er[] == dag.findEpochRef(b2Add[], b2Add[].slot.epoch)[]
# Different epoch that was never processed
dag.findEpochRef(b1Add[], b1Add[].slot.epoch + 1).isNil
dag.findEpochRef(b1Add[], b1Add[].slot.epoch + 1).isErr()
er.validatorKey(0'u64).isSome()
er.validatorKey(validators - 1).isSome()
er.validatorKey(validators).isNone()
er[].validatorKey(0'u64).isSome()
er[].validatorKey(validators - 1).isSome()
er[].validatorKey(validators).isNone()
# Skip one slot to get a gap
check:
@ -167,7 +176,7 @@ suite "Block pool processing" & preset():
stateCheckpoint = dag.head.parent.atSlot(nextEpochSlot).stateCheckpoint
check:
not dag.getEpochRef(dag.head.parent, nextEpoch).isNil
dag.getEpochRef(dag.head.parent, nextEpoch, true).isOk()
# Getting an EpochRef should not result in states being stored
db.getStateRoot(stateCheckpoint.blck.root, stateCheckpoint.slot).isErr()
@ -216,40 +225,38 @@ suite "Block pool processing" & preset():
# move to specific block
var cache = StateCache()
dag.updateStateData(tmpState[], bs1, false, cache)
check:
dag.updateStateData(tmpState[], bs1, false, cache)
tmpState.blck == b1Add[]
getStateField(tmpState.data, slot) == bs1.slot
# Skip slots
dag.updateStateData(tmpState[], bs1_3, false, cache) # skip slots
check:
dag.updateStateData(tmpState[], bs1_3, false, cache) # skip slots
tmpState.blck == b1Add[]
getStateField(tmpState.data, slot) == bs1_3.slot
# Move back slots, but not blocks
dag.updateStateData(tmpState[], bs1_3.parent(), false, cache)
check:
dag.updateStateData(tmpState[], bs1_3.parent(), false, cache)
tmpState.blck == b1Add[]
getStateField(tmpState.data, slot) == bs1_3.parent().slot
# Move to different block and slot
dag.updateStateData(tmpState[], bs2_3, false, cache)
check:
dag.updateStateData(tmpState[], bs2_3, false, cache)
tmpState.blck == b2Add[]
getStateField(tmpState.data, slot) == bs2_3.slot
# Move back slot and block
dag.updateStateData(tmpState[], bs1, false, cache)
check:
dag.updateStateData(tmpState[], bs1, false, cache)
tmpState.blck == b1Add[]
getStateField(tmpState.data, slot) == bs1.slot
# Move back to genesis
dag.updateStateData(tmpState[], bs1.parent(), false, cache)
check:
dag.updateStateData(tmpState[], bs1.parent(), false, cache)
tmpState.blck == b1Add[].parent
getStateField(tmpState.data, slot) == bs1.parent.slot
@ -391,12 +398,13 @@ suite "chain DAG finalization tests" & preset():
dag.db.immutableValidators.len() == getStateField(dag.headState.data, validators).len()
let
finalER = dag.findEpochRef(dag.finalizedHead.blck, dag.finalizedHead.slot.epoch)
finalER = dag.getEpochRef(
dag.finalizedHead.blck, dag.finalizedHead.slot.epoch, false)
# The EpochRef for the finalized block is needed for eth1 voting, so we
# should never drop it!
check:
not finalER.isNil
not finalER.isErr()
block:
for er in dag.epochRefs:
@ -409,7 +417,7 @@ suite "chain DAG finalization tests" & preset():
# just processed the head the relevant epochrefs should not have been
# evicted yet
cache = StateCache()
updateStateData(
check: updateStateData(
dag, tmpStateData[], dag.head.atSlot(dag.head.slot), false, cache)
check:
@ -514,8 +522,8 @@ suite "chain DAG finalization tests" & preset():
tmpStateData = assignClone(dag.headState)
while cur.slot >= dag.finalizedHead.slot:
assign(tmpStateData[], dag.headState)
dag.updateStateData(tmpStateData[], cur.atSlot(cur.slot), false, cache)
check:
dag.updateStateData(tmpStateData[], cur.atSlot(cur.slot), false, cache)
dag.get(cur).data.phase0Data.message.state_root ==
getStateRoot(tmpStateData[].data)
getStateRoot(tmpStateData[].data) == hash_tree_root(
@ -665,7 +673,7 @@ suite "Backfill":
blocks = block:
var blocks: seq[ForkedSignedBeaconBlock]
var cache: StateCache
for i in 0..<SLOTS_PER_EPOCH:
for i in 0..<SLOTS_PER_EPOCH * 2:
blocks.add addTestBlock(tailState[], cache)
blocks
@ -694,6 +702,9 @@ suite "Backfill":
dag.getBlockSlotIdBySlot(Slot(0)) == dag.genesis.bid.atSlot(Slot(0))
dag.getBlockSlotIdBySlot(Slot(1)) == BlockSlotId()
# No epochref for pre-tail epochs
dag.getEpochRef(dag.tail, dag.tail.slot.epoch - 1, true).isErr()
var
badBlock = blocks[^2].phase0Data
badBlock.signature = blocks[^3].phase0Data.signature