refactor slot loop (#2355)

* refactor slot loop

* fix attestations being sent out early when _any_ block arrives (as
  opposed to the block for the "correct" slot)
* fix attestations being sent out late when the block has already arrived
* refactor slot processing loop
* shut down if the clock moves backwards significantly
* fix docs

* notify caller whether the block actually arrived
Jacek Sieka, 2021-03-01 17:36:06 +01:00 (committed by GitHub)
parent 965972dd0b
commit 3e2c0a220c
4 changed files with 174 additions and 139 deletions

(file 1 of 4)

@@ -51,8 +51,6 @@ type
     syncManager*: SyncManager[Peer, PeerID]
     topicBeaconBlocks*: string
     topicAggregateAndProofs*: string
-    blockProcessingLoop*: Future[void]
-    onSecondLoop*: Future[void]
     genesisSnapshotContent*: string
     attestationSubnets*: AttestationSubnets
     processor*: ref Eth2Processor

(file 2 of 4)

@@ -72,7 +72,8 @@ type
     exitPool: ref ExitPool
     validatorPool: ref ValidatorPool
     quarantine*: QuarantineRef
-    blockReceivedDuringSlot*: Future[void]
+    expectedSlot: Slot
+    expectedBlockReceived: Future[bool]

     blocksQueue*: AsyncQueue[BlockEntry]
     attestationsQueue*: AsyncQueue[AttestationEntry]
@@ -80,6 +81,34 @@ type
     doppelgangerDetection*: DoppelgangerProtection

+proc checkExpectedBlock(self: var Eth2Processor) =
+  if self.expectedBlockReceived == nil:
+    return
+
+  if self.chainDag.head.slot < self.expectedSlot:
+    return
+
+  self.expectedBlockReceived.complete(true)
+  self.expectedBlockReceived = nil # Don't keep completed futures around!
+
+proc expectBlock*(self: var Eth2Processor, expectedSlot: Slot): Future[bool] =
+  ## Return a future that will complete when a head is selected whose slot is
+  ## equal or greater than the given slot, or a new expectation is created
+  if self.expectedBlockReceived != nil:
+    # Reset the old future to not leave it hanging.. an alternative would be to
+    # cancel it, but it doesn't make any practical difference for now
+    self.expectedBlockReceived.complete(false)
+
+  let fut = newFuture[bool]("Eth2Processor.expectBlock")
+  self.expectedSlot = expectedSlot
+  self.expectedBlockReceived = fut
+
+  # It might happen that by the time we're expecting a block, it might have
+  # already been processed!
+  self.checkExpectedBlock()
+
+  return fut
+
 proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
   ## Trigger fork choice and returns the new head block.
   ## Can return `nil`
@@ -101,6 +130,8 @@ proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
   if oldFinalized != self.chainDag.finalizedHead.blck:
     self.attestationPool[].prune()

+  self.checkExpectedBlock()
+
 proc dumpBlock[T](
     self: Eth2Processor, signedBlock: SignedBeaconBlock,
     res: Result[T, (ValidationResult, BlockError)]) =
@@ -147,10 +178,6 @@ proc storeBlock(
       attestationPool[].addForkChoice(
         epochRef, blckRef, trustedBlock.message, wallSlot)

-  # Trigger attestation sending
-  if blck.isOk and not self.blockReceivedDuringSlot.finished:
-    self.blockReceivedDuringSlot.complete()
-
   self.dumpBlock(signedBlock, blck)

   # There can be a scenario where we receive a block we already received.
@@ -568,7 +595,6 @@ proc new*(T: type Eth2Processor,
     exitPool: exitPool,
     validatorPool: validatorPool,
     quarantine: quarantine,
-    blockReceivedDuringSlot: newFuture[void](),
     blocksQueue: newAsyncQueue[BlockEntry](1),
     # limit to the max number of aggregates we expect to see in one slot
     aggregatesQueue: newAsyncQueue[AggregateEntry](
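The expectBlock/checkExpectedBlock pair above implements a single-waiter expectation: at most one future is outstanding, a newer expectBlock call supersedes the old waiter (completing it with false), and fork-choice progress in updateHead completes it with true. A minimal, dependency-free sketch of the same pattern, using std/asyncdispatch in place of chronos and illustrative names throughout (not nimbus-eth2 API):

import std/asyncdispatch

type Expectation = object
  current: int      # last observed value (cf. chainDag.head.slot)
  expected: int     # value the waiter is waiting for
  fut: Future[bool] # nil when nobody is waiting

proc check(e: var Expectation) =
  if e.fut == nil: return
  if e.current < e.expected: return
  e.fut.complete(true)
  e.fut = nil       # don't keep completed futures around!

proc advance(e: var Expectation, value: int) =
  ## Called when the observed value moves (cf. updateHead)
  e.current = max(e.current, value)
  e.check()

proc expect(e: var Expectation, expected: int): Future[bool] =
  if e.fut != nil:
    e.fut.complete(false)  # supersede the previous waiter
  result = newFuture[bool]("expect")
  e.expected = expected
  e.fut = result
  e.check()                # the condition might already hold!

when isMainModule:
  var e: Expectation
  let f = e.expect(10)
  e.advance(9)             # too early: future stays pending
  doAssert not f.finished
  e.advance(10)            # condition met: completes with true
  doAssert waitFor(f)

Completing the superseded future with false, rather than cancelling it, keeps every caller's await well-defined: each waiter always learns whether its block arrived.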

(file 3 of 4)

@@ -930,29 +930,27 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
   node.updateGossipStatus(slot)

-proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
+proc onSlotStart(
+    node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} =
   ## Called at the beginning of a slot - usually every slot, but sometimes might
   ## skip a few in case we're running late.
+  ## wallTime: current system time - we will strive to perform all duties up
+  ##           to this point in time
   ## lastSlot: the last slot that we successfully processed, so we know where to
-  ##           start work from
-  ## scheduledSlot: the slot that we were aiming for, in terms of timing
+  ##           start work from - there might be jumps if processing is delayed
   let
     # The slot we should be at, according to the clock
-    beaconTime = node.beaconClock.now()
-    wallSlot = beaconTime.toSlot()
+    wallSlot = wallTime.slotOrZero
+    # If everything was working perfectly, the slot that we should be processing
+    expectedSlot = lastSlot + 1
     finalizedEpoch =
       node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot()
+    delay = wallTime - expectedSlot.toBeaconTime()

-  if not node.processor[].blockReceivedDuringSlot.finished:
-    node.processor[].blockReceivedDuringSlot.complete()
-  node.processor[].blockReceivedDuringSlot = newFuture[void]()
-
-  let delay = beaconTime - scheduledSlot.toBeaconTime()
   info "Slot start",
     lastSlot = shortLog(lastSlot),
-    scheduledSlot = shortLog(scheduledSlot),
-    delay,
+    wallSlot = shortLog(wallSlot),
+    delay = shortLog(delay),
     peers = len(node.network.peerPool),
     head = shortLog(node.chainDag.head),
     headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
@@ -963,87 +961,91 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
         else: "synced"

   # Check before any re-scheduling of onSlotStart()
-  checkIfShouldStopAtEpoch(scheduledSlot, node.config.stopAtEpoch)
-
-  if not wallSlot.afterGenesis or (wallSlot.slot < lastSlot):
-    let
-      slot =
-        if wallSlot.afterGenesis: wallSlot.slot
-        else: GENESIS_SLOT
-      nextSlot = slot + 1 # At least GENESIS_SLOT + 1!
-
-    # This can happen if the system clock changes time for example, and it's
-    # pretty bad
-    # TODO shut down? time either was or is bad, and PoS relies on accuracy..
-    warn "Beacon clock time moved back, rescheduling slot actions",
-      beaconTime = shortLog(beaconTime),
-      lastSlot = shortLog(lastSlot),
-      scheduledSlot = shortLog(scheduledSlot),
-      nextSlot = shortLog(nextSlot)
-
-    addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer):
-      asyncCheck node.onSlotStart(slot, nextSlot)
-
-    return
-
-  let
-    slot = wallSlot.slot # afterGenesis == true!
-    nextSlot = slot + 1
-
-  defer: await onSlotEnd(node, slot)
-
-  beacon_slot.set slot.int64
-  beacon_current_epoch.set slot.epoch.int64
-
-  finalization_delay.set scheduledSlot.epoch.int64 - finalizedEpoch.int64
+  checkIfShouldStopAtEpoch(wallSlot, node.config.stopAtEpoch)
+
+  beacon_slot.set wallSlot.int64
+  beacon_current_epoch.set wallSlot.epoch.int64
+
+  finalization_delay.set wallSlot.epoch.int64 - finalizedEpoch.int64

   if node.config.verifyFinalization:
-    verifyFinalization(node, scheduledSlot)
-
-  if slot > lastSlot + SLOTS_PER_EPOCH:
-    # We've fallen behind more than an epoch - there's nothing clever we can
-    # do here really, except skip all the work and try again later.
-    # TODO how long should the period be? Using an epoch because that's roughly
-    #      how long attestations remain interesting
-    # TODO should we shut down instead? clearly we're unable to keep up
-    warn "Unable to keep up, skipping ahead",
-      lastSlot = shortLog(lastSlot),
-      slot = shortLog(slot),
-      nextSlot = shortLog(nextSlot),
-      scheduledSlot = shortLog(scheduledSlot)
-
-    addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer):
-      # We pass the current slot here to indicate that work should be skipped!
-      asyncCheck node.onSlotStart(slot, nextSlot)
-    return
-
-  # Whatever we do during the slot, we need to know the head, because this will
-  # give us a state to work with and thus a shuffling.
-  # TODO if the head is very old, that is indicative of something being very
-  #      wrong - us being out of sync or disconnected from the network - need
-  #      to consider what to do in that case:
-  #      * nothing - the other parts of the application will reconnect and
-  #                  start listening to broadcasts, learn a new head etc..
-  #                  risky, because the network might stall if everyone does
-  #                  this, because no blocks will be produced
-  #      * shut down - this allows the user to notice and take action, but is
-  #                    kind of harsh
-  #      * keep going - we create blocks and attestations as usual and send them
-  #                     out - if network conditions improve, fork choice should
-  #                     eventually select the correct head and the rest will
-  #                     disappear naturally - risky because user is not aware,
-  #                     and might lose stake on canonical chain but "just works"
-  #                     when reconnected..
-  node.processor[].updateHead(slot)
-
-  # Time passes in here..
-  await node.handleValidatorDuties(lastSlot, slot)
-
-  let
-    nextSlotStart = saturate(node.beaconClock.fromNow(nextSlot))
-
-  addTimer(nextSlotStart) do (p: pointer):
-    asyncCheck node.onSlotStart(slot, nextSlot)
+    verifyFinalization(node, wallSlot)
+
+  node.processor[].updateHead(wallSlot)
+
+  await node.handleValidatorDuties(lastSlot, wallSlot)
+
+  await onSlotEnd(node, wallSlot)
+
+proc runSlotLoop(node: BeaconNode, startTime: BeaconTime) {.async.} =
+  var
+    curSlot = startTime.slotOrZero()
+    nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
+    timeToNextSlot = nextSlot.toBeaconTime() - startTime
+
+  info "Scheduling first slot action",
+    startTime = shortLog(startTime),
+    nextSlot = shortLog(nextSlot),
+    timeToNextSlot = shortLog(timeToNextSlot)
+
+  while true:
+    # Start by waiting for the time when the slot starts. Sleeping relinquishes
+    # control to other tasks which may or may not finish within the allotted
+    # time, so below, we need to be wary that the ship might have sailed
+    # already.
+    await sleepAsync(timeToNextSlot)
+
+    let
+      wallTime = node.beaconClock.now()
+      wallSlot = wallTime.slotOrZero() # Always > GENESIS!
+
+    if wallSlot < nextSlot:
+      # While we were sleeping, the system clock changed and time moved
+      # backwards!
+      if wallSlot + 1 < nextSlot:
+        # This is a critical condition where it's hard to reason about what
+        # to do next - we'll call the attention of the user here by shutting
+        # down.
+        fatal "System time adjusted backwards significantly - clock may be inaccurate - shutting down",
+          nextSlot = shortLog(nextSlot),
+          wallSlot = shortLog(wallSlot)
+        bnStatus = BeaconNodeStatus.Stopping
+        return
+
+      # Time moved back by a single slot - this could be a minor adjustment,
+      # for example when NTP does its thing after not working for a while
+      warn "System time adjusted backwards, rescheduling slot actions",
+        wallTime = shortLog(wallTime),
+        nextSlot = shortLog(nextSlot),
+        wallSlot = shortLog(wallSlot)
+
+      # cur & next slot remain the same
+      timeToNextSlot = nextSlot.toBeaconTime() - wallTime
+      continue
+
+    if wallSlot > nextSlot + SLOTS_PER_EPOCH:
+      # Time moved forwards by more than an epoch - either the clock was reset
+      # or we've been stuck in processing for a long time - either way, we will
+      # skip ahead so that we only process the events of the last
+      # SLOTS_PER_EPOCH slots
+      warn "Time moved forwards by more than an epoch, skipping ahead",
+        curSlot = shortLog(curSlot),
+        nextSlot = shortLog(nextSlot),
+        wallSlot = shortLog(wallSlot)
+
+      curSlot = wallSlot - SLOTS_PER_EPOCH
+    elif wallSlot > nextSlot:
+      notice "Missed expected slot start, catching up",
+        delay = shortLog(wallTime - nextSlot.toBeaconTime()),
+        curSlot = shortLog(curSlot),
+        nextSlot = shortLog(nextSlot)
+
+    await onSlotStart(node, wallTime, curSlot)
+
+    curSlot = wallSlot
+    nextSlot = wallSlot + 1
+    timeToNextSlot = saturate(node.beaconClock.fromNow(nextSlot))

 proc handleMissingBlocks(node: BeaconNode) =
   let missingBlocks = node.quarantine.checkMissing()
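The clock handling at the top of runSlotLoop reduces to four cases. A hypothetical helper (purely illustrative, not part of the change) makes the decision table explicit:

type ClockAction = enum
  caShutdown   # clock moved back more than one slot: give up
  caResleep    # woke up just before the target slot: sleep again
  caSkipAhead  # clock jumped forward more than an epoch: drop stale work
  caProceed    # on time, or late by less than an epoch: process the slot

proc classify(wallSlot, nextSlot: uint64, slotsPerEpoch = 32'u64): ClockAction =
  if wallSlot + 1 < nextSlot: caShutdown
  elif wallSlot < nextSlot: caResleep
  elif wallSlot > nextSlot + slotsPerEpoch: caSkipAhead
  else: caProceed

when isMainModule:
  doAssert classify(5, 10) == caShutdown    # clock moved back 5 slots
  doAssert classify(9, 10) == caResleep     # minor NTP adjustment
  doAssert classify(100, 10) == caSkipAhead # stuck, or clock reset forward
  doAssert classify(12, 10) == caProceed    # slightly late: catch up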
@@ -1179,27 +1181,16 @@ proc run*(node: BeaconNode) =
   node.installMessageValidators()

-  let
-    curSlot = node.beaconClock.now().slotOrZero()
-    nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
-    fromNow = saturate(node.beaconClock.fromNow(nextSlot))
-
-  info "Scheduling first slot action",
-    beaconTime = shortLog(node.beaconClock.now()),
-    nextSlot = shortLog(nextSlot),
-    fromNow = shortLog(fromNow)
-
-  addTimer(fromNow) do (p: pointer):
-    asyncCheck node.onSlotStart(curSlot, nextSlot)
-
-  node.onSecondLoop = runOnSecondLoop(node)
-  node.blockProcessingLoop = node.processor.runQueueProcessingLoop()
+  let startTime = node.beaconClock.now()
+  asyncSpawn runSlotLoop(node, startTime)
+  asyncSpawn runOnSecondLoop(node)
+  asyncSpawn runQueueProcessingLoop(node.processor)

   node.requestManager.start()
   node.startSyncManager()

-  if not node.beaconClock.now().toSlot().afterGenesis:
-    node.setupDoppelgangerDetection(curSlot)
+  if not startTime.toSlot().afterGenesis:
+    node.setupDoppelgangerDetection(startTime.slotOrZero())
     node.addMessageHandlers()
     doAssert node.getTopicSubscriptionEnabled()
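With the loops started fire-and-forget, the BeaconNode no longer needs to hold on to their futures (hence the two fields removed in the first file). A tiny sketch of the pattern, using std/asyncdispatch's asyncCheck for a dependency-free illustration (nimbus itself uses chronos' asyncSpawn here):

import std/asyncdispatch

proc tick() {.async.} =
  while true:
    await sleepAsync(1000)
    echo "tick"

when isMainModule:
  asyncCheck tick()        # no need to keep the Future[void] around
  waitFor sleepAsync(3500) # the loop keeps running on the dispatcher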

(file 4 of 4)

@@ -280,7 +280,6 @@ proc proposeSignedBlock*(node: BeaconNode,
                          head: BlockRef,
                          validator: AttachedValidator,
                          newBlock: SignedBeaconBlock): BlockRef =
-
   let newBlockRef = node.chainDag.addRawBlock(node.quarantine, newBlock) do (
       blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock,
       epochRef: EpochRef, state: HashedBeaconState):
@@ -393,7 +392,6 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
     attestationHeadRoot = shortLog(attestationHead.blck.root),
     attestationSlot = shortLog(slot)

-  # Collect data to send before node.stateCache grows stale
   var attestations: seq[tuple[
     data: AttestationData, committeeLen, indexInCommittee: int,
     validator: AttachedValidator]]
@@ -661,35 +659,53 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
   # slot_timing_entropy` seconds have elapsed since the start of the `slot`
   # (using the `slot_timing_entropy` generated for this slot)

-  # We've been doing lots of work up until now which took time. Normally, we
-  # send out attestations at the slot thirds-point, so we go back to the clock
-  # to see how much time we need to wait.
-  # TODO the beacon clock might jump here also. It's probably easier to complete
-  #      the work for the whole slot using a monotonic clock instead, then deal
-  #      with any clock discrepancies once only, at the start of slot timer
-  #      processing..
-  let slotTimingEntropy = getSlotTimingEntropy()
-
-  template sleepToSlotOffsetWithHeadUpdate(extra: chronos.Duration, msg: static string) =
-    let waitTime = node.beaconClock.fromNow(slot.toBeaconTime(extra))
-    if waitTime.inFuture:
-      discard await withTimeout(
-        node.processor[].blockReceivedDuringSlot, waitTime.offset)
-
-      # Might have gotten a valid beacon block this slot, which triggers the
-      # first case, in which we wait for another abs(slotTimingEntropy).
-      if node.processor[].blockReceivedDuringSlot.finished:
-        await sleepAsync(
-          milliseconds(max(slotTimingEntropy, 0 - slotTimingEntropy)))
-
-      # Time passed - we might need to select a new head in that case
-      node.processor[].updateHead(slot)
-      head = node.chainDag.head
-
-  sleepToSlotOffsetWithHeadUpdate(
-    milliseconds(SECONDS_PER_SLOT.int64 * 1000 div ATTESTATION_PRODUCTION_DIVISOR +
-      slotTimingEntropy),
-    "Waiting to send attestations")
+  # Milliseconds to wait from the start of the slot before sending out
+  # attestations - base value
+  const attestationOffset =
+    SECONDS_PER_SLOT.int64 * 1000 div ATTESTATION_PRODUCTION_DIVISOR
+
+  let
+    slotTimingEntropy = getSlotTimingEntropy() # +/- 1s
+    # The latest point in time when we'll be sending out attestations
+    attestationCutoffTime = slot.toBeaconTime(
+      millis(attestationOffset + slotTimingEntropy))
+    attestationCutoff = node.beaconClock.fromNow(attestationCutoffTime)
+
+  if attestationCutoff.inFuture:
+    debug "Waiting to send attestations",
+      head = shortLog(head),
+      attestationCutoff = shortLog(attestationCutoff.offset)
+
+    # Wait either for the block or the attestation cutoff time to arrive
+    if await node.processor[].expectBlock(slot).withTimeout(attestationCutoff.offset):
+      # The expected block arrived (or expectBlock was called again which
+      # shouldn't happen as this is the only place we use it) - according to
+      # the spec, we should now wait for abs(slotTimingEntropy) - in our async
+      # loop however, we might have been doing other processing that caused
+      # delays here so we'll cap the waiting to the time when we would have
+      # sent out attestations had the block not arrived.
+      # An opposite case is that we received (or produced) a block that has
+      # not yet reached our neighbours. To protect against our attestations
+      # being dropped (because the others have not yet seen the block), we'll
+      # impose a minimum delay of 250ms. The delay is enforced only when we're
+      # not hitting the "normal" cutoff time for sending out attestations.
+      let
+        afterBlockDelay = max(250, abs(slotTimingEntropy))
+        afterBlockTime = node.beaconClock.now() + millis(afterBlockDelay)
+        afterBlockCutoff = node.beaconClock.fromNow(
+          min(afterBlockTime, attestationCutoffTime))
+
+      if afterBlockCutoff.inFuture:
+        debug "Got block, waiting to send attestations",
+          head = shortLog(head),
+          afterBlockCutoff = shortLog(afterBlockCutoff.offset)
+
+        await sleepAsync(afterBlockCutoff.offset)
+
+    # Time passed - we might need to select a new head in that case
+    node.processor[].updateHead(slot)
+    head = node.chainDag.head

   handleAttestations(node, head, slot)
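Summarizing the new rule: attestations go out at the earlier of (block arrival + a small grace period) and the randomized cutoff. A self-contained sketch with times in milliseconds from the slot start; the constants mirror SECONDS_PER_SLOT = 12 and the thirds-point divisor, and all names are illustrative:

import std/options

const
  slotDurationMs = 12_000                     # SECONDS_PER_SLOT = 12
  attestationOffsetMs = slotDurationMs div 3  # thirds-point
  minAfterBlockDelayMs = 250

proc attestationSendTime(blockArrivalMs: Option[int],
                         slotTimingEntropyMs: int): int =
  ## Milliseconds from slot start at which attestations are sent
  let cutoff = attestationOffsetMs + slotTimingEntropyMs
  if blockArrivalMs.isNone:
    return cutoff # no block seen: wait until the randomized cutoff
  # Block seen: short grace period so peers see it too, capped at the
  # cutoff we would have used anyway
  let afterBlock = blockArrivalMs.get +
    max(minAfterBlockDelayMs, abs(slotTimingEntropyMs))
  min(afterBlock, cutoff)

when isMainModule:
  doAssert attestationSendTime(none(int), 1000) == 5000  # no block: 4s + 1s
  doAssert attestationSendTime(some(1500), -800) == 2300 # block at 1.5s
  doAssert attestationSendTime(some(3900), 0) == 4000    # capped at cutoff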
@@ -702,9 +718,13 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
   # through the slot-that is, SECONDS_PER_SLOT * 2 / 3 seconds after the start
   # of slot.
   if slot > 2:
-    discard await node.beaconClock.sleepToSlotOffset(
-      seconds(int64(SECONDS_PER_SLOT * 2) div 3), slot,
-      "Waiting to aggregate attestations")
+    let
+      aggregateWaitTime = node.beaconClock.fromNow(
+        slot.toBeaconTime(seconds(int64(SECONDS_PER_SLOT * 2) div 3)))
+    if aggregateWaitTime.inFuture:
+      debug "Waiting to send aggregate attestations",
+        aggregateWaitTime = shortLog(aggregateWaitTime.offset)
+      await sleepAsync(aggregateWaitTime.offset)

   await broadcastAggregatedAttestations(node, head, slot)
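For concreteness, under the mainnet-style constants assumed above, the two waiting points work out as follows:

const
  SECONDS_PER_SLOT = 12
  ATTESTATION_PRODUCTION_DIVISOR = 3

# attestations go out ~4000 ms into the slot (+/- slotTimingEntropy)
doAssert SECONDS_PER_SLOT * 1000 div ATTESTATION_PRODUCTION_DIVISOR == 4000
# aggregates go out 8 s into the slot (the two-thirds point)
doAssert int64(SECONDS_PER_SLOT * 2) div 3 == 8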