From 84a501d1ff80d6cb5b0ebf0e898891c1845ee56e Mon Sep 17 00:00:00 2001
From: Jacek Sieka
Date: Thu, 6 Aug 2020 21:48:47 +0200
Subject: [PATCH] remove one cache, add another (#1449)

* remove one cache, add another

This cache removes the need for rewinding in most attestation validation
flows, since attestations come from one of two epochs and must be targeting
a viable block.

Additionally, it removes all state caches, which are less likely to be used
overall - more metrics are needed to track the rewinding.

One risk is that when chains don't finalize, we'll have lots of EpochRef
instances in memory, meaning lots of validator key databases, most being
exactly the same. This can be addressed in any number of ways. Some of the
memory usage is mitigated by the fact that we previously had lots of big
state caches and now we keep only keys instead.

(Illustrative sketches of the new EpochRef-based lookup and validation paths
follow the diff.)

* cleanups

* doc
---
 beacon_chain/attestation_aggregation.nim      | 153 +++++++++---------
 beacon_chain/attestation_pool.nim             |   7 +-
 .../block_pools/block_pools_types.nim         |   7 +-
 beacon_chain/block_pools/chain_dag.nim        |  79 +--------
 beacon_chain/block_pools/spec_cache.nim       |  73 ++++++++-
 beacon_chain/spec/validator.nim               |   3 +-
 beacon_chain/validator_api.nim                |   2 +-
 beacon_chain/validator_duties.nim             |   5 +-
 8 files changed, 172 insertions(+), 157 deletions(-)

diff --git a/beacon_chain/attestation_aggregation.nim b/beacon_chain/attestation_aggregation.nim
index a76cd16e7..dd8e4269f 100644
--- a/beacon_chain/attestation_aggregation.nim
+++ b/beacon_chain/attestation_aggregation.nim
@@ -12,8 +12,8 @@ import
   ./spec/[
     beaconstate, datatypes, crypto, digest, helpers, network, validator,
     signatures],
-  ./block_pools/[spec_cache, chain_dag, quarantine], ./attestation_pool,
-  ./beacon_node_types, ./ssz
+  ./block_pools/[spec_cache, chain_dag, quarantine],
+  ./attestation_pool, ./beacon_node_types, ./ssz
 
 logScope:
   topics = "att_aggr"
@@ -108,7 +108,8 @@ func checkPropagationSlotRange(data: AttestationData, current_slot: Slot): bool
 
 # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
 proc isValidAttestation*(
-    pool: var AttestationPool, attestation: Attestation, current_slot: Slot,
+    pool: var AttestationPool,
+    attestation: Attestation, current_slot: Slot,
     topicCommitteeIndex: uint64): bool =
   logScope:
     topics = "att_aggr valid_att"
@@ -187,36 +188,40 @@ proc isValidAttestation*(
   #   compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)) ==
   #   store.finalized_checkpoint.root
 
-  pool.chainDag.withState(
-      pool.chainDag.tmpState,
-      tgtBlck.atSlot(attestation.data.target.epoch.compute_start_slot_at_epoch)):
-    # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
-    # [REJECT] The attestation is for the correct subnet -- i.e.
-    # compute_subnet_for_attestation(committees_per_slot,
-    # attestation.data.slot, attestation.data.index) == subnet_id, where
-    # committees_per_slot = get_committee_count_per_slot(state,
-    # attestation.data.target.epoch), which may be pre-computed along with the
-    # committee information for the signature check.
-    var cache = getEpochCache(blck, state)
-    let
-      epochInfo = blck.getEpochInfo(state, cache)
-      requiredSubnetIndex =
-        compute_subnet_for_attestation(
-          get_committee_count_per_slot(epochInfo),
-          attestation.data.slot, attestation.data.index.CommitteeIndex)
+  let epochRef = pool.chainDag.getEpochRef(
+    tgtBlck, attestation.data.target.epoch)
 
-    if requiredSubnetIndex != topicCommitteeIndex:
-      debug "attestation's committee index not for the correct subnet",
-        topicCommitteeIndex = topicCommitteeIndex,
-        attestation_data_index = attestation.data.index,
-        requiredSubnetIndex = requiredSubnetIndex
-      return false
+  # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
+  # [REJECT] The attestation is for the correct subnet -- i.e.
+  # compute_subnet_for_attestation(committees_per_slot,
+  # attestation.data.slot, attestation.data.index) == subnet_id, where
+  # committees_per_slot = get_committee_count_per_slot(state,
+  # attestation.data.target.epoch), which may be pre-computed along with the
+  # committee information for the signature check.
+  let
+    requiredSubnetIndex =
+      compute_subnet_for_attestation(
+        get_committee_count_per_slot(epochRef),
+        attestation.data.slot, attestation.data.index.CommitteeIndex)
+
+  if requiredSubnetIndex != topicCommitteeIndex:
+    debug "attestation's committee index not for the correct subnet",
+      topicCommitteeIndex = topicCommitteeIndex,
+      attestation_data_index = attestation.data.index,
+      requiredSubnetIndex = requiredSubnetIndex
+    return false
+
+  let
+    fork = pool.chainDag.headState.data.data.fork
+    genesis_validators_root =
+      pool.chainDag.headState.data.data.genesis_validators_root
 
   # The signature of attestation is valid.
-  if not is_valid_indexed_attestation(
-      state, get_indexed_attestation(state, attestation, cache), {}):
-    debug "signature verification failed"
-    return false
+  if not is_valid_indexed_attestation(
+      fork, genesis_validators_root,
+      epochRef, get_indexed_attestation(epochRef, attestation), {}):
+    debug "signature verification failed"
+    return false
 
   true
 
@@ -303,53 +308,55 @@ proc isValidAggregatedAttestation*(
       pool.quarantine.addMissing(aggregate.data.target.root)
     return
 
-  # TODO this could be any state in the target epoch
-  pool.chainDag.withState(
-      pool.chainDag.tmpState,
-      tgtBlck.atSlot(aggregate.data.target.epoch.compute_start_slot_at_epoch)):
-    var cache = getEpochCache(blck, state)
-    if not is_aggregator(
-        state, aggregate.data.slot, aggregate.data.index.CommitteeIndex,
-        aggregate_and_proof.selection_proof, cache):
-      debug "Incorrect aggregator"
-      return false
+  let epochRef = pool.chainDag.getEpochRef(tgtBlck, aggregate.data.target.epoch)
 
-    # [REJECT] The aggregator's validator index is within the committee -- i.e.
-    # aggregate_and_proof.aggregator_index in get_beacon_committee(state,
-    # aggregate.data.slot, aggregate.data.index).
-    if aggregate_and_proof.aggregator_index.ValidatorIndex notin
-        get_beacon_committee(
-          state, aggregate.data.slot, aggregate.data.index.CommitteeIndex, cache):
-      debug "Aggregator's validator index not in committee"
-      return false
+  if not is_aggregator(
+      epochRef, aggregate.data.slot, aggregate.data.index.CommitteeIndex,
+      aggregate_and_proof.selection_proof):
+    debug "Incorrect aggregator"
+    return false
 
-    # [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the
-    # aggregate.data.slot by the validator with index
-    # aggregate_and_proof.aggregator_index.
-    # get_slot_signature(state, aggregate.data.slot, privkey)
-    if aggregate_and_proof.aggregator_index >= state.validators.lenu64:
-      debug "Invalid aggregator_index"
-      return false
+  # [REJECT] The aggregator's validator index is within the committee -- i.e.
+  # aggregate_and_proof.aggregator_index in get_beacon_committee(state,
+  # aggregate.data.slot, aggregate.data.index).
+  if aggregate_and_proof.aggregator_index.ValidatorIndex notin
+      get_beacon_committee(
+        epochRef, aggregate.data.slot, aggregate.data.index.CommitteeIndex):
+    debug "Aggregator's validator index not in committee"
+    return false
 
-    if not verify_slot_signature(
-        state.fork, state.genesis_validators_root, aggregate.data.slot,
-        state.validators[aggregate_and_proof.aggregator_index].pubkey,
-        aggregate_and_proof.selection_proof):
-      debug "Selection_proof signature verification failed"
-      return false
+  # [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the
+  # aggregate.data.slot by the validator with index
+  # aggregate_and_proof.aggregator_index.
+  # get_slot_signature(state, aggregate.data.slot, privkey)
+  if aggregate_and_proof.aggregator_index >= epochRef.validator_keys.lenu64:
+    debug "Invalid aggregator_index"
+    return false
 
-    # [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
-    if not verify_aggregate_and_proof_signature(
-        state.fork, state.genesis_validators_root, aggregate_and_proof,
-        state.validators[aggregate_and_proof.aggregator_index].pubkey,
-        signed_aggregate_and_proof.signature):
-      debug "Signed_aggregate_and_proof signature verification failed"
-      return false
+  let
+    fork = pool.chainDag.headState.data.data.fork
+    genesis_validators_root =
+      pool.chainDag.headState.data.data.genesis_validators_root
+  if not verify_slot_signature(
+      fork, genesis_validators_root, aggregate.data.slot,
+      epochRef.validator_keys[aggregate_and_proof.aggregator_index],
+      aggregate_and_proof.selection_proof):
+    debug "Selection_proof signature verification failed"
+    return false
 
-    # [REJECT] The signature of aggregate is valid.
-    if not is_valid_indexed_attestation(
-        state, get_indexed_attestation(state, aggregate, cache), {}):
-      debug "Aggregate signature verification failed"
-      return false
+  # [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
+  if not verify_aggregate_and_proof_signature(
+      fork, genesis_validators_root, aggregate_and_proof,
+      epochRef.validator_keys[aggregate_and_proof.aggregator_index],
+      signed_aggregate_and_proof.signature):
+    debug "Signed_aggregate_and_proof signature verification failed"
+    return false
+
+  # [REJECT] The signature of aggregate is valid.
+  if not is_valid_indexed_attestation(
+      fork, genesis_validators_root,
+      epochRef, get_indexed_attestation(epochRef, aggregate), {}):
+    debug "Aggregate signature verification failed"
+    return false
 
   true
 
diff --git a/beacon_chain/attestation_pool.nim b/beacon_chain/attestation_pool.nim
index 54636e9d8..66d6db027 100644
--- a/beacon_chain/attestation_pool.nim
+++ b/beacon_chain/attestation_pool.nim
@@ -29,10 +29,9 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin
   #      should probably be removed as a dependency of AttestationPool (or some other
   #      smart refactoring)
 
-  chainDag.withState(chainDag.tmpState, chainDag.finalizedHead):
-    var forkChoice = initForkChoice(
-      chainDag.tmpState,
-    ).get()
+  let tmpState = newClone(chainDag.headState)
+  chainDag.withState(tmpState[], chainDag.finalizedHead):
+    var forkChoice = initForkChoice(tmpState[]).get()
 
   # Feed fork choice with unfinalized history - during startup, block pool only
   # keeps track of a single history so we just need to follow it
diff --git a/beacon_chain/block_pools/block_pools_types.nim b/beacon_chain/block_pools/block_pools_types.nim
index 9180a4a33..7ad5170e8 100644
--- a/beacon_chain/block_pools/block_pools_types.nim
+++ b/beacon_chain/block_pools/block_pools_types.nim
@@ -115,9 +115,6 @@ type
     # -----------------------------------
     # Rewinder - Mutable state processing
 
-    cachedStates*: seq[tuple[blockRoot: Eth2Digest, slot: Slot,
-      state: ref HashedBeaconState]]
-
     headState*: StateData ##\
     ## State given by the head block; only update in `updateHead`, not anywhere
     ## else via `withState`
@@ -143,6 +140,10 @@ type
     beacon_proposers*: array[
       SLOTS_PER_EPOCH, Option[(ValidatorIndex, ValidatorPubKey)]]
     shuffled_active_validator_indices*: seq[ValidatorIndex]
+    # This is an expensive cache that could probably be shared among epochref
+    # instances - in particular, validators keep their keys and locations in the
+    # structure
+    validator_keys*: seq[ValidatorPubKey]
 
   BlockRef* = ref object
     ## Node in object graph guaranteed to lead back to tail block, and to have
diff --git a/beacon_chain/block_pools/chain_dag.nim b/beacon_chain/block_pools/chain_dag.nim
index ee424d9e4..6a4535d20 100644
--- a/beacon_chain/block_pools/chain_dag.nim
+++ b/beacon_chain/block_pools/chain_dag.nim
@@ -22,8 +22,8 @@ import
 export block_pools_types
 
 declareCounter beacon_reorgs_total, "Total occurrences of reorganizations of the chain" # On fork choice
-declareCounter beacon_state_data_cache_hits, "dag.cachedStates hits"
-declareCounter beacon_state_data_cache_misses, "dag.cachedStates misses"
+declareCounter beacon_state_data_cache_hits, "EpochRef hits"
+declareCounter beacon_state_data_cache_misses, "EpochRef misses"
 
 logScope: topics = "hotdb"
 
@@ -76,7 +76,9 @@ proc init*(T: type EpochRef, state: BeaconState, cache: var StateCache): T =
         state, cache, epoch.compute_start_slot_at_epoch() + i)
     if idx.isSome():
       epochRef.beacon_proposers[i] =
-        some((idx.get(), state.validators[idx.get].pubkey.initPubKey))
+        some((idx.get(), state.validators[idx.get].pubkey))
+
+  epochRef.validator_keys = mapIt(state.validators.toSeq, it.pubkey)
   epochRef
 
 func link*(parent, child: BlockRef) =
@@ -344,11 +346,14 @@ proc getEpochRef*(dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): EpochRef =
     #      we start at the most recent one
     for e in bs.blck.epochsInfo:
       if e.epoch == epoch:
+        beacon_state_data_cache_hits.inc
         return e
     if bs.slot == epoch.compute_start_slot_at_epoch:
       break
     bs = bs.parent
 
+  beacon_state_data_cache_misses.inc
+
   dag.withState(dag.tmpState, bs):
     var cache = StateCache()
     getEpochInfo(blck, state, cache)
 
@@ -376,39 +381,6 @@ proc getState(
 
   true
 
-func getStateCacheIndex(
-    dag: ChainDAGRef, blockRoot: Eth2Digest, slot: Slot, matchEpoch: bool):
-    int =
-  for i, cachedState in dag.cachedStates:
-    let (cacheBlockRoot, cacheSlot, _) = cachedState
-    if cacheBlockRoot == blockRoot and (cacheSlot == slot or
-        (matchEpoch and
-          cacheSlot.compute_epoch_at_slot == slot.compute_epoch_at_slot)):
-      return i
-
-  -1
-
-func putStateCache*(
-    dag: ChainDAGRef, state: HashedBeaconState, blck: BlockRef) =
-  # Efficiently access states for both attestation aggregation and to process
-  # block proposals going back to the last finalized slot.
-  let stateCacheIndex =
-    dag.getStateCacheIndex(blck.root, state.data.slot, false)
-  if stateCacheIndex == -1:
-    const MAX_CACHE_SIZE = 18
-
-    let cacheLen = dag.cachedStates.len
-    doAssert cacheLen <= MAX_CACHE_SIZE
-
-    let entry =
-      if dag.cachedStates.len == MAX_CACHE_SIZE: dag.cachedStates.pop().state
-      else: (ref HashedBeaconState)()
-    assign(entry[], state)
-
-    insert(dag.cachedStates, (blck.root, state.data.slot, entry))
-    trace "ChainDAGRef.putState(): state cache updated",
-      cacheLen, root = shortLog(blck.root), slot = state.data.slot
-
 proc putState*(dag: ChainDAGRef, state: HashedBeaconState, blck: BlockRef) =
   # TODO we save state at every epoch start but never remove them - we also
   #      potentially save multiple states per slot if reorgs happen, meaning
@@ -434,9 +406,6 @@ proc putState*(dag: ChainDAGRef, state: HashedBeaconState, blck: BlockRef) =
     if not rootWritten:
       dag.db.putStateRoot(blck.root, state.data.slot, state.root)
 
-  if state.data.slot mod 2 == 0:
-    putStateCache(dag, state, blck)
-
 func getRef*(dag: ChainDAGRef, root: Eth2Digest): BlockRef =
   ## Retrieve a resolved block reference, if available
   dag.blocks.getOrDefault(root, nil)
@@ -577,24 +546,6 @@ proc rewindState(
     if parBs.blck != curBs.blck:
       ancestors.add(parBs.blck)
 
-    # TODO investigate replacing with getStateCached, by refactoring whole
-    # function. Empirically, this becomes pretty rare once good caches are
-    # used in the front-end.
-    let idx = dag.getStateCacheIndex(parBs.blck.root, parBs.slot, matchEpoch)
-    if idx >= 0:
-      assign(state.data, dag.cachedStates[idx].state[])
-      let ancestor = ancestors.pop()
-      state.blck = ancestor
-
-      beacon_state_data_cache_hits.inc()
-      trace "Replaying state transitions via in-memory cache",
-        stateSlot = shortLog(state.data.data.slot),
-        ancestorStateRoot = shortLog(state.data.root),
-        ancestors = ancestors.len
-
-      return ancestors
-
-    beacon_state_data_cache_misses.inc()
     if (let tmp = dag.db.getStateRoot(parBs.blck.root, parBs.slot); tmp.isSome()):
       if dag.db.containsState(tmp.get):
         stateRoot = tmp
@@ -636,21 +587,9 @@ proc getStateDataCached(
   # mostly matches updateStateData(...), because it's too expensive to run the
   # rewindState(...)/skipAndUpdateState(...)/state_transition(...) procs, when
   # each hash_tree_root(...) consumes a nontrivial fraction of a second.
-  when false:
-    # For debugging/development purposes to assess required lookback window for
-    # any given use case.
-    doAssert state.data.data.slot <= bs.slot + 4
-
-  let idx = dag.getStateCacheIndex(bs.blck.root, bs.slot, matchEpoch)
-  if idx >= 0:
-    assign(state.data, dag.cachedStates[idx].state[])
-    state.blck = bs.blck
-    beacon_state_data_cache_hits.inc()
-    return true
 
   # In-memory caches didn't hit. Try main block pool database. This is slower
   # than the caches due to SSZ (de)serializing and disk I/O, so prefer them.
-  beacon_state_data_cache_misses.inc()
   if (let tmp = dag.db.getStateRoot(bs.blck.root, bs.slot); tmp.isSome()):
     return dag.getState(dag.db, tmp.get(), bs.blck, state)
 
@@ -716,8 +655,6 @@ proc updateStateData*(
 
   state.blck = bs.blck
 
-  dag.putStateCache(state.data, bs.blck)
-
 proc loadTailState*(dag: ChainDAGRef): StateData =
   ## Load the state associated with the current tail in the dag
   let stateRoot = dag.db.getBlock(dag.tail.root).get().message.state_root
diff --git a/beacon_chain/block_pools/spec_cache.nim b/beacon_chain/block_pools/spec_cache.nim
index c4ac16ce4..3930bc6a8 100644
--- a/beacon_chain/block_pools/spec_cache.nim
+++ b/beacon_chain/block_pools/spec_cache.nim
@@ -7,16 +7,22 @@
 
 import
   std/[algorithm, sequtils, sets],
-  ../spec/[beaconstate, datatypes, presets, validator],
-  block_pools_types
+  chronicles,
+  ../spec/[
+    beaconstate, crypto, datatypes, digest, helpers, presets, signatures,
+    validator],
+  ../extras,
+  ./block_pools_types
 
 # Spec functions implemented based on cached values instead of the full state
 
 func count_active_validators*(epochInfo: EpochRef): uint64 =
   epochInfo.shuffled_active_validator_indices.lenu64
 
+# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_committee_count_per_slot
 func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 =
   get_committee_count_per_slot(count_active_validators(epochInfo))
 
+# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_beacon_committee
 func get_beacon_committee*(
     epochRef: EpochRef, slot: Slot, index: CommitteeIndex): seq[ValidatorIndex] =
   # Return the beacon committee at ``slot`` for ``index``.
@@ -53,3 +59,66 @@ func get_indexed_attestation*(epochRef: EpochRef, attestation: Attestation): Ind
     data: attestation.data,
     signature: attestation.signature
   )
+
+# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_beacon_committee
+func get_beacon_committee_len*(
+    epochRef: EpochRef, slot: Slot, index: CommitteeIndex): uint64 =
+  # Return the number of members in the beacon committee at ``slot`` for ``index``.
+  let
+    epoch = compute_epoch_at_slot(slot)
+    committees_per_slot = get_committee_count_per_slot(epochRef)
+
+  compute_committee_len(
+    count_active_validators(epochRef),
+    (slot mod SLOTS_PER_EPOCH) * committees_per_slot +
+      index.uint64,
+    committees_per_slot * SLOTS_PER_EPOCH
+  )
+
+# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#aggregation-selection
+func is_aggregator*(epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
+    slot_signature: ValidatorSig): bool =
+  let
+    committee_len = get_beacon_committee_len(epochRef, slot, index)
+    modulo = max(1'u64, committee_len div TARGET_AGGREGATORS_PER_COMMITTEE)
+  bytes_to_uint64(eth2digest(slot_signature.toRaw()).data[0..7]) mod modulo == 0
+
+# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#is_valid_indexed_attestation
+proc is_valid_indexed_attestation*(
+    fork: Fork, genesis_validators_root: Eth2Digest,
+    epochRef: EpochRef, indexed_attestation: SomeIndexedAttestation,
+    flags: UpdateFlags): bool =
+  # Check if ``indexed_attestation`` is not empty, has sorted and unique
+  # indices and has a valid aggregate signature.
+
+  template is_sorted_and_unique(s: untyped): bool =
+    for i in 1 ..< s.len:
+      if s[i - 1].uint64 >= s[i].uint64:
+        return false
+
+    true
+
+  # Not from spec, but this function gets used in front-line roles, not just
+  # behind firewall.
+  let num_validators = epochRef.validator_keys.lenu64
+  if anyIt(indexed_attestation.attesting_indices, it >= num_validators):
+    trace "indexed attestation: not all indices valid validators"
+    return false
+
+  # Verify indices are sorted and unique
+  let indices = indexed_attestation.attesting_indices.asSeq
+  if len(indices) == 0 or not is_sorted_and_unique(indices):
+    trace "indexed attestation: indices not sorted and unique"
+    return false
+
+  # Verify aggregate signature
+  if skipBLSValidation notin flags:
+    # TODO: fuse loops with blsFastAggregateVerify
+    let pubkeys = mapIt(indices, epochRef.validator_keys[it])
+    if not verify_attestation_signature(
+        fork, genesis_validators_root, indexed_attestation.data,
+        pubkeys, indexed_attestation.signature):
+      trace "indexed attestation: signature verification failure"
+      return false
+
+  true
diff --git a/beacon_chain/spec/validator.nim b/beacon_chain/spec/validator.nim
index 186183608..81bed1ab9 100644
--- a/beacon_chain/spec/validator.nim
+++ b/beacon_chain/spec/validator.nim
@@ -172,7 +172,7 @@ func compute_committee*(shuffled_indices: seq[ValidatorIndex],
   except KeyError:
     raiseAssert("Cached entries are added before use")
 
-func compute_committee_len(active_validators: uint64,
+func compute_committee_len*(active_validators: uint64,
     index: uint64, count: uint64): uint64 =
   ## Return the committee corresponding to ``indices``, ``seed``, ``index``,
   ## and committee ``count``.
@@ -206,6 +206,7 @@ func get_beacon_committee*(
     committees_per_slot * SLOTS_PER_EPOCH
   )
 
+# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_beacon_committee
 func get_beacon_committee_len*(
     state: BeaconState, slot: Slot, index: CommitteeIndex,
     cache: var StateCache): uint64 =
diff --git a/beacon_chain/validator_api.nim b/beacon_chain/validator_api.nim
index 82331d42f..ba1445363 100644
--- a/beacon_chain/validator_api.nim
+++ b/beacon_chain/validator_api.nim
@@ -375,7 +375,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
       let currSlot = compute_start_slot_at_epoch(epoch) + i
       let proposer = node.chainDag.getProposer(head, currSlot)
       if proposer.isSome():
-        result.add((public_key: proposer.get()[1], slot: currSlot))
+        result.add((public_key: proposer.get()[1].initPubKey(), slot: currSlot))
 
   rpcServer.rpc("post_v1_validator_beacon_committee_subscriptions") do (
       committee_index: CommitteeIndex, slot: Slot, aggregator: bool,
diff --git a/beacon_chain/validator_duties.nim b/beacon_chain/validator_duties.nim
index edddbec46..9006c1bae 100644
--- a/beacon_chain/validator_duties.nim
+++ b/beacon_chain/validator_duties.nim
@@ -358,7 +358,8 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
   if proposer.isNone():
     return head
 
-  let validator = node.attachedValidators.getValidator(proposer.get()[1])
+  let validator =
+    node.attachedValidators.getValidator(proposer.get()[1].initPubKey())
 
   if validator != nil:
     return await proposeBlock(node, validator, proposer.get()[0], head, slot)
@@ -367,7 +368,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
     headRoot = shortLog(head.root),
     slot = shortLog(slot),
    proposer_index = proposer.get()[0],
-    proposer = shortLog(proposer.get()[1]),
+    proposer = shortLog(proposer.get()[1].initPubKey()),
     pcs = "wait_for_proposal"
 
   return head
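
* sketch: EpochRef lookup

The new cache is keyed by (block, epoch): validation walks the ancestry
looking for an EpochRef already built for the target epoch, and only a miss
falls back to rewinding state. A minimal sketch, assuming heavily reduced
stand-in types (the real EpochRef/BlockRef live in block_pools_types.nim,
and the rewind fallback is elided):

  type
    EpochRef = ref object
      epoch: uint64
    BlockRef = ref object
      parent: BlockRef
      epochsInfo: seq[EpochRef]

  proc getEpochRefSketch(blck: BlockRef, epoch: uint64): EpochRef =
    # walk towards the tail; the first EpochRef cached for `epoch` wins
    var cur = blck
    while cur != nil:
      for e in cur.epochsInfo:
        if e.epoch == epoch:
          return e         # cache hit - no state rewind needed
      cur = cur.parent
    nil                    # miss - the real code rebuilds via dag.withState

  let tail = BlockRef(epochsInfo: @[EpochRef(epoch: 10)])
  doAssert getEpochRefSketch(BlockRef(parent: tail), 10).epoch == 10

Because sibling blocks in an epoch share ancestors, one EpochRef typically
serves a whole epoch's worth of attestations, which is what removes the
rewinds from the hot path.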
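
* sketch: subnet check

The [REJECT] subnet rule only needs committees_per_slot, which EpochRef can
answer without a BeaconState. A self-contained sketch of the spec formula
(constants are the v0.12.2 mainnet values; this is a re-derivation for
illustration, not the repo's helper):

  const
    SLOTS_PER_EPOCH = 32'u64
    ATTESTATION_SUBNET_COUNT = 64'u64

  func computeSubnetForAttestation(
      committees_per_slot, slot, committee_index: uint64): uint64 =
    # committees elapsed since the epoch started, plus this committee's
    # index, folded into the fixed number of attestation subnets
    let committees_since_epoch_start =
      committees_per_slot * (slot mod SLOTS_PER_EPOCH)
    (committees_since_epoch_start + committee_index) mod
      ATTESTATION_SUBNET_COUNT

  # gossip whose topic index differs from this value is rejected
  doAssert computeSubnetForAttestation(4, 17, 2) == 6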
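
* sketch: aggregator selection

is_aggregator needs only the committee length - the reason
get_beacon_committee_len exists - so EpochRef can answer it without
materializing the committee. A sketch of the modulo test, with std/hashes
standing in for eth2digest over the BLS selection proof (stand-in only, so
the snippet runs on its own):

  import std/hashes

  const TARGET_AGGREGATORS_PER_COMMITTEE = 16'u64

  proc isAggregatorSketch(committee_len: uint64,
                          selection_proof: string): bool =
    # on average ~TARGET_AGGREGATORS_PER_COMMITTEE members self-select
    let
      modulo = max(1'u64, committee_len div TARGET_AGGREGATORS_PER_COMMITTEE)
      digest = uint64(hash(selection_proof) and high(int))  # non-negative
    digest mod modulo == 0

  # the modulo is clamped to 1, so a single-member committee always aggregates
  doAssert isAggregatorSketch(1, "proof")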
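
* sketch: indexed-attestation index checks

is_valid_indexed_attestation now validates against epochRef.validator_keys
rather than state.validators, so the pre-signature checks reduce to plain
sequence logic. A hypothetical reduction (indicesOk is not a repo function;
it just isolates the empty/sorted/unique/bounds checks that gate the
expensive BLS verification):

  func isSortedAndUnique(s: openArray[uint64]): bool =
    for i in 1 ..< s.len:
      if s[i - 1] >= s[i]:
        return false
    true

  func indicesOk(attesting_indices: openArray[uint64],
                 num_validators: uint64): bool =
    # empty, unsorted, duplicate or out-of-range index lists are all
    # rejected before any aggregate signature work is attempted
    if attesting_indices.len == 0:
      return false
    if not isSortedAndUnique(attesting_indices):
      return false
    for idx in attesting_indices:
      if idx >= num_validators:
        return false
    true

  doAssert indicesOk([1'u64, 5, 9], 16)
  doAssert not indicesOk([5'u64, 5], 16)   # duplicate
  doAssert not indicesOk([1'u64, 99], 16)  # out of range

Keeping these checks cheap matters on gossip hot paths, where most junk
should be droppable without touching BLS at all.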