register vc duties with subnet tracker (#2949)

* register vc duties with subnet tracker
* fix activation logging during startup
* cache slot signature to avoid duplicate signature work
* schedule aggregation duties one slot at a time to avoid CPU spike at
each epoch
* lower aggregation subnet pre-subscription time to 4 slots (lowers
bandwidth and CPU usage)
* update stability subnets in ENR on startup
* log gossip state
* perform gossip subscriptions just before the next slot starts
* document stuff
* add random include
* don't overwrite subscription state when not subscribed
* log target gossip state
* updating gossip status once is enough
* add test
* remove syncQueueLen - this one is not updated at the end of the sync
and may cause gossip to disconnect itself completely - use a simple head
distance instead
* fix gossip disconnection - if in hysteresis, node.gossipState will be
set to disabled even though we don't disable topic subscriptions
* fix extra duty registration call
This commit is contained in:
Jacek Sieka 2021-10-18 11:11:44 +02:00 committed by GitHub
parent 073389377e
commit 4f7a8cf79d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 525 additions and 410 deletions

View File

@ -15,20 +15,20 @@ import
taskpools,
# Local modules
./conf, ./beacon_clock, ./beacon_chain_db,
./beacon_node_types,
"."/[beacon_clock, beacon_chain_db, beacon_node_types, conf],
./gossip_processing/[eth2_processor, block_processor, consensus_manager],
./networking/eth2_network,
./eth1/eth1_monitor,
./consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool],
./spec/datatypes/base,
./sync/[sync_manager, request_manager]
./sync/[sync_manager, request_manager],
./validators/action_tracker
export
osproc, chronos, httpserver, presto, conf, beacon_clock, beacon_chain_db,
attestation_pool, eth2_network, beacon_node_types, eth1_monitor,
request_manager, sync_manager, eth2_processor, blockchain_dag, block_quarantine,
base
osproc, chronos, httpserver, presto, action_tracker, beacon_clock,
beacon_chain_db, conf, attestation_pool, eth2_network, beacon_node_types,
eth1_monitor, request_manager, sync_manager, eth2_processor, blockchain_dag,
block_quarantine, base
type
RpcServer* = RpcHttpServer
@ -61,7 +61,7 @@ type
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerID]
genesisSnapshotContent*: string
attestationSubnets*: AttestationSubnets
actionTracker*: ActionTracker
processor*: ref Eth2Processor
blockProcessor*: ref BlockProcessor
consensusManager*: ref ConsensusManager

View File

@ -176,30 +176,45 @@ type
# assumed that a valid index is stored here!
index*: Option[ValidatorIndex]
# Cache the latest slot signature - the slot signature is used to determine
# if the validator will be aggregating (in the near future)
slotSignature*: Option[tuple[slot: Slot, signature: ValidatorSig]]
ValidatorPool* = object
validators*: Table[ValidatorPubKey, AttachedValidator]
slashingProtection*: SlashingProtectionDB
AttesterDuty* = object
subnet*: SubnetId
slot*: Slot
isAggregator*: bool
AttestationSubnets* = object
enabled*: bool
subscribedSubnets*: BitArray[ATTESTATION_SUBNET_COUNT] ##\
## All subnets we're current subscribed to
stabilitySubnets*: seq[tuple[subnet_id: SubnetId, expiration: Epoch]] ##\
## The subnets on which we listen and broadcast gossip traffic to maintain
## the health of the network - these are advertised in the ENR
nextCycleEpoch*: Epoch
# These encode states in per-subnet state machines
aggregateSubnets*: BitArray[ATTESTATION_SUBNET_COUNT] ##\
## The subnets on which we listen for attestations in order to produce
## aggregates
subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]
unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]
# Used to track the next attestation and proposal slots using an
# epoch-relative coordinate system. Doesn't need initialization.
attestingSlots*: array[2, uint32]
proposingSlots*: array[2, uint32]
lastCalculatedEpoch*: Epoch
knownValidators*: Table[ValidatorIndex, Slot]
## Validators that we've recently seen - we'll subscribe to one stability
## subnet for each such validator - the slot is used to expire validators
## that no longer are posting duties
duties*: seq[AttesterDuty] ##\
## Known aggregation duties in the near future - before each such
## duty, we'll subscribe to the corresponding subnet to collect
func shortLog*(v: AttachedValidator): string = shortLog(v.pubKey)
func hash*(x: SyncCommitteeMsgKey): Hash =

View File

@ -173,12 +173,13 @@ func makeAttestationData*(
# https://github.com/ethereum/consensus-specs/blob/v1.1.2/specs/phase0/validator.md#validator-assignments
iterator get_committee_assignments*(
epochRef: EpochRef, epoch: Epoch, validator_indices: IntSet):
epochRef: EpochRef, validator_indices: IntSet):
tuple[validatorIndices: IntSet,
committeeIndex: CommitteeIndex,
subnet_id: SubnetId, slot: Slot] =
let
committees_per_slot = get_committee_count_per_slot(epochRef)
epoch = epochRef.epoch
start_slot = compute_start_slot_at_epoch(epoch)
for slot in start_slot ..< start_slot + SLOTS_PER_EPOCH:

View File

@ -2080,6 +2080,9 @@ proc unsubscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATIO
proc updateStabilitySubnetMetadata*(
node: Eth2Node, attnets: BitArray[ATTESTATION_SUBNET_COUNT]) =
# https://github.com/ethereum/consensus-specs/blob/v1.1.2/specs/phase0/p2p-interface.md#metadata
if node.metadata.attnets == attnets:
return
node.metadata.seq_number += 1
node.metadata.attnets = attnets
@ -2125,14 +2128,6 @@ proc updateForkId*(node: Eth2Node, epoch: Epoch, genesisValidatorsRoot: Eth2Dige
node.updateForkId(getENRForkId(node.cfg, epoch, genesisValidatorsRoot))
node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesisValidatorsRoot)
# https://github.com/ethereum/consensus-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
func getStabilitySubnetLength*(node: Eth2Node): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
node.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64
func getRandomSubnetId*(node: Eth2Node): SubnetId =
node.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).SubnetId
func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest =
case node.cfg.stateForkAtEpoch(epoch)
of forkMerge: node.forkDigests.merge

