nimbus-eth1/nimbus/nimbus_import.nim

369 lines
11 KiB
Nim

# Nimbus
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
import
chronicles,
metrics,
chronos/timer,
std/[strformat, strutils],
stew/io2,
beacon_chain/era_db,
beacon_chain/networking/network_metadata,
./config,
./common/common,
./core/chain,
./db/era1_db,
./utils/era_helpers
declareGauge nec_import_block_number, "Latest imported block number"
declareCounter nec_imported_blocks, "Blocks processed during import"
declareCounter nec_imported_transactions, "Transactions processed during import"
declareCounter nec_imported_gas, "Gas processed during import"
var running {.volatile.} = true
proc openCsv(name: string): File =
try:
let f = open(name, fmAppend)
let pos = f.getFileSize()
if pos == 0:
f.writeLine("block_number,blocks,slot,txs,gas,time")
f
except IOError as exc:
fatal "Could not open statistics output file", file = name, err = exc.msg
quit(QuitFailure)
proc getMetadata(networkId: NetworkId): auto =
# Network Specific Configurations
# TODO: the merge block number could be fetched from the era1 file instead,
# specially if the accumulator is added to the chain metadata
case networkId
of MainNet:
(
getMetadataForNetwork("mainnet").cfg,
# Mainnet Validators Root
Eth2Digest.fromHex(
"0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"
),
15537393'u64, # Last pre-merge block
4700013'u64, # First post-merge slot
)
of SepoliaNet:
(
getMetadataForNetwork("sepolia").cfg,
Eth2Digest.fromHex(
"0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"
),
1450408'u64, # Last pre-merge block number
115193'u64, # First post-merge slot
)
of HoleskyNet:
(
getMetadataForNetwork("holesky").cfg,
Eth2Digest.fromHex(
"0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"
),
0'u64, # Last pre-merge block number
0'u64, # First post-merge slot
)
else:
fatal "Unsupported network", network = networkId
quit(QuitFailure)
template boolFlag(flags, b): PersistBlockFlags =
if b:
flags
else:
{}
proc importBlocks*(conf: NimbusConf, com: CommonRef) =
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
running = false
setControlCHook(controlCHandler)
let
start = com.db.getSavedStateBlockNumber() + 1
chain = com.newChain()
(cfg, genesis_validators_root, lastEra1Block, firstSlotAfterMerge) =
getMetadata(conf.networkId)
time0 = Moment.now()
# These variables are used from closures on purpose, so as to place them on
# the heap rather than the stack
var
slot = 1'u64
time1 = Moment.now() # time at start of chunk
csv =
if conf.csvStats.isSome:
openCsv(conf.csvStats.get())
else:
File(nil)
flags =
boolFlag({PersistBlockFlag.NoValidation}, conf.noValidation) +
boolFlag({PersistBlockFlag.NoFullValidation}, not conf.fullValidation) +
boolFlag(NoPersistBodies, not conf.storeBodies) +
boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
blk: Block
persister = Persister.init(chain, flags)
cstats: PersistStats # stats at start of chunk
defer:
if csv != nil:
close(csv)
template blockNumber(): uint64 =
start + uint64 persister.stats.blocks
nec_import_block_number.set(start.int64)
func f(value: float): string =
if value >= 1000:
&"{int(value)}"
elif value >= 100:
&"{value:4.1f}"
elif value >= 10:
&"{value:4.2f}"
else:
&"{value:4.3f}"
proc persistBlock() =
persister.persistBlock(blk).isOkOr:
fatal "Could not persist block", blockNumber = blk.header.number, error
quit(QuitFailure)
proc checkpoint(force: bool = false) =
let (blocks, txs, gas) = persister.stats
if not force and blocks.uint64 mod conf.chunkSize != 0:
return
persister.checkpoint().isOkOr:
fatal "Could not write database checkpoint", error
quit(QuitFailure)
let (cblocks, ctxs, cgas) =
(blocks - cstats.blocks, txs - cstats.txs, gas - cstats.gas)
if cblocks == 0:
return
cstats = persister.stats
let
time2 = Moment.now()
diff1 = (time2 - time1).nanoseconds().float / 1000000000
diff0 = (time2 - time0).nanoseconds().float / 1000000000
info "Imported blocks",
blockNumber,
slot,
blocks,
txs,
mgas = f(gas.float / 1000000),
bps = f(cblocks.float / diff1),
tps = f(ctxs.float / diff1),
mgps = f(cgas.float / 1000000 / diff1),
avgBps = f(blocks.float / diff0),
avgTps = f(txs.float / diff0),
avgMGps = f(gas.float / 1000000 / diff0),
elapsed = toString(time2 - time0, 3)
metrics.set(nec_import_block_number, int64(blockNumber))
nec_imported_blocks.inc(cblocks)
nec_imported_transactions.inc(ctxs)
nec_imported_gas.inc(int64 cgas)
if csv != nil:
# In the CSV, we store a line for every chunk of blocks processed so
# that the file can meaningfully be appended to when restarting the
# process - this way, each sample is independent
try:
csv.writeLine(
[$blockNumber, $cblocks, $slot, $ctxs, $cgas, $(time2 - time1).nanoseconds()].join(
","
)
)
csv.flushFile()
except IOError as exc:
warn "Could not write csv", err = exc.msg
time1 = time2
# Finds the slot number to resume the import process
# First it sets the initial lower bound to `firstSlotAfterMerge` + number of blocks after Era1
# Then it iterates over the slots to find the current slot number, along with reducing the
# search space by calculating the difference between the `blockNumber` and the `block_number` from the executionPayload
# of the slot, then adding the difference to the importedSlot. This pushes the lower bound more,
# making the search way smaller
proc updateLastImportedSlot(
era: EraDB,
historical_roots: openArray[Eth2Digest],
historical_summaries: openArray[HistoricalSummary],
endSlot: Slot,
): bool =
# Checks if the Nimbus block number is ahead the era block number
# First we load the last era number, and get the fist slot number
# Since the slot emptiness cannot be predicted, we iterate over to find the block and check
# if the block number is greater than the current block number
var
lastEra = era(endSlot - 1)
startSlot = start_slot(lastEra) - 8192
debug "Finding slot number to resume import", startSlot, endSlot
while startSlot < endSlot:
if not getEthBlockFromEra(
era, historical_roots, historical_summaries, startSlot, cfg, blk
):
startSlot += 1
if startSlot == endSlot - 1:
error "No blocks found in the last era file"
return false
continue
startSlot += 1
if blk.header.number < blockNumber:
notice "Available `era` files are already imported",
stateBlockNumber = blockNumber, eraBlockNumber = blk.header.number
return false
break
if blockNumber > 1:
# Setting the initial lower bound
slot = (blockNumber - lastEra1Block) + firstSlotAfterMerge
debug "Finding slot number after resuming import", slot
# BlockNumber based slot finding
var clNum = 0'u64
while clNum < blockNumber:
if not getEthBlockFromEra(
era, historical_roots, historical_summaries, Slot(slot), cfg, blk
):
slot += 1
continue
clNum = blk.header.number
# decreasing the lower bound with each iteration
slot += blockNumber - clNum
notice "Matched block to slot number", blockNumber, slot
return true
if lastEra1Block > 0 and start <= lastEra1Block:
let
era1Name =
case conf.networkId
of MainNet:
"mainnet"
of SepoliaNet:
"sepolia"
else:
raiseAssert "Other networks are unsupported or do not have an era1"
db = Era1DbRef.init(conf.era1Dir.string, era1Name).valueOr:
fatal "Could not open era1 database", era1Dir = conf.era1Dir, era1Name, error
quit(QuitFailure)
notice "Importing era1 archive",
start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string
defer:
db.dispose()
proc loadEraBlock(blockNumber: uint64): bool =
db.getEthBlock(blockNumber, blk).isOkOr:
return false
true
while running and persister.stats.blocks.uint64 < conf.maxBlocks and
blockNumber <= lastEra1Block:
if not loadEraBlock(blockNumber):
notice "No more `era1` blocks to import", blockNumber, slot
break
persistBlock()
checkpoint()
block era1Import:
if blockNumber > lastEra1Block:
if not isDir(conf.eraDir.string):
if blockNumber == 0:
fatal "`era` directory not found, cannot start import",
blockNumber, eraDir = conf.eraDir.string
quit(QuitFailure)
else:
notice "`era` directory not found, stopping import at merge boundary",
blockNumber, eraDir = conf.eraDir.string
break era1Import
notice "Importing era archive",
blockNumber, dataDir = conf.dataDir.string, eraDir = conf.eraDir.string
let
eraDB = EraDB.new(cfg, conf.eraDir.string, genesis_validators_root)
(historical_roots, historical_summaries, endSlot) = loadHistoricalRootsFromEra(
conf.eraDir.string, cfg
).valueOr:
fatal "Could not load historical summaries",
eraDir = conf.eraDir.string, error
quit(QuitFailure)
# Load the last slot number
var moreEraAvailable = true
if blockNumber > lastEra1Block + 1:
moreEraAvailable = updateLastImportedSlot(
eraDB, historical_roots.asSeq(), historical_summaries.asSeq(), endSlot
)
if slot < firstSlotAfterMerge and firstSlotAfterMerge != 0:
# if resuming import we do not update the slot
slot = firstSlotAfterMerge
proc loadEra1Block(): bool =
# Separate proc to reduce stack usage of blk
if not getEthBlockFromEra(
eraDB,
historical_roots.asSeq(),
historical_summaries.asSeq(),
Slot(slot),
cfg,
blk,
):
return false
true
while running and moreEraAvailable and
persister.stats.blocks.uint64 < conf.maxBlocks and slot < endSlot:
if not loadEra1Block():
slot += 1
continue
slot += 1
persistBlock()
checkpoint()
checkpoint(true)
notice "Import complete",
blockNumber,
slot,
blocks = persister.stats.blocks,
txs = persister.stats.txs,
mgas = f(persister.stats.gas.float / 1000000)