diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index 15f8dbbcf..c05f0e996 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -88,6 +88,13 @@ proc addResolvedHeadBlock( # Update light client data dag.processNewBlockForLightClient(state, trustedBlock, parent.bid) + # Pre-heat the shuffling cache with the shuffling caused by this block - this + # is useful for attestation duty lookahead, REST API queries and attestation + # validation of untaken forks (in case of instability / multiple heads) + if dag.findShufflingRef(blockRef.bid, blockRef.slot.epoch + 1).isNone: + dag.putShufflingRef( + ShufflingRef.init(state, cache, blockRef.slot.epoch + 1)) + if not blockVerified: dag.optimisticRoots.incl blockRoot diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index ed2f3f01c..a6e2639b0 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -189,6 +189,8 @@ type cfg*: RuntimeConfig + shufflingRefs*: array[16, ShufflingRef] + epochRefs*: array[32, EpochRef] ## Cached information about a particular epoch ending with the given ## block - we limit the number of held EpochRefs to put a cap on @@ -243,8 +245,18 @@ type epoch*: Epoch bid*: BlockId + ShufflingRef* = ref object + ## Attestation committee shuffling that determines when validators have + ## duties - determined with one epoch of look-ahead - the dependent root is + ## the block root that is the last to affect the shuffling outcome. + epoch*: Epoch + attester_dependent_root*: Eth2Digest + ## Root of the block that determined the shuffling - this is the last + ## block that was applied in (epoch - 2). + + shuffled_active_validator_indices*: seq[ValidatorIndex] + EpochRef* = ref object - dag*: ChainDAGRef key*: EpochKey eth1_data*: Eth1Data @@ -255,8 +267,7 @@ type beacon_proposers*: array[SLOTS_PER_EPOCH, Option[ValidatorIndex]] proposer_dependent_root*: Eth2Digest - shuffled_active_validator_indices*: seq[ValidatorIndex] - attester_dependent_root*: Eth2Digest + shufflingRef*: ShufflingRef # balances, as used in fork choice effective_balances_bytes*: seq[byte] diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 5c2315f59..b93635ee7 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -139,14 +139,6 @@ func validatorKey*( ## non-head branch)! dag.db.immutableValidators.load(index) -func validatorKey*( - epochRef: EpochRef, index: ValidatorIndex or uint64): Option[CookedPubKey] = - ## Returns the validator pubkey for the index, assuming it's been observed - ## at any point in time - this function may return pubkeys for indicies that - ## are not (yet) part of the head state (if the key has been observed on a - ## non-head branch)! - validatorKey(epochRef.dag, index) - template is_merge_transition_complete( stateParam: ForkedHashedBeaconState): bool = withState(stateParam): @@ -155,57 +147,6 @@ template is_merge_transition_complete( else: false -func init*( - T: type EpochRef, dag: ChainDAGRef, state: ForkedHashedBeaconState, - cache: var StateCache): T = - let - epoch = state.get_current_epoch() - proposer_dependent_root = withState(state): state.proposer_dependent_root - attester_dependent_root = withState(state): state.attester_dependent_root - epochRef = EpochRef( - dag: dag, # This gives access to the validator pubkeys through an EpochRef - key: dag.epochAncestor(state.latest_block_id, epoch), - - eth1_data: - getStateField(state, eth1_data), - eth1_deposit_index: - getStateField(state, eth1_deposit_index), - - checkpoints: - FinalityCheckpoints( - justified: getStateField(state, current_justified_checkpoint), - finalized: getStateField(state, finalized_checkpoint)), - - # beacon_proposers: Separately filled below - proposer_dependent_root: proposer_dependent_root, - - shuffled_active_validator_indices: - cache.get_shuffled_active_validator_indices(state, epoch), - attester_dependent_root: attester_dependent_root, - ) - epochStart = epoch.start_slot() - - for i in 0'u64.. 2: (epoch - 1).start_slot() - 1 else: Slot(0) + dependent_bsi = dag.atSlot(bid, dependent_slot).valueOr: + return Opt.none(ShufflingRef) + + for s in dag.shufflingRefs: + if s == nil: continue + if s.epoch == epoch and dependent_bsi.bid.root == s.attester_dependent_root: + return Opt.some s + Opt.none(ShufflingRef) + +func putShufflingRef*(dag: ChainDAGRef, shufflingRef: ShufflingRef) = + ## Store shuffling in the cache + if shufflingRef.epoch < dag.finalizedHead.slot.epoch(): + # Only cache epoch information for unfinalized blocks - earlier states + # are seldomly used (ie RPC), so no need to cache + return + + # Because we put a cap on the number of shufflingRef we store, we want to + # prune the least useful state - for now, we'll assume that to be the + # oldest shufflingRef we know about. + var + oldest = 0 + for x in 0..= dag.finalizedHead.slot.epoch(): - # Only cache epoch information for unfinalized blocks - earlier states - # are seldomly used (ie RPC), so no need to cache - - # Because we put a cap on the number of epochRefs we store, we want to - # prune the least useful state - for now, we'll assume that to be the - # oldest epochRef we know about. - - var - oldest = 0 - for x in 0..= uint64(MAX_COMMITTEES_PER_SLOT): @@ -589,17 +590,19 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = if wallSlot > request.slot + 1: return RestApiResponse.jsonError(Http400, SlotFromThePastError) - let epoch = request.slot.epoch - let epochRef = if epoch == wallEpoch: - currentEpoch.getAndCacheEpochRef(wallEpoch) - elif epoch == wallEpoch + 1: - nextEpoch.getAndCacheEpochRef(wallEpoch + 1) - else: - return RestApiResponse.jsonError(Http400, - SlotNotInNextWallSlotEpochError) + let + epoch = request.slot.epoch + shufflingRef = + if epoch == wallEpoch: + currentEpoch.getAndCacheShufflingRef(wallEpoch) + elif epoch == wallEpoch + 1: + nextEpoch.getAndCacheShufflingRef(wallEpoch + 1) + else: + return RestApiResponse.jsonError(Http400, + SlotNotInNextWallSlotEpochError) let subnet_id = compute_subnet_for_attestation( - get_committee_count_per_slot(epochRef), request.slot, + get_committee_count_per_slot(shufflingRef), request.slot, request.committee_index) node.actionTracker.registerDuty( diff --git a/beacon_chain/validators/action_tracker.nim b/beacon_chain/validators/action_tracker.nim index 8ff34fe38..6619a63c6 100644 --- a/beacon_chain/validators/action_tracker.nim +++ b/beacon_chain/validators/action_tracker.nim @@ -225,7 +225,7 @@ func updateActions*( let epoch = epochRef.epoch - tracker.attesterDepRoot = epochRef.attester_dependent_root + tracker.attesterDepRoot = epochRef.shufflingRef.attester_dependent_root tracker.lastCalculatedEpoch = epoch let validatorIndices = toHashSet(toSeq(tracker.knownValidators.keys())) @@ -243,7 +243,7 @@ func updateActions*( static: doAssert SLOTS_PER_EPOCH <= 32 for (committeeIndex, subnet_id, slot) in - get_committee_assignments(epochRef, validatorIndices): + get_committee_assignments(epochRef.shufflingRef, validatorIndices): doAssert epoch(slot) == epoch diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 229d73a2d..fb516274a 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -209,19 +209,19 @@ proc routeAttestation*( return err( "Attempt to send attestation for unknown target") - epochRef = router[].dag.getEpochRef( + shufflingRef = router[].dag.getShufflingRef( target, attestation.data.target.epoch, false).valueOr: warn "Cannot construct EpochRef for attestation, skipping send - report bug", target = shortLog(target), attestation = shortLog(attestation) return committee_index = - epochRef.get_committee_index(attestation.data.index).valueOr: + shufflingRef.get_committee_index(attestation.data.index).valueOr: notice "Invalid committee index in attestation", attestation = shortLog(attestation) return err("Invalid committee index in attestation") subnet_id = compute_subnet_for_attestation( - get_committee_count_per_slot(epochRef), attestation.data.slot, + get_committee_count_per_slot(shufflingRef), attestation.data.slot, committee_index) return await router.routeAttestation( diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index d0f8b595c..ff0b74e4a 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -157,9 +157,8 @@ proc getAttachedValidator(node: BeaconNode, nil proc getAttachedValidator(node: BeaconNode, - epochRef: EpochRef, idx: ValidatorIndex): AttachedValidator = - let key = epochRef.validatorKey(idx) + let key = node.dag.validatorKey(idx) if key.isSome(): let validator = node.getAttachedValidator(key.get().toPubKey()) if validator != nil and validator.index != some(idx): @@ -169,7 +168,7 @@ proc getAttachedValidator(node: BeaconNode, validator else: warn "Validator key not found", - idx, epoch = epochRef.epoch + idx, head = shortLog(node.dag.head) nil proc isSynced*(node: BeaconNode, head: BlockRef): bool = @@ -875,15 +874,16 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) = warn "Cannot construct EpochRef for attestation head, report bug", attestationHead = shortLog(attestationHead), slot return - committees_per_slot = get_committee_count_per_slot(epochRef) + committees_per_slot = get_committee_count_per_slot(epochRef.shufflingRef) fork = node.dag.forkAtEpoch(slot.epoch) genesis_validators_root = node.dag.genesis_validators_root for committee_index in get_committee_indices(committees_per_slot): - let committee = get_beacon_committee(epochRef, slot, committee_index) + let committee = get_beacon_committee( + epochRef.shufflingRef, slot, committee_index) for index_in_committee, validator_index in committee: - let validator = node.getAttachedValidator(epochRef, validator_index) + let validator = node.getAttachedValidator(validator_index) if validator == nil: continue @@ -947,16 +947,12 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode, proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) = # TODO Use a view type to avoid the copy - var + let syncCommittee = node.dag.syncCommitteeParticipants(slot + 1) - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: - warn "Cannot construct EpochRef for head, report bug", - attestationHead = shortLog(head), slot - return for subcommitteeIdx in SyncSubcommitteeIndex: for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx): - let validator = node.getAttachedValidator(epochRef, valIdx) + let validator = node.getAttachedValidator(valIdx) if isNil(validator) or validator.index.isNone(): continue asyncSpawn createAndSendSyncCommitteeMessage(node, validator, slot, @@ -1021,14 +1017,10 @@ proc handleSyncCommitteeContributions( fork = node.dag.forkAtEpoch(slot.epoch) genesis_validators_root = node.dag.genesis_validators_root syncCommittee = node.dag.syncCommitteeParticipants(slot + 1) - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: - warn "Cannot construct EpochRef for head, report bug", - attestationHead = shortLog(head), slot - return for subcommitteeIdx in SyncSubCommitteeIndex: for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx): - let validator = node.getAttachedValidator(epochRef, valIdx) + let validator = node.getAttachedValidator(valIdx) if validator == nil: continue @@ -1062,7 +1054,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot): await proposeBlock(node, validator, proposer.get(), head, slot) proc signAndSendAggregate( - node: BeaconNode, validator: AttachedValidator, epochRef: EpochRef, + node: BeaconNode, validator: AttachedValidator, shufflingRef: ShufflingRef, slot: Slot, committee_index: CommitteeIndex) {.async.} = try: let @@ -1080,7 +1072,8 @@ proc signAndSendAggregate( res.get() # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#aggregation-selection - if not is_aggregator(epochRef, slot, committee_index, selectionProof): + if not is_aggregator( + shufflingRef, slot, committee_index, selectionProof): return # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#construct-aggregate @@ -1119,19 +1112,19 @@ proc sendAggregatedAttestations( # the given slot, for which `is_aggregator` returns `true. let - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: + shufflingRef = node.dag.getShufflingRef(head, slot.epoch, false).valueOr: warn "Cannot construct EpochRef for head, report bug", head = shortLog(head), slot return - committees_per_slot = get_committee_count_per_slot(epochRef) + committees_per_slot = get_committee_count_per_slot(shufflingRef) for committee_index in get_committee_indices(committees_per_slot): for _, validator_index in - get_beacon_committee(epochRef, slot, committee_index): - let validator = node.getAttachedValidator(epochRef, validator_index) + get_beacon_committee(shufflingRef, slot, committee_index): + let validator = node.getAttachedValidator(validator_index) if validator != nil: asyncSpawn signAndSendAggregate( - node, validator, epochRef, slot, committee_index) + node, validator, shufflingRef, slot, committee_index) proc updateValidatorMetrics*(node: BeaconNode) = # Technically, this only needs to be done on epoch transitions and if there's @@ -1419,19 +1412,19 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} = # be getting the duties one slot at a time for slot in wallSlot ..< wallSlot + SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS: let - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: + shufflingRef = node.dag.getShufflingRef(head, slot.epoch, false).valueOr: warn "Cannot construct EpochRef for duties - report bug", head = shortLog(head), slot return let fork = node.dag.forkAtEpoch(slot.epoch) - committees_per_slot = get_committee_count_per_slot(epochRef) + committees_per_slot = get_committee_count_per_slot(shufflingRef) for committee_index in get_committee_indices(committees_per_slot): - let committee = get_beacon_committee(epochRef, slot, committee_index) + let committee = get_beacon_committee(shufflingRef, slot, committee_index) for index_in_committee, validator_index in committee: - let validator = node.getAttachedValidator(epochRef, validator_index) + let validator = node.getAttachedValidator(validator_index) if validator != nil: let subnet_id = compute_subnet_for_attestation( diff --git a/tests/test_blockchain_dag.nim b/tests/test_blockchain_dag.nim index 8cc72cef3..a4d58aa13 100644 --- a/tests/test_blockchain_dag.nim +++ b/tests/test_blockchain_dag.nim @@ -74,6 +74,7 @@ suite "Block pool processing" & preset(): let b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback) b2Get = dag.getForkedBlock(b2.root) + sr = dag.findShufflingRef(b1Add[].bid, b1Add[].slot.epoch) er = dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch) validators = getStateField(dag.headState, validators).lenu64() @@ -89,15 +90,20 @@ suite "Block pool processing" & preset(): dag.getBlockIdAtSlot(b2Add[].slot).get() == BlockSlotId.init(dag.genesis, b2Add[].slot) - not er.isErr() + sr.isSome() + er.isSome() + # er reuses shuffling ref instance + er[].shufflingRef == sr[] # Same epoch - same epochRef er[] == dag.findEpochRef(b2Add[].bid, b2Add[].slot.epoch)[] # Different epoch that was never processed - dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch + 1).isErr() + dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch + 1).isNone() + # ... but we know the shuffling already! + dag.findShufflingRef(b1Add[].bid, b1Add[].slot.epoch + 1).isSome() - er[].validatorKey(0'u64).isSome() - er[].validatorKey(validators - 1).isSome() - er[].validatorKey(validators).isNone() + dag.validatorKey(0'u64).isSome() + dag.validatorKey(validators - 1).isSome() + dag.validatorKey(validators).isNone() # Skip one slot to get a gap check: @@ -172,6 +178,9 @@ suite "Block pool processing" & preset(): check: parentBsi.bid == dag.head.parent.bid parentBsi.slot == nextEpochSlot + # Pre-heated caches + dag.findShufflingRef(dag.head.parent.bid, dag.head.slot.epoch).isOk() + dag.findShufflingRef(dag.head.parent.bid, nextEpoch).isOk() dag.getEpochRef(dag.head.parent, nextEpoch, true).isOk() # Getting an EpochRef should not result in states being stored