View File

@ -12,13 +12,14 @@ import
tables, times, terminal],
# Nimble packages
serialization, json_serialization, spec/eth2_apis/eth2_rest_serialization,
stew/[objects, byteutils, endians2, io2], stew/shims/macros,
chronos, confutils, metrics, metrics/chronos_httpserver,
chronicles, bearssl, blscurve, presto,
json_serialization/std/[options, sets, net], serialization/errors,
taskpools,
eth/[keys, async_utils], eth/net/nat,
eth/keys, eth/net/nat,
eth/p2p/discoveryv5/[protocol, enr, random2],
# Local modules
@ -410,6 +411,7 @@ proc init*(T: type BeaconNode,
rpcServer: rpcServer,
restServer: restServer,
eventBus: eventBus,
actionTracker: ActionTracker.init(rng, config.subscribeAllSubnets),
processor: processor,
blockProcessor: blockProcessor,
consensusManager: consensusManager,
@ -461,6 +463,18 @@ proc init*(T: type BeaconNode,
except Exception as exc: raiseAssert exc.msg
node.addRemoteValidators()
block:
# Add in-process validators to the list of "known" validators such that
# we start with a reasonable ENR
let wallSlot = node.beaconClock.now().slotOrZero()
for validator in node.attachedValidators[].validators.values():
if validator.index.isSome():
node.actionTracker.knownValidators[validator.index.get()] = wallSlot
let stabilitySubnets = node.actionTracker.stabilitySubnets(wallSlot)
# Here, we also set the correct ENR should we be in all subnets mode!
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
network.initBeaconSync(dag, getBeaconTime)
node.updateValidatorMetrics()
@ -490,189 +504,36 @@ func toBitArray(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] =
for subnetInfo in stabilitySubnets:
result[subnetInfo.subnet_id.int] = true
proc getAttachedValidators(node: BeaconNode):
Table[ValidatorIndex, AttachedValidator] =
for validatorIndex in 0 ..<
getStateField(node.dag.headState.data, validators).len:
let attachedValidator = node.getAttachedValidator(
getStateField(node.dag.headState.data, validators),
validatorIndex.ValidatorIndex)
if attachedValidator.isNil:
continue
result[validatorIndex.ValidatorIndex] = attachedValidator
proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} =
doAssert epoch >= 1
let
attachedValidators = node.getAttachedValidators()
validatorIndices = toIntSet(toSeq(attachedValidators.keys()))
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#lookahead
# Only subscribe when this node should aggregate; libp2p broadcasting works
# on subnet topics regardless.
let epochRef = node.dag.getEpochRef(node.dag.head, epoch)
# Update proposals
node.attestationSubnets.proposingSlots[epoch mod 2] = 0
for i, proposer in epochRef.beacon_proposers:
if proposer.isSome and proposer.get() in attachedValidators:
node.attestationsubnets.proposingSlots[epoch mod 2] =
node.attestationsubnets.proposingSlots[epoch mod 2] or (1'u32 shl i)
# Update attestations
template isAnyCommitteeValidatorAggregating(
validatorIndices, committeeLen: untyped, slot: Slot): bool =
anyIt(
validatorIndices,
is_aggregator(
committeeLen,
await attachedValidators[it.ValidatorIndex].getSlotSig(
getStateField(node.dag.headState.data, fork),
getStateField(
node.dag.headState.data, genesis_validators_root), slot)))
node.attestationSubnets.lastCalculatedEpoch = epoch
node.attestationSubnets.attestingSlots[epoch mod 2] = 0
# The relevant bitmaps are 32 bits each.
static: doAssert SLOTS_PER_EPOCH <= 32
for (validatorIndices, committeeIndex, subnet_id, slot) in
get_committee_assignments(epochRef, epoch, validatorIndices):
doAssert compute_epoch_at_slot(slot) == epoch
# Each get_committee_assignments() call here is on the next epoch. At any
# given time, only care about two epochs, the current and next epoch. So,
# after it is done for an epoch, [aS[epoch mod 2], aS[1 - (epoch mod 2)]]
# provides, sequentially, the current and next epochs' slot schedules. If
# get_committee_assignments() has not been called for the next epoch yet,
# typically because there hasn't been a block in the current epoch, there
# isn't valid information in aS[1 - (epoch mod 2)], and only slots within
# the current epoch can be known. Usually, this is not a major issue, but
# when there hasn't been a block substantially through an epoch, it might
# prove misleading to claim that there aren't attestations known, when it
# only might be known either way for 3 more slots. However, it's also not
# as important to attest when blocks aren't flowing as only attestions in
# blocks garner rewards.
node.attestationSubnets.attestingSlots[epoch mod 2] =
node.attestationSubnets.attestingSlots[epoch mod 2] or
(1'u32 shl (slot mod SLOTS_PER_EPOCH))
if not isAnyCommitteeValidatorAggregating(
validatorIndices,
get_beacon_committee_len(epochRef, slot, committeeIndex), slot):
continue
node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] =
max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnet_id.uint64])
if not node.attestationSubnets.aggregateSubnets[subnet_id.uint64]:
# The lead time here allows for the gossip mesh to stabilise well before
# attestations start flowing on the channel - the downside of a long lead
# time is that we waste bandwidth and CPU on traffic we're not strictly
# interested in - it could potentially be decreased, specially when peers
# are selected based on their stability subnet connectivity
const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 6
node.attestationSubnets.subscribeSlot[subnet_id.uint64] =
# Queue upcoming subscription potentially earlier
# SLOTS_PER_EPOCH emulates one boundary condition of the per-epoch
# cycling mechanism timing buffers
min(
slot - min(slot.uint64, SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS),
node.attestationSubnets.subscribeSlot[subnet_id.uint64])
func updateStabilitySubnets(node: BeaconNode, slot: Slot): BitArray[ATTESTATION_SUBNET_COUNT] =
# Equivalent to wallSlot by cycleAttestationSubnets(), especially
# since it'll try to run early in epochs, avoiding race conditions.
let epoch = slot.epoch
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
for ss in node.attestationSubnets.stabilitySubnets.mitems():
if epoch >= ss.expiration:
ss.subnet_id = node.network.getRandomSubnetId()
ss.expiration = epoch + node.network.getStabilitySubnetLength()
result[ss.subnet_id.int] = true
proc cycleAttestationSubnetsPerEpoch(
node: BeaconNode, wallSlot: Slot,
prevStabilitySubnets: BitArray[ATTESTATION_SUBNET_COUNT]):
Future[BitArray[ATTESTATION_SUBNET_COUNT]] {.async.} =
# Per-epoch portion of subnet cycling: updating stability subnets and
# calculating future attestation subnets.
# Only know RANDAO mix, which determines shuffling seed, one epoch in
# advance. When getStateField(node.dag.headState, slot).epoch is
# ahead of wallSlot, the clock's just incorrect. If the slot's behind
# wallSlot, it would have to look more than MIN_SEED_LOOKAHEAD epochs
# ahead to compute the shuffling determining the beacon committees.
static: doAssert MIN_SEED_LOOKAHEAD == 1
if getStateField(node.dag.headState.data, slot).epoch != wallSlot.epoch:
debug "Requested attestation subnets too far in advance",
wallSlot,
stateSlot = getStateField(node.dag.headState.data, slot)
return prevStabilitySubnets
# This works so long as at least one block in an epoch provides a basis for
# calculating the shuffling for the next epoch. It will keep checking for a
# block, each slot, until a block comes in, even if the first few blocks in
# an epoch are missing. If a whole epoch without blocks occurs, it's not as
# important to attest regardless, as those upcoming blocks will hit maximum
# attestations quickly and any individual attestation's likelihood of being
# selected is low.
if node.attestationSubnets.nextCycleEpoch <= wallSlot.epoch:
await node.updateSubscriptionSchedule(wallSlot.epoch + 1)
node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1
let stabilitySubnets = node.updateStabilitySubnets(wallSlot)
if not node.config.subscribeAllSubnets and
stabilitySubnets != prevStabilitySubnets:
# In subscribeAllSubnets mode, this only gets set once, at initial subnet
# attestation handler creation, since they're all considered as stability
# subnets in that case.
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
return stabilitySubnets
func subnetLog(v: BitArray): string =
$toSeq(v.oneIndices())
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
doAssert not node.config.subscribeAllSubnets
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.2/specs/phase0/validator.md#phase-0-attestation-subnet-stability
proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
if node.gossipState == GossipState.Disconnected:
# When disconnected, updateGossipState is responsible for all things
# subnets - in particular, it will remove subscriptions on the edge where
# we enter the disconnected state.
return
let
prevStabilitySubnets =
node.attestationSubnets.stabilitySubnets.toBitArray()
stabilitySubnets =
await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets)
aggregateSubnets = node.actionTracker.aggregateSubnets(slot)
stabilitySubnets = node.actionTracker.stabilitySubnets(slot)
subnets = aggregateSubnets + stabilitySubnets
let prevAggregateSubnets = node.attestationSubnets.aggregateSubnets
for i in 0..<node.attestationSubnets.aggregateSubnets.len():
if node.attestationSubnets.aggregateSubnets[i]:
if wallSlot >= node.attestationSubnets.unsubscribeSlot[i]:
node.attestationSubnets.aggregateSubnets[i] = false
else:
if wallSlot >= node.attestationSubnets.subscribeSlot[i]:
node.attestationSubnets.aggregateSubnets[i] = true
# Accounting specific to non-stability subnets
for i in (prevAggregateSubnets - node.attestationSubnets.aggregateSubnets).
oneIndices():
node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
# Now we know what we should be subscribed to - make it so
let
prevAllSubnets = prevAggregateSubnets + prevStabilitySubnets
allSubnets = node.attestationSubnets.aggregateSubnets + stabilitySubnets
unsubscribeSubnets = prevAllSubnets - allSubnets
subscribeSubnets = allSubnets - prevAllSubnets
prevSubnets = node.actionTracker.subscribedSubnets
unsubscribeSubnets = prevSubnets - subnets
subscribeSubnets = subnets - prevSubnets
# Remember what we subscribed to, so we can unsubscribe later
node.actionTracker.subscribedSubnets = subnets
case node.gossipState
of GossipState.Disconnected:
discard
raiseAssert "Checked above"
of GossipState.ConnectedToPhase0:
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.phase0)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.phase0)
@ -686,91 +547,13 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.altair)
debug "Attestation subnets",
wallSlot,
wallEpoch = wallSlot.epoch,
prevAggregateSubnets = subnetLog(prevAggregateSubnets),
aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets),
prevStabilitySubnets = subnetLog(prevStabilitySubnets),
slot, epoch = slot.epoch, gossipState = node.gossipState,
stabilitySubnets = subnetLog(stabilitySubnets),
aggregateSubnets = subnetLog(aggregateSubnets),
prevSubnets = subnetLog(prevSubnets),
subscribeSubnets = subnetLog(subscribeSubnets),
unsubscribeSubnets = subnetLog(unsubscribeSubnets)
proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] =
let
wallEpoch = node.beaconClock.now.slotOrZero.epoch
validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys()))
template mergeAggregateSubnets(epoch: Epoch) =
# TODO when https://github.com/nim-lang/Nim/issues/15972 and
# https://github.com/nim-lang/Nim/issues/16217 are fixed, in
# Nimbus's Nim, use (_, _, subnetIndex, slot).
let epochRef = node.dag.getEpochRef(node.dag.head, epoch)
for (_, ci, subnet_id, slot) in get_committee_assignments(
epochRef, epoch, validatorIndices):
result.withValue(subnet_id, v) do:
v[] = max(v[], slot + 1)
do:
result[subnet_id] = slot + 1
# Either wallEpoch is 0, in which case it might be pre-genesis, but we only
# care about the already-known first two epochs of attestations, or it's in
# epoch 0 for real, in which case both are also already known; or wallEpoch
# is greater than 0, in which case it's being called from onSlotStart which
# has enough state to calculate wallEpoch + {0,1}'s attestations.
mergeAggregateSubnets(wallEpoch)
mergeAggregateSubnets(wallEpoch + 1)
proc subscribeAttestationSubnetHandlers(node: BeaconNode,
forkDigest: ForkDigest) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# TODO:
# We might want to reuse the previous stability subnet if not expired when:
# - Restarting the node with a presistent netkey
# - When going from synced -> syncing -> synced state
if node.config.subscribeAllSubnets:
# In all-subnets mode, we create a stability subnet subscription for every
# subnet - this will be propagated in the attnets ENR entry
node.attestationSubnets.stabilitySubnets.setLen(ATTESTATION_SUBNET_COUNT)
for i, ss in node.attestationSubnets.stabilitySubnets.mpairs():
ss.subnet_id = SubnetId(i)
ss.expiration = FAR_FUTURE_EPOCH
else:
let wallEpoch = node.beaconClock.now.slotOrZero.epoch
# TODO make length dynamic when validator-client-based validators join and leave
# In normal mode, there's one subnet subscription per validator, changing
# randomly over time
node.attestationSubnets.stabilitySubnets.setLen(
node.attachedValidators[].count)
for i, ss in node.attestationSubnets.stabilitySubnets.mpairs():
ss.subnet_id = node.network.getRandomSubnetId()
ss.expiration = wallEpoch + node.network.getStabilitySubnetLength()
let stabilitySubnets =
node.attestationSubnets.stabilitySubnets.toBitArray()
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
let
aggregateSubnets = node.getInitialAggregateSubnets()
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
if SubnetId(i) in aggregateSubnets:
node.attestationSubnets.aggregateSubnets[i] = true
node.attestationSubnets.unsubscribeSlot[i] =
try: aggregateSubnets[SubnetId(i)] except KeyError: raiseAssert "checked with in"
else:
node.attestationSubnets.aggregateSubnets[i] = false
node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
node.attestationSubnets.enabled = true
debug "Initial attestation subnets subscribed",
aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets),
stabilitySubnets = subnetLog(stabilitySubnets)
node.network.subscribeAttestationSubnets(
node.attestationSubnets.aggregateSubnets + stabilitySubnets,
forkDigest)
# inspired by lighthouse research here
# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py
const
@ -820,17 +603,17 @@ static:
aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet()
proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.network.subscribe(getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams)
node.network.subscribe(getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, enableTopicMetrics = true)
node.subscribeAttestationSubnetHandlers(forkDigest)
# updateAttestationSubnetHandlers subscribes attestation subnets
proc addPhase0MessageHandlers(node: BeaconNode) =
addPhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc addPhase0MessageHandlers(node: BeaconNode, slot: Slot) =
addPhase0MessageHandlers(node, node.dag.forkDigests.phase0, slot)
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(getBeaconBlocksTopic(forkDigest))
@ -843,11 +626,13 @@ proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(
getAttestationTopic(forkDigest, SubnetId(subnet_id)))
node.actionTracker.subscribedSubnets = default(SubnetBits)
proc removePhase0MessageHandlers(node: BeaconNode) =
removePhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) =
node.addPhase0MessageHandlers(node.dag.forkDigests.altair)
node.addPhase0MessageHandlers(node.dag.forkDigests.altair, slot)
var syncnets: BitArray[SYNC_COMMITTEE_SUBNET_COUNT]
@ -873,9 +658,6 @@ proc removeAltairMessageHandlers(node: BeaconNode) =
node.network.unsubscribe(getSyncCommitteeContributionAndProofTopic(node.dag.forkDigests.altair))
func getTopicSubscriptionEnabled(node: BeaconNode): bool =
node.attestationSubnets.enabled
proc removeAllMessageHandlers(node: BeaconNode) =
node.removePhase0MessageHandlers()
node.removeAltairMessageHandlers()
@ -901,27 +683,30 @@ proc trackSyncCommitteeTopics*(node: BeaconNode) =
# TODO
discard
proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, CatchableError].} =
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
# lead time is required
proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
## Subscribe to subnets that we are providing stability for or aggregating
## and unsubscribe from the ones that are no longer relevant.
# Let the tracker know what duties are approaching - this will tell us how
# many stability subnets we need to be subscribed to and what subnets we'll
# soon be aggregating - in addition to the in-beacon-node duties, there may
# also be duties coming from the validator client, but we don't control when
# these arrive
await node.registerDuties(slot)
# We start subscribing to gossip before we're fully synced - this allows time
# to subscribe before the sync end game
const
TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
HYSTERESIS_BUFFER = 16
let
syncQueueLen = node.syncManager.syncQueueLen
head = node.dag.head
headDistance =
if slot > head.slot: (slot - head.slot).uint64
else: 0'u64
targetGossipState =
# SyncManager forward sync by default runs until maxHeadAge slots, or one
# epoch range is achieved. This particular condition has a couple caveats
# including that under certain conditions, debtsCount appears to push len
# (here, syncQueueLen) to underflow-like values; and even when exactly at
# the expected walltime slot the queue isn't necessarily empty. Therefore
# TOPIC_SUBSCRIBE_THRESHOLD_SLOTS is not exactly the number of slots that
# are left. Furthermore, even when 0 peers are being used, this won't get
# to 0 slots in syncQueueLen, but that's a vacuous condition given that a
# networking interaction cannot happen under such circumstances.
if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS:
if headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER:
GossipState.Disconnected
elif slot.epoch + 1 < node.dag.cfg.ALTAIR_FORK_EPOCH:
GossipState.ConnectedToPhase0
@ -935,73 +720,71 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, Catchab
# We are synced, so we will connect
debug "Enabling topic subscriptions",
wallSlot = slot,
headSlot = node.dag.head.slot,
syncQueueLen
headSlot = head.slot,
headDistance, targetGossipState
node.setupDoppelgangerDetection(slot)
block addRemoveHandlers:
case targetGossipState
# Specially when waiting for genesis, we'll already be synced on startup -
# it might also happen on a sufficiently fast restart
# We "know" the actions for the current and the next epoch
if node.isSynced(head):
node.actionTracker.updateActions(
node.dag.getEpochRef(head, slot.epoch))
node.actionTracker.updateActions(
node.dag.getEpochRef(head, slot.epoch + 1))
case targetGossipState
of GossipState.Disconnected:
case node.gossipState:
of GossipState.Disconnected: discard
else:
debug "Disabling topic subscriptions",
wallSlot = slot,
headSlot = head.slot,
headDistance
node.removeAllMessageHandlers()
node.gossipState = GossipState.Disconnected
of GossipState.ConnectedToPhase0:
case node.gossipState:
of GossipState.ConnectedToPhase0: discard
of GossipState.Disconnected:
case node.gossipState:
of GossipState.Disconnected: break
else:
if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
# Filter out underflow from debtsCount; plausible queue lengths can't
# exceed wallslot, with safety margin.
syncQueueLen < 2 * slot.uint64:
debug "Disabling topic subscriptions",
wallSlot = slot,
headSlot = node.dag.head.slot,
syncQueueLen
node.removeAllMessageHandlers()
node.gossipState = GossipState.Disconnected
break
of GossipState.ConnectedToPhase0:
case node.gossipState:
of GossipState.ConnectedToPhase0: break
of GossipState.Disconnected:
node.addPhase0MessageHandlers()
of GossipState.InTransitionToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
node.addPhase0MessageHandlers()
node.addPhase0MessageHandlers(slot)
of GossipState.InTransitionToAltair:
case node.gossipState:
of GossipState.InTransitionToAltair: break
of GossipState.Disconnected:
node.addPhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.addPhase0MessageHandlers()
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
of GossipState.ConnectedToAltair:
case node.gossipState:
of GossipState.ConnectedToAltair: break
of GossipState.Disconnected:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.removePhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.InTransitionToAltair:
node.removePhase0MessageHandlers()
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
node.addPhase0MessageHandlers(slot)
node.gossipState = targetGossipState
of GossipState.InTransitionToAltair:
case node.gossipState:
of GossipState.InTransitionToAltair: discard
of GossipState.Disconnected:
node.addPhase0MessageHandlers(slot)
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.addPhase0MessageHandlers(slot)
# Subscription or unsubscription might have occurred; recheck. Since Nimbus
# initially subscribes to all subnets, simply do not ever cycle attestation
# subnets and they'll all remain subscribed.
if node.getTopicSubscriptionEnabled and not node.config.subscribeAllSubnets:
# This exits early all but one call each epoch.
traceAsyncErrors node.cycleAttestationSubnets(slot)
of GossipState.ConnectedToAltair:
case node.gossipState:
of GossipState.ConnectedToAltair: discard
of GossipState.Disconnected:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.removePhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.InTransitionToAltair:
node.removePhase0MessageHandlers()
node.gossipState = targetGossipState
node.updateAttestationSubnetHandlers(slot)
func getNextValidatorAction(
actionSlotSource: auto, lastCalculatedEpoch: Epoch, slot: Slot): Slot =
@ -1067,6 +850,27 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
node.syncCommitteeMsgPool[].pruneData(slot)
# Update upcoming actions - we do this every slot in case a reorg happens
if node.isSynced(node.dag.head) and
node.actionTracker.lastCalculatedEpoch < slot.epoch + 1:
# TODO this is costly because we compute an EpochRef that likely will never
# be used for anything else, due to the epoch ancestor being selected
# pessimistically with respect to the shuffling - this needs fixing
# at EpochRef level by not mixing balances and shufflings in the same
# place
let epochRef = node.dag.getEpochRef(node.dag.head, slot.epoch + 1)
node.actionTracker.updateActions(epochRef)
let
nextAttestationSlot = getNextValidatorAction(
node.actionTracker.attestingSlots,
node.actionTracker.lastCalculatedEpoch, slot)
nextProposalSlot = getNextValidatorAction(
node.actionTracker.proposingSlots,
node.actionTracker.lastCalculatedEpoch, slot)
nextActionWaitTime = saturate(fromNow(
node.beaconClock, min(nextAttestationSlot, nextProposalSlot)))
# -1 is a more useful output than 18446744073709551615 as an indicator of
# no future attestation/proposal known.
template displayInt64(x: Slot): int64 =
@ -1075,16 +879,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
else:
toGaugeValue(x)
let
nextAttestationSlot = getNextValidatorAction(
node.attestationSubnets.attestingSlots,
node.attestationSubnets.lastCalculatedEpoch, slot)
nextProposalSlot = getNextValidatorAction(
node.attestationSubnets.proposingSlots,
node.attestationSubnets.lastCalculatedEpoch, slot)
nextActionWaitTime = saturate(fromNow(
node.beaconClock, min(nextAttestationSlot, nextProposalSlot)))
info "Slot end",
slot = shortLog(slot),
nextSlot = shortLog(slot + 1),
@ -1109,8 +903,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# Update 1 epoch early to block non-fork-ready peers
node.network.updateForkId(epoch, node.dag.genesisValidatorsRoot)
node.updateGossipStatus(slot)
# When we're not behind schedule, we'll speculatively update the clearance
# state in anticipation of receiving the next block - we do it after logging
# slot end since the nextActionWaitTime can be short
@ -1125,6 +917,14 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
await sleepAsync(advanceCutoff.offset)
node.dag.advanceClearanceState()
# Prepare action tracker for the next slot
node.actionTracker.updateSlot(slot + 1)
# The last thing we do is to perform the subscriptions and unsubscriptions for
# the next slot, just before that slot starts - because of the advance cuttoff
# above, this will be done just before the next slot starts
await node.updateGossipStatus(slot + 1)
proc onSlotStart(
node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
@ -1385,15 +1185,19 @@ proc run*(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.installMessageValidators()
let startTime = node.beaconClock.now()
asyncSpawn runSlotLoop(node, startTime, onSlotStart)
asyncSpawn runOnSecondLoop(node)
asyncSpawn runQueueProcessingLoop(node.blockProcessor)
let
wallTime = node.beaconClock.now()
wallSlot = wallTime.slotOrZero()
node.requestManager.start()
node.startSyncManager()
node.updateGossipStatus(startTime.slotOrZero)
waitFor node.updateGossipStatus(wallSlot)
asyncSpawn runSlotLoop(node, wallTime, onSlotStart)
asyncSpawn runOnSecondLoop(node)
asyncSpawn runQueueProcessingLoop(node.blockProcessor)
## Ctrl+C handling
proc controlCHandler() {.noconv.} =

View File

@ -514,11 +514,14 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
$res.error())
res.get()
let epochRef = node.dag.getEpochRef(head, epoch)
let subnet = uint8(compute_subnet_for_attestation(
let subnet = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), request.slot,
request.committee_index)
)
warn "Beacon committee subscription request served, but not implemented"
node.registerDuty(
request.slot, subnet, request.validator_index,
request.is_aggregator)
return RestApiResponse.jsonMsgResponse(BeaconCommitteeSubscriptionSuccess)
# https://ethereum.github.io/beacon-APIs/#/Validator/prepareSyncCommitteeSubnets

