misc memory and perf fixes (#1899)

* misc memory and perf fixes

* use EpochRef for attestation aggregation
* compress effective balances in memory (medalla unfinalized: 4gb ->
1gb)
* avoid hitting db when rewinding to head or clearance state
* avoid hitting db when blocks can be applied to in-memory state -
speeds up startup considerably
* avoid storing epochref in fork choice
* simplify and speed up beacon block creation flow - avoids state reload
thanks to head rewind optimization
* iterator-based committee and attestation participation help avoid lots
of small memory allocations throughout epoch transition (40% speedup on
epoch processing, for example during startup)

* add constant for threshold
This commit is contained in:
Jacek Sieka 2020-10-22 12:53:33 +02:00 committed by GitHub
parent decfd66a78
commit 499e5ca991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 310 additions and 218 deletions

View File

@ -11,8 +11,7 @@ import
std/[options, sequtils, sets], std/[options, sequtils, sets],
chronos, chronicles, chronos, chronicles,
./spec/[ ./spec/[
beaconstate, datatypes, crypto, digest, helpers, network, validator, beaconstate, datatypes, crypto, digest, helpers, network, signatures],
signatures],
./block_pools/[spec_cache, chain_dag, quarantine, spec_cache], ./block_pools/[spec_cache, chain_dag, quarantine, spec_cache],
./attestation_pool, ./beacon_node_types, ./ssz, ./time ./attestation_pool, ./beacon_node_types, ./ssz, ./time
@ -29,26 +28,22 @@ func is_aggregator*(committee_len: uint64, slot_signature: ValidatorSig): bool =
bytes_to_uint64(eth2digest( bytes_to_uint64(eth2digest(
slot_signature.toRaw()).data.toOpenArray(0, 7)) mod modulo == 0 slot_signature.toRaw()).data.toOpenArray(0, 7)) mod modulo == 0
func is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, func is_aggregator(epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
slot_signature: ValidatorSig, cache: var StateCache): bool = slot_signature: ValidatorSig): bool =
let let
committee_len = get_beacon_committee_len(state, slot, index, cache) committee_len = get_beacon_committee_len(epochRef, slot, index)
return is_aggregator(committee_len, slot_signature) return is_aggregator(committee_len, slot_signature)
proc aggregate_attestations*( proc aggregate_attestations*(
pool: AttestationPool, state: BeaconState, index: CommitteeIndex, pool: AttestationPool, epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
validatorIndex: ValidatorIndex, slot_signature: ValidatorSig, validatorIndex: ValidatorIndex, slot_signature: ValidatorSig): Option[AggregateAndProof] =
cache: var StateCache): Option[AggregateAndProof] = doAssert validatorIndex in get_beacon_committee(epochRef, slot, index)
let doAssert index.uint64 < get_committee_count_per_slot(epochRef)
slot = state.slot
doAssert validatorIndex in get_beacon_committee(state, slot, index, cache)
doAssert index.uint64 < get_committee_count_per_slot(state, slot.epoch, cache)
# TODO for testing purposes, refactor this into the condition check # TODO for testing purposes, refactor this into the condition check
# and just calculation # and just calculation
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#aggregation-selection # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#aggregation-selection
if not is_aggregator(state, slot, index, slot_signature, cache): if not is_aggregator(epochRef, slot, index, slot_signature):
return none(AggregateAndProof) return none(AggregateAndProof)
let maybe_slot_attestation = getAggregatedAttestation(pool, slot, index) let maybe_slot_attestation = getAggregatedAttestation(pool, slot, index)

View File

@ -148,7 +148,7 @@ type
validator_key_store*: (Eth2Digest, ref seq[ValidatorPubKey]) validator_key_store*: (Eth2Digest, ref seq[ValidatorPubKey])
# balances, as used in fork choice # balances, as used in fork choice
effective_balances*: seq[Gwei] effective_balances_bytes*: seq[byte]
BlockRef* = ref object BlockRef* = ref object
## Node in object graph guaranteed to lead back to tail block, and to have ## Node in object graph guaranteed to lead back to tail block, and to have

View File

