From 9b43a76f2fd615392697cd184d9e5958c57e9097 Mon Sep 17 00:00:00 2001 From: tersec Date: Fri, 25 Mar 2022 11:40:10 +0000 Subject: [PATCH] kiln beacon node (#3540) * kiln bn * use version of beacon_chain_db * have Eth1Monitor abstract more tightly over web3provider --- .../consensus_object_pools/block_dag.nim | 24 +++- .../consensus_object_pools/blockchain_dag.nim | 3 +- beacon_chain/eth1/eth1_monitor.nim | 46 ++++-- .../gossip_processing/block_processor.nim | 135 +++++++++++++++++- .../gossip_processing/consensus_manager.nim | 13 +- beacon_chain/nimbus_beacon_node.nim | 2 +- beacon_chain/spec/engine_authentication.nim | 6 +- tests/test_block_processor.nim | 5 +- 8 files changed, 210 insertions(+), 24 deletions(-) diff --git a/beacon_chain/consensus_object_pools/block_dag.nim b/beacon_chain/consensus_object_pools/block_dag.nim index 45d1e6acb..883c8e6de 100644 --- a/beacon_chain/consensus_object_pools/block_dag.nim +++ b/beacon_chain/consensus_object_pools/block_dag.nim @@ -9,6 +9,7 @@ import chronicles, + ../spec/datatypes/[phase0, altair, bellatrix], ../spec/forks export chronicles, forks @@ -30,6 +31,8 @@ type bid*: BlockId ##\ ## Root that can be used to retrieve block data from database + executionBlockRoot*: Eth2Digest + parent*: BlockRef ##\ ## Not nil, except for the finalized head @@ -46,14 +49,25 @@ type template root*(blck: BlockRef): Eth2Digest = blck.bid.root template slot*(blck: BlockRef): Slot = blck.bid.slot -func init*(T: type BlockRef, root: Eth2Digest, slot: Slot): BlockRef = +func init*( + T: type BlockRef, root: Eth2Digest, executionPayloadRoot: Eth2Digest, + slot: Slot): BlockRef = BlockRef( - bid: BlockId(root: root, slot: slot) + bid: BlockId(root: root, slot: slot), + executionBlockRoot: executionPayloadRoot, ) -func init*(T: type BlockRef, root: Eth2Digest, blck: SomeForkyBeaconBlock): - BlockRef = - BlockRef.init(root, blck.slot) +func init*( + T: type BlockRef, root: Eth2Digest, + blck: phase0.SomeBeaconBlock | altair.SomeBeaconBlock | + phase0.TrustedBeaconBlock | altair.TrustedBeaconBlock): BlockRef = + BlockRef.init(root, Eth2Digest(), blck.slot) + +func init*( + T: type BlockRef, root: Eth2Digest, + blck: bellatrix.SomeBeaconBlock | bellatrix.TrustedBeaconBlock): BlockRef = + BlockRef.init( + root, Eth2Digest(blck.body.execution_payload.block_hash), blck.slot) func parent*(bs: BlockSlot): BlockSlot = ## Return a blockslot representing the previous slot, using the parent block diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 6c186b88e..bed29360a 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -721,7 +721,8 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB, # Load head -> finalized, or all summaries in case the finalized block table # hasn't been written yet for blck in db.getAncestorSummaries(head.root): - let newRef = BlockRef.init(blck.root, blck.summary.slot) + let newRef = BlockRef.init( + blck.root, default(Eth2Digest), blck.summary.slot) if headRef == nil: doAssert blck.root == head.root headRef = newRef diff --git a/beacon_chain/eth1/eth1_monitor.nim b/beacon_chain/eth1/eth1_monitor.nim index 93efbe7cf..1ae9f0891 100644 --- a/beacon_chain/eth1/eth1_monitor.nim +++ b/beacon_chain/eth1/eth1_monitor.nim @@ -14,13 +14,13 @@ import chronos, json, metrics, chronicles/timings, stint/endians2, web3, web3/ethtypes as web3Types, web3/ethhexstrings, web3/engine_api, eth/common/eth_types, - eth/async_utils, stew/[byteutils, shims/hashes], + eth/async_utils, stew/[byteutils, objects, shims/hashes], # Local modules: ../spec/[eth2_merkleization, forks, helpers], ../spec/datatypes/[base, phase0, bellatrix], ../networking/network_metadata, ../consensus_object_pools/block_pools_types, - ".."/[beacon_chain_db, beacon_node_status], + ".."/[beacon_chain_db, beacon_node_status, beacon_clock], ./merkle_minimal export @@ -296,7 +296,7 @@ func is_candidate_block(cfg: RuntimeConfig, func asEth2Digest*(x: BlockHash): Eth2Digest = Eth2Digest(data: array[32, byte](x)) -template asBlockHash(x: Eth2Digest): BlockHash = +template asBlockHash*(x: Eth2Digest): BlockHash = BlockHash(x.data) func asConsensusExecutionPayload*(rpcExecutionPayload: ExecutionPayloadV1): @@ -445,16 +445,38 @@ proc getBlockByNumber*(p: Web3DataProviderRef, proc getPayload*(p: Web3DataProviderRef, payloadId: bellatrix.PayloadID): Future[engine_api.ExecutionPayloadV1] = + # Eth1 monitor can recycle connections without (external) warning; at least, + # don't crash. + if p.isNil: + var epr: Future[engine_api.ExecutionPayloadV1] + epr.complete(default(engine_api.ExecutionPayloadV1)) + return epr + p.web3.provider.engine_getPayloadV1(FixedBytes[8] payloadId) -proc newPayload*(p: Web3DataProviderRef, - payload: engine_api.ExecutionPayloadV1): Future[PayloadStatusV1] = - p.web3.provider.engine_newPayloadV1(payload) +proc newPayload*(p: Eth1Monitor, payload: engine_api.ExecutionPayloadV1): + Future[PayloadStatusV1] = + # Eth1 monitor can recycle connections without (external) warning; at least, + # don't crash. + if p.dataProvider.isNil: + var epr: Future[PayloadStatusV1] + epr.complete(PayloadStatusV1(status: PayloadExecutionStatus.syncing)) + return epr -proc forkchoiceUpdated*(p: Web3DataProviderRef, + p.dataProvider.web3.provider.engine_newPayloadV1(payload) + +proc forkchoiceUpdated*(p: Eth1Monitor, headBlock, finalizedBlock: Eth2Digest): Future[engine_api.ForkchoiceUpdatedResponse] = - p.web3.provider.engine_forkchoiceUpdatedV1( + # Eth1 monitor can recycle connections without (external) warning; at least, + # don't crash. + if p.dataProvider.isNil: + var fcuR: Future[engine_api.ForkchoiceUpdatedResponse] + fcuR.complete(engine_api.ForkchoiceUpdatedResponse( + payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))) + return fcuR + + p.dataProvider.web3.provider.engine_forkchoiceUpdatedV1( ForkchoiceStateV1( headBlockHash: headBlock.asBlockHash, @@ -472,6 +494,14 @@ proc forkchoiceUpdated*(p: Web3DataProviderRef, randomData: array[32, byte], suggestedFeeRecipient: Eth1Address): Future[engine_api.ForkchoiceUpdatedResponse] = + # Eth1 monitor can recycle connections without (external) warning; at least, + # don't crash. + if p.isNil: + var fcuR: Future[engine_api.ForkchoiceUpdatedResponse] + fcuR.complete(engine_api.ForkchoiceUpdatedResponse( + payloadStatus: PayloadStatusV1(status: PayloadExecutionStatus.syncing))) + return fcuR + p.web3.provider.engine_forkchoiceUpdatedV1( ForkchoiceStateV1( headBlockHash: headBlock.asBlockHash, diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index a1aaddc6a..3b4a8b551 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -11,13 +11,16 @@ import std/math, stew/results, chronicles, chronos, metrics, - ../spec/datatypes/[phase0, altair], + eth/async_utils, + web3/engine_api_types, + ../spec/datatypes/[phase0, altair, bellatrix], ../spec/[forks, signatures_batch], ../consensus_object_pools/[ attestation_pool, block_clearance, blockchain_dag, block_quarantine, spec_cache], + ../eth1/eth1_monitor, ./consensus_manager, - ".."/[beacon_clock], + ../beacon_clock, ../sszdump export sszdump, signatures_batch @@ -29,6 +32,8 @@ export sszdump, signatures_batch declareHistogram beacon_store_block_duration_seconds, "storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf] +const web3Timeout = 650.milliseconds + type BlockEntry* = object blck*: ForkedSignedBeaconBlock @@ -69,11 +74,17 @@ type # ---------------------------------------------------------------- consensusManager: ref ConsensusManager ## Blockchain DAG, AttestationPool and Quarantine + ## Blockchain DAG, AttestationPool, Quarantine, and Eth1Manager validatorMonitor: ref ValidatorMonitor getBeaconTime: GetBeaconTimeFn verifier: BatchVerifier +proc addBlock*( + self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, + resfut: Future[Result[void, BlockError]] = nil, + validationDur = Duration()) + # Initialization # ------------------------------------------------------------------------------ @@ -311,6 +322,66 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) = if res.isOk(): Result[void, BlockError].ok() else: Result[void, BlockError].err(res.error())) +proc runForkchoiceUpdated( + self: ref BlockProcessor, headBlockRoot, finalizedBlockRoot: Eth2Digest) + {.async.} = + if headBlockRoot.isZero or finalizedBlockRoot.isZero: + return + + try: + # Minimize window for Eth1 monitor to shut down connection + await self.consensusManager.eth1Monitor.ensureDataProvider() + + debug "runForkChoiceUpdated: running forkchoiceUpdated", + headBlockRoot, + finalizedBlockRoot + + discard awaitWithTimeout( + forkchoiceUpdated( + self.consensusManager.eth1Monitor, headBlockRoot, finalizedBlockRoot), + web3Timeout): + info "runForkChoiceUpdated: forkchoiceUpdated timed out" + default(ForkchoiceUpdatedResponse) + except CatchableError as err: + info "runForkChoiceUpdated: forkchoiceUpdated failed", + err = err.msg + discard + +proc newExecutionPayload( + eth1Monitor: Eth1Monitor, executionPayload: bellatrix.ExecutionPayload): + Future[PayloadExecutionStatus] {.async.} = + debug "newPayload: inserting block into execution engine", + parentHash = executionPayload.parent_hash, + blockHash = executionPayload.block_hash, + stateRoot = shortLog(executionPayload.state_root), + receiptsRoot = shortLog(executionPayload.receipts_root), + prevRandao = shortLog(executionPayload.prev_randao), + blockNumber = executionPayload.block_number, + gasLimit = executionPayload.gas_limit, + gasUsed = executionPayload.gas_used, + timestamp = executionPayload.timestamp, + extraDataLen = executionPayload.extra_data.len, + blockHash = executionPayload.block_hash, + baseFeePerGas = + UInt256.fromBytesLE(executionPayload.base_fee_per_gas.data), + numTransactions = executionPayload.transactions.len + + try: + let + payloadResponse = + awaitWithTimeout( + eth1Monitor.newPayload( + executionPayload.asEngineExecutionPayload), + web3Timeout): + info "newPayload: newExecutionPayload timed out" + PayloadStatusV1(status: PayloadExecutionStatus.syncing) + payloadStatus = payloadResponse.status + + return payloadStatus + except CatchableError as err: + info "newExecutionPayload failed", msg = err.msg + return PayloadExecutionStatus.syncing + proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} = while true: # Cooperative concurrency: one block per loop iteration - because @@ -324,6 +395,64 @@ proc runQueueProcessingLoop*(self: ref BlockProcessor) {.async.} = # larger network reads when under load. idleTimeout = 10.milliseconds + defaultBellatrixPayload = default(bellatrix.ExecutionPayload) + discard await idleAsync().withTimeout(idleTimeout) - self[].processBlock(await self[].blockQueue.popFirst()) + let + blck = await self[].blockQueue.popFirst() + hasExecutionPayload = blck.blck.kind >= BeaconBlockFork.Bellatrix + executionPayloadStatus = + if hasExecutionPayload and + # Allow local testnets to run without requiring an execution layer + blck.blck.bellatrixData.message.body.execution_payload != + defaultBellatrixPayload: + try: + # Minimize window for Eth1 monitor to shut down connection + await self.consensusManager.eth1Monitor.ensureDataProvider() + + await newExecutionPayload( + self.consensusManager.eth1Monitor, + blck.blck.bellatrixData.message.body.execution_payload) + except CatchableError as err: + info "runQueueProcessingLoop: newExecutionPayload failed", + err = err.msg + PayloadExecutionStatus.syncing + else: + # Vacuously + PayloadExecutionStatus.valid + + if executionPayloadStatus in [ + PayloadExecutionStatus.invalid, + PayloadExecutionStatus.invalid_block_hash, + PayloadExecutionStatus.invalid_terminal_block]: + info "runQueueProcessingLoop: execution payload invalid", + executionPayloadStatus + if not blck.resfut.isNil: + blck.resfut.complete(Result[void, BlockError].err(BlockError.Invalid)) + continue + + if executionPayloadStatus == PayloadExecutionStatus.valid: + self[].processBlock(blck) + else: + # Every non-nil future must be completed here, but don't want to process + # the block any further in CL terms. Also don't want to specify Invalid, + # as if it gets here, it's something more like MissingParent (except, on + # the EL side). + if not blck.resfut.isNil: + blck.resfut.complete( + Result[void, BlockError].err(BlockError.MissingParent)) + + if executionPayloadStatus == PayloadExecutionStatus.valid and + hasExecutionPayload: + let + headBlockRoot = self.consensusManager.dag.head.executionBlockRoot + + finalizedBlockRoot = + if not isZero( + self.consensusManager.dag.finalizedHead.blck.executionBlockRoot): + self.consensusManager.dag.finalizedHead.blck.executionBlockRoot + else: + default(Eth2Digest) + + await self.runForkchoiceUpdated(headBlockRoot, finalizedBlockRoot) diff --git a/beacon_chain/gossip_processing/consensus_manager.nim b/beacon_chain/gossip_processing/consensus_manager.nim index c0b2b59e8..0dcb0cac2 100644 --- a/beacon_chain/gossip_processing/consensus_manager.nim +++ b/beacon_chain/gossip_processing/consensus_manager.nim @@ -10,7 +10,8 @@ import chronicles, chronos, ../spec/datatypes/base, - ../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool] + ../consensus_object_pools/[blockchain_dag, block_quarantine, attestation_pool], + ../eth1/eth1_monitor # TODO: Move to "consensus_object_pools" folder @@ -28,18 +29,24 @@ type # ---------------------------------------------------------------- quarantine*: ref Quarantine + # Execution layer integration + # ---------------------------------------------------------------- + eth1Monitor*: Eth1Monitor + # Initialization # ------------------------------------------------------------------------------ func new*(T: type ConsensusManager, dag: ChainDAGRef, attestationPool: ref AttestationPool, - quarantine: ref Quarantine + quarantine: ref Quarantine, + eth1Monitor: Eth1Monitor ): ref ConsensusManager = (ref ConsensusManager)( dag: dag, attestationPool: attestationPool, - quarantine: quarantine + quarantine: quarantine, + eth1Monitor: eth1Monitor ) # Consensus Management diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index faa03d081..af6f428bc 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -256,7 +256,7 @@ proc initFullNode( exitPool = newClone( ExitPool.init(dag, onVoluntaryExitAdded)) consensusManager = ConsensusManager.new( - dag, attestationPool, quarantine) + dag, attestationPool, quarantine, node.eth1Monitor) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, rng, taskpool, consensusManager, node.validatorMonitor, getBeaconTime) diff --git a/beacon_chain/spec/engine_authentication.nim b/beacon_chain/spec/engine_authentication.nim index fe79ce17a..09146edbd 100644 --- a/beacon_chain/spec/engine_authentication.nim +++ b/beacon_chain/spec/engine_authentication.nim @@ -83,14 +83,16 @@ proc checkJwtSecret*( try: let lines = readLines(jwtSecret.get, 1) - if lines.len > 0 and lines[0].startswith("0x"): + if lines.len > 0: + # Secret JWT key is parsed in constant time using nimcrypto: + # https://github.com/cheatfate/nimcrypto/pull/44 let secret = utils.fromHex(lines[0]) if secret.len >= MIN_SECRET_LEN: ok(secret) else: err("JWT secret not at least 256 bits") else: - err("no 0x-prefixed hex string found") + err("no hex string found") except IOError: err("couldn't open specified JWT secret file") except ValueError: diff --git a/tests/test_block_processor.nim b/tests/test_block_processor.nim index 87d831308..3e8e0a208 100644 --- a/tests/test_block_processor.nim +++ b/tests/test_block_processor.nim @@ -17,6 +17,7 @@ import ../beacon_chain/gossip_processing/[block_processor, consensus_manager], ../beacon_chain/consensus_object_pools/[ attestation_pool, blockchain_dag, block_quarantine, block_clearance], + ../beacon_chain/eth1/eth1_monitor, ./testutil, ./testdbutil, ./testblockutil proc pruneAtFinalization(dag: ChainDAGRef) = @@ -33,7 +34,9 @@ suite "Block processor" & preset(): verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool) quarantine = newClone(Quarantine.init()) attestationPool = newClone(AttestationPool.init(dag, quarantine)) - consensusManager = ConsensusManager.new(dag, attestationPool, quarantine) + eth1Monitor = new Eth1Monitor + consensusManager = ConsensusManager.new( + dag, attestationPool, quarantine, eth1Monitor) state = newClone(dag.headState) cache = StateCache() b1 = addTestBlock(state[], cache).phase0Data