View File

@ -159,21 +159,11 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
let
head = node.doChecksAndGetCurrentHead(epoch)
epochRef = node.dag.getEpochRef(head, epoch)
subnet_id = compute_subnet_for_attestation(
subnet = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), slot, committee_index)
# Either subnet already subscribed or not. If not, subscribe. If it is,
# extend subscription. All one knows from the API combined with how far
# ahead one can check for attestation schedule is that it might be used
# for up to the end of next epoch. Therefore, arrange for subscriptions
# to last at least that long.
if not node.attestationSubnets.aggregateSubnets[subnet_id.uint64]:
# When to subscribe. Since it's not clear when from the API it's first
# needed, do so immediately.
node.attestationSubnets.subscribeSlot[subnet_id.uint64] =
min(node.attestationSubnets.subscribeSlot[subnet_id.uint64], wallSlot)
node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] =
max(
compute_start_slot_at_epoch(epoch + 2),
node.attestationSubnets.unsubscribeSlot[subnet_id.uint64])
# The validator index here is invalid, but since JSON-RPC is on its way
# to deprecation, this is fine
node.registerDuty(
slot, subnet, 0.ValidatorIndex,
is_aggregator(epochRef, slot, committee_index, slot_signature))

View File

@ -802,9 +802,6 @@ template queueAge(): uint64 =
template peerStatusAge(): Duration =
Moment.now() - peer.state(BeaconSync).statusLastTime
func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 =
man.queue.len
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()

