nimbus-eth1/nimbus/nimbus_import.nim

335 lines
10 KiB
Nim
Raw Normal View History

# Nimbus
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
2024-06-02 11:00:05 +00:00
{.push raises: [].}
import
chronicles,
metrics,
2024-06-02 11:00:05 +00:00
chronos/timer,
std/[os, strformat, strutils],
stew/io2,
beacon_chain/era_db,
beacon_chain/networking/network_metadata,
./config,
./common/common,
./core/[block_import, chain],
./db/era1_db,
./utils/era_helpers
declareGauge nec_import_block_number, "Latest imported block number"
declareCounter nec_imported_blocks, "Blocks processed during import"
declareCounter nec_imported_transactions, "Transactions processed during import"
declareCounter nec_imported_gas, "Gas processed during import"
var running {.volatile.} = true
2024-06-02 11:00:05 +00:00
func shortLog(a: timer.Duration, parts = int.high): string {.inline.} =
## Returns string representation of Duration ``a`` as nanoseconds value.
var
res = ""
v = a.nanoseconds()
parts = parts
template f(n: string, T: Duration) =
if v >= T.nanoseconds():
res.add($(uint64(v div T.nanoseconds())))
res.add(n)
v = v mod T.nanoseconds()
dec parts
if v == 0 or parts <= 0:
return res
2024-06-02 11:00:05 +00:00
f("w", Week)
f("d", Day)
f("h", Hour)
f("m", Minute)
f("s", Second)
f("ms", Millisecond)
f("us", Microsecond)
f("ns", Nanosecond)
res
proc importBlocks*(conf: NimbusConf, com: CommonRef) =
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
running = false
setControlCHook(controlCHandler)
let
start = com.db.getSavedStateBlockNumber() + 1
chain = com.newChain()
template boolFlag(flags, b): PersistBlockFlags =
if b:
flags
else:
{}
var
imported = 0'u64
importedSlot = 1'u64
gas = GasInt(0)
txs = 0
2024-06-02 11:00:05 +00:00
time0 = Moment.now()
csv =
if conf.csvStats.isSome:
try:
let f = open(conf.csvStats.get(), fmAppend)
let pos = f.getFileSize()
if pos == 0:
f.writeLine("block_number,blocks,slot,txs,gas,time")
f
except IOError as exc:
error "Could not open statistics output file",
file = conf.csvStats, err = exc.msg
quit(QuitFailure)
else:
File(nil)
flags =
boolFlag({PersistBlockFlag.NoValidation}, conf.noValidation) +
boolFlag({PersistBlockFlag.NoFullValidation}, not conf.fullValidation) +
boolFlag(NoPersistBodies, not conf.storeBodies) +
boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
blocks: seq[EthBlock]
clConfig: Eth2NetworkMetadata
genesis_validators_root: Eth2Digest
lastEra1Block: uint64
firstSlotAfterMerge: uint64
defer:
if csv != nil:
close(csv)
# Network Specific Configurations
# TODO: the merge block number could be fetched from the era1 file instead,
# specially if the accumulator is added to the chain metadata
if conf.networkId == MainNet:
doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
clConfig = getMetadataForNetwork("mainnet")
genesis_validators_root = Eth2Digest.fromHex(
"0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"
) # Mainnet Validators Root
lastEra1Block = 15537393'u64 # Mainnet
firstSlotAfterMerge =
if isDir(conf.eraDir.string):
4700013'u64 # Mainnet
else:
notice "No eraDir found for Mainnet, block loading will stop after era1"
0'u64 # No eraDir for Mainnet
elif conf.networkId == SepoliaNet:
doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
clConfig = getMetadataForNetwork("sepolia")
genesis_validators_root = Eth2Digest.fromHex(
"0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"
) # Sepolia Validators Root
lastEra1Block = 1450408'u64 # Sepolia
firstSlotAfterMerge =
if isDir(conf.eraDir.string):
115193'u64 # Sepolia
else:
notice "No eraDir found for Sepolia, block loading will stop after era1"
0'u64 # No eraDir for Sepolia
elif conf.networkId == HoleskyNet:
doAssert isDir(conf.eraDir.string), "Era directory not found"
clConfig = getMetadataForNetwork("holesky")
genesis_validators_root = Eth2Digest.fromHex(
"0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"
) # Holesky Validators Root
lastEra1Block = 0'u64
firstSlotAfterMerge = 0'u64
else:
error "Unsupported network", network = conf.networkId
quit(QuitFailure)
nec_import_block_number.set(start.int64)
template blockNumber(): uint64 =
start + imported
func f(value: float): string =
try:
&"{value:4.3f}"
except ValueError:
raiseAssert "valid fmt string"
template process() =
let
time1 = Moment.now()
statsRes = chain.persistBlocks(blocks, flags)
if statsRes.isErr():
error "Failed to persist blocks", error = statsRes.error
quit(QuitFailure)
txs += statsRes[].txs
gas += statsRes[].gas
let
time2 = Moment.now()
diff1 = (time2 - time1).nanoseconds().float / 1000000000
diff0 = (time2 - time0).nanoseconds().float / 1000000000
info "Imported blocks",
blockNumber,
blocks = imported,
importedSlot,
txs,
mgas = f(gas.float / 1000000),
bps = f(blocks.len.float / diff1),
tps = f(statsRes[].txs.float / diff1),
mgps = f(statsRes[].gas.float / 1000000 / diff1),
avgBps = f(imported.float / diff0),
avgTps = f(txs.float / diff0),
avgMGps = f(gas.float / 1000000 / diff0),
elapsed = shortLog(time2 - time0, 3)
metrics.set(nec_import_block_number, int64(blockNumber))
nec_imported_blocks.inc(blocks.len)
nec_imported_transactions.inc(statsRes[].txs)
nec_imported_gas.inc(int64 statsRes[].gas)
if csv != nil:
# In the CSV, we store a line for every chunk of blocks processed so
# that the file can meaningfully be appended to when restarting the
# process - this way, each sample is independent
try:
csv.writeLine(
[
$blockNumber,
$blocks.len,
$importedSlot,
$statsRes[].txs,
$statsRes[].gas,
$(time2 - time1).nanoseconds(),
].join(",")
)
csv.flushFile()
except IOError as exc:
warn "Could not write csv", err = exc.msg
blocks.setLen(0)
# Finds the slot number to resume the import process
# First it sets the initial lower bound to `firstSlotAfterMerge` + number of blocks after Era1
# Then it iterates over the slots to find the current slot number, along with reducing the
# search space by calculating the difference between the `blockNumber` and the `block_number` from the executionPayload
# of the slot, then adding the difference to the importedSlot. This pushes the lower bound more,
# making the search way smaller
template updateLastImportedSlot(
era: EraDB,
historical_roots: openArray[Eth2Digest],
historical_summaries: openArray[HistoricalSummary],
) =
if blockNumber > 1:
# Setting the initial lower bound
importedSlot = (blockNumber - lastEra1Block) + firstSlotAfterMerge
notice "Finding slot number after resuming import", importedSlot
# BlockNumber based slot finding
var clNum = 0'u64
while clNum < blockNumber:
let blk = getEthBlockFromEra(
era, historical_roots, historical_summaries, Slot(importedSlot), clConfig.cfg
).valueOr:
importedSlot += 1
continue
clNum = blk.header.number
# decreasing the lower bound with each iteration
importedSlot += blockNumber - clNum
notice "Found the slot to start with", importedSlot
if isDir(conf.era1Dir.string) or isDir(conf.eraDir.string):
if start <= lastEra1Block:
notice "Importing era1 archive",
start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string
let db =
if conf.networkId == MainNet:
Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
# Mainnet
else:
Era1DbRef.init(conf.era1Dir.string, "sepolia").expect("Era files present")
# Sepolia
defer:
db.dispose()
while running and imported < conf.maxBlocks and blockNumber <= lastEra1Block:
Consolidate block type for block processing (#2325) This PR consolidates the split header-body sequences into a single EthBlock sequence and cleans up the fallout from that which significantly reduces block processing overhead during import thanks to less garbage collection and fewer copies of things all around. Notably, since the number of headers must always match the number of bodies, we also get rid of a pointless degree of freedom that in the future could introduce unnecessary bugs. * only read header and body from era file * avoid several unnecessary copies along the block processing way * simplify signatures, cleaning up unused arguemnts and returns * use `stew/assign2` in a few strategic places where the generated nim assignent is slow and add a few `move` to work around poor analysis in nim 1.6 (will need to be revisited for 2.0) ``` stats-20240607_2223-a814aa0b.csv vs stats-20240608_0714-21c1d0a9.csv bps_x bps_y tps_x tps_y bpsd tpsd timed block_number (498305, 713245] 1,540.52 1,809.73 2,361.58 2775.340189 17.63% 17.63% -14.92% (713245, 928185] 730.36 865.26 1,715.90 2028.973852 18.01% 18.01% -15.21% (928185, 1143126] 663.03 789.10 2,529.26 3032.490771 19.79% 19.79% -16.28% (1143126, 1358066] 393.46 508.05 2,152.50 2777.578119 29.13% 29.13% -22.50% (1358066, 1573007] 370.88 440.72 2,351.31 2791.896052 18.81% 18.81% -15.80% (1573007, 1787947] 283.65 335.11 2,068.93 2441.373402 17.60% 17.60% -14.91% (1787947, 2002888] 287.29 342.11 2,078.39 2474.179448 18.99% 18.99% -15.91% (2002888, 2217828] 293.38 343.16 2,208.83 2584.77457 17.16% 17.16% -14.61% (2217828, 2432769] 140.09 167.86 1,081.87 1296.336926 18.82% 18.82% -15.80% blocks: 1934464, baseline: 3h13m1s, contender: 2h43m47s bpsd (mean): 19.55% tpsd (mean): 19.55% Time (total): -29m13s, -15.14% ```
2024-06-09 14:32:20 +00:00
var blk = db.getEthBlock(blockNumber).valueOr:
error "Could not load block from era1", blockNumber, error
break
imported += 1
blocks.add blk
Consolidate block type for block processing (#2325) This PR consolidates the split header-body sequences into a single EthBlock sequence and cleans up the fallout from that which significantly reduces block processing overhead during import thanks to less garbage collection and fewer copies of things all around. Notably, since the number of headers must always match the number of bodies, we also get rid of a pointless degree of freedom that in the future could introduce unnecessary bugs. * only read header and body from era file * avoid several unnecessary copies along the block processing way * simplify signatures, cleaning up unused arguemnts and returns * use `stew/assign2` in a few strategic places where the generated nim assignent is slow and add a few `move` to work around poor analysis in nim 1.6 (will need to be revisited for 2.0) ``` stats-20240607_2223-a814aa0b.csv vs stats-20240608_0714-21c1d0a9.csv bps_x bps_y tps_x tps_y bpsd tpsd timed block_number (498305, 713245] 1,540.52 1,809.73 2,361.58 2775.340189 17.63% 17.63% -14.92% (713245, 928185] 730.36 865.26 1,715.90 2028.973852 18.01% 18.01% -15.21% (928185, 1143126] 663.03 789.10 2,529.26 3032.490771 19.79% 19.79% -16.28% (1143126, 1358066] 393.46 508.05 2,152.50 2777.578119 29.13% 29.13% -22.50% (1358066, 1573007] 370.88 440.72 2,351.31 2791.896052 18.81% 18.81% -15.80% (1573007, 1787947] 283.65 335.11 2,068.93 2441.373402 17.60% 17.60% -14.91% (1787947, 2002888] 287.29 342.11 2,078.39 2474.179448 18.99% 18.99% -15.91% (2002888, 2217828] 293.38 343.16 2,208.83 2584.77457 17.16% 17.16% -14.61% (2217828, 2432769] 140.09 167.86 1,081.87 1296.336926 18.82% 18.82% -15.80% blocks: 1934464, baseline: 3h13m1s, contender: 2h43m47s bpsd (mean): 19.55% tpsd (mean): 19.55% Time (total): -29m13s, -15.14% ```
2024-06-09 14:32:20 +00:00
if blocks.lenu64 mod conf.chunkSize == 0:
process()
Consolidate block type for block processing (#2325) This PR consolidates the split header-body sequences into a single EthBlock sequence and cleans up the fallout from that which significantly reduces block processing overhead during import thanks to less garbage collection and fewer copies of things all around. Notably, since the number of headers must always match the number of bodies, we also get rid of a pointless degree of freedom that in the future could introduce unnecessary bugs. * only read header and body from era file * avoid several unnecessary copies along the block processing way * simplify signatures, cleaning up unused arguemnts and returns * use `stew/assign2` in a few strategic places where the generated nim assignent is slow and add a few `move` to work around poor analysis in nim 1.6 (will need to be revisited for 2.0) ``` stats-20240607_2223-a814aa0b.csv vs stats-20240608_0714-21c1d0a9.csv bps_x bps_y tps_x tps_y bpsd tpsd timed block_number (498305, 713245] 1,540.52 1,809.73 2,361.58 2775.340189 17.63% 17.63% -14.92% (713245, 928185] 730.36 865.26 1,715.90 2028.973852 18.01% 18.01% -15.21% (928185, 1143126] 663.03 789.10 2,529.26 3032.490771 19.79% 19.79% -16.28% (1143126, 1358066] 393.46 508.05 2,152.50 2777.578119 29.13% 29.13% -22.50% (1358066, 1573007] 370.88 440.72 2,351.31 2791.896052 18.81% 18.81% -15.80% (1573007, 1787947] 283.65 335.11 2,068.93 2441.373402 17.60% 17.60% -14.91% (1787947, 2002888] 287.29 342.11 2,078.39 2474.179448 18.99% 18.99% -15.91% (2002888, 2217828] 293.38 343.16 2,208.83 2584.77457 17.16% 17.16% -14.61% (2217828, 2432769] 140.09 167.86 1,081.87 1296.336926 18.82% 18.82% -15.80% blocks: 1934464, baseline: 3h13m1s, contender: 2h43m47s bpsd (mean): 19.55% tpsd (mean): 19.55% Time (total): -29m13s, -15.14% ```
2024-06-09 14:32:20 +00:00
if blocks.len > 0:
process() # last chunk, if any
if blockNumber > lastEra1Block:
notice "Importing era archive",
start, dataDir = conf.dataDir.string, eraDir = conf.eraDir.string
let
eraDB = EraDB.new(clConfig.cfg, conf.eraDir.string, genesis_validators_root)
(historical_roots, historical_summaries, endSlot) = loadHistoricalRootsFromEra(
conf.eraDir.string, clConfig.cfg
).valueOr:
error "Error loading historical summaries", error
quit QuitFailure
# Load the last slot number
if blockNumber > lastEra1Block + 1:
updateLastImportedSlot(
eraDB, historical_roots.asSeq(), historical_summaries.asSeq()
)
if importedSlot < firstSlotAfterMerge and firstSlotAfterMerge != 0:
# if resuming import we do not update the slot
importedSlot = firstSlotAfterMerge
while running and imported < conf.maxBlocks and importedSlot < endSlot:
var blk = getEthBlockFromEra(
eraDB,
historical_roots.asSeq(),
historical_summaries.asSeq(),
Slot(importedSlot),
clConfig.cfg,
).valueOr:
importedSlot += 1
continue
blocks.add blk
imported += 1
importedSlot += 1
if blocks.lenu64 mod conf.chunkSize == 0:
process()
if blocks.len > 0:
process()
for blocksFile in conf.blocksFile:
if isFile(string blocksFile):
# success or not, we quit after importing blocks
if not importRlpBlock(string blocksFile, com):
quit(QuitFailure)
else:
quit(QuitSuccess)