Stream blocks during import (#2937)

Currently, the import loads blocks in batches into a `seq` and then
passes the whole sequence to the importer.

In reality, blocks are still processed one by one, so the batching does
not offer any performance advantage. It does, however, force the client
to waste memory, up to several GB, on the block sequence while the
blocks wait to be processed.

This PR introduces a persister that accepts these potentially large
blocks one by one and, at the same time, removes a number of redundant
or unnecessary copies, assignments and resets that were slowing down
the import process in general.
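
In rough terms, the importer now drives the new `Persister` one block at
a time instead of building up a `seq[Block]` first. A minimal sketch of
the resulting call pattern (mirroring the diff below; `chain`, `flags`,
`chunkSize` and `nextBlock` are hypothetical stand-ins for the caller's
own state and block source):

var p = Persister.init(chain, flags)
var blk: Block                    # single buffer, refilled for every block
while nextBlock(blk):             # hypothetical block source
  p.persistBlock(blk).isOkOr:     # process and persist one block at a time
    p.dispose()
    quit(QuitFailure)
  if p.stats.blocks.uint64 mod chunkSize == 0:
    p.checkpoint().isOkOr:        # flush state to disk once per chunk
      p.dispose()
      quit(QuitFailure)
discard p.checkpoint()            # final flush for the last partial chunk
p.dispose()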
Jacek Sieka 2024-12-18 13:21:20 +01:00 committed by GitHub
parent 06a544ac85
commit 7bbb0f4421
14 changed files with 499 additions and 467 deletions

View File

@@ -59,15 +59,19 @@ proc new*(
 ): Era1DB =
   Era1DB(path: path, network: network, accumulator: accumulator)
 
-proc getEthBlock*(db: Era1DB, blockNumber: uint64): Result[Block, string] =
+proc getEthBlock*(
+    db: Era1DB, blockNumber: uint64, res: var Block
+): Result[void, string] =
   let f = ?db.getEra1File(blockNumber.era)
-  f.getEthBlock(blockNumber)
+  f.getEthBlock(blockNumber, res)
 
-proc getBlockTuple*(db: Era1DB, blockNumber: uint64): Result[BlockTuple, string] =
+proc getBlockTuple*(
+    db: Era1DB, blockNumber: uint64, res: var BlockTuple
+): Result[void, string] =
   let f = ?db.getEra1File(blockNumber.era)
-  f.getBlockTuple(blockNumber)
+  f.getBlockTuple(blockNumber, res)
 
 proc getAccumulator*(
     db: Era1DB, blockNumber: uint64
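
The recurring change in this and several files below is the move from
returning a freshly allocated `Result[Block, string]` to filling in a
caller-owned `var` out-parameter, so one buffer can be reused across many
reads. A self-contained sketch of the pattern (hypothetical `Blockish` and
`readInto` names, using the same nim-results idioms as the diff):

import results

type Blockish = object
  data: seq[byte]

proc readInto(n: int, res: var Blockish): Result[void, string] =
  # Filling a caller-supplied buffer reuses its allocation whenever it is
  # already large enough, instead of allocating a fresh object per call.
  res.data.setLen(n)
  ok()

var buf: Blockish                 # one buffer, reused across the whole loop
for i in 1 .. 3:
  readInto(i, buf).isOkOr:
    echo "read failed: ", error
    break
  echo buf.data.len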

View File

@@ -180,9 +180,10 @@ func offsetsLen(startNumber: uint64): int =
 proc toCompressedRlpBytes(item: auto): seq[byte] =
   snappy.encodeFramed(rlp.encode(item))
 
-proc fromCompressedRlpBytes(bytes: openArray[byte], T: type): Result[T, string] =
+proc fromCompressedRlpBytes[T](bytes: openArray[byte], v: var T): Result[void, string] =
   try:
-    ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), T))
+    v = rlp.decode(decodeFramed(bytes, checkIntegrity = false), T)
+    ok()
   except RlpError as e:
     err("Invalid compressed RLP data for " & $T & ": " & e.msg)
 
@@ -300,32 +301,32 @@ proc skipRecord*(f: Era1File): Result[void, string] =
   f[].handle.get().skipRecord()
 
-proc getBlockHeader(f: Era1File): Result[headers.Header, string] =
+proc getBlockHeader(f: Era1File, res: var headers.Header): Result[void, string] =
   var bytes: seq[byte]
 
   let header = ?f[].handle.get().readRecord(bytes)
   if header.typ != CompressedHeader:
     return err("Invalid era file: didn't find block header at index position")
 
-  fromCompressedRlpBytes(bytes, headers.Header)
+  fromCompressedRlpBytes(bytes, res)
 
-proc getBlockBody(f: Era1File): Result[BlockBody, string] =
+proc getBlockBody(f: Era1File, res: var BlockBody): Result[void, string] =
   var bytes: seq[byte]
 
   let header = ?f[].handle.get().readRecord(bytes)
   if header.typ != CompressedBody:
     return err("Invalid era file: didn't find block body at index position")
 
-  fromCompressedRlpBytes(bytes, BlockBody)
+  fromCompressedRlpBytes(bytes, res)
 
-proc getReceipts(f: Era1File): Result[seq[Receipt], string] =
+proc getReceipts(f: Era1File, res: var seq[Receipt]): Result[void, string] =
   var bytes: seq[byte]
 
   let header = ?f[].handle.get().readRecord(bytes)
   if header.typ != CompressedReceipts:
     return err("Invalid era file: didn't find receipts at index position")
 
-  fromCompressedRlpBytes(bytes, seq[Receipt])
+  fromCompressedRlpBytes(bytes, res)
 
 proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =
   var bytes: seq[byte]
@@ -339,18 +340,25 @@ proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =
   ok(UInt256.fromBytesLE(bytes))
 
-proc getNextEthBlock*(f: Era1File): Result[Block, string] =
+proc getNextEthBlock*(f: Era1File, res: var Block): Result[void, string] =
   doAssert not isNil(f) and f[].handle.isSome
 
-  var
-    header = ?getBlockHeader(f)
-    body = ?getBlockBody(f)
+  var body: BlockBody
+  ?getBlockHeader(f, res.header)
+  ?getBlockBody(f, body)
   ?skipRecord(f) # receipts
   ?skipRecord(f) # totalDifficulty
 
-  ok(Block.init(move(header), move(body)))
+  res.transactions = move(body.transactions)
+  res.uncles = move(body.uncles)
+  res.withdrawals = move(body.withdrawals)
+
+  ok()
 
-proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] =
+proc getEthBlock*(
+    f: Era1File, blockNumber: uint64, res: var Block
+): Result[void, string] =
   doAssert not isNil(f) and f[].handle.isSome
   doAssert(
     blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -361,20 +369,21 @@ proc getEthBlock*(f: Era1File, blockNumber: uint64): Result[Block, string] =
   ?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
 
-  getNextEthBlock(f)
+  getNextEthBlock(f, res)
 
-proc getNextBlockTuple*(f: Era1File): Result[BlockTuple, string] =
+proc getNextBlockTuple*(f: Era1File, res: var BlockTuple): Result[void, string] =
   doAssert not isNil(f) and f[].handle.isSome
 
-  let
-    blockHeader = ?getBlockHeader(f)
-    blockBody = ?getBlockBody(f)
-    receipts = ?getReceipts(f)
-    totalDifficulty = ?getTotalDifficulty(f)
+  ?getBlockHeader(f, res.header)
+  ?getBlockBody(f, res.body)
+  ?getReceipts(f, res.receipts)
+  res.td = ?getTotalDifficulty(f)
 
-  ok((blockHeader, blockBody, receipts, totalDifficulty))
+  ok()
 
-proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string] =
+proc getBlockTuple*(
+    f: Era1File, blockNumber: uint64, res: var BlockTuple
+): Result[void, string] =
   doAssert not isNil(f) and f[].handle.isSome
   doAssert(
     blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -385,9 +394,11 @@ proc getBlockTuple*(f: Era1File, blockNumber: uint64): Result[BlockTuple, string
   ?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
 
-  getNextBlockTuple(f)
+  getNextBlockTuple(f, res)
 
-proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, string] =
+proc getBlockHeader*(
+    f: Era1File, blockNumber: uint64, res: var headers.Header
+): Result[void, string] =
   doAssert not isNil(f) and f[].handle.isSome
   doAssert(
     blockNumber >= f[].blockIdx.startNumber and blockNumber <= f[].blockIdx.endNumber,
@@ -398,7 +409,7 @@ proc getBlockHeader*(f: Era1File, blockNumber: uint64): Result[headers.Header, s
   ?f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
 
-  getBlockHeader(f)
+  getBlockHeader(f, res)
 
 proc getTotalDifficulty*(f: Era1File, blockNumber: uint64): Result[UInt256, string] =
   doAssert not isNil(f) and f[].handle.isSome
@@ -445,13 +456,13 @@ proc buildAccumulator*(f: Era1File): Result[EpochRecordCached, string] =
     endNumber = f.blockIdx.endNumber()
 
   var headerRecords: seq[HeaderRecord]
+  var header: headers.Header
   for blockNumber in startNumber .. endNumber:
-    let
-      blockHeader = ?f.getBlockHeader(blockNumber)
-      totalDifficulty = ?f.getTotalDifficulty(blockNumber)
+    ?f.getBlockHeader(blockNumber, header)
+    let totalDifficulty = ?f.getTotalDifficulty(blockNumber)
 
     headerRecords.add(
-      HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty)
+      HeaderRecord(blockHash: header.rlpHash(), totalDifficulty: totalDifficulty)
     )
 
   ok(EpochRecordCached.init(headerRecords))
@@ -462,25 +473,26 @@ proc verify*(f: Era1File): Result[Digest, string] =
     endNumber = f.blockIdx.endNumber()
 
   var headerRecords: seq[HeaderRecord]
+  var blockTuple: BlockTuple
   for blockNumber in startNumber .. endNumber:
+    ?f.getBlockTuple(blockNumber, blockTuple)
     let
-      (blockHeader, blockBody, receipts, totalDifficulty) =
-        ?f.getBlockTuple(blockNumber)
-      txRoot = calcTxRoot(blockBody.transactions)
-      ommershHash = rlpHash(blockBody.uncles)
+      txRoot = calcTxRoot(blockTuple.body.transactions)
+      ommershHash = rlpHash(blockTuple.body.uncles)
 
-    if blockHeader.txRoot != txRoot:
+    if blockTuple.header.txRoot != txRoot:
       return err("Invalid transactions root")
 
-    if blockHeader.ommersHash != ommershHash:
+    if blockTuple.header.ommersHash != ommershHash:
       return err("Invalid ommers hash")
 
-    if blockHeader.receiptsRoot != calcReceiptsRoot(receipts):
+    if blockTuple.header.receiptsRoot != calcReceiptsRoot(blockTuple.receipts):
       return err("Invalid receipts root")
 
     headerRecords.add(
-      HeaderRecord(blockHash: blockHeader.rlpHash(), totalDifficulty: totalDifficulty)
+      HeaderRecord(
+        blockHash: blockTuple.header.rlpHash(), totalDifficulty: blockTuple.td
+      )
     )
 
   let expectedRoot = ?f.getAccumulatorRoot()
@@ -496,9 +508,9 @@ iterator era1BlockHeaders*(f: Era1File): headers.Header =
     startNumber = f.blockIdx.startNumber
     endNumber = f.blockIdx.endNumber()
 
+  var header: headers.Header
   for blockNumber in startNumber .. endNumber:
-    let header = f.getBlockHeader(blockNumber).valueOr:
-      raiseAssert("Failed to read block header: " & error)
+    f.getBlockHeader(blockNumber, header).expect("Header can be read")
     yield header
 
 iterator era1BlockTuples*(f: Era1File): BlockTuple =
@@ -506,7 +518,7 @@ iterator era1BlockTuples*(f: Era1File): BlockTuple =
     startNumber = f.blockIdx.startNumber
     endNumber = f.blockIdx.endNumber()
 
+  var blockTuple: BlockTuple
   for blockNumber in startNumber .. endNumber:
-    let blockTuple = f.getBlockTuple(blockNumber).valueOr:
-      raiseAssert("Failed to read block tuple: " & error)
+    f.getBlockTuple(blockNumber, blockTuple).expect("Block tuple can be read")
     yield blockTuple

View File

@@ -374,14 +374,15 @@ proc runBackfillLoopAuditMode(
     rng = newRng()
     db = Era1DB.new(era1Dir, "mainnet", loadAccumulator())
 
+  var blockTuple: BlockTuple
+
   while true:
     let
       # Grab a random blockNumber to audit and potentially gossip
      blockNumber = rng[].rand(network_metadata.mergeBlockNumber - 1).uint64
-      (header, body, receipts, _) = db.getBlockTuple(blockNumber).valueOr:
-        error "Failed to get block tuple", error, blockNumber
-        continue
-      blockHash = header.rlpHash()
+    db.getBlockTuple(blockNumber, blockTuple).isOkOr:
+      error "Failed to get block tuple", error, blockNumber
+      continue
+    let blockHash = blockTuple.header.rlpHash()
 
     var headerSuccess, bodySuccess, receiptsSuccess = false
 
@@ -441,7 +442,7 @@ proc runBackfillLoopAuditMode(
           error "Invalid hex for block body content", error = e.msg
           break bodyBlock
 
-        validateBlockBodyBytes(content, header).isOkOr:
+        validateBlockBodyBytes(content, blockTuple.header).isOkOr:
           error "Block body is invalid", error
           break bodyBlock
 
@@ -469,7 +470,7 @@ proc runBackfillLoopAuditMode(
           error "Invalid hex for block receipts content", error = e.msg
           break receiptsBlock
 
-        validateReceiptsBytes(content, header.receiptsRoot).isOkOr:
+        validateReceiptsBytes(content, blockTuple.header.receiptsRoot).isOkOr:
           error "Block receipts are invalid", error
           break receiptsBlock
 
@@ -481,7 +482,7 @@ proc runBackfillLoopAuditMode(
       let
         epochRecord = db.getAccumulator(blockNumber).valueOr:
           raiseAssert "Failed to get accumulator from EraDB: " & error
-        headerWithProof = buildHeaderWithProof(header, epochRecord).valueOr:
+        headerWithProof = buildHeaderWithProof(blockTuple.header, epochRecord).valueOr:
          raiseAssert "Failed to build header with proof: " & error
 
       # gossip block header by hash
@@ -489,9 +490,13 @@ proc runBackfillLoopAuditMode(
       # gossip block header by number
       await bridge.gossipBlockHeader(blockNumber, headerWithProof)
     if not bodySuccess:
-      await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body))
+      await bridge.gossipBlockBody(
+        blockHash, PortalBlockBodyLegacy.fromBlockBody(blockTuple.body)
+      )
     if not receiptsSuccess:
-      await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts))
+      await bridge.gossipReceipts(
+        blockHash, PortalReceipts.fromReceipts(blockTuple.receipts)
+      )
 
     await sleepAsync(2.seconds)

View File

@@ -96,7 +96,6 @@ proc setBlock*(c: ChainRef; blk: Block): Result[void, string] =
   let
     vmState = c.getVmState(header).valueOr:
       return err("no vmstate")
-    _ = vmState.parent.stateRoot # Check point
 
   ? vmState.processBlock(blk)
   ? c.db.persistHeaderAndSetHead(header, c.com.startOfHistory)

View File

@@ -1,5 +1,5 @@
 # Nimbus
-# Copyright (c) 2021 Status Research & Development GmbH
+# Copyright (c) 2021-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
 #  * MIT license ([LICENSE-MIT](LICENSE-MIT))

View File

@@ -11,6 +11,7 @@
 {.push raises: [].}
 
 import
+  stew/assign2,
   results,
   ../../evm/state,
   ../../evm/types,
@@ -29,8 +30,8 @@ export results
 
 type
   PersistBlockFlag* = enum
-    NoValidation # Validate the batch instead of validating each block in it
-    NoFullValidation # Validate the batch instead of validating each block in it
+    NoValidation # Disable chunk state root validation
+    NoFullValidation # Disable per-block validation
     NoPersistHeader
     NoPersistTransactions
     NoPersistUncles
@@ -40,157 +41,142 @@ type
   PersistBlockFlags* = set[PersistBlockFlag]
 
-  PersistStats = tuple[blocks: int, txs: int, gas: GasInt]
+  Persister* = object
+    c: ChainRef
+    flags: PersistBlockFlags
+    vmState: BaseVMState
+    dbTx: CoreDbTxRef
+    stats*: PersistStats
 
-const
-  NoPersistBodies* = {NoPersistTransactions, NoPersistUncles, NoPersistWithdrawals}
+    parent: Header
 
-  CleanUpEpoch = 30_000.BlockNumber
-    ## Regular checks for history clean up (applies to single state DB). This
-    ## is mainly a debugging/testing feature so that the database can be held
-    ## a bit smaller. It is not applicable to a full node.
+  PersistStats* = tuple[blocks: int, txs: int, gas: GasInt]
+
+const NoPersistBodies* = {NoPersistTransactions, NoPersistUncles, NoPersistWithdrawals}
 
 # ------------------------------------------------------------------------------
 # Private
 # ------------------------------------------------------------------------------
 
 proc getVmState(
-    c: ChainRef, header: Header, storeSlotHash = false
+    p: var Persister, header: Header, storeSlotHash = false
 ): Result[BaseVMState, string] =
-  let vmState = BaseVMState()
-  if not vmState.init(header, c.com, storeSlotHash = storeSlotHash):
-    return err("Could not initialise VMState")
-  ok(vmState)
+  if p.vmState == nil:
+    let vmState = BaseVMState()
+    if not vmState.init(header, p.c.com, storeSlotHash = storeSlotHash):
+      return err("Could not initialise VMState")
+    p.vmState = vmState
+  else:
+    if header.number != p.parent.number + 1:
+      return err("Only linear histories supported by Persister")
 
-proc purgeOlderBlocksFromHistory(db: CoreDbRef, bn: BlockNumber) =
-  ## Remove non-reachable blocks from KVT database
-  if 0 < bn:
-    var blkNum = bn - 1
-    while 0 < blkNum:
-      if not db.forgetHistory blkNum:
-        break
-      blkNum = blkNum - 1
+    if not p.vmState.reinit(p.parent, header, linear = true):
+      return err("Could not update VMState for new block")
 
-proc persistBlocksImpl(
-    c: ChainRef, blocks: openArray[Block], flags: PersistBlockFlags = {}
-): Result[PersistStats, string] =
-  let dbTx = c.db.ctx.newTransaction()
-  defer:
-    dbTx.dispose()
+  ok(p.vmState)
 
-  # Note that `0 < headers.len`, assured when called from `persistBlocks()`
-  let
-    vmState =
-      ?c.getVmState(blocks[0].header, storeSlotHash = NoPersistSlotHashes notin flags)
-    fromBlock = blocks[0].header.number
-    toBlock = blocks[blocks.high()].header.number
-  trace "Persisting blocks", fromBlock, toBlock
+proc dispose*(p: var Persister) =
+  if p.dbTx != nil:
+    p.dbTx.dispose()
+    p.dbTx = nil
 
-  var
-    blks = 0
-    txs = 0
-    gas = GasInt(0)
-    parentHash: Hash32 # only needed after the first block
+proc init*(T: type Persister, c: ChainRef, flags: PersistBlockFlags): T =
+  T(c: c, flags: flags)
 
-  for blk in blocks:
-    template header(): Header =
-      blk.header
+proc checkpoint*(p: var Persister): Result[void, string] =
+  if NoValidation notin p.flags:
+    let stateRoot = p.c.db.ctx.getAccounts().getStateRoot().valueOr:
+      return err($$error)
 
-    # Full validation means validating the state root at every block and
-    # performing the more expensive hash computations on the block itself, ie
-    # verifying that the transaction and receipts roots are valid - when not
-    # doing full validation, we skip these expensive checks relying instead
-    # on the source of the data to have performed them previously or because
-    # the cost of failure is low.
-    # TODO Figure out the right balance for header fields - in particular, if
-    #      we receive instruction from the CL while syncing that a block is
-    #      CL-valid, do we skip validation while "far from head"? probably yes.
-    #      This requires performing a header-chain validation from that CL-valid
-    #      block which the current code doesn't express.
-    #      Also, the potential avenues for corruption should be described with
-    #      more rigor, ie if the txroot doesn't match but everything else does,
-    #      can the state root of the last block still be correct? Dubious, but
-    #      what would be the consequences? We would roll back the full set of
-    #      blocks which is fairly low-cost.
-    let skipValidation =
-      NoFullValidation in flags and header.number != toBlock or NoValidation in flags
+    if p.parent.stateRoot != stateRoot:
+      # TODO replace logging with better error
+      debug "wrong state root in block",
+        blockNumber = p.parent.number,
+        blockHash = p.parent.blockHash,
+        parentHash = p.parent.parentHash,
+        expected = p.parent.stateRoot,
+        actual = stateRoot
+      return err(
+        "stateRoot mismatch, expect: " & $p.parent.stateRoot & ", got: " & $stateRoot
+      )
 
-    if blks > 0:
-      template parent(): Header =
-        blocks[blks - 1].header
-
-      let updated =
-        if header.number == parent.number + 1 and header.parentHash == parentHash:
-          vmState.reinit(parent = parent, header = header, linear = true)
-        else:
-          # TODO remove this code path and process only linear histories in this
-          #      function
-          vmState.reinit(header = header)
-
-      if not updated:
-        debug "Cannot update VmState", blockNumber = header.number
-        return err("Cannot update VmState to block " & $header.number)
-
-    # TODO even if we're skipping validation, we should perform basic sanity
-    #      checks on the block and header - that fields are sanely set for the
-    #      given hard fork and similar path-independent checks - these same
-    #      sanity checks should be performed early in the processing pipeline no
-    #      matter their provenance.
-    if not skipValidation and c.extraValidation:
-      # TODO: how to checkseal from here
-      ?c.com.validateHeaderAndKinship(blk, vmState.parent)
-
-    # Generate receipts for storage or validation but skip them otherwise
-    ?vmState.processBlock(
-      blk,
-      skipValidation,
-      skipReceipts = skipValidation and NoPersistReceipts in flags,
-      skipUncles = NoPersistUncles in flags,
-      taskpool = c.com.taskpool,
-    )
-
-    let blockHash = header.blockHash()
-    if NoPersistHeader notin flags:
-      ?c.db.persistHeaderAndSetHead(
-        blockHash, header, c.com.startOfHistory)
-
-    if NoPersistTransactions notin flags:
-      c.db.persistTransactions(header.number, header.txRoot, blk.transactions)
-
-    if NoPersistReceipts notin flags:
-      c.db.persistReceipts(header.receiptsRoot, vmState.receipts)
-
-    if NoPersistWithdrawals notin flags and blk.withdrawals.isSome:
-      c.db.persistWithdrawals(
-        header.withdrawalsRoot.expect("WithdrawalsRoot should be verified before"),
-        blk.withdrawals.get,
-      )
-
-    # update currentBlock *after* we persist it
-    # so the rpc return consistent result
-    # between eth_blockNumber and eth_syncing
-    c.com.syncCurrent = header.number
-
-    blks += 1
-    txs += blk.transactions.len
-    gas += blk.header.gasUsed
-    parentHash = blockHash
-
-  dbTx.commit()
+  if p.dbTx != nil:
+    p.dbTx.commit()
+    p.dbTx = nil
 
   # Save and record the block number before the last saved block state.
-  c.db.persistent(toBlock).isOkOr:
+  p.c.db.persistent(p.parent.number).isOkOr:
     return err("Failed to save state: " & $$error)
 
-  if c.com.pruneHistory:
-    # There is a feature for test systems to regularly clean up older blocks
-    # from the database, not appicable to a full node set up.
-    let n = fromBlock div CleanUpEpoch
-    if 0 < n and n < (toBlock div CleanUpEpoch):
-      # Starts at around `2 * CleanUpEpoch`
-      c.db.purgeOlderBlocksFromHistory(fromBlock - CleanUpEpoch)
+  ok()
 
-  ok((blks, txs, gas))
+proc persistBlock*(p: var Persister, blk: Block): Result[void, string] =
+  template header(): Header =
+    blk.header
+
+  let c = p.c
+
+  # Full validation means validating the state root at every block and
+  # performing the more expensive hash computations on the block itself, ie
+  # verifying that the transaction and receipts roots are valid - when not
+  # doing full validation, we skip these expensive checks relying instead
+  # on the source of the data to have performed them previously or because
+  # the cost of failure is low.
+  # TODO Figure out the right balance for header fields - in particular, if
+  #      we receive instruction from the CL while syncing that a block is
+  #      CL-valid, do we skip validation while "far from head"? probably yes.
+  #      This requires performing a header-chain validation from that CL-valid
+  #      block which the current code doesn't express.
+  #      Also, the potential avenues for corruption should be described with
  #      more rigor, ie if the txroot doesn't match but everything else does,
+  #      can the state root of the last block still be correct? Dubious, but
+  #      what would be the consequences? We would roll back the full set of
+  #      blocks which is fairly low-cost.
+  let
+    skipValidation = NoValidation in p.flags
+    vmState = ?p.getVmState(header, storeSlotHash = NoPersistSlotHashes notin p.flags)
+
+  # TODO even if we're skipping validation, we should perform basic sanity
+  #      checks on the block and header - that fields are sanely set for the
+  #      given hard fork and similar path-independent checks - these same
+  #      sanity checks should be performed early in the processing pipeline no
+  #      matter their provenance.
+  if not skipValidation:
+    ?c.com.validateHeaderAndKinship(blk, vmState.parent)
+
+  # Generate receipts for storage or validation but skip them otherwise
+  ?vmState.processBlock(
+    blk,
+    skipValidation,
+    skipReceipts = skipValidation and NoPersistReceipts in p.flags,
+    skipUncles = NoPersistUncles in p.flags,
+    taskpool = c.com.taskpool,
+  )
+
+  if NoPersistHeader notin p.flags:
+    let blockHash = header.blockHash()
+    ?c.db.persistHeaderAndSetHead(blockHash, header, c.com.startOfHistory)
+
+  if NoPersistTransactions notin p.flags:
+    c.db.persistTransactions(header.number, header.txRoot, blk.transactions)
+
+  if NoPersistReceipts notin p.flags:
+    c.db.persistReceipts(header.receiptsRoot, vmState.receipts)
+
+  if NoPersistWithdrawals notin p.flags and blk.withdrawals.isSome:
+    c.db.persistWithdrawals(
+      header.withdrawalsRoot.expect("WithdrawalsRoot should be verified before"),
+      blk.withdrawals.get,
+    )
+
+  p.stats.blocks += 1
+  p.stats.txs += blk.transactions.len
+  p.stats.gas += blk.header.gasUsed
+
+  assign(p.parent, header)
+
+  ok()
 
 proc persistBlocks*(
     c: ChainRef, blocks: openArray[Block], flags: PersistBlockFlags = {}
@@ -200,7 +186,21 @@ proc persistBlocks*(
     debug "Nothing to do"
     return ok(default(PersistStats)) # TODO not nice to return nil
 
-  c.persistBlocksImpl(blocks, flags)
+  var p = Persister.init(c, flags)
+  for blk in blocks:
+    p.persistBlock(blk).isOkOr:
+      p.dispose()
+      return err(error)
+
+  # update currentBlock *after* we persist it
+  # so the rpc return consistent result
+  # between eth_blockNumber and eth_syncing
+  c.com.syncCurrent = p.parent.number
+
+  let res = p.checkpoint()
+  p.dispose()
+  res and ok(p.stats)
 
 # ------------------------------------------------------------------------------
 # End

View File

@@ -86,8 +86,7 @@ proc finish*(db: CoreDbRef; eradicate = false) =
   db.ifTrackNewApi: debug logTxt, api, elapsed
 
 proc `$$`*(e: CoreDbError): string =
-  ## Pretty print error symbol, note that this directive may have side effects
-  ## as it calls a backend function.
+  ## Pretty print error symbol
   ##
   e.toStr()

View File

@@ -14,10 +14,10 @@ import
   stew/io2,
   std/[os, parseutils, strutils, tables],
   results,
-  eth/common/eth_types,
+  eth/common/blocks,
   ../../../fluffy/eth_data/era1
 
-export results, eth_types
+export results, blocks
 
 # TODO this is a "rough copy" of the fluffy DB, minus the accumulator (it goes
 #      by era number alone instead of rooted name) - eventually the two should
@@ -80,15 +80,19 @@ proc init*(
   ok Era1DbRef(path: path, network: network, filenames: filenames)
 
-proc getEthBlock*(db: Era1DbRef, blockNumber: uint64): Result[EthBlock, string] =
+proc getEthBlock*(
+    db: Era1DbRef, blockNumber: uint64, res: var Block
+): Result[void, string] =
   let f = ?db.getEra1File(blockNumber.era)
-  f.getEthBlock(blockNumber)
+  f.getEthBlock(blockNumber, res)
 
-proc getBlockTuple*(db: Era1DbRef, blockNumber: uint64): Result[BlockTuple, string] =
+proc getBlockTuple*(
+    db: Era1DbRef, blockNumber: uint64, res: var BlockTuple
+): Result[void, string] =
   let f = ?db.getEra1File(blockNumber.era)
-  f.getBlockTuple(blockNumber)
+  f.getBlockTuple(blockNumber, res)
 
 proc dispose*(db: Era1DbRef) =
   for w in db.files:
View File

@@ -5,12 +5,7 @@
 #  * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
 # at your option. This file may not be copied, modified, or distributed except according to those terms.
 
-import
-  chronicles,
-  eth/common,
-  stew/byteutils,
-  ./interpreter/op_codes,
-  ./code_bytes
+import chronicles, eth/common, stew/byteutils, ./interpreter/op_codes, ./code_bytes
 
 export code_bytes
 
@@ -31,24 +26,21 @@ func init*(T: type CodeStream, code: openArray[char]): T =
   T(code: CodeBytesRef.init(code))
 
 template read*(c: var CodeStream, size: int): openArray[byte] =
-  if c.pc + size - 1 < c.bytes.len:
-    let pos = c.pc
-    c.pc += size
-    c.code.bytes.toOpenArray(pos, pos + size - 1)
+  let
+    pos = c.pc
+    last = pos + size
+
+  if last <= c.bytes.len:
+    c.pc = last
+    c.code.bytes.toOpenArray(pos, last - 1)
   else:
     c.pc = c.bytes.len
-    c.code.bytes.toOpenArray(0, -1)
+    c.code.bytes.toOpenArray(pos, c.bytes.high)
 
-func readVmWord*(c: var CodeStream, n: static int): UInt256 =
+func readVmWord*(c: var CodeStream, n: static int): UInt256 {.inline, noinit.} =
   ## Reads `n` bytes from the code stream and pads
   ## the remaining bytes with zeros.
-  let result_bytes = cast[ptr array[32, byte]](addr result)
-
-  let last = min(c.pc + n, c.code.bytes.len)
-  let toWrite = last - c.pc
-  for i in 0 ..< toWrite:
-    result_bytes[i] = c.code.bytes[last - i - 1]
-  c.pc = last
+  UInt256.fromBytesBE(c.read(n))
 
 func len*(c: CodeStream): int =
   len(c.code)
@@ -100,13 +92,7 @@ proc decompile*(original: CodeStream): seq[(int, Op, string)] =
   while not c.atEnd:
     var op = c.next
     if op >= Push1 and op <= Push32:
-      result.add(
-        (
-          c.pc - 1,
-          op,
-          "0x" & c.read(op.int - 95).toHex,
-        )
-      )
+      result.add((c.pc - 1, op, "0x" & c.read(op.int - 95).toHex))
     elif op != Op.Stop:
       result.add((c.pc - 1, op, ""))
     else:
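
Note that the `readVmWord` one-liner depends on two things shown above:
`read` now returns the truncated tail slice near the end of code instead of
an empty slice, and stint's `fromBytesBE` is assumed to accept a slice
shorter than 32 bytes and zero-extend it. Under those assumptions the
behaviour matches the old manual byte loop, roughly:

import stint

let tail = [byte 0xde, 0xad]         # only two bytes left in the code stream
let word = UInt256.fromBytesBE(tail) # assumed zero-extended to 0xdead
doAssert word == 0xdead.u256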

View File

@@ -34,19 +34,25 @@ proc init(
       tracer: TracerRef,
       flags: set[VMFlag] = self.flags) =
   ## Initialisation helper
-  assign(self.parent, parent)
-  self.blockCtx = blockCtx
-  self.gasPool = blockCtx.gasLimit
+  # Take care to (re)set all fields since the VMState might be recycled
   self.com = com
-  self.tracer = tracer
   self.stateDB = ac
+  self.gasPool = blockCtx.gasLimit
+  assign(self.parent, parent)
+  assign(self.blockCtx, blockCtx)
+  const txCtx = default(TxContext)
+  assign(self.txCtx, txCtx)
   self.flags = flags
+  self.blobGasUsed = 0'u64
   self.fork = self.determineFork
+  self.tracer = tracer
+  self.receipts.setLen(0)
+  self.cumulativeGasUsed = 0
   self.gasCosts = self.fork.forkToSchedule
-  self.blobGasUsed = 0'u64
+  self.allLogs.setLen(0)
+  self.gasRefunded = 0
 
-func blockCtx(com: CommonRef, header: Header):
-    BlockContext =
+func blockCtx(header: Header): BlockContext =
   BlockContext(
     timestamp    : header.timestamp,
     gasLimit     : header.gasLimit,
@@ -104,24 +110,24 @@ proc reinit*(self: BaseVMState; ## Object descriptor
   ## queries about its `getStateRoot()`, i.e. `isTopLevelClean` evaluated `true`. If
   ## this function returns `false`, the function argument `self` is left
   ## untouched.
-  if self.stateDB.isTopLevelClean:
-    let
-      tracer = self.tracer
-      com = self.com
-      db = com.db
-      ac = if linear or self.stateDB.getStateRoot() == parent.stateRoot: self.stateDB
-           else: LedgerRef.init(db, self.stateDB.storeSlotHash)
-      flags = self.flags
-    self[].reset
-    self.init(
-      ac = ac,
-      parent = parent,
-      blockCtx = blockCtx,
-      com = com,
-      tracer = tracer,
-      flags = flags)
-    return true
-  # else: false
+  if not self.stateDB.isTopLevelClean:
+    return false
+
+  let
+    tracer = self.tracer
+    com = self.com
+    db = com.db
+    ac = if linear or self.stateDB.getStateRoot() == parent.stateRoot: self.stateDB
+         else: LedgerRef.init(db, self.stateDB.storeSlotHash)
+    flags = self.flags
+  self.init(
+    ac = ac,
+    parent = parent,
+    blockCtx = blockCtx,
+    com = com,
+    tracer = tracer,
+    flags = flags)
+
+  true
 
 proc reinit*(self: BaseVMState; ## Object descriptor
     parent: Header; ## parent header, account sync pos.
@@ -136,23 +142,10 @@ proc reinit*(self: BaseVMState; ## Object descriptor
   ## networks, the miner address is retrievable via `ecRecover()`.
   self.reinit(
     parent = parent,
-    blockCtx = self.com.blockCtx(header),
+    blockCtx = blockCtx(header),
     linear = linear
   )
 
-proc reinit*(self: BaseVMState; ## Object descriptor
-    header: Header; ## header with tx environment data fields
-    ): bool =
-  ## This is a variant of the `reinit()` function above where the field
-  ## `header.parentHash`, is used to fetch the `parent` Header to be
-  ## used in the `update()` variant, above.
-  let parent = self.com.db.getBlockHeader(header.parentHash).valueOr:
-    return false
-  self.reinit(
-    parent = parent,
-    header = header,
-    linear = false)
-
 proc init*(
     self: BaseVMState; ## Object descriptor
     parent: Header; ## parent header, account sync position
@@ -170,7 +163,7 @@ proc init*(
   self.init(
     ac = LedgerRef.init(com.db, storeSlotHash),
     parent = parent,
-    blockCtx = com.blockCtx(header),
+    blockCtx = blockCtx(header),
     com = com,
     tracer = tracer)

View File

@@ -33,6 +33,60 @@ declareCounter nec_imported_gas, "Gas processed during import"
 
 var running {.volatile.} = true
 
+proc openCsv(name: string): File =
+  try:
+    let f = open(name, fmAppend)
+    let pos = f.getFileSize()
+    if pos == 0:
+      f.writeLine("block_number,blocks,slot,txs,gas,time")
+    f
+  except IOError as exc:
+    fatal "Could not open statistics output file", file = name, err = exc.msg
+    quit(QuitFailure)
+
+proc getMetadata(networkId: NetworkId): auto =
+  # Network Specific Configurations
+  # TODO: the merge block number could be fetched from the era1 file instead,
+  #       specially if the accumulator is added to the chain metadata
+  case networkId
+  of MainNet:
+    (
+      getMetadataForNetwork("mainnet").cfg,
+      # Mainnet Validators Root
+      Eth2Digest.fromHex(
+        "0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"
+      ),
+      15537393'u64, # Last pre-merge block
+      4700013'u64, # First post-merge slot
+    )
+  of SepoliaNet:
+    (
+      getMetadataForNetwork("sepolia").cfg,
+      Eth2Digest.fromHex(
+        "0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"
+      ),
+      1450408'u64, # Last pre-merge block number
+      115193'u64, # First post-merge slot
+    )
+  of HoleskyNet:
+    (
+      getMetadataForNetwork("holesky").cfg,
+      Eth2Digest.fromHex(
+        "0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"
+      ),
+      0'u64, # Last pre-merge block number
+      0'u64, # First post-merge slot
+    )
+  else:
+    fatal "Unsupported network", network = networkId
+    quit(QuitFailure)
+
+template boolFlag(flags, b): PersistBlockFlags =
+  if b:
+    flags
+  else:
+    {}
+
 proc importBlocks*(conf: NimbusConf, com: CommonRef) =
   proc controlCHandler() {.noconv.} =
     when defined(windows):
@@ -45,31 +99,18 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
   let
     start = com.db.getSavedStateBlockNumber() + 1
     chain = com.newChain()
+    (cfg, genesis_validators_root, lastEra1Block, firstSlotAfterMerge) =
+      getMetadata(conf.networkId)
+    time0 = Moment.now()
 
-  template boolFlag(flags, b): PersistBlockFlags =
-    if b:
-      flags
-    else:
-      {}
-
+  # These variables are used from closures on purpose, so as to place them on
+  # the heap rather than the stack
   var
-    imported = 0'u64
-    importedSlot = 1'u64
-    gas = GasInt(0)
-    txs = 0
-    time0 = Moment.now()
+    slot = 1'u64
+    time1 = Moment.now() # time at start of chunk
     csv =
       if conf.csvStats.isSome:
-        try:
-          let f = open(conf.csvStats.get(), fmAppend)
-          let pos = f.getFileSize()
-          if pos == 0:
-            f.writeLine("block_number,blocks,slot,txs,gas,time")
-          f
-        except IOError as exc:
-          error "Could not open statistics output file",
-            file = conf.csvStats, err = exc.msg
-          quit(QuitFailure)
+        openCsv(conf.csvStats.get())
       else:
         File(nil)
     flags =
@@ -78,75 +119,52 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       boolFlag(NoPersistBodies, not conf.storeBodies) +
       boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
       boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-    blocks: seq[EthBlock]
-    clConfig: Eth2NetworkMetadata
-    genesis_validators_root: Eth2Digest
-    lastEra1Block: uint64
-    firstSlotAfterMerge: uint64
+    blk: Block
+    persister = Persister.init(chain, flags)
+    cstats: PersistStats # stats at start of chunk
 
   defer:
     if csv != nil:
       close(csv)
 
-  # Network Specific Configurations
-  # TODO: the merge block number could be fetched from the era1 file instead,
-  #       specially if the accumulator is added to the chain metadata
-  if conf.networkId == MainNet:
-    doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
-    clConfig = getMetadataForNetwork("mainnet")
-    genesis_validators_root = Eth2Digest.fromHex(
-      "0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"
-    ) # Mainnet Validators Root
-    lastEra1Block = 15537393'u64 # Mainnet
-    firstSlotAfterMerge =
-      if isDir(conf.eraDir.string):
-        4700013'u64 # Mainnet
-      else:
-        warn "No eraDir found for Mainnet, block loading will stop after era1"
-        0'u64 # No eraDir for Mainnet
-  elif conf.networkId == SepoliaNet:
-    doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
-    clConfig = getMetadataForNetwork("sepolia")
-    genesis_validators_root = Eth2Digest.fromHex(
-      "0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"
-    ) # Sepolia Validators Root
-    lastEra1Block = 1450408'u64 # Sepolia
-    firstSlotAfterMerge =
-      if isDir(conf.eraDir.string):
-        115193'u64 # Sepolia
-      else:
-        warn "No eraDir found for Sepolia, block loading will stop after era1"
-        0'u64 # No eraDir for Sepolia
-  elif conf.networkId == HoleskyNet:
-    doAssert isDir(conf.eraDir.string), "Era directory not found"
-    clConfig = getMetadataForNetwork("holesky")
-    genesis_validators_root = Eth2Digest.fromHex(
-      "0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"
-    ) # Holesky Validators Root
-    lastEra1Block = 0'u64
-    firstSlotAfterMerge = 0'u64
-  else:
-    error "Unsupported network", network = conf.networkId
-    quit(QuitFailure)
+  template blockNumber(): uint64 =
+    start + uint64 persister.stats.blocks
 
   nec_import_block_number.set(start.int64)
 
-  template blockNumber(): uint64 =
-    start + imported
-
   func f(value: float): string =
-    &"{value:4.3f}"
+    if value >= 1000:
+      &"{int(value)}"
+    elif value >= 100:
+      &"{value:4.1f}"
+    elif value >= 10:
+      &"{value:4.2f}"
+    else:
+      &"{value:4.3f}"
 
-  proc process() =
-    let
-      time1 = Moment.now()
-      statsRes = chain.persistBlocks(blocks, flags)
-    if statsRes.isErr():
-      error "Failed to persist blocks", error = statsRes.error
+  proc persistBlock() =
+    persister.persistBlock(blk).isOkOr:
+      fatal "Could not persist block", blockNumber = blk.header.number, error
       quit(QuitFailure)
 
-    txs += statsRes[].txs
-    gas += statsRes[].gas
+  proc checkpoint(force: bool = false) =
+    let (blocks, txs, gas) = persister.stats
+    if not force and blocks.uint64 mod conf.chunkSize != 0:
+      return
+
+    persister.checkpoint().isOkOr:
+      fatal "Could not write database checkpoint", error
+      quit(QuitFailure)
+
+    let (cblocks, ctxs, cgas) =
+      (blocks - cstats.blocks, txs - cstats.txs, gas - cstats.gas)
+
+    if cblocks == 0:
+      return
+
+    cstats = persister.stats
+
     let
       time2 = Moment.now()
       diff1 = (time2 - time1).nanoseconds().float / 1000000000
@@ -154,22 +172,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
 
     info "Imported blocks",
       blockNumber,
-      blocks = imported,
-      importedSlot,
+      slot,
+      blocks,
       txs,
       mgas = f(gas.float / 1000000),
-      bps = f(blocks.len.float / diff1),
-      tps = f(statsRes[].txs.float / diff1),
-      mgps = f(statsRes[].gas.float / 1000000 / diff1),
-      avgBps = f(imported.float / diff0),
+      bps = f(cblocks.float / diff1),
+      tps = f(ctxs.float / diff1),
+      mgps = f(cgas.float / 1000000 / diff1),
+      avgBps = f(blocks.float / diff0),
       avgTps = f(txs.float / diff0),
       avgMGps = f(gas.float / 1000000 / diff0),
       elapsed = toString(time2 - time0, 3)
 
     metrics.set(nec_import_block_number, int64(blockNumber))
-    nec_imported_blocks.inc(blocks.len)
-    nec_imported_transactions.inc(statsRes[].txs)
-    nec_imported_gas.inc(int64 statsRes[].gas)
+    nec_imported_blocks.inc(cblocks)
+    nec_imported_transactions.inc(ctxs)
+    nec_imported_gas.inc(int64 cgas)
 
     if csv != nil:
       # In the CSV, we store a line for every chunk of blocks processed so
@@ -177,19 +195,15 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       # process - this way, each sample is independent
       try:
         csv.writeLine(
-          [
-            $blockNumber,
-            $blocks.len,
-            $importedSlot,
-            $statsRes[].txs,
-            $statsRes[].gas,
-            $(time2 - time1).nanoseconds(),
-          ].join(",")
+          [$blockNumber, $cblocks, $slot, $ctxs, $cgas, $(time2 - time1).nanoseconds()].join(
+            ","
+          )
         )
         csv.flushFile()
       except IOError as exc:
         warn "Could not write csv", err = exc.msg
-    blocks.setLen(0)
+    time1 = time2
 
   # Finds the slot number to resume the import process
  # First it sets the initial lower bound to `firstSlotAfterMerge` + number of blocks after Era1
@@ -213,94 +227,101 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
     debug "Finding slot number to resume import", startSlot, endSlot
 
     while startSlot < endSlot:
-      let blk = getEthBlockFromEra(
-        era, historical_roots, historical_summaries, startSlot, clConfig.cfg
-      ).valueOr:
+      if not getEthBlockFromEra(
+        era, historical_roots, historical_summaries, startSlot, cfg, blk
+      ):
         startSlot += 1
         if startSlot == endSlot - 1:
           error "No blocks found in the last era file"
           return false
-        else:
-          continue
+
+        continue
 
       startSlot += 1
       if blk.header.number < blockNumber:
-        notice "Available Era Files are already imported",
+        notice "Available `era` files are already imported",
           stateBlockNumber = blockNumber, eraBlockNumber = blk.header.number
-        quit QuitSuccess
-      else:
-        break
+        return false
+
+      break
 
   if blockNumber > 1:
     # Setting the initial lower bound
-    importedSlot = (blockNumber - lastEra1Block) + firstSlotAfterMerge
-    debug "Finding slot number after resuming import", importedSlot
+    slot = (blockNumber - lastEra1Block) + firstSlotAfterMerge
+    debug "Finding slot number after resuming import", slot
 
     # BlockNumber based slot finding
     var clNum = 0'u64
 
     while clNum < blockNumber:
-      let blk = getEthBlockFromEra(
-        era, historical_roots, historical_summaries, Slot(importedSlot), clConfig.cfg
-      ).valueOr:
-        importedSlot += 1
+      if not getEthBlockFromEra(
+        era, historical_roots, historical_summaries, Slot(slot), cfg, blk
+      ):
+        slot += 1
         continue
 
       clNum = blk.header.number
       # decreasing the lower bound with each iteration
-      importedSlot += blockNumber - clNum
+      slot += blockNumber - clNum
 
-    notice "Resuming import from", importedSlot
+    notice "Matched block to slot number", blockNumber, slot
   return true
 
-  if isDir(conf.era1Dir.string) or isDir(conf.eraDir.string):
-    if start <= lastEra1Block:
-      notice "Importing era1 archive",
-        start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string
-
-      let db =
-        if conf.networkId == MainNet:
-          Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
-            # Mainnet
-        else:
-          Era1DbRef.init(conf.era1Dir.string, "sepolia").expect("Era files present")
-            # Sepolia
-      defer:
-        db.dispose()
-
-      proc loadEraBlock(blockNumber: uint64): bool =
-        # Separate proc to reduce stack usage of blk
-        let blk = db.getEthBlock(blockNumber).valueOr:
-          error "Could not load block from era1", blockNumber, error=error
-          return false
-
-        blocks.add blk
-        true
-
-      while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block:
-        if not loadEraBlock(blockNumber):
-          notice "No more era1 blocks to import", blockNumber
-          break
-
-        imported += 1
-
-        if blocks.lenu64 mod conf.chunkSize == 0:
-          process()
-
-      if blocks.len > 0:
-        process() # last chunk, if any
-
+  if lastEra1Block > 0 and start <= lastEra1Block:
+    let
+      era1Name =
+        case conf.networkId
+        of MainNet:
+          "mainnet"
+        of SepoliaNet:
+          "sepolia"
+        else:
+          raiseAssert "Other networks are unsupported or do not have an era1"
+      db = Era1DbRef.init(conf.era1Dir.string, era1Name).valueOr:
+        fatal "Could not open era1 database", era1Dir = conf.era1Dir, era1Name, error
+        quit(QuitFailure)
+
+    notice "Importing era1 archive",
+      start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string
+
+    defer:
+      db.dispose()
+
+    proc loadEraBlock(blockNumber: uint64): bool =
+      db.getEthBlock(blockNumber, blk).isOkOr:
+        return false
+      true
+
+    while running and persister.stats.blocks.uint64 < conf.maxBlocks and
+        blockNumber <= lastEra1Block:
+      if not loadEraBlock(blockNumber):
+        notice "No more `era1` blocks to import", blockNumber, slot
+        break
+
+      persistBlock()
+      checkpoint()
+
+  block era1Import:
     if blockNumber > lastEra1Block:
+      if not isDir(conf.eraDir.string):
+        if blockNumber == 0:
+          fatal "`era` directory not found, cannot start import",
+            blockNumber, eraDir = conf.eraDir.string
+          quit(QuitFailure)
+        else:
+          notice "`era` directory not found, stopping import at merge boundary",
+            blockNumber, eraDir = conf.eraDir.string
+          break era1Import
+
       notice "Importing era archive",
         blockNumber, dataDir = conf.dataDir.string, eraDir = conf.eraDir.string
 
       let
-        eraDB = EraDB.new(clConfig.cfg, conf.eraDir.string, genesis_validators_root)
+        eraDB = EraDB.new(cfg, conf.eraDir.string, genesis_validators_root)
         (historical_roots, historical_summaries, endSlot) = loadHistoricalRootsFromEra(
-          conf.eraDir.string, clConfig.cfg
+          conf.eraDir.string, cfg
         ).valueOr:
-          error "Error loading historical summaries", error
-          quit QuitFailure
+          fatal "Could not load historical summaries",
+            eraDir = conf.eraDir.string, error
+          quit(QuitFailure)
 
       # Load the last slot number
       var moreEraAvailable = true
@@ -309,35 +330,39 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
         eraDB, historical_roots.asSeq(), historical_summaries.asSeq(), endSlot
       )
 
-      if importedSlot < firstSlotAfterMerge and firstSlotAfterMerge != 0:
+      if slot < firstSlotAfterMerge and firstSlotAfterMerge != 0:
         # if resuming import we do not update the slot
-        importedSlot = firstSlotAfterMerge
+        slot = firstSlotAfterMerge
 
-      proc loadEra1Block(importedSlot: Slot): bool =
+      proc loadEra1Block(): bool =
         # Separate proc to reduce stack usage of blk
-        var blk = getEthBlockFromEra(
+        if not getEthBlockFromEra(
           eraDB,
           historical_roots.asSeq(),
           historical_summaries.asSeq(),
-          importedSlot,
-          clConfig.cfg,
-        ).valueOr:
+          Slot(slot),
+          cfg,
+          blk,
+        ):
           return false
 
-        blocks.add blk
         true
 
-      while running and moreEraAvailable and imported < conf.maxBlocks and
-          importedSlot < endSlot:
-        if not loadEra1Block(Slot(importedSlot)):
-          importedSlot += 1
+      while running and moreEraAvailable and
+          persister.stats.blocks.uint64 < conf.maxBlocks and slot < endSlot:
+        if not loadEra1Block():
+          slot += 1
           continue
+        slot += 1
 
-        imported += 1
-        importedSlot += 1
+        persistBlock()
+        checkpoint()
 
-        if blocks.lenu64 mod conf.chunkSize == 0:
-          process()
+  checkpoint(true)
 
-  if blocks.len > 0:
-    process()
+  notice "Import complete",
+    blockNumber,
+    slot,
+    blocks = persister.stats.blocks,
+    txs = persister.stats.txs,
+    mgas = f(persister.stats.gas.float / 1000000)
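
A note on the bookkeeping above: `persister.stats` is cumulative over the
whole import, so `checkpoint` derives per-chunk rates by subtracting the
snapshot taken at the previous checkpoint. A standalone sketch of that
delta logic (values made up for illustration):

type PersistStats = tuple[blocks: int, txs: int, gas: int64]

var stats: PersistStats = (1000, 50_000, 8_000_000'i64)  # cumulative totals
var cstats: PersistStats = (800, 41_000, 6_500_000'i64)  # previous snapshot

let (cblocks, ctxs, cgas) =
  (stats.blocks - cstats.blocks, stats.txs - cstats.txs, stats.gas - cstats.gas)
cstats = stats # take a new snapshot for the next chunk
echo "chunk: ", cblocks, " blocks, ", ctxs, " txs, ", cgas, " gas"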

View File

@@ -119,11 +119,11 @@ proc getWithdrawals*(x: seq[capella.Withdrawal]): seq[blocks.Withdrawal] =
     )
   return withdrawals
 
-proc getEthBlock*(blck: ForkyTrustedBeaconBlock): Opt[EthBlock] =
+proc getEthBlock*(blck: ForkyTrustedBeaconBlock, res: var EthBlock): bool =
   ## Convert a beacon block to an eth1 block.
   const consensusFork = typeof(blck).kind
   when consensusFork >= ConsensusFork.Bellatrix:
-    let
+    var
       payload = blck.body.execution_payload
       txs = getTxs(payload.transactions.asSeq())
       ethWithdrawals =
@@ -152,33 +152,35 @@ proc getEthBlock*(blck: ForkyTrustedBeaconBlock): Opt[EthBlock] =
         else:
           Opt.none(Hash32)
 
-      header = Header(
-        parentHash: Hash32(payload.parent_hash.data),
-        ommersHash: EMPTY_UNCLE_HASH,
-        coinbase: EthAddress(payload.fee_recipient.data),
-        stateRoot: Root(payload.state_root.data),
-        transactionsRoot: calcTxRoot(txs),
-        receiptsRoot: Root(payload.receipts_root.data),
-        logsBloom: Bloom(payload.logs_bloom.data),
-        difficulty: 0.u256,
-        number: payload.block_number,
-        gasLimit: GasInt(payload.gas_limit),
-        gasUsed: GasInt(payload.gas_used),
-        timestamp: EthTime(payload.timestamp),
-        extraData: payload.extra_data.asSeq(),
-        mixHash: Bytes32(payload.prev_randao.data),
-        nonce: default(Bytes8),
-        baseFeePerGas: Opt.some(payload.base_fee_per_gas),
-        withdrawalsRoot: withdrawalRoot,
-        blobGasUsed: blobGasUsed,
-        excessBlobGas: excessBlobGas,
-        parentBeaconBlockRoot: parentBeaconBlockRoot,
-      )
-
-    Opt.some EthBlock(
-      header: header, transactions: txs, uncles: @[], withdrawals: ethWithdrawals
-    )
+    res.header = Header(
+      parentHash: Hash32(payload.parent_hash.data),
+      ommersHash: EMPTY_UNCLE_HASH,
+      coinbase: EthAddress(payload.fee_recipient.data),
+      stateRoot: Root(payload.state_root.data),
+      transactionsRoot: calcTxRoot(txs),
+      receiptsRoot: Root(payload.receipts_root.data),
+      logsBloom: Bloom(payload.logs_bloom.data),
+      difficulty: 0.u256,
+      number: payload.block_number,
+      gasLimit: GasInt(payload.gas_limit),
+      gasUsed: GasInt(payload.gas_used),
+      timestamp: EthTime(payload.timestamp),
+      extraData: payload.extra_data.asSeq(),
+      mixHash: Bytes32(payload.prev_randao.data),
+      nonce: default(Bytes8),
+      baseFeePerGas: Opt.some(payload.base_fee_per_gas),
+      withdrawalsRoot: withdrawalRoot,
+      blobGasUsed: blobGasUsed,
+      excessBlobGas: excessBlobGas,
+      parentBeaconBlockRoot: parentBeaconBlockRoot,
+    )
+    res.transactions = move(txs)
+    res.uncles.reset()
+    res.withdrawals = move(ethWithdrawals)
+    true
   else:
-    Opt.none(EthBlock)
+    false
 
 proc getEthBlockFromEra*(
     db: EraDB,
@@ -186,15 +188,18 @@ proc getEthBlockFromEra*(
     historical_summaries: openArray[HistoricalSummary],
     slot: Slot,
     cfg: RuntimeConfig,
-): Opt[EthBlock] =
+    res: var EthBlock,
+): bool =
   let fork = cfg.consensusForkAtEpoch(slot.epoch)
   fork.withConsensusFork:
     type T = consensusFork.TrustedSignedBeaconBlock
-    var tmp = new T
+    var tmp = new Opt[T]
     # Pass in default Eth2Digest to avoid block root computation (it is not
     # needed in this case)
     tmp[] = db.getBlock(
       historical_roots, historical_summaries, slot, Opt.some(default(Eth2Digest)), T
-    ).valueOr:
-      return Opt.none(EthBlock)
+    )
+    if tmp[].isNone():
+      return false
 
-    getEthBlock(tmp[].message)
+    getEthBlock(tmp[][].message, res)

View File

@@ -61,7 +61,7 @@ template getELBlockFromBeaconChain(
   var eth1block: EthBlock
   if isAvailable:
     withBlck(clBlock.asTrusted()):
-      eth1Block = getEthBlock(forkyBlck.message).valueOr:
+      if not getEthBlock(forkyBlck.message, eth1Block):
         error "Failed to get EL block from CL head"
         quit(QuitFailure)

View File

@@ -30,14 +30,14 @@ iterator undumpBlocksEra1*(
   # a time and let the consumer do the chunking
   const blocksPerYield = 192
   var tmp = newSeqOfCap[EthBlock](blocksPerYield)
+  var blk: Block
 
   for i in 0 ..< stopAfter:
-    var bck = db.getEthBlock(least + i).valueOr:
+    db.getEthBlock(least + i, blk).isOkOr:
       if doAssertOk:
         doAssert i > 0, "expected at least one block"
       break
 
-    tmp.add move(bck)
+    tmp.add move(blk)
 
   # Genesis block requires a chunk of its own, for compatibility with current
   # test setup (a bit weird, that...)