From 3e2c0a220c129e790836426fc37eadcb20e4acba Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 1 Mar 2021 17:36:06 +0100 Subject: [PATCH] refactor slot loop (#2355) * refactor slot loop * fix attestations being sent out early when _any_ block arrives (as opposed to the block for the "correct" slot) * fix attestations being sent out late when block already arrived * refactor slot processing loop * shutdown if clock moves backwards significantly * fix docs * notify caller whether the block actually arrived --- beacon_chain/beacon_node_common.nim | 2 - beacon_chain/eth2_processor.nim | 38 +++++- beacon_chain/nimbus_beacon_node.nim | 193 +++++++++++++--------------- beacon_chain/validator_duties.nim | 80 +++++++----- 4 files changed, 174 insertions(+), 139 deletions(-) diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index af85656fa..11a468987 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -51,8 +51,6 @@ type syncManager*: SyncManager[Peer, PeerID] topicBeaconBlocks*: string topicAggregateAndProofs*: string - blockProcessingLoop*: Future[void] - onSecondLoop*: Future[void] genesisSnapshotContent*: string attestationSubnets*: AttestationSubnets processor*: ref Eth2Processor diff --git a/beacon_chain/eth2_processor.nim b/beacon_chain/eth2_processor.nim index 23665cf37..a891d743b 100644 --- a/beacon_chain/eth2_processor.nim +++ b/beacon_chain/eth2_processor.nim @@ -72,7 +72,8 @@ type exitPool: ref ExitPool validatorPool: ref ValidatorPool quarantine*: QuarantineRef - blockReceivedDuringSlot*: Future[void] + expectedSlot: Slot + expectedBlockReceived: Future[bool] blocksQueue*: AsyncQueue[BlockEntry] attestationsQueue*: AsyncQueue[AttestationEntry] @@ -80,6 +81,34 @@ type doppelgangerDetection*: DoppelgangerProtection +proc checkExpectedBlock(self: var Eth2Processor) = + if self.expectedBlockReceived == nil: + return + + if self.chainDag.head.slot < self.expectedSlot: + return + + 
self.expectedBlockReceived.complete(true) + self.expectedBlockReceived = nil # Don't keep completed futures around! + +proc expectBlock*(self: var Eth2Processor, expectedSlot: Slot): Future[bool] = + ## Return a future that will complete when a head is selected whose slot is + ## equal or greater than the given slot, or a new expectation is created + if self.expectedBlockReceived != nil: + # Reset the old future to not leave it hanging.. an alternative would be to + # cancel it, but it doesn't make any practical difference for now + self.expectedBlockReceived.complete(false) + + let fut = newFuture[bool]("Eth2Processor.expectBlock") + self.expectedSlot = expectedSlot + self.expectedBlockReceived = fut + + # It might happen that by the time we're expecting a block, it might have + # already been processed! + self.checkExpectedBlock() + + return fut + proc updateHead*(self: var Eth2Processor, wallSlot: Slot) = ## Trigger fork choice and returns the new head block. ## Can return `nil` @@ -101,6 +130,8 @@ proc updateHead*(self: var Eth2Processor, wallSlot: Slot) = if oldFinalized != self.chainDag.finalizedHead.blck: self.attestationPool[].prune() + self.checkExpectedBlock() + proc dumpBlock[T]( self: Eth2Processor, signedBlock: SignedBeaconBlock, res: Result[T, (ValidationResult, BlockError)]) = @@ -147,10 +178,6 @@ proc storeBlock( attestationPool[].addForkChoice( epochRef, blckRef, trustedBlock.message, wallSlot) - # Trigger attestation sending - if blck.isOk and not self.blockReceivedDuringSlot.finished: - self.blockReceivedDuringSlot.complete() - self.dumpBlock(signedBlock, blck) # There can be a scenario where we receive a block we already received. 
@@ -568,7 +595,6 @@ proc new*(T: type Eth2Processor, exitPool: exitPool, validatorPool: validatorPool, quarantine: quarantine, - blockReceivedDuringSlot: newFuture[void](), blocksQueue: newAsyncQueue[BlockEntry](1), # limit to the max number of aggregates we expect to see in one slot aggregatesQueue: newAsyncQueue[AggregateEntry]( diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 54cfe582e..057fe403f 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -930,29 +930,27 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = node.updateGossipStatus(slot) -proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = +proc onSlotStart( + node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} = ## Called at the beginning of a slot - usually every slot, but sometimes might ## skip a few in case we're running late. + ## wallTime: current system time - we will strive to perform all duties up + ## to this point in time ## lastSlot: the last slot that we successfully processed, so we know where to - ## start work from - ## scheduledSlot: the slot that we were aiming for, in terms of timing + ## start work from - there might be jumps if processing is delayed let # The slot we should be at, according to the clock - beaconTime = node.beaconClock.now() - wallSlot = beaconTime.toSlot() + wallSlot = wallTime.slotOrZero + # If everything was working perfectly, the slot that we should be processing + expectedSlot = lastSlot + 1 finalizedEpoch = node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot() - - if not node.processor[].blockReceivedDuringSlot.finished: - node.processor[].blockReceivedDuringSlot.complete() - node.processor[].blockReceivedDuringSlot = newFuture[void]() - - let delay = beaconTime - scheduledSlot.toBeaconTime() + delay = wallTime - expectedSlot.toBeaconTime() info "Slot start", lastSlot = shortLog(lastSlot), - scheduledSlot = 
shortLog(scheduledSlot), - delay, + wallSlot = shortLog(wallSlot), + delay = shortLog(delay), peers = len(node.network.peerPool), head = shortLog(node.chainDag.head), headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()), @@ -963,87 +961,91 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = else: "synced" # Check before any re-scheduling of onSlotStart() - checkIfShouldStopAtEpoch(scheduledSlot, node.config.stopAtEpoch) + checkIfShouldStopAtEpoch(wallSlot, node.config.stopAtEpoch) - if not wallSlot.afterGenesis or (wallSlot.slot < lastSlot): - let - slot = - if wallSlot.afterGenesis: wallSlot.slot - else: GENESIS_SLOT - nextSlot = slot + 1 # At least GENESIS_SLOT + 1! + beacon_slot.set wallSlot.int64 + beacon_current_epoch.set wallSlot.epoch.int64 - # This can happen if the system clock changes time for example, and it's - # pretty bad - # TODO shut down? time either was or is bad, and PoS relies on accuracy.. - warn "Beacon clock time moved back, rescheduling slot actions", - beaconTime = shortLog(beaconTime), - lastSlot = shortLog(lastSlot), - scheduledSlot = shortLog(scheduledSlot), - nextSlot = shortLog(nextSlot) - - addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer): - asyncCheck node.onSlotStart(slot, nextSlot) - - return - - let - slot = wallSlot.slot # afterGenesis == true! - nextSlot = slot + 1 - - defer: await onSlotEnd(node, slot) - - beacon_slot.set slot.int64 - beacon_current_epoch.set slot.epoch.int64 - - finalization_delay.set scheduledSlot.epoch.int64 - finalizedEpoch.int64 + finalization_delay.set wallSlot.epoch.int64 - finalizedEpoch.int64 if node.config.verifyFinalization: - verifyFinalization(node, scheduledSlot) + verifyFinalization(node, wallSlot) - if slot > lastSlot + SLOTS_PER_EPOCH: - # We've fallen behind more than an epoch - there's nothing clever we can - # do here really, except skip all the work and try again later. - # TODO how long should the period be? 
Using an epoch because that's roughly - # how long attestations remain interesting - # TODO should we shut down instead? clearly we're unable to keep up - warn "Unable to keep up, skipping ahead", - lastSlot = shortLog(lastSlot), - slot = shortLog(slot), - nextSlot = shortLog(nextSlot), - scheduledSlot = shortLog(scheduledSlot) + node.processor[].updateHead(wallSlot) - addTimer(saturate(node.beaconClock.fromNow(nextSlot))) do (p: pointer): - # We pass the current slot here to indicate that work should be skipped! - asyncCheck node.onSlotStart(slot, nextSlot) - return + await node.handleValidatorDuties(lastSlot, wallSlot) - # Whatever we do during the slot, we need to know the head, because this will - # give us a state to work with and thus a shuffling. - # TODO if the head is very old, that is indicative of something being very - # wrong - us being out of sync or disconnected from the network - need - # to consider what to do in that case: - # * nothing - the other parts of the application will reconnect and - # start listening to broadcasts, learn a new head etc.. - # risky, because the network might stall if everyone does - # this, because no blocks will be produced - # * shut down - this allows the user to notice and take action, but is - # kind of harsh - # * keep going - we create blocks and attestations as usual and send them - # out - if network conditions improve, fork choice should - # eventually select the correct head and the rest will - # disappear naturally - risky because user is not aware, - # and might lose stake on canonical chain but "just works" - # when reconnected.. - node.processor[].updateHead(slot) + await onSlotEnd(node, wallSlot) - # Time passes in here.. 
- await node.handleValidatorDuties(lastSlot, slot) +proc runSlotLoop(node: BeaconNode, startTime: BeaconTime) {.async.} = + var + curSlot = startTime.slotOrZero() + nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1 + timeToNextSlot = nextSlot.toBeaconTime() - startTime - let - nextSlotStart = saturate(node.beaconClock.fromNow(nextSlot)) + info "Scheduling first slot action", + startTime = shortLog(startTime), + nextSlot = shortLog(nextSlot), + timeToNextSlot = shortLog(timeToNextSlot) - addTimer(nextSlotStart) do (p: pointer): - asyncCheck node.onSlotStart(slot, nextSlot) + while true: + # Start by waiting for the time when the slot starts. Sleeping relinquishes + # control to other tasks which may or may not finish within the allotted + # time, so below, we need to be wary that the ship might have sailed + # already. + await sleepAsync(timeToNextSlot) + + let + wallTime = node.beaconClock.now() + wallSlot = wallTime.slotOrZero() # Always > GENESIS! + + if wallSlot < nextSlot: + # While we were sleeping, the system clock changed and time moved + # backwards! + if wallSlot + 1 < nextSlot: + # This is a critical condition where it's hard to reason about what + # to do next - we'll call the attention of the user here by shutting + # down. 
+ fatal "System time adjusted backwards significantly - clock may be inaccurate - shutting down", + nextSlot = shortLog(nextSlot), + wallSlot = shortLog(wallSlot) + bnStatus = BeaconNodeStatus.Stopping + return + + # Time moved back by a single slot - this could be a minor adjustment, + # for example when NTP does its thing after not working for a while + warn "System time adjusted backwards, rescheduling slot actions", + wallTime = shortLog(wallTime), + nextSlot = shortLog(nextSlot), + wallSlot = shortLog(wallSlot) + + # cur & next slot remain the same + timeToNextSlot = nextSlot.toBeaconTime() - wallTime + continue + + if wallSlot > nextSlot + SLOTS_PER_EPOCH: + # Time moved forwards by more than an epoch - either the clock was reset + # or we've been stuck in processing for a long time - either way, we will + # skip ahead so that we only process the events of the last + # SLOTS_PER_EPOCH slots + warn "Time moved forwards by more than an epoch, skipping ahead", + curSlot = shortLog(curSlot), + nextSlot = shortLog(nextSlot), + wallSlot = shortLog(wallSlot) + + curSlot = wallSlot - SLOTS_PER_EPOCH + + elif wallSlot > nextSlot: + notice "Missed expected slot start, catching up", + delay = shortLog(wallTime - nextSlot.toBeaconTime()), + curSlot = shortLog(curSlot), + nextSlot = shortLog(nextSlot) + + await onSlotStart(node, wallTime, curSlot) + + curSlot = wallSlot + nextSlot = wallSlot + 1 + timeToNextSlot = saturate(node.beaconClock.fromNow(nextSlot)) proc handleMissingBlocks(node: BeaconNode) = let missingBlocks = node.quarantine.checkMissing() @@ -1179,27 +1181,16 @@ proc run*(node: BeaconNode) = node.installMessageValidators() - let - curSlot = node.beaconClock.now().slotOrZero() - nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1 - fromNow = saturate(node.beaconClock.fromNow(nextSlot)) - - info "Scheduling first slot action", - beaconTime = shortLog(node.beaconClock.now()), - nextSlot = shortLog(nextSlot), - fromNow = shortLog(fromNow) - - 
addTimer(fromNow) do (p: pointer): - asyncCheck node.onSlotStart(curSlot, nextSlot) - - node.onSecondLoop = runOnSecondLoop(node) - node.blockProcessingLoop = node.processor.runQueueProcessingLoop() + let startTime = node.beaconClock.now() + asyncSpawn runSlotLoop(node, startTime) + asyncSpawn runOnSecondLoop(node) + asyncSpawn runQueueProcessingLoop(node.processor) node.requestManager.start() node.startSyncManager() - if not node.beaconClock.now().toSlot().afterGenesis: - node.setupDoppelgangerDetection(curSlot) + if not startTime.toSlot().afterGenesis: + node.setupDoppelgangerDetection(startTime.slotOrZero()) node.addMessageHandlers() doAssert node.getTopicSubscriptionEnabled() diff --git a/beacon_chain/validator_duties.nim b/beacon_chain/validator_duties.nim index 995e40274..8df9604a4 100644 --- a/beacon_chain/validator_duties.nim +++ b/beacon_chain/validator_duties.nim @@ -280,7 +280,6 @@ proc proposeSignedBlock*(node: BeaconNode, head: BlockRef, validator: AttachedValidator, newBlock: SignedBeaconBlock): BlockRef = - let newBlockRef = node.chainDag.addRawBlock(node.quarantine, newBlock) do ( blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock, epochRef: EpochRef, state: HashedBeaconState): @@ -393,7 +392,6 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) = attestationHeadRoot = shortLog(attestationHead.blck.root), attestationSlot = shortLog(slot) - # Collect data to send before node.stateCache grows stale var attestations: seq[tuple[ data: AttestationData, committeeLen, indexInCommittee: int, validator: AttachedValidator]] @@ -661,35 +659,53 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} = # slot_timing_entropy` seconds have elapsed since the start of the `slot` # (using the `slot_timing_entropy` generated for this slot) - # We've been doing lots of work up until now which took time. 
Normally, we - # send out attestations at the slot thirds-point, so we go back to the clock - # to see how much time we need to wait. - # TODO the beacon clock might jump here also. It's probably easier to complete - # the work for the whole slot using a monotonic clock instead, then deal - # with any clock discrepancies once only, at the start of slot timer - # processing.. - let slotTimingEntropy = getSlotTimingEntropy() + # Milliseconds to wait from the start of the slot before sending out + # attestations - base value + const attestationOffset = + SECONDS_PER_SLOT.int64 * 1000 div ATTESTATION_PRODUCTION_DIVISOR - template sleepToSlotOffsetWithHeadUpdate(extra: chronos.Duration, msg: static string) = - let waitTime = node.beaconClock.fromNow(slot.toBeaconTime(extra)) - if waitTime.inFuture: - discard await withTimeout( - node.processor[].blockReceivedDuringSlot, waitTime.offset) + let + slotTimingEntropy = getSlotTimingEntropy() # +/- 1s + # The latest point in time when we'll be sending out attestations + attestationCutoffTime = slot.toBeaconTime( + millis(attestationOffset + slotTimingEntropy)) + attestationCutoff = node.beaconClock.fromNow(attestationCutoffTime) - # Might have gotten a valid beacon block this slot, which triggers the - # first case, in which we wait for another abs(slotTimingEntropy). 
- if node.processor[].blockReceivedDuringSlot.finished: - await sleepAsync( - milliseconds(max(slotTimingEntropy, 0 - slotTimingEntropy))) + if attestationCutoff.inFuture: + debug "Waiting to send attestations", + head = shortLog(head), + attestationCutoff = shortLog(attestationCutoff.offset) - # Time passed - we might need to select a new head in that case - node.processor[].updateHead(slot) - head = node.chainDag.head + # Wait either for the block or the attestation cutoff time to arrive + if await node.processor[].expectBlock(slot).withTimeout(attestationCutoff.offset): + # The expected block arrived (or expectBlock was called again which + # shouldn't happen as this is the only place we use it) - according to the + # spec, we should now wait for abs(slotTimingEntropy) - in our async loop + # however, we might have been doing other processing that caused delays + # here so we'll cap the waiting to the time when we would have sent out + # attestations had the block not arrived. + # An opposite case is that we received (or produced) a block that has + # not yet reached our neighbours. To protect against our attestations + # being dropped (because the others have not yet seen the block), we'll + # impose a minimum delay of 250ms. The delay is enforced only when we're + # not hitting the "normal" cutoff time for sending out attestations. 
- sleepToSlotOffsetWithHeadUpdate( - milliseconds(SECONDS_PER_SLOT.int64 * 1000 div ATTESTATION_PRODUCTION_DIVISOR + - slotTimingEntropy), - "Waiting to send attestations") + let + afterBlockDelay = max(250, abs(slotTimingEntropy)) + afterBlockTime = node.beaconClock.now() + millis(afterBlockDelay) + afterBlockCutoff = node.beaconClock.fromNow( + min(afterBlockTime, attestationCutoffTime)) + + if afterBlockCutoff.inFuture: + debug "Got block, waiting to send attestations", + head = shortLog(head), + afterBlockCutoff = shortLog(afterBlockCutoff.offset) + + await sleepAsync(afterBlockCutoff.offset) + + # Time passed - we might need to select a new head in that case + node.processor[].updateHead(slot) + head = node.chainDag.head handleAttestations(node, head, slot) @@ -702,9 +718,13 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} = # through the slot-that is, SECONDS_PER_SLOT * 2 / 3 seconds after the start # of slot. if slot > 2: - discard await node.beaconClock.sleepToSlotOffset( - seconds(int64(SECONDS_PER_SLOT * 2) div 3), slot, - "Waiting to aggregate attestations") + let + aggregateWaitTime = node.beaconClock.fromNow( + slot.toBeaconTime(seconds(int64(SECONDS_PER_SLOT * 2) div 3))) + if aggregateWaitTime.inFuture: + debug "Waiting to send aggregate attestations", + aggregateWaitTime = shortLog(aggregateWaitTime.offset) + await sleepAsync(aggregateWaitTime.offset) await broadcastAggregatedAttestations(node, head, slot)