initial dynamic subscribe/unsubscribe for attestations to/from subnets (#1462)

* initial dynamic subscribe/unsubscribe for attestations to/from subnets

* implement random stability subnet and clean up

* switch from HashSet[uint64] to set[uint8]

* refactor subnet logic out from beacon_node and actual (un)subscribing

* only try to subscribe to marginally different subnets

* add assertions

* maintain ENR subnets

* assert that beacon_node and eth2_network have consistent view of subscribed subnets

* disable actual cycling
This commit is contained in:
tersec 2020-08-12 17:48:31 +00:00 committed by GitHub
parent af3355e0f8
commit ab34584f23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 156 additions and 11 deletions

View File

@ -8,6 +8,7 @@
import
# Standard library
std/[algorithm, os, tables, strutils, sequtils, times, math, terminal],
std/random,
# Nimble packages
stew/[objects, byteutils, endians2], stew/shims/macros,
@ -392,6 +393,82 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
# finalization occurs every slot, to 4 slots vs scheduledSlot.
doAssert finalizedEpoch + 4 >= epoch
proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) =
proc attestationHandler(attestation: Attestation) =
# Avoid double-counting attestation-topic attestations on shared codepath
# when they're reflected through beacon blocks
beacon_attestations_received.inc()
beacon_attestation_received_seconds_from_slot_start.observe(
node.beaconClock.now.int64 - attestation.data.slot.toBeaconTime.int64)
node.onAttestation(attestation)
var attestationSubscriptions: seq[Future[void]] = @[]
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#attestations-and-aggregation
for subnet in subnets:
attestationSubscriptions.add(node.network.subscribe(
getAttestationTopic(node.forkDigest, subnet),
attestationHandler,
))
waitFor allFutures(attestationSubscriptions)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#metadata
node.network.metadata.seq_number += 1
for subnet in subnets:
node.network.metadata.attnets[subnet] = true
proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) =
let epochParity = slot.epoch mod 2
var attachedValidators: seq[ValidatorIndex]
for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len:
if node.getAttachedValidator(
node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) != nil:
attachedValidators.add validatorIndex.ValidatorIndex
let (newAttestationSubnets, expiringSubnets, newSubnets) =
get_attestation_subnet_changes(
node.chainDag.headState.data.data, attachedValidators,
node.attestationSubnets, slot.epoch)
node.attestationSubnets = newAttestationSubnets
debug "Attestation subnets",
expiring_subnets = expiringSubnets,
current_epoch_subnets =
node.attestationSubnets.subscribedSubnets[1 - epochParity],
upcoming_subnets = node.attestationSubnets.subscribedSubnets[epochParity],
new_subnets = newSubnets,
stability_subnet = node.attestationSubnets.stabilitySubnet,
stability_subnet_expiration_epoch =
node.attestationSubnets.stabilitySubnetExpirationEpoch
block:
var unsubscriptions: seq[Future[void]] = @[]
for expiringSubnet in expiringSubnets:
unsubscriptions.add(node.network.unsubscribe(
getAttestationTopic(node.forkDigest, expiringSubnet)))
waitFor allFutures(unsubscriptions)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#metadata
# The race condition window is smaller by placing the fast, local, and
# synchronous operation after a variable-latency, asynchronous action.
node.network.metadata.seq_number += 1
for expiringSubnet in expiringSubnets:
node.network.metadata.attnets[expiringSubnet] = false
node.installAttestationSubnetHandlers(newSubnets)
block:
let subscribed_subnets =
node.attestationSubnets.subscribedSubnets[0] +
node.attestationSubnets.subscribedSubnets[1] +
{node.attestationSubnets.stabilitySubnet.uint8}
for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
doAssert node.network.metadata.attnets[subnet] ==
(subnet in subscribed_subnets)
proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
@ -461,6 +538,10 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
if node.config.verifyFinalization:
verifyFinalization(node, scheduledSlot)
when false:
if slot.isEpoch:
node.cycleAttestationSubnets(slot)
if slot > lastSlot + SLOTS_PER_EPOCH:
# We've fallen behind more than an epoch - there's nothing clever we can
# do here really, except skip all the work and try again later.
@ -776,29 +857,39 @@ proc installAttestationHandlers(node: BeaconNode) =
return false
node.attestationPool.isValidAggregatedAttestation(signedAggregateAndProof, slot)
var attestationSubscriptions: seq[Future[void]] = @[]
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#attestations-and-aggregation
# These validators stay around the whole time, regardless of which specific
# subnets are subscribed to during any given epoch.
for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
closureScope:
let ci = it
attestationSubscriptions.add(node.network.subscribe(
node.network.addValidator(
getAttestationTopic(node.forkDigest, ci),
attestationHandler,
# This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): bool =
attestationValidator(attestation, ci)
))
)
attestationSubscriptions.add(node.network.subscribe(
var initialSubnets: set[uint8]
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
initialSubnets.incl i
node.installAttestationSubnetHandlers(initialSubnets)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#phase-0-attestation-subnet-stability
node.attestationSubnets.stabilitySubnet = rand(ATTESTATION_SUBNET_COUNT - 1).uint64
node.attestationSubnets.stabilitySubnetExpirationEpoch =
GENESIS_EPOCH + getStabilitySubnetLength()
# Relative to epoch 0, this sets the "current" attestation subnets.
node.attestationSubnets.subscribedSubnets[1 - (GENESIS_EPOCH mod 2)] =
initialSubnets
waitFor node.network.subscribe(
getAggregateAndProofsTopic(node.forkDigest),
proc(signedAggregateAndProof: SignedAggregateAndProof) =
attestationHandler(signedAggregateAndProof.message.aggregate),
proc(signedAggregateAndProof: SignedAggregateAndProof): bool =
aggregatedAttestationValidator(signedAggregateAndProof)
))
waitFor allFutures(attestationSubscriptions)
aggregatedAttestationValidator(signedAggregateAndProof))
proc stop*(node: BeaconNode) =
status = BeaconNodeStatus.Stopping

