diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 7c5dcb139..e5bcbcaf3 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -327,6 +327,8 @@ proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) = node.network.metadata.attnets[subnet] = true proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) = + static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 + let epochParity = slot.epoch mod 2 var attachedValidators: seq[ValidatorIndex] for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len: @@ -334,6 +336,9 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) = node.chainDag.headState.data.data, validatorIndex.ValidatorIndex) != nil: attachedValidators.add validatorIndex.ValidatorIndex + if attachedValidators.len == 0: + return + let (newAttestationSubnets, expiringSubnets, newSubnets) = get_attestation_subnet_changes( node.chainDag.headState.data.data, attachedValidators, @@ -404,6 +409,29 @@ proc addMessageHandlers(node: BeaconNode): Future[void] = node.getAttestationHandlers() ) +func getTopicSubscriptionEnabled(node: BeaconNode): bool = + node.attestationSubnets.subscribedSubnets[0].len + + node.attestationSubnets.subscribedSubnets[1].len > 0 + +proc removeMessageHandlers(node: BeaconNode): Future[void] = + node.attestationSubnets.subscribedSubnets[0] = {} + node.attestationSubnets.subscribedSubnets[1] = {} + doAssert not node.getTopicSubscriptionEnabled() + + var unsubscriptions = mapIt( + [getBeaconBlocksTopic(node.forkDigest), + getVoluntaryExitsTopic(node.forkDigest), + getProposerSlashingsTopic(node.forkDigest), + getAttesterSlashingsTopic(node.forkDigest), + getAggregateAndProofsTopic(node.forkDigest)], + node.network.unsubscribe(it)) + + for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT: + unsubscriptions.add node.network.unsubscribe( + getAttestationTopic(node.forkDigest, subnet)) + + allFutures(unsubscriptions) + proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = ## Called at the beginning of a slot - usually every slot, but sometimes might ## skip a few in case we're running late. @@ -461,11 +489,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = if node.config.verifyFinalization: verifyFinalization(node, scheduledSlot) - # Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p - # subscribing to spin up. The faster the sync, the more wallSlot - headSlot - # lead time is required. - const TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 96 - if slot > lastSlot + SLOTS_PER_EPOCH: # We've fallen behind more than an epoch - there's nothing clever we can # do here really, except skip all the work and try again later. @@ -523,13 +546,21 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = finalizedHead = shortLog(node.chainDag.finalizedHead.blck), finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot()) - let syncQueueLen = node.syncManager.syncQueueLen + # Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p + # subscribing to spin up. The faster the sync, the more wallSlot - headSlot + # lead time is required + const + TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64 + HYSTERESIS_BUFFER = 16 + + let + syncQueueLen = node.syncManager.syncQueueLen + topicSubscriptionEnabled = node.getTopicSubscriptionEnabled() if # Don't enable if already enabled; to avoid race conditions requires care, # but isn't crucial, as this condition spuriously fail, but the next time, # should properly succeed. - node.attestationSubnets.subscribedSubnets[0].len + - node.attestationSubnets.subscribedSubnets[1].len == 0 and + not topicSubscriptionEnabled and # SyncManager forward sync by default runs until maxHeadAge slots, or one # epoch range is achieved. This particular condition has a couple caveats # including that under certain conditions, debtsCount appears to push len @@ -548,10 +579,22 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = syncQueueLen await node.addMessageHandlers() + doAssert node.getTopicSubscriptionEnabled() + elif + topicSubscriptionEnabled and + syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and + # Filter out underflow from debtsCount; plausible queue lengths can't + # exceed wallslot, with safety margin. + syncQueueLen < 2 * slot.uint64: + debug "Disabling topic subscriptions", + wallSlot = slot, + headSlot = node.chainDag.head.slot, + syncQueueLen + await node.removeMessageHandlers() - when false: - if slot.isEpoch: - node.cycleAttestationSubnets(slot) + # Subscription or unsubscription might have occurred; recheck + if slot.isEpoch and node.getTopicSubscriptionEnabled: + node.cycleAttestationSubnets(slot) when declared(GC_fullCollect): # The slots in the beacon node work as frames in a game: we want to make @@ -788,23 +831,6 @@ proc installMessageValidators(node: BeaconNode) = proc (voluntaryExit: VoluntaryExit): bool = node.processor[].voluntaryExitValidator(voluntaryExit)) -proc removeMessageHandlers(node: BeaconNode) = - var unsubscriptions: seq[Future[void]] - - for topic in [ - getBeaconBlocksTopic(node.forkDigest), - getVoluntaryExitsTopic(node.forkDigest), - getProposerSlashingsTopic(node.forkDigest), - getAttesterSlashingsTopic(node.forkDigest), - getAggregateAndProofsTopic(node.forkDigest)]: - unsubscriptions.add node.network.unsubscribe(topic) - - for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT: - unsubscriptions.add node.network.unsubscribe( - getAttestationTopic(node.forkDigest, subnet)) - - waitFor allFutures(unsubscriptions) - proc stop*(node: BeaconNode) = status = BeaconNodeStatus.Stopping info "Graceful shutdown"