make subnet cycling more robust; use one stability subnet/validator; explicitly represent gossip enabled/disabled (#2201)

* make subnet cycling more robust; use one stability subnet/validator; explicitly represent gossip enabled/disabled

* fix asymmetry in _snappy being used for subscriptions but not unsubscriptions

* remove redundant comment

* minimal RPC and VC support for infoming BN of subnets

* create and verify slot signatures in RPC interface and VC

* loosen old slot check

* because Slot + uint64 works but uint64 + Slot doesn't

* document assumptions for head state use; don't clear stability subnets; guard against VC not having checked an epoch ahead, fixing a crash; clarify unsigned comparison

* revert unsub fix
This commit is contained in:
tersec 2020-12-22 10:05:36 +01:00 committed by GitHub
parent be74df70e0
commit afbaa36ef7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 141 additions and 43 deletions

View File

@ -359,11 +359,12 @@ proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8])
await allFutures(attestationSubscriptions) await allFutures(attestationSubscriptions)
proc updateStabilitySubnetMetadata(node: BeaconNode, stabilitySubnet: uint64) = proc updateStabilitySubnetMetadata(
node: BeaconNode, stabilitySubnets: set[uint8]) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata
node.network.metadata.seq_number += 1 node.network.metadata.seq_number += 1
for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT: for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
node.network.metadata.attnets[subnet] = (subnet == stabilitySubnet) node.network.metadata.attnets[subnet] = (subnet in stabilitySubnets)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestation-subnet-bitfield # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestation-subnet-bitfield
@ -374,12 +375,38 @@ proc updateStabilitySubnetMetadata(node: BeaconNode, stabilitySubnet: uint64) =
# be the correct one and the ENR will not increase in size. # be the correct one and the ENR will not increase in size.
warn "Failed to update record on subnet cycle", error = res.error warn "Failed to update record on subnet cycle", error = res.error
else: else:
debug "Stability subnet changed, updated ENR attnets", stabilitySubnet debug "Stability subnets changed; updated ENR attnets", stabilitySubnets
proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} = func getStabilitySubnets(stabilitySubnets: auto): set[uint8] =
for subnetInfo in stabilitySubnets:
result.incl subnetInfo.subnet
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
let epochParity = slot.epoch mod 2 # Only know RANDAO mix, which determines shuffling seed, one epoch in
# advance. When node.chainDag.headState.data.data.slot.epoch is ahead
# of wallSlot, the clock's just incorrect. If the state slot's behind
# wallSlot, it would have to look more than MIN_SEED_LOOKAHEAD epochs
# ahead to compute the shuffling determining the beacon committees.
if node.chainDag.headState.data.data.slot.epoch != wallSlot.epoch:
debug "Requested attestation subnets too far in advance",
wallSlot,
stateSlot = node.chainDag.headState.data.data.slot
return
if node.attestationSubnets.nextCycleEpoch > wallSlot.epoch:
return
node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1
# This works so long as at least one block in an epoch provides a basis for
# calculating the shuffling for the next epoch. It will keep checking for a
# block, each slot, until a block comes in, even if the first few blocks in
# an epoch are missing. If a whole epoch without blocks occurs, it's not as
# important to attest regardless, as those upcoming blocks will hit maximum
# attestations quickly and any individual attestation's likelihood of being
# selected is low.
let epochParity = wallSlot.epoch mod 2
var attachedValidators: seq[ValidatorIndex] var attachedValidators: seq[ValidatorIndex]
for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len: for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len:
if node.getAttachedValidator( if node.getAttachedValidator(
@ -392,9 +419,10 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} =
let (newAttestationSubnets, expiringSubnets, newSubnets) = let (newAttestationSubnets, expiringSubnets, newSubnets) =
get_attestation_subnet_changes( get_attestation_subnet_changes(
node.chainDag.headState.data.data, attachedValidators, node.chainDag.headState.data.data, attachedValidators,
node.attestationSubnets, slot.epoch) node.attestationSubnets)
let prevStabilitySubnet = node.attestationSubnets.stabilitySubnet let prevStabilitySubnets =
getStabilitySubnets(node.attestationSubnets.stabilitySubnets)
node.attestationSubnets = newAttestationSubnets node.attestationSubnets = newAttestationSubnets
debug "Attestation subnets", debug "Attestation subnets",
@ -403,9 +431,8 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} =
node.attestationSubnets.subscribedSubnets[1 - epochParity], node.attestationSubnets.subscribedSubnets[1 - epochParity],
upcoming_subnets = node.attestationSubnets.subscribedSubnets[epochParity], upcoming_subnets = node.attestationSubnets.subscribedSubnets[epochParity],
new_subnets = newSubnets, new_subnets = newSubnets,
stability_subnet = node.attestationSubnets.stabilitySubnet, epoch = wallSlot.epoch,
stability_subnet_expiration_epoch = num_stability_subnets = node.attestationSubnets.stabilitySubnets.len
node.attestationSubnets.stabilitySubnetExpirationEpoch
block: block:
var unsubscriptions: seq[Future[void]] = @[] var unsubscriptions: seq[Future[void]] = @[]
@ -417,9 +444,10 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} =
await node.installAttestationSubnetHandlers(newSubnets) await node.installAttestationSubnetHandlers(newSubnets)
let stabilitySubnet = node.attestationSubnets.stabilitySubnet let stabilitySubnets =
if stabilitySubnet != prevStabilitySubnet: getStabilitySubnets(node.attestationSubnets.stabilitySubnets)
node.updateStabilitySubnetMetadata(stabilitySubnet) if stabilitySubnets != prevStabilitySubnets:
node.updateStabilitySubnetMetadata(stabilitySubnets)
proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] = proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] =
var initialSubnets: set[uint8] var initialSubnets: set[uint8]
@ -432,17 +460,27 @@ proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] =
# - Restarting the node with a presistent netkey # - Restarting the node with a presistent netkey
# - When going from synced -> syncing -> synced state # - When going from synced -> syncing -> synced state
let wallEpoch = node.beaconClock.now().slotOrZero().epoch let wallEpoch = node.beaconClock.now().slotOrZero().epoch
node.attestationSubnets.stabilitySubnet = rand(ATTESTATION_SUBNET_COUNT - 1).uint64 node.attestationSubnets.stabilitySubnets.setLen(
node.attestationSubnets.stabilitySubnetExpirationEpoch = node.attachedValidators.count)
wallEpoch + getStabilitySubnetLength() for i in 0 ..< node.attachedValidators.count:
node.attestationSubnets.stabilitySubnets[i] = (
subnet: rand(ATTESTATION_SUBNET_COUNT - 1).uint8,
expiration: wallEpoch + getStabilitySubnetLength())
node.updateStabilitySubnetMetadata(node.attestationSubnets.stabilitySubnet) node.updateStabilitySubnetMetadata(
node.attestationSubnets.stabilitySubnets.getStabilitySubnets)
# Sets the "current" and "future" attestation subnets. One of these gets # Sets the "current" and "future" attestation subnets. One of these gets
# replaced by get_attestation_subnet_changes() immediately. # replaced by get_attestation_subnet_changes() immediately. Symmetric so
# that it's robust to the exact timing of when cycleAttestationSubnets()
# first runs, by making that first (effective) swap a no-op.
node.attestationSubnets.subscribedSubnets[0] = initialSubnets node.attestationSubnets.subscribedSubnets[0] = initialSubnets
node.attestationSubnets.subscribedSubnets[1] = initialSubnets node.attestationSubnets.subscribedSubnets[1] = initialSubnets
node.attestationSubnets.enabled = true
debug "Initial attestation subnets subscribed",
initialSubnets,
wallEpoch
node.installAttestationSubnetHandlers(initialSubnets) node.installAttestationSubnetHandlers(initialSubnets)
proc addMessageHandlers(node: BeaconNode): Future[void] = proc addMessageHandlers(node: BeaconNode): Future[void] =
@ -457,12 +495,12 @@ proc addMessageHandlers(node: BeaconNode): Future[void] =
) )
func getTopicSubscriptionEnabled(node: BeaconNode): bool = func getTopicSubscriptionEnabled(node: BeaconNode): bool =
node.attestationSubnets.subscribedSubnets[0].len + node.attestationSubnets.enabled
node.attestationSubnets.subscribedSubnets[1].len > 0
proc removeMessageHandlers(node: BeaconNode): Future[void] = proc removeMessageHandlers(node: BeaconNode): Future[void] =
node.attestationSubnets.subscribedSubnets[0] = {} node.attestationSubnets.subscribedSubnets[0] = {}
node.attestationSubnets.subscribedSubnets[1] = {} node.attestationSubnets.subscribedSubnets[1] = {}
node.attestationSubnets.enabled = false
doAssert not node.getTopicSubscriptionEnabled() doAssert not node.getTopicSubscriptionEnabled()
var unsubscriptions = mapIt( var unsubscriptions = mapIt(
@ -526,8 +564,9 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
syncQueueLen syncQueueLen
await node.removeMessageHandlers() await node.removeMessageHandlers()
# Subscription or unsubscription might have occurred; recheck # Subscription or unsubscription might have occurred; recheck.
if slot.isEpoch and node.getTopicSubscriptionEnabled: if node.getTopicSubscriptionEnabled:
# This exits early all but one call each epoch.
await node.cycleAttestationSubnets(slot) await node.cycleAttestationSubnets(slot)
proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot): Future[void] = proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot): Future[void] =

