diff --git a/beacon_chain/attestation_pool.nim b/beacon_chain/attestation_pool.nim index 1c3821a72..6353504ee 100644 --- a/beacon_chain/attestation_pool.nim +++ b/beacon_chain/attestation_pool.nim @@ -15,7 +15,8 @@ import # Internal ./spec/[beaconstate, datatypes, crypto, digest, helpers], ssz/merkleization, - ./block_pools/[spec_cache, chain_dag, clearance], ./beacon_node_types, + ./block_pools/[spec_cache, chain_dag, clearance, quarantine], + ./beacon_node_types, ./fork_choice/fork_choice export beacon_node_types, sets @@ -355,7 +356,15 @@ proc selectHead*(pool: var AttestationPool, wallSlot: Slot): BlockRef = error "Couldn't select head", err = newHead.error nil else: - pool.chainDag.getRef(newHead.get()) + let ret = pool.chainDag.getRef(newHead.get()) + if ret.isNil: + # This should normally not happen, but if the chain dag and fork choice + # get out of sync, we'll need to try to download the selected head - in + # the meantime, return nil to indicate that no new head was chosen + warn "Fork choice selected unknown head, trying to sync", root = newHead.get() + pool.quarantine.addMissing(newHead.get()) + + ret proc prune*(pool: var AttestationPool) = if (let v = pool.forkChoice.prune(); v.isErr): diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 592b69e2e..8275029e1 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -47,8 +47,6 @@ template init(T: type RpcHttpServer, ip: ValidIpAddress, port: Port): T = # https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics declareGauge beacon_slot, "Latest slot of the beacon chain state" -declareGauge beacon_head_slot, - "Slot of the head block of the beacon chain" # Finalization tracking declareGauge finalization_delay, @@ -517,9 +515,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = # Whatever we do during the slot, we need to know the head, because this will # give us a state to work with and thus a shuffling. - # TODO typically, what consitutes correct actions stays constant between slot - # updates and is stable across some epoch transitions as well - see how - # we can avoid recalculating everything here # TODO if the head is very old, that is indicative of something being very # wrong - us being out of sync or disconnected from the network - need # to consider what to do in that case: @@ -535,11 +530,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = # disappear naturally - risky because user is not aware, # and might lose stake on canonical chain but "just works" # when reconnected.. - discard node.updateHead(slot) - - # TODO is the slot of the clock or the head block more interesting? provide - # rationale in comment - beacon_head_slot.set slot.int64 + node.processor[].updateHead(slot) # Time passes in here.. await node.handleValidatorDuties(lastSlot, slot) diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index fc8a4a873..987d29f79 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -8,25 +8,23 @@ # Common routines for a BeaconNode and a BeaconValidator node import - # Standard library - tables, osproc, + std/osproc, # Nimble packages - chronos, json_rpc/rpcserver, metrics, - chronicles, + chronos, json_rpc/rpcserver, # Local modules - spec/[datatypes, crypto, digest], - conf, time, beacon_chain_db, - attestation_pool, eth2_network, - block_pools/[chain_dag, quarantine], - beacon_node_types, mainchain_monitor, request_manager, - sync_manager, - ./eth2_processor + ./conf, ./time, ./beacon_chain_db, ./attestation_pool, ./eth2_network, + ./beacon_node_types, ./mainchain_monitor, ./request_manager, + ./sync_manager, ./eth2_processor, + ./block_pools/[chain_dag, quarantine], + ./spec/datatypes -# This removes an invalid Nim warning that the digest module is unused here -# It's currently used for `shortLog(head.blck.root)` -type Eth2Digest = digest.Eth2Digest +export + osproc, chronos, rpcserver, conf, time, beacon_chain_db, + attestation_pool, eth2_network, beacon_node_types, mainchain_monitor, + request_manager, sync_manager, eth2_processor, chain_Dag, quarantine, + datatypes type RpcServer* = RpcHttpServer @@ -62,12 +60,6 @@ type const MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT -# Metrics -proc updateHead*(node: BeaconNode, wallSlot: Slot): BlockRef = - ## Trigger fork choice and returns the new head block. - ## Can return `nil` - node.processor[].updateHead(wallSlot) - # TODO stew/sequtils2 template findIt*(s: openarray, predicate: untyped): int = var res = -1 diff --git a/beacon_chain/eth2_processor.nim b/beacon_chain/eth2_processor.nim index a0b7a503d..83679ed82 100644 --- a/beacon_chain/eth2_processor.nim +++ b/beacon_chain/eth2_processor.nim @@ -36,8 +36,9 @@ declareHistogram beacon_block_delay, declareHistogram beacon_store_block_duration_seconds, "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] -declareGauge beacon_head_root, - "Root of the head block of the beacon chain" +# https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#interop-metrics +declareGauge beacon_head_root, "Root of the head block of the beacon chain" +declareGauge beacon_head_slot, "Slot of the head block of the beacon chain" type GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].} @@ -67,13 +68,15 @@ type attestationsQueue*: AsyncQueue[AttestationEntry] aggregatesQueue*: AsyncQueue[AggregateEntry] -proc updateHead*(self: var Eth2Processor, wallSlot: Slot): BlockRef = +proc updateHead*(self: var Eth2Processor, wallSlot: Slot) = ## Trigger fork choice and returns the new head block. ## Can return `nil` # Grab the new head according to our latest attestation data let newHead = self.attestationPool[].selectHead(wallSlot) if newHead.isNil(): - return nil + warn "Head selection failed, using previous head", + head = shortLog(self.chainDag.head), wallSlot + return # Store the new head in the chain DAG - this may cause epochs to be # justified and finalized @@ -81,13 +84,12 @@ proc updateHead*(self: var Eth2Processor, wallSlot: Slot): BlockRef = self.chainDag.updateHead(newHead, self.quarantine) beacon_head_root.set newHead.root.toGaugeValue + beacon_head_slot.set newHead.slot.int64 # Cleanup the fork choice v2 if we have a finalized head if oldFinalized != self.chainDag.finalizedHead.blck: self.attestationPool[].prune() - newHead - proc dumpBlock[T]( self: Eth2Processor, signedBlock: SignedBeaconBlock, res: Result[T, (ValidationResult, BlockError)]) = @@ -198,7 +200,8 @@ proc processBlock(self: var Eth2Processor, entry: BlockEntry) = if res.isOk(): # Eagerly update head in case the new block gets selected - discard self.updateHead(wallSlot) + self.updateHead(wallSlot) + let updateDone = now(chronos.Moment) let storeBlockDuration = storeDone - start let updateHeadDuration = updateDone - storeDone diff --git a/beacon_chain/validator_duties.nim b/beacon_chain/validator_duties.nim index bfd0fd64f..3935493b8 100644 --- a/beacon_chain/validator_duties.nim +++ b/beacon_chain/validator_duties.nim @@ -477,18 +477,15 @@ proc broadcastAggregatedAttestations( attestation = shortLog(signedAP.message.aggregate), validator = shortLog(curr[0].v) -proc handleValidatorDuties*( - node: BeaconNode, lastSlot, slot: Slot) {.async.} = +proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} = ## Perform validator duties - create blocks, vote and aggregate existing votes - let maybeHead = node.updateHead(slot) - if maybeHead.isNil(): - error "Couldn't update head - cannot proceed with validator duties" - return - var head = maybeHead if node.attachedValidators.count == 0: # Nothing to do because we have no validator attached return + # The chainDag head might be updated by sync while we're working due to the + # await calls, thus we use a local variable to keep the logic straight here + var head = node.chainDag.head if not node.isSynced(head): notice "Node out of sync, skipping validator duties", slot, headSlot = head.slot @@ -499,9 +496,6 @@ proc handleValidatorDuties*( # Start by checking if there's work we should have done in the past that we # can still meaningfully do while curSlot < slot: - # TODO maybe even collect all work synchronously to avoid unnecessary - # state rewinds while waiting for async operations like validator - # signature.. notice "Catching up on validator duties", curSlot = shortLog(curSlot), lastSlot = shortLog(lastSlot), @@ -510,14 +504,12 @@ proc handleValidatorDuties*( # For every slot we're catching up, we'll propose then send # attestations - head should normally be advancing along the same branch # in this case - # TODO what if we receive blocks / attestations while doing this work? head = await handleProposal(node, head, curSlot) # For each slot we missed, we need to send out attestations - if we were # proposing during this time, we'll use the newly proposed head, else just # keep reusing the same - the attestation that goes out will actually # rewind the state to what it looked like at the time of that slot - # TODO smells like there's an optimization opportunity here handleAttestations(node, head, curSlot) curSlot += 1 @@ -541,11 +533,8 @@ proc handleValidatorDuties*( template sleepToSlotOffsetWithHeadUpdate(extra: chronos.Duration, msg: static string) = if await node.beaconClock.sleepToSlotOffset(extra, slot, msg): # Time passed - we might need to select a new head in that case - let maybeHead = node.updateHead(slot) - if not maybeHead.isNil(): - head = maybeHead - else: - error "Couldn't update head" + node.processor[].updateHead(slot) + head = node.chainDag.head sleepToSlotOffsetWithHeadUpdate( seconds(int64(SECONDS_PER_SLOT)) div 3, "Waiting to send attestations")