cache shuffling separately from other EpochRef data (fixes #2677) (#3990)

In order to avoid full replays when validating attestations hailing from
untaken forks, it's better to keep shufflings separate from `EpochRef`
and perform a lookahead on the shuffling when processing the block that
determines them.

This also helps performance in the case where REST clients are trying to
perform lookahead on attestation duties and decreases memory usage by
sharing shufflings between EpochRef instances of the same dependent
root.
This commit is contained in:
Jacek Sieka 2022-08-18 20:07:01 +02:00 committed by GitHub
parent d3dbfd6021
commit 0d9fd54857
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 319 additions and 221 deletions

View File

@ -88,6 +88,13 @@ proc addResolvedHeadBlock(
# Update light client data
dag.processNewBlockForLightClient(state, trustedBlock, parent.bid)
# Pre-heat the shuffling cache with the shuffling caused by this block - this
# is useful for attestation duty lookahead, REST API queries and attestation
# validation of untaken forks (in case of instability / multiple heads)
if dag.findShufflingRef(blockRef.bid, blockRef.slot.epoch + 1).isNone:
dag.putShufflingRef(
ShufflingRef.init(state, cache, blockRef.slot.epoch + 1))
if not blockVerified:
dag.optimisticRoots.incl blockRoot

View File

@ -189,6 +189,8 @@ type
cfg*: RuntimeConfig
shufflingRefs*: array[16, ShufflingRef]
epochRefs*: array[32, EpochRef]
## Cached information about a particular epoch ending with the given
## block - we limit the number of held EpochRefs to put a cap on
@ -243,8 +245,18 @@ type
epoch*: Epoch
bid*: BlockId
ShufflingRef* = ref object
## Attestation committee shuffling that determines when validators have
## duties - determined with one epoch of look-ahead - the dependent root is
## the block root that is the last to affect the shuffling outcome.
epoch*: Epoch
attester_dependent_root*: Eth2Digest
## Root of the block that determined the shuffling - this is the last
## block that was applied in (epoch - 2).
shuffled_active_validator_indices*: seq[ValidatorIndex]
EpochRef* = ref object
dag*: ChainDAGRef
key*: EpochKey
eth1_data*: Eth1Data
@ -255,8 +267,7 @@ type
beacon_proposers*: array[SLOTS_PER_EPOCH, Option[ValidatorIndex]]
proposer_dependent_root*: Eth2Digest
shuffled_active_validator_indices*: seq[ValidatorIndex]
attester_dependent_root*: Eth2Digest
shufflingRef*: ShufflingRef
# balances, as used in fork choice
effective_balances_bytes*: seq[byte]

View File

@ -139,14 +139,6 @@ func validatorKey*(
## non-head branch)!
dag.db.immutableValidators.load(index)
func validatorKey*(
epochRef: EpochRef, index: ValidatorIndex or uint64): Option[CookedPubKey] =
## Returns the validator pubkey for the index, assuming it's been observed
## at any point in time - this function may return pubkeys for indicies that
## are not (yet) part of the head state (if the key has been observed on a
## non-head branch)!
validatorKey(epochRef.dag, index)
template is_merge_transition_complete(
stateParam: ForkedHashedBeaconState): bool =
withState(stateParam):
@ -155,57 +147,6 @@ template is_merge_transition_complete(
else:
false
func init*(
T: type EpochRef, dag: ChainDAGRef, state: ForkedHashedBeaconState,
cache: var StateCache): T =
let
epoch = state.get_current_epoch()
proposer_dependent_root = withState(state): state.proposer_dependent_root
attester_dependent_root = withState(state): state.attester_dependent_root
epochRef = EpochRef(
dag: dag, # This gives access to the validator pubkeys through an EpochRef
key: dag.epochAncestor(state.latest_block_id, epoch),
eth1_data:
getStateField(state, eth1_data),
eth1_deposit_index:
getStateField(state, eth1_deposit_index),
checkpoints:
FinalityCheckpoints(
justified: getStateField(state, current_justified_checkpoint),
finalized: getStateField(state, finalized_checkpoint)),
# beacon_proposers: Separately filled below
proposer_dependent_root: proposer_dependent_root,
shuffled_active_validator_indices:
cache.get_shuffled_active_validator_indices(state, epoch),
attester_dependent_root: attester_dependent_root,
)
epochStart = epoch.start_slot()
for i in 0'u64..<SLOTS_PER_EPOCH:
epochRef.beacon_proposers[i] =
get_beacon_proposer_index(state, cache, epochStart + i)
# When fork choice runs, it will need the effective balance of the justified
# checkpoint - we pre-load the balances here to avoid rewinding the justified
# state later and compress them because not all checkpoints end up being used
# for fork choice - specially during long periods of non-finalization
proc snappyEncode(inp: openArray[byte]): seq[byte] =
try:
snappy.encode(inp)
except CatchableError as err:
raiseAssert err.msg
epochRef.effective_balances_bytes =
snappyEncode(SSZ.encode(
List[Gwei, Limit VALIDATOR_REGISTRY_LIMIT](
get_effective_balances(getStateField(state, validators).asSeq, epoch))))
epochRef
func effective_balances*(epochRef: EpochRef): seq[Gwei] =
try:
SSZ.decode(snappy.decode(epochRef.effective_balances_bytes, uint32.high),
@ -346,17 +287,146 @@ func epochAncestor*(dag: ChainDAGRef, bid: BlockId, epoch: Epoch): EpochKey =
EpochKey(epoch: epoch, bid: bsi.bid)
func findShufflingRef*(
dag: ChainDAGRef, bid: BlockId, epoch: Epoch): Opt[ShufflingRef] =
## Lookup a shuffling in the cache, returning `none` if it's not present - see
## `getShufflingRef` for a version that creates a new instance if it's missing
let
dependent_slot = if epoch > 2: (epoch - 1).start_slot() - 1 else: Slot(0)
dependent_bsi = dag.atSlot(bid, dependent_slot).valueOr:
return Opt.none(ShufflingRef)
for s in dag.shufflingRefs:
if s == nil: continue
if s.epoch == epoch and dependent_bsi.bid.root == s.attester_dependent_root:
return Opt.some s
Opt.none(ShufflingRef)
func putShufflingRef*(dag: ChainDAGRef, shufflingRef: ShufflingRef) =
## Store shuffling in the cache
if shufflingRef.epoch < dag.finalizedHead.slot.epoch():
# Only cache epoch information for unfinalized blocks - earlier states
# are seldomly used (ie RPC), so no need to cache
return
# Because we put a cap on the number of shufflingRef we store, we want to
# prune the least useful state - for now, we'll assume that to be the
# oldest shufflingRef we know about.
var
oldest = 0
for x in 0..<dag.shufflingRefs.len:
let candidate = dag.shufflingRefs[x]
if candidate == nil:
oldest = x
break
if candidate.epoch < dag.shufflingRefs[oldest].epoch:
oldest = x
dag.shufflingRefs[oldest] = shufflingRef
func findEpochRef*(
dag: ChainDAGRef, bid: BlockId, epoch: Epoch): Opt[EpochRef] =
## Look for an existing cached EpochRef, but unlike `getEpochRef`, don't
## try to create one by recreating the epoch state
## Lookup an EpochRef in the cache, returning `none` if it's not present - see
## `getEpochRef` for a version that creates a new instance if it's missing
let ancestor = dag.epochAncestor(bid, epoch)
for i in 0..<dag.epochRefs.len:
if dag.epochRefs[i] != nil and dag.epochRefs[i].key == ancestor:
return ok(dag.epochRefs[i])
for e in dag.epochRefs:
if e == nil: continue
if e.key == ancestor:
return Opt.some e
err()
Opt.none(EpochRef)
func putEpochRef(dag: ChainDAGRef, epochRef: EpochRef) =
if epochRef.epoch < dag.finalizedHead.slot.epoch():
# Only cache epoch information for unfinalized blocks - earlier states
# are seldomly used (ie RPC), so no need to cache
return
# Because we put a cap on the number of epochRefs we store, we want to
# prune the least useful state - for now, we'll assume that to be the
# oldest epochRef we know about.
var
oldest = 0
for x in 0..<dag.epochRefs.len:
let candidate = dag.epochRefs[x]
if candidate == nil:
oldest = x
break
if candidate.key.epoch < dag.epochRefs[oldest].epoch:
oldest = x
dag.epochRefs[oldest] = epochRef
func init*(
T: type ShufflingRef, state: ForkedHashedBeaconState,
cache: var StateCache, epoch: Epoch): T =
let
dependent_epoch =
if epoch < 1: Epoch(0) else: epoch - 1
attester_dependent_root =
withState(state): state.dependent_root(dependent_epoch)
ShufflingRef(
epoch: epoch,
attester_dependent_root: attester_dependent_root,
shuffled_active_validator_indices:
cache.get_shuffled_active_validator_indices(state, epoch),
)
func init*(
T: type EpochRef, dag: ChainDAGRef, state: ForkedHashedBeaconState,
cache: var StateCache): T =
let
epoch = state.get_current_epoch()
proposer_dependent_root = withState(state): state.proposer_dependent_root
shufflingRef = dag.findShufflingRef(state.latest_block_id, epoch).valueOr:
let tmp = ShufflingRef.init(state, cache, epoch)
dag.putShufflingRef(tmp)
tmp
attester_dependent_root = withState(state): state.attester_dependent_root
epochRef = EpochRef(
key: dag.epochAncestor(state.latest_block_id, epoch),
eth1_data:
getStateField(state, eth1_data),
eth1_deposit_index:
getStateField(state, eth1_deposit_index),
checkpoints:
FinalityCheckpoints(
justified: getStateField(state, current_justified_checkpoint),
finalized: getStateField(state, finalized_checkpoint)),
# beacon_proposers: Separately filled below
proposer_dependent_root: proposer_dependent_root,
shufflingRef: shufflingRef
)
epochStart = epoch.start_slot()
for i in 0'u64..<SLOTS_PER_EPOCH:
epochRef.beacon_proposers[i] =
get_beacon_proposer_index(state, cache, epochStart + i)
# When fork choice runs, it will need the effective balance of the justified
# checkpoint - we pre-load the balances here to avoid rewinding the justified
# state later and compress them because not all checkpoints end up being used
# for fork choice - specially during long periods of non-finalization
proc snappyEncode(inp: openArray[byte]): seq[byte] =
try:
snappy.encode(inp)
except CatchableError as err:
raiseAssert err.msg
epochRef.effective_balances_bytes =
snappyEncode(SSZ.encode(
List[Gwei, Limit VALIDATOR_REGISTRY_LIMIT](
get_effective_balances(getStateField(state, validators).asSeq, epoch))))
epochRef
func loadStateCache(
dag: ChainDAGRef, cache: var StateCache, bid: BlockId, epoch: Epoch) =
@ -368,12 +438,14 @@ func loadStateCache(
block:
let epoch = e
if epoch notin cache.shuffled_active_validator_indices:
let shufflingRef = dag.findShufflingRef(bid, epoch)
if shufflingRef.isSome():
cache.shuffled_active_validator_indices[epoch] =
shufflingRef[][].shuffled_active_validator_indices
let epochRef = dag.findEpochRef(bid, epoch)
if epochRef.isSome():
cache.shuffled_active_validator_indices[epoch] =
epochRef[].shuffled_active_validator_indices
let start_slot = epoch.start_slot()
for i, idx in epochRef[].beacon_proposers:
for i, idx in epochRef[][].beacon_proposers:
cache.beacon_proposer_indices[start_slot + i] = idx
load(epoch)
@ -992,25 +1064,7 @@ func getEpochRef*(
var epochRef = dag.findEpochRef(bid, epoch)
if epochRef.isErr:
let res = EpochRef.init(dag, state, cache)
if epoch >= dag.finalizedHead.slot.epoch():
# Only cache epoch information for unfinalized blocks - earlier states
# are seldomly used (ie RPC), so no need to cache
# Because we put a cap on the number of epochRefs we store, we want to
# prune the least useful state - for now, we'll assume that to be the
# oldest epochRef we know about.
var
oldest = 0
for x in 0..<dag.epochRefs.len:
let candidate = dag.epochRefs[x]
if candidate == nil:
oldest = x
break
if candidate.key.epoch < dag.epochRefs[oldest].epoch:
oldest = x
dag.epochRefs[oldest] = res
dag.putEpochRef(res)
res
else:
epochRef.get()
@ -1071,6 +1125,24 @@ proc getFinalizedEpochRef*(dag: ChainDAGRef): EpochRef =
dag.finalizedHead.blck, dag.finalizedHead.slot.epoch, false).expect(
"getEpochRef for finalized head should always succeed")
proc getShufflingRef*(
dag: ChainDAGRef, blck: BlockRef, epoch: Epoch,
preFinalized: bool): Opt[ShufflingRef] =
## Return the shuffling in the given history and epoch - this potentially is
## faster than returning a full EpochRef because the shuffling is determined
## an epoch in advance and therefore is less sensitive to reorgs
let shufflingRef = dag.findShufflingRef(blck.bid, epoch)
if shufflingRef.isNone:
# TODO here, we could check the existing cached states and see if any one
# has the right dependent root - unlike EpochRef, we don't need an _exact_
# epoch match
let epochRef = dag.getEpochRef(blck, epoch, preFinalized).valueOr:
return Opt.none ShufflingRef
dag.putShufflingRef(epochRef.shufflingRef)
Opt.some epochRef.shufflingRef
else:
shufflingRef
func stateCheckpoint*(dag: ChainDAGRef, bsi: BlockSlotId): BlockSlotId =
## The first ancestor BlockSlot that is a state checkpoint
var bsi = bsi
@ -1601,6 +1673,11 @@ proc pruneStateCachesDAG*(dag: ChainDAGRef) =
if dag.epochRefs[i] != nil and
dag.epochRefs[i].epoch < dag.finalizedHead.slot.epoch:
dag.epochRefs[i] = nil
for i in 0..<dag.shufflingRefs.len:
if dag.shufflingRefs[i] != nil and
dag.shufflingRefs[i].epoch < dag.finalizedHead.slot.epoch:
dag.shufflingRefs[i] = nil
let epochRefPruneTick = Moment.now()
dag.lastPrunePoint = dag.finalizedHead.toBlockSlotId().expect("not nil")

View File

@ -25,71 +25,71 @@ export
logScope: topics = "spec_cache"
# Spec functions implemented based on cached values instead of the full state
func count_active_validators*(epochInfo: EpochRef): uint64 =
epochInfo.shuffled_active_validator_indices.lenu64
func count_active_validators*(shufflingRef: ShufflingRef): uint64 =
shufflingRef.shuffled_active_validator_indices.lenu64
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.2/specs/phase0/beacon-chain.md#get_committee_count_per_slot
func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 =
get_committee_count_per_slot(count_active_validators(epochInfo))
func get_committee_count_per_slot*(shufflingRef: ShufflingRef): uint64 =
get_committee_count_per_slot(count_active_validators(shufflingRef))
iterator get_committee_indices*(epochRef: EpochRef): CommitteeIndex =
let committees_per_slot = get_committee_count_per_slot(epochRef)
iterator get_committee_indices*(shufflingRef: ShufflingRef): CommitteeIndex =
let committees_per_slot = get_committee_count_per_slot(shufflingRef)
for committee_index in get_committee_indices(committees_per_slot):
yield committee_index
func get_committee_index*(epochRef: EpochRef, index: uint64):
func get_committee_index*(shufflingRef: ShufflingRef, index: uint64):
Result[CommitteeIndex, cstring] =
check_attestation_index(index, get_committee_count_per_slot(epochRef))
check_attestation_index(index, get_committee_count_per_slot(shufflingRef))
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.2/specs/phase0/beacon-chain.md#get_beacon_committee
iterator get_beacon_committee*(
epochRef: EpochRef, slot: Slot, committee_index: CommitteeIndex):
shufflingRef: ShufflingRef, slot: Slot, committee_index: CommitteeIndex):
(int, ValidatorIndex) =
## Return the beacon committee at ``slot`` for ``index``.
doAssert slot.epoch == epochRef.epoch
let committees_per_slot = get_committee_count_per_slot(epochRef)
doAssert slot.epoch == shufflingRef.epoch
let committees_per_slot = get_committee_count_per_slot(shufflingRef)
for index_in_committee, idx in compute_committee(
epochRef.shuffled_active_validator_indices,
shufflingRef.shuffled_active_validator_indices,
(slot mod SLOTS_PER_EPOCH) * committees_per_slot + committee_index.asUInt64,
committees_per_slot * SLOTS_PER_EPOCH
): yield (index_in_committee, idx)
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.2/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee*(
epochRef: EpochRef, slot: Slot, committee_index: CommitteeIndex):
shufflingRef: ShufflingRef, slot: Slot, committee_index: CommitteeIndex):
seq[ValidatorIndex] =
## Return the beacon committee at ``slot`` for ``index``.
doAssert slot.epoch == epochRef.epoch
let committees_per_slot = get_committee_count_per_slot(epochRef)
doAssert slot.epoch == shufflingRef.epoch
let committees_per_slot = get_committee_count_per_slot(shufflingRef)
compute_committee(
epochRef.shuffled_active_validator_indices,
shufflingRef.shuffled_active_validator_indices,
(slot mod SLOTS_PER_EPOCH) * committees_per_slot + committee_index.asUInt64,
committees_per_slot * SLOTS_PER_EPOCH
)
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.2/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee_len*(
epochRef: EpochRef, slot: Slot, committee_index: CommitteeIndex): uint64 =
shufflingRef: ShufflingRef, slot: Slot, committee_index: CommitteeIndex): uint64 =
## Return the number of members in the beacon committee at ``slot`` for ``index``.
doAssert slot.epoch == epochRef.epoch
let committees_per_slot = get_committee_count_per_slot(epochRef)
doAssert slot.epoch == shufflingRef.epoch
let committees_per_slot = get_committee_count_per_slot(shufflingRef)
compute_committee_len(
count_active_validators(epochRef),
count_active_validators(shufflingRef),
(slot mod SLOTS_PER_EPOCH) * committees_per_slot + committee_index.asUInt64,
committees_per_slot * SLOTS_PER_EPOCH
)
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.2/specs/phase0/beacon-chain.md#get_attesting_indices
iterator get_attesting_indices*(epochRef: EpochRef,
iterator get_attesting_indices*(shufflingRef: ShufflingRef,
slot: Slot,
committee_index: CommitteeIndex,
bits: CommitteeValidatorsBits):
ValidatorIndex =
if bits.lenu64 != get_beacon_committee_len(epochRef, slot, committee_index):
if bits.lenu64 != get_beacon_committee_len(shufflingRef, slot, committee_index):
trace "get_attesting_indices: inconsistent aggregation and committee length"
else:
for index_in_committee, validator_index in get_beacon_committee(
epochRef, slot, committee_index):
shufflingRef, slot, committee_index):
if bits[index_in_committee]:
yield validator_index
@ -111,13 +111,13 @@ iterator get_attesting_indices*(
warn "Invalid attestation target"
doAssert strictVerification notin dag.updateFlags
break
epochRef =
dag.getEpochRef(target.blck, target.slot.epoch, false).valueOr:
warn "Attestation `EpochRef` not found"
shufflingRef =
dag.getShufflingRef(target.blck, target.slot.epoch, false).valueOr:
warn "Attestation shuffling not found"
doAssert strictVerification notin dag.updateFlags
break
committeesPerSlot = get_committee_count_per_slot(epochRef)
committeesPerSlot = get_committee_count_per_slot(shufflingRef)
committeeIndex =
CommitteeIndex.init(attestation.data.index, committeesPerSlot).valueOr:
warn "Unexpected committee index in block attestation"
@ -125,10 +125,10 @@ iterator get_attesting_indices*(
break
for validator in get_attesting_indices(
epochRef, slot, committeeIndex, attestation.aggregation_bits):
shufflingRef, slot, committeeIndex, attestation.aggregation_bits):
yield validator
func get_attesting_indices_one*(epochRef: EpochRef,
func get_attesting_indices_one*(shufflingRef: ShufflingRef,
slot: Slot,
committee_index: CommitteeIndex,
bits: CommitteeValidatorsBits):
@ -137,25 +137,25 @@ func get_attesting_indices_one*(epochRef: EpochRef,
# if only one validator index is set
var res = none(ValidatorIndex)
for validator_index in get_attesting_indices(
epochRef, slot, committee_index, bits):
shufflingRef, slot, committee_index, bits):
if res.isSome(): return none(ValidatorIndex)
res = some(validator_index)
res
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.2/specs/phase0/beacon-chain.md#get_attesting_indices
func get_attesting_indices*(epochRef: EpochRef,
func get_attesting_indices*(shufflingRef: ShufflingRef,
slot: Slot,
committee_index: CommitteeIndex,
bits: CommitteeValidatorsBits):
seq[ValidatorIndex] =
# TODO sequtils2 mapIt
for idx in get_attesting_indices(epochRef, slot, committee_index, bits):
for idx in get_attesting_indices(shufflingRef, slot, committee_index, bits):
result.add(idx)
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.2/specs/phase0/beacon-chain.md#is_valid_indexed_attestation
proc is_valid_indexed_attestation*(
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef,
dag: ChainDAGRef, shufflingRef: EpochRef,
attestation: SomeAttestation, flags: UpdateFlags): Result[void, cstring] =
# This is a variation on `is_valid_indexed_attestation` that works directly
# with an attestation instead of first constructing an `IndexedAttestation`
@ -171,8 +171,8 @@ proc is_valid_indexed_attestation*(
var
pubkeys = newSeqOfCap[CookedPubKey](sigs)
for index in get_attesting_indices(
epochRef, attestation.data, attestation.aggregation_bits):
pubkeys.add(epochRef.validatorKey(index).get())
shufflingRef, attestation.data, attestation.aggregation_bits):
pubkeys.add(dag.validatorKey(index).get())
if not verify_attestation_signature(
fork, genesis_validators_root, attestation.data,
@ -209,23 +209,23 @@ func makeAttestationData*(
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#validator-assignments
iterator get_committee_assignments*(
epochRef: EpochRef, validator_indices: HashSet[ValidatorIndex]):
shufflingRef: ShufflingRef, validator_indices: HashSet[ValidatorIndex]):
tuple[committee_index: CommitteeIndex,
subnet_id: SubnetId, slot: Slot] =
let
committees_per_slot = get_committee_count_per_slot(epochRef)
epoch = epochRef.epoch
committees_per_slot = get_committee_count_per_slot(shufflingRef)
epoch = shufflingRef.epoch
for slot in epoch.slots():
for committee_index in get_committee_indices(committees_per_slot):
if anyIt(get_beacon_committee(epochRef, slot, committee_index), it in validator_indices):
if anyIt(get_beacon_committee(shufflingRef, slot, committee_index), it in validator_indices):
yield (
committee_index,
compute_subnet_for_attestation(committees_per_slot, slot, committee_index),
slot)
func is_aggregator*(epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
slot_signature: ValidatorSig): bool =
func is_aggregator*(shufflingRef: ShufflingRef, slot: Slot,
index: CommitteeIndex, slot_signature: ValidatorSig): bool =
let
committee_len = get_beacon_committee_len(epochRef, slot, index)
committee_len = get_beacon_committee_len(shufflingRef, slot, index)
return is_aggregator(committee_len, slot_signature)

View File

@ -339,7 +339,7 @@ proc scheduleAggregateChecks*(
batchCrypto: ref BatchCrypto,
fork: Fork, genesis_validators_root: Eth2Digest,
signedAggregateAndProof: SignedAggregateAndProof,
epochRef: EpochRef,
dag: ChainDAGRef,
attesting_indices: openArray[ValidatorIndex]
): Result[tuple[
aggregatorFut, slotFut, aggregateFut: Future[BatchResult],
@ -364,13 +364,13 @@ proc scheduleAggregateChecks*(
# Do the eager steps first to avoid polluting batches with needlessly
let
aggregatorKey =
epochRef.validatorKey(aggregate_and_proof.aggregator_index).orReturnErr(
dag.validatorKey(aggregate_and_proof.aggregator_index).orReturnErr(
"SignedAggregateAndProof: invalid aggregator index")
aggregatorSig = signedAggregateAndProof.signature.load().orReturnErr(
"aggregateAndProof: invalid proof signature")
slotSig = aggregate_and_proof.selection_proof.load().orReturnErr(
"aggregateAndProof: invalid selection signature")
aggregateKey = ? aggregateAll(epochRef.dag, attesting_indices)
aggregateKey = ? aggregateAll(dag, attesting_indices)
aggregateSig = aggregate.signature.load().orReturnErr(
"aggregateAndProof: invalid aggregate signature")

View File

@ -142,11 +142,11 @@ func check_aggregation_count(
ok()
func check_attestation_subnet(
epochRef: EpochRef, slot: Slot, committee_index: CommitteeIndex,
shufflingRef: ShufflingRef, slot: Slot, committee_index: CommitteeIndex,
subnet_id: SubnetId): Result[void, ValidationError] =
let
expectedSubnet = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), slot, committee_index)
get_committee_count_per_slot(shufflingRef), slot, committee_index)
if expectedSubnet != subnet_id:
return errReject("Attestation not on the correct subnet")
@ -447,18 +447,17 @@ proc validateAttestation*(
# compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)) ==
# store.finalized_checkpoint.root
let
epochRef = block:
let tmp = pool.dag.getEpochRef(target.blck, target.slot.epoch, false)
if isErr(tmp): # shouldn't happen since we verified target
warn "No EpochRef for attestation",
shufflingRef =
pool.dag.getShufflingRef(target.blck, target.slot.epoch, false).valueOr:
# Target is verified - shouldn't happen
warn "No shuffling for attestation - report bug",
attestation = shortLog(attestation), target = shortLog(target)
return errIgnore("Attestation: no EpochRef")
tmp.get()
return errIgnore("Attestation: no shuffling")
# [REJECT] The committee index is within the expected range -- i.e.
# data.index < get_committee_count_per_slot(state, data.target.epoch).
let committee_index = block:
let idx = epochRef.get_committee_index(attestation.data.index)
let idx = shufflingRef.get_committee_index(attestation.data.index)
if idx.isErr():
return checkedReject("Attestation: committee index not within expected range")
idx.get()
@ -471,7 +470,7 @@ proc validateAttestation*(
# committee information for the signature check.
block:
let v = check_attestation_subnet(
epochRef, attestation.data.slot, committee_index, subnet_id) # [REJECT]
shufflingRef, attestation.data.slot, committee_index, subnet_id) # [REJECT]
if v.isErr():
return err(v.error)
@ -483,7 +482,7 @@ proc validateAttestation*(
# epoch matches its target and attestation.data.target.root is an ancestor of
# attestation.data.beacon_block_root.
if not (attestation.aggregation_bits.lenu64 == get_beacon_committee_len(
epochRef, attestation.data.slot, committee_index)):
shufflingRef, attestation.data.slot, committee_index)):
return checkedReject(
"Attestation: number of aggregation bits and committee size mismatch")
@ -492,7 +491,7 @@ proc validateAttestation*(
genesis_validators_root =
getStateField(pool.dag.headState, genesis_validators_root)
attesting_index = get_attesting_indices_one(
epochRef, slot, committee_index, attestation.aggregation_bits)
shufflingRef, slot, committee_index, attestation.aggregation_bits)
# The number of aggregation bits matches the committee size, which ensures
# this condition holds.
@ -509,7 +508,7 @@ proc validateAttestation*(
attestation.data.target.epoch:
return errIgnore("Attestation: Validator has already voted in epoch")
let pubkey = epochRef.validatorKey(validator_index)
let pubkey = pool.dag.validatorKey(validator_index)
if pubkey.isNone():
# can't happen, in theory, because we checked the aggregator index above
return errIgnore("Attestation: cannot find validator pubkey")
@ -635,18 +634,17 @@ proc validateAggregate*(
return errIgnore("Aggregate already covered")
let
epochRef = block:
let tmp = pool.dag.getEpochRef(target.blck, target.slot.epoch, false)
if tmp.isErr: # shouldn't happen since we verified target
warn "No EpochRef for attestation - report bug",
shufflingRef =
pool.dag.getShufflingRef(target.blck, target.slot.epoch, false).valueOr:
# Target is verified - shouldn't happen
warn "No shuffling for attestation - report bug",
aggregate = shortLog(aggregate), target = shortLog(target)
return errIgnore("Aggregate: no EpochRef")
tmp.get()
return errIgnore("Aggregate: no shuffling")
# [REJECT] The committee index is within the expected range -- i.e.
# data.index < get_committee_count_per_slot(state, data.target.epoch).
let committee_index = block:
let idx = epochRef.get_committee_index(aggregate.data.index)
let idx = shufflingRef.get_committee_index(aggregate.data.index)
if idx.isErr():
return checkedReject("Attestation: committee index not within expected range")
idx.get()
@ -655,7 +653,7 @@ proc validateAggregate*(
# aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
# aggregate.data.index, aggregate_and_proof.selection_proof) returns True.
if not is_aggregator(
epochRef, slot, committee_index, aggregate_and_proof.selection_proof):
shufflingRef, slot, committee_index, aggregate_and_proof.selection_proof):
return checkedReject("Aggregate: incorrect aggregator")
# [REJECT] The aggregator's validator index is within the committee -- i.e.
@ -667,7 +665,7 @@ proc validateAggregate*(
return checkedReject("Aggregate: invalid aggregator index")
if aggregator_index notin
get_beacon_committee(epochRef, slot, committee_index):
get_beacon_committee(shufflingRef, slot, committee_index):
return checkedReject("Aggregate: aggregator's validator index not in committee")
# 1. [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the
@ -682,14 +680,14 @@ proc validateAggregate*(
genesis_validators_root =
getStateField(pool.dag.headState, genesis_validators_root)
attesting_indices = get_attesting_indices(
epochRef, slot, committee_index, aggregate.aggregation_bits)
shufflingRef, slot, committee_index, aggregate.aggregation_bits)
let
sig = if checkSignature:
let deferredCrypto = batchCrypto
.scheduleAggregateChecks(
fork, genesis_validators_root,
signedAggregateAndProof, epochRef, attesting_indices
signedAggregateAndProof, pool.dag, attesting_indices
)
if deferredCrypto.isErr():
return checkedReject(deferredCrypto.error)

View File

@ -68,7 +68,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
if res.isErr():
return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError)
res.get()
let epochRef = node.dag.getEpochRef(qhead, qepoch, true).valueOr:
let shufflingRef = node.dag.getShufflingRef(qhead, qepoch, true).valueOr:
return RestApiResponse.jsonError(Http400, PrunedStateError)
let duties =
@ -76,13 +76,15 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
var res: seq[RestAttesterDuty]
let
committees_per_slot = get_committee_count_per_slot(epochRef)
committees_per_slot = get_committee_count_per_slot(shufflingRef)
for committee_index in get_committee_indices(committees_per_slot):
for slot in qepoch.slots():
let committee = get_beacon_committee(epochRef, slot, committee_index)
let
committee =
get_beacon_committee(shufflingRef, slot, committee_index)
for index_in_committee, validator_index in committee:
if validator_index in indexList:
let validator_key = epochRef.validatorKey(validator_index)
let validator_key = node.dag.validatorKey(validator_index)
if validator_key.isSome():
res.add(
RestAttesterDuty(
@ -105,7 +107,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
none[bool]()
return RestApiResponse.jsonResponseWRoot(
duties, epochRef.attester_dependent_root, optimistic)
duties, shufflingRef.attester_dependent_root, optimistic)
# https://ethereum.github.io/beacon-APIs/#/Validator/getProposerDuties
router.api(MethodGet, "/eth/v1/validator/duties/proposer/{epoch}") do (
@ -144,7 +146,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
if bp.isSome():
res.add(
RestProposerDuty(
pubkey: epochRef.validatorKey(bp.get()).get().toPubKey(),
pubkey: node.dag.validatorKey(bp.get()).get().toPubKey(),
validator_index: bp.get(),
slot: qepoch.start_slot() + i
)
@ -566,17 +568,16 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
wallEpoch = wallSlot.epoch
head = node.dag.head
var currentEpoch, nextEpoch: Option[EpochRef]
template getAndCacheEpochRef(epochRefVar: var Option[EpochRef],
epoch: Epoch): EpochRef =
if epochRefVar.isNone:
epochRefVar = block:
let epochRef = node.dag.getEpochRef(head, epoch, true)
if isErr(epochRef):
var currentEpoch, nextEpoch: Opt[ShufflingRef]
template getAndCacheShufflingRef(shufflingRefVar: var Opt[ShufflingRef],
epoch: Epoch): ShufflingRef =
if shufflingRefVar.isNone:
shufflingRefVar = block:
let tmp = node.dag.getShufflingRef(head, epoch, true).valueOr:
return RestApiResponse.jsonError(Http400, PrunedStateError)
some epochRef.get()
Opt.some tmp
epochRefVar.get
shufflingRefVar.get
for request in requests:
if uint64(request.committee_index) >= uint64(MAX_COMMITTEES_PER_SLOT):
@ -589,17 +590,19 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
if wallSlot > request.slot + 1:
return RestApiResponse.jsonError(Http400, SlotFromThePastError)
let epoch = request.slot.epoch
let epochRef = if epoch == wallEpoch:
currentEpoch.getAndCacheEpochRef(wallEpoch)
elif epoch == wallEpoch + 1:
nextEpoch.getAndCacheEpochRef(wallEpoch + 1)
else:
return RestApiResponse.jsonError(Http400,
SlotNotInNextWallSlotEpochError)
let
epoch = request.slot.epoch
shufflingRef =
if epoch == wallEpoch:
currentEpoch.getAndCacheShufflingRef(wallEpoch)
elif epoch == wallEpoch + 1:
nextEpoch.getAndCacheShufflingRef(wallEpoch + 1)
else:
return RestApiResponse.jsonError(Http400,
SlotNotInNextWallSlotEpochError)
let subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), request.slot,
get_committee_count_per_slot(shufflingRef), request.slot,
request.committee_index)
node.actionTracker.registerDuty(

View File

@ -225,7 +225,7 @@ func updateActions*(
let
epoch = epochRef.epoch
tracker.attesterDepRoot = epochRef.attester_dependent_root
tracker.attesterDepRoot = epochRef.shufflingRef.attester_dependent_root
tracker.lastCalculatedEpoch = epoch
let validatorIndices = toHashSet(toSeq(tracker.knownValidators.keys()))
@ -243,7 +243,7 @@ func updateActions*(
static: doAssert SLOTS_PER_EPOCH <= 32
for (committeeIndex, subnet_id, slot) in
get_committee_assignments(epochRef, validatorIndices):
get_committee_assignments(epochRef.shufflingRef, validatorIndices):
doAssert epoch(slot) == epoch

View File

@ -209,19 +209,19 @@ proc routeAttestation*(
return err(
"Attempt to send attestation for unknown target")
epochRef = router[].dag.getEpochRef(
shufflingRef = router[].dag.getShufflingRef(
target, attestation.data.target.epoch, false).valueOr:
warn "Cannot construct EpochRef for attestation, skipping send - report bug",
target = shortLog(target),
attestation = shortLog(attestation)
return
committee_index =
epochRef.get_committee_index(attestation.data.index).valueOr:
shufflingRef.get_committee_index(attestation.data.index).valueOr:
notice "Invalid committee index in attestation",
attestation = shortLog(attestation)
return err("Invalid committee index in attestation")
subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), attestation.data.slot,
get_committee_count_per_slot(shufflingRef), attestation.data.slot,
committee_index)
return await router.routeAttestation(

View File

@ -157,9 +157,8 @@ proc getAttachedValidator(node: BeaconNode,
nil
proc getAttachedValidator(node: BeaconNode,
epochRef: EpochRef,
idx: ValidatorIndex): AttachedValidator =
let key = epochRef.validatorKey(idx)
let key = node.dag.validatorKey(idx)
if key.isSome():
let validator = node.getAttachedValidator(key.get().toPubKey())
if validator != nil and validator.index != some(idx):
@ -169,7 +168,7 @@ proc getAttachedValidator(node: BeaconNode,
validator
else:
warn "Validator key not found",
idx, epoch = epochRef.epoch
idx, head = shortLog(node.dag.head)
nil
proc isSynced*(node: BeaconNode, head: BlockRef): bool =
@ -875,15 +874,16 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
warn "Cannot construct EpochRef for attestation head, report bug",
attestationHead = shortLog(attestationHead), slot
return
committees_per_slot = get_committee_count_per_slot(epochRef)
committees_per_slot = get_committee_count_per_slot(epochRef.shufflingRef)
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root = node.dag.genesis_validators_root
for committee_index in get_committee_indices(committees_per_slot):
let committee = get_beacon_committee(epochRef, slot, committee_index)
let committee = get_beacon_committee(
epochRef.shufflingRef, slot, committee_index)
for index_in_committee, validator_index in committee:
let validator = node.getAttachedValidator(epochRef, validator_index)
let validator = node.getAttachedValidator(validator_index)
if validator == nil:
continue
@ -947,16 +947,12 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode,
proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =
# TODO Use a view type to avoid the copy
var
let
syncCommittee = node.dag.syncCommitteeParticipants(slot + 1)
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for head, report bug",
attestationHead = shortLog(head), slot
return
for subcommitteeIdx in SyncSubcommitteeIndex:
for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getAttachedValidator(epochRef, valIdx)
let validator = node.getAttachedValidator(valIdx)
if isNil(validator) or validator.index.isNone():
continue
asyncSpawn createAndSendSyncCommitteeMessage(node, validator, slot,
@ -1021,14 +1017,10 @@ proc handleSyncCommitteeContributions(
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root = node.dag.genesis_validators_root
syncCommittee = node.dag.syncCommitteeParticipants(slot + 1)
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for head, report bug",
attestationHead = shortLog(head), slot
return
for subcommitteeIdx in SyncSubCommitteeIndex:
for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getAttachedValidator(epochRef, valIdx)
let validator = node.getAttachedValidator(valIdx)
if validator == nil:
continue
@ -1062,7 +1054,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
await proposeBlock(node, validator, proposer.get(), head, slot)
proc signAndSendAggregate(
node: BeaconNode, validator: AttachedValidator, epochRef: EpochRef,
node: BeaconNode, validator: AttachedValidator, shufflingRef: ShufflingRef,
slot: Slot, committee_index: CommitteeIndex) {.async.} =
try:
let
@ -1080,7 +1072,8 @@ proc signAndSendAggregate(
res.get()
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#aggregation-selection
if not is_aggregator(epochRef, slot, committee_index, selectionProof):
if not is_aggregator(
shufflingRef, slot, committee_index, selectionProof):
return
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#construct-aggregate
@ -1119,19 +1112,19 @@ proc sendAggregatedAttestations(
# the given slot, for which `is_aggregator` returns `true.
let
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
shufflingRef = node.dag.getShufflingRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for head, report bug",
head = shortLog(head), slot
return
committees_per_slot = get_committee_count_per_slot(epochRef)
committees_per_slot = get_committee_count_per_slot(shufflingRef)
for committee_index in get_committee_indices(committees_per_slot):
for _, validator_index in
get_beacon_committee(epochRef, slot, committee_index):
let validator = node.getAttachedValidator(epochRef, validator_index)
get_beacon_committee(shufflingRef, slot, committee_index):
let validator = node.getAttachedValidator(validator_index)
if validator != nil:
asyncSpawn signAndSendAggregate(
node, validator, epochRef, slot, committee_index)
node, validator, shufflingRef, slot, committee_index)
proc updateValidatorMetrics*(node: BeaconNode) =
# Technically, this only needs to be done on epoch transitions and if there's
@ -1419,19 +1412,19 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
# be getting the duties one slot at a time
for slot in wallSlot ..< wallSlot + SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS:
let
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
shufflingRef = node.dag.getShufflingRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for duties - report bug",
head = shortLog(head), slot
return
let
fork = node.dag.forkAtEpoch(slot.epoch)
committees_per_slot = get_committee_count_per_slot(epochRef)
committees_per_slot = get_committee_count_per_slot(shufflingRef)
for committee_index in get_committee_indices(committees_per_slot):
let committee = get_beacon_committee(epochRef, slot, committee_index)
let committee = get_beacon_committee(shufflingRef, slot, committee_index)
for index_in_committee, validator_index in committee:
let validator = node.getAttachedValidator(epochRef, validator_index)
let validator = node.getAttachedValidator(validator_index)
if validator != nil:
let
subnet_id = compute_subnet_for_attestation(

View File

@ -74,6 +74,7 @@ suite "Block pool processing" & preset():
let
b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback)
b2Get = dag.getForkedBlock(b2.root)
sr = dag.findShufflingRef(b1Add[].bid, b1Add[].slot.epoch)
er = dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch)
validators = getStateField(dag.headState, validators).lenu64()
@ -89,15 +90,20 @@ suite "Block pool processing" & preset():
dag.getBlockIdAtSlot(b2Add[].slot).get() ==
BlockSlotId.init(dag.genesis, b2Add[].slot)
not er.isErr()
sr.isSome()
er.isSome()
# er reuses shuffling ref instance
er[].shufflingRef == sr[]
# Same epoch - same epochRef
er[] == dag.findEpochRef(b2Add[].bid, b2Add[].slot.epoch)[]
# Different epoch that was never processed
dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch + 1).isErr()
dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch + 1).isNone()
# ... but we know the shuffling already!
dag.findShufflingRef(b1Add[].bid, b1Add[].slot.epoch + 1).isSome()
er[].validatorKey(0'u64).isSome()
er[].validatorKey(validators - 1).isSome()
er[].validatorKey(validators).isNone()
dag.validatorKey(0'u64).isSome()
dag.validatorKey(validators - 1).isSome()
dag.validatorKey(validators).isNone()
# Skip one slot to get a gap
check:
@ -172,6 +178,9 @@ suite "Block pool processing" & preset():
check:
parentBsi.bid == dag.head.parent.bid
parentBsi.slot == nextEpochSlot
# Pre-heated caches
dag.findShufflingRef(dag.head.parent.bid, dag.head.slot.epoch).isOk()
dag.findShufflingRef(dag.head.parent.bid, nextEpoch).isOk()
dag.getEpochRef(dag.head.parent, nextEpoch, true).isOk()
# Getting an EpochRef should not result in states being stored