cycle attestation subnets every slot (#2240)

Cycle attestation subnets every slot
This commit is contained in:
tersec 2021-01-19 18:44:03 +01:00 committed by GitHub
parent 921fe5a68f
commit 55ecb61c3a
5 changed files with 165 additions and 137 deletions

View File

@ -1095,6 +1095,7 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
{"eth2": SSZ.encode(result.forkId), "attnets": SSZ.encode(result.metadata.attnets)},
rng)
result.discoveryEnabled = discovery
result.rng = rng
newSeq result.protocolStates, allProtocols.len
for proto in allProtocols:

View File

@ -7,8 +7,8 @@
import
# Standard library
std/[os, tables, strutils, strformat, times, math,
terminal, osproc, random],
std/[math, os, strformat, strutils, tables, times,
terminal, osproc],
system/ansi_c,
# Nimble packages
@ -19,7 +19,7 @@ import
eth/[keys, async_utils],
eth/db/[kvstore, kvstore_sqlite3],
eth/p2p/enode, eth/p2p/discoveryv5/[protocol, enr],
eth/p2p/enode, eth/p2p/discoveryv5/[protocol, enr, random2],
# Local modules
./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api,
@ -390,15 +390,58 @@ func getStabilitySubnets(stabilitySubnets: auto): set[uint8] =
for subnetInfo in stabilitySubnets:
result.incl subnetInfo.subnet
proc getAttachedValidators(node: BeaconNode): seq[ValidatorIndex] =
proc getAttachedValidators(node: BeaconNode): HashSet[ValidatorIndex] =
for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len:
if node.getAttachedValidator(
node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) != nil:
result.add validatorIndex.ValidatorIndex
result.incl validatorIndex.ValidatorIndex
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) =
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
doAssert not node.config.subscribeAllSubnets
proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) =
doAssert epoch >= 1
let attachedValidators = node.getAttachedValidators()
for (subnetIndex, slot) in get_committee_assignments(
node.chainDag.headState.data.data, epoch, attachedValidators):
node.attestationSubnets.unsubscribeSlot[subnetIndex] =
max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnetIndex])
if subnetIndex notin node.attestationSubnets.subscribedSubnets:
# A Pyrmont test validator didn't manage to subscribe in time with
# SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS being 32.
const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34
node.attestationSubnets.subscribeSlot[subnetIndex] =
# Queue upcoming subscription potentially earlier
# SLOTS_PER_EPOCH emulates one boundary condition of the per-epoch
# cycling mechanism timing buffers
min(
slot - min(slot.uint64, SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS),
node.attestationSubnets.subscribeSlot[subnetIndex])
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
proc getStabilitySubnetLength(node: BeaconNode): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
node.network.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64
proc updateStabilitySubnets(node: BeaconNode, slot: Slot): set[uint8] =
# Equivalent to wallSlot by cycleAttestationSubnets(), especially
# since it'll try to run early in epochs, avoiding race conditions.
static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8)
let epoch = slot.epoch
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
for i in 0 ..< node.attestationSubnets.stabilitySubnets.len:
if epoch >= node.attestationSubnets.stabilitySubnets[i].expiration:
node.attestationSubnets.stabilitySubnets[i].subnet =
node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8
node.attestationSubnets.stabilitySubnets[i].expiration =
epoch + node.getStabilitySubnetLength()
result.incl node.attestationSubnets.stabilitySubnets[i].subnet
proc cycleAttestationSubnetsPerEpoch(
node: BeaconNode, wallSlot: Slot, prevStabilitySubnets: set[uint8]):
set[uint8] =
# Per-epoch portion of subnet cycling: updating stability subnets and
# calculating future attestation subnets.
# Only know RANDAO mix, which determines shuffling seed, one epoch in
# advance. When node.chainDag.headState.data.data.slot.epoch is ahead
@ -409,11 +452,7 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) =
debug "Requested attestation subnets too far in advance",
wallSlot,
stateSlot = node.chainDag.headState.data.data.slot
return
if node.attestationSubnets.nextCycleEpoch > wallSlot.epoch:
return
node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1
return prevStabilitySubnets
# This works so long as at least one block in an epoch provides a basis for
# calculating the shuffling for the next epoch. It will keep checking for a
@ -422,52 +461,84 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) =
# important to attest regardless, as those upcoming blocks will hit maximum
# attestations quickly and any individual attestation's likelihood of being
# selected is low.
let
epochParity = wallSlot.epoch mod 2
attachedValidators = node.getAttachedValidators()
if node.attestationSubnets.nextCycleEpoch <= wallSlot.epoch:
node.updateSubscriptionSchedule(wallSlot.epoch + 1)
node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1
let (newAttestationSubnets, expiringSubnets, newSubnets) =
get_attestation_subnet_changes(
node.chainDag.headState.data.data, attachedValidators,
node.attestationSubnets)
let stabilitySubnets = node.updateStabilitySubnets(wallSlot)
let prevStabilitySubnets =
getStabilitySubnets(node.attestationSubnets.stabilitySubnets)
node.attestationSubnets = newAttestationSubnets
debug "Attestation subnets",
expiring_subnets = expiringSubnets,
current_epoch_subnets =
node.attestationSubnets.subscribedSubnets[1 - epochParity],
upcoming_subnets = node.attestationSubnets.subscribedSubnets[epochParity],
new_subnets = newSubnets,
epoch = wallSlot.epoch,
num_stability_subnets = node.attestationSubnets.stabilitySubnets.len
block:
for expiringSubnet in expiringSubnets:
node.network.unsubscribe(getAttestationTopic(node.forkDigest, expiringSubnet))
node.installAttestationSubnetHandlers(newSubnets)
if not node.config.subscribeAllSubnets:
if not node.config.subscribeAllSubnets and
stabilitySubnets != prevStabilitySubnets:
# In subscribeAllSubnets mode, this only gets set once, at initial subnet
# attestation handler creation, since they're all considered as stability
# subnets in that case.
let stabilitySubnets =
getStabilitySubnets(node.attestationSubnets.stabilitySubnets)
if stabilitySubnets != prevStabilitySubnets:
node.updateStabilitySubnetMetadata(stabilitySubnets)
node.updateStabilitySubnetMetadata(stabilitySubnets)
proc getInitialAttestationSubnets(node: BeaconNode): set[uint8] =
stabilitySubnets
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) =
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
doAssert not node.config.subscribeAllSubnets
let prevSubscribedSubnets = node.attestationSubnets.subscribedSubnets
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
if i in node.attestationSubnets.subscribedSubnets:
if wallSlot >= node.attestationSubnets.unsubscribeSlot[i]:
node.attestationSubnets.subscribedSubnets.excl i
else:
if wallSlot >= node.attestationSubnets.subscribeSlot[i]:
node.attestationSubnets.subscribedSubnets.incl i
let
prevStabilitySubnets =
getStabilitySubnets(node.attestationSubnets.stabilitySubnets)
stabilitySubnets =
node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets)
# Accounting specific to non-stability subnets
for expiringSubnet in
prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets:
node.attestationSubnets.subscribeSlot[expiringSubnet] = FAR_FUTURE_SLOT
let
prevAllSubnets = prevSubscribedSubnets + prevStabilitySubnets
allSubnets = node.attestationSubnets.subscribedSubnets + stabilitySubnets
unsubscribedSubnets = prevAllSubnets - allSubnets
subscribedSubnets = allSubnets - prevAllSubnets
for subnet in unsubscribedSubnets:
node.network.unsubscribe(
getAttestationTopic(node.forkDigest, subnet))
node.installAttestationSubnetHandlers(subscribedSubnets)
debug "Attestation subnets",
expiringSubnets =
prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets,
subnets = node.attestationSubnets.subscribedSubnets,
newSubnets =
node.attestationSubnets.subscribedSubnets - prevSubscribedSubnets,
wallSlot,
wallEpoch = wallSlot.epoch,
num_stability_subnets = node.attestationSubnets.stabilitySubnets.len,
expiring_stability_subnets = prevStabilitySubnets - stabilitySubnets,
new_stability_subnets = stabilitySubnets - prevStabilitySubnets,
subscribedSubnets,
unsubscribedSubnets
proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] =
let
wallEpoch = node.beaconClock.now().slotOrZero().epoch
validator_indices = toHashSet(node.getAttachedValidators())
validator_indices = node.getAttachedValidators()
template mergeAttestationSubnets(epoch: Epoch) =
for (subnetIndex, _) in get_committee_assignments(
for (subnetIndex, slot) in get_committee_assignments(
node.chainDag.headState.data.data, epoch, validator_indices):
result.incl subnetIndex
if subnetIndex in result:
result[subnetIndex] = max(result[subnetIndex], slot + 1)
else:
result[subnetIndex] = slot + 1
# Either wallEpoch is 0, in which case it might be pre-genesis, but we only
# care about the already-known first two epochs of attestations, or it's in
@ -484,10 +555,10 @@ proc getAttestationSubnetHandlers(node: BeaconNode) =
# - Restarting the node with a presistent netkey
# - When going from synced -> syncing -> synced state
template getAllAttestationSubnets(): set[uint8] =
var subnets: set[uint8]
template getAllAttestationSubnets(): Table[uint8, Slot] =
var subnets: Table[uint8, Slot]
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
subnets.incl i
subnets[i] = FAR_FUTURE_SLOT
subnets
let
@ -504,23 +575,25 @@ proc getAttestationSubnetHandlers(node: BeaconNode) =
node.attachedValidators.count)
for i in 0 ..< node.attachedValidators.count:
node.attestationSubnets.stabilitySubnets[i] = (
subnet: rand(ATTESTATION_SUBNET_COUNT - 1).uint8,
expiration: wallEpoch + getStabilitySubnetLength())
subnet: node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8,
expiration: wallEpoch + node.getStabilitySubnetLength())
initialStabilitySubnets.incl(
node.attestationSubnets.stabilitySubnets[i].subnet)
node.updateStabilitySubnetMetadata(
if node.config.subscribeAllSubnets:
initialSubnets
{0'u8 .. (ATTESTATION_SUBNET_COUNT - 1)}
else:
node.attestationSubnets.stabilitySubnets.getStabilitySubnets)
# Sets the "current" and "future" attestation subnets. One of these gets
# replaced by get_attestation_subnet_changes() immediately. Symmetric so
# that it's robust to the exact timing of when cycleAttestationSubnets()
# first runs, by making that first (effective) swap a no-op.
node.attestationSubnets.subscribedSubnets[0] = initialSubnets
node.attestationSubnets.subscribedSubnets[1] = initialSubnets
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
if i in initialSubnets:
node.attestationSubnets.subscribedSubnets.incl i
node.attestationSubnets.unsubscribeSlot[i] = initialSubnets[i]
else:
node.attestationSubnets.subscribedSubnets.excl i
node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
node.attestationSubnets.enabled = true
debug "Initial attestation subnets subscribed",
@ -528,7 +601,7 @@ proc getAttestationSubnetHandlers(node: BeaconNode) =
initialStabilitySubnets,
wallEpoch
node.installAttestationSubnetHandlers(
initialSubnets + initialStabilitySubnets)
node.attestationSubnets.subscribedSubnets + initialStabilitySubnets)
proc addMessageHandlers(node: BeaconNode) =
node.network.subscribe(node.topicBeaconBlocks, enableTopicMetrics = true)
@ -542,8 +615,6 @@ func getTopicSubscriptionEnabled(node: BeaconNode): bool =
node.attestationSubnets.enabled
proc removeMessageHandlers(node: BeaconNode) =
node.attestationSubnets.subscribedSubnets[0] = {}
node.attestationSubnets.subscribedSubnets[1] = {}
node.attestationSubnets.enabled = false
doAssert not node.getTopicSubscriptionEnabled()