View File

@ -0,0 +1,212 @@
import
std/[sequtils, intsets, sets, tables],
chronicles,
bearssl,
eth/p2p/discoveryv5/random2,
../spec/datatypes/base,
../spec/helpers,
../consensus_object_pools/[block_pools_types, spec_cache]
export base, helpers, sets, tables
const
SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS* = 4 ##\
## The number of slots before we're up for aggregation duty that we'll
## actually subscribe to the subnet we're aggregating for - this gives
## the node time to find a mesh etc - can likely be further trimmed
KNOWN_VALIDATOR_DECAY = 3 * 32 * SLOTS_PER_EPOCH ##\
## The number of slots before we "forget" about validators that have
## registered for duties - once we've forgotten about a validator, we'll
## eventually decrease the number of stability subnets we're subscribed to -
## 3 epochs because we perform attestations once every epoch, +1 to deal
## with rounding + 1 to deal with the network growing beyond 260k validators
## and us not validating every epoch any more.
## When known validators decrease, we will keep the stability subnet around
## until it "naturally" expires.
type
SubnetBits* = BitArray[ATTESTATION_SUBNET_COUNT]
AggregatorDuty* = object
subnet*: SubnetId
slot*: Slot
ActionTracker* = object
rng: ref BrHmacDrbgContext
subscribeAllSubnets*: bool
currentSlot*: Slot ##\
## Duties that we accept are limited to a range around the current slot
subscribedSubnets*: SubnetBits ##\
## All subnets we're currently subscribed to
stabilitySubnets: seq[tuple[subnet: SubnetId, expiration: Epoch]] ##\
## The subnets on which we listen and broadcast gossip traffic to maintain
## the health of the network - these are advertised in the ENR
nextCycleEpoch*: Epoch
# Used to track the next attestation and proposal slots using an
# epoch-relative coordinate system. Doesn't need initialization.
attestingSlots*: array[2, uint32]
proposingSlots*: array[2, uint32]
lastCalculatedEpoch*: Epoch
knownValidators*: Table[ValidatorIndex, Slot] ##\
## Validators that we've recently seen - we'll subscribe to one stability
## subnet for each such validator - the slot is used to expire validators
## that no longer are posting duties
duties*: seq[AggregatorDuty] ##\
## Known aggregation duties in the near future - before each such
## duty, we'll subscribe to the corresponding subnet to collect
## attestations for the aggregate
# https://github.com/ethereum/consensus-specs/blob/v1.1.2/specs/phase0/validator.md#phase-0-attestation-subnet-stability
func randomStabilitySubnet*(
self: ActionTracker, epoch: Epoch): tuple[subnet: SubnetId, expiration: Epoch] =
(
self.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).SubnetId,
epoch + EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
self.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64,
)
proc registerDuty*(
tracker: var ActionTracker, slot: Slot, subnet: SubnetId,
vidx: ValidatorIndex, isAggregator: bool) =
# Only register relevant duties
if slot < tracker.currentSlot or
slot + (SLOTS_PER_EPOCH * 2) <= tracker.currentSlot:
debug "Irrelevant duty", slot, subnet, vidx
return
tracker.knownValidators[vidx] = slot # Update validator last-seen registry
if isAggregator:
let newDuty = AggregatorDuty(slot: slot, subnet: subnet)
for duty in tracker.duties.mitems():
if duty == newDuty:
return
debug "Registering aggregation duty", slot, subnet, vidx
tracker.duties.add(newDuty)
const allSubnetBits = block:
var res: SubnetBits
for i in 0..<res.len: res[i] = true
res
func aggregateSubnets*(tracker: ActionTracker, wallSlot: Slot): SubnetBits =
var res: SubnetBits
# Subscribe to subnets for upcoming duties
for duty in tracker.duties:
if wallSlot <= duty.slot and
wallSlot + SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS > duty.slot:
res[duty.subnet.int] = true
res
func stabilitySubnets*(tracker: ActionTracker, slot: Slot): SubnetBits =
if tracker.subscribeAllSubnets:
allSubnetBits
else:
var res: SubnetBits
for v in tracker.stabilitySubnets:
res[v.subnet.int] = true
res
func updateSlot*(tracker: var ActionTracker, wallSlot: Slot) =
# Prune duties from the past - this collection is kept small because there
# are only so many slot/subnet combos - prune both internal and API-supplied
# duties at the same time
tracker.duties.keepItIf(it.slot >= wallSlot)
# Keep stability subnets for as long as validators are validating
var toPrune: seq[ValidatorIndex]
for k, v in tracker.knownValidators:
if v + KNOWN_VALIDATOR_DECAY < wallSlot: toPrune.add k
for k in toPrune: tracker.knownValidators.del k
# One stability subnet per known validator
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.2/specs/phase0/validator.md#phase-0-attestation-subnet-stability
let expectedSubnets =
min(ATTESTATION_SUBNET_COUNT, tracker.knownValidators.len)
let epoch = wallSlot.epoch
block:
# If we have too many stability subnets, remove some expired ones
var i = 0
while tracker.stabilitySubnets.len > expectedSubnets and
i < tracker.stabilitySubnets.len:
if epoch >= tracker.stabilitySubnets[i].expiration:
tracker.stabilitySubnets.delete(i)
else:
inc i
for ss in tracker.stabilitySubnets.mitems():
if epoch >= ss.expiration:
ss = tracker.randomStabilitySubnet(epoch)
# and if we have too few, add a few more
for i in tracker.stabilitySubnets.len..<expectedSubnets:
tracker.stabilitySubnets.add(tracker.randomStabilitySubnet(epoch))
tracker.currentSlot = wallSlot
proc updateActions*(tracker: var ActionTracker, epochRef: EpochRef) =
# Updates the schedule for upcoming attestation and proposal work
let
epoch = epochRef.epoch
if tracker.lastCalculatedEpoch == epoch:
return
tracker.lastCalculatedEpoch = epoch
let
validatorIndices = toIntSet(toSeq(tracker.knownValidators.keys()))
# Update proposals
tracker.proposingSlots[epoch mod 2] = 0
for i, proposer in epochRef.beacon_proposers:
# TODO unsafe int conversion
if proposer.isSome and proposer.get().int in validatorIndices:
tracker.proposingSlots[epoch mod 2] =
tracker.proposingSlots[epoch mod 2] or (1'u32 shl i)
tracker.attestingSlots[epoch mod 2] = 0
# The relevant bitmaps are 32 bits each.
static: doAssert SLOTS_PER_EPOCH <= 32
for (validatorIndices, committeeIndex, subnet_id, slot) in
get_committee_assignments(epochRef, validatorIndices):
doAssert compute_epoch_at_slot(slot) == epoch
# Each get_committee_assignments() call here is on the next epoch. At any
# given time, only care about two epochs, the current and next epoch. So,
# after it is done for an epoch, [aS[epoch mod 2], aS[1 - (epoch mod 2)]]
# provides, sequentially, the current and next epochs' slot schedules. If
# get_committee_assignments() has not been called for the next epoch yet,
# typically because there hasn't been a block in the current epoch, there
# isn't valid information in aS[1 - (epoch mod 2)], and only slots within
# the current epoch can be known. Usually, this is not a major issue, but
# when there hasn't been a block substantially through an epoch, it might
# prove misleading to claim that there aren't attestations known, when it
# only might be known either way for 3 more slots. However, it's also not
# as important to attest when blocks aren't flowing as only attestions in
# blocks garner rewards.
tracker.attestingSlots[epoch mod 2] =
tracker.attestingSlots[epoch mod 2] or
(1'u32 shl (slot mod SLOTS_PER_EPOCH))
proc init*(T: type ActionTracker, rng: ref BrHmacDrbgContext, subscribeAllSubnets: bool): T =
T(
rng: rng,
subscribeAllSubnets: subscribeAllSubnets
)

View File

@ -22,7 +22,8 @@ import
# Local modules
../spec/datatypes/[phase0, altair, merge],
../spec/[
eth2_merkleization, forks, helpers, network, signatures, state_transition],
eth2_merkleization, forks, helpers, network, signatures, state_transition,
validator],
../consensus_object_pools/[
spec_cache, blockchain_dag, block_clearance, attestation_pool, exit_pool,
sync_committee_msg_pool],
@ -82,12 +83,18 @@ proc findValidator(validators: auto, pubKey: ValidatorPubKey):
else:
some(idx.ValidatorIndex)
proc addLocalValidator(node: BeaconNode, item: ValidatorPrivateItem) =
node.attachedValidators[].addLocalValidator(item)
proc addLocalValidator(node: BeaconNode,
validators: auto,
item: ValidatorPrivateItem) =
let pubKey = item.privateKey.toPubKey()
node.attachedValidators[].addLocalValidator(
item,
findValidator(validators, pubKey.toPubKey()))
proc addLocalValidators*(node: BeaconNode) =
for validatorItem in node.config.validatorItems():
node.addLocalValidator(validatorItem)
withState(node.dag.headState.data):
for validatorItem in node.config.validatorItems():
node.addLocalValidator(state.data.validators.asSeq(), validatorItem)
proc addRemoteValidators*(node: BeaconNode) {.raises: [Defect, OSError, IOError].} =
# load all the validators from the child process - loop until `end`
@ -653,11 +660,11 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
data.target.epoch,
signing_root)
if registered.isOk():
let subnet_id = compute_subnet_for_attestation(
let subnet = compute_subnet_for_attestation(
committees_per_slot, data.slot, data.index.CommitteeIndex)
asyncSpawn createAndSendAttestation(
node, fork, genesis_validators_root, validator, data,
committee.len(), index_in_committee, subnet_id)
committee.len(), index_in_committee, subnet)
else:
warn "Slashing protection activated for attestation",
validator = validator.pubkey,
@ -1097,10 +1104,10 @@ proc sendAttestation*(node: BeaconNode,
let
epochRef = node.dag.getEpochRef(
attestationBlock, attestation.data.target.epoch)
subnet_id = compute_subnet_for_attestation(
subnet = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), attestation.data.slot,
attestation.data.index.CommitteeIndex)
res = await node.sendAttestation(attestation, subnet_id,
res = await node.sendAttestation(attestation, subnet,
checkSignature = true)
if not(res):
return SendResult.err("Attestation failed validation")
@ -1175,3 +1182,45 @@ proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock
node.network.broadcastBeaconBlock(forked)
return SendBlockResult.ok(false)
return SendBlockResult.ok(true)
proc registerDuty*(
node: BeaconNode, slot: Slot, subnet: SubnetId, vidx: ValidatorIndex,
isAggregator: bool) =
# Only register relevant duties
node.actionTracker.registerDuty(slot, subnet, vidx, isAggregator)
proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
## Register upcoming duties of attached validators with the duty tracker
if node.attachedValidators[].count() == 0 or not node.isSynced(node.dag.head):
# Nothing to do because we have no validator attached
return
let
genesis_validators_root =
getStateField(node.dag.headState.data, genesis_validators_root)
head = node.dag.head
# Getting the slot signature is expensive but cached - in "normal" cases we'll
# be getting the duties one slot at a time
for slot in wallSlot ..< wallSlot + SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS:
let
epochRef = node.dag.getEpochRef(head, slot.epoch)
fork = node.dag.forkAtEpoch(slot.epoch)
committees_per_slot = get_committee_count_per_slot(epochRef)
for committee_index in 0'u64..<committees_per_slot:
let committee = get_beacon_committee(
epochRef, slot, committee_index.CommitteeIndex)
for index_in_committee, validatorIdx in committee:
let validator = node.getAttachedValidator(epochRef, validatorIdx)
if validator != nil:
let
subnet = compute_subnet_for_attestation(
committees_per_slot, slot, committee_index.CommitteeIndex)
slotSig = await getSlotSig(
validator, fork, genesis_validators_root, slot)
isAggregator = is_aggregator(committee.lenu64, slotSig)
node.registerDuty(slot, subnet, validatorIdx, isAggregator)

View File

@ -236,11 +236,16 @@ proc genRandaoReveal*(v: AttachedValidator, fork: Fork,
proc getSlotSig*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot
): Future[ValidatorSig] {.async.} =
return
if v.slotSignature.isSome() and v.slotSignature.get().slot == slot:
return v.slotSignature.get().signature
let signature =
case v.kind
of ValidatorKind.Local:
get_slot_signature(fork, genesis_validators_root, slot,
v.data.privateKey).toValidatorSig()
v.data.privateKey).toValidatorSig()
of ValidatorKind.Remote:
let root = compute_slot_root(fork, genesis_validators_root, slot)
await signWithRemoteValidator(v, root)
v.slotSignature = some((slot, signature))
return signature

