From 0d9fd548577206908f1827f3c0687816aae8ac6f Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 18 Aug 2022 20:07:01 +0200 Subject: [PATCH] cache shuffling separately from other EpochRef data (fixes #2677) (#3990) In order to avoid full replays when validating attestations hailing from untaken forks, it's better to keep shufflings separate from `EpochRef` and perform a lookahead on the shuffling when processing the block that determines them. This also helps performance in the case where REST clients are trying to perform lookahead on attestation duties and decreases memory usage by sharing shufflings between EpochRef instances of the same dependent root. --- .../block_clearance.nim | 7 + .../block_pools_types.nim | 17 +- .../consensus_object_pools/blockchain_dag.nim | 251 ++++++++++++------ .../consensus_object_pools/spec_cache.nim | 84 +++--- .../gossip_processing/batch_validation.nim | 6 +- .../gossip_processing/gossip_validation.nim | 46 ++-- beacon_chain/rpc/rest_validator_api.nim | 51 ++-- beacon_chain/validators/action_tracker.nim | 4 +- beacon_chain/validators/message_router.nim | 6 +- beacon_chain/validators/validator_duties.nim | 49 ++-- tests/test_blockchain_dag.nim | 19 +- 11 files changed, 319 insertions(+), 221 deletions(-) diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index 15f8dbbcf..c05f0e996 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -88,6 +88,13 @@ proc addResolvedHeadBlock( # Update light client data dag.processNewBlockForLightClient(state, trustedBlock, parent.bid) + # Pre-heat the shuffling cache with the shuffling caused by this block - this + # is useful for attestation duty lookahead, REST API queries and attestation + # validation of untaken forks (in case of instability / multiple heads) + if dag.findShufflingRef(blockRef.bid, blockRef.slot.epoch + 1).isNone: + dag.putShufflingRef( + ShufflingRef.init(state, cache, blockRef.slot.epoch + 1)) + if not blockVerified: dag.optimisticRoots.incl blockRoot diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index ed2f3f01c..a6e2639b0 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -189,6 +189,8 @@ type cfg*: RuntimeConfig + shufflingRefs*: array[16, ShufflingRef] + epochRefs*: array[32, EpochRef] ## Cached information about a particular epoch ending with the given ## block - we limit the number of held EpochRefs to put a cap on @@ -243,8 +245,18 @@ type epoch*: Epoch bid*: BlockId + ShufflingRef* = ref object + ## Attestation committee shuffling that determines when validators have + ## duties - determined with one epoch of look-ahead - the dependent root is + ## the block root that is the last to affect the shuffling outcome. + epoch*: Epoch + attester_dependent_root*: Eth2Digest + ## Root of the block that determined the shuffling - this is the last + ## block that was applied in (epoch - 2). + + shuffled_active_validator_indices*: seq[ValidatorIndex] + EpochRef* = ref object - dag*: ChainDAGRef key*: EpochKey eth1_data*: Eth1Data @@ -255,8 +267,7 @@ type beacon_proposers*: array[SLOTS_PER_EPOCH, Option[ValidatorIndex]] proposer_dependent_root*: Eth2Digest - shuffled_active_validator_indices*: seq[ValidatorIndex] - attester_dependent_root*: Eth2Digest + shufflingRef*: ShufflingRef # balances, as used in fork choice effective_balances_bytes*: seq[byte] diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 5c2315f59..b93635ee7 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -139,14 +139,6 @@ func validatorKey*( ## non-head branch)! dag.db.immutableValidators.load(index) -func validatorKey*( - epochRef: EpochRef, index: ValidatorIndex or uint64): Option[CookedPubKey] = - ## Returns the validator pubkey for the index, assuming it's been observed - ## at any point in time - this function may return pubkeys for indicies that - ## are not (yet) part of the head state (if the key has been observed on a - ## non-head branch)! - validatorKey(epochRef.dag, index) - template is_merge_transition_complete( stateParam: ForkedHashedBeaconState): bool = withState(stateParam): @@ -155,57 +147,6 @@ template is_merge_transition_complete( else: false -func init*( - T: type EpochRef, dag: ChainDAGRef, state: ForkedHashedBeaconState, - cache: var StateCache): T = - let - epoch = state.get_current_epoch() - proposer_dependent_root = withState(state): state.proposer_dependent_root - attester_dependent_root = withState(state): state.attester_dependent_root - epochRef = EpochRef( - dag: dag, # This gives access to the validator pubkeys through an EpochRef - key: dag.epochAncestor(state.latest_block_id, epoch), - - eth1_data: - getStateField(state, eth1_data), - eth1_deposit_index: - getStateField(state, eth1_deposit_index), - - checkpoints: - FinalityCheckpoints( - justified: getStateField(state, current_justified_checkpoint), - finalized: getStateField(state, finalized_checkpoint)), - - # beacon_proposers: Separately filled below - proposer_dependent_root: proposer_dependent_root, - - shuffled_active_validator_indices: - cache.get_shuffled_active_validator_indices(state, epoch), - attester_dependent_root: attester_dependent_root, - ) - epochStart = epoch.start_slot() - - for i in 0'u64.. 2: (epoch - 1).start_slot() - 1 else: Slot(0) + dependent_bsi = dag.atSlot(bid, dependent_slot).valueOr: + return Opt.none(ShufflingRef) + + for s in dag.shufflingRefs: + if s == nil: continue + if s.epoch == epoch and dependent_bsi.bid.root == s.attester_dependent_root: + return Opt.some s + Opt.none(ShufflingRef) + +func putShufflingRef*(dag: ChainDAGRef, shufflingRef: ShufflingRef) = + ## Store shuffling in the cache + if shufflingRef.epoch < dag.finalizedHead.slot.epoch(): + # Only cache epoch information for unfinalized blocks - earlier states + # are seldomly used (ie RPC), so no need to cache + return + + # Because we put a cap on the number of shufflingRef we store, we want to + # prune the least useful state - for now, we'll assume that to be the + # oldest shufflingRef we know about. + var + oldest = 0 + for x in 0..= dag.finalizedHead.slot.epoch(): - # Only cache epoch information for unfinalized blocks - earlier states - # are seldomly used (ie RPC), so no need to cache - - # Because we put a cap on the number of epochRefs we store, we want to - # prune the least useful state - for now, we'll assume that to be the - # oldest epochRef we know about. - - var - oldest = 0 - for x in 0..= uint64(MAX_COMMITTEES_PER_SLOT): @@ -589,17 +590,19 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = if wallSlot > request.slot + 1: return RestApiResponse.jsonError(Http400, SlotFromThePastError) - let epoch = request.slot.epoch - let epochRef = if epoch == wallEpoch: - currentEpoch.getAndCacheEpochRef(wallEpoch) - elif epoch == wallEpoch + 1: - nextEpoch.getAndCacheEpochRef(wallEpoch + 1) - else: - return RestApiResponse.jsonError(Http400, - SlotNotInNextWallSlotEpochError) + let + epoch = request.slot.epoch + shufflingRef = + if epoch == wallEpoch: + currentEpoch.getAndCacheShufflingRef(wallEpoch) + elif epoch == wallEpoch + 1: + nextEpoch.getAndCacheShufflingRef(wallEpoch + 1) + else: + return RestApiResponse.jsonError(Http400, + SlotNotInNextWallSlotEpochError) let subnet_id = compute_subnet_for_attestation( - get_committee_count_per_slot(epochRef), request.slot, + get_committee_count_per_slot(shufflingRef), request.slot, request.committee_index) node.actionTracker.registerDuty( diff --git a/beacon_chain/validators/action_tracker.nim b/beacon_chain/validators/action_tracker.nim index 8ff34fe38..6619a63c6 100644 --- a/beacon_chain/validators/action_tracker.nim +++ b/beacon_chain/validators/action_tracker.nim @@ -225,7 +225,7 @@ func updateActions*( let epoch = epochRef.epoch - tracker.attesterDepRoot = epochRef.attester_dependent_root + tracker.attesterDepRoot = epochRef.shufflingRef.attester_dependent_root tracker.lastCalculatedEpoch = epoch let validatorIndices = toHashSet(toSeq(tracker.knownValidators.keys())) @@ -243,7 +243,7 @@ func updateActions*( static: doAssert SLOTS_PER_EPOCH <= 32 for (committeeIndex, subnet_id, slot) in - get_committee_assignments(epochRef, validatorIndices): + get_committee_assignments(epochRef.shufflingRef, validatorIndices): doAssert epoch(slot) == epoch diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 229d73a2d..fb516274a 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -209,19 +209,19 @@ proc routeAttestation*( return err( "Attempt to send attestation for unknown target") - epochRef = router[].dag.getEpochRef( + shufflingRef = router[].dag.getShufflingRef( target, attestation.data.target.epoch, false).valueOr: warn "Cannot construct EpochRef for attestation, skipping send - report bug", target = shortLog(target), attestation = shortLog(attestation) return committee_index = - epochRef.get_committee_index(attestation.data.index).valueOr: + shufflingRef.get_committee_index(attestation.data.index).valueOr: notice "Invalid committee index in attestation", attestation = shortLog(attestation) return err("Invalid committee index in attestation") subnet_id = compute_subnet_for_attestation( - get_committee_count_per_slot(epochRef), attestation.data.slot, + get_committee_count_per_slot(shufflingRef), attestation.data.slot, committee_index) return await router.routeAttestation( diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index d0f8b595c..ff0b74e4a 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -157,9 +157,8 @@ proc getAttachedValidator(node: BeaconNode, nil proc getAttachedValidator(node: BeaconNode, - epochRef: EpochRef, idx: ValidatorIndex): AttachedValidator = - let key = epochRef.validatorKey(idx) + let key = node.dag.validatorKey(idx) if key.isSome(): let validator = node.getAttachedValidator(key.get().toPubKey()) if validator != nil and validator.index != some(idx): @@ -169,7 +168,7 @@ proc getAttachedValidator(node: BeaconNode, validator else: warn "Validator key not found", - idx, epoch = epochRef.epoch + idx, head = shortLog(node.dag.head) nil proc isSynced*(node: BeaconNode, head: BlockRef): bool = @@ -875,15 +874,16 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) = warn "Cannot construct EpochRef for attestation head, report bug", attestationHead = shortLog(attestationHead), slot return - committees_per_slot = get_committee_count_per_slot(epochRef) + committees_per_slot = get_committee_count_per_slot(epochRef.shufflingRef) fork = node.dag.forkAtEpoch(slot.epoch) genesis_validators_root = node.dag.genesis_validators_root for committee_index in get_committee_indices(committees_per_slot): - let committee = get_beacon_committee(epochRef, slot, committee_index) + let committee = get_beacon_committee( + epochRef.shufflingRef, slot, committee_index) for index_in_committee, validator_index in committee: - let validator = node.getAttachedValidator(epochRef, validator_index) + let validator = node.getAttachedValidator(validator_index) if validator == nil: continue @@ -947,16 +947,12 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode, proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) = # TODO Use a view type to avoid the copy - var + let syncCommittee = node.dag.syncCommitteeParticipants(slot + 1) - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: - warn "Cannot construct EpochRef for head, report bug", - attestationHead = shortLog(head), slot - return for subcommitteeIdx in SyncSubcommitteeIndex: for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx): - let validator = node.getAttachedValidator(epochRef, valIdx) + let validator = node.getAttachedValidator(valIdx) if isNil(validator) or validator.index.isNone(): continue asyncSpawn createAndSendSyncCommitteeMessage(node, validator, slot, @@ -1021,14 +1017,10 @@ proc handleSyncCommitteeContributions( fork = node.dag.forkAtEpoch(slot.epoch) genesis_validators_root = node.dag.genesis_validators_root syncCommittee = node.dag.syncCommitteeParticipants(slot + 1) - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: - warn "Cannot construct EpochRef for head, report bug", - attestationHead = shortLog(head), slot - return for subcommitteeIdx in SyncSubCommitteeIndex: for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx): - let validator = node.getAttachedValidator(epochRef, valIdx) + let validator = node.getAttachedValidator(valIdx) if validator == nil: continue @@ -1062,7 +1054,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot): await proposeBlock(node, validator, proposer.get(), head, slot) proc signAndSendAggregate( - node: BeaconNode, validator: AttachedValidator, epochRef: EpochRef, + node: BeaconNode, validator: AttachedValidator, shufflingRef: ShufflingRef, slot: Slot, committee_index: CommitteeIndex) {.async.} = try: let @@ -1080,7 +1072,8 @@ proc signAndSendAggregate( res.get() # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#aggregation-selection - if not is_aggregator(epochRef, slot, committee_index, selectionProof): + if not is_aggregator( + shufflingRef, slot, committee_index, selectionProof): return # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#construct-aggregate @@ -1119,19 +1112,19 @@ proc sendAggregatedAttestations( # the given slot, for which `is_aggregator` returns `true. let - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: + shufflingRef = node.dag.getShufflingRef(head, slot.epoch, false).valueOr: warn "Cannot construct EpochRef for head, report bug", head = shortLog(head), slot return - committees_per_slot = get_committee_count_per_slot(epochRef) + committees_per_slot = get_committee_count_per_slot(shufflingRef) for committee_index in get_committee_indices(committees_per_slot): for _, validator_index in - get_beacon_committee(epochRef, slot, committee_index): - let validator = node.getAttachedValidator(epochRef, validator_index) + get_beacon_committee(shufflingRef, slot, committee_index): + let validator = node.getAttachedValidator(validator_index) if validator != nil: asyncSpawn signAndSendAggregate( - node, validator, epochRef, slot, committee_index) + node, validator, shufflingRef, slot, committee_index) proc updateValidatorMetrics*(node: BeaconNode) = # Technically, this only needs to be done on epoch transitions and if there's @@ -1419,19 +1412,19 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} = # be getting the duties one slot at a time for slot in wallSlot ..< wallSlot + SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS: let - epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr: + shufflingRef = node.dag.getShufflingRef(head, slot.epoch, false).valueOr: warn "Cannot construct EpochRef for duties - report bug", head = shortLog(head), slot return let fork = node.dag.forkAtEpoch(slot.epoch) - committees_per_slot = get_committee_count_per_slot(epochRef) + committees_per_slot = get_committee_count_per_slot(shufflingRef) for committee_index in get_committee_indices(committees_per_slot): - let committee = get_beacon_committee(epochRef, slot, committee_index) + let committee = get_beacon_committee(shufflingRef, slot, committee_index) for index_in_committee, validator_index in committee: - let validator = node.getAttachedValidator(epochRef, validator_index) + let validator = node.getAttachedValidator(validator_index) if validator != nil: let subnet_id = compute_subnet_for_attestation( diff --git a/tests/test_blockchain_dag.nim b/tests/test_blockchain_dag.nim index 8cc72cef3..a4d58aa13 100644 --- a/tests/test_blockchain_dag.nim +++ b/tests/test_blockchain_dag.nim @@ -74,6 +74,7 @@ suite "Block pool processing" & preset(): let b2Add = dag.addHeadBlock(verifier, b2, nilPhase0Callback) b2Get = dag.getForkedBlock(b2.root) + sr = dag.findShufflingRef(b1Add[].bid, b1Add[].slot.epoch) er = dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch) validators = getStateField(dag.headState, validators).lenu64() @@ -89,15 +90,20 @@ suite "Block pool processing" & preset(): dag.getBlockIdAtSlot(b2Add[].slot).get() == BlockSlotId.init(dag.genesis, b2Add[].slot) - not er.isErr() + sr.isSome() + er.isSome() + # er reuses shuffling ref instance + er[].shufflingRef == sr[] # Same epoch - same epochRef er[] == dag.findEpochRef(b2Add[].bid, b2Add[].slot.epoch)[] # Different epoch that was never processed - dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch + 1).isErr() + dag.findEpochRef(b1Add[].bid, b1Add[].slot.epoch + 1).isNone() + # ... but we know the shuffling already! + dag.findShufflingRef(b1Add[].bid, b1Add[].slot.epoch + 1).isSome() - er[].validatorKey(0'u64).isSome() - er[].validatorKey(validators - 1).isSome() - er[].validatorKey(validators).isNone() + dag.validatorKey(0'u64).isSome() + dag.validatorKey(validators - 1).isSome() + dag.validatorKey(validators).isNone() # Skip one slot to get a gap check: @@ -172,6 +178,9 @@ suite "Block pool processing" & preset(): check: parentBsi.bid == dag.head.parent.bid parentBsi.slot == nextEpochSlot + # Pre-heated caches + dag.findShufflingRef(dag.head.parent.bid, dag.head.slot.epoch).isOk() + dag.findShufflingRef(dag.head.parent.bid, nextEpoch).isOk() dag.getEpochRef(dag.head.parent, nextEpoch, true).isOk() # Getting an EpochRef should not result in states being stored