From 7bbb0f44217c67a3ddff0f9b51b603ded06cb23d Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 18 Dec 2024 13:21:20 +0100 Subject: [PATCH] Stream blocks during import (#2937) When running the import, currently blocks are loaded in batches into a `seq` then passed to the importer as such. In reality, blocks are still processed one by one, so the batching does not offer any performance advantage. It does however require that the client wastes memory, up to several GB, on the block sequence while they're waiting to be processed. This PR introduces a persister that accepts these potentially large blocks one by one and at the same time removes a number of redundant / unnecessary copies, assignments and resets that were slowing down the import process in general. --- fluffy/database/era1_db.nim | 12 +- fluffy/eth_data/era1.nim | 98 ++--- .../portal_bridge/portal_bridge_history.nim | 23 +- hive_integration/nodocker/engine/node.nim | 1 - hive_integration/nodocker/rpc/rpc_sim.nim | 2 +- nimbus/core/chain/persist_blocks.nim | 264 ++++++------- nimbus/db/core_db/base.nim | 3 +- nimbus/db/era1_db/db_desc.nim | 16 +- nimbus/evm/code_stream.nim | 38 +- nimbus/evm/state.nim | 73 ++-- nimbus/nimbus_import.nim | 359 ++++++++++-------- nimbus/utils/era_helpers.nim | 69 ++-- nrpc/nrpc.nim | 2 +- tests/replay/undump_blocks_era1.nim | 6 +- 14 files changed, 499 insertions(+), 467 deletions(-) diff --git a/fluffy/database/era1_db.nim b/fluffy/database/era1_db.nim index bb89bcaa3..24063671c 100644 --- a/fluffy/database/era1_db.nim +++ b/fluffy/database/era1_db.nim @@ -59,15 +59,19 @@ proc new*( ): Era1DB = Era1DB(path: path, network: network, accumulator: accumulator) -proc getEthBlock*(db: Era1DB, blockNumber: uint64): Result[Block, string] = +proc getEthBlock*( + db: Era1DB, blockNumber: uint64, res: var Block +): Result[void, string] = let f = ?db.getEra1File(blockNumber.era) - f.getEthBlock(blockNumber) + f.getEthBlock(blockNumber, res) -proc getBlockTuple*(db: Era1DB, blockNumber: uint64): Result[BlockTuple, string] = +proc getBlockTuple*( + db: Era1DB, blockNumber: uint64, res: var BlockTuple +): Result[void, string] = let f = ?db.getEra1File(blockNumber.era) - f.getBlockTuple(blockNumber) + f.getBlockTuple(blockNumber, res) proc getAccumulator*( db: Era1DB, blockNumber: uint64 diff --git a/fluffy/eth_data/era1.nim b/fluffy/eth_data/era1.nim index 0469bbeed..e6ba70ffa 100644 --- a/fluffy/eth_data/era1.nim +++ b/fluffy/eth_data/era1.nim @@ -180,9 +180,10 @@ func offsetsLen(startNumber: uint64): int = proc toCompressedRlpBytes(item: auto): seq[byte] = snappy.encodeFramed(rlp.encode(item)) -proc fromCompressedRlpBytes(bytes: openArray[byte], T: type): Result[T, string] = +proc fromCompressedRlpBytes[T](bytes: openArray[byte], v: var T): Result[void, string] = try: - ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), T)) + v = rlp.decode(decodeFramed(bytes, checkIntegrity = false), T) + ok() except RlpError as e: err("Invalid compressed RLP data for " & $T & ": " & e.msg) @@ -300,32 +301,32 @@ proc skipRecord*(f: Era1File): Result[void, string] = f[].handle.get().skipRecord() -proc getBlockHeader(f: Era1File): Result[headers.Header, string] = +proc getBlockHeader(f: Era1File, res: var headers.Header): Result[void, string] = var bytes: seq[byte] let header = ?f[].handle.get().readRecord(bytes) if header.typ != CompressedHeader: return err("Invalid era file: didn't find block header at index position") - fromCompressedRlpBytes(bytes, headers.Header) + fromCompressedRlpBytes(bytes, res) -proc getBlockBody(f: Era1File): Result[BlockBody, string] = +proc getBlockBody(f: Era1File, res: var BlockBody): Result[void, string] = var bytes: seq[byte] let header = ?f[].handle.get().readRecord(bytes) if header.typ != CompressedBody: return err("Invalid era file: didn't find block body at index position") - fromCompressedRlpBytes(bytes, BlockBody) + fromCompressedRlpBytes(bytes, res) -proc getReceipts(f: Era1File): Result[seq[Receipt], string] = +proc getReceipts(f: Era1File, res: var seq[Receipt]): Result[void, string] = var bytes: seq[byte] let header = ?f[].handle.get().readRecord(bytes) if header.typ != CompressedReceipts: return err("Invalid era file: didn't find receipts at index position") - fromCompressedRlpBytes(bytes, seq[Receipt]) + fromCompressedRlpBytes(bytes, res) proc getTotalDifficulty(f: Era1File): Result[UInt256, string] = var bytes: seq[byte] @@ -339,18 +340,25 @@ proc getTotalDifficulty(f: Era1File): Result[UInt256, string] = ok(UInt256.fromBytesLE(bytes)) -proc getNextEthBlock*(f: Era1File): Result[Block, string] = +proc getNextEthBlock*(f: Era1File, res: var Block): Result[void, string] = doAssert not isNil(f) and f[].handle.isSome - var - header = ?getBlockHeader(f) - body = ?getBlockBody(f) + var body: BlockBody + ?getBlockHeader(f, res.header) + ?getBlockBody(f, body) + ?skipRecord(f) # receipts ?skipRecord(f) # totalDifficulty - ok(Block.init(move(header), move(body))) + res.transactions = move(body.transactions) + res.uncles = move(body.uncles) + res.withdrawals = move(body.withdrawals) -proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] = + ok() + +proc getEthBlock*( + f: Era1File, blockNumber: uint64, res: var Block +): Result[void, string] = doAssert not isNil(f) and f[].handle.isSome doAssert( blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber, @@ -361,20 +369,21 @@ proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] = ?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg) - getNextEthBlock(f) + getNextEthBlock(f, res) -proc getNextBlockTuple*(f: Era1File): Result[BlockTuple, string] = +proc getNextBlockTuple*(f: Era1File, res: var BlockTuple): Result[void, string] = doAssert not isNil(f) and f[].handle.isSome - let - blockHeader = ?getBlockHeader(f) - blockBody = ?getBlockBody(f) - receipts = ?getReceipts(f) - totalDifficulty = ?getTotalDifficulty(f) + ?getBlockHeader(f, res.header) + ?getBlockBody(f, res.body) + ?getReceipts(f, res.receipts) + res.td = ?getTotalDifficulty(f) - ok((blockHeader, blockBody, receipts, totalDifficulty)) + ok() -proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string] = +proc getBlockTuple*( + f: Era1File, blockNumber: uint64, res: var BlockTuple +): Result[void, string] = doAssert not isNil(f) and f[].handle.isSome doAssert( blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber, @@ -385,9 +394,11 @@ proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string ?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg) - getNextBlockTuple(f) + getNextBlockTuple(f, res) -proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, string] = +proc getBlockHeader*( + f: Era1File, blockNumber: uint64, res: var headers.Header +): Result[void, string] = doAssert not isNil(f) and f[].handle.isSome doAssert( blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber, @@ -398,7 +409,7 @@ proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, s ?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg) - getBlockHeader(f) + getBlockHeader(f, res) proc getTotalDifficulty*(f: Era1File, blockNumber: uint64): Result[UInt256, string] = doAssert not isNil(f) and f[].handle.isSome @@ -445,13 +456,13 @@ proc buildAccumulator*(f: Era1File): Result[EpochRecordCached, string] = endNumber = f.blockIdx.endNumber() var headerRecords: seq[HeaderRecord] + var header: headers.Header for blockNumber in startNumber .. endNumber: - let - blockHeader = ?f.getBlockHeader(blockNumber) - totalDifficulty = ?f.getTotalDifficulty(blockNumber) + ?f.getBlockHeader(blockNumber, header) + let totalDifficulty = ?f.getTotalDifficulty(blockNumber) headerRecords.add( - HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty) + HeaderRecord(blockHash: header.rlpHash(), totalDifficulty: totalDifficulty) ) ok(EpochRecordCached.init(headerRecords)) @@ -462,25 +473,26 @@ proc verify*(f: Era1File): Result[Digest, string] = endNumber = f.blockIdx.endNumber() var headerRecords: seq[HeaderRecord] + var blockTuple: BlockTuple for blockNumber in startNumber .. endNumber: + ?f.getBlockTuple(blockNumber, blockTuple) let - (blockHeader, blockBody, receipts, totalDifficulty) = - ?f.getBlockTuple(blockNumber) + txRoot = calcTxRoot(blockTuple.body.transactions) + ommershHash = rlpHash(blockTuple.body.uncles) - txRoot = calcTxRoot(blockBody.transactions) - ommershHash = rlpHash(blockBody.uncles) - - if blockHeader.txRoot != txRoot: + if blockTuple.header.txRoot != txRoot: return err("Invalid transactions root") - if blockHeader.ommersHash != ommershHash: + if blockTuple.header.ommersHash != ommershHash: return err("Invalid ommers hash") - if blockHeader.receiptsRoot != calcReceiptsRoot(receipts): + if blockTuple.header.receiptsRoot != calcReceiptsRoot(blockTuple.receipts): return err("Invalid receipts root") headerRecords.add( - HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty) + HeaderRecord( + blockHash: blockTuple.header.rlpHash(), totalDifficulty: blockTuple.td + ) ) let expectedRoot = ?f.getAccumulatorRoot() @@ -496,9 +508,9 @@ iterator era1BlockHeaders*(f: Era1File): headers.Header = startNumber = f.blockIdx.startNumber endNumber = f.blockIdx.endNumber() + var header: headers.Header for blockNumber in startNumber .. endNumber: - let header = f.getBlockHeader(blockNumber).valueOr: - raiseAssert("Failed to read block header: " & error) + f.getBlockHeader(blockNumber, header).expect("Header can be read") yield header iterator era1BlockTuples*(f: Era1File): BlockTuple = @@ -506,7 +518,7 @@ iterator era1BlockTuples*(f: Era1File): BlockTuple = startNumber = f.blockIdx.startNumber endNumber = f.blockIdx.endNumber() + var blockTuple: BlockTuple for blockNumber in startNumber .. endNumber: - let blockTuple = f.getBlockTuple(blockNumber).valueOr: - raiseAssert("Failed to read block tuple: " & error) + f.getBlockTuple(blockNumber, blockTuple).expect("Block tuple can be read") yield blockTuple diff --git a/fluffy/tools/portal_bridge/portal_bridge_history.nim b/fluffy/tools/portal_bridge/portal_bridge_history.nim index 4af239486..2407ab1b2 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_history.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_history.nim @@ -374,14 +374,15 @@ proc runBackfillLoopAuditMode( rng = newRng() db = Era1DB.new(era1Dir, "mainnet", loadAccumulator()) + var blockTuple: BlockTuple while true: let # Grab a random blockNumber to audit and potentially gossip blockNumber = rng[].rand(network_metadata.mergeBlockNumber - 1).uint64 - (header, body, receipts, _) = db.getBlockTuple(blockNumber).valueOr: - error "Failed to get block tuple", error, blockNumber - continue - blockHash = header.rlpHash() + db.getBlockTuple(blockNumber, blockTuple).isOkOr: + error "Failed to get block tuple", error, blockNumber + continue + let blockHash = blockTuple.header.rlpHash() var headerSuccess, bodySuccess, receiptsSuccess = false @@ -441,7 +442,7 @@ proc runBackfillLoopAuditMode( error "Invalid hex for block body content", error = e.msg break bodyBlock - validateBlockBodyBytes(content, header).isOkOr: + validateBlockBodyBytes(content, blockTuple.header).isOkOr: error "Block body is invalid", error break bodyBlock @@ -469,7 +470,7 @@ proc runBackfillLoopAuditMode( error "Invalid hex for block receipts content", error = e.msg break receiptsBlock - validateReceiptsBytes(content, header.receiptsRoot).isOkOr: + validateReceiptsBytes(content, blockTuple.header.receiptsRoot).isOkOr: error "Block receipts are invalid", error break receiptsBlock @@ -481,7 +482,7 @@ proc runBackfillLoopAuditMode( let epochRecord = db.getAccumulator(blockNumber).valueOr: raiseAssert "Failed to get accumulator from EraDB: " & error - headerWithProof = buildHeaderWithProof(header, epochRecord).valueOr: + headerWithProof = buildHeaderWithProof(blockTuple.header, epochRecord).valueOr: raiseAssert "Failed to build header with proof: " & error # gossip block header by hash @@ -489,9 +490,13 @@ proc runBackfillLoopAuditMode( # gossip block header by number await bridge.gossipBlockHeader(blockNumber, headerWithProof) if not bodySuccess: - await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body)) + await bridge.gossipBlockBody( + blockHash, PortalBlockBodyLegacy.fromBlockBody(blockTuple.body) + ) if not receiptsSuccess: - await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts)) + await bridge.gossipReceipts( + blockHash, PortalReceipts.fromReceipts(blockTuple.receipts) + ) await sleepAsync(2.seconds) diff --git a/hive_integration/nodocker/engine/node.nim b/hive_integration/nodocker/engine/node.nim index 26051c660..fd9072e09 100644 --- a/hive_integration/nodocker/engine/node.nim +++ b/hive_integration/nodocker/engine/node.nim @@ -96,7 +96,6 @@ proc setBlock*(c: ChainRef; blk: Block): Result[void, string] = let vmState = c.getVmState(header).valueOr: return err("no vmstate") - _ = vmState.parent.stateRoot # Check point ? vmState.processBlock(blk) ? c.db.persistHeaderAndSetHead(header, c.com.startOfHistory) diff --git a/hive_integration/nodocker/rpc/rpc_sim.nim b/hive_integration/nodocker/rpc/rpc_sim.nim index 23a4d3ddf..c879bcefd 100644 --- a/hive_integration/nodocker/rpc/rpc_sim.nim +++ b/hive_integration/nodocker/rpc/rpc_sim.nim @@ -1,5 +1,5 @@ # Nimbus -# Copyright (c) 2021 Status Research & Development GmbH +# Copyright (c) 2021-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) diff --git a/nimbus/core/chain/persist_blocks.nim b/nimbus/core/chain/persist_blocks.nim index b1d007b31..0e7ab5ed9 100644 --- a/nimbus/core/chain/persist_blocks.nim +++ b/nimbus/core/chain/persist_blocks.nim @@ -11,6 +11,7 @@ {.push raises: [].} import + stew/assign2, results, ../../evm/state, ../../evm/types, @@ -29,8 +30,8 @@ export results type PersistBlockFlag* = enum - NoValidation # Validate the batch instead of validating each block in it - NoFullValidation # Validate the batch instead of validating each block in it + NoValidation # Disable chunk state root validation + NoFullValidation # Disable per-block validation NoPersistHeader NoPersistTransactions NoPersistUncles @@ -40,157 +41,142 @@ type PersistBlockFlags* = set[PersistBlockFlag] - PersistStats = tuple[blocks: int, txs: int, gas: GasInt] + Persister* = object + c: ChainRef + flags: PersistBlockFlags -const - NoPersistBodies* = {NoPersistTransactions, NoPersistUncles, NoPersistWithdrawals} + vmState: BaseVMState + dbTx: CoreDbTxRef + stats*: PersistStats - CleanUpEpoch = 30_000.BlockNumber - ## Regular checks for history clean up (applies to single state DB). This - ## is mainly a debugging/testing feature so that the database can be held - ## a bit smaller. It is not applicable to a full node. + parent: Header + + PersistStats* = tuple[blocks: int, txs: int, gas: GasInt] + +const NoPersistBodies* = {NoPersistTransactions, NoPersistUncles, NoPersistWithdrawals} # ------------------------------------------------------------------------------ # Private # ------------------------------------------------------------------------------ proc getVmState( - c: ChainRef, header: Header, storeSlotHash = false + p: var Persister, header: Header, storeSlotHash = false ): Result[BaseVMState, string] = - let vmState = BaseVMState() - if not vmState.init(header, c.com, storeSlotHash = storeSlotHash): - return err("Could not initialise VMState") - ok(vmState) + if p.vmState == nil: + let vmState = BaseVMState() + if not vmState.init(header, p.c.com, storeSlotHash = storeSlotHash): + return err("Could not initialise VMState") + p.vmState = vmState + else: + if header.number != p.parent.number + 1: + return err("Only linear histories supported by Persister") -proc purgeOlderBlocksFromHistory(db: CoreDbRef, bn: BlockNumber) = - ## Remove non-reachable blocks from KVT database - if 0 < bn: - var blkNum = bn - 1 - while 0 < blkNum: - if not db.forgetHistory blkNum: - break - blkNum = blkNum - 1 + if not p.vmState.reinit(p.parent, header, linear = true): + return err("Could not update VMState for new block") -proc persistBlocksImpl( - c: ChainRef, blocks: openArray[Block], flags: PersistBlockFlags = {} -): Result[PersistStats, string] = - let dbTx = c.db.ctx.newTransaction() - defer: - dbTx.dispose() + ok(p.vmState) - # Note that `0 < headers.len`, assured when called from `persistBlocks()` - let - vmState = - ?c.getVmState(blocks[0].header, storeSlotHash = NoPersistSlotHashes notin flags) - fromBlock = blocks[0].header.number - toBlock = blocks[blocks.high()].header.number - trace "Persisting blocks", fromBlock, toBlock +proc dispose*(p: var Persister) = + if p.dbTx != nil: + p.dbTx.dispose() + p.dbTx = nil - var - blks = 0 - txs = 0 - gas = GasInt(0) - parentHash: Hash32 # only needed after the first block - for blk in blocks: - template header(): Header = - blk.header +proc init*(T: type Persister, c: ChainRef, flags: PersistBlockFlags): T = + T(c: c, flags: flags) - # Full validation means validating the state root at every block and - # performing the more expensive hash computations on the block itself, ie - # verifying that the transaction and receipts roots are valid - when not - # doing full validation, we skip these expensive checks relying instead - # on the source of the data to have performed them previously or because - # the cost of failure is low. - # TODO Figure out the right balance for header fields - in particular, if - # we receive instruction from the CL while syncing that a block is - # CL-valid, do we skip validation while "far from head"? probably yes. - # This requires performing a header-chain validation from that CL-valid - # block which the current code doesn't express. - # Also, the potential avenues for corruption should be described with - # more rigor, ie if the txroot doesn't match but everything else does, - # can the state root of the last block still be correct? Dubious, but - # what would be the consequences? We would roll back the full set of - # blocks which is fairly low-cost. - let skipValidation = - NoFullValidation in flags and header.number != toBlock or NoValidation in flags +proc checkpoint*(p: var Persister): Result[void, string] = + if NoValidation notin p.flags: + let stateRoot = p.c.db.ctx.getAccounts().getStateRoot().valueOr: + return err($$error) - - if blks > 0: - template parent(): Header = - blocks[blks - 1].header - - let updated = - if header.number == parent.number + 1 and header.parentHash == parentHash: - vmState.reinit(parent = parent, header = header, linear = true) - else: - # TODO remove this code path and process only linear histories in this - # function - vmState.reinit(header = header) - - if not updated: - debug "Cannot update VmState", blockNumber = header.number - return err("Cannot update VmState to block " & $header.number) - - # TODO even if we're skipping validation, we should perform basic sanity - # checks on the block and header - that fields are sanely set for the - # given hard fork and similar path-independent checks - these same - # sanity checks should be performed early in the processing pipeline no - # matter their provenance. - if not skipValidation and c.extraValidation: - # TODO: how to checkseal from here - ?c.com.validateHeaderAndKinship(blk, vmState.parent) - - # Generate receipts for storage or validation but skip them otherwise - ?vmState.processBlock( - blk, - skipValidation, - skipReceipts = skipValidation and NoPersistReceipts in flags, - skipUncles = NoPersistUncles in flags, - taskpool = c.com.taskpool, - ) - - let blockHash = header.blockHash() - if NoPersistHeader notin flags: - ?c.db.persistHeaderAndSetHead( - blockHash, header, c.com.startOfHistory) - - if NoPersistTransactions notin flags: - c.db.persistTransactions(header.number, header.txRoot, blk.transactions) - - if NoPersistReceipts notin flags: - c.db.persistReceipts(header.receiptsRoot, vmState.receipts) - - if NoPersistWithdrawals notin flags and blk.withdrawals.isSome: - c.db.persistWithdrawals( - header.withdrawalsRoot.expect("WithdrawalsRoot should be verified before"), - blk.withdrawals.get, + if p.parent.stateRoot != stateRoot: + # TODO replace logging with better error + debug "wrong state root in block", + blockNumber = p.parent.number, + blockHash = p.parent.blockHash, + parentHash = p.parent.parentHash, + expected = p.parent.stateRoot, + actual = stateRoot + return err( + "stateRoot mismatch, expect: " & $p.parent.stateRoot & ", got: " & $stateRoot ) - # update currentBlock *after* we persist it - # so the rpc return consistent result - # between eth_blockNumber and eth_syncing - c.com.syncCurrent = header.number - - blks += 1 - txs += blk.transactions.len - gas += blk.header.gasUsed - parentHash = blockHash - - dbTx.commit() + if p.dbTx != nil: + p.dbTx.commit() + p.dbTx = nil # Save and record the block number before the last saved block state. - c.db.persistent(toBlock).isOkOr: + p.c.db.persistent(p.parent.number).isOkOr: return err("Failed to save state: " & $$error) - if c.com.pruneHistory: - # There is a feature for test systems to regularly clean up older blocks - # from the database, not appicable to a full node set up. - let n = fromBlock div CleanUpEpoch - if 0 < n and n < (toBlock div CleanUpEpoch): - # Starts at around `2 * CleanUpEpoch` - c.db.purgeOlderBlocksFromHistory(fromBlock - CleanUpEpoch) + ok() - ok((blks, txs, gas)) +proc persistBlock*(p: var Persister, blk: Block): Result[void, string] = + template header(): Header = + blk.header + + let c = p.c + + # Full validation means validating the state root at every block and + # performing the more expensive hash computations on the block itself, ie + # verifying that the transaction and receipts roots are valid - when not + # doing full validation, we skip these expensive checks relying instead + # on the source of the data to have performed them previously or because + # the cost of failure is low. + # TODO Figure out the right balance for header fields - in particular, if + # we receive instruction from the CL while syncing that a block is + # CL-valid, do we skip validation while "far from head"? probably yes. + # This requires performing a header-chain validation from that CL-valid + # block which the current code doesn't express. + # Also, the potential avenues for corruption should be described with + # more rigor, ie if the txroot doesn't match but everything else does, + # can the state root of the last block still be correct? Dubious, but + # what would be the consequences? We would roll back the full set of + # blocks which is fairly low-cost. + let + skipValidation = NoValidation in p.flags + vmState = ?p.getVmState(header, storeSlotHash = NoPersistSlotHashes notin p.flags) + + # TODO even if we're skipping validation, we should perform basic sanity + # checks on the block and header - that fields are sanely set for the + # given hard fork and similar path-independent checks - these same + # sanity checks should be performed early in the processing pipeline no + # matter their provenance. + if not skipValidation: + ?c.com.validateHeaderAndKinship(blk, vmState.parent) + + # Generate receipts for storage or validation but skip them otherwise + ?vmState.processBlock( + blk, + skipValidation, + skipReceipts = skipValidation and NoPersistReceipts in p.flags, + skipUncles = NoPersistUncles in p.flags, + taskpool = c.com.taskpool, + ) + + if NoPersistHeader notin p.flags: + let blockHash = header.blockHash() + ?c.db.persistHeaderAndSetHead(blockHash, header, c.com.startOfHistory) + + if NoPersistTransactions notin p.flags: + c.db.persistTransactions(header.number, header.txRoot, blk.transactions) + + if NoPersistReceipts notin p.flags: + c.db.persistReceipts(header.receiptsRoot, vmState.receipts) + + if NoPersistWithdrawals notin p.flags and blk.withdrawals.isSome: + c.db.persistWithdrawals( + header.withdrawalsRoot.expect("WithdrawalsRoot should be verified before"), + blk.withdrawals.get, + ) + + p.stats.blocks += 1 + p.stats.txs += blk.transactions.len + p.stats.gas += blk.header.gasUsed + + assign(p.parent, header) + + ok() proc persistBlocks*( c: ChainRef, blocks: openArray[Block], flags: PersistBlockFlags = {} @@ -200,7 +186,21 @@ proc persistBlocks*( debug "Nothing to do" return ok(default(PersistStats)) # TODO not nice to return nil - c.persistBlocksImpl(blocks, flags) + var p = Persister.init(c, flags) + + for blk in blocks: + p.persistBlock(blk).isOkOr: + p.dispose() + return err(error) + + # update currentBlock *after* we persist it + # so the rpc return consistent result + # between eth_blockNumber and eth_syncing + c.com.syncCurrent = p.parent.number + + let res = p.checkpoint() + p.dispose() + res and ok(p.stats) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/core_db/base.nim b/nimbus/db/core_db/base.nim index 197918317..bfd6496b4 100644 --- a/nimbus/db/core_db/base.nim +++ b/nimbus/db/core_db/base.nim @@ -86,8 +86,7 @@ proc finish*(db: CoreDbRef; eradicate = false) = db.ifTrackNewApi: debug logTxt, api, elapsed proc `$$`*(e: CoreDbError): string = - ## Pretty print error symbol, note that this directive may have side effects - ## as it calls a backend function. + ## Pretty print error symbol ## e.toStr() diff --git a/nimbus/db/era1_db/db_desc.nim b/nimbus/db/era1_db/db_desc.nim index d409f3ded..250962c10 100644 --- a/nimbus/db/era1_db/db_desc.nim +++ b/nimbus/db/era1_db/db_desc.nim @@ -14,10 +14,10 @@ import stew/io2, std/[os, parseutils, strutils, tables], results, - eth/common/eth_types, + eth/common/blocks, ../../../fluffy/eth_data/era1 -export results, eth_types +export results, blocks # TODO this is a "rough copy" of the fluffy DB, minus the accumulator (it goes # by era number alone instead of rooted name) - eventually the two should @@ -80,15 +80,19 @@ proc init*( ok Era1DbRef(path: path, network: network, filenames: filenames) -proc getEthBlock*(db: Era1DbRef, blockNumber: uint64): Result[EthBlock, string] = +proc getEthBlock*( + db: Era1DbRef, blockNumber: uint64, res: var Block +): Result[void, string] = let f = ?db.getEra1File(blockNumber.era) - f.getEthBlock(blockNumber) + f.getEthBlock(blockNumber, res) -proc getBlockTuple*(db: Era1DbRef, blockNumber: uint64): Result[BlockTuple, string] = +proc getBlockTuple*( + db: Era1DbRef, blockNumber: uint64, res: var BlockTuple +): Result[void, string] = let f = ?db.getEra1File(blockNumber.era) - f.getBlockTuple(blockNumber) + f.getBlockTuple(blockNumber, res) proc dispose*(db: Era1DbRef) = for w in db.files: diff --git a/nimbus/evm/code_stream.nim b/nimbus/evm/code_stream.nim index f6d1b0d2e..10308cb8c 100644 --- a/nimbus/evm/code_stream.nim +++ b/nimbus/evm/code_stream.nim @@ -5,12 +5,7 @@ # * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) # at your option. This file may not be copied, modified, or distributed except according to those terms. -import - chronicles, - eth/common, - stew/byteutils, - ./interpreter/op_codes, - ./code_bytes +import chronicles, eth/common, stew/byteutils, ./interpreter/op_codes, ./code_bytes export code_bytes @@ -31,24 +26,21 @@ func init*(T: type CodeStream, code: openArray[char]): T = T(code: CodeBytesRef.init(code)) template read*(c: var CodeStream, size: int): openArray[byte] = - if c.pc + size - 1 < c.bytes.len: - let pos = c.pc - c.pc += size - c.code.bytes.toOpenArray(pos, pos + size - 1) + let + pos = c.pc + last = pos + size + + if last <= c.bytes.len: + c.pc = last + c.code.bytes.toOpenArray(pos, last - 1) else: c.pc = c.bytes.len - c.code.bytes.toOpenArray(0, -1) + c.code.bytes.toOpenArray(pos, c.bytes.high) -func readVmWord*(c: var CodeStream, n: static int): UInt256 = +func readVmWord*(c: var CodeStream, n: static int): UInt256 {.inline, noinit.} = ## Reads `n` bytes from the code stream and pads ## the remaining bytes with zeros. - let result_bytes = cast[ptr array[32, byte]](addr result) - - let last = min(c.pc + n, c.code.bytes.len) - let toWrite = last - c.pc - for i in 0 ..< toWrite: - result_bytes[i] = c.code.bytes[last - i - 1] - c.pc = last + UInt256.fromBytesBE(c.read(n)) func len*(c: CodeStream): int = len(c.code) @@ -100,13 +92,7 @@ proc decompile*(original: CodeStream): seq[(int, Op, string)] = while not c.atEnd: var op = c.next if op >= Push1 and op <= Push32: - result.add( - ( - c.pc - 1, - op, - "0x" & c.read(op.int - 95).toHex, - ) - ) + result.add((c.pc - 1, op, "0x" & c.read(op.int - 95).toHex)) elif op != Op.Stop: result.add((c.pc - 1, op, "")) else: diff --git a/nimbus/evm/state.nim b/nimbus/evm/state.nim index 4617bea96..9548100ec 100644 --- a/nimbus/evm/state.nim +++ b/nimbus/evm/state.nim @@ -34,19 +34,25 @@ proc init( tracer: TracerRef, flags: set[VMFlag] = self.flags) = ## Initialisation helper - assign(self.parent, parent) - self.blockCtx = blockCtx - self.gasPool = blockCtx.gasLimit + # Take care to (re)set all fields since the VMState might be recycled self.com = com - self.tracer = tracer self.stateDB = ac + self.gasPool = blockCtx.gasLimit + assign(self.parent, parent) + assign(self.blockCtx, blockCtx) + const txCtx = default(TxContext) + assign(self.txCtx, txCtx) self.flags = flags - self.blobGasUsed = 0'u64 self.fork = self.determineFork + self.tracer = tracer + self.receipts.setLen(0) + self.cumulativeGasUsed = 0 self.gasCosts = self.fork.forkToSchedule + self.blobGasUsed = 0'u64 + self.allLogs.setLen(0) + self.gasRefunded = 0 -func blockCtx(com: CommonRef, header: Header): - BlockContext = +func blockCtx(header: Header): BlockContext = BlockContext( timestamp : header.timestamp, gasLimit : header.gasLimit, @@ -104,24 +110,24 @@ proc reinit*(self: BaseVMState; ## Object descriptor ## queries about its `getStateRoot()`, i.e. `isTopLevelClean` evaluated `true`. If ## this function returns `false`, the function argument `self` is left ## untouched. - if self.stateDB.isTopLevelClean: - let - tracer = self.tracer - com = self.com - db = com.db - ac = if linear or self.stateDB.getStateRoot() == parent.stateRoot: self.stateDB - else: LedgerRef.init(db, self.stateDB.storeSlotHash) - flags = self.flags - self[].reset - self.init( - ac = ac, - parent = parent, - blockCtx = blockCtx, - com = com, - tracer = tracer, - flags = flags) - return true - # else: false + if not self.stateDB.isTopLevelClean: + return false + + let + tracer = self.tracer + com = self.com + db = com.db + ac = if linear or self.stateDB.getStateRoot() == parent.stateRoot: self.stateDB + else: LedgerRef.init(db, self.stateDB.storeSlotHash) + flags = self.flags + self.init( + ac = ac, + parent = parent, + blockCtx = blockCtx, + com = com, + tracer = tracer, + flags = flags) + true proc reinit*(self: BaseVMState; ## Object descriptor parent: Header; ## parent header, account sync pos. @@ -136,23 +142,10 @@ proc reinit*(self: BaseVMState; ## Object descriptor ## networks, the miner address is retrievable via `ecRecover()`. self.reinit( parent = parent, - blockCtx = self.com.blockCtx(header), + blockCtx = blockCtx(header), linear = linear ) -proc reinit*(self: BaseVMState; ## Object descriptor - header: Header; ## header with tx environment data fields - ): bool = - ## This is a variant of the `reinit()` function above where the field - ## `header.parentHash`, is used to fetch the `parent` Header to be - ## used in the `update()` variant, above. - let parent = self.com.db.getBlockHeader(header.parentHash).valueOr: - return false - self.reinit( - parent = parent, - header = header, - linear = false) - proc init*( self: BaseVMState; ## Object descriptor parent: Header; ## parent header, account sync position @@ -170,7 +163,7 @@ proc init*( self.init( ac = LedgerRef.init(com.db, storeSlotHash), parent = parent, - blockCtx = com.blockCtx(header), + blockCtx = blockCtx(header), com = com, tracer = tracer) diff --git a/nimbus/nimbus_import.nim b/nimbus/nimbus_import.nim index eb15f4cdb..4de9fa377 100644 --- a/nimbus/nimbus_import.nim +++ b/nimbus/nimbus_import.nim @@ -33,6 +33,60 @@ declareCounter nec_imported_gas, "Gas processed during import" var running {.volatile.} = true +proc openCsv(name: string): File = + try: + let f = open(name, fmAppend) + let pos = f.getFileSize() + if pos == 0: + f.writeLine("block_number,blocks,slot,txs,gas,time") + f + except IOError as exc: + fatal "Could not open statistics output file", file = name, err = exc.msg + quit(QuitFailure) + +proc getMetadata(networkId: NetworkId): auto = + # Network Specific Configurations + # TODO: the merge block number could be fetched from the era1 file instead, + # specially if the accumulator is added to the chain metadata + case networkId + of MainNet: + ( + getMetadataForNetwork("mainnet").cfg, + # Mainnet Validators Root + Eth2Digest.fromHex( + "0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95" + ), + 15537393'u64, # Last pre-merge block + 4700013'u64, # First post-merge slot + ) + of SepoliaNet: + ( + getMetadataForNetwork("sepolia").cfg, + Eth2Digest.fromHex( + "0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078" + ), + 1450408'u64, # Last pre-merge block number + 115193'u64, # First post-merge slot + ) + of HoleskyNet: + ( + getMetadataForNetwork("holesky").cfg, + Eth2Digest.fromHex( + "0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1" + ), + 0'u64, # Last pre-merge block number + 0'u64, # First post-merge slot + ) + else: + fatal "Unsupported network", network = networkId + quit(QuitFailure) + +template boolFlag(flags, b): PersistBlockFlags = + if b: + flags + else: + {} + proc importBlocks*(conf: NimbusConf, com: CommonRef) = proc controlCHandler() {.noconv.} = when defined(windows): @@ -45,31 +99,18 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = let start = com.db.getSavedStateBlockNumber() + 1 chain = com.newChain() - - template boolFlag(flags, b): PersistBlockFlags = - if b: - flags - else: - {} - - var - imported = 0'u64 - importedSlot = 1'u64 - gas = GasInt(0) - txs = 0 + (cfg, genesis_validators_root, lastEra1Block, firstSlotAfterMerge) = + getMetadata(conf.networkId) time0 = Moment.now() + + # These variables are used from closures on purpose, so as to place them on + # the heap rather than the stack + var + slot = 1'u64 + time1 = Moment.now() # time at start of chunk csv = if conf.csvStats.isSome: - try: - let f = open(conf.csvStats.get(), fmAppend) - let pos = f.getFileSize() - if pos == 0: - f.writeLine("block_number,blocks,slot,txs,gas,time") - f - except IOError as exc: - error "Could not open statistics output file", - file = conf.csvStats, err = exc.msg - quit(QuitFailure) + openCsv(conf.csvStats.get()) else: File(nil) flags = @@ -78,75 +119,52 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = boolFlag(NoPersistBodies, not conf.storeBodies) + boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) + boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes) - blocks: seq[EthBlock] - clConfig: Eth2NetworkMetadata - genesis_validators_root: Eth2Digest - lastEra1Block: uint64 - firstSlotAfterMerge: uint64 + blk: Block + persister = Persister.init(chain, flags) + cstats: PersistStats # stats at start of chunk defer: if csv != nil: close(csv) - # Network Specific Configurations - # TODO: the merge block number could be fetched from the era1 file instead, - # specially if the accumulator is added to the chain metadata - if conf.networkId == MainNet: - doAssert isDir(conf.era1Dir.string), "Era1 directory not found" - clConfig = getMetadataForNetwork("mainnet") - genesis_validators_root = Eth2Digest.fromHex( - "0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95" - ) # Mainnet Validators Root - lastEra1Block = 15537393'u64 # Mainnet - firstSlotAfterMerge = - if isDir(conf.eraDir.string): - 4700013'u64 # Mainnet - else: - warn "No eraDir found for Mainnet, block loading will stop after era1" - 0'u64 # No eraDir for Mainnet - elif conf.networkId == SepoliaNet: - doAssert isDir(conf.era1Dir.string), "Era1 directory not found" - clConfig = getMetadataForNetwork("sepolia") - genesis_validators_root = Eth2Digest.fromHex( - "0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078" - ) # Sepolia Validators Root - lastEra1Block = 1450408'u64 # Sepolia - firstSlotAfterMerge = - if isDir(conf.eraDir.string): - 115193'u64 # Sepolia - else: - warn "No eraDir found for Sepolia, block loading will stop after era1" - 0'u64 # No eraDir for Sepolia - elif conf.networkId == HoleskyNet: - doAssert isDir(conf.eraDir.string), "Era directory not found" - clConfig = getMetadataForNetwork("holesky") - genesis_validators_root = Eth2Digest.fromHex( - "0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1" - ) # Holesky Validators Root - lastEra1Block = 0'u64 - firstSlotAfterMerge = 0'u64 - else: - error "Unsupported network", network = conf.networkId - quit(QuitFailure) + template blockNumber(): uint64 = + start + uint64 persister.stats.blocks nec_import_block_number.set(start.int64) - template blockNumber(): uint64 = - start + imported - func f(value: float): string = - &"{value:4.3f}" + if value >= 1000: + &"{int(value)}" + elif value >= 100: + &"{value:4.1f}" + elif value >= 10: + &"{value:4.2f}" + else: + &"{value:4.3f}" - proc process() = - let - time1 = Moment.now() - statsRes = chain.persistBlocks(blocks, flags) - if statsRes.isErr(): - error "Failed to persist blocks", error = statsRes.error + proc persistBlock() = + persister.persistBlock(blk).isOkOr: + fatal "Could not persist block", blockNumber = blk.header.number, error quit(QuitFailure) - txs += statsRes[].txs - gas += statsRes[].gas + proc checkpoint(force: bool = false) = + let (blocks, txs, gas) = persister.stats + + if not force and blocks.uint64 mod conf.chunkSize != 0: + return + + persister.checkpoint().isOkOr: + fatal "Could not write database checkpoint", error + quit(QuitFailure) + + let (cblocks, ctxs, cgas) = + (blocks - cstats.blocks, txs - cstats.txs, gas - cstats.gas) + + if cblocks == 0: + return + + cstats = persister.stats + let time2 = Moment.now() diff1 = (time2 - time1).nanoseconds().float / 1000000000 @@ -154,22 +172,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = info "Imported blocks", blockNumber, - blocks = imported, - importedSlot, + slot, + blocks, txs, mgas = f(gas.float / 1000000), - bps = f(blocks.len.float / diff1), - tps = f(statsRes[].txs.float / diff1), - mgps = f(statsRes[].gas.float / 1000000 / diff1), - avgBps = f(imported.float / diff0), + bps = f(cblocks.float / diff1), + tps = f(ctxs.float / diff1), + mgps = f(cgas.float / 1000000 / diff1), + avgBps = f(blocks.float / diff0), avgTps = f(txs.float / diff0), avgMGps = f(gas.float / 1000000 / diff0), elapsed = toString(time2 - time0, 3) metrics.set(nec_import_block_number, int64(blockNumber)) - nec_imported_blocks.inc(blocks.len) - nec_imported_transactions.inc(statsRes[].txs) - nec_imported_gas.inc(int64 statsRes[].gas) + nec_imported_blocks.inc(cblocks) + nec_imported_transactions.inc(ctxs) + nec_imported_gas.inc(int64 cgas) if csv != nil: # In the CSV, we store a line for every chunk of blocks processed so @@ -177,19 +195,15 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = # process - this way, each sample is independent try: csv.writeLine( - [ - $blockNumber, - $blocks.len, - $importedSlot, - $statsRes[].txs, - $statsRes[].gas, - $(time2 - time1).nanoseconds(), - ].join(",") + [$blockNumber, $cblocks, $slot, $ctxs, $cgas, $(time2 - time1).nanoseconds()].join( + "," + ) ) csv.flushFile() except IOError as exc: warn "Could not write csv", err = exc.msg - blocks.setLen(0) + + time1 = time2 # Finds the slot number to resume the import process # First it sets the initial lower bound to `firstSlotAfterMerge` + number of blocks after Era1 @@ -213,94 +227,101 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = debug "Finding slot number to resume import", startSlot, endSlot while startSlot < endSlot: - let blk = getEthBlockFromEra( - era, historical_roots, historical_summaries, startSlot, clConfig.cfg - ).valueOr: + if not getEthBlockFromEra( + era, historical_roots, historical_summaries, startSlot, cfg, blk + ): startSlot += 1 if startSlot == endSlot - 1: error "No blocks found in the last era file" return false - else: - continue + + continue startSlot += 1 if blk.header.number < blockNumber: - notice "Available Era Files are already imported", + notice "Available `era` files are already imported", stateBlockNumber = blockNumber, eraBlockNumber = blk.header.number - quit QuitSuccess - else: - break + return false + break if blockNumber > 1: # Setting the initial lower bound - importedSlot = (blockNumber - lastEra1Block) + firstSlotAfterMerge - debug "Finding slot number after resuming import", importedSlot + slot = (blockNumber - lastEra1Block) + firstSlotAfterMerge + debug "Finding slot number after resuming import", slot # BlockNumber based slot finding var clNum = 0'u64 while clNum < blockNumber: - let blk = getEthBlockFromEra( - era, historical_roots, historical_summaries, Slot(importedSlot), clConfig.cfg - ).valueOr: - importedSlot += 1 + if not getEthBlockFromEra( + era, historical_roots, historical_summaries, Slot(slot), cfg, blk + ): + slot += 1 continue clNum = blk.header.number # decreasing the lower bound with each iteration - importedSlot += blockNumber - clNum + slot += blockNumber - clNum - notice "Resuming import from", importedSlot + notice "Matched block to slot number", blockNumber, slot return true - if isDir(conf.era1Dir.string) or isDir(conf.eraDir.string): - if start <= lastEra1Block: - notice "Importing era1 archive", - start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string - - let db = - if conf.networkId == MainNet: - Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present") - # Mainnet + if lastEra1Block > 0 and start <= lastEra1Block: + let + era1Name = + case conf.networkId + of MainNet: + "mainnet" + of SepoliaNet: + "sepolia" else: - Era1DbRef.init(conf.era1Dir.string, "sepolia").expect("Era files present") - # Sepolia - defer: - db.dispose() + raiseAssert "Other networks are unsupported or do not have an era1" + db = Era1DbRef.init(conf.era1Dir.string, era1Name).valueOr: + fatal "Could not open era1 database", era1Dir = conf.era1Dir, era1Name, error + quit(QuitFailure) - proc loadEraBlock(blockNumber: uint64): bool = - # Separate proc to reduce stack usage of blk - let blk = db.getEthBlock(blockNumber).valueOr: - error "Could not load block from era1", blockNumber, error=error - return false + notice "Importing era1 archive", + start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string - blocks.add blk - true + defer: + db.dispose() - while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block: - if not loadEraBlock(blockNumber): - notice "No more era1 blocks to import", blockNumber - break + proc loadEraBlock(blockNumber: uint64): bool = + db.getEthBlock(blockNumber, blk).isOkOr: + return false + true - imported += 1 - - if blocks.lenu64 mod conf.chunkSize == 0: - process() - - if blocks.len > 0: - process() # last chunk, if any + while running and persister.stats.blocks.uint64 < conf.maxBlocks and + blockNumber <= lastEra1Block: + if not loadEraBlock(blockNumber): + notice "No more `era1` blocks to import", blockNumber, slot + break + persistBlock() + checkpoint() + block era1Import: if blockNumber > lastEra1Block: + if not isDir(conf.eraDir.string): + if blockNumber == 0: + fatal "`era` directory not found, cannot start import", + blockNumber, eraDir = conf.eraDir.string + quit(QuitFailure) + else: + notice "`era` directory not found, stopping import at merge boundary", + blockNumber, eraDir = conf.eraDir.string + break era1Import + notice "Importing era archive", blockNumber, dataDir = conf.dataDir.string, eraDir = conf.eraDir.string let - eraDB = EraDB.new(clConfig.cfg, conf.eraDir.string, genesis_validators_root) + eraDB = EraDB.new(cfg, conf.eraDir.string, genesis_validators_root) (historical_roots, historical_summaries, endSlot) = loadHistoricalRootsFromEra( - conf.eraDir.string, clConfig.cfg + conf.eraDir.string, cfg ).valueOr: - error "Error loading historical summaries", error - quit QuitFailure + fatal "Could not load historical summaries", + eraDir = conf.eraDir.string, error + quit(QuitFailure) # Load the last slot number var moreEraAvailable = true @@ -309,35 +330,39 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = eraDB, historical_roots.asSeq(), historical_summaries.asSeq(), endSlot ) - if importedSlot < firstSlotAfterMerge and firstSlotAfterMerge != 0: + if slot < firstSlotAfterMerge and firstSlotAfterMerge != 0: # if resuming import we do not update the slot - importedSlot = firstSlotAfterMerge + slot = firstSlotAfterMerge - proc loadEra1Block(importedSlot: Slot): bool = + proc loadEra1Block(): bool = # Separate proc to reduce stack usage of blk - var blk = getEthBlockFromEra( + if not getEthBlockFromEra( eraDB, historical_roots.asSeq(), historical_summaries.asSeq(), - importedSlot, - clConfig.cfg, - ).valueOr: + Slot(slot), + cfg, + blk, + ): return false - blocks.add blk true - while running and moreEraAvailable and imported < conf.maxBlocks and - importedSlot < endSlot: - if not loadEra1Block(Slot(importedSlot)): - importedSlot += 1 + while running and moreEraAvailable and + persister.stats.blocks.uint64 < conf.maxBlocks and slot < endSlot: + if not loadEra1Block(): + slot += 1 continue + slot += 1 - imported += 1 - importedSlot += 1 + persistBlock() + checkpoint() - if blocks.lenu64 mod conf.chunkSize == 0: - process() + checkpoint(true) - if blocks.len > 0: - process() + notice "Import complete", + blockNumber, + slot, + blocks = persister.stats.blocks, + txs = persister.stats.txs, + mgas = f(persister.stats.gas.float / 1000000) diff --git a/nimbus/utils/era_helpers.nim b/nimbus/utils/era_helpers.nim index 8427b6385..68e4ad92b 100644 --- a/nimbus/utils/era_helpers.nim +++ b/nimbus/utils/era_helpers.nim @@ -119,11 +119,11 @@ proc getWithdrawals*(x: seq[capella.Withdrawal]): seq[blocks.Withdrawal] = ) return withdrawals -proc getEthBlock*(blck: ForkyTrustedBeaconBlock): Opt[EthBlock] = +proc getEthBlock*(blck: ForkyTrustedBeaconBlock, res: var EthBlock): bool = ## Convert a beacon block to an eth1 block. const consensusFork = typeof(blck).kind when consensusFork >= ConsensusFork.Bellatrix: - let + var payload = blck.body.execution_payload txs = getTxs(payload.transactions.asSeq()) ethWithdrawals = @@ -152,33 +152,35 @@ proc getEthBlock*(blck: ForkyTrustedBeaconBlock): Opt[EthBlock] = else: Opt.none(Hash32) - header = Header( - parentHash: Hash32(payload.parent_hash.data), - ommersHash: EMPTY_UNCLE_HASH, - coinbase: EthAddress(payload.fee_recipient.data), - stateRoot: Root(payload.state_root.data), - transactionsRoot: calcTxRoot(txs), - receiptsRoot: Root(payload.receipts_root.data), - logsBloom: Bloom(payload.logs_bloom.data), - difficulty: 0.u256, - number: payload.block_number, - gasLimit: GasInt(payload.gas_limit), - gasUsed: GasInt(payload.gas_used), - timestamp: EthTime(payload.timestamp), - extraData: payload.extra_data.asSeq(), - mixHash: Bytes32(payload.prev_randao.data), - nonce: default(Bytes8), - baseFeePerGas: Opt.some(payload.base_fee_per_gas), - withdrawalsRoot: withdrawalRoot, - blobGasUsed: blobGasUsed, - excessBlobGas: excessBlobGas, - parentBeaconBlockRoot: parentBeaconBlockRoot, - ) - Opt.some EthBlock( - header: header, transactions: txs, uncles: @[], withdrawals: ethWithdrawals + res.header = Header( + parentHash: Hash32(payload.parent_hash.data), + ommersHash: EMPTY_UNCLE_HASH, + coinbase: EthAddress(payload.fee_recipient.data), + stateRoot: Root(payload.state_root.data), + transactionsRoot: calcTxRoot(txs), + receiptsRoot: Root(payload.receipts_root.data), + logsBloom: Bloom(payload.logs_bloom.data), + difficulty: 0.u256, + number: payload.block_number, + gasLimit: GasInt(payload.gas_limit), + gasUsed: GasInt(payload.gas_used), + timestamp: EthTime(payload.timestamp), + extraData: payload.extra_data.asSeq(), + mixHash: Bytes32(payload.prev_randao.data), + nonce: default(Bytes8), + baseFeePerGas: Opt.some(payload.base_fee_per_gas), + withdrawalsRoot: withdrawalRoot, + blobGasUsed: blobGasUsed, + excessBlobGas: excessBlobGas, + parentBeaconBlockRoot: parentBeaconBlockRoot, ) + res.transactions = move(txs) + res.uncles.reset() + res.withdrawals = move(ethWithdrawals) + + true else: - Opt.none(EthBlock) + false proc getEthBlockFromEra*( db: EraDB, @@ -186,15 +188,18 @@ proc getEthBlockFromEra*( historical_summaries: openArray[HistoricalSummary], slot: Slot, cfg: RuntimeConfig, -): Opt[EthBlock] = + res: var EthBlock, +): bool = let fork = cfg.consensusForkAtEpoch(slot.epoch) fork.withConsensusFork: type T = consensusFork.TrustedSignedBeaconBlock - var tmp = new T + var tmp = new Opt[T] # Pass in default Eth2Digest to avoid block root computation (it is not # needed in this case) tmp[] = db.getBlock( historical_roots, historical_summaries, slot, Opt.some(default(Eth2Digest)), T - ).valueOr: - return Opt.none(EthBlock) - getEthBlock(tmp[].message) + ) + if tmp[].isNone(): + return false + + getEthBlock(tmp[][].message, res) diff --git a/nrpc/nrpc.nim b/nrpc/nrpc.nim index b13be7972..0f3a8f004 100644 --- a/nrpc/nrpc.nim +++ b/nrpc/nrpc.nim @@ -61,7 +61,7 @@ template getELBlockFromBeaconChain( var eth1block: EthBlock if isAvailable: withBlck(clBlock.asTrusted()): - eth1Block = getEthBlock(forkyBlck.message).valueOr: + if not getEthBlock(forkyBlck.message, eth1Block): error "Failed to get EL block from CL head" quit(QuitFailure) diff --git a/tests/replay/undump_blocks_era1.nim b/tests/replay/undump_blocks_era1.nim index 4cd64dc2c..c4f36c402 100644 --- a/tests/replay/undump_blocks_era1.nim +++ b/tests/replay/undump_blocks_era1.nim @@ -30,14 +30,14 @@ iterator undumpBlocksEra1*( # a time and let the consumer do the chunking const blocksPerYield = 192 var tmp = newSeqOfCap[EthBlock](blocksPerYield) - + var blk: Block for i in 0 ..< stopAfter: - var bck = db.getEthBlock(least + i).valueOr: + db.getEthBlock(least + i, blk).isOkOr: if doAssertOk: doAssert i > 0, "expected at least one block" break - tmp.add move(bck) + tmp.add move(blk) # Genesis block requires a chunk of its own, for compatibility with current # test setup (a bit weird, that...)