import: read from era files (#2254)

This PR extends the `nimbus import` command to also allow reading from
era files - this command allows creating or topping up an existing
database with data coming from era files instead of network sync.

* add `--era1-dir` and `--max-blocks` options to command line
* make `persistBlocks` report basic stats like transactions and gas
* improve error reporting in several APIs
* allow importing multiple RLP files in one go
* clean up logging options to match nimbus-eth2
* make sure database is closed properly on shutdown
This commit is contained in:
Jacek Sieka 2024-05-31 09:13:56 +02:00 committed by GitHub
parent 324610ef05
commit a375720c16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 298 additions and 161 deletions

View File

@ -86,8 +86,7 @@ proc processBlock(
ValidationResult.OK ValidationResult.OK
proc getVmState(c: ChainRef, header: BlockHeader): proc getVmState(c: ChainRef, header: BlockHeader):
Result[BaseVMState, void] Result[BaseVMState, void] =
{.gcsafe, raises: [CatchableError].} =
if c.vmState.isNil.not: if c.vmState.isNil.not:
return ok(c.vmState) return ok(c.vmState)
@ -96,6 +95,7 @@ proc getVmState(c: ChainRef, header: BlockHeader):
debug "Cannot initialise VmState", debug "Cannot initialise VmState",
number = header.blockNumber number = header.blockNumber
return err() return err()
return ok(vmState) return ok(vmState)
# A stripped down version of persistBlocks without validation # A stripped down version of persistBlocks without validation

View File

@ -118,14 +118,14 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
return simpleFCU(PayloadExecutionStatus.invalid, "TDs unavailable for TDD check") return simpleFCU(PayloadExecutionStatus.invalid, "TDs unavailable for TDD check")
if td < ttd or (blockNumber > 0'u64 and ptd > ttd): if td < ttd or (blockNumber > 0'u64 and ptd > ttd):
error "Refusing beacon update to pre-merge", notice "Refusing beacon update to pre-merge",
number = blockNumber, number = blockNumber,
hash = blockHash.short, hash = blockHash.short,
diff = header.difficulty, diff = header.difficulty,
ptd = ptd, ptd = ptd,
ttd = ttd ttd = ttd
return invalidFCU() return invalidFCU("Refusing beacon update to pre-merge")
# If the head block is already in our canonical chain, the beacon client is # If the head block is already in our canonical chain, the beacon client is
# probably resyncing. Ignore the update. # probably resyncing. Ignore the update.
@ -133,11 +133,10 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
if db.getBlockHash(header.blockNumber, canonHash) and canonHash == blockHash: if db.getBlockHash(header.blockNumber, canonHash) and canonHash == blockHash:
# TODO should this be possible? # TODO should this be possible?
# If we allow these types of reorgs, we will do lots and lots of reorgs during sync # If we allow these types of reorgs, we will do lots and lots of reorgs during sync
debug "Reorg to previous block" notice "Reorg to previous block", blockHash
if chain.setCanonical(header) != ValidationResult.OK:
return invalidFCU(com, header) chain.setCanonical(header).isOkOr:
elif chain.setCanonical(header) != ValidationResult.OK: return invalidFCU(error, com, header)
return invalidFCU(com, header)
# If the beacon client also advertised a finalized block, mark the local # If the beacon client also advertised a finalized block, mark the local
# chain final and completely in PoS mode. # chain final and completely in PoS mode.

View File

@ -187,10 +187,10 @@ proc newPayload*(ben: BeaconEngineRef,
hash = blockHash, number = header.blockNumber hash = blockHash, number = header.blockNumber
let body = blockBody(payload) let body = blockBody(payload)
let vres = ben.chain.insertBlockWithoutSetHead(header, body) let vres = ben.chain.insertBlockWithoutSetHead(header, body)
if vres != ValidationResult.OK: if vres.isErr:
ben.setInvalidAncestor(header, blockHash) ben.setInvalidAncestor(header, blockHash)
let blockHash = latestValidHash(db, parent, ttd) let blockHash = latestValidHash(db, parent, ttd)
return invalidStatus(blockHash, "Failed to insert block") return invalidStatus(blockHash, vres.error())
# We've accepted a valid payload from the beacon client. Mark the local # We've accepted a valid payload from the beacon client. Mark the local
# chain transitions to notify other subsystems (e.g. downloader) of the # chain transitions to notify other subsystems (e.g. downloader) of the

View File

@ -85,11 +85,14 @@ proc simpleFCU*(status: PayloadExecutionStatus,
) )
) )
proc invalidFCU*(hash = common.Hash256()): ForkchoiceUpdatedResponse = proc invalidFCU*(
validationError: string,
hash = common.Hash256()): ForkchoiceUpdatedResponse =
ForkchoiceUpdatedResponse(payloadStatus: ForkchoiceUpdatedResponse(payloadStatus:
PayloadStatusV1( PayloadStatusV1(
status: PayloadExecutionStatus.invalid, status: PayloadExecutionStatus.invalid,
latestValidHash: toValidHash(hash) latestValidHash: toValidHash(hash),
validationError: some validationError
) )
) )
@ -183,13 +186,16 @@ proc latestValidHash*(db: CoreDbRef,
# latestValidHash MUST be set to ZERO # latestValidHash MUST be set to ZERO
common.Hash256() common.Hash256()
proc invalidFCU*(com: CommonRef, proc invalidFCU*(validationError: string,
header: common.BlockHeader): ForkchoiceUpdatedResponse com: CommonRef,
{.gcsafe, raises: [RlpError].} = header: common.BlockHeader): ForkchoiceUpdatedResponse =
var parent: common.BlockHeader var parent: common.BlockHeader
if not com.db.getBlockHeader(header.parentHash, parent): if not com.db.getBlockHeader(header.parentHash, parent):
return invalidFCU(common.Hash256()) return invalidFCU(validationError)
let blockHash = latestValidHash(com.db, parent, let blockHash = try:
com.ttd.get(high(common.BlockNumber))) latestValidHash(com.db, parent, com.ttd.get(high(common.BlockNumber)))
invalidFCU(blockHash) except RlpError:
default(common.Hash256)
invalidFCU(validationError, blockHash)

View File

@ -369,7 +369,7 @@ proc initializeEmptyDb*(com: CommonRef)
{.gcsafe, raises: [CatchableError].} = {.gcsafe, raises: [CatchableError].} =
let kvt = com.db.kvt() let kvt = com.db.kvt()
if canonicalHeadHashKey().toOpenArray notin kvt: if canonicalHeadHashKey().toOpenArray notin kvt:
trace "Writing genesis to DB" info "Writing genesis to DB"
doAssert(com.genesisHeader.blockNumber.isZero, doAssert(com.genesisHeader.blockNumber.isZero,
"can't commit genesis block with number > 0") "can't commit genesis block with number > 0")
discard com.db.persistHeaderToDb(com.genesisHeader, discard com.db.persistHeaderToDb(com.genesisHeader,

View File

@ -145,6 +145,11 @@ type
abbr: "d" abbr: "d"
name: "data-dir" }: OutDir name: "data-dir" }: OutDir
era1DirOpt* {.
desc: "Directory where era1 (pre-merge) archive can be found"
defaultValueDesc: "<data-dir>/era1"
name: "era1-dir" }: Option[OutDir]
keyStore* {. keyStore* {.
desc: "Load one or more keystore files from this directory" desc: "Load one or more keystore files from this directory"
defaultValue: defaultKeystoreDir() defaultValue: defaultKeystoreDir()
@ -166,7 +171,7 @@ type
syncMode* {. syncMode* {.
desc: "Specify particular blockchain sync mode." desc: "Specify particular blockchain sync mode."
longDesc: longDesc:
"- default -- legacy sync mode\n" & "- default -- beacon sync mode\n" &
"- full -- full blockchain archive\n" & "- full -- full blockchain archive\n" &
# "- snap -- experimental snap mode (development only)\n" & # "- snap -- experimental snap mode (development only)\n" &
"" ""
@ -475,12 +480,20 @@ type
name: "trusted-setup-file" .}: Option[string] name: "trusted-setup-file" .}: Option[string]
of `import`: of `import`:
blocksFile* {. blocksFile* {.
argument argument
desc: "Import RLP encoded block(s) from a file, validate, write to database and quit" desc: "One or more RLP encoded block(s) files"
defaultValue: "" name: "blocks-file" }: seq[InputFile]
name: "blocks-file" }: InputFile
maxBlocks* {.
desc: "Maximum number of blocks to import"
defaultValue: uint64.high()
name: "max-blocks" .}: uint64
chunkSize* {.
desc: "Number of blocks per database transaction"
defaultValue: 8192
name: "chunk-size" .}: uint64
func parseCmdArg(T: type NetworkId, p: string): T func parseCmdArg(T: type NetworkId, p: string): T
{.gcsafe, raises: [ValueError].} = {.gcsafe, raises: [ValueError].} =
@ -735,6 +748,9 @@ func httpServerEnabled*(conf: NimbusConf): bool =
conf.wsEnabled or conf.wsEnabled or
conf.rpcEnabled conf.rpcEnabled
func era1Dir*(conf: NimbusConf): OutDir =
conf.era1DirOpt.get(OutDir(conf.dataDir.string & "/era1"))
# KLUDGE: The `load()` template does currently not work within any exception # KLUDGE: The `load()` template does currently not work within any exception
# annotated environment. # annotated environment.
{.pop.} {.pop.}

View File

@ -7,6 +7,8 @@
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
{.push raises: [].}
import import
chronicles, chronicles,
eth/rlp, stew/io2, eth/rlp, stew/io2,
@ -30,9 +32,6 @@ proc importRlpBlock*(blocksRlp: openArray[byte]; com: CommonRef; importFile: str
while rlp.hasData: while rlp.hasData:
try: try:
rlp.decompose(header, body) rlp.decompose(header, body)
if chain.persistBlocks([header], [body]) == ValidationResult.Error:
# register one more error and continue
errorCount.inc
except RlpError as e: except RlpError as e:
# terminate if there was a decoding error # terminate if there was a decoding error
error "rlp error", error "rlp error",
@ -40,12 +39,12 @@ proc importRlpBlock*(blocksRlp: openArray[byte]; com: CommonRef; importFile: str
msg = e.msg, msg = e.msg,
exception = e.name exception = e.name
return false return false
except CatchableError as e:
# otherwise continue chain.persistBlocks([header], [body]).isOkOr():
# register one more error and continue
error "import error", error "import error",
fileName = importFile, fileName = importFile,
msg = e.msg, error
exception = e.name
errorCount.inc errorCount.inc
return errorCount == 0 return errorCount == 0

View File

@ -41,16 +41,12 @@ type
## First block to when `extraValidation` will be applied (only ## First block to when `extraValidation` will be applied (only
## effective if `extraValidation` is true.) ## effective if `extraValidation` is true.)
vmState: BaseVMState
## If it's not nil, block validation will use this
## If it's nil, a new vmState state will be created.
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public constructors # Public constructors
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc newChain*(com: CommonRef, proc newChain*(com: CommonRef,
extraValidation: bool, vmState = BaseVMState(nil)): ChainRef = extraValidation: bool): ChainRef =
## Constructor for the `Chain` descriptor object. ## Constructor for the `Chain` descriptor object.
## The argument `extraValidation` enables extra block ## The argument `extraValidation` enables extra block
## chain validation if set `true`. ## chain validation if set `true`.
@ -58,7 +54,6 @@ proc newChain*(com: CommonRef,
com: com, com: com,
validateBlock: true, validateBlock: true,
extraValidation: extraValidation, extraValidation: extraValidation,
vmState: vmState,
) )
func newChain*(com: CommonRef): ChainRef = func newChain*(com: CommonRef): ChainRef =

View File

@ -11,6 +11,7 @@
{.push raises: [].} {.push raises: [].}
import import
results,
../../db/ledger, ../../db/ledger,
../../vm_state, ../../vm_state,
../../vm_types, ../../vm_types,
@ -25,6 +26,8 @@ when not defined(release):
../../tracer, ../../tracer,
../../utils/utils ../../utils/utils
export results
type type
PersistBlockFlag = enum PersistBlockFlag = enum
NoPersistHeader NoPersistHeader
@ -34,6 +37,11 @@ type
PersistBlockFlags = set[PersistBlockFlag] PersistBlockFlags = set[PersistBlockFlag]
PersistStats = tuple
blocks: int
txs: int
gas: GasInt
const const
CleanUpEpoch = 30_000.u256 CleanUpEpoch = 30_000.u256
## Regular checks for history clean up (applies to single state DB). This ## Regular checks for history clean up (applies to single state DB). This
@ -45,17 +53,16 @@ const
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc getVmState(c: ChainRef, header: BlockHeader): proc getVmState(c: ChainRef, header: BlockHeader):
Result[BaseVMState, void] = Result[BaseVMState, string] =
if c.vmState.isNil.not:
return ok(c.vmState)
let vmState = BaseVMState() let vmState = BaseVMState()
if not vmState.init(header, c.com): try:
debug "Cannot initialise VmState", # TODO clean up exception handling
number = header.blockNumber if not vmState.init(header, c.com):
return err() return err("Could not initialise VMState")
return ok(vmState) except CatchableError as exc:
return err("Error while initializing VMState: " & exc.msg)
ok(vmState)
proc purgeOutOfJournalBlocks(db: CoreDbRef) {.inline, raises: [RlpError].} = proc purgeOutOfJournalBlocks(db: CoreDbRef) {.inline, raises: [RlpError].} =
## Remove non-reachable blocks from KVT database ## Remove non-reachable blocks from KVT database
@ -67,24 +74,22 @@ proc purgeOutOfJournalBlocks(db: CoreDbRef) {.inline, raises: [RlpError].} =
break break
blkNum = blkNum - 1 blkNum = blkNum - 1
proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader]; proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
bodies: openArray[BlockBody], bodies: openArray[BlockBody],
flags: PersistBlockFlags = {}): ValidationResult flags: PersistBlockFlags = {}): Result[PersistStats, string]
# wildcard exception, wrapped below in public section {.raises: [CatchableError] .} =
{.inline, raises: [CatchableError].} =
let dbTx = c.db.beginTransaction() let dbTx = c.db.beginTransaction()
defer: dbTx.dispose() defer: dbTx.dispose()
c.com.hardForkTransition(headers[0]) c.com.hardForkTransition(headers[0])
# Note that `0 < headers.len`, assured when called from `persistBlocks()` # Note that `0 < headers.len`, assured when called from `persistBlocks()`
let vmState = c.getVmState(headers[0]).valueOr: let vmState = ?c.getVmState(headers[0])
return ValidationResult.Error
let (fromBlock, toBlock) = (headers[0].blockNumber, headers[^1].blockNumber) let (fromBlock, toBlock) = (headers[0].blockNumber, headers[^1].blockNumber)
trace "Persisting blocks", fromBlock, toBlock trace "Persisting blocks", fromBlock, toBlock
var txs = 0
for i in 0 ..< headers.len: for i in 0 ..< headers.len:
let (header, body) = (headers[i], bodies[i]) let (header, body) = (headers[i], bodies[i])
@ -99,19 +104,15 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
debug "Cannot update VmState", debug "Cannot update VmState",
blockNumber = header.blockNumber, blockNumber = header.blockNumber,
item = i item = i
return ValidationResult.Error return err("Cannot update VmState to block " & $header.blockNumber)
if c.validateBlock and c.extraValidation and if c.validateBlock and c.extraValidation and
c.verifyFrom <= header.blockNumber: c.verifyFrom <= header.blockNumber:
let res = c.com.validateHeaderAndKinship( ? c.com.validateHeaderAndKinship(
header, header,
body, body,
checkSealOK = false) # TODO: how to checkseal from here checkSealOK = false) # TODO: how to checkseal from here
if res.isErr:
debug "block validation error",
msg = res.error
return ValidationResult.Error
if c.generateWitness: if c.generateWitness:
vmState.generateWitness = true vmState.generateWitness = true
@ -128,7 +129,7 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
warn "Validation error. Debugging metadata dumped." warn "Validation error. Debugging metadata dumped."
if validationResult != ValidationResult.OK: if validationResult != ValidationResult.OK:
return validationResult return err("Failed to validate block")
if c.generateWitness: if c.generateWitness:
let dbTx = c.db.beginTransaction() let dbTx = c.db.beginTransaction()
@ -144,7 +145,6 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
c.db.setBlockWitness(header.blockHash(), witness) c.db.setBlockWitness(header.blockHash(), witness)
if NoPersistHeader notin flags: if NoPersistHeader notin flags:
discard c.db.persistHeaderToDb( discard c.db.persistHeaderToDb(
header, c.com.consensus == ConsensusType.POS, c.com.startOfHistory) header, c.com.consensus == ConsensusType.POS, c.com.startOfHistory)
@ -166,6 +166,8 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
# Done with this block # Done with this block
lapTx.commit() lapTx.commit()
txs += body.transactions.len
dbTx.commit() dbTx.commit()
# The `c.db.persistent()` call is ignored by the legacy DB which # The `c.db.persistent()` call is ignored by the legacy DB which
@ -183,60 +185,66 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
if(fromBlock mod CleanUpEpoch) <= (toBlock - fromBlock): if(fromBlock mod CleanUpEpoch) <= (toBlock - fromBlock):
c.db.purgeOutOfJournalBlocks() c.db.purgeOutOfJournalBlocks()
ValidationResult.OK ok((headers.len, txs, vmState.cumulativeGasUsed))
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public `ChainDB` methods # Public `ChainDB` methods
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc insertBlockWithoutSetHead*(c: ChainRef, header: BlockHeader, proc insertBlockWithoutSetHead*(c: ChainRef, header: BlockHeader,
body: BlockBody): ValidationResult body: BlockBody): Result[void, string] =
{.gcsafe, raises: [CatchableError].} = try:
result = c.persistBlocksImpl( discard ? c.persistBlocksImpl(
[header], [body], {NoPersistHeader, NoSaveReceipts}) [header], [body], {NoPersistHeader, NoSaveReceipts})
if result == ValidationResult.OK:
c.db.persistHeaderToDbWithoutSetHead(header, c.com.startOfHistory) c.db.persistHeaderToDbWithoutSetHead(header, c.com.startOfHistory)
ok()
except CatchableError as exc:
err(exc.msg)
proc setCanonical*(c: ChainRef, header: BlockHeader): ValidationResult proc setCanonical*(c: ChainRef, header: BlockHeader): Result[void, string] =
{.gcsafe, raises: [CatchableError].} = try:
if header.parentHash == Hash256():
discard c.db.setHead(header.blockHash)
return ok()
var body: BlockBody
if not c.db.getBlockBody(header, body):
debug "Failed to get BlockBody",
hash = header.blockHash
return err("Could not get block body")
discard ? c.persistBlocksImpl([header], [body], {NoPersistHeader, NoSaveTxs})
if header.parentHash == Hash256():
discard c.db.setHead(header.blockHash) discard c.db.setHead(header.blockHash)
return ValidationResult.OK ok()
except CatchableError as exc:
err(exc.msg)
var body: BlockBody proc setCanonical*(c: ChainRef, blockHash: Hash256): Result[void, string] =
if not c.db.getBlockBody(header, body):
debug "Failed to get BlockBody",
hash = header.blockHash
return ValidationResult.Error
result = c.persistBlocksImpl([header], [body], {NoPersistHeader, NoSaveTxs})
if result == ValidationResult.OK:
discard c.db.setHead(header.blockHash)
proc setCanonical*(c: ChainRef, blockHash: Hash256): ValidationResult
{.gcsafe, raises: [CatchableError].} =
var header: BlockHeader var header: BlockHeader
if not c.db.getBlockHeader(blockHash, header): if not c.db.getBlockHeader(blockHash, header):
debug "Failed to get BlockHeader", debug "Failed to get BlockHeader",
hash = blockHash hash = blockHash
return ValidationResult.Error return err("Could not get block header")
setCanonical(c, header) setCanonical(c, header)
proc persistBlocks*(c: ChainRef; headers: openArray[BlockHeader]; proc persistBlocks*(c: ChainRef; headers: openArray[BlockHeader];
bodies: openArray[BlockBody]): ValidationResult bodies: openArray[BlockBody]): Result[PersistStats, string] =
{.gcsafe, raises: [CatchableError].} =
# Run the VM here # Run the VM here
if headers.len != bodies.len: if headers.len != bodies.len:
debug "Number of headers not matching number of bodies" debug "Number of headers not matching number of bodies"
return ValidationResult.Error return err("Mismatching headers and bodies")
if headers.len == 0: if headers.len == 0:
debug "Nothing to do" debug "Nothing to do"
return ValidationResult.OK return ok(default(PersistStats)) # TODO not nice to return nil
c.persistBlocksImpl(headers,bodies) try:
c.persistBlocksImpl(headers,bodies)
except CatchableError as exc:
err(exc.msg)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End

View File

@ -1,5 +1,10 @@
-d:chronicles_line_numbers -d:"chronicles_runtime_filtering=on"
-d:"chronicles_disable_thread_id"
@if release:
-d:"chronicles_line_numbers:0"
@end
-d:"chronicles_sinks=textlines[file]" -d:"chronicles_sinks=textlines[file]"
-d:"chronicles_runtime_filtering=on" -d:"chronicles_runtime_filtering=on"
-d:nimDebugDlOpen -d:nimDebugDlOpen

View File

@ -22,8 +22,8 @@ import
./version, ./version,
./constants, ./constants,
./nimbus_desc, ./nimbus_desc,
./nimbus_import,
./core/eip4844, ./core/eip4844,
./core/block_import,
./db/core_db/persistent, ./db/core_db/persistent,
./sync/protocol, ./sync/protocol,
./sync/handlers ./sync/handlers
@ -36,14 +36,6 @@ when defined(evmc_enabled):
## * No multiple bind addresses support ## * No multiple bind addresses support
## * No database support ## * No database support
proc importBlocks(conf: NimbusConf, com: CommonRef) =
if string(conf.blocksFile).len > 0:
# success or not, we quit after importing blocks
if not importRlpBlock(string conf.blocksFile, com):
quit(QuitFailure)
else:
quit(QuitSuccess)
proc basicServices(nimbus: NimbusNode, proc basicServices(nimbus: NimbusNode,
conf: NimbusConf, conf: NimbusConf,
com: CommonRef) = com: CommonRef) =
@ -218,7 +210,7 @@ proc localServices(nimbus: NimbusNode, conf: NimbusConf,
nimbus.metricsServer = res.get nimbus.metricsServer = res.get
waitFor nimbus.metricsServer.start() waitFor nimbus.metricsServer.start()
proc start(nimbus: NimbusNode, conf: NimbusConf) = proc run(nimbus: NimbusNode, conf: NimbusConf) =
## logging ## logging
setLogLevel(conf.logLevel) setLogLevel(conf.logLevel)
if conf.logFile.isSome: if conf.logFile.isSome:
@ -229,6 +221,19 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
when defined(evmc_enabled): when defined(evmc_enabled):
evmcSetLibraryPath(conf.evm) evmcSetLibraryPath(conf.evm)
# Trusted setup is needed for processing Cancun+ blocks
if conf.trustedSetupFile.isSome:
let fileName = conf.trustedSetupFile.get()
let res = Kzg.loadTrustedSetup(fileName)
if res.isErr:
fatal "Cannot load Kzg trusted setup from file", msg=res.error
quit(QuitFailure)
else:
let res = loadKzgTrustedSetup()
if res.isErr:
fatal "Cannot load baked in Kzg trusted setup", msg=res.error
quit(QuitFailure)
createDir(string conf.dataDir) createDir(string conf.dataDir)
let coreDB = let coreDB =
# Resolve statically for database type # Resolve statically for database type
@ -241,26 +246,17 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
networkId = conf.networkId, networkId = conf.networkId,
params = conf.networkParams) params = conf.networkParams)
defer:
com.db.finish()
com.initializeEmptyDb() com.initializeEmptyDb()
let protocols = conf.getProtocolFlags()
if conf.cmd != NimbusCmd.`import` and conf.trustedSetupFile.isSome:
let fileName = conf.trustedSetupFile.get()
let res = Kzg.loadTrustedSetup(fileName)
if res.isErr:
fatal "Cannot load Kzg trusted setup from file", msg=res.error
quit(QuitFailure)
else:
let res = loadKzgTrustedSetup()
if res.isErr:
fatal "Cannot load baked in Kzg trusted setup", msg=res.error
quit(QuitFailure)
case conf.cmd case conf.cmd
of NimbusCmd.`import`: of NimbusCmd.`import`:
importBlocks(conf, com) importBlocks(conf, com)
else: else:
let protocols = conf.getProtocolFlags()
basicServices(nimbus, conf, com) basicServices(nimbus, conf, com)
manageAccounts(nimbus, conf) manageAccounts(nimbus, conf)
setupP2P(nimbus, conf, com, protocols) setupP2P(nimbus, conf, com, protocols)
@ -282,17 +278,16 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
# it might have been set to "Stopping" with Ctrl+C # it might have been set to "Stopping" with Ctrl+C
nimbus.state = NimbusState.Running nimbus.state = NimbusState.Running
proc process*(nimbus: NimbusNode, conf: NimbusConf) = # Main event loop
# Main event loop while nimbus.state == NimbusState.Running:
while nimbus.state == NimbusState.Running: try:
try: poll()
poll() except CatchableError as e:
except CatchableError as e: debug "Exception in poll()", exc = e.name, err = e.msg
debug "Exception in poll()", exc = e.name, err = e.msg discard e # silence warning when chronicles not activated
discard e # silence warning when chronicles not activated
# Stop loop # Stop loop
waitFor nimbus.stop(conf) waitFor nimbus.stop(conf)
when isMainModule: when isMainModule:
var nimbus = NimbusNode(state: NimbusState.Starting, ctx: newEthContext()) var nimbus = NimbusNode(state: NimbusState.Starting, ctx: newEthContext())
@ -312,5 +307,4 @@ when isMainModule:
## Processing command line arguments ## Processing command line arguments
let conf = makeConfig() let conf = makeConfig()
nimbus.start(conf) nimbus.run(conf)
nimbus.process(conf)

117
nimbus/nimbus_import.nim Normal file
View File

@ -0,0 +1,117 @@
# Nimbus
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import
chronicles,
std/[monotimes, strformat, times],
stew/io2,
./config,
./common/common,
./core/[block_import, chain],
./db/era1_db,
beacon_chain/era_db
var running {.volatile.} = true

proc importBlocks*(conf: NimbusConf, com: CommonRef) =
  ## Import blocks into the database, first from era1 archives (pre-merge
  ## history, mainnet only) and then from any RLP-encoded block files given
  ## on the command line. Import resumes from the block after the latest one
  ## already in the database, honours `conf.maxBlocks` and persists blocks in
  ## chunks of `conf.chunkSize` per database transaction.

  # Ctrl+C handling: flip the `running` flag so the era1 import loop can
  # stop cleanly between blocks instead of being killed mid-transaction.
  proc controlCHandler() {.noconv.} =
    when defined(windows):
      # workaround for https://github.com/nim-lang/Nim/issues/4057
      setupForeignThreadGc()
    running = false
  setControlCHook(controlCHandler)

  let
    # First block to import: one past the latest block known to the journal
    start = com.db.getLatestJournalBlockNumber().truncate(uint64) + 1
    chain = com.newChain()

  var
    imported = 0'u64        # blocks imported in this run
    gas = 0.u256            # cumulative gas used across imported blocks
    txs = 0                 # cumulative transaction count
    time0 = getMonoTime()   # start time, for average-rate reporting

  template blockNumber(): uint64 =
    # Next block to import (resume point plus blocks done so far)
    start + imported

  if isDir(conf.era1Dir.string):
    doAssert conf.networkId == MainNet, "Only mainnet era1 currently supported"

    const
      # TODO the merge block number could be fetched from the era1 file instead,
      #      specially if the accumulator is added to the chain metadata
      lastEra1Block = 15537393

    if start <= lastEra1Block:
      notice "Importing era1 archive",
        start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string

      var
        headers: seq[BlockHeader]
        bodies: seq[BlockBody]

      func f(value: float): string =
        &"{value:4.3f}"

      template process() =
        # Persist the accumulated chunk and log per-chunk / running averages.
        let
          time1 = getMonoTime()
          statsRes = chain.persistBlocks(headers, bodies)
        if statsRes.isErr():
          error "Failed to persist blocks", error = statsRes.error
          quit(QuitFailure)

        txs += statsRes[].txs
        gas += uint64 statsRes[].gas
        let
          time2 = getMonoTime()
          diff1 = (time2 - time1).inNanoseconds().float / 1000000000
          diff0 = (time2 - time0).inNanoseconds().float / 1000000000

        # TODO generate csv with import statistics
        info "Imported blocks",
          blockNumber,
          gas,
          bps = f(headers.len.float / diff1),
          tps = f(statsRes[].txs.float / diff1),
          gps = f(statsRes[].gas.float / diff1),
          avgBps = f(imported.float / diff0),
          # fixed: this key was duplicated as `avgGps` — it reports
          # average transactions per second, matching `tps` above
          avgTps = f(txs.float / diff0),
          avgGps = f(gas.truncate(uint64).float / diff0) # TODO fix truncate

        headers.setLen(0)
        bodies.setLen(0)

      let db =
        Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
      defer:
        db.dispose()

      while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block:
        var blk = db.getBlockTuple(blockNumber).valueOr:
          error "Could not load block from era1", blockNumber, error
          break

        imported += 1
        headers.add move(blk.header)
        bodies.add move(blk.body)

        if headers.lenu64 mod conf.chunkSize == 0:
          process()

      if headers.len > 0:
        process() # last chunk, if any

  for blocksFile in conf.blocksFile:
    if isFile(string blocksFile):
      # success or not, we quit after importing blocks
      if not importRlpBlock(string blocksFile, com):
        quit(QuitFailure)
      else:
        quit(QuitSuccess)

View File

@ -1,5 +1,5 @@
# Nimbus # Nimbus
# Copyright (c) 2023 Status Research & Development GmbH # Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at # * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT). # https://opensource.org/licenses/MIT).
@ -197,13 +197,8 @@ proc insertBlocks*(sk: SkeletonRef,
headers: openArray[BlockHeader], headers: openArray[BlockHeader],
body: openArray[BlockBody], body: openArray[BlockBody],
fromEngine: bool): Result[uint64, string] = fromEngine: bool): Result[uint64, string] =
try: discard ? sk.chain.persistBlocks(headers, body)
let res = sk.chain.persistBlocks(headers, body) ok(headers.len.uint64)
if res != ValidationResult.OK:
return err("insertBlocks validation error")
ok(headers.len.uint64)
except CatchableError as ex:
err(ex.msg)
proc insertBlock*(sk: SkeletonRef, proc insertBlock*(sk: SkeletonRef,
header: BlockHeader, header: BlockHeader,

View File

@ -122,7 +122,7 @@ proc processStaged(buddy: FullBuddyRef): bool =
# Store in persistent database # Store in persistent database
try: try:
if chain.persistBlocks(wi.headers, wi.bodies) == ValidationResult.OK: if chain.persistBlocks(wi.headers, wi.bodies).isOk():
bq.blockQueueAccept(wi) bq.blockQueueAccept(wi)
return true return true
except CatchableError as e: except CatchableError as e:

View File

@ -82,7 +82,6 @@ proc main() {.used.} =
var retryCount = 0 var retryCount = 0
while true: while true:
var thisBlock: Block var thisBlock: Block
try: try:
thisBlock = requestBlock(blockNumber, { DownloadAndValidate }) thisBlock = requestBlock(blockNumber, { DownloadAndValidate })
@ -104,8 +103,9 @@ proc main() {.used.} =
if numBlocks == numBlocksToCommit: if numBlocks == numBlocksToCommit:
persistToDb(com.db): persistToDb(com.db):
if chain.persistBlocks(headers, bodies) != ValidationResult.OK: let res = chain.persistBlocks(headers, bodies)
raise newException(ValidationError, "Error when validating blocks") res.isOkOr:
raise newException(ValidationError, "Error when validating blocks: " & res.error)
numBlocks = 0 numBlocks = 0
headers.setLen(0) headers.setLen(0)
bodies.setLen(0) bodies.setLen(0)
@ -116,8 +116,9 @@ proc main() {.used.} =
if numBlocks > 0: if numBlocks > 0:
persistToDb(com.db): persistToDb(com.db):
if chain.persistBlocks(headers, bodies) != ValidationResult.OK: let res = chain.persistBlocks(headers, bodies)
raise newException(ValidationError, "Error when validating blocks") res.isOkOr:
raise newException(ValidationError, "Error when validating blocks: " & res.error)
when isMainModule: when isMainModule:
var message: string var message: string

View File

@ -8,7 +8,7 @@
# according to those terms. # according to those terms.
excluded_files="config.yaml|.gitmodules" excluded_files="config.yaml|.gitmodules"
excluded_extensions="json|md|png|txt|toml|gz|key|rlp|era1" excluded_extensions="json|md|png|txt|toml|gz|key|rlp|era1|cfg"
current_year=$(date +"%Y") current_year=$(date +"%Y")
outdated_files=() outdated_files=()

View File

@ -9,7 +9,9 @@
# according to those terms. # according to those terms.
-d:chronicles_line_numbers -d:chronicles_line_numbers
-d:"chronicles_sinks=textblocks" -d:"chronicles_sinks=textlines"
# comment this out, to run the tests in a serial manner: # comment this out, to run the tests in a serial manner:
#-d:nimtestParallel #-d:nimtestParallel
-d:"chronicles_disable_thread_id"
-d:"chronicles_runtime_filtering=on"

View File

@ -86,7 +86,7 @@ when false:
discard discard
when defined(chronicles_runtime_filtering) and loggingEnabled: when defined(chronicles_runtime_filtering) and loggingEnabled:
setLogLevel(LogLevel.TRACE) setLogLevel(LogLevel.TRACE)
proc setErrorLevel = proc setErrorLevel =
discard discard
when defined(chronicles_runtime_filtering) and loggingEnabled: when defined(chronicles_runtime_filtering) and loggingEnabled:
@ -104,7 +104,7 @@ proc blockChainForTesting*(network: NetworkId): CommonRef =
initializeEmptyDb(result) initializeEmptyDb(result)
proc importBlocks(com: CommonRef; h: seq[BlockHeader]; b: seq[BlockBody]) = proc importBlocks(com: CommonRef; h: seq[BlockHeader]; b: seq[BlockBody]) =
if com.newChain.persistBlocks(h,b) != ValidationResult.OK: if com.newChain.persistBlocks(h,b).isErr:
raiseAssert "persistBlocks() failed at block #" & $h[0].blockNumber raiseAssert "persistBlocks() failed at block #" & $h[0].blockNumber
proc getVmState(com: CommonRef; number: BlockNumber): BaseVMState = proc getVmState(com: CommonRef; number: BlockNumber): BaseVMState =

View File

@ -261,8 +261,8 @@ proc importBlock(ctx: var TestCtx, com: CommonRef,
chain = newChain(com, extraValidation = true, ctx.vmState) chain = newChain(com, extraValidation = true, ctx.vmState)
res = chain.persistBlocks([tb.header], [tb.body]) res = chain.persistBlocks([tb.header], [tb.body])
if res == ValidationResult.Error: if res.isErr()
raise newException(ValidationError, "persistBlocks validation") raise newException(ValidationError, res.error())
else: else:
blockWitness(chain.vmState, com.db) blockWitness(chain.vmState, com.db)
testGetBlockWitness(chain, chain.vmState.parent, tb.header) testGetBlockWitness(chain, chain.vmState.parent, tb.header)

View File

@ -52,7 +52,7 @@ proc configurationMain*() =
let bb = makeConfig(@["import", genesisFile]) let bb = makeConfig(@["import", genesisFile])
check bb.cmd == NimbusCmd.`import` check bb.cmd == NimbusCmd.`import`
check bb.blocksFile.string == genesisFile check bb.blocksFile[0].string == genesisFile
test "custom-network loading config file with no genesis data": test "custom-network loading config file with no genesis data":
# no genesis will fallback to geth compatibility mode # no genesis will fallback to geth compatibility mode

View File

@ -234,7 +234,7 @@ proc test_chainSync*(
noisy.stopLoggingAfter(): noisy.stopLoggingAfter():
let runPersistBlocksRc = chain.persistBlocks(w[0], w[1]) let runPersistBlocksRc = chain.persistBlocks(w[0], w[1])
xCheck runPersistBlocksRc == ValidationResult.OK: xCheck runPersistBlocksRc.isOk():
if noisy: if noisy:
noisy.whisper "***", "Re-run with logging enabled...\n" noisy.whisper "***", "Re-run with logging enabled...\n"
setTraceLevel() setTraceLevel()
@ -269,7 +269,7 @@ proc test_chainSync*(
noisy.whisper "***", noisy.whisper "***",
&"processing {dotsOrSpace}[#{fromBlock:>8},#{(lastBlock-1):>8}]" &"processing {dotsOrSpace}[#{fromBlock:>8},#{(lastBlock-1):>8}]"
let runPersistBlocks1Rc = chain.persistBlocks(headers1, bodies1) let runPersistBlocks1Rc = chain.persistBlocks(headers1, bodies1)
xCheck runPersistBlocks1Rc == ValidationResult.OK xCheck runPersistBlocks1Rc.isOk()
dotsOrSpace = " " dotsOrSpace = " "
noisy.startLogging(headers9[0].blockNumber) noisy.startLogging(headers9[0].blockNumber)
@ -286,7 +286,7 @@ proc test_chainSync*(
&"processing {dotsOrSpace}[#{lastBlock:>8},#{lastBlock:>8}]" &"processing {dotsOrSpace}[#{lastBlock:>8},#{lastBlock:>8}]"
noisy.stopLoggingAfter(): noisy.stopLoggingAfter():
let runPersistBlocks0Rc = chain.persistBlocks(headers0, bodies0) let runPersistBlocks0Rc = chain.persistBlocks(headers0, bodies0)
xCheck runPersistBlocks0Rc == ValidationResult.OK xCheck runPersistBlocks0Rc.isOk()
else: else:
if oldLogAlign: if oldLogAlign:
noisy.whisper "***", noisy.whisper "***",
@ -297,7 +297,7 @@ proc test_chainSync*(
&"processing {dotsOrSpace}[#{lastBlock:>8},#{toBlock:>8}]" &"processing {dotsOrSpace}[#{lastBlock:>8},#{toBlock:>8}]"
noisy.stopLoggingAfter(): noisy.stopLoggingAfter():
let runPersistBlocks9Rc = chain.persistBlocks(headers9, bodies9) let runPersistBlocks9Rc = chain.persistBlocks(headers9, bodies9)
xCheck runPersistBlocks9Rc == ValidationResult.OK xCheck runPersistBlocks9Rc.isOk()
break break
if not oldLogAlign: if not oldLogAlign:
sayPerf sayPerf

View File

@ -88,7 +88,7 @@ proc setupChain(): CommonRef =
let chain = newChain(com) let chain = newChain(com)
let res = chain.persistBlocks(headers, bodies) let res = chain.persistBlocks(headers, bodies)
assert(res == ValidationResult.OK) assert res.isOk(), res.error()
com com

View File

@ -40,7 +40,7 @@ proc testFixture(node: JsonNode, testStatusIMPL: var TestStatus) =
# it's ok if setHead fails here because of missing ancestors # it's ok if setHead fails here because of missing ancestors
discard com.db.setHead(parent, true) discard com.db.setHead(parent, true)
let validationResult = chain.persistBlocks(headers, bodies) let validationResult = chain.persistBlocks(headers, bodies)
check validationResult == ValidationResult.OK check validationResult.isOk()
proc persistBlockJsonMain*() = proc persistBlockJsonMain*() =
suite "persist block json tests": suite "persist block json tests":

View File

@ -43,7 +43,7 @@ proc testFixture(node: JsonNode, testStatusIMPL: var TestStatus) =
# it's ok if setHead fails here because of missing ancestors # it's ok if setHead fails here because of missing ancestors
discard com.db.setHead(parent, true) discard com.db.setHead(parent, true)
let validationResult = chain.persistBlocks(headers, bodies) let validationResult = chain.persistBlocks(headers, bodies)
check validationResult == ValidationResult.OK check validationResult.isOk()
let let
blockHash = memoryDB.getBlockHash(blockNumber) blockHash = memoryDB.getBlockHash(blockNumber)

View File

@ -123,7 +123,7 @@ proc test_dbTimingUndumpBlocks*(
# Message if [fromBlock,toBlock] contains a multiple of 700 # Message if [fromBlock,toBlock] contains a multiple of 700
if fromBlock + (toBlock mod 900) <= toBlock: if fromBlock + (toBlock mod 900) <= toBlock:
loadNoise.say "***", &"processing ...[#{fromBlock},#{toBlock}]..." loadNoise.say "***", &"processing ...[#{fromBlock},#{toBlock}]..."
check chain.persistBlocks(w[0], w[1]) == ValidationResult.OK check chain.persistBlocks(w[0], w[1]).isOk()
if numBlocks.toBlockNumber <= w[0][^1].blockNumber: if numBlocks.toBlockNumber <= w[0][^1].blockNumber:
break break

View File

@ -52,7 +52,7 @@ proc importBlockData(node: JsonNode): (CommonRef, Hash256, Hash256, UInt256) {.
# it's ok if setHead fails here because of missing ancestors # it's ok if setHead fails here because of missing ancestors
discard com.db.setHead(parent, true) discard com.db.setHead(parent, true)
let validationResult = chain.persistBlocks(headers, bodies) let validationResult = chain.persistBlocks(headers, bodies)
doAssert validationResult == ValidationResult.OK doAssert validationResult.isOk()
return (com, parent.stateRoot, header.stateRoot, blockNumber) return (com, parent.stateRoot, header.stateRoot, blockNumber)

View File

@ -31,7 +31,7 @@ proc setStatus(xp: TxPoolRef; item: TxItemRef; status: TxItemStatus)
discard xp.txDB.reassign(item, status) discard xp.txDB.reassign(item, status)
proc importBlocks(c: ChainRef; h: seq[BlockHeader]; b: seq[BlockBody]): int = proc importBlocks(c: ChainRef; h: seq[BlockHeader]; b: seq[BlockBody]): int =
if c.persistBlocks(h,b) != ValidationResult.OK: if c.persistBlocks(h,b).isErr():
raiseAssert "persistBlocks() failed at block #" & $h[0].blockNumber raiseAssert "persistBlocks() failed at block #" & $h[0].blockNumber
for body in b: for body in b:
result += body.transactions.len result += body.transactions.len

View File

@ -173,7 +173,7 @@ proc runTxPoolPosTest() =
test "PoS persistBlocks": test "PoS persistBlocks":
let rr = chain.persistBlocks([blk.header], [body]) let rr = chain.persistBlocks([blk.header], [body])
check rr == ValidationResult.OK check rr.isOk()
test "validate TxPool prevRandao setter": test "validate TxPool prevRandao setter":
var sdb = LedgerRef.init(com.db, blk.header.stateRoot) var sdb = LedgerRef.init(com.db, blk.header.stateRoot)
@ -236,7 +236,7 @@ proc runTxPoolBlobhashTest() =
test "Blobhash persistBlocks": test "Blobhash persistBlocks":
let rr = chain.persistBlocks([blk.header], [body]) let rr = chain.persistBlocks([blk.header], [body])
check rr == ValidationResult.OK check rr.isOk()
test "validate TxPool prevRandao setter": test "validate TxPool prevRandao setter":
var sdb = LedgerRef.init(com.db, blk.header.stateRoot) var sdb = LedgerRef.init(com.db, blk.header.stateRoot)