@ -8,12 +8,9 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
# Standard libraries std/[options, sequtils, tables, sets],
chronicles, options, sequtils, tables, sets, metrics, snappy, chronicles,
# Status libraries ../ssz/[ssz_serialization, merkleization], ../beacon_chain_db, ../extras,
metrics,
# Internals
../ssz/merkleization, ../beacon_chain_db, ../extras,
../spec/[ ../spec/[
crypto, datatypes, digest, helpers, validator, state_transition, crypto, datatypes, digest, helpers, validator, state_transition,
beaconstate], beaconstate],
@ -143,12 +140,28 @@ proc init*(
newClone(mapIt(state.validators.toSeq, it.pubkey))) newClone(mapIt(state.validators.toSeq, it.pubkey)))
# When fork choice runs, it will need the effective balance of the justified # When fork choice runs, it will need the effective balance of the justified
# epoch - we pre-load the balances here to avoid rewinding the justified # checkpoint - we pre-load the balances here to avoid rewinding the justified
# state later # state later and compress them because not all checkpoints end up being used
epochRef.effective_balances = get_effective_balances(state) # for fork choice - specially during long periods of non-finalization
proc snappyEncode(inp: openArray[byte]): seq[byte] =
try:
snappy.encode(inp)
except CatchableError as err:
raiseAssert err.msg
epochRef.effective_balances_bytes =
snappyEncode(SSZ.encode(
List[Gwei, Limit VALIDATOR_REGISTRY_LIMIT](get_effective_balances(state))))
epochRef epochRef
func effective_balances*(epochRef: EpochRef): seq[Gwei] =
try:
SSZ.decode(snappy.decode(epochRef.effective_balances_bytes, uint32.high),
List[Gwei, Limit VALIDATOR_REGISTRY_LIMIT]).toSeq()
except CatchableError as exc:
raiseAssert exc.msg
func updateKeyStores*(epochRef: EpochRef, blck: BlockRef, finalized: BlockRef) = func updateKeyStores*(epochRef: EpochRef, blck: BlockRef, finalized: BlockRef) =
# Because key stores are additive lists, we can use a newer list whereever an # Because key stores are additive lists, we can use a newer list whereever an
# older list is expected - all indices in the new list will be valid for the # older list is expected - all indices in the new list will be valid for the
@ -651,13 +664,55 @@ proc updateStateData*(
# that the state has not been advanced past the desired block - if it has, # that the state has not been advanced past the desired block - if it has,
# an earlier state must be loaded since there's no way to undo the slot # an earlier state must be loaded since there's no way to undo the slot
# transitions # transitions
if state.blck == bs.blck and state.data.data.slot <= bs.slot:
# The block is the same and we're at an early enough slot - advance the
# state with empty slot processing until the slot is correct
dag.advanceSlots(state, bs.slot, save, cache)
return var
ancestors: seq[BlockRef]
cur = bs
found = false
template canAdvance(state: StateData, bs: BlockSlot): bool =
# The block is the same and we're at an early enough slot - the state can
# be used to arrive at the desired blockslot
state.blck == bs.blck and state.data.data.slot <= bs.slot
# First, run a quick check if we can simply apply a few blocks to an in-memory
# state - any in-memory state will be faster than loading from database.
# The limit here how many blocks we apply is somewhat arbitrary but two full
# epochs (might be more slots if there are skips) seems like a good enough
# first guess.
# This happens in particular during startup where we replay blocks
# sequentially to grab their votes.
const RewindBlockThreshold = 64
while ancestors.len < RewindBlockThreshold:
if canAdvance(state, cur):
found = true
break
if canAdvance(dag.headState, cur):
assign(state, dag.headState)
found = true
break
if canAdvance(dag.clearanceState, cur):
assign(state, dag.clearanceState)
found = true
break
if cur.slot == cur.blck.slot:
# This is not an empty slot, so the block will need to be applied to
# eventually reach bs
ancestors.add(cur.blck)
if cur.blck.parent == nil:
break
# Moving slot by slot helps find states that were advanced with empty slots
cur = cur.parentOrSlot
# Let's see if we're within a few epochs of the state block - then we can
# simply replay blocks without loading the whole state
if not found:
debug "UpdateStateData cache miss", debug "UpdateStateData cache miss",
bs, stateBlock = state.blck, stateSlot = state.data.data.slot bs, stateBlock = state.blck, stateSlot = state.data.data.slot
@ -665,9 +720,9 @@ proc updateStateData*(
# We'll now resort to loading the state from the database then reapplying # We'll now resort to loading the state from the database then reapplying
# blocks until we reach the desired point in time. # blocks until we reach the desired point in time.
var
ancestors: seq[BlockRef]
cur = bs cur = bs
ancestors.setLen(0)
# Look for a state in the database and load it - as long as it cannot be # Look for a state in the database and load it - as long as it cannot be
# found, keep track of the blocks that are needed to reach it from the # found, keep track of the blocks that are needed to reach it from the
# state that eventually will be found # state that eventually will be found
@ -689,6 +744,8 @@ proc updateStateData*(
# Moves back slot by slot, in case a state for an empty slot was saved # Moves back slot by slot, in case a state for an empty slot was saved
cur = cur.parent cur = cur.parent
beacon_state_rewinds.inc()
let let
startSlot {.used.} = state.data.data.slot # used in logs below startSlot {.used.} = state.data.data.slot # used in logs below
startRoot {.used.} = state.data.root startRoot {.used.} = state.data.root
@ -705,16 +762,15 @@ proc updateStateData*(
# ...and make sure to process empty slots as requested # ...and make sure to process empty slots as requested
dag.advanceSlots(state, bs.slot, save, cache) dag.advanceSlots(state, bs.slot, save, cache)
beacon_state_rewinds.inc() trace "State updated",
trace "State reloaded from database",
blocks = ancestors.len, blocks = ancestors.len,
slots = state.data.data.slot - startSlot, slots = state.data.data.slot - startSlot,
stateRoot = shortLog(state.data.root), stateRoot = shortLog(state.data.root),
stateSlot = state.data.data.slot, stateSlot = state.data.data.slot,
startRoot = shortLog(startRoot), startRoot = shortLog(startRoot),
startSlot, startSlot,
blck = shortLog(bs) blck = shortLog(bs),
found
proc delState(dag: ChainDAGRef, bs: BlockSlot) = proc delState(dag: ChainDAGRef, bs: BlockSlot) =
# Delete state state and mapping for a particular block+slot # Delete state state and mapping for a particular block+slot

View File

@ -7,8 +7,9 @@
import import
std/[algorithm, sequtils, sets], std/[algorithm, sequtils, sets],
chronicles,
../spec/[ ../spec/[
beaconstate, crypto, datatypes, digest, helpers, presets, signatures, crypto, datatypes, digest, helpers, presets, signatures,
validator], validator],
../extras, ../extras,
./block_pools_types, ./chain_dag ./block_pools_types, ./chain_dag
@ -21,6 +22,19 @@ func count_active_validators*(epochInfo: EpochRef): uint64 =
func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 = func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 =
get_committee_count_per_slot(count_active_validators(epochInfo)) get_committee_count_per_slot(count_active_validators(epochInfo))
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee
iterator get_beacon_committee*(
epochRef: EpochRef, slot: Slot, index: CommitteeIndex): ValidatorIndex =
# Return the beacon committee at ``slot`` for ``index``.
let
committees_per_slot = get_committee_count_per_slot(epochRef)
for idx in compute_committee(
epochRef.shuffled_active_validator_indices,
(slot mod SLOTS_PER_EPOCH) * committees_per_slot +
index.uint64,
committees_per_slot * SLOTS_PER_EPOCH
): yield idx
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee*( func get_beacon_committee*(
epochRef: EpochRef, slot: Slot, index: CommitteeIndex): seq[ValidatorIndex] = epochRef: EpochRef, slot: Slot, index: CommitteeIndex): seq[ValidatorIndex] =
@ -34,14 +48,41 @@ func get_beacon_committee*(
committees_per_slot * SLOTS_PER_EPOCH committees_per_slot * SLOTS_PER_EPOCH
) )
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee_len*(
epochRef: EpochRef, slot: Slot, index: CommitteeIndex): uint64 =
# Return the number of members in the beacon committee at ``slot`` for ``index``.
let
committees_per_slot = get_committee_count_per_slot(epochRef)
compute_committee_len(
count_active_validators(epochRef),
(slot mod SLOTS_PER_EPOCH) * committees_per_slot +
index.uint64,
committees_per_slot * SLOTS_PER_EPOCH
)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices
iterator get_attesting_indices*(epochRef: EpochRef,
data: AttestationData,
bits: CommitteeValidatorsBits):
ValidatorIndex =
if bits.lenu64 != get_beacon_committee_len(epochRef, data.slot, data.index.CommitteeIndex):
trace "get_attesting_indices: inconsistent aggregation and committee length"
else:
var i = 0
for index in get_beacon_committee(epochRef, data.slot, data.index.CommitteeIndex):
if bits[i]:
yield index
inc i
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices
func get_attesting_indices*(epochRef: EpochRef, func get_attesting_indices*(epochRef: EpochRef,
data: AttestationData, data: AttestationData,
bits: CommitteeValidatorsBits): bits: CommitteeValidatorsBits):
HashSet[ValidatorIndex] = HashSet[ValidatorIndex] =
get_attesting_indices( for idx in get_attesting_indices(epochRef, data, bits):
bits, result.incl(idx)
get_beacon_committee(epochRef, data.slot, data.index.CommitteeIndex))
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_indexed_attestation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_indexed_attestation
func get_indexed_attestation*(epochRef: EpochRef, attestation: Attestation): IndexedAttestation = func get_indexed_attestation*(epochRef: EpochRef, attestation: Attestation): IndexedAttestation =
@ -59,20 +100,6 @@ func get_indexed_attestation*(epochRef: EpochRef, attestation: Attestation): Ind
signature: attestation.signature signature: attestation.signature
) )
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee
func get_beacon_committee_len*(
epochRef: EpochRef, slot: Slot, index: CommitteeIndex): uint64 =
# Return the number of members in the beacon committee at ``slot`` for ``index``.
let
committees_per_slot = get_committee_count_per_slot(epochRef)
compute_committee_len(
count_active_validators(epochRef),
(slot mod SLOTS_PER_EPOCH) * committees_per_slot +
index.uint64,
committees_per_slot * SLOTS_PER_EPOCH
)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#aggregation-selection # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/validator.md#aggregation-selection
func is_aggregator*(epochRef: EpochRef, slot: Slot, index: CommitteeIndex, func is_aggregator*(epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
slot_signature: ValidatorSig): bool = slot_signature: ValidatorSig): bool =

View File

@ -71,10 +71,10 @@ proc init*(T: type ForkChoice,
epoch = epochRef.epoch, blck = shortLog(blck) epoch = epochRef.epoch, blck = shortLog(blck)
let let
justified = BalanceCheckpoint(blck: blck, epochRef: epochRef) justified = BalanceCheckpoint(blck: blck, epoch: epochRef.epoch, balances: epochRef.effective_balances)
finalized = Checkpoint(root: blck.root, epoch: epochRef.epoch) finalized = Checkpoint(root: blck.root, epoch: epochRef.epoch)
best_justified = Checkpoint( best_justified = Checkpoint(
root: justified.blck.root, epoch: justified.epochRef.epoch) root: justified.blck.root, epoch: justified.epoch)
ForkChoice( ForkChoice(
backend: ForkChoiceBackend.init( backend: ForkChoiceBackend.init(
@ -95,7 +95,6 @@ func extend[T](s: var seq[T], minLen: int) =
proc compute_slots_since_epoch_start(slot: Slot): uint64 = proc compute_slots_since_epoch_start(slot: Slot): uint64 =
slot - slot.epoch().compute_start_slot_at_epoch() slot - slot.epoch().compute_start_slot_at_epoch()
proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: Slot): FcResult[void] = proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: Slot): FcResult[void] =
if self.time > time: if self.time > time:
return err(ForkChoiceError(kind: fcInconsistentTick)) return err(ForkChoiceError(kind: fcInconsistentTick))
@ -104,15 +103,17 @@ proc on_tick(self: var Checkpoints, dag: ChainDAGRef, time: Slot): FcResult[void
self.time = time self.time = time
if newEpoch and if newEpoch and
self.best_justified.epoch > self.justified.epochRef.epoch: self.best_justified.epoch > self.justified.epoch:
let blck = dag.getRef(self.best_justified.root) let blck = dag.getRef(self.best_justified.root)
if blck.isNil: if blck.isNil:
return err(ForkChoiceError( return err(ForkChoiceError(
kind: fcJustifiedNodeUnknown, block_root: self.best_justified.root)) kind: fcJustifiedNodeUnknown, block_root: self.best_justified.root))
let epochRef = dag.getEpochRef(blck, self.best_justified.epoch)
self.justified = BalanceCheckpoint( self.justified = BalanceCheckpoint(
blck: blck, blck: blck,
epochRef: dag.getEpochRef(blck, self.best_justified.epoch)) epoch: epochRef.epoch,
balances: epochRef.effective_balances)
ok() ok()
proc process_attestation_queue(self: var ForkChoice) {.gcsafe.} proc process_attestation_queue(self: var ForkChoice) {.gcsafe.}
@ -202,18 +203,15 @@ proc should_update_justified_checkpoint(
return ok(true) return ok(true)
let let
justified_slot = compute_start_slot_at_epoch(self.justified.epochRef.epoch) justified_slot = compute_start_slot_at_epoch(self.justified.epoch)
new_justified_checkpoint = epochRef.current_justified_checkpoint
let new_justified_checkpoint = epochRef.current_justified_checkpoint; justified_blck = dag.getRef(new_justified_checkpoint.root)
let justified_blck = dag.getRef(new_justified_checkpoint.root)
if justified_blck.isNil: if justified_blck.isNil:
return err(ForkChoiceError( return err(ForkChoiceError(
kind: fcJustifiedNodeUnknown, block_root: new_justified_checkpoint.root)) kind: fcJustifiedNodeUnknown, block_root: new_justified_checkpoint.root))
let justified_ancestor = let justified_ancestor = justified_blck.atSlot(justified_slot)
justified_blck.atSlot(justified_slot)
if justified_ancestor.blck.root != self.justified.blck.root: if justified_ancestor.blck.root != self.justified.blck.root:
return ok(false) return ok(false)
@ -231,38 +229,44 @@ proc process_state(self: var Checkpoints,
trace "Processing epoch", trace "Processing epoch",
epoch = epochRef.epoch, epoch = epochRef.epoch,
state_justified_epoch = state_justified_epoch, state_justified_epoch = state_justified_epoch,
current_justified = self.justified.epochRef.epoch, current_justified = self.justified.epoch,
state_finalized_epoch = state_finalized_epoch, state_finalized_epoch = state_finalized_epoch,
current_finalized = self.finalized.epoch current_finalized = self.finalized.epoch
if state_justified_epoch > self.justified.epochRef.epoch: if state_justified_epoch > self.justified.epoch:
if state_justified_epoch > self.best_justified.epoch: if state_justified_epoch > self.best_justified.epoch:
self.best_justified = epochRef.current_justified_checkpoint self.best_justified = epochRef.current_justified_checkpoint
if ? should_update_justified_checkpoint(self, dag, epochRef): if ? should_update_justified_checkpoint(self, dag, epochRef):
let justifiedBlck = blck.atEpochStart(state_justified_epoch) let
justifiedBlck = blck.atEpochStart(state_justified_epoch)
justifiedEpoch = dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)
self.justified = self.justified =
BalanceCheckpoint( BalanceCheckpoint(
blck: justifiedBlck.blck, blck: justifiedBlck.blck,
epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)) epoch: justifiedEpoch.epoch,
balances: justifiedEpoch.effective_balances)
if state_finalized_epoch > self.finalized.epoch: if state_finalized_epoch > self.finalized.epoch:
self.finalized = epochRef.finalized_checkpoint self.finalized = epochRef.finalized_checkpoint
if self.justified.epochRef.epoch != state_justified_epoch or if self.justified.epoch != state_justified_epoch or
self.justified.blck.root != epochRef.current_justified_checkpoint.root: self.justified.blck.root != epochRef.current_justified_checkpoint.root:
if (state_justified_epoch > self.justified.epochRef.epoch) or if (state_justified_epoch > self.justified.epoch) or
(self.justified.blck.atEpochStart(self.finalized.epoch).blck.root != (self.justified.blck.atEpochStart(self.finalized.epoch).blck.root !=
self.finalized.root): self.finalized.root):
let justifiedBlck = blck.atEpochStart(state_justified_epoch) let
justifiedBlck = blck.atEpochStart(state_justified_epoch)
justifiedEpoch = dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)
self.justified = self.justified =
BalanceCheckpoint( BalanceCheckpoint(
blck: justifiedBlck.blck, blck: justifiedBlck.blck,
epochRef: dag.getEpochRef(justifiedBlck.blck, state_justified_epoch)) epoch: justifiedEpoch.epoch,
balances: justifiedEpoch.effective_balances)
ok() ok()
proc process_block*(self: var ForkChoiceBackend, proc process_block*(self: var ForkChoiceBackend,
@ -360,10 +364,10 @@ proc get_head*(self: var ForkChoice,
? self.update_time(dag, wallSlot) ? self.update_time(dag, wallSlot)
self.backend.find_head( self.backend.find_head(
self.checkpoints.justified.epochRef.epoch, self.checkpoints.justified.epoch,
self.checkpoints.justified.blck.root, self.checkpoints.justified.blck.root,
self.checkpoints.finalized.epoch, self.checkpoints.finalized.epoch,
self.checkpoints.justified.epochRef.effective_balances, self.checkpoints.justified.balances,
) )
func prune*( func prune*(

View File

@ -135,7 +135,8 @@ type
BalanceCheckpoint* = object BalanceCheckpoint* = object
blck*: BlockRef blck*: BlockRef
epochRef*: EpochRef epoch*: Epoch
balances*: seq[Gwei]
Checkpoints* = object Checkpoints* = object
time*: Slot time*: Slot

View File

@ -460,9 +460,14 @@ proc is_valid_indexed_attestation*(
ok() ok()
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices
func get_attesting_indices*(bits: CommitteeValidatorsBits, iterator get_attesting_indices*(bits: CommitteeValidatorsBits,
committee: openArray[ValidatorIndex]): committee: openArray[ValidatorIndex]):
HashSet[ValidatorIndex] = ValidatorIndex =
if bits.len == committee.len:
for i, index in committee:
if bits[i]:
yield index
else:
# This shouldn't happen if one begins with a valid BeaconState and applies # This shouldn't happen if one begins with a valid BeaconState and applies
# valid updates, but one can construct a BeaconState where it does. Do not # valid updates, but one can construct a BeaconState where it does. Do not
# do anything here since the PendingAttestation wouldn't have made it past # do anything here since the PendingAttestation wouldn't have made it past
@ -470,13 +475,28 @@ func get_attesting_indices*(bits: CommitteeValidatorsBits,
# which checks len(attestation.aggregation_bits) == len(committee) that in # which checks len(attestation.aggregation_bits) == len(committee) that in
# nimbus-eth2 lives in check_attestation(...). # nimbus-eth2 lives in check_attestation(...).
# Addresses https://github.com/status-im/nimbus-eth2/issues/922 # Addresses https://github.com/status-im/nimbus-eth2/issues/922
if bits.len != committee.len:
trace "get_attesting_indices: inconsistent aggregation and committee length"
return
for i, index in committee: trace "get_attesting_indices: inconsistent aggregation and committee length"
func get_attesting_indices*(bits: CommitteeValidatorsBits,
committee: openArray[ValidatorIndex]):
HashSet[ValidatorIndex] =
for idx in get_attesting_indices(bits, committee):
result.incl idx
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices
iterator get_attesting_indices*(state: BeaconState,
data: AttestationData,
bits: CommitteeValidatorsBits,
cache: var StateCache): ValidatorIndex =
if bits.lenu64 != get_beacon_committee_len(state, data.slot, data.index.CommitteeIndex, cache):
trace "get_attesting_indices: inconsistent aggregation and committee length"
else:
var i = 0
for index in get_beacon_committee(state, data.slot, data.index.CommitteeIndex, cache):
if bits[i]: if bits[i]:
result.incl index yield index
inc i
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_attesting_indices
func get_attesting_indices*(state: BeaconState, func get_attesting_indices*(state: BeaconState,
@ -485,9 +505,8 @@ func get_attesting_indices*(state: BeaconState,
cache: var StateCache): cache: var StateCache):
HashSet[ValidatorIndex] = HashSet[ValidatorIndex] =
# Return the set of attesting indices corresponding to ``data`` and ``bits``. # Return the set of attesting indices corresponding to ``data`` and ``bits``.
get_attesting_indices( for index in get_attesting_indices(state, data, bits, cache):
bits, result.incl index
get_beacon_committee(state, data.slot, data.index.CommitteeIndex, cache))
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_indexed_attestation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_indexed_attestation
func get_indexed_attestation(state: BeaconState, attestation: Attestation, func get_indexed_attestation(state: BeaconState, attestation: Attestation,

View File

@ -22,16 +22,15 @@ func get_attesting_indices*(
cache: var StateCache): HashSet[ValidatorIndex] = cache: var StateCache): HashSet[ValidatorIndex] =
# This is part of get_unslashed_attesting_indices(...) in spec. # This is part of get_unslashed_attesting_indices(...) in spec.
# Exported bceause of external trace-level chronicles logging. # Exported bceause of external trace-level chronicles logging.
result = initHashSet[ValidatorIndex]()
for a in attestations: for a in attestations:
result.incl get_attesting_indices( for idx in get_attesting_indices(
state, a.data, a.aggregation_bits, cache) state, a.data, a.aggregation_bits, cache):
result.incl idx
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#helper-functions-1 # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#helper-functions-1
func get_unslashed_attesting_indices*( func get_unslashed_attesting_indices*(
state: BeaconState, attestations: openArray[PendingAttestation], state: BeaconState, attestations: openArray[PendingAttestation],
cache: var StateCache): HashSet[ValidatorIndex] = cache: var StateCache): HashSet[ValidatorIndex] =
result = initHashSet[ValidatorIndex]()
for a in attestations: for a in attestations:
for idx in get_attesting_indices( for idx in get_attesting_indices(
state, a.data, a.aggregation_bits, cache): state, a.data, a.aggregation_bits, cache):

View File

@ -200,6 +200,13 @@ func compute_committee_slice*(
start.int..(endIdx.int - 1) start.int..(endIdx.int - 1)
iterator compute_committee*(shuffled_indices: seq[ValidatorIndex],
index: uint64, count: uint64): ValidatorIndex =
let
slice = compute_committee_slice(shuffled_indices.lenu64, index, count)
for i in slice:
yield shuffled_indices[i]
func compute_committee*(shuffled_indices: seq[ValidatorIndex], func compute_committee*(shuffled_indices: seq[ValidatorIndex],
index: uint64, count: uint64): seq[ValidatorIndex] = index: uint64, count: uint64): seq[ValidatorIndex] =
## Return the committee corresponding to ``indices``, ``seed``, ``index``, ## Return the committee corresponding to ``indices``, ``seed``, ``index``,
@ -224,6 +231,20 @@ func compute_committee_len*(
(slice.b - slice.a + 1).uint64 (slice.b - slice.a + 1).uint64
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/beacon-chain.md#get_beacon_committee
iterator get_beacon_committee*(
state: BeaconState, slot: Slot, index: CommitteeIndex,
cache: var StateCache): ValidatorIndex =
## Return the beacon committee at ``slot`` for ``index``.
let
epoch = compute_epoch_at_slot(slot)
committees_per_slot = get_committee_count_per_slot(state, epoch, cache)
for idx in compute_committee(
cache.get_shuffled_active_validator_indices(state, epoch),
(slot mod SLOTS_PER_EPOCH) * committees_per_slot +
index.uint64,
committees_per_slot * SLOTS_PER_EPOCH
): yield idx
func get_beacon_committee*( func get_beacon_committee*(
state: BeaconState, slot: Slot, index: CommitteeIndex, state: BeaconState, slot: Slot, index: CommitteeIndex,
cache: var StateCache): seq[ValidatorIndex] = cache: var StateCache): seq[ValidatorIndex] =

View File

@ -321,13 +321,11 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
let proposer = node.chainDag.getProposer(head, slot) let proposer = node.chainDag.getProposer(head, slot)
if proposer.isNone(): if proposer.isNone():
raise newException(CatchableError, "could not retrieve block for slot: " & $slot) raise newException(CatchableError, "could not retrieve block for slot: " & $slot)
let valInfo = ValidatorInfoForMakeBeaconBlock(kind: viRandao_reveal, let message = makeBeaconBlockForHeadAndSlot(
randao_reveal: randao_reveal) node, randao_reveal, proposer.get()[0], graffiti, head, slot)
let res = await makeBeaconBlockForHeadAndSlot( if message.isNone():
node, valInfo, proposer.get()[0], graffiti, head, slot)
if res.message.isNone():
raise newException(CatchableError, "could not retrieve block for slot: " & $slot) raise newException(CatchableError, "could not retrieve block for slot: " & $slot)
return res.message.get() return message.get()
rpcServer.rpc("post_v1_validator_block") do (body: SignedBeaconBlock) -> bool: rpcServer.rpc("post_v1_validator_block") do (body: SignedBeaconBlock) -> bool:
debug "post_v1_validator_block", debug "post_v1_validator_block",

View File

@ -18,7 +18,7 @@ import
eth/[keys, async_utils], eth/p2p/discoveryv5/[protocol, enr], eth/[keys, async_utils], eth/p2p/discoveryv5/[protocol, enr],
# Local modules # Local modules
spec/[datatypes, digest, crypto, helpers, validator, network, signatures], spec/[datatypes, digest, crypto, helpers, network, signatures],
spec/state_transition, spec/state_transition,
conf, time, validator_pool, conf, time, validator_pool,
attestation_pool, exit_pool, block_pools/[spec_cache, chain_dag, clearance], attestation_pool, exit_pool, block_pools/[spec_cache, chain_dag, clearance],
@ -176,46 +176,20 @@ proc createAndSendAttestation(node: BeaconNode,
validator = shortLog(validator), validator = shortLog(validator),
indexInCommittee = indexInCommittee indexInCommittee = indexInCommittee
type
ValidatorInfoForMakeBeaconBlockKind* = enum
viValidator
viRandao_reveal
ValidatorInfoForMakeBeaconBlock* = object
case kind*: ValidatorInfoForMakeBeaconBlockKind
of viValidator: validator*: AttachedValidator
of viRandao_reveal: randao_reveal*: ValidatorSig
proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
val_info: ValidatorInfoForMakeBeaconBlock, randao_reveal: ValidatorSig,
validator_index: ValidatorIndex, validator_index: ValidatorIndex,
graffiti: GraffitiBytes, graffiti: GraffitiBytes,
head: BlockRef, head: BlockRef,
slot: Slot): slot: Slot): Option[BeaconBlock] =
Future[tuple[message: Option[BeaconBlock], fork: Fork, # Advance state to the slot that we're proposing for
genesis_validators_root: Eth2Digest]] {.async.} = node.chainDag.withState(node.chainDag.tmpState, head.atSlot(slot)):
# Advance state to the slot that we're proposing for - this is the equivalent
# of running `process_slots` up to the slot of the new block.
node.chainDag.withState(
node.chainDag.tmpState, head.atSlot(slot)):
let (eth1data, deposits) = let (eth1data, deposits) =
if node.mainchainMonitor.isNil: if node.mainchainMonitor.isNil:
(state.eth1_data, newSeq[Deposit]()) (state.eth1_data, newSeq[Deposit]())
else: else:
node.mainchainMonitor.getBlockProposalData(state) node.mainchainMonitor.getBlockProposalData(state)
# TODO perhaps just making the enclosing function accept 2 different types at the
# same time and doing some compile-time branching logic is cleaner (without the
# need for the discriminated union)... but we need the `state` from `withState`
# in order to get the fork/root for the specific head/slot for the randao_reveal
# and it's causing problems when the function becomes a generic for 2 types...
proc getRandaoReveal(val_info: ValidatorInfoForMakeBeaconBlock):
Future[ValidatorSig] {.async.} =
if val_info.kind == viValidator:
return await val_info.validator.genRandaoReveal(
state.fork, state.genesis_validators_root, slot)
elif val_info.kind == viRandao_reveal:
return val_info.randao_reveal
let let
poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived poolPtr = unsafeAddr node.chainDag # safe because restore is short-lived
@ -226,12 +200,12 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
doAssert v.addr == addr poolPtr.tmpState.data doAssert v.addr == addr poolPtr.tmpState.data
assign(poolPtr.tmpState, poolPtr.headState) assign(poolPtr.tmpState, poolPtr.headState)
let message = makeBeaconBlock( makeBeaconBlock(
node.config.runtimePreset, node.config.runtimePreset,
hashedState, hashedState,
validator_index, validator_index,
head.root, head.root,
await getRandaoReveal(val_info), randao_reveal,
eth1data, eth1data,
graffiti, graffiti,
node.attestationPool[].getAttestationsForBlock(state, cache), node.attestationPool[].getAttestationsForBlock(state, cache),
@ -242,15 +216,6 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
restore, restore,
cache) cache)
if message.isSome():
# TODO this restore is needed because otherwise tmpState will be internally
# inconsistent - it's blck will not be pointing to the block that
# created this state - we have to reset it here before `await` to avoid
# races.
restore(poolPtr.tmpState.data)
return (message, state.fork, state.genesis_validators_root)
proc proposeSignedBlock*(node: BeaconNode, proc proposeSignedBlock*(node: BeaconNode,
head: BlockRef, head: BlockRef,
validator: AttachedValidator, validator: AttachedValidator,
@ -310,27 +275,33 @@ proc proposeBlock(node: BeaconNode,
existingProposal = notSlashable.error existingProposal = notSlashable.error
return head return head
let valInfo = ValidatorInfoForMakeBeaconBlock(kind: viValidator, validator: validator) let
let beaconBlockTuple = await makeBeaconBlockForHeadAndSlot( fork = node.chainDag.headState.data.data.fork
node, valInfo, validator_index, node.graffitiBytes, head, slot) genesis_validators_root =
if not beaconBlockTuple.message.isSome(): node.chainDag.headState.data.data.genesis_validators_root
let
randao = await validator.genRandaoReveal(
fork, genesis_validators_root, slot)
message = makeBeaconBlockForHeadAndSlot(
node, randao, validator_index, node.graffitiBytes, head, slot)
if not message.isSome():
return head # already logged elsewhere! return head # already logged elsewhere!
var var
newBlock = SignedBeaconBlock( newBlock = SignedBeaconBlock(
message: beaconBlockTuple.message.get() message: message.get()
) )
newBlock.root = hash_tree_root(newBlock.message) newBlock.root = hash_tree_root(newBlock.message)
# TODO: recomputed in block proposal # TODO: recomputed in block proposal
let signing_root = compute_block_root( let signing_root = compute_block_root(
beaconBlockTuple.fork, beaconBlockTuple.genesis_validators_root, slot, newBlock.root) fork, genesis_validators_root, slot, newBlock.root)
node.attachedValidators node.attachedValidators
.slashingProtection .slashingProtection
.registerBlock(validator.pubkey, slot, signing_root) .registerBlock(validator.pubkey, slot, signing_root)
newBlock.signature = await validator.signBlockProposal( newBlock.signature = await validator.signBlockProposal(
beaconBlockTuple.fork, beaconBlockTuple.genesis_validators_root, slot, newBlock.root) fork, genesis_validators_root, slot, newBlock.root)
return await node.proposeSignedBlock(head, validator, newBlock) return await node.proposeSignedBlock(head, validator, newBlock)
@ -458,11 +429,13 @@ proc broadcastAggregatedAttestations(
# way to organize this. Then the private key for that validator should be # way to organize this. Then the private key for that validator should be
# the corresponding one -- whatver they are, they match. # the corresponding one -- whatver they are, they match.
let bs = BlockSlot(blck: aggregationHead, slot: aggregationSlot)
node.chainDag.withState(node.chainDag.tmpState, bs):
let let
committees_per_slot = bs = BlockSlot(blck: aggregationHead, slot: aggregationSlot)
get_committee_count_per_slot(state, aggregationSlot.epoch, cache) epochRef = node.chainDag.getEpochRef(aggregationHead, aggregationSlot.epoch)
fork = node.chainDag.headState.data.data.fork
genesis_validators_root =
node.chainDag.headState.data.data.genesis_validators_root
committees_per_slot = get_committee_count_per_slot(epochRef)
var var
slotSigs: seq[Future[ValidatorSig]] = @[] slotSigs: seq[Future[ValidatorSig]] = @[]
@ -472,31 +445,30 @@ proc broadcastAggregatedAttestations(
for committee_index in 0'u64..<committees_per_slot: for committee_index in 0'u64..<committees_per_slot:
let committee = get_beacon_committee( let committee = get_beacon_committee(
state, aggregationSlot, committee_index.CommitteeIndex, cache) epochRef, aggregationSlot, committee_index.CommitteeIndex)
for index_in_committee, validatorIdx in committee: for index_in_committee, validatorIdx in committee:
let validator = node.getAttachedValidator(state, validatorIdx) let validator = node.getAttachedValidator(epochRef, validatorIdx)
if validator != nil: if validator != nil:
# the validator index and private key pair. # the validator index and private key pair.
slotSigs.add getSlotSig(validator, state.fork, slotSigs.add getSlotSig(validator, fork,
state.genesis_validators_root, state.slot) genesis_validators_root, aggregationSlot)
slotSigsData.add (committee_index, validatorIdx, validator) slotSigsData.add (committee_index, validatorIdx, validator)
await allFutures(slotSigs) await allFutures(slotSigs)
for curr in zip(slotSigsData, slotSigs): for curr in zip(slotSigsData, slotSigs):
let aggregateAndProof = let aggregateAndProof =
aggregate_attestations(node.attestationPool[], state, aggregate_attestations(node.attestationPool[], epochRef, aggregationSlot,
curr[0].committee_index.CommitteeIndex, curr[0].committee_index.CommitteeIndex,
curr[0].validator_idx, curr[0].validator_idx,
curr[1].read, cache) curr[1].read)
# Don't broadcast when, e.g., this node isn't aggregator # Don't broadcast when, e.g., this node isn't aggregator
# TODO verify there is only one isSome() with test. # TODO verify there is only one isSome() with test.
if aggregateAndProof.isSome: if aggregateAndProof.isSome:
let sig = await signAggregateAndProof(curr[0].v, let sig = await signAggregateAndProof(curr[0].v,
aggregateAndProof.get, state.fork, aggregateAndProof.get, fork, genesis_validators_root)
state.genesis_validators_root)
var signedAP = SignedAggregateAndProof( var signedAP = SignedAggregateAndProof(
message: aggregateAndProof.get, message: aggregateAndProof.get,
signature: sig) signature: sig)