From bf6ad41d7dfd0899527a0374009a3fcf2a32361b Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 19 Oct 2021 17:20:55 +0200 Subject: [PATCH] add drop and sync committee metrics * use storeBlock for processing API blocks * avoid double block dump * count all gossip metrics at the same spot * simplify block broadcast --- .../block_clearance.nim | 3 +- .../gossip_processing/block_processor.nim | 10 +- .../gossip_processing/eth2_processor.nim | 95 ++++++++++---- .../gossip_processing/gossip_validation.nim | 123 +++++++++--------- beacon_chain/networking/eth2_network.nim | 41 +++--- beacon_chain/rpc/rpc_beacon_api.nim | 4 +- beacon_chain/spec/datatypes/altair.nim | 16 +++ beacon_chain/validators/validator_duties.nim | 48 ++----- 8 files changed, 189 insertions(+), 151 deletions(-) diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index b3920f5c2..d4f39ae81 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -148,8 +148,7 @@ proc addResolvedBlock( merge.TrustedSignedBeaconBlock, parent: BlockRef, cache: var StateCache, onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded, - stateDataDur, sigVerifyDur, - stateVerifyDur: Duration + stateDataDur, sigVerifyDur, stateVerifyDur: Duration ) = doAssert getStateField(state.data, slot) == trustedBlock.message.slot, "state must match block" diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 07ab5e8a4..4c7b95712 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -108,8 +108,8 @@ proc addBlock*( validationDur = Duration()) = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: - # There is no backpressure here - producers must wait for the future in the - # BlockEntry to constrain their own processing + # There is no backpressure here - producers must wait for `resfut` to + # constrain their own processing # Producers: # - Gossip (when synced) # - SyncManager (during sync) @@ -144,11 +144,11 @@ proc dumpBlock*[T]( else: discard -proc storeBlock( +proc storeBlock*( self: var BlockProcessor, signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock | merge.SignedBeaconBlock, - wallSlot: Slot): Result[void, BlockError] = + wallSlot: Slot): Result[BlockRef, BlockError] = let attestationPool = self.consensusManager.attestationPool @@ -167,7 +167,7 @@ proc storeBlock( # was pruned from the ForkChoice. if blck.isErr: return err(blck.error[1]) - ok() + ok(blck.get()) # Event Loop # ------------------------------------------------------------------------------ diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index f3659cef7..6285d7ea6 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -23,17 +23,37 @@ import # Metrics for tracking attestation and beacon block loss declareCounter beacon_attestations_received, - "Number of beacon chain attestations received by this peer" + "Number of valid unaggregated attestations processed by this node" +declareCounter beacon_attestations_dropped, + "Number of invalid unaggregated attestations dropped by this node", labels = ["reason"] declareCounter beacon_aggregates_received, - "Number of beacon chain aggregate attestations received by this peer" + "Number of valid aggregated attestations processed by this node" +declareCounter beacon_aggregates_dropped, + "Number of invalid aggregated attestations dropped by this node", labels = ["reason"] declareCounter beacon_blocks_received, - "Number of beacon chain blocks received by this peer" + "Number of valid blocks processed by this node" +declareCounter beacon_blocks_dropped, + "Number of invalid blocks dropped by this node", labels = ["reason"] declareCounter beacon_attester_slashings_received, - "Number of beacon chain attester slashings received by this peer" + "Number of valid attester slashings processed by this node" +declareCounter beacon_attester_slashings_dropped, + "Number of invalid attester slashings dropped by this node", labels = ["reason"] declareCounter beacon_proposer_slashings_received, - "Number of beacon chain proposer slashings received by this peer" + "Number of valid proposer slashings processed by this node" +declareCounter beacon_proposer_slashings_dropped, + "Number of invalid proposer slashings dropped by this node", labels = ["reason"] declareCounter beacon_voluntary_exits_received, - "Number of beacon chain voluntary exits received by this peer" + "Number of valid voluntary exits processed by this node" +declareCounter beacon_voluntary_exits_dropped, + "Number of invalid voluntary exits dropped by this node", labels = ["reason"] +declareCounter beacon_sync_committee_messages_received, + "Number of valid sync committee messages processed by this node" +declareCounter beacon_sync_committee_messages_dropped, + "Number of invalid sync committee messages dropped by this node", labels = ["reason"] +declareCounter beacon_sync_committee_contributions_received, + "Number of valid sync committee contributions processed by this node" +declareCounter beacon_sync_committee_contributions_dropped, + "Number of invalid sync committee contributions dropped by this node", labels = ["reason"] const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf] @@ -60,6 +80,19 @@ type ## across quick restarts. Eth2Processor* = object + ## The Eth2Processor is the entry point for untrusted message processing - + ## when we receive messages from various sources, we pass them to the + ## processor for validation and routing - the messages are generally + ## validated, and if valid, passed on to the various pools, monitors and + ## managers to update the state of the application. + ## + ## Block processing is special in that part of it is done in the + ## `BlockProcessor` instead, via a special block processing queue. + ## + ## Each validating function generally will do a sanity check on the message + ## whose purpose is to quickly filter out spam, then will (usually) delegate + ## full validation to the proper manager - finally, metrics and monitoring + ## are updated. doppelGangerDetectionEnabled*: bool # Local sources of truth for validation @@ -158,16 +191,13 @@ proc blockValidator*( # decoding at this stage, which may be significant debug "Block received", delay - let blck = self.dag.isValidBeaconBlock( + let v = self.dag.isValidBeaconBlock( self.quarantine, signedBlock, wallTime, {}) - self.blockProcessor[].dumpBlock(signedBlock, blck) - - if not blck.isOk: - return blck.error[0] - - beacon_blocks_received.inc() - beacon_block_delay.observe(delay.toFloatSeconds()) + if v.isErr: + self.blockProcessor[].dumpBlock(signedBlock, v) + beacon_blocks_dropped.inc(1, [$v.error[0]]) + return v.error[0] # Block passed validation - enqueue it for processing. The block processing # queue is effectively unbounded as we use a freestanding task to enqueue @@ -175,10 +205,15 @@ proc blockValidator*( # sync, we don't lose the gossip blocks, but also don't block the gossip # propagation of seemingly good blocks trace "Block validated" + self.blockProcessor[].addBlock( ForkedSignedBeaconBlock.init(signedBlock), validationDur = self.getCurrentBeaconTime() - wallTime) + # Validator monitor registration for blocks is done by the processor + beacon_blocks_received.inc() + beacon_block_delay.observe(delay.toFloatSeconds()) + ValidationResult.Accept proc checkForPotentialDoppelganger( @@ -235,21 +270,22 @@ proc attestationValidator*( self.batchCrypto, attestation, wallTime, subnet_id, checkSignature) if v.isErr(): debug "Dropping attestation", validationError = v.error + beacon_attestations_dropped.inc(1, [$v.error[0]]) return v.error[0] # Due to async validation the wallSlot here might have changed (afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot() - beacon_attestations_received.inc() - beacon_attestation_delay.observe(delay.toFloatSeconds()) + let (attester_index, sig) = v.get() - let (attestation_index, sig) = v.get() - - self[].checkForPotentialDoppelganger(attestation, [attestation_index]) + self[].checkForPotentialDoppelganger(attestation, [attester_index]) trace "Attestation validated" self.attestationPool[].addAttestation( - attestation, [attestation_index], sig, wallSlot) + attestation, [attester_index], sig, wallSlot) + + beacon_attestations_received.inc() + beacon_attestation_delay.observe(delay.toFloatSeconds()) return ValidationResult.Accept @@ -283,14 +319,12 @@ proc aggregateValidator*( aggregator_index = signedAggregateAndProof.message.aggregator_index, selection_proof = signedAggregateAndProof.message.selection_proof, wallSlot + beacon_aggregates_dropped.inc(1, [$v.error[0]]) return v.error[0] # Due to async validation the wallSlot here might have changed (afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot() - beacon_aggregates_received.inc() - beacon_aggregate_delay.observe(delay.toFloatSeconds()) - let (attesting_indices, sig) = v.get() self[].checkForPotentialDoppelganger( @@ -303,6 +337,9 @@ proc aggregateValidator*( self.attestationPool[].addAttestation( signedAggregateAndProof.message.aggregate, attesting_indices, sig, wallSlot) + beacon_aggregates_received.inc() + beacon_aggregate_delay.observe(delay.toFloatSeconds()) + return ValidationResult.Accept proc attesterSlashingValidator*( @@ -314,6 +351,7 @@ proc attesterSlashingValidator*( let v = self.exitPool[].validateAttesterSlashing(attesterSlashing) if v.isErr: debug "Dropping attester slashing", validationError = v.error + beacon_attester_slashings_dropped.inc(1, [$v.error[0]]) return v.error[0] beacon_attester_slashings_received.inc() @@ -329,6 +367,7 @@ proc proposerSlashingValidator*( let v = self.exitPool[].validateProposerSlashing(proposerSlashing) if v.isErr: debug "Dropping proposer slashing", validationError = v.error + beacon_proposer_slashings_dropped.inc(1, [$v.error[0]]) return v.error[0] beacon_proposer_slashings_received.inc() @@ -344,6 +383,7 @@ proc voluntaryExitValidator*( let v = self.exitPool[].validateVoluntaryExit(signedVoluntaryExit) if v.isErr: debug "Dropping voluntary exit", validationError = v.error + beacon_voluntary_exits_dropped.inc(1, [$v.error[0]]) return v.error[0] beacon_voluntary_exits_received.inc() @@ -369,11 +409,15 @@ proc syncCommitteeMsgValidator*( let v = validateSyncCommitteeMessage(self.dag, self.syncCommitteeMsgPool, syncCommitteeMsg, committeeIdx, wallTime, checkSignature) - if v.isErr(): + if v.isErr: debug "Dropping sync committee message", validationError = v.error + beacon_sync_committee_messages_dropped.inc(1, [$v.error[0]]) return v.error[0] trace "Sync committee message validated" + + beacon_sync_committee_messages_received.inc() + ValidationResult.Accept proc syncCommitteeContributionValidator*( @@ -401,6 +445,9 @@ proc syncCommitteeContributionValidator*( validationError = v.error, selection_proof = contributionAndProof.message.selection_proof, wallSlot + beacon_sync_committee_contributions_dropped.inc(1, [$v.error[0]]) return v.error[0] + beacon_sync_committee_contributions_received.inc() + ValidationResult.Accept diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 387cc8707..58952c07c 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -876,76 +876,75 @@ proc validateSignedContributionAndProof*( syncCommitteeMsgPool.seenContributionByAuthor.incl msgKey - block: - # [REJECT] The aggregator's validator index is in the declared subcommittee - # of the current sync committee. - # i.e. state.validators[contribution_and_proof.aggregator_index].pubkey in - # get_sync_subcommittee_pubkeys(state, contribution.subcommittee_index). - let - epoch = msg.message.contribution.slot.epoch - fork = dag.forkAtEpoch(epoch) - genesisValidatorsRoot = dag.genesisValidatorsRoot + # [REJECT] The aggregator's validator index is in the declared subcommittee + # of the current sync committee. + # i.e. state.validators[contribution_and_proof.aggregator_index].pubkey in + # get_sync_subcommittee_pubkeys(state, contribution.subcommittee_index). + let + epoch = msg.message.contribution.slot.epoch + fork = dag.forkAtEpoch(epoch) + genesisValidatorsRoot = dag.genesisValidatorsRoot - # [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid - if not verify_signed_contribution_and_proof_signature(msg, fork, - genesisValidatorsRoot, - aggregatorPubKey.get()): - return errReject( - "SignedContributionAndProof: aggregator signature fails to verify") + # [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid + if not verify_signed_contribution_and_proof_signature(msg, fork, + genesisValidatorsRoot, + aggregatorPubKey.get()): + return errReject( + "SignedContributionAndProof: aggregator signature fails to verify") - # [REJECT] The contribution_and_proof.selection_proof is a valid signature of the - # SyncAggregatorSelectionData derived from the contribution by the validator with - # index contribution_and_proof.aggregator_index. - if not verify_selection_proof_signature(msg.message, fork, - genesisValidatorsRoot, - aggregatorPubKey.get()): - return errReject( - "SignedContributionAndProof: selection proof signature fails to verify") + # [REJECT] The contribution_and_proof.selection_proof is a valid signature of the + # SyncAggregatorSelectionData derived from the contribution by the validator with + # index contribution_and_proof.aggregator_index. + if not verify_selection_proof_signature(msg.message, fork, + genesisValidatorsRoot, + aggregatorPubKey.get()): + return errReject( + "SignedContributionAndProof: selection proof signature fails to verify") - # [REJECT] The aggregate signature is valid for the message beacon_block_root - # and aggregate pubkey derived from the participation info in aggregation_bits - # for the subcommittee specified by the contribution.subcommittee_index. - var - committeeAggKey {.noInit.}: AggregatePublicKey - initialized = false - mixedKeys = 0 - - for validatorPubKey in dag.syncCommitteeParticipants( - msg.message.contribution.slot + 1, - committeeIdx, - msg.message.contribution.aggregation_bits): - let validatorPubKey = validatorPubKey.loadWithCache.get - if not initialized: - initialized = true - committeeAggKey.init(validatorPubKey) - inc mixedKeys - else: - inc mixedKeys - committeeAggKey.aggregate(validatorPubKey) + # [REJECT] The aggregate signature is valid for the message beacon_block_root + # and aggregate pubkey derived from the participation info in aggregation_bits + # for the subcommittee specified by the contribution.subcommittee_index. + var + committeeAggKey {.noInit.}: AggregatePublicKey + initialized = false + mixedKeys = 0 + for validatorPubKey in dag.syncCommitteeParticipants( + msg.message.contribution.slot + 1, + committeeIdx, + msg.message.contribution.aggregation_bits): + let validatorPubKey = validatorPubKey.loadWithCache.get if not initialized: - # [REJECT] The contribution has participants - # that is, any(contribution.aggregation_bits). - return errReject("SignedContributionAndProof: aggregation bits empty") + initialized = true + committeeAggKey.init(validatorPubKey) + inc mixedKeys + else: + inc mixedKeys + committeeAggKey.aggregate(validatorPubKey) - let cookedSignature = msg.message.contribution.signature.load - if cookedSignature.isNone: - return errReject( - "SignedContributionAndProof: aggregate signature fails to load") + if not initialized: + # [REJECT] The contribution has participants + # that is, any(contribution.aggregation_bits). + return errReject("SignedContributionAndProof: aggregation bits empty") - if checkSignature and - not verify_sync_committee_message_signature( - epoch, msg.message.contribution.beacon_block_root, fork, - genesisValidatorsRoot, committeeAggKey.finish, cookedSignature.get): - debug "failing_sync_contribution", - slot = msg.message.contribution.slot + 1, - subnet_id = committeeIdx, - participants = $(msg.message.contribution.aggregation_bits), - mixedKeys + let cookedSignature = msg.message.contribution.signature.load + if cookedSignature.isNone: + return errReject( + "SignedContributionAndProof: aggregate signature fails to load") - return errReject( - "SignedContributionAndProof: aggregate signature fails to verify") + if checkSignature and + not verify_sync_committee_message_signature( + epoch, msg.message.contribution.beacon_block_root, fork, + genesisValidatorsRoot, committeeAggKey.finish, cookedSignature.get): + debug "failing_sync_contribution", + slot = msg.message.contribution.slot + 1, + subnet = committeeIdx, + participants = $(msg.message.contribution.aggregation_bits), + mixedKeys - syncCommitteeMsgPool[].addSyncContribution(msg, cookedSignature.get) + return errReject( + "SignedContributionAndProof: aggregate signature fails to verify") + + syncCommitteeMsgPool[].addSyncContribution(msg, cookedSignature.get) ok() diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index d1860a5ee..c855d86c9 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -27,7 +27,7 @@ import eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], ".."/[version, conf, beacon_clock], - ../spec/datatypes/[phase0, altair], + ../spec/datatypes/[phase0, altair, merge], ../spec/[eth2_ssz_serialization, network, helpers, forks], ../validators/keystore_management, ./eth2_discovery, ./peer_pool, ./libp2p_json_serialization @@ -2150,37 +2150,40 @@ proc broadcastAttestation*(node: Eth2Node, subnet_id: SubnetId, node.broadcast(topic, attestation) proc broadcastVoluntaryExit*(node: Eth2Node, exit: SignedVoluntaryExit) = - let exitsTopic = getVoluntaryExitsTopic( + let topic = getVoluntaryExitsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) - node.broadcast(exitsTopic, exit) + node.broadcast(topic, exit) proc broadcastAttesterSlashing*(node: Eth2Node, slashing: AttesterSlashing) = - let attesterSlashingsTopic = getAttesterSlashingsTopic( + let topic = getAttesterSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) - node.broadcast(attesterSlashingsTopic, slashing) + node.broadcast(topic, slashing) proc broadcastProposerSlashing*(node: Eth2Node, slashing: ProposerSlashing) = - let proposerSlashingsTopic = getProposerSlashingsTopic( + let topic = getProposerSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) - node.broadcast(proposerSlashingsTopic, slashing) + node.broadcast(topic, slashing) proc broadcastAggregateAndProof*(node: Eth2Node, proof: SignedAggregateAndProof) = - let proofTopic = getAggregateAndProofsTopic( + let topic = getAggregateAndProofsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) - node.broadcast(proofTopic, proof) + node.broadcast(topic, proof) + +proc broadcastBeaconBlock*(node: Eth2Node, blck: phase0.SignedBeaconBlock) = + let topic = getBeaconBlocksTopic(node.forkDigests.phase0) + node.broadcast(topic, blck) + +proc broadcastBeaconBlock*(node: Eth2Node, blck: altair.SignedBeaconBlock) = + let topic = getBeaconBlocksTopic(node.forkDigests.altair) + node.broadcast(topic, blck) + +proc broadcastBeaconBlock*(node: Eth2Node, blck: merge.SignedBeaconBlock) = + let topic = getBeaconBlocksTopic(node.forkDigests.merge) + node.broadcast(topic, blck) proc broadcastBeaconBlock*(node: Eth2Node, forked: ForkedSignedBeaconBlock) = - case forked.kind - of BeaconBlockFork.Phase0: - let topic = getBeaconBlocksTopic(node.forkDigests.phase0) - node.broadcast(topic, forked.phase0Data) - of BeaconBlockFork.Altair: - let topic = getBeaconBlocksTopic(node.forkDigests.altair) - node.broadcast(topic, forked.altairData) - of BeaconBlockFork.Merge: - let topic = getBeaconBlocksTopic(node.forkDigests.merge) - node.broadcast(topic, forked.mergeData) + withBlck(forked): node.broadcastBeaconBlock(blck) proc broadcastSyncCommitteeMessage*( node: Eth2Node, msg: SyncCommitteeMessage, committeeIdx: SyncCommitteeIndex) = diff --git a/beacon_chain/rpc/rpc_beacon_api.nim b/beacon_chain/rpc/rpc_beacon_api.nim index aec0d71ae..7ea909c4c 100644 --- a/beacon_chain/rpc/rpc_beacon_api.nim +++ b/beacon_chain/rpc/rpc_beacon_api.nim @@ -406,7 +406,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. "Beacon node is currently syncing, try again later.") let head = node.dag.head if head.slot >= blck.message.slot: - node.network.broadcastBeaconBlock(ForkedSignedBeaconBlock.init(blck)) + node.network.broadcastBeaconBlock(blck) # The block failed validation, but was successfully broadcast anyway. # It was not integrated into the beacon node's database. return 202 @@ -415,7 +415,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. node, head, AttachedValidator(), ForkedSignedBeaconBlock.init(blck)) if res == head: - node.network.broadcastBeaconBlock(ForkedSignedBeaconBlock.init(blck)) + node.network.broadcastBeaconBlock(blck) # The block failed validation, but was successfully broadcast anyway. # It was not integrated into the beacon node''s database. return 202 diff --git a/beacon_chain/spec/datatypes/altair.nim b/beacon_chain/spec/datatypes/altair.nim index c175dc973..63e0c71ee 100644 --- a/beacon_chain/spec/datatypes/altair.nim +++ b/beacon_chain/spec/datatypes/altair.nim @@ -514,7 +514,23 @@ func init*(T: type SyncAggregate): SyncAggregate = func shortLog*(v: SyncAggregate): auto = $(v.sync_committee_bits) +func shortLog*(v: ContributionAndProof): auto = + ( + aggregator_index: v.aggregator_index, + contribution: shortLog(v.contribution), + selection_proof: shortLog(v.selection_proof) + ) + +func shortLog*(v: SignedContributionAndProof): auto = + ( + message: shortLog(v.message), + signature: shortLog(v.signature) + ) + chronicles.formatIt SyncCommitteeMessage: shortLog(it) +chronicles.formatIt SyncCommitteeContribution: shortLog(it) +chronicles.formatIt ContributionAndProof: shortLog(it) +chronicles.formatIt SignedContributionAndProof: shortLog(it) template hash*(x: LightClientUpdate): Hash = hash(x.header) diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 88394ddfd..e49680540 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -30,7 +30,7 @@ import ../eth1/eth1_monitor, ../networking/eth2_network, ../sszdump, ../sync/sync_manager, - ../gossip_processing/consensus_manager, + ../gossip_processing/[block_processor, consensus_manager], ".."/[conf, beacon_clock, beacon_node, version], "."/[slashing_protection, validator_pool, keystore_management] @@ -445,52 +445,26 @@ proc proposeSignedBlock*(node: BeaconNode, validator: AttachedValidator, newBlock: ForkedSignedBeaconBlock): Future[BlockRef] {.async.} = - let newBlockRef = - case newBlock.kind: - of BeaconBlockFork.Phase0: - node.dag.addRawBlock(node.quarantine, newBlock.phase0Data) do ( - blckRef: BlockRef, trustedBlock: phase0.TrustedSignedBeaconBlock, - epochRef: EpochRef): - # Callback add to fork choice if signed block valid (and becomes trusted) - node.attestationPool[].addForkChoice( - epochRef, blckRef, trustedBlock.message, - node.beaconClock.now().slotOrZero()) - of BeaconBlockFork.Altair: - node.dag.addRawBlock(node.quarantine, newBlock.altairData) do ( - blckRef: BlockRef, trustedBlock: altair.TrustedSignedBeaconBlock, - epochRef: EpochRef): - # Callback add to fork choice if signed block valid (and becomes trusted) - node.attestationPool[].addForkChoice( - epochRef, blckRef, trustedBlock.message, - node.beaconClock.now().slotOrZero()) - of BeaconBlockFork.Merge: - node.dag.addRawBlock(node.quarantine, newBlock.mergeData) do ( - blckRef: BlockRef, trustedBlock: merge.TrustedSignedBeaconBlock, - epochRef: EpochRef): - # Callback add to fork choice if signed block valid (and becomes trusted) - node.attestationPool[].addForkChoice( - epochRef, blckRef, trustedBlock.message, - node.beaconClock.now().slotOrZero()) + let wallTime = node.beaconClock.now() - if newBlockRef.isErr: - withBlck(newBlock): + return withBlck(newBlock): + let newBlockRef = node.blockProcessor[].storeBlock( + blck, wallTime.slotOrZero()) + + if newBlockRef.isErr: warn "Unable to add proposed block to block pool", newBlock = blck.message, root = blck.root - return head + return head - withBlck(newBlock): notice "Block proposed", blck = shortLog(blck.message), root = blck.root, validator = shortLog(validator) - if node.config.dumpEnabled: - dump(node.config.dumpDirOutgoing, blck) + node.network.broadcastBeaconBlock(blck) - node.network.broadcastBeaconBlock(newBlock) + beacon_blocks_proposed.inc() - beacon_blocks_proposed.inc() - - return newBlockRef[] + newBlockRef.get() proc proposeBlock(node: BeaconNode, validator: AttachedValidator,