View File

@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2020 Status Research & Development GmbH
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -145,12 +145,21 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
let
head = node.doChecksAndGetCurrentHead(epoch)
epochRef = node.chainDag.getEpochRef(head, epoch)
let subnet = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), slot, committee_index).uint8
if subnet notin node.attestationSubnets.subscribedSubnets[0] and
subnet notin node.attestationSubnets.subscribedSubnets[1]:
node.network.subscribe(getAttestationTopic(node.forkDigest, subnet))
subnet = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), slot, committee_index).uint8
# But it might only be in current
node.attestationSubnets.subscribedSubnets[0].incl subnet
node.attestationSubnets.subscribedSubnets[1].incl subnet
# Either subnet already subscribed or not. If not, subscribe. If it is,
# extend subscription. All one knows from the API combined with how far
# ahead one can check for attestation schedule is that it might be used
# for up to the end of next epoch. Therefore, arrange for subscriptions
# to last at least that long.
if subnet notin node.attestationSubnets.subscribedSubnets:
# When to subscribe. Since it's not clear when from the API it's first
# needed, do so immediately.
node.attestationSubnets.subscribeSlot[subnet] =
min(node.attestationSubnets.subscribeSlot[subnet], wallSlot)
node.attestationSubnets.unsubscribeSlot[subnet] =
max(
compute_start_slot_at_epoch(epoch + 2),
node.attestationSubnets.unsubscribeSlot[subnet])

