Era support for nimbus import (#2429)

* add the era-dir option

* feat: support for era files in nimbus import

* fix: metric logs

* fix: eraDir check

* fix: redundant code and sepolia support

* fix: remove dependency from csv + formatting

* fix: typo

* fix: RVO

* fix: parseBiggestInt

* fix: opt impl

* fix: network agnostic loading

* fix: shift to int64
Authored by Advaita Saha on 2024-07-09 18:58:01 +05:30; committed by GitHub
parent 4fa3756860
commit 9a499eb45f
2 changed files with 367 additions and 78 deletions
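
For illustration, an import using the new option might be invoked as follows (a sketch only: the paths are placeholders, and the binary name assumes a regular release build):

    ./build/nimbus import --data-dir=/data/nimbus --era1-dir=/data/era1 --era-dir=/data/era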

@@ -148,6 +148,11 @@ type
desc: "Directory where era1 (pre-merge) archive can be found"
defaultValueDesc: "<data-dir>/era1"
name: "era1-dir" }: Option[OutDir]
eraDirOpt* {.
desc: "Directory where era (post-merge) archive can be found"
defaultValueDesc: "<data-dir>/era"
name: "era-dir" }: Option[OutDir]
keyStore* {.
desc: "Load one or more keystore files from this directory"
@@ -784,6 +789,9 @@ func httpServerEnabled*(conf: NimbusConf): bool =
func era1Dir*(conf: NimbusConf): OutDir =
conf.era1DirOpt.get(OutDir(conf.dataDir.string & "/era1"))
func eraDir*(conf: NimbusConf): OutDir =
conf.eraDirOpt.get(OutDir(conf.dataDir.string & "/era"))
func dbOptions*(conf: NimbusConf): DbOptions =
DbOptions.init(
maxOpenFiles = conf.rocksdbMaxOpenFiles,

@@ -13,28 +13,198 @@ import
chronicles,
metrics,
chronos/timer,
std/[strformat, strutils],
std/[os, strformat, strutils],
stew/io2,
./config,
./common/common,
./core/[block_import, chain],
./db/era1_db,
beacon_chain/era_db
beacon_chain/era_db,
beacon_chain/networking/network_metadata,
beacon_chain/spec/[forks, helpers],
./beacon/payload_conv
declareGauge nec_import_block_number,
"Latest imported block number"
declareGauge nec_import_block_number, "Latest imported block number"
declareCounter nec_imported_blocks,
"Blocks processed during import"
declareCounter nec_imported_blocks, "Blocks processed during import"
declareCounter nec_imported_transactions,
"Transactions processed during import"
declareCounter nec_imported_transactions, "Transactions processed during import"
declareCounter nec_imported_gas,
"Gas processed during import"
declareCounter nec_imported_gas, "Gas processed during import"
var running {.volatile.} = true
proc latestEraFile*(eraDir: string, cfg: RuntimeConfig): Result[(string, Era), string] =
## Find the latest era file in the era directory.
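## Era file names follow the `<config-name>-<era-number>-<short-historical-root>.era`
## scheme; only the config name and era number are inspected here.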
var
latestEra = 0
latestEraFile = ""
try:
for kind, obj in walkDir eraDir:
let (_, name, _) = splitFile(obj)
let parts = name.split('-')
if parts.len() == 3 and parts[0] == cfg.CONFIG_NAME:
let era =
try:
parseBiggestInt(parts[1])
except ValueError:
return err("Invalid era number")
if era > latestEra:
latestEra = era
latestEraFile = obj
except OSError as e:
return err(e.msg)
if latestEraFile == "":
err("No valid era files found")
else:
ok((latestEraFile, Era(latestEra)))
proc loadHistoricalRootsFromEra*(
eraDir: string, cfg: RuntimeConfig
): Result[
(
HashList[Eth2Digest, Limit HISTORICAL_ROOTS_LIMIT],
HashList[HistoricalSummary, Limit HISTORICAL_ROOTS_LIMIT],
Slot,
),
string,
] =
## Load the historical_summaries from the latest era file.
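## The returned slot is the first slot of the era following the latest era file,
## since an era spans 8192 slots (SLOTS_PER_HISTORICAL_ROOT).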
let
(latestEraFile, latestEra) = ?latestEraFile(eraDir, cfg)
f = ?EraFile.open(latestEraFile)
slot = start_slot(latestEra)
var bytes: seq[byte]
?f.getStateSSZ(slot, bytes)
if bytes.len() == 0:
return err("State not found")
let state =
try:
newClone(readSszForkedHashedBeaconState(cfg, slot, bytes))
except SerializationError as exc:
return err("Unable to read state: " & exc.msg)
withState(state[]):
when consensusFork >= ConsensusFork.Capella:
return ok(
(
forkyState.data.historical_roots,
forkyState.data.historical_summaries,
slot + 8192,
)
)
else:
return ok(
(
forkyState.data.historical_roots,
HashList[HistoricalSummary, Limit HISTORICAL_ROOTS_LIMIT](),
slot + 8192,
)
)
proc getBlockFromEra*(
db: EraDB,
historical_roots: openArray[Eth2Digest],
historical_summaries: openArray[HistoricalSummary],
slot: Slot,
cfg: RuntimeConfig,
): Opt[ForkedTrustedSignedBeaconBlock] =
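# Pick the SSZ block type for the consensus fork active at this slot's epoch,
# then let the era database fill in the fork-specific block.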
let fork = cfg.consensusForkAtEpoch(slot.epoch)
result.ok(ForkedTrustedSignedBeaconBlock(kind: fork))
withBlck(result.get()):
type T = type(forkyBlck)
forkyBlck = db.getBlock(
historical_roots, historical_summaries, slot, Opt[Eth2Digest].err(), T
).valueOr:
result.err()
return
proc getTxs*(txs: seq[bellatrix.Transaction]): seq[common.Transaction] =
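# RLP-decode the payload's opaque transaction bytes into eth1 transactions;
# a single decoding failure discards the whole list by returning an empty seq.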
var transactions = newSeqOfCap[common.Transaction](txs.len)
for tx in txs:
try:
transactions.add(rlp.decode(tx.asSeq(), common.Transaction))
except RlpError:
return @[]
return transactions
proc getWithdrawals*(x: seq[capella.Withdrawal]): seq[common.Withdrawal] =
var withdrawals = newSeqOfCap[common.Withdrawal](x.len)
for w in x:
withdrawals.add(
common.Withdrawal(
index: w.index,
validatorIndex: w.validator_index,
address: EthAddress(w.address.data),
amount: uint64(w.amount),
)
)
return withdrawals
proc getEth1Block*(blck: ForkedTrustedSignedBeaconBlock): EthBlock =
## Convert a beacon block to an eth1 block.
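## Forks before Bellatrix carry no execution payload, so for those this
## returns a default-initialized EthBlock.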
withBlck(blck):
when consensusFork >= ConsensusFork.Bellatrix:
let
payload = forkyBlck.message.body.execution_payload
txs = getTxs(payload.transactions.asSeq())
ethWithdrawals =
when consensusFork >= ConsensusFork.Capella:
Opt.some(getWithdrawals(payload.withdrawals.asSeq()))
else:
Opt.none(seq[common.Withdrawal])
withdrawalRoot =
when consensusFork >= ConsensusFork.Capella:
Opt.some(calcWithdrawalsRoot(ethWithdrawals.get()))
else:
Opt.none(common.Hash256)
blobGasUsed =
when consensusFork >= ConsensusFork.Deneb:
Opt.some(payload.blob_gas_used)
else:
Opt.none(uint64)
excessBlobGas =
when consensusFork >= ConsensusFork.Deneb:
Opt.some(payload.excess_blob_gas)
else:
Opt.none(uint64)
parentBeaconBlockRoot =
when consensusFork >= ConsensusFork.Deneb:
Opt.some(forkyBlck.message.parent_root)
else:
Opt.none(common.Hash256)
let header = BlockHeader(
parentHash: payload.parent_hash,
ommersHash: EMPTY_UNCLE_HASH,
coinbase: EthAddress(payload.fee_recipient.data),
stateRoot: payload.state_root,
txRoot: calcTxRoot(txs),
receiptsRoot: payload.receipts_root,
logsBloom: BloomFilter(payload.logs_bloom.data),
difficulty: 0.u256,
number: payload.block_number,
gasLimit: GasInt(payload.gas_limit),
gasUsed: GasInt(payload.gas_used),
timestamp: EthTime(payload.timestamp),
extraData: payload.extra_data.asSeq(),
mixHash: payload.prev_randao,
nonce: default(BlockNonce),
baseFeePerGas: Opt.some(payload.base_fee_per_gas),
withdrawalsRoot: withdrawalRoot,
blobGasUsed: blobGasUsed,
excessBlobGas: excessBlobGas,
parentBeaconBlockRoot: parentBeaconBlockRoot,
)
return EthBlock(
header: header, transactions: txs, uncles: @[], withdrawals: ethWithdrawals
)
func shortLog(a: timer.Duration, parts = int.high): string {.inline.} =
## Returns a human-readable string representation of Duration ``a``, using at most ``parts`` units.
var
@@ -83,6 +253,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
var
imported = 0'u64
importedSlot = 1'u64
gas = GasInt(0)
txs = 0
time0 = Moment.now()
@@ -90,8 +261,9 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
if conf.csvStats.isSome:
try:
let f = open(conf.csvStats.get(), fmAppend)
if f.getFileSize() == 0:
f.writeLine("block_number,blocks,txs,gas,time")
let pos = f.getFileSize()
if pos == 0:
f.writeLine("block_number,blocks,slot,txs,gas,time")
f
except IOError as exc:
error "Could not open statistics output file",
@@ -104,89 +276,156 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
boolFlag({PersistBlockFlag.NoFullValidation}, not conf.fullValidation) +
boolFlag(NoPersistBodies, not conf.storeBodies) +
boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts)
blocks: seq[EthBlock]
clConfig: Eth2NetworkMetadata
genesis_validators_root: Eth2Digest
lastEra1Block: uint64
firstSlotAfterMerge: uint64
defer:
if csv != nil:
close(csv)
# Network Specific Configurations
# TODO: the merge block number could be fetched from the era1 file instead,
# especially if the accumulator is added to the chain metadata
if conf.networkId == MainNet:
doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
clConfig = getMetadataForNetwork("mainnet")
genesis_validators_root = Eth2Digest.fromHex(
"0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"
) # Mainnet Validators Root
lastEra1Block = 15537393'u64 # Mainnet
firstSlotAfterMerge =
if isDir(conf.eraDir.string):
4700013'u64 # Mainnet
else:
notice "No eraDir found for Mainnet, block loading will stop after era1"
0'u64 # No eraDir for Mainnet
elif conf.networkId == SepoliaNet:
doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
clConfig = getMetadataForNetwork("sepolia")
genesis_validators_root = Eth2Digest.fromHex(
"0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"
) # Sepolia Validators Root
lastEra1Block = 1450409'u64 # Sepolia
firstSlotAfterMerge =
if isDir(conf.eraDir.string):
115193'u64 # Sepolia
else:
notice "No eraDir found for Sepolia, block loading will stop after era1"
0'u64 # No eraDir for Sepolia
elif conf.networkId == HoleskyNet:
doAssert isDir(conf.eraDir.string), "Era directory not found"
clConfig = getMetadataForNetwork("holesky")
genesis_validators_root = Eth2Digest.fromHex(
"0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"
) # Holesky Validators Root
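# Holesky launched post-merge, so there is no era1 (pre-merge) range to import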
lastEra1Block = 0'u64
firstSlotAfterMerge = 0'u64
else:
error "Unsupported network", network = conf.networkId
quit(QuitFailure)
nec_import_block_number.set(start.int64)
template blockNumber(): uint64 =
start + imported
if isDir(conf.era1Dir.string):
doAssert conf.networkId == MainNet, "Only mainnet era1 currently supported"
func f(value: float): string =
try:
&"{value:4.3f}"
except ValueError:
raiseAssert "valid fmt string"
const
# TODO the merge block number could be fetched from the era1 file instead,
# especially if the accumulator is added to the chain metadata
lastEra1Block = 15537393
template process() =
let
time1 = Moment.now()
statsRes = chain.persistBlocks(blocks, flags)
if statsRes.isErr():
error "Failed to persist blocks", error = statsRes.error
quit(QuitFailure)
txs += statsRes[].txs
gas += statsRes[].gas
let
time2 = Moment.now()
diff1 = (time2 - time1).nanoseconds().float / 1000000000
diff0 = (time2 - time0).nanoseconds().float / 1000000000
info "Imported blocks",
blockNumber,
blocks = imported,
importedSlot,
txs,
mgas = f(gas.float / 1000000),
bps = f(blocks.len.float / diff1),
tps = f(statsRes[].txs.float / diff1),
mgps = f(statsRes[].gas.float / 1000000 / diff1),
avgBps = f(imported.float / diff0),
avgTps = f(txs.float / diff0),
avgMGps = f(gas.float / 1000000 / diff0),
elapsed = shortLog(time2 - time0, 3)
metrics.set(nec_import_block_number, int64(blockNumber))
nec_imported_blocks.inc(blocks.len)
nec_imported_transactions.inc(statsRes[].txs)
nec_imported_gas.inc(int64 statsRes[].gas)
if csv != nil:
# In the CSV, we store a line for every chunk of blocks processed so
# that the file can meaningfully be appended to when restarting the
# process - this way, each sample is independent
try:
csv.writeLine(
[
$blockNumber,
$blocks.len,
$importedSlot,
$statsRes[].txs,
$statsRes[].gas,
$(time2 - time1).nanoseconds(),
].join(",")
)
csv.flushFile()
except IOError as exc:
warn "Could not write csv", err = exc.msg
blocks.setLen(0)
template updateLastImportedSlot(
era: EraDB,
historical_roots: openArray[Eth2Digest],
historical_summaries: openArray[HistoricalSummary],
) =
if blockNumber > 1:
importedSlot = blockNumber
notice "Finding slot number after resuming import", importedSlot
var parentHash: common.Hash256
let currentHash = com.db.getHeadBlockHash()
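# Walk the era blocks forward until we find the one whose execution payload
# builds on the current head, i.e. the next block to import.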
while currentHash != parentHash:
let clBlock = getBlockFromEra(
era, historical_roots, historical_summaries, Slot(importedSlot), clConfig.cfg
)
if clBlock.isSome:
let ethBlock = getEth1Block(clBlock.get())
parentHash = ethBlock.header.parentHash
importedSlot += 1
importedSlot -= 1
notice "Found the slot to start with", importedSlot
if isDir(conf.era1Dir.string) or isDir(conf.eraDir.string):
if start <= lastEra1Block:
notice "Importing era1 archive",
start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string
var blocks: seq[EthBlock]
func f(value: float): string =
try:
&"{value:4.3f}"
except ValueError:
raiseAssert "valid fmt string"
template process() =
let
time1 = Moment.now()
statsRes = chain.persistBlocks(blocks, flags)
if statsRes.isErr():
error "Failed to persist blocks", error = statsRes.error
quit(QuitFailure)
txs += statsRes[].txs
gas += statsRes[].gas
let
time2 = Moment.now()
diff1 = (time2 - time1).nanoseconds().float / 1000000000
diff0 = (time2 - time0).nanoseconds().float / 1000000000
info "Imported blocks",
blockNumber,
blocks = imported,
txs,
mgas = f(gas.float / 1000000),
bps = f(blocks.len.float / diff1),
tps = f(statsRes[].txs.float / diff1),
mgps = f(statsRes[].gas.float / 1000000 / diff1),
avgBps = f(imported.float / diff0),
avgTps = f(txs.float / diff0),
avgMGps = f(gas.float / 1000000 / diff0),
elapsed = shortLog(time2 - time0, 3)
metrics.set(nec_import_block_number, int64(blockNumber))
nec_imported_blocks.inc(blocks.len)
nec_imported_transactions.inc(statsRes[].txs)
nec_imported_gas.inc(int64 statsRes[].gas)
if csv != nil:
# In the CSV, we store a line for every chunk of blocks processed so
# that the file can meaningfully be appended to when restarting the
# process - this way, each sample is independent
try:
csv.writeLine(
[
$blockNumber,
$blocks.len,
$statsRes[].txs,
$statsRes[].gas,
$(time2 - time1).nanoseconds(),
].join(",")
)
csv.flushFile()
except IOError as exc:
warn "Could not write csv", err = exc.msg
blocks.setLen(0)
let db =
Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
if conf.networkId == MainNet:
Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
# Mainnet
else:
Era1DbRef.init(conf.era1Dir.string, "sepolia").expect("Era files present")
# Sepolia
defer:
db.dispose()
@@ -204,6 +443,48 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
if blocks.len > 0:
process() # last chunk, if any
if start > lastEra1Block:
notice "Importing era archive",
start, dataDir = conf.dataDir.string, eraDir = conf.eraDir.string
let
eraDB = EraDB.new(clConfig.cfg, conf.eraDir.string, genesis_validators_root)
(historical_roots, historical_summaries, endSlot) = loadHistoricalRootsFromEra(
conf.eraDir.string, clConfig.cfg
).valueOr:
error "Error loading historical summaries", error
quit QuitFailure
# Load the last slot number
updateLastImportedSlot(
eraDB, historical_roots.asSeq(), historical_summaries.asSeq()
)
if importedSlot < firstSlotAfterMerge and firstSlotAfterMerge != 0:
# start at the first post-merge slot; when resuming, importedSlot is already past this point
importedSlot = firstSlotAfterMerge
while running and imported < conf.maxBlocks and importedSlot < endSlot:
let clblock = getBlockFromEra(
eraDB,
historical_roots.asSeq(),
historical_summaries.asSeq(),
Slot(importedSlot),
clConfig.cfg,
).valueOr:
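# empty slot, no block proposed: advance to the next slot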
importedSlot += 1
continue
blocks.add getEth1Block(clblock)
imported += 1
importedSlot += 1
if blocks.lenu64 mod conf.chunkSize == 0:
process()
if blocks.len > 0:
process()
for blocksFile in conf.blocksFile:
if isFile(string blocksFile):
# success or not, we quit after importing blocks