View File

@ -13,6 +13,7 @@ import
import # Unit test
./ssz/all_tests as ssz_all_tests,
./test_action_tracker,
./test_attestation_pool,
./test_beacon_chain_db,
./test_beaconstate,

View File

@ -0,0 +1,43 @@
{.used.}
import
unittest2,
eth/keys,
../beacon_chain/validators/action_tracker
suite "subnet tracker":
let rng = keys.newRng()
test "should register stability subnets on attester duties":
var tracker = ActionTracker.init(rng, false)
check:
tracker.stabilitySubnets(Slot(0)).countOnes() == 0
tracker.aggregateSubnets(Slot(0)).countOnes() == 0
tracker.registerDuty(Slot(0), SubnetId(0), ValidatorIndex(0), true)
tracker.updateSlot(Slot(0))
check:
tracker.stabilitySubnets(Slot(0)).countOnes() == 1
tracker.aggregateSubnets(Slot(0)).countOnes() == 1
tracker.aggregateSubnets(Slot(1)).countOnes() == 0
tracker.registerDuty(Slot(1), SubnetId(1), ValidatorIndex(0), true)
check:
tracker.aggregateSubnets(Slot(0)).countOnes() == 2
tracker.aggregateSubnets(Slot(1)).countOnes() == 1
tracker.registerDuty(Slot(SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS), SubnetId(2), ValidatorIndex(0), true)
check:
tracker.aggregateSubnets(Slot(0)).countOnes() == 2
tracker.aggregateSubnets(Slot(1)).countOnes() == 2
# Guaranteed to expire
tracker.updateSlot(
Slot(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION * 2 * SLOTS_PER_EPOCH))
check:
tracker.stabilitySubnets(Slot(0)).countOnes() == 0
tracker.aggregateSubnets(Slot(0)).countOnes() == 0