View File

@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2020 Status Research & Development GmbH
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -61,6 +61,7 @@ const
# Not part of spec. Still useful, pending removing usage if appropriate.
ZERO_HASH* = Eth2Digest()
MAX_GRAFFITI_SIZE = 32
FAR_FUTURE_SLOT* = (not 0'u64).Slot
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#configuration
MAXIMUM_GOSSIP_CLOCK_DISPARITY* = 500.millis
@ -71,6 +72,9 @@ const
DEPOSIT_CONTRACT_TREE_DEPTH* = 32
BASE_REWARDS_PER_EPOCH* = 4
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#misc
ATTESTATION_SUBNET_COUNT* = 64
# https://github.com/ethereum/eth2.0-specs/pull/2101
ATTESTATION_PRODUCTION_DIVISOR* = 3
ATTESTATION_ENTROPY_DIVISOR* = 12
@ -451,9 +455,13 @@ type
AttestationSubnets* = object
enabled*: bool
nextCycleEpoch*: Epoch
subscribedSubnets*: array[2, set[uint8]]
stabilitySubnets*: seq[tuple[subnet: uint8, expiration: Epoch]]
nextCycleEpoch*: Epoch
# These encode states in per-subnet state machines
subscribedSubnets*: set[uint8]
subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]
unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]
# This matches the mutable state of the Solidity deposit contract
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/solidity_deposit_contract/deposit_contract.sol

