From 9a499eb45f92715a556ec0ae2e4c0500c57ba2b4 Mon Sep 17 00:00:00 2001
From: Advaita Saha
Date: Tue, 9 Jul 2024 18:58:01 +0530
Subject: [PATCH] Era support for nimbus import (#2429)

* add the era-dir option
* feat: support for era files in nimbus import
* fix: metric logs
* fix: eraDir check
* fix: redundant code and sepolia support
* fix: remove dependency from csv + formatting
* fix: typo
* fix: RVO
* fix: parseBiggestInt
* fix: opt impl
* fix: network agnostic loading
* fix: shift to int64
---
 nimbus/config.nim        |   8 +
 nimbus/nimbus_import.nim | 437 ++++++++++++++++++++++++++++++++-------
 2 files changed, 367 insertions(+), 78 deletions(-)

diff --git a/nimbus/config.nim b/nimbus/config.nim
index 59141989f..0574d028a 100644
--- a/nimbus/config.nim
+++ b/nimbus/config.nim
@@ -148,6 +148,11 @@ type
       desc: "Directory where era1 (pre-merge) archive can be found"
       defaultValueDesc: "<data-dir>/era1"
       name: "era1-dir" }: Option[OutDir]
+
+    eraDirOpt* {.
+      desc: "Directory where era (post-merge) archive can be found"
+      defaultValueDesc: "<data-dir>/era"
+      name: "era-dir" }: Option[OutDir]
 
     keyStore* {.
       desc: "Load one or more keystore files from this directory"
@@ -784,6 +789,9 @@ func httpServerEnabled*(conf: NimbusConf): bool =
 func era1Dir*(conf: NimbusConf): OutDir =
   conf.era1DirOpt.get(OutDir(conf.dataDir.string & "/era1"))
 
+func eraDir*(conf: NimbusConf): OutDir =
+  conf.eraDirOpt.get(OutDir(conf.dataDir.string & "/era"))
+
 func dbOptions*(conf: NimbusConf): DbOptions =
   DbOptions.init(
     maxOpenFiles = conf.rocksdbMaxOpenFiles,
diff --git a/nimbus/nimbus_import.nim b/nimbus/nimbus_import.nim
index 854f5c839..92b180a7a 100644
--- a/nimbus/nimbus_import.nim
+++ b/nimbus/nimbus_import.nim
@@ -13,28 +13,198 @@ import
   chronicles,
   metrics,
   chronos/timer,
-  std/[strformat, strutils],
+  std/[os, strformat, strutils],
   stew/io2,
   ./config,
   ./common/common,
   ./core/[block_import, chain],
   ./db/era1_db,
-  beacon_chain/era_db
+  beacon_chain/era_db,
+  beacon_chain/networking/network_metadata,
+  beacon_chain/spec/[forks, helpers],
+  ./beacon/payload_conv
 
-declareGauge nec_import_block_number,
-  "Latest imported block number"
+declareGauge nec_import_block_number, "Latest imported block number"
 
-declareCounter nec_imported_blocks,
-  "Blocks processed during import"
+declareCounter nec_imported_blocks, "Blocks processed during import"
 
-declareCounter nec_imported_transactions,
-  "Transactions processed during import"
+declareCounter nec_imported_transactions, "Transactions processed during import"
 
-declareCounter nec_imported_gas,
-  "Gas processed during import"
+declareCounter nec_imported_gas, "Gas processed during import"
 
 var running {.volatile.} = true
 
+proc latestEraFile*(eraDir: string, cfg: RuntimeConfig): Result[(string, Era), string] =
+  ## Find the latest era file in the era directory.
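+  ## Era files are assumed to follow the usual
+  ## `<config-name>-<era-number>-<short-root>.era` naming convention
+  ## (e.g. `mainnet-01234-abcdef12.era`, an illustrative name); only the
+  ## config name and era number are inspected, the root part is ignored.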
+  var
+    latestEra = 0
+    latestEraFile = ""
+
+  try:
+    for kind, obj in walkDir eraDir:
+      let (_, name, _) = splitFile(obj)
+      let parts = name.split('-')
+      if parts.len() == 3 and parts[0] == cfg.CONFIG_NAME:
+        let era =
+          try:
+            parseBiggestInt(parts[1])
+          except ValueError:
+            return err("Invalid era number")
+        if era > latestEra:
+          latestEra = era
+          latestEraFile = obj
+  except OSError as e:
+    return err(e.msg)
+
+  if latestEraFile == "":
+    err("No valid era files found")
+  else:
+    ok((latestEraFile, Era(latestEra)))
+
+proc loadHistoricalRootsFromEra*(
+    eraDir: string, cfg: RuntimeConfig
+): Result[
+  (
+    HashList[Eth2Digest, Limit HISTORICAL_ROOTS_LIMIT],
+    HashList[HistoricalSummary, Limit HISTORICAL_ROOTS_LIMIT],
+    Slot,
+  ),
+  string,
+] =
+  ## Load the historical roots and summaries from the latest era file.
+  let
+    (latestEraFile, latestEra) = ?latestEraFile(eraDir, cfg)
+    f = ?EraFile.open(latestEraFile)
+    slot = start_slot(latestEra)
+  var bytes: seq[byte]
+
+  ?f.getStateSSZ(slot, bytes)
+
+  if bytes.len() == 0:
+    return err("State not found")
+
+  let state =
+    try:
+      newClone(readSszForkedHashedBeaconState(cfg, slot, bytes))
+    except SerializationError as exc:
+      return err("Unable to read state: " & exc.msg)
+
+  withState(state[]):
+    when consensusFork >= ConsensusFork.Capella:
+      return ok(
+        (
+          forkyState.data.historical_roots,
+          forkyState.data.historical_summaries,
+          slot + 8192, # one era spans SLOTS_PER_HISTORICAL_ROOT = 8192 slots
+        )
+      )
+    else:
+      return ok(
+        (
+          forkyState.data.historical_roots,
+          HashList[HistoricalSummary, Limit HISTORICAL_ROOTS_LIMIT](),
+          slot + 8192, # one era spans SLOTS_PER_HISTORICAL_ROOT = 8192 slots
+        )
+      )
+
+proc getBlockFromEra*(
+    db: EraDB,
+    historical_roots: openArray[Eth2Digest],
+    historical_summaries: openArray[HistoricalSummary],
+    slot: Slot,
+    cfg: RuntimeConfig,
+): Opt[ForkedTrustedSignedBeaconBlock] =
+  let fork = cfg.consensusForkAtEpoch(slot.epoch)
+  result.ok(ForkedTrustedSignedBeaconBlock(kind: fork))
+  withBlck(result.get()):
+    type T = type(forkyBlck)
+    forkyBlck = db.getBlock(
+      historical_roots, historical_summaries, slot, Opt[Eth2Digest].err(), T
+    ).valueOr:
+      result.err()
+      return
+
+proc getTxs*(txs: seq[bellatrix.Transaction]): seq[common.Transaction] =
+  var transactions = newSeqOfCap[common.Transaction](txs.len)
+  for tx in txs:
+    try:
+      transactions.add(rlp.decode(tx.asSeq(), common.Transaction))
+    except RlpError:
+      # a single undecodable transaction invalidates the whole list
+      return @[]
+  return transactions
+
+proc getWithdrawals*(x: seq[capella.Withdrawal]): seq[common.Withdrawal] =
+  var withdrawals = newSeqOfCap[common.Withdrawal](x.len)
+  for w in x:
+    withdrawals.add(
+      common.Withdrawal(
+        index: w.index,
+        validatorIndex: w.validator_index,
+        address: EthAddress(w.address.data),
+        amount: uint64(w.amount),
+      )
+    )
+  return withdrawals
+
+proc getEth1Block*(blck: ForkedTrustedSignedBeaconBlock): EthBlock =
+  ## Convert a beacon block to an eth1 block.
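+  ## Payload fields that only exist from a given fork onwards are left as
+  ## `Opt.none` for earlier forks: withdrawals appear with Capella, the blob
+  ## gas fields and the parent beacon block root with Deneb. Pre-Bellatrix
+  ## blocks carry no execution payload and yield a default-initialized
+  ## `EthBlock`.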
+  withBlck(blck):
+    when consensusFork >= ConsensusFork.Bellatrix:
+      let
+        payload = forkyBlck.message.body.execution_payload
+        txs = getTxs(payload.transactions.asSeq())
+        ethWithdrawals =
+          when consensusFork >= ConsensusFork.Capella:
+            Opt.some(getWithdrawals(payload.withdrawals.asSeq()))
+          else:
+            Opt.none(seq[common.Withdrawal])
+        withdrawalRoot =
+          when consensusFork >= ConsensusFork.Capella:
+            Opt.some(calcWithdrawalsRoot(ethWithdrawals.get()))
+          else:
+            Opt.none(common.Hash256)
+        blobGasUsed =
+          when consensusFork >= ConsensusFork.Deneb:
+            Opt.some(payload.blob_gas_used)
+          else:
+            Opt.none(uint64)
+        excessBlobGas =
+          when consensusFork >= ConsensusFork.Deneb:
+            Opt.some(payload.excess_blob_gas)
+          else:
+            Opt.none(uint64)
+        parentBeaconBlockRoot =
+          when consensusFork >= ConsensusFork.Deneb:
+            Opt.some(forkyBlck.message.parent_root)
+          else:
+            Opt.none(common.Hash256)
+
+      let header = BlockHeader(
+        parentHash: payload.parent_hash,
+        ommersHash: EMPTY_UNCLE_HASH,
+        coinbase: EthAddress(payload.fee_recipient.data),
+        stateRoot: payload.state_root,
+        txRoot: calcTxRoot(txs),
+        receiptsRoot: payload.receipts_root,
+        logsBloom: BloomFilter(payload.logs_bloom.data),
+        difficulty: 0.u256,
+        number: payload.block_number,
+        gasLimit: GasInt(payload.gas_limit),
+        gasUsed: GasInt(payload.gas_used),
+        timestamp: EthTime(payload.timestamp),
+        extraData: payload.extra_data.asSeq(),
+        mixHash: payload.prev_randao,
+        nonce: default(BlockNonce),
+        baseFeePerGas: Opt.some(payload.base_fee_per_gas),
+        withdrawalsRoot: withdrawalRoot,
+        blobGasUsed: blobGasUsed,
+        excessBlobGas: excessBlobGas,
+        parentBeaconBlockRoot: parentBeaconBlockRoot,
+      )
+      return EthBlock(
+        header: header, transactions: txs, uncles: @[], withdrawals: ethWithdrawals
+      )
+
 func shortLog(a: timer.Duration, parts = int.high): string {.inline.} =
   ## Returns string representation of Duration ``a`` as nanoseconds value.
   var
@@ -83,6 +253,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
   var
     imported = 0'u64
+    importedSlot = 1'u64
     gas = GasInt(0)
     txs = 0
     time0 = Moment.now()
@@ -90,8 +261,9 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       if conf.csvStats.isSome:
         try:
           let f = open(conf.csvStats.get(), fmAppend)
-          if f.getFileSize() == 0:
-            f.writeLine("block_number,blocks,txs,gas,time")
+          let pos = f.getFileSize()
+          if pos == 0:
+            f.writeLine("block_number,blocks,slot,txs,gas,time")
           f
         except IOError as exc:
           error "Could not open statistics output file",
@@ -104,89 +276,156 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       boolFlag({PersistBlockFlag.NoFullValidation}, not conf.fullValidation) +
       boolFlag(NoPersistBodies, not conf.storeBodies) +
       boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts)
+    blocks: seq[EthBlock]
+    clConfig: Eth2NetworkMetadata
+    genesis_validators_root: Eth2Digest
+    lastEra1Block: uint64
+    firstSlotAfterMerge: uint64
 
   defer:
     if csv != nil:
       close(csv)
 
+  # Network-specific configuration
+  # TODO: the merge block number could be fetched from the era1 file instead,
+  #       especially if the accumulator is added to the chain metadata
+  if conf.networkId == MainNet:
+    doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
+    clConfig = getMetadataForNetwork("mainnet")
+    genesis_validators_root = Eth2Digest.fromHex(
+      "0x4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"
+    ) # Mainnet Validators Root
+    lastEra1Block = 15537393'u64 # Mainnet
+    firstSlotAfterMerge =
+      if isDir(conf.eraDir.string):
+        4700013'u64 # Mainnet
+      else:
+        notice "No eraDir found for Mainnet, block loading will stop after era1"
+        0'u64 # No eraDir for Mainnet
+  elif conf.networkId == SepoliaNet:
+    doAssert isDir(conf.era1Dir.string), "Era1 directory not found"
+    clConfig = getMetadataForNetwork("sepolia")
+    genesis_validators_root = Eth2Digest.fromHex(
+      "0xd8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"
+    ) # Sepolia Validators Root
+    lastEra1Block = 1450409'u64 # Sepolia
+    firstSlotAfterMerge =
+      if isDir(conf.eraDir.string):
+        115193'u64 # Sepolia
+      else:
+        notice "No eraDir found for Sepolia, block loading will stop after era1"
+        0'u64 # No eraDir for Sepolia
+  elif conf.networkId == HoleskyNet:
+    doAssert isDir(conf.eraDir.string), "Era directory not found"
+    clConfig = getMetadataForNetwork("holesky")
+    genesis_validators_root = Eth2Digest.fromHex(
+      "0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"
+    ) # Holesky Validators Root
+    lastEra1Block = 0'u64 # Holesky launched post-merge, so there is no era1 range
+    firstSlotAfterMerge = 0'u64
+  else:
+    error "Unsupported network", network = conf.networkId
+    quit(QuitFailure)
+
   nec_import_block_number.set(start.int64)
 
   template blockNumber(): uint64 =
     start + imported
 
-  if isDir(conf.era1Dir.string):
-    doAssert conf.networkId == MainNet, "Only mainnet era1 current supported"
+  func f(value: float): string =
+    try:
+      &"{value:4.3f}"
+    except ValueError:
+      raiseAssert "valid fmt string"
 
-    const
-      # TODO the merge block number could be fetched from the era1 file instead,
-      #      specially if the accumulator is added to the chain metadata
-      lastEra1Block = 15537393
+  template process() =
+    let
+      time1 = Moment.now()
+      statsRes = chain.persistBlocks(blocks, flags)
+    if statsRes.isErr():
+      error "Failed to persist blocks", error = statsRes.error
+      quit(QuitFailure)
+    txs += statsRes[].txs
+    gas += statsRes[].gas
+    let
+      time2 = Moment.now()
+      diff1 = (time2 - time1).nanoseconds().float / 1000000000
+      diff0 = (time2 - time0).nanoseconds().float / 1000000000
+
+    info "Imported blocks",
+      blockNumber,
+      blocks = imported,
+      importedSlot,
+      txs,
+      mgas = f(gas.float / 1000000),
+      bps = f(blocks.len.float / diff1),
+      tps = f(statsRes[].txs.float / diff1),
+      mgps = f(statsRes[].gas.float / 1000000 / diff1),
+      avgBps = f(imported.float / diff0),
+      avgTps = f(txs.float / diff0),
+      avgMGps = f(gas.float / 1000000 / diff0),
+      elapsed = shortLog(time2 - time0, 3)
+
+    metrics.set(nec_import_block_number, int64(blockNumber))
+    nec_imported_blocks.inc(blocks.len)
+    nec_imported_transactions.inc(statsRes[].txs)
+    nec_imported_gas.inc(int64 statsRes[].gas)
+
+    if csv != nil:
+      # In the CSV, we store a line for every chunk of blocks processed so
+      # that the file can meaningfully be appended to when restarting the
+      # process - this way, each sample is independent
+      try:
+        csv.writeLine(
+          [
+            $blockNumber,
+            $blocks.len,
+            $importedSlot,
+            $statsRes[].txs,
+            $statsRes[].gas,
+            $(time2 - time1).nanoseconds(),
+          ].join(",")
+        )
+        csv.flushFile()
+      except IOError as exc:
+        warn "Could not write csv", err = exc.msg
+    blocks.setLen(0)
+
+  template updateLastImportedSlot(
+      era: EraDB,
+      historical_roots: openArray[Eth2Digest],
+      historical_summaries: openArray[HistoricalSummary],
+  ) =
+    # When resuming an import, scan forward from the current head block number
+    # until we find the beacon block whose parent hash matches the current
+    # head; that block's slot is where the import continues.
+    if blockNumber > 1:
+      importedSlot = blockNumber
+      notice "Finding slot number after resuming import", importedSlot
+      var parentHash: common.Hash256
+      let currentHash = com.db.getHeadBlockHash()
+      while currentHash != parentHash:
+        let clBlock = getBlockFromEra(
+          era, historical_roots, historical_summaries, Slot(importedSlot), clConfig.cfg
+        )
+        if clBlock.isSome:
+          let ethBlock = getEth1Block(clBlock.get())
+          parentHash = ethBlock.header.parentHash
+
+        importedSlot += 1
+      # the loop overshoots the matching slot by one
+      importedSlot -= 1
+      notice "Found the slot to start with", importedSlot
+
+  if isDir(conf.era1Dir.string) or isDir(conf.eraDir.string):
     if start <= lastEra1Block:
       notice "Importing era1 archive",
         start, dataDir = conf.dataDir.string, era1Dir = conf.era1Dir.string
 
-      var blocks: seq[EthBlock]
-
-      func f(value: float): string =
-        try:
-          &"{value:4.3f}"
-        except ValueError:
-          raiseAssert "valid fmt string"
-
-      template process() =
-        let
-          time1 = Moment.now()
-          statsRes = chain.persistBlocks(blocks, flags)
-        if statsRes.isErr():
-          error "Failed to persist blocks", error = statsRes.error
-          quit(QuitFailure)
-
-        txs += statsRes[].txs
-        gas += statsRes[].gas
-        let
-          time2 = Moment.now()
-          diff1 = (time2 - time1).nanoseconds().float / 1000000000
-          diff0 = (time2 - time0).nanoseconds().float / 1000000000
-
-        info "Imported blocks",
-          blockNumber,
-          blocks = imported,
-          txs,
-          mgas = f(gas.float / 1000000),
-          bps = f(blocks.len.float / diff1),
-          tps = f(statsRes[].txs.float / diff1),
-          mgps = f(statsRes[].gas.float / 1000000 / diff1),
-          avgBps = f(imported.float / diff0),
-          avgTps = f(txs.float / diff0),
-          avgMGps = f(gas.float / 1000000 / diff0),
-          elapsed = shortLog(time2 - time0, 3)
-
-        metrics.set(nec_import_block_number, int64(blockNumber))
-        nec_imported_blocks.inc(blocks.len)
-        nec_imported_transactions.inc(statsRes[].txs)
-        nec_imported_gas.inc(int64 statsRes[].gas)
-
-        if csv != nil:
-          # In the CSV, we store a line for every chunk of blocks processed so
-          # that the file can meaningfully be appended to when restarting the
-          # process - this way, each sample is independent
-          try:
-            csv.writeLine(
-              [
-                $blockNumber,
-                $blocks.len,
-                $statsRes[].txs,
-                $statsRes[].gas,
-                $(time2 - time1).nanoseconds(),
-              ].join(",")
-            )
-            csv.flushFile()
-          except IOError as exc:
-            warn "Could not write csv", err = exc.msg
-        blocks.setLen(0)
-
       let db =
-        Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
+        if conf.networkId == MainNet:
+          Era1DbRef.init(conf.era1Dir.string, "mainnet").expect("Era files present")
+            # Mainnet
+        else:
+          Era1DbRef.init(conf.era1Dir.string, "sepolia").expect("Era files present")
+            # Sepolia
       defer:
         db.dispose()
@@ -204,6 +443,48 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       if blocks.len > 0:
         process() # last chunk, if any
 
+  if start > lastEra1Block:
+    notice "Importing era archive",
+      start, dataDir = conf.dataDir.string, eraDir = conf.eraDir.string
+
+    let
+      eraDB = EraDB.new(clConfig.cfg, conf.eraDir.string, genesis_validators_root)
+      (historical_roots, historical_summaries, endSlot) = loadHistoricalRootsFromEra(
+        conf.eraDir.string, clConfig.cfg
+      ).valueOr:
+        error "Error loading historical summaries", error
+        quit QuitFailure
+
+    # Find the slot at which a previous import left off
+    updateLastImportedSlot(
+      eraDB, historical_roots.asSeq(), historical_summaries.asSeq()
+    )
+
+    if importedSlot < firstSlotAfterMerge and firstSlotAfterMerge != 0:
+      # fresh import: start at the first post-merge slot; a resumed import
+      # keeps the slot found above
+      importedSlot = firstSlotAfterMerge
+
+    while running and imported < conf.maxBlocks and importedSlot < endSlot:
+      let clBlock = getBlockFromEra(
+        eraDB,
+        historical_roots.asSeq(),
+        historical_summaries.asSeq(),
+        Slot(importedSlot),
+        clConfig.cfg,
+      ).valueOr:
+        # empty slots are expected: not every slot carries a block
+        importedSlot += 1
+        continue
+
+      blocks.add getEth1Block(clBlock)
+      imported += 1
+
+      importedSlot += 1
+      if blocks.lenu64 mod conf.chunkSize == 0:
+        process()
+
+    if blocks.len > 0:
+      process()
+
   for blocksFile in conf.blocksFile:
     if isFile(string blocksFile):
       # success or not, we quit after importing blocks