don't rely on head updates for topic subscription decision
This commit is contained in:
parent
cac4fee569
commit
95d5736128
|
@ -467,7 +467,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
|
||||||
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
|
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
|
||||||
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
|
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
|
||||||
# lead time is required.
|
# lead time is required.
|
||||||
const GOSSIP_ACTIVATION_THRESHOLD_SLOTS = 64
|
const TOPIC_SUBSCRIBE_THRESHOLD_SLOTS = 96
|
||||||
|
|
||||||
if slot > lastSlot + SLOTS_PER_EPOCH:
|
if slot > lastSlot + SLOTS_PER_EPOCH:
|
||||||
# We've fallen behind more than an epoch - there's nothing clever we can
|
# We've fallen behind more than an epoch - there's nothing clever we can
|
||||||
|
@ -526,6 +526,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
|
||||||
finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
|
finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
|
||||||
finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
|
finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
|
||||||
|
|
||||||
|
let syncQueueLen = node.syncManager.syncQueueLen
|
||||||
if
|
if
|
||||||
# Don't enable if already enabled; to avoid race conditions requires care,
|
# Don't enable if already enabled; to avoid race conditions requires care,
|
||||||
# but isn't crucial, as this condition spuriously fail, but the next time,
|
# but isn't crucial, as this condition spuriously fail, but the next time,
|
||||||
|
@ -533,13 +534,22 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, asyn
|
||||||
node.attestationSubnets.subscribedSubnets[0].len +
|
node.attestationSubnets.subscribedSubnets[0].len +
|
||||||
node.attestationSubnets.subscribedSubnets[1].len == 0 and
|
node.attestationSubnets.subscribedSubnets[1].len == 0 and
|
||||||
# SyncManager forward sync by default runs until maxHeadAge slots, or one
|
# SyncManager forward sync by default runs until maxHeadAge slots, or one
|
||||||
# epoch range is achieved.
|
# epoch range is achieved. This particular condition has a couple caveats
|
||||||
slot < node.chainDag.head.slot + GOSSIP_ACTIVATION_THRESHOLD_SLOTS:
|
# including that under certain conditions, debtsCount appears to push len
|
||||||
|
# (here, syncQueueLen) to underflow-like values; and even when exactly at
|
||||||
|
# the expected walltime slot the queue isn't necessarily empty. Therefore
|
||||||
|
# TOPIC_SUBSCRIBE_THRESHOLD_SLOTS is not exactly the number of slots that
|
||||||
|
# are left. Furthermore, even when 0 peers are being used, this won't get
|
||||||
|
# to 0 slots in syncQueueLen, but that's a vacuous condition given that a
|
||||||
|
# networking interaction cannot happen under such circumstances.
|
||||||
|
syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS:
|
||||||
# When node.cycleAttestationSubnets() is enabled more properly, integrate
|
# When node.cycleAttestationSubnets() is enabled more properly, integrate
|
||||||
# this into the node.cycleAttestationSubnets() call.
|
# this into the node.cycleAttestationSubnets() call.
|
||||||
debug "Enabling topic subscriptions",
|
debug "Enabling topic subscriptions",
|
||||||
wallSlot = slot,
|
wallSlot = slot,
|
||||||
headSlot = node.chainDag.head.slot
|
headSlot = node.chainDag.head.slot,
|
||||||
|
syncQueueLen
|
||||||
|
|
||||||
await node.addMessageHandlers()
|
await node.addMessageHandlers()
|
||||||
|
|
||||||
when false:
|
when false:
|
||||||
|
|
|
@ -653,6 +653,9 @@ template checkPeerScore(peer, body: untyped): untyped =
|
||||||
topics = "syncman"
|
topics = "syncman"
|
||||||
break
|
break
|
||||||
|
|
||||||
|
func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 =
|
||||||
|
man.queue.len
|
||||||
|
|
||||||
proc syncWorker*[A, B](man: SyncManager[A, B],
|
proc syncWorker*[A, B](man: SyncManager[A, B],
|
||||||
peer: A): Future[A] {.async.} =
|
peer: A): Future[A] {.async.} =
|
||||||
# Sync worker is the lowest level loop which performs syncing with single
|
# Sync worker is the lowest level loop which performs syncing with single
|
||||||
|
|
Loading…
Reference in New Issue