From 499e5ca9917e15b64784e21b45fd2f727eceabc2 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 22 Oct 2020 12:53:33 +0200 Subject: [PATCH] misc memory and perf fixes (#1899) * misc memory and perf fixes * use EpochRef for attestation aggregation * compress effective balances in memory (medalla unfinalized: 4gb -> 1gb) * avoid hitting db when rewinding to head or clearance state * avoid hitting db when blocks can be applied to in-memory state - speeds up startup considerably * avoid storing epochref in fork choice * simplify and speed up beacon block creation flow - avoids state reload thanks to head rewind optimization * iterator-based committee and attestation participation help avoid lots of small memory allocations throughout epoch transition (40% speedup on epoch processing, for example during startup) * add constant for threshold --- beacon_chain/attestation_aggregation.nim | 23 ++- .../block_pools/block_pools_types.nim | 2 +- beacon_chain/block_pools/chain_dag.nim | 146 +++++++++++------ beacon_chain/block_pools/spec_cache.nim | 65 +++++--- beacon_chain/fork_choice/fork_choice.nim | 50 +++--- .../fork_choice/fork_choice_types.nim | 3 +- beacon_chain/spec/beaconstate.nim | 51 ++++-- .../spec/state_transition_helpers.nim | 7 +- beacon_chain/spec/validator.nim | 21 +++ beacon_chain/validator_api.nim | 10 +- beacon_chain/validator_duties.nim | 150 +++++++----------- 11 files changed, 310 insertions(+), 218 deletions(-) diff --git a/beacon_chain/attestation_aggregation.nim b/beacon_chain/attestation_aggregation.nim index 0b2d4cfd2..8243ca3b5 100644 --- a/beacon_chain/attestation_aggregation.nim +++ b/beacon_chain/attestation_aggregation.nim @@ -11,8 +11,7 @@ import std/[options, sequtils, sets], chronos, chronicles, ./spec/[ - beaconstate, datatypes, crypto, digest, helpers, network, validator, - signatures], + beaconstate, datatypes, crypto, digest, helpers, network, signatures], ./block_pools/[spec_cache, chain_dag, quarantine, spec_cache], ./attestation_pool, ./beacon_node_types, ./ssz, ./time @@ -29,26 +28,22 @@ func is_aggregator*(committee_len: uint64, slot_signature: ValidatorSig): bool = bytes_to_uint64(eth2digest( slot_signature.toRaw()).data.toOpenArray(0, 7)) mod modulo == 0 -func is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, - slot_signature: ValidatorSig, cache: var StateCache): bool = +func is_aggregator(epochRef: EpochRef, slot: Slot, index: CommitteeIndex, + slot_signature: ValidatorSig): bool = let - committee_len = get_beacon_committee_len(state, slot, index, cache) + committee_len = get_beacon_committee_len(epochRef, slot, index) return is_aggregator(committee_len, slot_signature) proc aggregate_attestations*( - pool: AttestationPool, state: BeaconState, index: CommitteeIndex, - validatorIndex: ValidatorIndex, slot_signature: ValidatorSig, - cache: var StateCache): Option[AggregateAndProof] = - let - slot = state.slot - - doAssert validatorIndex in get_beacon_committee(state, slot, index, cache) - doAssert index.uint64 < get_committee_count_per_slot(state, slot.epoch, cache) + pool: AttestationPool, epochRef: EpochRef, slot: Slot, index: CommitteeIndex, + validatorIndex: ValidatorIndex, slot_signature: ValidatorSig): Option[AggregateAndProof] = + doAssert validatorIndex in get_beacon_committee(epochRef, slot, index) + doAssert index.uint64 < get_committee_count_per_slot(epochRef) # TODO for testing purposes, refactor this into the condition check # and just calculation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#aggregation-selection - if not is_aggregator(state, slot, index, slot_signature, cache): + if not is_aggregator(epochRef, slot, index, slot_signature): return none(AggregateAndProof) let maybe_slot_attestation = getAggregatedAttestation(pool, slot, index) diff --git a/beacon_chain/block_pools/block_pools_types.nim b/beacon_chain/block_pools/block_pools_types.nim index 3e6577037..757ad5916 100644 --- a/beacon_chain/block_pools/block_pools_types.nim +++ b/beacon_chain/block_pools/block_pools_types.nim @@ -148,7 +148,7 @@ type validator_key_store*: (Eth2Digest, ref seq[ValidatorPubKey]) # balances, as used in fork choice - effective_balances*: seq[Gwei] + effective_balances_bytes*: seq[byte] BlockRef* = ref object ## Node in object graph guaranteed to lead back to tail block, and to have diff --git a/beacon_chain/block_pools/chain_dag.nim b/beacon_chain/block_pools/chain_dag.nim index 4cf740b03..314cc9190 100644 --- a/beacon_chain/block_pools/chain_dag.nim +++ b/beacon_chain/block_pools/chain_dag.nim @@ -8,12 +8,9 @@ {.push raises: [Defect].} import - # Standard libraries - chronicles, options, sequtils, tables, sets, - # Status libraries - metrics, - # Internals - ../ssz/merkleization, ../beacon_chain_db, ../extras, + std/[options, sequtils, tables, sets], + metrics, snappy, chronicles, + ../ssz/[ssz_serialization, merkleization], ../beacon_chain_db, ../extras, ../spec/[ crypto, datatypes, digest, helpers, validator, state_transition, beaconstate], @@ -143,12 +140,28 @@ proc init*( newClone(mapIt(state.validators.toSeq, it.pubkey))) # When fork choice runs, it will need the effective balance of the justified - # epoch - we pre-load the balances here to avoid rewinding the justified - # state later - epochRef.effective_balances = get_effective_balances(state) + # checkpoint - we pre-load the balances here to avoid rewinding the justified + # state later and compress them because not all checkpoints end up being used + # for fork choice - specially during long periods of non-finalization + proc snappyEncode(inp: openArray[byte]): seq[byte] = + try: + snappy.encode(inp) + except CatchableError as err: + raiseAssert err.msg + + epochRef.effective_balances_bytes = + snappyEncode(SSZ.encode( + List[Gwei, Limit VALIDATOR_REGISTRY_LIMIT](get_effective_balances(state)))) epochRef +func effective_balances*(epochRef: EpochRef): seq[Gwei] = + try: + SSZ.decode(snappy.decode(epochRef.effective_balances_bytes, uint32.high), + List[Gwei, Limit VALIDATOR_REGISTRY_LIMIT]).toSeq() + except CatchableError as exc: + raiseAssert exc.msg + func updateKeyStores*(epochRef: EpochRef, blck: BlockRef, finalized: BlockRef) = # Because key stores are additive lists, we can use a newer list whereever an # older list is expected - all indices in the new list will be valid for the @@ -651,43 +664,87 @@ proc updateStateData*( # that the state has not been advanced past the desired block - if it has, # an earlier state must be loaded since there's no way to undo the slot # transitions - if state.blck == bs.blck and state.data.data.slot <= bs.slot: - # The block is the same and we're at an early enough slot - advance the - # state with empty slot processing until the slot is correct - dag.advanceSlots(state, bs.slot, save, cache) - - return - - debug "UpdateStateData cache miss", - bs, stateBlock = state.blck, stateSlot = state.data.data.slot - - # Either the state is too new or was created by applying a different block. - # We'll now resort to loading the state from the database then reapplying - # blocks until we reach the desired point in time. var ancestors: seq[BlockRef] cur = bs - # Look for a state in the database and load it - as long as it cannot be - # found, keep track of the blocks that are needed to reach it from the - # state that eventually will be found - while not dag.getState(state, cur): - # There's no state saved for this particular BlockSlot combination, keep - # looking... - if cur.blck.parent != nil and - cur.blck.slot.epoch != epoch(cur.blck.parent.slot): - # We store the state of the parent block with the epoch processing applied - # in the database - we'll need to apply the block however! - ancestors.add(cur.blck) - cur = cur.blck.parent.atEpochStart(cur.blck.slot.epoch) - else: - if cur.slot == cur.blck.slot: - # This is not an empty slot, so the block will need to be applied to - # eventually reach bs - ancestors.add(cur.blck) + found = false - # Moves back slot by slot, in case a state for an empty slot was saved - cur = cur.parent + template canAdvance(state: StateData, bs: BlockSlot): bool = + # The block is the same and we're at an early enough slot - the state can + # be used to arrive at the desired blockslot + state.blck == bs.blck and state.data.data.slot <= bs.slot + + # First, run a quick check if we can simply apply a few blocks to an in-memory + # state - any in-memory state will be faster than loading from database. + # The limit here how many blocks we apply is somewhat arbitrary but two full + # epochs (might be more slots if there are skips) seems like a good enough + # first guess. + # This happens in particular during startup where we replay blocks + # sequentially to grab their votes. + const RewindBlockThreshold = 64 + while ancestors.len < RewindBlockThreshold: + if canAdvance(state, cur): + found = true + break + + if canAdvance(dag.headState, cur): + assign(state, dag.headState) + found = true + break + + if canAdvance(dag.clearanceState, cur): + assign(state, dag.clearanceState) + found = true + break + + if cur.slot == cur.blck.slot: + # This is not an empty slot, so the block will need to be applied to + # eventually reach bs + ancestors.add(cur.blck) + + if cur.blck.parent == nil: + break + + # Moving slot by slot helps find states that were advanced with empty slots + cur = cur.parentOrSlot + + # Let's see if we're within a few epochs of the state block - then we can + # simply replay blocks without loading the whole state + + if not found: + debug "UpdateStateData cache miss", + bs, stateBlock = state.blck, stateSlot = state.data.data.slot + + # Either the state is too new or was created by applying a different block. + # We'll now resort to loading the state from the database then reapplying + # blocks until we reach the desired point in time. + + cur = bs + ancestors.setLen(0) + + # Look for a state in the database and load it - as long as it cannot be + # found, keep track of the blocks that are needed to reach it from the + # state that eventually will be found + while not dag.getState(state, cur): + # There's no state saved for this particular BlockSlot combination, keep + # looking... + if cur.blck.parent != nil and + cur.blck.slot.epoch != epoch(cur.blck.parent.slot): + # We store the state of the parent block with the epoch processing applied + # in the database - we'll need to apply the block however! + ancestors.add(cur.blck) + cur = cur.blck.parent.atEpochStart(cur.blck.slot.epoch) + else: + if cur.slot == cur.blck.slot: + # This is not an empty slot, so the block will need to be applied to + # eventually reach bs + ancestors.add(cur.blck) + + # Moves back slot by slot, in case a state for an empty slot was saved + cur = cur.parent + + beacon_state_rewinds.inc() let startSlot {.used.} = state.data.data.slot # used in logs below @@ -705,16 +762,15 @@ proc updateStateData*( # ...and make sure to process empty slots as requested dag.advanceSlots(state, bs.slot, save, cache) - beacon_state_rewinds.inc() - - trace "State reloaded from database", + trace "State updated", blocks = ancestors.len, slots = state.data.data.slot - startSlot, stateRoot = shortLog(state.data.root), stateSlot = state.data.data.slot, startRoot = shortLog(startRoot), startSlot, - blck = shortLog(bs) + blck = shortLog(bs), + found proc delState(dag: ChainDAGRef, bs: BlockSlot) = # Delete state state and mapping for a particular block+slot diff --git a/beacon_chain/block_pools/spec_cache.nim b/beacon_chain/block_pools/spec_cache.nim index ccd4bd6f5..4f4a69701 100644 --- a/beacon_chain/block_pools/spec_cache.nim +++ b/beacon_chain/block_pools/spec_cache.nim @@ -7,8 +7,9 @@ import std/[algorithm, sequtils, sets], + chronicles, ../spec/[ - beaconstate, crypto, datatypes, digest, helpers, presets, signatures, + crypto, datatypes, digest, helpers, presets, signatures, validator], ../extras, ./block_pools_types, ./chain_dag @@ -21,6 +22,19 @@ func count_active_validators*(epochInfo: EpochRef): uint64 = func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 = get_committee_count_per_slot(count_active_validators(epochInfo)) +# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee +iterator get_beacon_committee*( + epochRef: EpochRef, slot: Slot, index: CommitteeIndex): ValidatorIndex = + # Return the beacon committee at ``slot`` for ``index``. + let + committees_per_slot = get_committee_count_per_slot(epochRef) + for idx in compute_committee( + epochRef.shuffled_active_validator_indices, + (slot mod SLOTS_PER_EPOCH) * committees_per_slot + + index.uint64, + committees_per_slot * SLOTS_PER_EPOCH + ): yield idx + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee func get_beacon_committee*( epochRef: EpochRef, slot: Slot, index: CommitteeIndex): seq[ValidatorIndex] = @@ -34,14 +48,41 @@ func get_beacon_committee*( committees_per_slot * SLOTS_PER_EPOCH ) +# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee +func get_beacon_committee_len*( + epochRef: EpochRef, slot: Slot, index: CommitteeIndex): uint64 = + # Return the number of members in the beacon committee at ``slot`` for ``index``. + let + committees_per_slot = get_committee_count_per_slot(epochRef) + + compute_committee_len( + count_active_validators(epochRef), + (slot mod SLOTS_PER_EPOCH) * committees_per_slot + + index.uint64, + committees_per_slot * SLOTS_PER_EPOCH + ) + +# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices +iterator get_attesting_indices*(epochRef: EpochRef, + data: AttestationData, + bits: CommitteeValidatorsBits): + ValidatorIndex = + if bits.lenu64 != get_beacon_committee_len(epochRef, data.slot, data.index.CommitteeIndex): + trace "get_attesting_indices: inconsistent aggregation and committee length" + else: + var i = 0 + for index in get_beacon_committee(epochRef, data.slot, data.index.CommitteeIndex): + if bits[i]: + yield index + inc i + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices func get_attesting_indices*(epochRef: EpochRef, data: AttestationData, bits: CommitteeValidatorsBits): - HashSet[ValidatorIndex] = - get_attesting_indices( - bits, - get_beacon_committee(epochRef, data.slot, data.index.CommitteeIndex)) + HashSet[ValidatorIndex] = + for idx in get_attesting_indices(epochRef, data, bits): + result.incl(idx) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_indexed_attestation func get_indexed_attestation*(epochRef: EpochRef, attestation: Attestation): IndexedAttestation = @@ -59,20 +100,6 @@ func get_indexed_attestation*(epochRef: EpochRef, attestation: Attestation): Ind signature: attestation.signature ) -# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee -func get_beacon_committee_len*( - epochRef: EpochRef, slot: Slot, index: CommitteeIndex): uint64 = - # Return the number of members in the beacon committee at ``slot`` for ``index``. - let - committees_per_slot = get_committee_count_per_slot(epochRef) - - compute_committee_len( - count_active_validators(epochRef), - (slot mod SLOTS_PER_EPOCH) * committees_per_slot + - index.uint64, - committees_per_slot * SLOTS_PER_EPOCH - ) - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#aggregation-selection func is_aggregator*(epochRef: EpochRef, slot: Slot, index: CommitteeIndex, slot_signature: ValidatorSig): bool = diff --git a/beacon_chain/fork_choice/fork_choice.nim b/beacon_chain/fork_choice/fork_choice.nim index 175ebc378..c0cb1a5a8 100644 --- a/beacon_chain/fork_choice/fork_choice.nim +++ b/beacon_chain/fork_choice/fork_choice.nim @@ -71,10 +71,10 @@ proc init*(T: type ForkChoice, epoch = epochRef.epoch, blck = shortLog(blck) let - justified = BalanceCheckpoint(blck: blck, epochRef: epochRef) + justified = BalanceCheckpoint(blck: blck, epoch: epochRef.epoch, balances: epochRef.effective_balances) finalized = Checkpoint(root: blck.root, epoch: epochRef.epoch) best_justified = Checkpoint( - root: justified.blck.root, epoch: justified.epochRef.epoch) + root: justified.blck.root, epoch: justified.epoch) ForkChoice( backend: ForkChoiceBackend.init( @@ -95,7 +95,6 @@ func extend[T](s: var seq[T], minLen: int) = proc compute_slots_since_epoch_start(slot: Slot): uint64 = slot - slot.epoch().compute_start_slot_at_epoch() - proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: Slot): FcResult[void] = if self.time > time: return err(ForkChoiceError(kind: fcInconsistentTick)) @@ -104,15 +103,17 @@ proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: Slot): FcResult[void self.time = time if newEpoch and - self.best_justified.epoch > self.justified.epochRef.epoch: + self.best_justified.epoch > self.justified.epoch: let blck = dag.getRef(self.best_justified.root) if blck.isNil: return err(ForkChoiceError( kind: fcJustifiedNodeUnknown, block_root: self.best_justified.root)) + let epochRef = dag.getEpochRef(blck, self.best_justified.epoch) self.justified = BalanceCheckpoint( blck: blck, - epochRef: dag.getEpochRef(blck, self.best_justified.epoch)) + epoch: epochRef.epoch, + balances: epochRef.effective_balances) ok() proc process_attestation_queue(self: var ForkChoice) {.gcsafe.} @@ -199,21 +200,18 @@ proc should_update_justified_checkpoint( dag: ChainDAGRef, epochRef: EpochRef): FcResult[bool] = if compute_slots_since_epoch_start(self.time) < SAFE_SLOTS_TO_UPDATE_JUSTIFIED: - return ok(true) + return ok(true) let - justified_slot = compute_start_slot_at_epoch(self.justified.epochRef.epoch) - - let new_justified_checkpoint = epochRef.current_justified_checkpoint; - - let justified_blck = dag.getRef(new_justified_checkpoint.root) + justified_slot = compute_start_slot_at_epoch(self.justified.epoch) + new_justified_checkpoint = epochRef.current_justified_checkpoint + justified_blck = dag.getRef(new_justified_checkpoint.root) if justified_blck.isNil: return err(ForkChoiceError( kind: fcJustifiedNodeUnknown, block_root: new_justified_checkpoint.root)) - let justified_ancestor = - justified_blck.atSlot(justified_slot) + let justified_ancestor = justified_blck.atSlot(justified_slot) if justified_ancestor.blck.root != self.justified.blck.root: return ok(false) @@ -231,38 +229,44 @@ proc process_state(self: var Checkpoints, trace "Processing epoch", epoch = epochRef.epoch, state_justified_epoch = state_justified_epoch, - current_justified = self.justified.epochRef.epoch, + current_justified = self.justified.epoch, state_finalized_epoch = state_finalized_epoch, current_finalized = self.finalized.epoch - if state_justified_epoch > self.justified.epochRef.epoch: + if state_justified_epoch > self.justified.epoch: if state_justified_epoch > self.best_justified.epoch: self.best_justified = epochRef.current_justified_checkpoint if ? should_update_justified_checkpoint(self, dag, epochRef): - let justifiedBlck = blck.atEpochStart(state_justified_epoch) + let + justifiedBlck = blck.atEpochStart(state_justified_epoch) + justifiedEpoch = dag.getEpochRef(justifiedBlck.blck, state_justified_epoch) self.justified = BalanceCheckpoint( blck: justifiedBlck.blck, - epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)) + epoch: justifiedEpoch.epoch, + balances: justifiedEpoch.effective_balances) if state_finalized_epoch > self.finalized.epoch: self.finalized = epochRef.finalized_checkpoint - if self.justified.epochRef.epoch != state_justified_epoch or + if self.justified.epoch != state_justified_epoch or self.justified.blck.root != epochRef.current_justified_checkpoint.root: - if (state_justified_epoch > self.justified.epochRef.epoch) or + if (state_justified_epoch > self.justified.epoch) or (self.justified.blck.atEpochStart(self.finalized.epoch).blck.root != self.finalized.root): - let justifiedBlck = blck.atEpochStart(state_justified_epoch) + let + justifiedBlck = blck.atEpochStart(state_justified_epoch) + justifiedEpoch = dag.getEpochRef(justifiedBlck.blck, state_justified_epoch) self.justified = BalanceCheckpoint( blck: justifiedBlck.blck, - epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)) + epoch: justifiedEpoch.epoch, + balances: justifiedEpoch.effective_balances) ok() proc process_block*(self: var ForkChoiceBackend, @@ -360,10 +364,10 @@ proc get_head*(self: var ForkChoice, ? self.update_time(dag, wallSlot) self.backend.find_head( - self.checkpoints.justified.epochRef.epoch, + self.checkpoints.justified.epoch, self.checkpoints.justified.blck.root, self.checkpoints.finalized.epoch, - self.checkpoints.justified.epochRef.effective_balances, + self.checkpoints.justified.balances, ) func prune*( diff --git a/beacon_chain/fork_choice/fork_choice_types.nim b/beacon_chain/fork_choice/fork_choice_types.nim index ff6f67b1d..b79d500df 100644 --- a/beacon_chain/fork_choice/fork_choice_types.nim +++ b/beacon_chain/fork_choice/fork_choice_types.nim @@ -135,7 +135,8 @@ type BalanceCheckpoint* = object blck*: BlockRef - epochRef*: EpochRef + epoch*: Epoch + balances*: seq[Gwei] Checkpoints* = object time*: Slot diff --git a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim index f10f4f9c5..fcd28a5cf 100644 --- a/beacon_chain/spec/beaconstate.nim +++ b/beacon_chain/spec/beaconstate.nim @@ -460,23 +460,43 @@ proc is_valid_indexed_attestation*( ok() # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices +iterator get_attesting_indices*(bits: CommitteeValidatorsBits, + committee: openArray[ValidatorIndex]): + ValidatorIndex = + if bits.len == committee.len: + for i, index in committee: + if bits[i]: + yield index + else: + # This shouldn't happen if one begins with a valid BeaconState and applies + # valid updates, but one can construct a BeaconState where it does. Do not + # do anything here since the PendingAttestation wouldn't have made it past + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#attestations + # which checks len(attestation.aggregation_bits) == len(committee) that in + # nimbus-eth2 lives in check_attestation(...). + # Addresses https://github.com/status-im/nimbus-eth2/issues/922 + + trace "get_attesting_indices: inconsistent aggregation and committee length" + func get_attesting_indices*(bits: CommitteeValidatorsBits, committee: openArray[ValidatorIndex]): HashSet[ValidatorIndex] = - # This shouldn't happen if one begins with a valid BeaconState and applies - # valid updates, but one can construct a BeaconState where it does. Do not - # do anything here since the PendingAttestation wouldn't have made it past - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#attestations - # which checks len(attestation.aggregation_bits) == len(committee) that in - # nimbus-eth2 lives in check_attestation(...). - # Addresses https://github.com/status-im/nimbus-eth2/issues/922 - if bits.len != committee.len: - trace "get_attesting_indices: inconsistent aggregation and committee length" - return + for idx in get_attesting_indices(bits, committee): + result.incl idx - for i, index in committee: - if bits[i]: - result.incl index +# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices +iterator get_attesting_indices*(state: BeaconState, + data: AttestationData, + bits: CommitteeValidatorsBits, + cache: var StateCache): ValidatorIndex = + if bits.lenu64 != get_beacon_committee_len(state, data.slot, data.index.CommitteeIndex, cache): + trace "get_attesting_indices: inconsistent aggregation and committee length" + else: + var i = 0 + for index in get_beacon_committee(state, data.slot, data.index.CommitteeIndex, cache): + if bits[i]: + yield index + inc i # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices func get_attesting_indices*(state: BeaconState, @@ -485,9 +505,8 @@ func get_attesting_indices*(state: BeaconState, cache: var StateCache): HashSet[ValidatorIndex] = # Return the set of attesting indices corresponding to ``data`` and ``bits``. - get_attesting_indices( - bits, - get_beacon_committee(state, data.slot, data.index.CommitteeIndex, cache)) + for index in get_attesting_indices(state, data, bits, cache): + result.incl index # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_indexed_attestation func get_indexed_attestation(state: BeaconState, attestation: Attestation, diff --git a/beacon_chain/spec/state_transition_helpers.nim b/beacon_chain/spec/state_transition_helpers.nim index caa70b36c..8c1716575 100644 --- a/beacon_chain/spec/state_transition_helpers.nim +++ b/beacon_chain/spec/state_transition_helpers.nim @@ -22,16 +22,15 @@ func get_attesting_indices*( cache: var StateCache): HashSet[ValidatorIndex] = # This is part of get_unslashed_attesting_indices(...) in spec. # Exported bceause of external trace-level chronicles logging. - result = initHashSet[ValidatorIndex]() for a in attestations: - result.incl get_attesting_indices( - state, a.data, a.aggregation_bits, cache) + for idx in get_attesting_indices( + state, a.data, a.aggregation_bits, cache): + result.incl idx # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#helper-functions-1 func get_unslashed_attesting_indices*( state: BeaconState, attestations: openArray[PendingAttestation], cache: var StateCache): HashSet[ValidatorIndex] = - result = initHashSet[ValidatorIndex]() for a in attestations: for idx in get_attesting_indices( state, a.data, a.aggregation_bits, cache): diff --git a/beacon_chain/spec/validator.nim b/beacon_chain/spec/validator.nim index 45dabc224..e51118d6d 100644 --- a/beacon_chain/spec/validator.nim +++ b/beacon_chain/spec/validator.nim @@ -200,6 +200,13 @@ func compute_committee_slice*( start.int..(endIdx.int - 1) +iterator compute_committee*(shuffled_indices: seq[ValidatorIndex], + index: uint64, count: uint64): ValidatorIndex = + let + slice = compute_committee_slice(shuffled_indices.lenu64, index, count) + for i in slice: + yield shuffled_indices[i] + func compute_committee*(shuffled_indices: seq[ValidatorIndex], index: uint64, count: uint64): seq[ValidatorIndex] = ## Return the committee corresponding to ``indices``, ``seed``, ``index``, @@ -224,6 +231,20 @@ func compute_committee_len*( (slice.b - slice.a + 1).uint64 # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee +iterator get_beacon_committee*( + state: BeaconState, slot: Slot, index: CommitteeIndex, + cache: var StateCache): ValidatorIndex = + ## Return the beacon committee at ``slot`` for ``index``. + let + epoch = compute_epoch_at_slot(slot) + committees_per_slot = get_committee_count_per_slot(state, epoch, cache) + for idx in compute_committee( + cache.get_shuffled_active_validator_indices(state, epoch), + (slot mod SLOTS_PER_EPOCH) * committees_per_slot + + index.uint64, + committees_per_slot * SLOTS_PER_EPOCH + ): yield idx + func get_beacon_committee*( state: BeaconState, slot: Slot, index: CommitteeIndex, cache: var StateCache): seq[ValidatorIndex] = diff --git a/beacon_chain/validator_api.nim b/beacon_chain/validator_api.nim index ed179da7e..b2e3955b5 100644 --- a/beacon_chain/validator_api.nim +++ b/beacon_chain/validator_api.nim @@ -321,13 +321,11 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = let proposer = node.chainDag.getProposer(head, slot) if proposer.isNone(): raise newException(CatchableError, "could not retrieve block for slot: " & $slot) - let valInfo = ValidatorInfoForMakeBeaconBlock(kind: viRandao_reveal, - randao_reveal: randao_reveal) - let res = await makeBeaconBlockForHeadAndSlot( - node, valInfo, proposer.get()[0], graffiti, head, slot) - if res.message.isNone(): + let message = makeBeaconBlockForHeadAndSlot( + node, randao_reveal, proposer.get()[0], graffiti, head, slot) + if message.isNone(): raise newException(CatchableError, "could not retrieve block for slot: " & $slot) - return res.message.get() + return message.get() rpcServer.rpc("post_v1_validator_block") do (body: SignedBeaconBlock) -> bool: debug "post_v1_validator_block", diff --git a/beacon_chain/validator_duties.nim b/beacon_chain/validator_duties.nim index 9711b496e..bfd0fd64f 100644 --- a/beacon_chain/validator_duties.nim +++ b/beacon_chain/validator_duties.nim @@ -18,7 +18,7 @@ import eth/[keys, async_utils], eth/p2p/discoveryv5/[protocol, enr], # Local modules - spec/[datatypes, digest, crypto, helpers, validator, network, signatures], + spec/[datatypes, digest, crypto, helpers, network, signatures], spec/state_transition, conf, time, validator_pool, attestation_pool, exit_pool, block_pools/[spec_cache, chain_dag, clearance], @@ -176,46 +176,20 @@ proc createAndSendAttestation(node: BeaconNode, validator = shortLog(validator), indexInCommittee = indexInCommittee -type - ValidatorInfoForMakeBeaconBlockKind* = enum - viValidator - viRandao_reveal - ValidatorInfoForMakeBeaconBlock* = object - case kind*: ValidatorInfoForMakeBeaconBlockKind - of viValidator: validator*: AttachedValidator - of viRandao_reveal: randao_reveal*: ValidatorSig - proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, - val_info: ValidatorInfoForMakeBeaconBlock, + randao_reveal: ValidatorSig, validator_index: ValidatorIndex, graffiti: GraffitiBytes, head: BlockRef, - slot: Slot): - Future[tuple[message: Option[BeaconBlock], fork: Fork, - genesis_validators_root: Eth2Digest]] {.async.} = - # Advance state to the slot that we're proposing for - this is the equivalent - # of running `process_slots` up to the slot of the new block. - node.chainDag.withState( - node.chainDag.tmpState, head.atSlot(slot)): + slot: Slot): Option[BeaconBlock] = + # Advance state to the slot that we're proposing for + node.chainDag.withState(node.chainDag.tmpState, head.atSlot(slot)): let (eth1data, deposits) = if node.mainchainMonitor.isNil: (state.eth1_data, newSeq[Deposit]()) else: node.mainchainMonitor.getBlockProposalData(state) - # TODO perhaps just making the enclosing function accept 2 different types at the - # same time and doing some compile-time branching logic is cleaner (without the - # need for the discriminated union)... but we need the `state` from `withState` - # in order to get the fork/root for the specific head/slot for the randao_reveal - # and it's causing problems when the function becomes a generic for 2 types... - proc getRandaoReveal(val_info: ValidatorInfoForMakeBeaconBlock): - Future[ValidatorSig] {.async.} = - if val_info.kind == viValidator: - return await val_info.validator.genRandaoReveal( - state.fork, state.genesis_validators_root, slot) - elif val_info.kind == viRandao_reveal: - return val_info.randao_reveal - let poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived @@ -226,12 +200,12 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, doAssert v.addr == addr poolPtr.tmpState.data assign(poolPtr.tmpState, poolPtr.headState) - let message = makeBeaconBlock( + makeBeaconBlock( node.config.runtimePreset, hashedState, validator_index, head.root, - await getRandaoReveal(val_info), + randao_reveal, eth1data, graffiti, node.attestationPool[].getAttestationsForBlock(state, cache), @@ -242,15 +216,6 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, restore, cache) - if message.isSome(): - # TODO this restore is needed because otherwise tmpState will be internally - # inconsistent - it's blck will not be pointing to the block that - # created this state - we have to reset it here before `await` to avoid - # races. - restore(poolPtr.tmpState.data) - - return (message, state.fork, state.genesis_validators_root) - proc proposeSignedBlock*(node: BeaconNode, head: BlockRef, validator: AttachedValidator, @@ -310,27 +275,33 @@ proc proposeBlock(node: BeaconNode, existingProposal = notSlashable.error return head - let valInfo = ValidatorInfoForMakeBeaconBlock(kind: viValidator, validator: validator) - let beaconBlockTuple = await makeBeaconBlockForHeadAndSlot( - node, valInfo, validator_index, node.graffitiBytes, head, slot) - if not beaconBlockTuple.message.isSome(): + let + fork = node.chainDag.headState.data.data.fork + genesis_validators_root = + node.chainDag.headState.data.data.genesis_validators_root + let + randao = await validator.genRandaoReveal( + fork, genesis_validators_root, slot) + message = makeBeaconBlockForHeadAndSlot( + node, randao, validator_index, node.graffitiBytes, head, slot) + if not message.isSome(): return head # already logged elsewhere! var newBlock = SignedBeaconBlock( - message: beaconBlockTuple.message.get() + message: message.get() ) newBlock.root = hash_tree_root(newBlock.message) # TODO: recomputed in block proposal let signing_root = compute_block_root( - beaconBlockTuple.fork, beaconBlockTuple.genesis_validators_root, slot, newBlock.root) + fork, genesis_validators_root, slot, newBlock.root) node.attachedValidators .slashingProtection .registerBlock(validator.pubkey, slot, signing_root) newBlock.signature = await validator.signBlockProposal( - beaconBlockTuple.fork, beaconBlockTuple.genesis_validators_root, slot, newBlock.root) + fork, genesis_validators_root, slot, newBlock.root) return await node.proposeSignedBlock(head, validator, newBlock) @@ -458,52 +429,53 @@ proc broadcastAggregatedAttestations( # way to organize this. Then the private key for that validator should be # the corresponding one -- whatver they are, they match. - let bs = BlockSlot(blck: aggregationHead, slot: aggregationSlot) - node.chainDag.withState(node.chainDag.tmpState, bs): - let - committees_per_slot = - get_committee_count_per_slot(state, aggregationSlot.epoch, cache) + let + bs = BlockSlot(blck: aggregationHead, slot: aggregationSlot) + epochRef = node.chainDag.getEpochRef(aggregationHead, aggregationSlot.epoch) + fork = node.chainDag.headState.data.data.fork + genesis_validators_root = + node.chainDag.headState.data.data.genesis_validators_root + committees_per_slot = get_committee_count_per_slot(epochRef) - var - slotSigs: seq[Future[ValidatorSig]] = @[] - slotSigsData: seq[tuple[committee_index: uint64, - validator_idx: ValidatorIndex, - v: AttachedValidator]] = @[] + var + slotSigs: seq[Future[ValidatorSig]] = @[] + slotSigsData: seq[tuple[committee_index: uint64, + validator_idx: ValidatorIndex, + v: AttachedValidator]] = @[] - for committee_index in 0'u64..