From 22998fdfd42a9d28c7bc8b6b1a20af5b09319ce0 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 20 Aug 2020 18:30:47 +0200 Subject: [PATCH] avoid double deserialization When blocks and attestations arrive, they are SSZ-decoded twice: once for validation and once for processing. This branch enqueues the decoded block directly for processing, avoiding the second, slow deserialization. * move processing of blocks and attestations to queue * ...and out from beacon_node * split attestation processing into attestations and aggregates * also updates metrics * clean up logging to better follow the lifetime of gossip: arrival, validation and processing * drop attestations and aggregates if there are too many * try to prioritise blocks and aggregates before single-validator attestations --- beacon_chain/beacon_node.nim | 245 ++--------------- beacon_chain/beacon_node_common.nim | 29 +- beacon_chain/eth2_network.nim | 9 +- beacon_chain/eth2_processor.nim | 404 ++++++++++++++++++++++++++++ beacon_chain/request_manager.nim | 8 +- beacon_chain/spec/validator.nim | 2 +- beacon_chain/sync_manager.nim | 30 +-- beacon_chain/time.nim | 4 + beacon_chain/validator_duties.nim | 6 +- tests/test_sync_manager.nim | 44 +-- 10 files changed, 482 insertions(+), 299 deletions(-) create mode 100644 beacon_chain/eth2_processor.nim diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index c7b587312..60d04ff0a 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -29,9 +29,10 @@ import beacon_node_common, beacon_node_types, block_pools/[spec_cache, chain_dag, quarantine, clearance, block_pools_types], nimbus_binary_common, network_metadata, - mainchain_monitor, version, ssz/[merkleization], sszdump, merkle_minimal, + mainchain_monitor, version, ssz/[merkleization], merkle_minimal, sync_protocol, request_manager, keystore_management, interop, statusbar, - sync_manager, validator_duties, validator_api, attestation_aggregation + sync_manager, validator_duties, validator_api, + ./eth2_processor const genesisFile* = "genesis.ssz" @@ -56,12 +57,6 @@ declareGauge beacon_slot, declareGauge beacon_head_slot, "Slot of the head block of the beacon chain" -# Metrics for tracking attestation and beacon block loss -declareCounter beacon_attestations_received, - "Number of beacon chain attestations received by this peer" -declareCounter beacon_blocks_received, - "Number of beacon chain blocks received by this peer" - # Finalization tracking declareGauge finalization_delay, "Epoch delay between scheduled epoch and finalized epoch" @@ -69,21 +64,8 @@ declareGauge finalization_delay, declareGauge ticks_delay, "How long does to take to run the onSecond loop" -const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf] - -declareHistogram beacon_attestation_received_seconds_from_slot_start, - "Interval between slot start and attestation reception", buckets = delayBuckets - -declareHistogram beacon_block_received_seconds_from_slot_start, - "Interval between slot start and beacon block reception", buckets = delayBuckets - -declareHistogram beacon_store_block_duration_seconds, - "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] - logScope: topics = "beacnde" -proc onBeaconBlock(node: BeaconNode, signedBlock: SignedBeaconBlock) {.gcsafe.} - proc getStateFromSnapshot(conf: BeaconNodeConf, stateSnapshotContents: ref string): NilableBeaconStateRef = var genesisPath = conf.dataDir/genesisFile @@ -266,7 +248,7 @@ proc init*(T: type BeaconNode, topicBeaconBlocks = getBeaconBlocksTopic(enrForkId.forkDigest) topicAggregateAndProofs = getAggregateAndProofsTopic(enrForkId.forkDigest) network = createEth2Node(rng, conf, enrForkId) - + attestationPool = newClone(AttestationPool.init(chainDag, quarantine)) var res = BeaconNode( nickname: nickname, graffitiBytes: if conf.graffiti.isSome: conf.graffiti.get.GraffitiBytes @@ -278,16 +260,23 @@ proc init*(T: type BeaconNode, attachedValidators: ValidatorPool.init(), chainDag: chainDag, quarantine: quarantine, - attestationPool: AttestationPool.init(chainDag, quarantine), + attestationPool: attestationPool, mainchainMonitor: mainchainMonitor, beaconClock: BeaconClock.init(chainDag.headState.data.data), rpcServer: rpcServer, forkDigest: enrForkId.forkDigest, topicBeaconBlocks: topicBeaconBlocks, topicAggregateAndProofs: topicAggregateAndProofs, - blocksQueue: newAsyncQueue[SyncBlock](1), ) - res.requestManager = RequestManager.init(network, res.blocksQueue) + + proc getWallTime(): BeaconTime = res.beaconClock.now() + + res.processor = Eth2Processor.new( + conf, chainDag, attestationPool, quarantine, getWallTime) + + res.requestManager = RequestManager.init( + network, res.processor.blocksQueue) + res.addLocalValidators() # This merely configures the BeaconSync @@ -295,84 +284,6 @@ proc init*(T: type BeaconNode, network.initBeaconSync(chainDag, enrForkId.forkDigest) return res -proc onAttestation(node: BeaconNode, attestation: Attestation) = - # We received an attestation from the network but don't know much about it - # yet - in particular, we haven't verified that it belongs to particular chain - # we're on, or that it follows the rules of the protocol - logScope: - attestation = shortLog(attestation) - head = shortLog(node.chainDag.head) - pcs = "on_attestation" - - let - wallSlot = node.beaconClock.now().toSlot() - head = node.chainDag.head - - debug "Attestation received", - wallSlot = shortLog(wallSlot.slot) - - if not wallSlot.afterGenesis or wallSlot.slot < head.slot: - warn "Received attestation before genesis or head - clock is wrong?", - afterGenesis = wallSlot.afterGenesis, - wallSlot = shortLog(wallSlot.slot) - return - - if attestation.data.slot > head.slot and - (attestation.data.slot - head.slot) > MaxEmptySlotCount: - warn "Ignoring attestation, head block too old (out of sync?)" - return - - node.attestationPool.addAttestation(attestation, wallSlot.slot) - -proc dumpBlock[T]( - node: BeaconNode, signedBlock: SignedBeaconBlock, - res: Result[T, BlockError]) = - if node.config.dumpEnabled and res.isErr: - case res.error - of Invalid: - dump( - node.config.dumpDirInvalid, signedBlock) - of MissingParent: - dump( - node.config.dumpDirIncoming, signedBlock) - else: - discard - -proc storeBlock( - node: BeaconNode, signedBlock: SignedBeaconBlock): Result[void, BlockError] = - let start = Moment.now() - debug "Block received", - signedBlock = shortLog(signedBlock.message), - blockRoot = shortLog(signedBlock.root), - pcs = "receive_block" - - beacon_blocks_received.inc() - - {.gcsafe.}: # TODO: fork choice and quarantine should sync via messages instead of callbacks - let blck = node.chainDag.addRawBlock(node.quarantine, signedBlock) do ( - blckRef: BlockRef, signedBlock: SignedBeaconBlock, - epochRef: EpochRef, state: HashedBeaconState): - # Callback add to fork choice if valid - node.attestationPool.addForkChoice( - epochRef, blckRef, signedBlock.message, - node.beaconClock.now().slotOrZero()) - - node.dumpBlock(signedBlock, blck) - - # There can be a scenario where we receive a block we already received. - # However this block was before the last finalized epoch and so its parent - # was pruned from the ForkChoice. - if blck.isErr: - return err(blck.error) - - beacon_store_block_duration_seconds.observe((Moment.now() - start).milliseconds.float64 / 1000) - return ok() - -proc onBeaconBlock(node: BeaconNode, signedBlock: SignedBeaconBlock) = - # We received a block but don't know much about it yet - in particular, we - # don't know if it's part of the chain we're currently building. - discard node.storeBlock(signedBlock) - func verifyFinalization(node: BeaconNode, slot: Slot) = # Epoch must be >= 4 to check finalization const SETTLING_TIME_OFFSET = 1'u64 @@ -393,23 +304,12 @@ func verifyFinalization(node: BeaconNode, slot: Slot) = doAssert finalizedEpoch + 4 >= epoch proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) = - proc attestationHandler(attestation: Attestation) = - # Avoid double-counting attestation-topic attestations on shared codepath - # when they're reflected through beacon blocks - beacon_attestations_received.inc() - beacon_attestation_received_seconds_from_slot_start.observe( - node.beaconClock.now.int64 - attestation.data.slot.toBeaconTime.int64) - - node.onAttestation(attestation) - var attestationSubscriptions: seq[Future[void]] = @[] # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#attestations-and-aggregation for subnet in subnets: attestationSubscriptions.add(node.network.subscribe( - getAttestationTopic(node.forkDigest, subnet), - attestationHandler, - )) + getAttestationTopic(node.forkDigest, subnet))) waitFor allFutures(attestationSubscriptions) @@ -636,37 +536,6 @@ proc runOnSecondLoop(node: BeaconNode) {.async.} = ticks_delay.set(sleepTime.nanoseconds.float / nanosecondsIn1s) debug "onSecond task completed", sleepTime, processingTime -proc importBlock(node: BeaconNode, - sblock: SignedBeaconBlock): Result[void, BlockError] = - let sm1 = now(chronos.Moment) - let res = node.storeBlock(sblock) - let em1 = now(chronos.Moment) - if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}): - let sm2 = now(chronos.Moment) - discard node.updateHead(node.beaconClock.now().slotOrZero) - let em2 = now(chronos.Moment) - let storeBlockDuration = if res.isOk(): em1 - sm1 else: ZeroDuration - let updateHeadDuration = if res.isOk(): em2 - sm2 else: ZeroDuration - let overallDuration = if res.isOk(): em2 - sm1 else: ZeroDuration - let storeSpeed = - block: - let secs = float(chronos.seconds(1).nanoseconds) - if not(overallDuration.isZero()): - let v = secs / float(overallDuration.nanoseconds) - round(v * 10_000) / 10_000 - else: - 0.0 - debug "Block got imported successfully", - local_head_slot = node.chainDag.head.slot, store_speed = storeSpeed, - block_root = shortLog(sblock.root), - block_slot = sblock.message.slot, - store_block_duration = $storeBlockDuration, - update_head_duration = $updateHeadDuration, - overall_duration = $overallDuration - ok() - else: - err(res.error()) - proc startSyncManager(node: BeaconNode) = func getLocalHeadSlot(): Slot = node.chainDag.head.slot @@ -696,16 +565,10 @@ proc startSyncManager(node: BeaconNode) = node.syncManager = newSyncManager[Peer, PeerID]( node.network.peerPool, getLocalHeadSlot, getLocalWallSlot, - getFirstSlotAtFinalizedEpoch, node.blocksQueue, chunkSize = 32 + getFirstSlotAtFinalizedEpoch, node.processor.blocksQueue, chunkSize = 32 ) node.syncManager.start() -proc runBlockProcessingLoop(node: BeaconNode) {.async.} = - ## Incoming blocks processing loop. - while true: - let sblock = await node.blocksQueue.popFirst() - sblock.complete(node.importBlock(sblock.blk)) - proc currentSlot(node: BeaconNode): Slot = node.beaconClock.now.slotOrZero @@ -833,20 +696,6 @@ proc installRpcHandlers(rpcServer: RpcServer, node: BeaconNode) = rpcServer.installDebugApiHandlers(node) proc installMessageValidators(node: BeaconNode) = - proc attestationValidator(attestation: Attestation, - committeeIndex: uint64): bool = - let (afterGenesis, slot) = node.beaconClock.now().toSlot() - if not afterGenesis: - return false - node.attestationPool.isValidAttestation(attestation, slot, committeeIndex) - - proc aggregatedAttestationValidator( - signedAggregateAndProof: SignedAggregateAndProof): bool = - let (afterGenesis, slot) = node.beaconClock.now().toSlot() - if not afterGenesis: - return false - node.attestationPool.isValidAggregatedAttestation(signedAggregateAndProof, slot) - # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#attestations-and-aggregation # These validators stay around the whole time, regardless of which specific # subnets are subscribed to during any given epoch. @@ -857,59 +706,19 @@ proc installMessageValidators(node: BeaconNode) = getAttestationTopic(node.forkDigest, ci), # This proc needs to be within closureScope; don't lift out of loop. proc(attestation: Attestation): bool = - attestationValidator(attestation, ci) - ) + node.processor[].attestationValidator(attestation, ci)) node.network.addValidator( getAggregateAndProofsTopic(node.forkDigest), proc(signedAggregateAndProof: SignedAggregateAndProof): bool = - aggregatedAttestationValidator(signedAggregateAndProof)) + node.processor[].aggregateValidator(signedAggregateAndProof)) - node.network.addValidator(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock) -> bool: - let - now = node.beaconClock.now - (afterGenesis, slot) = now.toSlot() - - if not afterGenesis: - return false - - logScope: - blk = shortLog(signedBlock.message) - root = shortLog(signedBlock.root) - - let isKnown = signedBlock.root in node.chainDag.blocks - if isKnown: - trace "Received known gossip block" - # TODO: - # Potentially use a fast exit here. We only need to check that - # the contents of the incoming message match our previously seen - # version of the block. We don't need to use HTR for this - for - # better efficiency we can use vanilla SHA256 or direct comparison - # if we still have the previous block in memory. - # TODO: - # We are seeing extreme delays sometimes (e.g. 300 seconds). - # Should we drop such blocks? The spec doesn't set a policy on this. - else: - let delay = (now.int64 - signedBlock.message.slot.toBeaconTime.int64) - debug "Incoming gossip block", delay - beacon_block_received_seconds_from_slot_start.observe delay - - let blck = node.chainDag.isValidBeaconBlock(node.quarantine, - signedBlock, slot, {}) - node.dumpBlock(signedBlock, blck) - - blck.isOk + node.network.addValidator( + node.topicBeaconBlocks, + proc (signedBlock: SignedBeaconBlock): bool = + node.processor[].blockValidator(signedBlock)) proc getAttestationHandlers(node: BeaconNode): Future[void] = - proc attestationHandler(attestation: Attestation) = - # Avoid double-counting attestation-topic attestations on shared codepath - # when they're reflected through beacon blocks - beacon_attestations_received.inc() - beacon_attestation_received_seconds_from_slot_start.observe( - node.beaconClock.now.int64 - attestation.data.slot.toBeaconTime.int64) - - node.onAttestation(attestation) - var initialSubnets: set[uint8] for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: initialSubnets.incl i @@ -925,16 +734,12 @@ proc getAttestationHandlers(node: BeaconNode): Future[void] = node.attestationSubnets.subscribedSubnets[1 - (GENESIS_EPOCH mod 2)] = initialSubnets - node.network.subscribe( - getAggregateAndProofsTopic(node.forkDigest), - proc(signedAggregateAndProof: SignedAggregateAndProof) = - attestationHandler(signedAggregateAndProof.message.aggregate)) + node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest)) proc addMessageHandlers(node: BeaconNode) = waitFor allFutures( # As a side-effect, this gets the attestation subnets too. - node.network.subscribe(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock): - onBeaconBlock(node, signedBlock), + node.network.subscribe(node.topicBeaconBlocks), node.getAttestationHandlers() ) @@ -987,7 +792,7 @@ proc run*(node: BeaconNode) = asyncCheck node.onSlotStart(curSlot, nextSlot) node.onSecondLoop = runOnSecondLoop(node) - node.blockProcessingLoop = runBlockProcessingLoop(node) + node.blockProcessingLoop = node.processor.runQueueProcessingLoop() node.requestManager.start() node.startSyncManager() diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index 78858b147..8af68be64 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -21,7 +21,8 @@ import attestation_pool, eth2_network, block_pools/[chain_dag, quarantine], beacon_node_types, mainchain_monitor, request_manager, - sync_manager + sync_manager, + ./eth2_processor # This removes an invalid Nim warning that the digest module is unused here # It's currently used for `shortLog(head.blck.root)` @@ -41,12 +42,11 @@ type attachedValidators*: ValidatorPool chainDag*: ChainDAGRef quarantine*: QuarantineRef - attestationPool*: AttestationPool + attestationPool*: ref AttestationPool mainchainMonitor*: MainchainMonitor beaconClock*: BeaconClock rpcServer*: RpcServer forkDigest*: ForkDigest - blocksQueue*: AsyncQueue[SyncBlock] requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerID] topicBeaconBlocks*: string @@ -55,33 +55,14 @@ type onSecondLoop*: Future[void] genesisSnapshotContent*: string attestationSubnets*: AttestationSubnets + processor*: ref Eth2Processor const MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT # Metrics -declareGauge beacon_head_root, - "Root of the head block of the beacon chain" - proc updateHead*(node: BeaconNode, wallSlot: Slot): BlockRef = - # Check pending attestations - maybe we found some blocks for them - node.attestationPool.resolve(wallSlot) - - # Grab the new head according to our latest attestation data - let newHead = node.attestationPool.selectHead(wallSlot) - - # Store the new head in the chain DAG - this may cause epochs to be - # justified and finalized - let oldFinalized = node.chainDag.finalizedHead.blck - - node.chainDag.updateHead(newHead) - beacon_head_root.set newHead.root.toGaugeValue - - # Cleanup the fork choice v2 if we have a finalized head - if oldFinalized != node.chainDag.finalizedHead.blck: - node.attestationPool.prune() - - newHead + node.processor[].updateHead(wallSlot) template findIt*(s: openarray, predicate: untyped): int = var res = -1 diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 017dc6c22..3b3150855 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -953,7 +953,8 @@ proc stop*(node: Eth2Node) {.async.} = timeout = 5.seconds completed = await withTimeout(allFutures(waitedFutures), timeout) if not completed: - trace "Eth2Node.stop(): timeout reached", timeout, futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) + trace "Eth2Node.stop(): timeout reached", timeout, + futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer = new result @@ -1259,6 +1260,12 @@ proc subscribe*[MsgType](node: Eth2Node, await node.pubsub.subscribe(topic & "_snappy", execMsgHandler) +proc subscribe*(node: Eth2Node, topic: string) {.async, gcsafe.} = + proc dummyMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + await node.pubsub.subscribe(topic & "_snappy", dummyMsgHandler) + proc addValidator*[MsgType](node: Eth2Node, topic: string, msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) = diff --git a/beacon_chain/eth2_processor.nim b/beacon_chain/eth2_processor.nim new file mode 100644 index 000000000..d32e11fb5 --- /dev/null +++ b/beacon_chain/eth2_processor.nim @@ -0,0 +1,404 @@ +import + std/[math, tables], + stew/results, + chronicles, chronicles/chronos_tools, chronos, metrics, + ./spec/[crypto, datatypes, digest], + ./block_pools/[clearance, chain_dag], + ./attestation_aggregation, + ./beacon_node_types, ./attestation_pool, + ./time, ./conf, ./sszdump + +# Metrics for tracking attestation and beacon block loss +declareCounter beacon_attestations_received, + "Number of beacon chain attestations received by this peer" +declareCounter beacon_aggregates_received, + "Number of beacon chain aggregate attestations received by this peer" +declareCounter beacon_blocks_received, + "Number of beacon chain blocks received by this peer" + +const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf] + +declareHistogram beacon_attestation_delay, + "Time(s) between slot start and attestation reception", buckets = delayBuckets + +declareHistogram beacon_aggregate_delay, + "Time(s) between slot start and aggregate reception", buckets = delayBuckets + +declareHistogram beacon_block_delay, + "Time(s) between slot start and beacon block reception", buckets = delayBuckets + +declareHistogram beacon_store_block_duration_seconds, + "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] + +declareGauge beacon_head_root, + "Root of the head block of the beacon chain" + +type + GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].} + + Entry[T] = object + v*: T + + SyncBlock* = object + blk*: SignedBeaconBlock + resfut*: Future[Result[void, BlockError]] + + BlockEntry* = Entry[SyncBlock] + AttestationEntry* = Entry[Attestation] + AggregateEntry* = Entry[Attestation] + + Eth2Processor* = object + config*: BeaconNodeConf + getWallTime*: GetWallTimeFn + chainDag*: ChainDAGRef + attestationPool*: ref AttestationPool + quarantine*: QuarantineRef + + blocksQueue*: AsyncQueue[BlockEntry] + attestationsQueue*: AsyncQueue[AttestationEntry] + aggregatesQueue*: AsyncQueue[AggregateEntry] + +proc updateHead*(self: var Eth2Processor, wallSlot: Slot): BlockRef = + # Check pending attestations - maybe we found some blocks for them + self.attestationPool[].resolve(wallSlot) + + # Grab the new head according to our latest attestation data + let newHead = self.attestationPool[].selectHead(wallSlot) + + # Store the new head in the chain DAG - this may cause epochs to be + # justified and finalized + let oldFinalized = self.chainDag.finalizedHead.blck + + self.chainDag.updateHead(newHead) + beacon_head_root.set newHead.root.toGaugeValue + + # Cleanup the fork choice v2 if we have a finalized head + if oldFinalized != self.chainDag.finalizedHead.blck: + self.attestationPool[].prune() + + newHead + +proc dumpBlock[T]( + self: Eth2Processor, signedBlock: SignedBeaconBlock, + res: Result[T, BlockError]) = + if self.config.dumpEnabled and res.isErr: + case res.error + of Invalid: + dump( + self.config.dumpDirInvalid, signedBlock) + of MissingParent: + dump( + self.config.dumpDirIncoming, signedBlock) + else: + discard + +proc done*(blk: SyncBlock) = + ## Send signal to [Sync/Request]Manager that the block ``blk`` has passed + ## verification successfully. + if blk.resfut != nil: + blk.resfut.complete(Result[void, BlockError].ok()) + +proc fail*(blk: SyncBlock, error: BlockError) = + ## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed + ## verification with specific ``error``. + if blk.resfut != nil: + blk.resfut.complete(Result[void, BlockError].err(error)) + +proc complete*(blk: SyncBlock, res: Result[void, BlockError]) {.inline.} = + ## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk`` + ## verification. + if blk.resfut != nil: + blk.resfut.complete(res) + +proc storeBlock( + self: var Eth2Processor, signedBlock: SignedBeaconBlock, + wallSlot: Slot): Result[void, BlockError] = + let + start = Moment.now() + attestationPool = self.attestationPool + + {.gcsafe.}: # TODO: fork choice and quarantine should sync via messages instead of callbacks + let blck = self.chainDag.addRawBlock(self.quarantine, signedBlock) do ( + blckRef: BlockRef, signedBlock: SignedBeaconBlock, + epochRef: EpochRef, state: HashedBeaconState): + # Callback add to fork choice if valid + attestationPool[].addForkChoice( + epochRef, blckRef, signedBlock.message, wallSlot) + + self.dumpBlock(signedBlock, blck) + + # There can be a scenario where we receive a block we already received. + # However this block was before the last finalized epoch and so its parent + # was pruned from the ForkChoice. + if blck.isErr: + return err(blck.error) + + beacon_store_block_duration_seconds.observe((Moment.now() - start).milliseconds.float64 / 1000) + return ok() + +proc processAttestation( + self: var Eth2Processor, entry: AttestationEntry) = + logScope: + signature = shortLog(entry.v.signature) + + let + wallTime = self.getWallTime() + (afterGenesis, wallSlot) = wallTime.toSlot() + + if not afterGenesis: + error "Processing attestation before genesis, clock turned back?" + quit 1 + + debug "Processing attestation" + self.attestationPool[].addAttestation(entry.v, wallSlot) + +proc processAggregate( + self: var Eth2Processor, entry: AggregateEntry) = + logScope: + signature = shortLog(entry.v.signature) + + let + wallTime = self.getWallTime() + (afterGenesis, wallSlot) = wallTime.toSlot() + + if not afterGenesis: + error "Processing aggregate before genesis, clock turned back?" + quit 1 + + debug "Processing aggregate" + self.attestationPool[].addAttestation(entry.v, wallSlot) + +proc processBlock(self: var Eth2Processor, entry: BlockEntry) = + logScope: + blockRoot = shortLog(entry.v.blk.root) + + let + wallTime = self.getWallTime() + (afterGenesis, wallSlot) = wallTime.toSlot() + + if not afterGenesis: + error "Processing block before genesis, clock turned back?" + quit 1 + + let + start = now(chronos.Moment) + res = self.storeBlock(entry.v.blk, wallSlot) + storeDone = now(chronos.Moment) + + if res.isOk(): + # Eagerly update head in case the new block gets selected + discard self.updateHead(wallSlot) + let updateDone = now(chronos.Moment) + let storeBlockDuration = storeDone - start + let updateHeadDuration = updateDone - storeDone + let overallDuration = updateDone - start + let storeSpeed = + block: + let secs = float(chronos.seconds(1).nanoseconds) + if not(overallDuration.isZero()): + let v = secs / float(overallDuration.nanoseconds) + round(v * 10_000) / 10_000 + else: + 0.0 + debug "Block processed", + local_head_slot = self.chainDag.head.slot, + store_speed = storeSpeed, + block_slot = entry.v.blk.message.slot, + store_block_duration = $storeBlockDuration, + update_head_duration = $updateHeadDuration, + overall_duration = $overallDuration + + if entry.v.resFut != nil: + entry.v.resFut.complete(Result[void, BlockError].ok()) + elif res.error() in {BlockError.Duplicate, BlockError.Old}: + # These are harmless / valid outcomes - for the purpose of scoring peers, + # they are ok + if entry.v.resFut != nil: + entry.v.resFut.complete(Result[void, BlockError].ok()) + else: + if entry.v.resFut != nil: + entry.v.resFut.complete(Result[void, BlockError].err(res.error())) + +proc blockValidator*( + self: var Eth2Processor, + signedBlock: SignedBeaconBlock): bool = + logScope: + signedBlock = shortLog(signedBlock.message) + blockRoot = shortLog(signedBlock.root) + + let + wallTime = self.getWallTime() + (afterGenesis, wallSlot) = wallTime.toSlot() + + if not afterGenesis: + return false + + logScope: wallSlot + + let delay = wallTime - signedBlock.message.slot.toBeaconTime + + if signedBlock.root in self.chainDag.blocks: + # The gossip algorithm itself already does one round of hashing to find + # already-seen data, but it is fairly aggressive about forgetting about + # what it has seen already + debug "Dropping already-seen gossip block", delay + # TODO: + # Potentially use a fast exit here. We only need to check that + # the contents of the incoming message match our previously seen + # version of the block. We don't need to use HTR for this - for + # better efficiency we can use vanilla SHA256 or direct comparison + # if we still have the previous block in memory. + return false + + # Start of block processing - in reality, we have already gone through SSZ + # decoding at this stage, which may be significant + debug "Block received", delay + + let blck = self.chainDag.isValidBeaconBlock( + self.quarantine, signedBlock, wallSlot, {}) + + self.dumpBlock(signedBlock, blck) + + if not blck.isOk: + return false + + # Block passed validation - enqueue it for processing. The block processing + # queue is effectively unbounded as we use a freestanding task to enqueue + # the block - this is done so that when blocks arrive concurrently with + # sync, we don't lose the gossip blocks, but also don't block the gossip + # propagation of seemingly good blocks + debug "Block validated" + traceAsyncErrors self.blocksQueue.addLast( + BlockEntry(v: SyncBlock(blk: signedBlock))) + + beacon_block_delay.observe(float(milliseconds(delay)) / 1000.0) + + true + +proc attestationValidator*( + self: var Eth2Processor, + attestation: Attestation, + committeeIndex: uint64): bool = + logScope: + attestation = shortLog(attestation) + committeeIndex + + let + wallTime = self.getWallTime() + (afterGenesis, wallSlot) = wallTime.toSlot() + + if not afterGenesis: + notice "Attestation before genesis" + return false + + logScope: wallSlot + + let delay = wallTime - attestation.data.slot.toBeaconTime + debug "Attestation received", delay + if not self.attestationPool[].isValidAttestation( + attestation, wallSlot, committeeIndex): + return false # logged in validation + + beacon_attestations_received.inc() + beacon_attestation_delay.observe(float(milliseconds(delay)) / 1000.0) + + while self.attestationsQueue.full(): + let dropped = self.attestationsQueue.popFirst() + doAssert dropped.finished, "popFirst sanity" + notice "Queue full, dropping attestation", + dropped = shortLog(dropped.read().v) + + debug "Attestation validated" + traceAsyncErrors self.attestationsQueue.addLast( + AttestationEntry(v: attestation)) + + true + +proc aggregateValidator*( + self: var Eth2Processor, + signedAggregateAndProof: SignedAggregateAndProof): bool = + logScope: + aggregate = shortLog(signedAggregateAndProof.message.aggregate) + signature = shortLog(signedAggregateAndProof.signature) + + let + wallTime = self.getWallTime() + (afterGenesis, wallSlot) = wallTime.toSlot() + + if not afterGenesis: + notice "Aggregate before genesis" + return false + + logScope: wallSlot + + let delay = + wallTime - signedAggregateAndProof.message.aggregate.data.slot.toBeaconTime + debug "Aggregate received", delay + + if not self.attestationPool[].isValidAggregatedAttestation( + signedAggregateAndProof, wallSlot): + return false + + beacon_aggregates_received.inc() + beacon_aggregate_delay.observe(float(milliseconds(delay)) / 1000.0) + + while self.aggregatesQueue.full(): + let dropped = self.aggregatesQueue.popFirst() + doAssert dropped.finished, "popFirst sanity" + notice "Queue full, dropping aggregate", + dropped = shortLog(dropped.read().v) + + debug "Aggregate validated" + traceAsyncErrors self.aggregatesQueue.addLast(AggregateEntry( + v: signedAggregateAndProof.message.aggregate)) + + true + +proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} = + # Blocks in eth2 arrive on a schedule for every slot: + # + # * Block arrives at time 0 + # * Attestations arrives at time 4 + # * Aggregate arrives at time 8 + + var + blockFut = self[].blocksQueue.popFirst() + aggregateFut = self[].aggregatesQueue.popFirst() + attestationFut = self[].attestationsQueue.popFirst() + + while true: + trace "Waiting for processing work" + await blockFut or aggregateFut or attestationFut + + while blockFut.finished: + # TODO await here _hopefully_ yields to the event loop allowing another + # queue put to complete + self[].processBlock(await blockFut) + blockFut = self[].blocksQueue.popFirst() + + if aggregateFut.finished: + self[].processAggregate(await aggregateFut) + aggregateFut = self[].aggregatesQueue.popFirst() + continue + + if attestationFut.finished: + self[].processAttestation(await attestationFut) + attestationFut = self[].attestationsQueue.popFirst() + continue + +proc new*(T: type Eth2Processor, + config: BeaconNodeConf, + chainDag: ChainDAGRef, + attestationPool: ref AttestationPool, + quarantine: QuarantineRef, + getWallTime: GetWallTimeFn): ref Eth2Processor = + (ref Eth2Processor)( + config: config, + getWallTime: getWallTime, + chainDag: chainDag, + attestationPool: attestationPool, + quarantine: quarantine, + blocksQueue: newAsyncQueue[BlockEntry](1), + aggregatesQueue: newAsyncQueue[AggregateEntry](MAX_ATTESTATIONS.int), + attestationsQueue: newAsyncQueue[AttestationEntry](TARGET_COMMITTEE_SIZE.int * 4), + ) diff --git a/beacon_chain/request_manager.nim b/beacon_chain/request_manager.nim index f908757fc..ed4cb7445 100644 --- a/beacon_chain/request_manager.nim +++ b/beacon_chain/request_manager.nim @@ -1,7 +1,7 @@ import options, sequtils, strutils import chronos, chronicles import spec/[datatypes, digest], eth2_network, beacon_node_types, sync_protocol, - sync_manager, ssz/merkleization + sync_manager, ssz/merkleization, ./eth2_processor export sync_manager logScope: @@ -18,7 +18,7 @@ type RequestManager* = object network*: Eth2Node inpQueue*: AsyncQueue[FetchRecord] - outQueue*: AsyncQueue[SyncBlock] + outQueue*: AsyncQueue[BlockEntry] loopFuture: Future[void] func shortLog*(x: seq[Eth2Digest]): string = @@ -28,7 +28,7 @@ func shortLog*(x: seq[FetchRecord]): string = "[" & x.mapIt(shortLog(it.root)).join(", ") & "]" proc init*(T: type RequestManager, network: Eth2Node, - outputQueue: AsyncQueue[SyncBlock]): RequestManager = + outputQueue: AsyncQueue[BlockEntry]): RequestManager = RequestManager( network: network, inpQueue: newAsyncQueue[FetchRecord](), @@ -55,7 +55,7 @@ proc validate(rman: RequestManager, blk: b, resfut: newFuture[Result[void, BlockError]]("request.manager.validate") ) - await rman.outQueue.addLast(sblock) + await rman.outQueue.addLast(BlockEntry(v: sblock)) return await sblock.resfut proc fetchAncestorBlocksFromNetwork(rman: RequestManager, diff --git a/beacon_chain/spec/validator.nim b/beacon_chain/spec/validator.nim index 874cd80af..901945f21 100644 --- a/beacon_chain/spec/validator.nim +++ b/beacon_chain/spec/validator.nim @@ -303,7 +303,7 @@ func get_beacon_proposer_index*(state: BeaconState, cache: var StateCache, slot: # active validator indices are kept in cache but sorting them takes # quite a while indices = get_active_validator_indices(state, epoch) - start = slot.epoch().compute_start_slot_at_epoch() + start = epoch.compute_start_slot_at_epoch() var res: Option[ValidatorIndex] for i in 0..