don't wait until after the first slot to enable gossip

This commit is contained in:
Dustin Brody 2020-12-01 11:43:02 +01:00 committed by zah
parent 2fd71a9aac
commit 68c91d1d1b
1 changed files with 55 additions and 49 deletions

View File

@ -485,6 +485,57 @@ proc removeMessageHandlers(node: BeaconNode): Future[void] =
allFutures(unsubscriptions) allFutures(unsubscriptions)
proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
# lead time is required
const
TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
HYSTERESIS_BUFFER = 16
let
syncQueueLen = node.syncManager.syncQueueLen
topicSubscriptionEnabled = node.getTopicSubscriptionEnabled()
if
# Don't enable if already enabled; to avoid race conditions requires care,
# but isn't crucial, as this condition spuriously fail, but the next time,
# should properly succeed.
not topicSubscriptionEnabled and
# SyncManager forward sync by default runs until maxHeadAge slots, or one
# epoch range is achieved. This particular condition has a couple caveats
# including that under certain conditions, debtsCount appears to push len
# (here, syncQueueLen) to underflow-like values; and even when exactly at
# the expected walltime slot the queue isn't necessarily empty. Therefore
# TOPIC_SUBSCRIBE_THRESHOLD_SLOTS is not exactly the number of slots that
# are left. Furthermore, even when 0 peers are being used, this won't get
# to 0 slots in syncQueueLen, but that's a vacuous condition given that a
# networking interaction cannot happen under such circumstances.
syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS:
# When node.cycleAttestationSubnets() is enabled more properly, integrate
# this into the node.cycleAttestationSubnets() call.
debug "Enabling topic subscriptions",
wallSlot = slot,
headSlot = node.chainDag.head.slot,
syncQueueLen
await node.addMessageHandlers()
doAssert node.getTopicSubscriptionEnabled()
elif
topicSubscriptionEnabled and
syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
# Filter out underflow from debtsCount; plausible queue lengths can't
# exceed wallslot, with safety margin.
syncQueueLen < 2 * slot.uint64:
debug "Disabling topic subscriptions",
wallSlot = slot,
headSlot = node.chainDag.head.slot,
syncQueueLen
await node.removeMessageHandlers()
# Subscription or unsubscription might have occurred; recheck
if slot.isEpoch and node.getTopicSubscriptionEnabled:
node.cycleAttestationSubnets(slot)
proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might ## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late. ## skip a few in case we're running late.
@ -598,55 +649,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
finalizedHead = shortLog(node.chainDag.finalizedHead.blck), finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot()) finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p await node.updateGossipStatus(slot)
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
# lead time is required
const
TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 64
HYSTERESIS_BUFFER = 16
let
syncQueueLen = node.syncManager.syncQueueLen
topicSubscriptionEnabled = node.getTopicSubscriptionEnabled()
if
# Don't enable if already enabled; to avoid race conditions requires care,
# but isn't crucial, as this condition spuriously fail, but the next time,
# should properly succeed.
not topicSubscriptionEnabled and
# SyncManager forward sync by default runs until maxHeadAge slots, or one
# epoch range is achieved. This particular condition has a couple caveats
# including that under certain conditions, debtsCount appears to push len
# (here, syncQueueLen) to underflow-like values; and even when exactly at
# the expected walltime slot the queue isn't necessarily empty. Therefore
# TOPIC_SUBSCRIBE_THRESHOLD_SLOTS is not exactly the number of slots that
# are left. Furthermore, even when 0 peers are being used, this won't get
# to 0 slots in syncQueueLen, but that's a vacuous condition given that a
# networking interaction cannot happen under such circumstances.
syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS:
# When node.cycleAttestationSubnets() is enabled more properly, integrate
# this into the node.cycleAttestationSubnets() call.
debug "Enabling topic subscriptions",
wallSlot = slot,
headSlot = node.chainDag.head.slot,
syncQueueLen
await node.addMessageHandlers()
doAssert node.getTopicSubscriptionEnabled()
elif
topicSubscriptionEnabled and
syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
# Filter out underflow from debtsCount; plausible queue lengths can't
# exceed wallslot, with safety margin.
syncQueueLen < 2 * slot.uint64:
debug "Disabling topic subscriptions",
wallSlot = slot,
headSlot = node.chainDag.head.slot,
syncQueueLen
await node.removeMessageHandlers()
# Subscription or unsubscription might have occurred; recheck
if slot.isEpoch and node.getTopicSubscriptionEnabled:
node.cycleAttestationSubnets(slot)
when declared(GC_fullCollect): when declared(GC_fullCollect):
# The slots in the beacon node work as frames in a game: we want to make # The slots in the beacon node work as frames in a game: we want to make
@ -814,6 +817,9 @@ proc run*(node: BeaconNode) =
node.requestManager.start() node.requestManager.start()
node.startSyncManager() node.startSyncManager()
waitFor node.addMessageHandlers()
doAssert node.getTopicSubscriptionEnabled()
## Ctrl+C handling ## Ctrl+C handling
proc controlCHandler() {.noconv.} = proc controlCHandler() {.noconv.} =
when defined(windows): when defined(windows):