diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 6977f9d46..6d0511012 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -485,6 +485,57 @@ proc removeMessageHandlers(node: BeaconNode): Future[void] = allFutures(unsubscriptions) +proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = + # Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p + # subscribing to spin up. The faster the sync, the more wallSlot - headSlot + # lead time is required + const + TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64 + HYSTERESIS_BUFFER = 16 + + let + syncQueueLen = node.syncManager.syncQueueLen + topicSubscriptionEnabled = node.getTopicSubscriptionEnabled() + if + # Don't enable if already enabled; to avoid race conditions requires care, + # but isn't crucial, as this condition spuriously fail, but the next time, + # should properly succeed. + not topicSubscriptionEnabled and + # SyncManager forward sync by default runs until maxHeadAge slots, or one + # epoch range is achieved. This particular condition has a couple caveats + # including that under certain conditions, debtsCount appears to push len + # (here, syncQueueLen) to underflow-like values; and even when exactly at + # the expected walltime slot the queue isn't necessarily empty. Therefore + # TOPIC_SUBSCRIBE_THRESHOLD_SLOTS is not exactly the number of slots that + # are left. Furthermore, even when 0 peers are being used, this won't get + # to 0 slots in syncQueueLen, but that's a vacuous condition given that a + # networking interaction cannot happen under such circumstances. + syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS: + # When node.cycleAttestationSubnets() is enabled more properly, integrate + # this into the node.cycleAttestationSubnets() call. + debug "Enabling topic subscriptions", + wallSlot = slot, + headSlot = node.chainDag.head.slot, + syncQueueLen + + await node.addMessageHandlers() + doAssert node.getTopicSubscriptionEnabled() + elif + topicSubscriptionEnabled and + syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and + # Filter out underflow from debtsCount; plausible queue lengths can't + # exceed wallslot, with safety margin. + syncQueueLen < 2 * slot.uint64: + debug "Disabling topic subscriptions", + wallSlot = slot, + headSlot = node.chainDag.head.slot, + syncQueueLen + await node.removeMessageHandlers() + + # Subscription or unsubscription might have occurred; recheck + if slot.isEpoch and node.getTopicSubscriptionEnabled: + node.cycleAttestationSubnets(slot) + proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = ## Called at the beginning of a slot - usually every slot, but sometimes might ## skip a few in case we're running late. @@ -598,55 +649,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = finalizedHead = shortLog(node.chainDag.finalizedHead.blck), finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot()) - # Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p - # subscribing to spin up. The faster the sync, the more wallSlot - headSlot - # lead time is required - const - TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64 - HYSTERESIS_BUFFER = 16 - - let - syncQueueLen = node.syncManager.syncQueueLen - topicSubscriptionEnabled = node.getTopicSubscriptionEnabled() - if - # Don't enable if already enabled; to avoid race conditions requires care, - # but isn't crucial, as this condition spuriously fail, but the next time, - # should properly succeed. - not topicSubscriptionEnabled and - # SyncManager forward sync by default runs until maxHeadAge slots, or one - # epoch range is achieved. This particular condition has a couple caveats - # including that under certain conditions, debtsCount appears to push len - # (here, syncQueueLen) to underflow-like values; and even when exactly at - # the expected walltime slot the queue isn't necessarily empty. Therefore - # TOPIC_SUBSCRIBE_THRESHOLD_SLOTS is not exactly the number of slots that - # are left. Furthermore, even when 0 peers are being used, this won't get - # to 0 slots in syncQueueLen, but that's a vacuous condition given that a - # networking interaction cannot happen under such circumstances. - syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS: - # When node.cycleAttestationSubnets() is enabled more properly, integrate - # this into the node.cycleAttestationSubnets() call. - debug "Enabling topic subscriptions", - wallSlot = slot, - headSlot = node.chainDag.head.slot, - syncQueueLen - - await node.addMessageHandlers() - doAssert node.getTopicSubscriptionEnabled() - elif - topicSubscriptionEnabled and - syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and - # Filter out underflow from debtsCount; plausible queue lengths can't - # exceed wallslot, with safety margin. - syncQueueLen < 2 * slot.uint64: - debug "Disabling topic subscriptions", - wallSlot = slot, - headSlot = node.chainDag.head.slot, - syncQueueLen - await node.removeMessageHandlers() - - # Subscription or unsubscription might have occurred; recheck - if slot.isEpoch and node.getTopicSubscriptionEnabled: - node.cycleAttestationSubnets(slot) + await node.updateGossipStatus(slot) when declared(GC_fullCollect): # The slots in the beacon node work as frames in a game: we want to make @@ -814,6 +817,9 @@ proc run*(node: BeaconNode) = node.requestManager.start() node.startSyncManager() + waitFor node.addMessageHandlers() + doAssert node.getTopicSubscriptionEnabled() + ## Ctrl+C handling proc controlCHandler() {.noconv.} = when defined(windows):