View File

@ -87,7 +87,6 @@ proc getValidatorDutiesForEpoch(vc: ValidatorClient, epoch: Epoch) {.gcsafe, asy
vc.attestationsForEpoch.clear() vc.attestationsForEpoch.clear()
await getAttesterDutiesForEpoch(epoch) await getAttesterDutiesForEpoch(epoch)
# obtain the attestation duties this VC should do during the next epoch # obtain the attestation duties this VC should do during the next epoch
# TODO currently we aren't making use of this but perhaps we should
await getAttesterDutiesForEpoch(epoch + 1) await getAttesterDutiesForEpoch(epoch + 1)
# for now we will get the fork each time we update the validator duties for each epoch # for now we will get the fork each time we update the validator duties for each epoch
@ -129,6 +128,9 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a
# 1 slot earlier because there are a few back-and-forth requests which # 1 slot earlier because there are a few back-and-forth requests which
# could take up time for attesting... Perhaps this should be called more # could take up time for attesting... Perhaps this should be called more
# than once per epoch because of forks & other events... # than once per epoch because of forks & other events...
#
# calling it before epoch n starts means one can't ensure knowing about
# epoch n+1.
if slot.isEpoch: if slot.isEpoch:
await getValidatorDutiesForEpoch(vc, epoch) await getValidatorDutiesForEpoch(vc, epoch)
@ -264,6 +266,16 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a
# need similar amounts of memory. # need similar amounts of memory.
GC_fullCollect() GC_fullCollect()
if (slot - 2).isEpoch and (slot.epoch + 1) in vc.attestationsForEpoch:
for slot, attesterDuties in vc.attestationsForEpoch[slot.epoch + 1].pairs:
for ad in attesterDuties:
let
validator = vc.attachedValidators.validators[ad.public_key]
sig = await validator.getSlotSig(
vc.fork, vc.beaconGenesis.genesis_validators_root, slot)
discard await vc.client.post_v1_validator_beacon_committee_subscriptions(
ad.committee_index, ad.slot, true, ad.public_key, sig)
addTimer(nextSlotStart) do (p: pointer): addTimer(nextSlotStart) do (p: pointer):
asyncCheck vc.onSlotStart(slot, nextSlot) asyncCheck vc.onSlotStart(slot, nextSlot)

View File

@ -15,7 +15,7 @@ import
chronicles, chronicles,
# Local modules # Local modules
../spec/[datatypes, digest, crypto, helpers], ../spec/[datatypes, digest, crypto, helpers, network, signatures],
../spec/eth2_apis/callsigs_types, ../spec/eth2_apis/callsigs_types,
../block_pools/[chain_dag, spec_cache], ../ssz/merkleization, ../block_pools/[chain_dag, spec_cache], ../ssz/merkleization,
../beacon_node_common, ../beacon_node_types, ../attestation_pool, ../beacon_node_common, ../beacon_node_types, ../attestation_pool,
@ -115,5 +115,39 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
rpcServer.rpc("post_v1_validator_beacon_committee_subscriptions") do ( rpcServer.rpc("post_v1_validator_beacon_committee_subscriptions") do (
committee_index: CommitteeIndex, slot: Slot, aggregator: bool, committee_index: CommitteeIndex, slot: Slot, aggregator: bool,
validator_pubkey: ValidatorPubKey, slot_signature: ValidatorSig) -> bool: validator_pubkey: ValidatorPubKey, slot_signature: ValidatorSig) -> bool:
debug "post_v1_validator_beacon_committee_subscriptions" debug "post_v1_validator_beacon_committee_subscriptions",
raise newException(CatchableError, "Not implemented") committee_index, slot
if committee_index.uint64 >= ATTESTATION_SUBNET_COUNT.uint64:
raise newException(CatchableError,
"Invalid committee index")
if node.syncManager.inProgress:
raise newException(CatchableError,
"Beacon node is currently syncing and not serving request on that endpoint")
let wallSlot = node.beaconClock.now.slotOrZero
if wallSlot > slot + 1:
raise newException(CatchableError,
"Past slot requested")
let epoch = slot.epoch
if epoch >= wallSlot.epoch and epoch - wallSlot.epoch > 1:
raise newException(CatchableError,
"Slot requested not in current or next wall-slot epoch")
if not verify_slot_signature(
node.chainDag.headState.data.data.fork,
node.chainDag.headState.data.data.genesis_validators_root,
slot, validator_pubkey, slot_signature):
raise newException(CatchableError,
"Invalid slot signature")
let subnet = committee_index.uint8
if subnet notin node.attestationSubnets.subscribedSubnets[0] and
subnet notin node.attestationSubnets.subscribedSubnets[1]:
await node.network.subscribe(getAttestationTopic(
node.forkDigest, subnet))
# But it might only be in current
node.attestationSubnets.subscribedSubnets[0].incl subnet
node.attestationSubnets.subscribedSubnets[1].incl subnet

View File

@ -457,9 +457,10 @@ type
beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]] beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]]
AttestationSubnets* = object AttestationSubnets* = object
enabled*: bool
nextCycleEpoch*: Epoch
subscribedSubnets*: array[2, set[uint8]] subscribedSubnets*: array[2, set[uint8]]
stabilitySubnet*: uint64 stabilitySubnets*: seq[tuple[subnet: uint8, expiration: Epoch]]
stabilitySubnetExpirationEpoch*: Epoch
# This matches the mutable state of the Solidity deposit contract # This matches the mutable state of the Solidity deposit contract
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/solidity_deposit_contract/deposit_contract.sol # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/solidity_deposit_contract/deposit_contract.sol

