diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index c94089f64..b28579700 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -1095,6 +1095,7 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID, {"eth2": SSZ.encode(result.forkId), "attnets": SSZ.encode(result.metadata.attnets)}, rng) result.discoveryEnabled = discovery + result.rng = rng newSeq result.protocolStates, allProtocols.len for proto in allProtocols: diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index d62ef4543..c697b8b20 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -7,8 +7,8 @@ import # Standard library - std/[os, tables, strutils, strformat, times, math, - terminal, osproc, random], + std/[math, os, strformat, strutils, tables, times, + terminal, osproc], system/ansi_c, # Nimble packages @@ -19,7 +19,7 @@ import eth/[keys, async_utils], eth/db/[kvstore, kvstore_sqlite3], - eth/p2p/enode, eth/p2p/discoveryv5/[protocol, enr], + eth/p2p/enode, eth/p2p/discoveryv5/[protocol, enr, random2], # Local modules ./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api, @@ -390,15 +390,58 @@ func getStabilitySubnets(stabilitySubnets: auto): set[uint8] = for subnetInfo in stabilitySubnets: result.incl subnetInfo.subnet -proc getAttachedValidators(node: BeaconNode): seq[ValidatorIndex] = +proc getAttachedValidators(node: BeaconNode): HashSet[ValidatorIndex] = for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len: if node.getAttachedValidator( node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) != nil: - result.add validatorIndex.ValidatorIndex + result.incl validatorIndex.ValidatorIndex -proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = - static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 - doAssert not node.config.subscribeAllSubnets +proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) = + doAssert epoch >= 1 + let attachedValidators = node.getAttachedValidators() + for (subnetIndex, slot) in get_committee_assignments( + node.chainDag.headState.data.data, epoch, attachedValidators): + node.attestationSubnets.unsubscribeSlot[subnetIndex] = + max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnetIndex]) + if subnetIndex notin node.attestationSubnets.subscribedSubnets: + # A Pyrmont test validator didn't manage to subscribe in time with + # SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS being 32. + const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34 + + node.attestationSubnets.subscribeSlot[subnetIndex] = + # Queue upcoming subscription potentially earlier + # SLOTS_PER_EPOCH emulates one boundary condition of the per-epoch + # cycling mechanism timing buffers + min( + slot - min(slot.uint64, SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS), + node.attestationSubnets.subscribeSlot[subnetIndex]) + +# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability +proc getStabilitySubnetLength(node: BeaconNode): uint64 = + EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION + + node.network.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64 + +proc updateStabilitySubnets(node: BeaconNode, slot: Slot): set[uint8] = + # Equivalent to wallSlot by cycleAttestationSubnets(), especially + # since it'll try to run early in epochs, avoiding race conditions. + static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8) + let epoch = slot.epoch + + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability + for i in 0 ..< node.attestationSubnets.stabilitySubnets.len: + if epoch >= node.attestationSubnets.stabilitySubnets[i].expiration: + node.attestationSubnets.stabilitySubnets[i].subnet = + node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8 + node.attestationSubnets.stabilitySubnets[i].expiration = + epoch + node.getStabilitySubnetLength() + + result.incl node.attestationSubnets.stabilitySubnets[i].subnet + +proc cycleAttestationSubnetsPerEpoch( + node: BeaconNode, wallSlot: Slot, prevStabilitySubnets: set[uint8]): + set[uint8] = + # Per-epoch portion of subnet cycling: updating stability subnets and + # calculating future attestation subnets. # Only know RANDAO mix, which determines shuffling seed, one epoch in # advance. When node.chainDag.headState.data.data.slot.epoch is ahead @@ -409,11 +452,7 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = debug "Requested attestation subnets too far in advance", wallSlot, stateSlot = node.chainDag.headState.data.data.slot - return - - if node.attestationSubnets.nextCycleEpoch > wallSlot.epoch: - return - node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1 + return prevStabilitySubnets # This works so long as at least one block in an epoch provides a basis for # calculating the shuffling for the next epoch. It will keep checking for a @@ -422,52 +461,84 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = # important to attest regardless, as those upcoming blocks will hit maximum # attestations quickly and any individual attestation's likelihood of being # selected is low. - let - epochParity = wallSlot.epoch mod 2 - attachedValidators = node.getAttachedValidators() + if node.attestationSubnets.nextCycleEpoch <= wallSlot.epoch: + node.updateSubscriptionSchedule(wallSlot.epoch + 1) + node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1 - let (newAttestationSubnets, expiringSubnets, newSubnets) = - get_attestation_subnet_changes( - node.chainDag.headState.data.data, attachedValidators, - node.attestationSubnets) + let stabilitySubnets = node.updateStabilitySubnets(wallSlot) - let prevStabilitySubnets = - getStabilitySubnets(node.attestationSubnets.stabilitySubnets) - - node.attestationSubnets = newAttestationSubnets - debug "Attestation subnets", - expiring_subnets = expiringSubnets, - current_epoch_subnets = - node.attestationSubnets.subscribedSubnets[1 - epochParity], - upcoming_subnets = node.attestationSubnets.subscribedSubnets[epochParity], - new_subnets = newSubnets, - epoch = wallSlot.epoch, - num_stability_subnets = node.attestationSubnets.stabilitySubnets.len - - block: - for expiringSubnet in expiringSubnets: - node.network.unsubscribe(getAttestationTopic(node.forkDigest, expiringSubnet)) - - node.installAttestationSubnetHandlers(newSubnets) - - if not node.config.subscribeAllSubnets: + if not node.config.subscribeAllSubnets and + stabilitySubnets != prevStabilitySubnets: # In subscribeAllSubnets mode, this only gets set once, at initial subnet # attestation handler creation, since they're all considered as stability # subnets in that case. - let stabilitySubnets = - getStabilitySubnets(node.attestationSubnets.stabilitySubnets) - if stabilitySubnets != prevStabilitySubnets: - node.updateStabilitySubnetMetadata(stabilitySubnets) + node.updateStabilitySubnetMetadata(stabilitySubnets) -proc getInitialAttestationSubnets(node: BeaconNode): set[uint8] = + stabilitySubnets + +proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = + static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 + doAssert not node.config.subscribeAllSubnets + + let prevSubscribedSubnets = node.attestationSubnets.subscribedSubnets + + for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: + if i in node.attestationSubnets.subscribedSubnets: + if wallSlot >= node.attestationSubnets.unsubscribeSlot[i]: + node.attestationSubnets.subscribedSubnets.excl i + else: + if wallSlot >= node.attestationSubnets.subscribeSlot[i]: + node.attestationSubnets.subscribedSubnets.incl i + + let + prevStabilitySubnets = + getStabilitySubnets(node.attestationSubnets.stabilitySubnets) + stabilitySubnets = + node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets) + + # Accounting specific to non-stability subnets + for expiringSubnet in + prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets: + node.attestationSubnets.subscribeSlot[expiringSubnet] = FAR_FUTURE_SLOT + + let + prevAllSubnets = prevSubscribedSubnets + prevStabilitySubnets + allSubnets = node.attestationSubnets.subscribedSubnets + stabilitySubnets + unsubscribedSubnets = prevAllSubnets - allSubnets + subscribedSubnets = allSubnets - prevAllSubnets + + for subnet in unsubscribedSubnets: + node.network.unsubscribe( + getAttestationTopic(node.forkDigest, subnet)) + + node.installAttestationSubnetHandlers(subscribedSubnets) + + debug "Attestation subnets", + expiringSubnets = + prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets, + subnets = node.attestationSubnets.subscribedSubnets, + newSubnets = + node.attestationSubnets.subscribedSubnets - prevSubscribedSubnets, + wallSlot, + wallEpoch = wallSlot.epoch, + num_stability_subnets = node.attestationSubnets.stabilitySubnets.len, + expiring_stability_subnets = prevStabilitySubnets - stabilitySubnets, + new_stability_subnets = stabilitySubnets - prevStabilitySubnets, + subscribedSubnets, + unsubscribedSubnets + +proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] = let wallEpoch = node.beaconClock.now().slotOrZero().epoch - validator_indices = toHashSet(node.getAttachedValidators()) + validator_indices = node.getAttachedValidators() template mergeAttestationSubnets(epoch: Epoch) = - for (subnetIndex, _) in get_committee_assignments( + for (subnetIndex, slot) in get_committee_assignments( node.chainDag.headState.data.data, epoch, validator_indices): - result.incl subnetIndex + if subnetIndex in result: + result[subnetIndex] = max(result[subnetIndex], slot + 1) + else: + result[subnetIndex] = slot + 1 # Either wallEpoch is 0, in which case it might be pre-genesis, but we only # care about the already-known first two epochs of attestations, or it's in @@ -484,10 +555,10 @@ proc getAttestationSubnetHandlers(node: BeaconNode) = # - Restarting the node with a presistent netkey # - When going from synced -> syncing -> synced state - template getAllAttestationSubnets(): set[uint8] = - var subnets: set[uint8] + template getAllAttestationSubnets(): Table[uint8, Slot] = + var subnets: Table[uint8, Slot] for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: - subnets.incl i + subnets[i] = FAR_FUTURE_SLOT subnets let @@ -504,23 +575,25 @@ proc getAttestationSubnetHandlers(node: BeaconNode) = node.attachedValidators.count) for i in 0 ..< node.attachedValidators.count: node.attestationSubnets.stabilitySubnets[i] = ( - subnet: rand(ATTESTATION_SUBNET_COUNT - 1).uint8, - expiration: wallEpoch + getStabilitySubnetLength()) + subnet: node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8, + expiration: wallEpoch + node.getStabilitySubnetLength()) initialStabilitySubnets.incl( node.attestationSubnets.stabilitySubnets[i].subnet) node.updateStabilitySubnetMetadata( if node.config.subscribeAllSubnets: - initialSubnets + {0'u8 .. (ATTESTATION_SUBNET_COUNT - 1)} else: node.attestationSubnets.stabilitySubnets.getStabilitySubnets) - # Sets the "current" and "future" attestation subnets. One of these gets - # replaced by get_attestation_subnet_changes() immediately. Symmetric so - # that it's robust to the exact timing of when cycleAttestationSubnets() - # first runs, by making that first (effective) swap a no-op. - node.attestationSubnets.subscribedSubnets[0] = initialSubnets - node.attestationSubnets.subscribedSubnets[1] = initialSubnets + for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: + if i in initialSubnets: + node.attestationSubnets.subscribedSubnets.incl i + node.attestationSubnets.unsubscribeSlot[i] = initialSubnets[i] + else: + node.attestationSubnets.subscribedSubnets.excl i + node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT + node.attestationSubnets.enabled = true debug "Initial attestation subnets subscribed", @@ -528,7 +601,7 @@ proc getAttestationSubnetHandlers(node: BeaconNode) = initialStabilitySubnets, wallEpoch node.installAttestationSubnetHandlers( - initialSubnets + initialStabilitySubnets) + node.attestationSubnets.subscribedSubnets + initialStabilitySubnets) proc addMessageHandlers(node: BeaconNode) = node.network.subscribe(node.topicBeaconBlocks, enableTopicMetrics = true) @@ -542,8 +615,6 @@ func getTopicSubscriptionEnabled(node: BeaconNode): bool = node.attestationSubnets.enabled proc removeMessageHandlers(node: BeaconNode) = - node.attestationSubnets.subscribedSubnets[0] = {} - node.attestationSubnets.subscribedSubnets[1] = {} node.attestationSubnets.enabled = false doAssert not node.getTopicSubscriptionEnabled() diff --git a/beacon_chain/rpc/validator_api.nim b/beacon_chain/rpc/validator_api.nim index 59801ccad..bfeb04ed8 100644 --- a/beacon_chain/rpc/validator_api.nim +++ b/beacon_chain/rpc/validator_api.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2020 Status Research & Development GmbH +# Copyright (c) 2018-2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -145,12 +145,21 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = let head = node.doChecksAndGetCurrentHead(epoch) epochRef = node.chainDag.getEpochRef(head, epoch) - let subnet = compute_subnet_for_attestation( - get_committee_count_per_slot(epochRef), slot, committee_index).uint8 - if subnet notin node.attestationSubnets.subscribedSubnets[0] and - subnet notin node.attestationSubnets.subscribedSubnets[1]: - node.network.subscribe(getAttestationTopic(node.forkDigest, subnet)) + subnet = compute_subnet_for_attestation( + get_committee_count_per_slot(epochRef), slot, committee_index).uint8 - # But it might only be in current - node.attestationSubnets.subscribedSubnets[0].incl subnet - node.attestationSubnets.subscribedSubnets[1].incl subnet + # Either subnet already subscribed or not. If not, subscribe. If it is, + # extend subscription. All one knows from the API combined with how far + # ahead one can check for attestation schedule is that it might be used + # for up to the end of next epoch. Therefore, arrange for subscriptions + # to last at least that long. + if subnet notin node.attestationSubnets.subscribedSubnets: + # When to subscribe. Since it's not clear when from the API it's first + # needed, do so immediately. + node.attestationSubnets.subscribeSlot[subnet] = + min(node.attestationSubnets.subscribeSlot[subnet], wallSlot) + + node.attestationSubnets.unsubscribeSlot[subnet] = + max( + compute_start_slot_at_epoch(epoch + 2), + node.attestationSubnets.unsubscribeSlot[subnet]) diff --git a/beacon_chain/spec/datatypes.nim b/beacon_chain/spec/datatypes.nim index 6f557fc64..609a46b03 100644 --- a/beacon_chain/spec/datatypes.nim +++ b/beacon_chain/spec/datatypes.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2020 Status Research & Development GmbH +# Copyright (c) 2018-2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -61,6 +61,7 @@ const # Not part of spec. Still useful, pending removing usage if appropriate. ZERO_HASH* = Eth2Digest() MAX_GRAFFITI_SIZE = 32 + FAR_FUTURE_SLOT* = (not 0'u64).Slot # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#configuration MAXIMUM_GOSSIP_CLOCK_DISPARITY* = 500.millis @@ -71,6 +72,9 @@ const DEPOSIT_CONTRACT_TREE_DEPTH* = 32 BASE_REWARDS_PER_EPOCH* = 4 + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#misc + ATTESTATION_SUBNET_COUNT* = 64 + # https://github.com/ethereum/eth2.0-specs/pull/2101 ATTESTATION_PRODUCTION_DIVISOR* = 3 ATTESTATION_ENTROPY_DIVISOR* = 12 @@ -451,9 +455,13 @@ type AttestationSubnets* = object enabled*: bool - nextCycleEpoch*: Epoch - subscribedSubnets*: array[2, set[uint8]] stabilitySubnets*: seq[tuple[subnet: uint8, expiration: Epoch]] + nextCycleEpoch*: Epoch + + # These encode states in per-subnet state machines + subscribedSubnets*: set[uint8] + subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] + unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] # This matches the mutable state of the Solidity deposit contract # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/solidity_deposit_contract/deposit_contract.sol diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index 91ebabfd5..cb4b53f97 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -8,7 +8,7 @@ {.push raises: [Defect].} import - std/[strformat, sets, random], + std/[strformat, sets], ./datatypes, ./helpers, ./validator const @@ -19,9 +19,6 @@ const topicAttesterSlashingsSuffix* = "attester_slashing/ssz" topicAggregateAndProofsSuffix* = "beacon_aggregate_and_proof/ssz" - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#misc - ATTESTATION_SUBNET_COUNT* = 64 - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#eth2-network-interaction-domains MAX_CHUNK_SIZE* = 1 * 1024 * 1024 # bytes GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes @@ -107,61 +104,3 @@ func get_committee_assignments*( result.add( (compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8, slot)) - -# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability -proc getStabilitySubnetLength*(): uint64 = - EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION + - rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64 - -proc get_attestation_subnet_changes*( - state: BeaconState, attachedValidators: openArray[ValidatorIndex], - prevAttestationSubnets: AttestationSubnets): - tuple[a: AttestationSubnets, b: set[uint8], c: set[uint8]] = - static: doAssert ATTESTATION_SUBNET_COUNT == 64 # Fits in a set[uint8] - - # Guaranteed equivalent to wallSlot by cycleAttestationSubnets(), especially - # since it'll try to run early in epochs, avoiding race conditions. - let epoch = state.slot.epoch - - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability - var - attestationSubnets = prevAttestationSubnets - prevStabilitySubnets: set[uint8] = {} - stabilitySet: set[uint8] = {} - for i in 0 ..< attestationSubnets.stabilitySubnets.len: - static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8) - prevStabilitySubnets.incl attestationSubnets.stabilitySubnets[i].subnet - - if epoch >= attestationSubnets.stabilitySubnets[i].expiration: - attestationSubnets.stabilitySubnets[i].subnet = - rand(ATTESTATION_SUBNET_COUNT - 1).uint8 - attestationSubnets.stabilitySubnets[i].expiration = - epoch + getStabilitySubnetLength() - - stabilitySet.incl attestationSubnets.stabilitySubnets[i].subnet - - var nextEpochSubnets: set[uint8] - for it in get_committee_assignments( - state, epoch + 1, attachedValidators.toHashSet): - nextEpochSubnets.incl it.subnetIndex.uint8 - - doAssert nextEpochSubnets.len <= attachedValidators.len - nextEpochSubnets.incl stabilitySet - - let - epochParity = epoch mod 2 - currentEpochSubnets = attestationSubnets.subscribedSubnets[1 - epochParity] - - expiringSubnets = - (prevStabilitySubnets + - attestationSubnets.subscribedSubnets[epochParity]) - - nextEpochSubnets - currentEpochSubnets - stabilitySet - newSubnets = - nextEpochSubnets - (currentEpochSubnets + prevStabilitySubnets) - - doAssert newSubnets.len <= attachedValidators.len + 1 - doAssert (expiringSubnets * currentEpochSubnets).len == 0 - doAssert newSubnets <= nextEpochSubnets - - attestationSubnets.subscribedSubnets[epochParity] = nextEpochSubnets - (attestationSubnets, expiringSubnets, newSubnets)