From 7d74d3bfbcdcb00a15d7513caaeb08b46833df7e Mon Sep 17 00:00:00 2001 From: tersec Date: Mon, 25 Jan 2021 18:39:56 +0100 Subject: [PATCH] only subscribe to subnets when aggregating (#2254) Only subscribe to subnets when aggregating --- AllTests-mainnet.md | 7 +- beacon_chain/nimbus_beacon_node.nim | 114 ++++++++++++++++++++++------ beacon_chain/spec/network.nim | 27 ++++--- 3 files changed, 112 insertions(+), 36 deletions(-) diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index 2a5eec613..522a97f50 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -225,6 +225,11 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 + Compile OK ``` OK: 1/1 Fail: 0/1 Skip: 0/1 +## Time utilities +```diff ++ humaneStr OK +``` +OK: 1/1 Fail: 0/1 Skip: 0/1 ## Zero signature sanity checks ```diff + SSZ serialization roundtrip of SignedBeaconBlockHeader OK @@ -275,4 +280,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 2/2 Fail: 0/2 Skip: 0/2 ---TOTAL--- -OK: 148/157 Fail: 0/157 Skip: 9/157 +OK: 149/158 Fail: 0/158 Skip: 9/158 diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index f0c693bed..5e6214ca4 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -7,7 +7,7 @@ import # Standard library - std/[math, os, strformat, strutils, tables, times, + std/[math, os, sequtils, strformat, strutils, tables, times, terminal, osproc], system/ansi_c, @@ -24,11 +24,14 @@ import # Local modules ./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api, validator_api], - spec/[datatypes, digest, crypto, beaconstate, helpers, network, presets], + spec/[ + datatypes, digest, crypto, beaconstate, helpers, network, presets, + validator], spec/[weak_subjectivity, signatures], spec/eth2_apis/beacon_rpc_client, conf, time, beacon_chain_db, validator_pool, extras, - attestation_pool, exit_pool, eth2_network, eth2_discovery, + attestation_aggregation, attestation_pool, exit_pool, eth2_network, + eth2_discovery, beacon_node_common, beacon_node_types, beacon_node_status, block_pools/[chain_dag, quarantine, clearance, block_pools_types], nimbus_binary_common, network_metadata, @@ -391,22 +394,81 @@ func getStabilitySubnets(stabilitySubnets: auto): set[uint8] = for subnetInfo in stabilitySubnets: result.incl subnetInfo.subnet -proc getAttachedValidators(node: BeaconNode): HashSet[ValidatorIndex] = +proc getAttachedValidators(node: BeaconNode): + Table[ValidatorIndex, AttachedValidator] = for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len: - if node.getAttachedValidator( - node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) != nil: - result.incl validatorIndex.ValidatorIndex + let attachedValidator = node.getAttachedValidator( + node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) + if attachedValidator.isNil: + continue + result[validatorIndex.ValidatorIndex] = attachedValidator -proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) = +proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} = doAssert epoch >= 1 - let attachedValidators = node.getAttachedValidators() - for (subnetIndex, slot) in get_committee_assignments( - node.chainDag.headState.data.data, epoch, attachedValidators): + let + attachedValidators = node.getAttachedValidators() + validatorIndices = toHashSet(toSeq(attachedValidators.keys())) + + var cache = StateCache() + + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#lookahead + # Only subscribe when this node should aggregate; libp2p broadcasting works + # on subnet topics regardless. + # + # Committee sizes in any given epoch vary by 1, i.e. committee sizes $n$ + # $n+1$ can exist. Furthermore, according to + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#aggregation-selection + # is_aggregator uses `len(committee) div TARGET_AGGREGATORS_PER_COMMITTEE` + # to determine whether committee length/slot signature pairs aggregate the + # attestations in a slot/committee, where TARGET_AGGREGATORS_PER_COMMITTEE + # is currently 16 in all defined presets. Therefore, probe a committee len + # to determine whether it's possible that it's within a boundary such that + # either that length or other possible committee lengths don't cross those + # div/mod 16 boundaries which would change is_aggregator results. + static: doAssert TARGET_AGGREGATORS_PER_COMMITTEE == 16 # mainnet, minimal + + let + probeCommitteeLen = get_beacon_committee_len( + node.chainDag.headState.data.data, compute_start_slot_at_epoch(epoch), + 0.CommitteeIndex, cache) + + # Without knowing whether probeCommitteeLen is the higher or lower, if it's + # [-1, 1] mod TARGET_AGGREGATORS_PER_COMMITTEE it might cross boundaries in + # is_aggregator, such that one can't hoist committee length calculation out + # of the anyIt(...) loop. + isConstAggregationLen = + (probeCommitteeLen mod TARGET_AGGREGATORS_PER_COMMITTEE) notin + [0'u64, 1'u64, TARGET_AGGREGATORS_PER_COMMITTEE - 1] + + template isAnyCommitteeValidatorAggregating( + validatorIndices, committeeLen: untyped, slot: Slot): bool = + anyIt( + validatorIndices, + is_aggregator( + committeeLen, + await attachedValidators[it].getSlotSig( + node.chainDag.headState.data.data.fork, + node.chainDag.headState.data.data.genesis_validators_root, slot))) + + for (validatorIndices, committeeIndex, subnetIndex, slot) in + get_committee_assignments( + node.chainDag.headState.data.data, epoch, validatorIndices, cache): + + doAssert compute_epoch_at_slot(slot) == epoch + let committeeLen = + if isConstAggregationLen: + probeCommitteeLen + else: + get_beacon_committee_len( + node.chainDag.headState.data.data, slot, committeeIndex, cache) + + if not isAnyCommitteeValidatorAggregating( + validatorIndices, committeeLen, slot): + continue + node.attestationSubnets.unsubscribeSlot[subnetIndex] = max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnetIndex]) if subnetIndex notin node.attestationSubnets.subscribedSubnets: - # A Pyrmont test validator didn't manage to subscribe in time with - # SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS being 32. const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34 node.attestationSubnets.subscribeSlot[subnetIndex] = @@ -440,7 +502,7 @@ proc updateStabilitySubnets(node: BeaconNode, slot: Slot): set[uint8] = proc cycleAttestationSubnetsPerEpoch( node: BeaconNode, wallSlot: Slot, prevStabilitySubnets: set[uint8]): - set[uint8] = + Future[set[uint8]] {.async.} = # Per-epoch portion of subnet cycling: updating stability subnets and # calculating future attestation subnets. @@ -449,6 +511,7 @@ proc cycleAttestationSubnetsPerEpoch( # of wallSlot, the clock's just incorrect. If the state slot's behind # wallSlot, it would have to look more than MIN_SEED_LOOKAHEAD epochs # ahead to compute the shuffling determining the beacon committees. + static: doAssert MIN_SEED_LOOKAHEAD == 1 if node.chainDag.headState.data.data.slot.epoch != wallSlot.epoch: debug "Requested attestation subnets too far in advance", wallSlot, @@ -463,7 +526,7 @@ proc cycleAttestationSubnetsPerEpoch( # attestations quickly and any individual attestation's likelihood of being # selected is low. if node.attestationSubnets.nextCycleEpoch <= wallSlot.epoch: - node.updateSubscriptionSchedule(wallSlot.epoch + 1) + await node.updateSubscriptionSchedule(wallSlot.epoch + 1) node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1 let stabilitySubnets = node.updateStabilitySubnets(wallSlot) @@ -475,9 +538,9 @@ proc cycleAttestationSubnetsPerEpoch( # subnets in that case. node.updateStabilitySubnetMetadata(stabilitySubnets) - stabilitySubnets + return stabilitySubnets -proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = +proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 doAssert not node.config.subscribeAllSubnets @@ -495,7 +558,7 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = prevStabilitySubnets = getStabilitySubnets(node.attestationSubnets.stabilitySubnets) stabilitySubnets = - node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets) + await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets) # Accounting specific to non-stability subnets for expiringSubnet in @@ -531,11 +594,14 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] = let wallEpoch = node.beaconClock.now().slotOrZero().epoch - validator_indices = node.getAttachedValidators() + validatorIndices = + toHashSet(toSeq(node.getAttachedValidators().keys())) + + var cache = StateCache() template mergeAttestationSubnets(epoch: Epoch) = - for (subnetIndex, slot) in get_committee_assignments( - node.chainDag.headState.data.data, epoch, validator_indices): + for (_, ci, subnetIndex, slot) in get_committee_assignments( + node.chainDag.headState.data.data, epoch, validatorIndices, cache): if subnetIndex in result: result[subnetIndex] = max(result[subnetIndex], slot + 1) else: @@ -722,9 +788,9 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) = # subnets and they'll all remain subscribed. if node.getTopicSubscriptionEnabled and not node.config.subscribeAllSubnets: # This exits early all but one call each epoch. - node.cycleAttestationSubnets(slot) + traceAsyncErrors node.cycleAttestationSubnets(slot) -proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot) = +proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot) {.async.} = # Things we do when slot processing has ended and we're about to wait for the # next slot @@ -812,7 +878,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = slot = wallSlot.slot # afterGenesis == true! nextSlot = slot + 1 - defer: onSlotEnd(node, slot, nextSlot) + defer: await onSlotEnd(node, slot, nextSlot) beacon_slot.set slot.int64 beacon_current_epoch.set slot.epoch.int64 diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index cb4b53f97..8675ea6e9 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -86,21 +86,26 @@ func getAttestationTopic*(forkDigest: ForkDigest, subnetIndex: uint64): except ValueError as e: raiseAssert e.msg -func get_committee_assignments*( +iterator get_committee_assignments*( state: BeaconState, epoch: Epoch, - validator_indices: HashSet[ValidatorIndex]): - seq[tuple[subnetIndex: uint8, slot: Slot]] = - var cache = StateCache() - + validator_indices: HashSet[ValidatorIndex], + cache: var StateCache): + tuple[validatorIndices: HashSet[ValidatorIndex], + committeeIndex: CommitteeIndex, + subnetIndex: uint8, slot: Slot] = let committees_per_slot = get_committee_count_per_slot(state, epoch, cache) start_slot = compute_start_slot_at_epoch(epoch) for slot in start_slot ..< start_slot + SLOTS_PER_EPOCH: for index in 0'u64 ..< committees_per_slot: - let idx = index.CommitteeIndex - if not disjoint(validator_indices, - get_beacon_committee(state, slot, idx, cache).toHashSet): - result.add( - (compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8, - slot)) + let + idx = index.CommitteeIndex + includedIndices = + toHashSet(get_beacon_committee(state, slot, idx, cache)) * + validator_indices + if includedIndices.len > 0: + yield ( + includedIndices, idx, + compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8, + slot)