enable topic unsubscribing and attestation subnet cycling (#1646)
* enable topic unsubscribing and attestation subnet cycling
* remove references to states
* waitFor -> await
* revert exit pool changes for a cleaner PR
parent 617c295839
commit 7e10b9850b
@@ -327,6 +327,8 @@ proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) =
     node.network.metadata.attnets[subnet] = true
 
 proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) =
+  static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
+
   let epochParity = slot.epoch mod 2
   var attachedValidators: seq[ValidatorIndex]
   for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len:
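The compile-time guard added here pins a protocol assumption: the cycling logic tracks exactly one random subnet per validator, so if the spec constant ever changes, the build fails rather than a running node misbehaving. A minimal, self-contained sketch of the same `static: doAssert` pattern (the constant's value is assumed from the spec):

const RANDOM_SUBNETS_PER_VALIDATOR = 1  # spec value, assumed here

proc cycleSubnets() =
  # Evaluated by the compiler, not at runtime: violating the assumption
  # becomes a build error instead of a live-node failure.
  static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
  echo "cycling assumes exactly one random subnet per validator"

cycleSubnets()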
@@ -334,6 +336,9 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) =
         node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) != nil:
       attachedValidators.add validatorIndex.ValidatorIndex
 
+  if attachedValidators.len == 0:
+    return
+
   let (newAttestationSubnets, expiringSubnets, newSubnets) =
     get_attestation_subnet_changes(
       node.chainDag.headState.data.data, attachedValidators,
@@ -404,6 +409,29 @@ proc addMessageHandlers(node: BeaconNode): Future[void] =
     node.getAttestationHandlers()
   )
 
+func getTopicSubscriptionEnabled(node: BeaconNode): bool =
+  node.attestationSubnets.subscribedSubnets[0].len +
+    node.attestationSubnets.subscribedSubnets[1].len > 0
+
+proc removeMessageHandlers(node: BeaconNode): Future[void] =
+  node.attestationSubnets.subscribedSubnets[0] = {}
+  node.attestationSubnets.subscribedSubnets[1] = {}
+  doAssert not node.getTopicSubscriptionEnabled()
+
+  var unsubscriptions = mapIt(
+    [getBeaconBlocksTopic(node.forkDigest),
+     getVoluntaryExitsTopic(node.forkDigest),
+     getProposerSlashingsTopic(node.forkDigest),
+     getAttesterSlashingsTopic(node.forkDigest),
+     getAggregateAndProofsTopic(node.forkDigest)],
+    node.network.unsubscribe(it))
+
+  for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
+    unsubscriptions.add node.network.unsubscribe(
+      getAttestationTopic(node.forkDigest, subnet))
+
+  allFutures(unsubscriptions)
+
 proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
   ## Called at the beginning of a slot - usually every slot, but sometimes might
   ## skip a few in case we're running late.
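The new removeMessageHandlers clears both parity slots of subscribedSubnets (index 0 for even epochs, 1 for odd, matching `epochParity = slot.epoch mod 2` earlier), then batches every gossip unsubscription into one future the caller can await. A self-contained sketch of that batching pattern, assuming chronos (the async framework used here), with a hypothetical `fakeUnsubscribe` standing in for `node.network.unsubscribe` and made-up topic names; the loop bound is shortened from the spec's ATTESTATION_SUBNET_COUNT of 64:

import std/sequtils
import chronos

proc fakeUnsubscribe(topic: string): Future[void] {.async.} =
  # Stand-in for node.network.unsubscribe(topic).
  echo "unsubscribed: ", topic

proc removeAllHandlers(): Future[void] =
  # Start every unsubscription and collect the futures...
  var unsubscriptions = mapIt(
    ["beacon_block", "voluntary_exit", "proposer_slashing"],
    fakeUnsubscribe(it))
  for subnet in 0'u64 ..< 4'u64:  # 64 attestation subnets in the real code
    unsubscriptions.add fakeUnsubscribe("beacon_attestation_" & $subnet)
  # ...then fold them into a single future the caller can await.
  allFutures(unsubscriptions)

waitFor removeAllHandlers()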
@@ -461,11 +489,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
   if node.config.verifyFinalization:
     verifyFinalization(node, scheduledSlot)
 
-  # Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
-  # subscribing to spin up. The faster the sync, the more wallSlot - headSlot
-  # lead time is required.
-  const TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 96
-
   if slot > lastSlot + SLOTS_PER_EPOCH:
     # We've fallen behind more than an epoch - there's nothing clever we can
     # do here really, except skip all the work and try again later.
@@ -523,13 +546,21 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
     finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
     finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
 
-  let syncQueueLen = node.syncManager.syncQueueLen
+  # Syncing tends to be ~1 block/s, so allow an epoch of time for libp2p
+  # subscribing to spin up. The faster the sync, the more wallSlot - headSlot
+  # lead time is required.
+  const
+    TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
+    HYSTERESIS_BUFFER = 16
+
+  let
+    syncQueueLen = node.syncManager.syncQueueLen
+    topicSubscriptionEnabled = node.getTopicSubscriptionEnabled()
   if
       # Don't enable if already enabled; avoiding race conditions requires
       # care but isn't crucial, as this condition can spuriously fail; the
       # next time around it should properly succeed.
-      node.attestationSubnets.subscribedSubnets[0].len +
-        node.attestationSubnets.subscribedSubnets[1].len == 0 and
+      not topicSubscriptionEnabled and
 
      # SyncManager forward sync by default runs until maxHeadAge slots, or one
      # epoch range is achieved. This particular condition has a couple caveats
      # including that under certain conditions, debtsCount appears to push len
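The two constants introduced above implement hysteresis: with a single threshold, a node whose sync queue hovers near the boundary would flap between subscribing and unsubscribing every slot; the buffer separates the turn-on point from the turn-off point. A minimal sketch of that on/off decision, under stated assumptions: the real enable condition continues past this hunk and the disable side appears in the next one, so `nextState` is illustrative rather than the node's literal predicate:

const
  TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64'u64  # turn-on point, from the diff
  HYSTERESIS_BUFFER = 16'u64                # extra slack before turning off

func nextState(enabled: bool, syncQueueLen: uint64): bool =
  if not enabled:
    # Subscribe only once the queue is comfortably short.
    syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS
  else:
    # Stay subscribed until the queue is clearly long again.
    syncQueueLen <= TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER

doAssert nextState(false, 63)       # short queue: enable
doAssert not nextState(false, 70)   # in the 64..80 band: stay off
doAssert nextState(true, 70)        # in the same band: stay on
doAssert not nextState(true, 100)   # long queue: disable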
@@ -548,10 +579,22 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
       syncQueueLen
 
     await node.addMessageHandlers()
+    doAssert node.getTopicSubscriptionEnabled()
+  elif
+      topicSubscriptionEnabled and
+      syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
+      # Filter out underflow from debtsCount; plausible queue lengths can't
+      # exceed wallslot, with safety margin.
+      syncQueueLen < 2 * slot.uint64:
+    debug "Disabling topic subscriptions",
+      wallSlot = slot,
+      headSlot = node.chainDag.head.slot,
+      syncQueueLen
+    await node.removeMessageHandlers()
 
-  when false:
-    if slot.isEpoch:
-      node.cycleAttestationSubnets(slot)
+  # Subscription or unsubscription might have occurred; recheck
+  if slot.isEpoch and node.getTopicSubscriptionEnabled:
+    node.cycleAttestationSubnets(slot)
 
   when declared(GC_fullCollect):
     # The slots in the beacon node work as frames in a game: we want to make
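The `debug "Disabling topic subscriptions", ...` call above uses chronicles-style structured logging: trailing `name = value` pairs, or bare identifiers, become structured log fields rather than interpolated text. A minimal sketch, assuming the chronicles package and made-up field values:

import chronicles

let
  wallSlot = 579
  syncQueueLen = 85

# Message first, then fields; a bare identifier logs under its own name.
debug "Disabling topic subscriptions",
  wallSlot,
  headSlot = wallSlot - syncQueueLen,
  syncQueueLen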
@@ -788,23 +831,6 @@ proc installMessageValidators(node: BeaconNode) =
     proc (voluntaryExit: VoluntaryExit): bool =
       node.processor[].voluntaryExitValidator(voluntaryExit))
 
-proc removeMessageHandlers(node: BeaconNode) =
-  var unsubscriptions: seq[Future[void]]
-
-  for topic in [
-      getBeaconBlocksTopic(node.forkDigest),
-      getVoluntaryExitsTopic(node.forkDigest),
-      getProposerSlashingsTopic(node.forkDigest),
-      getAttesterSlashingsTopic(node.forkDigest),
-      getAggregateAndProofsTopic(node.forkDigest)]:
-    unsubscriptions.add node.network.unsubscribe(topic)
-
-  for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
-    unsubscriptions.add node.network.unsubscribe(
-      getAttestationTopic(node.forkDigest, subnet))
-
-  waitFor allFutures(unsubscriptions)
-
 proc stop*(node: BeaconNode) =
   status = BeaconNodeStatus.Stopping
   info "Graceful shutdown"
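The commit message's "waitFor -> await" is visible here: the deleted proc blocked the thread with `waitFor allFutures(...)`, while its replacement (added earlier in this diff) returns the combined future for its async caller to `await`. A minimal sketch of the difference, assuming chronos:

import chronos

proc work(): Future[void] {.async.} =
  await sleepAsync(10.milliseconds)

proc caller() {.async.} =
  # Cooperative: suspends caller while other futures keep running.
  await work()

# Blocking: spins the event loop until done; only for the top level
# or shutdown paths, never inside async code.
waitFor caller()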