View File

@ -54,6 +54,7 @@ type
blockProcessingLoop*: Future[void]
onSecondLoop*: Future[void]
genesisSnapshotContent*: string
attestationSubnets*: AttestationSubnets
const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT

View File

@ -442,6 +442,11 @@ type
Table[Epoch, seq[ValidatorIndex]]
beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]]
AttestationSubnets* = object
subscribedSubnets*: array[2, set[uint8]]
stabilitySubnet*: uint64
stabilitySubnetExpirationEpoch*: Epoch
func shortValidatorKey*(state: BeaconState, validatorIdx: int): string =
($state.validators[validatorIdx].pubkey)[0..7]

View File

@ -8,7 +8,7 @@
{.push raises: [Defect].}
import
std/[strformat, sets],
std/[strformat, sets, random],
./datatypes, ./helpers, ./validator
const
@ -110,3 +110,51 @@ func get_committee_assignments(
result.add(
(compute_subnet_for_attestation(committees_per_slot, slot, idx),
slot))
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#phase-0-attestation-subnet-stability
proc getStabilitySubnetLength*(): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64
proc get_attestation_subnet_changes*(
state: BeaconState, attachedValidators: openarray[ValidatorIndex],
prevAttestationSubnets: AttestationSubnets, epoch: Epoch):
tuple[a: AttestationSubnets, b: set[uint8], c: set[uint8]] =
static:
doAssert ATTESTATION_SUBNET_COUNT == 64
var attestationSubnets = prevAttestationSubnets
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#phase-0-attestation-subnet-stability
let prevStabilitySubnet = {attestationSubnets.stabilitySubnet.uint8}
if epoch >= attestationSubnets.stabilitySubnetExpirationEpoch:
attestationSubnets.stabilitySubnet =
rand(ATTESTATION_SUBNET_COUNT - 1).uint64
attestationSubnets.stabilitySubnetExpirationEpoch =
epoch + getStabilitySubnetLength()
var nextEpochSubnets: set[uint8]
for it in get_committee_assignments(
state, state.slot.epoch + 1, attachedValidators.toHashSet):
nextEpochSubnets.incl it.subnetIndex.uint8
doAssert nextEpochSubnets.len > 0 and
nextEpochSubnets.len <= attachedValidators.len
let
epochParity = epoch mod 2
stabilitySet = {attestationSubnets.stabilitySubnet.uint8}
currentEpochSubnets = attestationSubnets.subscribedSubnets[1 - epochParity]
expiringSubnets =
(prevStabilitySubnet +
attestationSubnets.subscribedSubnets[epochParity]) -
nextEpochSubnets - currentEpochSubnets - stabilitySet
newSubnets =
(nextEpochSubnets + stabilitySet) -
(currentEpochSubnets + prevStabilitySubnet)
doAssert newSubnets.len <= attachedValidators.len + 1
attestationSubnets.subscribedSubnets[epochParity] = newSubnets
(attestationSubnets, expiringSubnets, newSubnets)