View File

@ -124,42 +124,54 @@ proc getStabilitySubnetLength*(): uint64 =
proc get_attestation_subnet_changes*( proc get_attestation_subnet_changes*(
state: BeaconState, attachedValidators: openArray[ValidatorIndex], state: BeaconState, attachedValidators: openArray[ValidatorIndex],
prevAttestationSubnets: AttestationSubnets, epoch: Epoch): prevAttestationSubnets: AttestationSubnets):
tuple[a: AttestationSubnets, b: set[uint8], c: set[uint8]] = tuple[a: AttestationSubnets, b: set[uint8], c: set[uint8]] =
static: doAssert ATTESTATION_SUBNET_COUNT == 64 # Fits in a set[uint8] static: doAssert ATTESTATION_SUBNET_COUNT == 64 # Fits in a set[uint8]
doAssert attachedValidators.len > 0 doAssert attachedValidators.len > 0
var attestationSubnets = prevAttestationSubnets # Guaranteed equivalent to wallSlot by cycleAttestationSubnets(), especially
# since it'll try to run early in epochs, avoiding race conditions.
let epoch = state.slot.epoch
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
let prevStabilitySubnet = {attestationSubnets.stabilitySubnet.uint8} var
if epoch >= attestationSubnets.stabilitySubnetExpirationEpoch: attestationSubnets = prevAttestationSubnets
attestationSubnets.stabilitySubnet = prevStabilitySubnets: set[uint8] = {}
rand(ATTESTATION_SUBNET_COUNT - 1).uint64 stabilitySet: set[uint8] = {}
attestationSubnets.stabilitySubnetExpirationEpoch = for i in 0 ..< attestationSubnets.stabilitySubnets.len:
static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8)
prevStabilitySubnets.incl attestationSubnets.stabilitySubnets[i].subnet
if epoch >= attestationSubnets.stabilitySubnets[i].expiration:
attestationSubnets.stabilitySubnets[i].subnet =
rand(ATTESTATION_SUBNET_COUNT - 1).uint8
attestationSubnets.stabilitySubnets[i].expiration =
epoch + getStabilitySubnetLength() epoch + getStabilitySubnetLength()
stabilitySet.incl attestationSubnets.stabilitySubnets[i].subnet
var nextEpochSubnets: set[uint8] var nextEpochSubnets: set[uint8]
for it in get_committee_assignments( for it in get_committee_assignments(
state, state.slot.epoch + 1, attachedValidators.toHashSet): state, epoch + 1, attachedValidators.toHashSet):
nextEpochSubnets.incl it.subnetIndex.uint8 nextEpochSubnets.incl it.subnetIndex.uint8
doAssert nextEpochSubnets.len <= attachedValidators.len doAssert nextEpochSubnets.len <= attachedValidators.len
nextEpochSubnets.incl stabilitySet
let let
epochParity = epoch mod 2 epochParity = epoch mod 2
stabilitySet = {attestationSubnets.stabilitySubnet.uint8}
currentEpochSubnets = attestationSubnets.subscribedSubnets[1 - epochParity] currentEpochSubnets = attestationSubnets.subscribedSubnets[1 - epochParity]
expiringSubnets = expiringSubnets =
(prevStabilitySubnet + (prevStabilitySubnets +
attestationSubnets.subscribedSubnets[epochParity]) - attestationSubnets.subscribedSubnets[epochParity]) -
nextEpochSubnets - currentEpochSubnets - stabilitySet nextEpochSubnets - currentEpochSubnets - stabilitySet
newSubnets = newSubnets =
(nextEpochSubnets + stabilitySet) - nextEpochSubnets - (currentEpochSubnets + prevStabilitySubnets)
(currentEpochSubnets + prevStabilitySubnet)
doAssert newSubnets.len <= attachedValidators.len + 1 doAssert newSubnets.len <= attachedValidators.len + 1
doAssert (expiringSubnets * currentEpochSubnets).len == 0
doAssert newSubnets <= nextEpochSubnets
attestationSubnets.subscribedSubnets[epochParity] = newSubnets attestationSubnets.subscribedSubnets[epochParity] = nextEpochSubnets
(attestationSubnets, expiringSubnets, newSubnets) (attestationSubnets, expiringSubnets, newSubnets)