View File

@ -8,7 +8,7 @@
{.push raises: [Defect].}
import
std/[strformat, sets, random],
std/[strformat, sets],
./datatypes, ./helpers, ./validator
const
@ -19,9 +19,6 @@ const
topicAttesterSlashingsSuffix* = "attester_slashing/ssz"
topicAggregateAndProofsSuffix* = "beacon_aggregate_and_proof/ssz"
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#misc
ATTESTATION_SUBNET_COUNT* = 64
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#eth2-network-interaction-domains
MAX_CHUNK_SIZE* = 1 * 1024 * 1024 # bytes
GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
@ -107,61 +104,3 @@ func get_committee_assignments*(
result.add(
(compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8,
slot))
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
proc getStabilitySubnetLength*(): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64
proc get_attestation_subnet_changes*(
state: BeaconState, attachedValidators: openArray[ValidatorIndex],
prevAttestationSubnets: AttestationSubnets):
tuple[a: AttestationSubnets, b: set[uint8], c: set[uint8]] =
static: doAssert ATTESTATION_SUBNET_COUNT == 64 # Fits in a set[uint8]
# Guaranteed equivalent to wallSlot by cycleAttestationSubnets(), especially
# since it'll try to run early in epochs, avoiding race conditions.
let epoch = state.slot.epoch
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
var
attestationSubnets = prevAttestationSubnets
prevStabilitySubnets: set[uint8] = {}
stabilitySet: set[uint8] = {}
for i in 0 ..< attestationSubnets.stabilitySubnets.len:
static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8)
prevStabilitySubnets.incl attestationSubnets.stabilitySubnets[i].subnet
if epoch >= attestationSubnets.stabilitySubnets[i].expiration:
attestationSubnets.stabilitySubnets[i].subnet =
rand(ATTESTATION_SUBNET_COUNT - 1).uint8
attestationSubnets.stabilitySubnets[i].expiration =
epoch + getStabilitySubnetLength()
stabilitySet.incl attestationSubnets.stabilitySubnets[i].subnet
var nextEpochSubnets: set[uint8]
for it in get_committee_assignments(
state, epoch + 1, attachedValidators.toHashSet):
nextEpochSubnets.incl it.subnetIndex.uint8
doAssert nextEpochSubnets.len <= attachedValidators.len
nextEpochSubnets.incl stabilitySet
let
epochParity = epoch mod 2
currentEpochSubnets = attestationSubnets.subscribedSubnets[1 - epochParity]
expiringSubnets =
(prevStabilitySubnets +
attestationSubnets.subscribedSubnets[epochParity]) -
nextEpochSubnets - currentEpochSubnets - stabilitySet
newSubnets =
nextEpochSubnets - (currentEpochSubnets + prevStabilitySubnets)
doAssert newSubnets.len <= attachedValidators.len + 1
doAssert (expiringSubnets * currentEpochSubnets).len == 0
doAssert newSubnets <= nextEpochSubnets
attestationSubnets.subscribedSubnets[epochParity] = nextEpochSubnets
(attestationSubnets, expiringSubnets, newSubnets)