only subscribe to subnets when aggregating (#2254)

Only subscribe to subnets when aggregating
This commit is contained in:
tersec 2021-01-25 18:39:56 +01:00 committed by GitHub
parent 8c48d44788
commit 7d74d3bfbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 36 deletions

View File

@ -225,6 +225,11 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
+ Compile OK + Compile OK
``` ```
OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1
## Time utilities
```diff
+ humaneStr OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Zero signature sanity checks ## Zero signature sanity checks
```diff ```diff
+ SSZ serialization roundtrip of SignedBeaconBlockHeader OK + SSZ serialization roundtrip of SignedBeaconBlockHeader OK
@ -275,4 +280,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 2/2 Fail: 0/2 Skip: 0/2 OK: 2/2 Fail: 0/2 Skip: 0/2
---TOTAL--- ---TOTAL---
OK: 148/157 Fail: 0/157 Skip: 9/157 OK: 149/158 Fail: 0/158 Skip: 9/158

View File

@ -7,7 +7,7 @@
import import
# Standard library # Standard library
std/[math, os, strformat, strutils, tables, times, std/[math, os, sequtils, strformat, strutils, tables, times,
terminal, osproc], terminal, osproc],
system/ansi_c, system/ansi_c,
@ -24,11 +24,14 @@ import
# Local modules # Local modules
./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api, ./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api,
validator_api], validator_api],
spec/[datatypes, digest, crypto, beaconstate, helpers, network, presets], spec/[
datatypes, digest, crypto, beaconstate, helpers, network, presets,
validator],
spec/[weak_subjectivity, signatures], spec/[weak_subjectivity, signatures],
spec/eth2_apis/beacon_rpc_client, spec/eth2_apis/beacon_rpc_client,
conf, time, beacon_chain_db, validator_pool, extras, conf, time, beacon_chain_db, validator_pool, extras,
attestation_pool, exit_pool, eth2_network, eth2_discovery, attestation_aggregation, attestation_pool, exit_pool, eth2_network,
eth2_discovery,
beacon_node_common, beacon_node_types, beacon_node_status, beacon_node_common, beacon_node_types, beacon_node_status,
block_pools/[chain_dag, quarantine, clearance, block_pools_types], block_pools/[chain_dag, quarantine, clearance, block_pools_types],
nimbus_binary_common, network_metadata, nimbus_binary_common, network_metadata,
@ -391,22 +394,81 @@ func getStabilitySubnets(stabilitySubnets: auto): set[uint8] =
for subnetInfo in stabilitySubnets: for subnetInfo in stabilitySubnets:
result.incl subnetInfo.subnet result.incl subnetInfo.subnet
proc getAttachedValidators(node: BeaconNode): HashSet[ValidatorIndex] = proc getAttachedValidators(node: BeaconNode):
Table[ValidatorIndex, AttachedValidator] =
for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len: for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len:
if node.getAttachedValidator( let attachedValidator = node.getAttachedValidator(
node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) != nil: node.chainDag.headState.data.data, validatorIndex.ValidatorIndex)
result.incl validatorIndex.ValidatorIndex if attachedValidator.isNil:
continue
result[validatorIndex.ValidatorIndex] = attachedValidator
proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) = proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} =
doAssert epoch >= 1 doAssert epoch >= 1
let attachedValidators = node.getAttachedValidators() let
for (subnetIndex, slot) in get_committee_assignments( attachedValidators = node.getAttachedValidators()
node.chainDag.headState.data.data, epoch, attachedValidators): validatorIndices = toHashSet(toSeq(attachedValidators.keys()))
var cache = StateCache()
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#lookahead
# Only subscribe when this node should aggregate; libp2p broadcasting works
# on subnet topics regardless.
#
# Committee sizes in any given epoch vary by 1, i.e. committee sizes $n$
# $n+1$ can exist. Furthermore, according to
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#aggregation-selection
# is_aggregator uses `len(committee) div TARGET_AGGREGATORS_PER_COMMITTEE`
# to determine whether committee length/slot signature pairs aggregate the
# attestations in a slot/committee, where TARGET_AGGREGATORS_PER_COMMITTEE
# is currently 16 in all defined presets. Therefore, probe a committee len
# to determine whether it's possible that it's within a boundary such that
# either that length or other possible committee lengths don't cross those
# div/mod 16 boundaries which would change is_aggregator results.
static: doAssert TARGET_AGGREGATORS_PER_COMMITTEE == 16 # mainnet, minimal
let
probeCommitteeLen = get_beacon_committee_len(
node.chainDag.headState.data.data, compute_start_slot_at_epoch(epoch),
0.CommitteeIndex, cache)
# Without knowing whether probeCommitteeLen is the higher or lower, if it's
# [-1, 1] mod TARGET_AGGREGATORS_PER_COMMITTEE it might cross boundaries in
# is_aggregator, such that one can't hoist committee length calculation out
# of the anyIt(...) loop.
isConstAggregationLen =
(probeCommitteeLen mod TARGET_AGGREGATORS_PER_COMMITTEE) notin
[0'u64, 1'u64, TARGET_AGGREGATORS_PER_COMMITTEE - 1]
template isAnyCommitteeValidatorAggregating(
validatorIndices, committeeLen: untyped, slot: Slot): bool =
anyIt(
validatorIndices,
is_aggregator(
committeeLen,
await attachedValidators[it].getSlotSig(
node.chainDag.headState.data.data.fork,
node.chainDag.headState.data.data.genesis_validators_root, slot)))
for (validatorIndices, committeeIndex, subnetIndex, slot) in
get_committee_assignments(
node.chainDag.headState.data.data, epoch, validatorIndices, cache):
doAssert compute_epoch_at_slot(slot) == epoch
let committeeLen =
if isConstAggregationLen:
probeCommitteeLen
else:
get_beacon_committee_len(
node.chainDag.headState.data.data, slot, committeeIndex, cache)
if not isAnyCommitteeValidatorAggregating(
validatorIndices, committeeLen, slot):
continue
node.attestationSubnets.unsubscribeSlot[subnetIndex] = node.attestationSubnets.unsubscribeSlot[subnetIndex] =
max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnetIndex]) max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnetIndex])
if subnetIndex notin node.attestationSubnets.subscribedSubnets: if subnetIndex notin node.attestationSubnets.subscribedSubnets:
# A Pyrmont test validator didn't manage to subscribe in time with
# SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS being 32.
const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34 const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34
node.attestationSubnets.subscribeSlot[subnetIndex] = node.attestationSubnets.subscribeSlot[subnetIndex] =
@ -440,7 +502,7 @@ proc updateStabilitySubnets(node: BeaconNode, slot: Slot): set[uint8] =
proc cycleAttestationSubnetsPerEpoch( proc cycleAttestationSubnetsPerEpoch(
node: BeaconNode, wallSlot: Slot, prevStabilitySubnets: set[uint8]): node: BeaconNode, wallSlot: Slot, prevStabilitySubnets: set[uint8]):
set[uint8] = Future[set[uint8]] {.async.} =
# Per-epoch portion of subnet cycling: updating stability subnets and # Per-epoch portion of subnet cycling: updating stability subnets and
# calculating future attestation subnets. # calculating future attestation subnets.
@ -449,6 +511,7 @@ proc cycleAttestationSubnetsPerEpoch(
# of wallSlot, the clock's just incorrect. If the state slot's behind # of wallSlot, the clock's just incorrect. If the state slot's behind
# wallSlot, it would have to look more than MIN_SEED_LOOKAHEAD epochs # wallSlot, it would have to look more than MIN_SEED_LOOKAHEAD epochs
# ahead to compute the shuffling determining the beacon committees. # ahead to compute the shuffling determining the beacon committees.
static: doAssert MIN_SEED_LOOKAHEAD == 1
if node.chainDag.headState.data.data.slot.epoch != wallSlot.epoch: if node.chainDag.headState.data.data.slot.epoch != wallSlot.epoch:
debug "Requested attestation subnets too far in advance", debug "Requested attestation subnets too far in advance",
wallSlot, wallSlot,
@ -463,7 +526,7 @@ proc cycleAttestationSubnetsPerEpoch(
# attestations quickly and any individual attestation's likelihood of being # attestations quickly and any individual attestation's likelihood of being
# selected is low. # selected is low.
if node.attestationSubnets.nextCycleEpoch <= wallSlot.epoch: if node.attestationSubnets.nextCycleEpoch <= wallSlot.epoch:
node.updateSubscriptionSchedule(wallSlot.epoch + 1) await node.updateSubscriptionSchedule(wallSlot.epoch + 1)
node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1 node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1
let stabilitySubnets = node.updateStabilitySubnets(wallSlot) let stabilitySubnets = node.updateStabilitySubnets(wallSlot)
@ -475,9 +538,9 @@ proc cycleAttestationSubnetsPerEpoch(
# subnets in that case. # subnets in that case.
node.updateStabilitySubnetMetadata(stabilitySubnets) node.updateStabilitySubnetMetadata(stabilitySubnets)
stabilitySubnets return stabilitySubnets
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
doAssert not node.config.subscribeAllSubnets doAssert not node.config.subscribeAllSubnets
@ -495,7 +558,7 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) =
prevStabilitySubnets = prevStabilitySubnets =
getStabilitySubnets(node.attestationSubnets.stabilitySubnets) getStabilitySubnets(node.attestationSubnets.stabilitySubnets)
stabilitySubnets = stabilitySubnets =
node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets) await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets)
# Accounting specific to non-stability subnets # Accounting specific to non-stability subnets
for expiringSubnet in for expiringSubnet in
@ -531,11 +594,14 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) =
proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] = proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] =
let let
wallEpoch = node.beaconClock.now().slotOrZero().epoch wallEpoch = node.beaconClock.now().slotOrZero().epoch
validator_indices = node.getAttachedValidators() validatorIndices =
toHashSet(toSeq(node.getAttachedValidators().keys()))
var cache = StateCache()
template mergeAttestationSubnets(epoch: Epoch) = template mergeAttestationSubnets(epoch: Epoch) =
for (subnetIndex, slot) in get_committee_assignments( for (_, ci, subnetIndex, slot) in get_committee_assignments(
node.chainDag.headState.data.data, epoch, validator_indices): node.chainDag.headState.data.data, epoch, validatorIndices, cache):
if subnetIndex in result: if subnetIndex in result:
result[subnetIndex] = max(result[subnetIndex], slot + 1) result[subnetIndex] = max(result[subnetIndex], slot + 1)
else: else:
@ -722,9 +788,9 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) =
# subnets and they'll all remain subscribed. # subnets and they'll all remain subscribed.
if node.getTopicSubscriptionEnabled and not node.config.subscribeAllSubnets: if node.getTopicSubscriptionEnabled and not node.config.subscribeAllSubnets:
# This exits early all but one call each epoch. # This exits early all but one call each epoch.
node.cycleAttestationSubnets(slot) traceAsyncErrors node.cycleAttestationSubnets(slot)
proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot) = proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot) {.async.} =
# Things we do when slot processing has ended and we're about to wait for the # Things we do when slot processing has ended and we're about to wait for the
# next slot # next slot
@ -812,7 +878,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
slot = wallSlot.slot # afterGenesis == true! slot = wallSlot.slot # afterGenesis == true!
nextSlot = slot + 1 nextSlot = slot + 1
defer: onSlotEnd(node, slot, nextSlot) defer: await onSlotEnd(node, slot, nextSlot)
beacon_slot.set slot.int64 beacon_slot.set slot.int64
beacon_current_epoch.set slot.epoch.int64 beacon_current_epoch.set slot.epoch.int64

View File

@ -86,21 +86,26 @@ func getAttestationTopic*(forkDigest: ForkDigest, subnetIndex: uint64):
except ValueError as e: except ValueError as e:
raiseAssert e.msg raiseAssert e.msg
func get_committee_assignments*( iterator get_committee_assignments*(
state: BeaconState, epoch: Epoch, state: BeaconState, epoch: Epoch,
validator_indices: HashSet[ValidatorIndex]): validator_indices: HashSet[ValidatorIndex],
seq[tuple[subnetIndex: uint8, slot: Slot]] = cache: var StateCache):
var cache = StateCache() tuple[validatorIndices: HashSet[ValidatorIndex],
committeeIndex: CommitteeIndex,
subnetIndex: uint8, slot: Slot] =
let let
committees_per_slot = get_committee_count_per_slot(state, epoch, cache) committees_per_slot = get_committee_count_per_slot(state, epoch, cache)
start_slot = compute_start_slot_at_epoch(epoch) start_slot = compute_start_slot_at_epoch(epoch)
for slot in start_slot ..< start_slot + SLOTS_PER_EPOCH: for slot in start_slot ..< start_slot + SLOTS_PER_EPOCH:
for index in 0'u64 ..< committees_per_slot: for index in 0'u64 ..< committees_per_slot:
let idx = index.CommitteeIndex let
if not disjoint(validator_indices, idx = index.CommitteeIndex
get_beacon_committee(state, slot, idx, cache).toHashSet): includedIndices =
result.add( toHashSet(get_beacon_committee(state, slot, idx, cache)) *
(compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8, validator_indices
slot)) if includedIndices.len > 0:
yield (
includedIndices, idx,
compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8,
slot)