remove one cache, add another (#1449)

* remove one cache, add another

This cache removes the need for rewinding in most attestation validation
flows, since attestations come from one of two epochs and must be
targeting a viable block.
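
As a rough illustration (not part of the diff), the lookup pattern this
commit introduces can be sketched with toy Nim types - ToyEpochRef,
ToyBlockRef and expensiveRebuild below are hypothetical stand-ins for
EpochRef, BlockRef and the state-replay slow path:

# Toy sketch, not the real API: per-epoch data is cached on the block graph
# and reused across attestations instead of rewinding a full BeaconState.
type
  ToyEpochRef = ref object
    epoch: uint64
    validatorKeys: seq[string]    # stands in for seq[ValidatorPubKey]

  ToyBlockRef = ref object
    root: string
    epochRefs: seq[ToyEpochRef]   # mirrors BlockRef.epochsInfo

proc expensiveRebuild(blck: ToyBlockRef, epoch: uint64): ToyEpochRef =
  # Stand-in for the slow path: replaying state to the epoch start slot.
  ToyEpochRef(epoch: epoch, validatorKeys: @["key0", "key1"])

proc getEpochRef(blck: ToyBlockRef, epoch: uint64): ToyEpochRef =
  # Fast path: reuse a cached EpochRef if one already exists for this epoch.
  for e in blck.epochRefs:
    if e.epoch == epoch:
      return e
  # Slow path: rebuild once, then cache it for the next attestation.
  result = expensiveRebuild(blck, epoch)
  blck.epochRefs.add result

when isMainModule:
  let blck = ToyBlockRef(root: "abc")
  discard blck.getEpochRef(10)    # rebuilds
  discard blck.getEpochRef(10)    # cache hit, no state rewind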

Additionally, it removes all state caches, which are less likely to be
used overall - more metrics are needed to track the rewinding.

One risk is that when chains don't finalize, we'll have lots of epochrefs
in memory, meaning lots of validator key databases, most of them exactly
the same. This can be addressed in any number of ways - one possible
approach is sketched below. Some of the memory usage is mitigated by the
fact that we previously kept lots of big state caches and now keep only
keys instead.
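
Purely as a hypothetical illustration of one such mitigation (not something
this commit implements), EpochRef instances whose validator set is unchanged
could share one key sequence by reference instead of each holding a copy;
KeyList, SharedEpochRef and makeEpochRef below are made-up names:

# Hypothetical mitigation sketch: share the key list between epochs whose
# validator set did not change, instead of copying it per EpochRef.
type
  KeyList = ref object
    keys: seq[string]             # stands in for seq[ValidatorPubKey]

  SharedEpochRef = ref object
    epoch: uint64
    validatorKeys: KeyList        # shared by reference, not copied

proc makeEpochRef(epoch: uint64, prev: SharedEpochRef,
                  currentKeys: seq[string]): SharedEpochRef =
  # Reuse the previous epoch's key list when nothing changed; allocate a
  # new one only when the validator set actually differs.
  if prev != nil and prev.validatorKeys.keys == currentKeys:
    SharedEpochRef(epoch: epoch, validatorKeys: prev.validatorKeys)
  else:
    SharedEpochRef(epoch: epoch, validatorKeys: KeyList(keys: currentKeys))

when isMainModule:
  let keys = @["key0", "key1"]
  let a = makeEpochRef(10, nil, keys)
  let b = makeEpochRef(11, a, keys)
  doAssert a.validatorKeys == b.validatorKeys   # same ref, no duplication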

* cleanups

* doc
Jacek Sieka 2020-08-06 21:48:47 +02:00 committed by GitHub
parent c5077af4bc
commit 84a501d1ff
8 changed files with 172 additions and 157 deletions


@@ -12,8 +12,8 @@ import
./spec/[
beaconstate, datatypes, crypto, digest, helpers, network, validator,
signatures],
./block_pools/[spec_cache, chain_dag, quarantine], ./attestation_pool,
./beacon_node_types, ./ssz
./block_pools/[spec_cache, chain_dag, quarantine, spec_cache],
./attestation_pool, ./beacon_node_types, ./ssz
logScope:
topics = "att_aggr"
@@ -108,7 +108,8 @@ func checkPropagationSlotRange(data: AttestationData, current_slot: Slot): bool
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
proc isValidAttestation*(
pool: var AttestationPool, attestation: Attestation, current_slot: Slot,
pool: var AttestationPool,
attestation: Attestation, current_slot: Slot,
topicCommitteeIndex: uint64): bool =
logScope:
topics = "att_aggr valid_att"
@@ -187,36 +188,40 @@ proc isValidAttestation*(
# compute_start_slot_at_epoch(store.finalized_checkpoint.epoch)) ==
# store.finalized_checkpoint.root
pool.chainDag.withState(
pool.chainDag.tmpState,
tgtBlck.atSlot(attestation.data.target.epoch.compute_start_slot_at_epoch)):
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
# [REJECT] The attestation is for the correct subnet -- i.e.
# compute_subnet_for_attestation(committees_per_slot,
# attestation.data.slot, attestation.data.index) == subnet_id, where
# committees_per_slot = get_committee_count_per_slot(state,
# attestation.data.target.epoch), which may be pre-computed along with the
# committee information for the signature check.
var cache = getEpochCache(blck, state)
let
epochInfo = blck.getEpochInfo(state, cache)
requiredSubnetIndex =
compute_subnet_for_attestation(
get_committee_count_per_slot(epochInfo),
attestation.data.slot, attestation.data.index.CommitteeIndex)
let epochRef = pool.chainDag.getEpochRef(
tgtBlck, attestation.data.target.epoch)
if requiredSubnetIndex != topicCommitteeIndex:
debug "attestation's committee index not for the correct subnet",
topicCommitteeIndex = topicCommitteeIndex,
attestation_data_index = attestation.data.index,
requiredSubnetIndex = requiredSubnetIndex
return false
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id
# [REJECT] The attestation is for the correct subnet -- i.e.
# compute_subnet_for_attestation(committees_per_slot,
# attestation.data.slot, attestation.data.index) == subnet_id, where
# committees_per_slot = get_committee_count_per_slot(state,
# attestation.data.target.epoch), which may be pre-computed along with the
# committee information for the signature check.
let
requiredSubnetIndex =
compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef),
attestation.data.slot, attestation.data.index.CommitteeIndex)
if requiredSubnetIndex != topicCommitteeIndex:
debug "attestation's committee index not for the correct subnet",
topicCommitteeIndex = topicCommitteeIndex,
attestation_data_index = attestation.data.index,
requiredSubnetIndex = requiredSubnetIndex
return false
let
fork = pool.chainDag.headState.data.data.fork
genesis_validators_root =
pool.chainDag.headState.data.data.genesis_validators_root
# The signature of attestation is valid.
if not is_valid_indexed_attestation(
state, get_indexed_attestation(state, attestation, cache), {}):
debug "signature verification failed"
return false
if not is_valid_indexed_attestation(
fork, genesis_validators_root,
epochRef, get_indexed_attestation(epochRef, attestation), {}):
debug "signature verification failed"
return false
true
@@ -303,53 +308,55 @@ proc isValidAggregatedAttestation*(
pool.quarantine.addMissing(aggregate.data.target.root)
return
# TODO this could be any state in the target epoch
pool.chainDag.withState(
pool.chainDag.tmpState,
tgtBlck.atSlot(aggregate.data.target.epoch.compute_start_slot_at_epoch)):
var cache = getEpochCache(blck, state)
if not is_aggregator(
state, aggregate.data.slot, aggregate.data.index.CommitteeIndex,
aggregate_and_proof.selection_proof, cache):
debug "Incorrect aggregator"
return false
let epochRef = pool.chainDag.getEpochRef(tgtBlck, aggregate.data.target.epoch)
# [REJECT] The aggregator's validator index is within the committee -- i.e.
# aggregate_and_proof.aggregator_index in get_beacon_committee(state,
# aggregate.data.slot, aggregate.data.index).
if aggregate_and_proof.aggregator_index.ValidatorIndex notin
get_beacon_committee(
state, aggregate.data.slot, aggregate.data.index.CommitteeIndex, cache):
debug "Aggregator's validator index not in committee"
return false
if not is_aggregator(
epochRef, aggregate.data.slot, aggregate.data.index.CommitteeIndex,
aggregate_and_proof.selection_proof):
debug "Incorrect aggregator"
return false
# [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the
# aggregate.data.slot by the validator with index
# aggregate_and_proof.aggregator_index.
# get_slot_signature(state, aggregate.data.slot, privkey)
if aggregate_and_proof.aggregator_index >= state.validators.lenu64:
debug "Invalid aggregator_index"
return false
# [REJECT] The aggregator's validator index is within the committee -- i.e.
# aggregate_and_proof.aggregator_index in get_beacon_committee(state,
# aggregate.data.slot, aggregate.data.index).
if aggregate_and_proof.aggregator_index.ValidatorIndex notin
get_beacon_committee(
epochRef, aggregate.data.slot, aggregate.data.index.CommitteeIndex):
debug "Aggregator's validator index not in committee"
return false
if not verify_slot_signature(
state.fork, state.genesis_validators_root, aggregate.data.slot,
state.validators[aggregate_and_proof.aggregator_index].pubkey,
aggregate_and_proof.selection_proof):
debug "Selection_proof signature verification failed"
return false
# [REJECT] The aggregate_and_proof.selection_proof is a valid signature of the
# aggregate.data.slot by the validator with index
# aggregate_and_proof.aggregator_index.
# get_slot_signature(state, aggregate.data.slot, privkey)
if aggregate_and_proof.aggregator_index >= epochRef.validator_keys.lenu64:
debug "Invalid aggregator_index"
return false
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
if not verify_aggregate_and_proof_signature(
state.fork, state.genesis_validators_root, aggregate_and_proof,
state.validators[aggregate_and_proof.aggregator_index].pubkey,
signed_aggregate_and_proof.signature):
debug "Signed_aggregate_and_proof signature verification failed"
return false
let
fork = pool.chainDag.headState.data.data.fork
genesis_validators_root =
pool.chainDag.headState.data.data.genesis_validators_root
if not verify_slot_signature(
fork, genesis_validators_root, aggregate.data.slot,
epochRef.validator_keys[aggregate_and_proof.aggregator_index],
aggregate_and_proof.selection_proof):
debug "Selection_proof signature verification failed"
return false
# [REJECT] The signature of aggregate is valid.
if not is_valid_indexed_attestation(
state, get_indexed_attestation(state, aggregate, cache), {}):
debug "Aggregate signature verification failed"
return false
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
if not verify_aggregate_and_proof_signature(
fork, genesis_validators_root, aggregate_and_proof,
epochRef.validator_keys[aggregate_and_proof.aggregator_index],
signed_aggregate_and_proof.signature):
debug "Signed_aggregate_and_proof signature verification failed"
return false
# [REJECT] The signature of aggregate is valid.
if not is_valid_indexed_attestation(
fork, genesis_validators_root,
epochRef, get_indexed_attestation(epochRef, aggregate), {}):
debug "Aggregate signature verification failed"
return false
true


@@ -29,10 +29,9 @@ proc init*(T: type AttestationPool, chainDag: ChainDAGRef, quarantine: Quarantin
# should probably be removed as a dependency of AttestationPool (or some other
# smart refactoring)
chainDag.withState(chainDag.tmpState, chainDag.finalizedHead):
var forkChoice = initForkChoice(
chainDag.tmpState,
).get()
let tmpState = newClone(chainDag.headState)
chainDag.withState(tmpState[], chainDag.finalizedHead):
var forkChoice = initForkChoice(tmpState[]).get()
# Feed fork choice with unfinalized history - during startup, block pool only
# keeps track of a single history so we just need to follow it


@@ -115,9 +115,6 @@ type
# -----------------------------------
# Rewinder - Mutable state processing
cachedStates*: seq[tuple[blockRoot: Eth2Digest, slot: Slot,
state: ref HashedBeaconState]]
headState*: StateData ##\
## State given by the head block; only update in `updateHead`, not anywhere
## else via `withState`
@@ -143,6 +140,10 @@ type
beacon_proposers*: array[
SLOTS_PER_EPOCH, Option[(ValidatorIndex, ValidatorPubKey)]]
shuffled_active_validator_indices*: seq[ValidatorIndex]
# This is an expensive cache that could probably be shared among epochref
# instances - in particular, validators keep their keys and locations in the
# structure
validator_keys*: seq[ValidatorPubKey]
BlockRef* = ref object
## Node in object graph guaranteed to lead back to tail block, and to have


@@ -22,8 +22,8 @@ import
export block_pools_types
declareCounter beacon_reorgs_total, "Total occurrences of reorganizations of the chain" # On fork choice
declareCounter beacon_state_data_cache_hits, "dag.cachedStates hits"
declareCounter beacon_state_data_cache_misses, "dag.cachedStates misses"
declareCounter beacon_state_data_cache_hits, "EpochRef hits"
declareCounter beacon_state_data_cache_misses, "EpochRef misses"
logScope: topics = "hotdb"
@@ -76,7 +76,9 @@ proc init*(T: type EpochRef, state: BeaconState, cache: var StateCache): T =
state, cache, epoch.compute_start_slot_at_epoch() + i)
if idx.isSome():
epochRef.beacon_proposers[i] =
some((idx.get(), state.validators[idx.get].pubkey.initPubKey))
some((idx.get(), state.validators[idx.get].pubkey))
epochRef.validator_keys = mapIt(state.validators.toSeq, it.pubkey)
epochRef
func link*(parent, child: BlockRef) =
@@ -344,11 +346,14 @@ proc getEpochRef*(dag: ChainDAGRef, blck: BlockRef, epoch: Epoch): EpochRef =
# we start at the most recent one
for e in bs.blck.epochsInfo:
if e.epoch == epoch:
beacon_state_data_cache_hits.inc
return e
if bs.slot == epoch.compute_start_slot_at_epoch:
break
bs = bs.parent
beacon_state_data_cache_misses.inc
dag.withState(dag.tmpState, bs):
var cache = StateCache()
getEpochInfo(blck, state, cache)
@@ -376,39 +381,6 @@ proc getState(
true
func getStateCacheIndex(
dag: ChainDAGRef, blockRoot: Eth2Digest, slot: Slot, matchEpoch: bool):
int =
for i, cachedState in dag.cachedStates:
let (cacheBlockRoot, cacheSlot, _) = cachedState
if cacheBlockRoot == blockRoot and (cacheSlot == slot or
(matchEpoch and
cacheSlot.compute_epoch_at_slot == slot.compute_epoch_at_slot)):
return i
-1
func putStateCache*(
dag: ChainDAGRef, state: HashedBeaconState, blck: BlockRef) =
# Efficiently access states for both attestation aggregation and to process
# block proposals going back to the last finalized slot.
let stateCacheIndex =
dag.getStateCacheIndex(blck.root, state.data.slot, false)
if stateCacheIndex == -1:
const MAX_CACHE_SIZE = 18
let cacheLen = dag.cachedStates.len
doAssert cacheLen <= MAX_CACHE_SIZE
let entry =
if dag.cachedStates.len == MAX_CACHE_SIZE: dag.cachedStates.pop().state
else: (ref HashedBeaconState)()
assign(entry[], state)
insert(dag.cachedStates, (blck.root, state.data.slot, entry))
trace "ChainDAGRef.putState(): state cache updated",
cacheLen, root = shortLog(blck.root), slot = state.data.slot
proc putState*(dag: ChainDAGRef, state: HashedBeaconState, blck: BlockRef) =
# TODO we save state at every epoch start but never remove them - we also
# potentially save multiple states per slot if reorgs happen, meaning
@@ -434,9 +406,6 @@ proc putState*(dag: ChainDAGRef, state: HashedBeaconState, blck: BlockRef) =
if not rootWritten:
dag.db.putStateRoot(blck.root, state.data.slot, state.root)
if state.data.slot mod 2 == 0:
putStateCache(dag, state, blck)
func getRef*(dag: ChainDAGRef, root: Eth2Digest): BlockRef =
## Retrieve a resolved block reference, if available
dag.blocks.getOrDefault(root, nil)
@@ -577,24 +546,6 @@ proc rewindState(
if parBs.blck != curBs.blck:
ancestors.add(parBs.blck)
# TODO investigate replacing with getStateCached, by refactoring whole
# function. Empirically, this becomes pretty rare once good caches are
# used in the front-end.
let idx = dag.getStateCacheIndex(parBs.blck.root, parBs.slot, matchEpoch)
if idx >= 0:
assign(state.data, dag.cachedStates[idx].state[])
let ancestor = ancestors.pop()
state.blck = ancestor
beacon_state_data_cache_hits.inc()
trace "Replaying state transitions via in-memory cache",
stateSlot = shortLog(state.data.data.slot),
ancestorStateRoot = shortLog(state.data.root),
ancestors = ancestors.len
return ancestors
beacon_state_data_cache_misses.inc()
if (let tmp = dag.db.getStateRoot(parBs.blck.root, parBs.slot); tmp.isSome()):
if dag.db.containsState(tmp.get):
stateRoot = tmp
@@ -636,21 +587,9 @@ proc getStateDataCached(
# mostly matches updateStateData(...), because it's too expensive to run the
# rewindState(...)/skipAndUpdateState(...)/state_transition(...) procs, when
# each hash_tree_root(...) consumes a nontrivial fraction of a second.
when false:
# For debugging/development purposes to assess required lookback window for
# any given use case.
doAssert state.data.data.slot <= bs.slot + 4
let idx = dag.getStateCacheIndex(bs.blck.root, bs.slot, matchEpoch)
if idx >= 0:
assign(state.data, dag.cachedStates[idx].state[])
state.blck = bs.blck
beacon_state_data_cache_hits.inc()
return true
# In-memory caches didn't hit. Try main block pool database. This is slower
# than the caches due to SSZ (de)serializing and disk I/O, so prefer them.
beacon_state_data_cache_misses.inc()
if (let tmp = dag.db.getStateRoot(bs.blck.root, bs.slot); tmp.isSome()):
return dag.getState(dag.db, tmp.get(), bs.blck, state)
@@ -716,8 +655,6 @@ proc updateStateData*(
state.blck = bs.blck
dag.putStateCache(state.data, bs.blck)
proc loadTailState*(dag: ChainDAGRef): StateData =
## Load the state associated with the current tail in the dag
let stateRoot = dag.db.getBlock(dag.tail.root).get().message.state_root


@@ -7,16 +7,22 @@
import
std/[algorithm, sequtils, sets],
../spec/[beaconstate, datatypes, presets, validator],
block_pools_types
chronicles,
../spec/[
beaconstate, crypto, datatypes, digest, helpers, presets, signatures,
validator],
../extras,
./block_pools_types
# Spec functions implemented based on cached values instead of the full state
func count_active_validators*(epochInfo: EpochRef): uint64 =
epochInfo.shuffled_active_validator_indices.lenu64
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_committee_count_per_slot
func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 =
get_committee_count_per_slot(count_active_validators(epochInfo))
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee*(
epochRef: EpochRef, slot: Slot, index: CommitteeIndex): seq[ValidatorIndex] =
# Return the beacon committee at ``slot`` for ``index``.
@@ -53,3 +59,66 @@ func get_indexed_attestation*(epochRef: EpochRef, attestation: Attestation): Ind
data: attestation.data,
signature: attestation.signature
)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee_len*(
epochRef: EpochRef, slot: Slot, index: CommitteeIndex): uint64 =
# Return the number of members in the beacon committee at ``slot`` for ``index``.
let
epoch = compute_epoch_at_slot(slot)
committees_per_slot = get_committee_count_per_slot(epochRef)
compute_committee_len(
count_active_validators(epochRef),
(slot mod SLOTS_PER_EPOCH) * committees_per_slot +
index.uint64,
committees_per_slot * SLOTS_PER_EPOCH
)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#aggregation-selection
func is_aggregator*(epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
slot_signature: ValidatorSig): bool =
let
committee_len = get_beacon_committee_len(epochRef, slot, index)
modulo = max(1'u64, committee_len div TARGET_AGGREGATORS_PER_COMMITTEE)
bytes_to_uint64(eth2digest(slot_signature.toRaw()).data[0..7]) mod modulo == 0
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#is_valid_indexed_attestation
proc is_valid_indexed_attestation*(
fork: Fork, genesis_validators_root: Eth2Digest,
epochRef: EpochRef, indexed_attestation: SomeIndexedAttestation,
flags: UpdateFlags): bool =
# Check if ``indexed_attestation`` is not empty, has sorted and unique
# indices and has a valid aggregate signature.
template is_sorted_and_unique(s: untyped): bool =
for i in 1 ..< s.len:
if s[i - 1].uint64 >= s[i].uint64:
return false
true
# Not from spec, but this function gets used in front-line roles, not just
# behind firewall.
let num_validators = epochRef.validator_keys.lenu64
if anyIt(indexed_attestation.attesting_indices, it >= num_validators):
trace "indexed attestation: not all indices valid validators"
return false
# Verify indices are sorted and unique
let indices = indexed_attestation.attesting_indices.asSeq
if len(indices) == 0 or not is_sorted_and_unique(indices):
trace "indexed attestation: indices not sorted and unique"
return false
# Verify aggregate signature
if skipBLSValidation notin flags:
# TODO: fuse loops with blsFastAggregateVerify
let pubkeys = mapIt(indices, epochRef.validator_keys[it])
if not verify_attestation_signature(
fork, genesis_validators_root, indexed_attestation.data,
pubkeys, indexed_attestation.signature):
trace "indexed attestation: signature verification failure"
return false
true
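
As a rough worked example of the selection arithmetic in is_aggregator above
(assuming TARGET_AGGREGATORS_PER_COMMITTEE = 16 as in the phase0 spec;
isAggregatorSketch and sigDigest are hypothetical names, with sigDigest
standing in for bytes_to_uint64(eth2digest(slot_signature.toRaw()).data[0..7])):

# Self-contained sketch of the selection arithmetic used by is_aggregator:
# roughly one committee member per `modulo` self-selects as aggregator.
const TargetAggregatorsPerCommittee = 16'u64

proc isAggregatorSketch(committeeLen, sigDigest: uint64): bool =
  let modulo = max(1'u64, committeeLen div TargetAggregatorsPerCommittee)
  sigDigest mod modulo == 0

when isMainModule:
  # A committee of 128 gives modulo = 8, so ~16 aggregators are expected.
  echo isAggregatorSketch(128, 0x1234_5678_9abc_def0'u64)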


@@ -172,7 +172,7 @@ func compute_committee*(shuffled_indices: seq[ValidatorIndex],
except KeyError:
raiseAssert("Cached entries are added before use")
func compute_committee_len(active_validators: uint64,
func compute_committee_len*(active_validators: uint64,
index: uint64, count: uint64): uint64 =
## Return the committee corresponding to ``indices``, ``seed``, ``index``,
## and committee ``count``.
@@ -206,6 +206,7 @@ func get_beacon_committee*(
committees_per_slot * SLOTS_PER_EPOCH
)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee_len*(
state: BeaconState, slot: Slot, index: CommitteeIndex,
cache: var StateCache): uint64 =


@@ -375,7 +375,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
let currSlot = compute_start_slot_at_epoch(epoch) + i
let proposer = node.chainDag.getProposer(head, currSlot)
if proposer.isSome():
result.add((public_key: proposer.get()[1], slot: currSlot))
result.add((public_key: proposer.get()[1].initPubKey(), slot: currSlot))
rpcServer.rpc("post_v1_validator_beacon_committee_subscriptions") do (
committee_index: CommitteeIndex, slot: Slot, aggregator: bool,


@@ -358,7 +358,8 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
if proposer.isNone():
return head
let validator = node.attachedValidators.getValidator(proposer.get()[1])
let validator =
node.attachedValidators.getValidator(proposer.get()[1].initPubKey())
if validator != nil:
return await proposeBlock(node, validator, proposer.get()[0], head, slot)
@@ -367,7 +368,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
headRoot = shortLog(head.root),
slot = shortLog(slot),
proposer_index = proposer.get()[0],
proposer = shortLog(proposer.get()[1]),
proposer = shortLog(proposer.get()[1].initPubKey()),
pcs = "wait_for_proposal"
return head