small cleanups (#2526)

* remove some redundant EH
* avoid pessimising move (introduces a copy in this case!)
* shift less data around when reading era files (reduces stack usage)
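A "pessimising move" is an explicit `move` that ends up costing a copy instead of saving one: Nim already moves a value at its last use, and when the value is in fact needed again, something has to give. A minimal sketch (hypothetical `Buf` type, not from this codebase) that makes the copy observable through a `=copy` hook:

```nim
type Buf = object
  data: seq[byte]

proc `=copy`(dst: var Buf; src: Buf) =
  echo "copied ", src.data.len, " bytes"   # fires only when a real copy happens
  dst.data = src.data

var blocks: seq[Buf]

proc demo() =
  var a = Buf(data: newSeq[byte](4096))
  blocks.add a        # last use of `a`: moved automatically, nothing printed
  var b = Buf(data: newSeq[byte](4096))
  blocks.add b        # `b` is read again below, so this add must copy
  echo b.data.len

demo()
```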
Jacek Sieka 2024-07-26 07:32:01 +02:00 committed by GitHub
parent 7e2a636717
commit bdc86b3fd4
16 changed files with 112 additions and 149 deletions

View File

@@ -357,12 +357,9 @@ proc setHead(c: ForkedChainRef,
     headHash: Hash256,
     number: BlockNumber) =
   # TODO: db.setHead should not read from db anymore
-  # and raise RlpError, all canonical chain marking
+  # all canonical chain marking
   # should be done from here.
-  try:
-    discard c.db.setHead(headHash)
-  except RlpError as exc:
-    raiseAssert(exc.msg)
+  discard c.db.setHead(headHash)
   # update global syncHighest
   c.com.syncHighest = number
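The same cleanup repeats throughout the commit: once `db.setHead` can no longer raise `RlpError`, the `try`/`except` wrapper is dead code, and Nim's effect system proves it. A self-contained sketch of the mechanism, assuming nothing beyond standard effect tracking:

```nim
{.push raises: [].}   # every proc below must be exception-free

proc setHeadStub(ok: bool): bool =
  # stand-in for db.setHead after the cleanup: it reports failure through
  # its return value rather than by raising
  ok

proc caller(): bool =
  # no try/except needed; if setHeadStub could still raise, this module
  # would no longer compile
  setHeadStub(true)

{.pop.}

echo caller()
```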

View File

@@ -201,10 +201,7 @@ proc persistBlocksImpl(
   let n = fromBlock div CleanUpEpoch
   if 0 < n and n < (toBlock div CleanUpEpoch):
     # Starts at around `2 * CleanUpEpoch`
-    try:
-      c.db.purgeOlderBlocksFromHistory(fromBlock - CleanUpEpoch)
-    except CatchableError as exc:
-      warn "Could not clean up old blocks from history", err = exc.msg
+    c.db.purgeOlderBlocksFromHistory(fromBlock - CleanUpEpoch)
 
   ok((blks, txs, gas))
@@ -222,31 +219,22 @@ proc insertBlockWithoutSetHead*(c: ChainRef, blk: EthBlock): Result[void, string
 
 proc setCanonical*(c: ChainRef, header: BlockHeader): Result[void, string] =
   if header.parentHash == Hash256():
-    try:
-      if not c.db.setHead(header.blockHash):
-        return err("setHead failed")
-    except RlpError as exc:
-      # TODO fix exception+bool error return
-      return err(exc.msg)
+    if not c.db.setHead(header):
+      return err("setHead failed")
     return ok()
 
   var body: BlockBody
-  try:
-    if not c.db.getBlockBody(header, body):
-      debug "Failed to get BlockBody", hash = header.blockHash
-      return err("Could not get block body")
-  except RlpError as exc:
-    return err(exc.msg)
+  if not c.db.getBlockBody(header, body):
+    debug "Failed to get BlockBody", hash = header.blockHash
+    return err("Could not get block body")
 
   discard
     ?c.persistBlocksImpl(
       [EthBlock.init(header, move(body))], {NoPersistHeader, NoPersistTransactions}
     )
 
-  try:
-    discard c.db.setHead(header.blockHash)
-  except RlpError as exc:
-    return err(exc.msg)
+  if not c.db.setHead(header):
+    return err("setHead failed")
 
   ok()
 
 proc setCanonical*(c: ChainRef, blockHash: Hash256): Result[void, string] =
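`setCanonical` now reports every failure through its `Result` return, using nim-results throughout: the `?` in front of `persistBlocksImpl` unwraps an `ok` value and returns an `err` to the caller unchanged. A toy illustration (hypothetical procs, assuming the `results` package):

```nim
import results

proc loadBody(id: int): Result[string, string] =
  if id == 0: err("not found") else: ok("body " & $id)

proc persist(id: int): Result[void, string] =
  let body = ? loadBody(id)   # on err, `persist` returns that error here
  echo "persisting ", body
  ok()

echo persist(1).isOk    # true
echo persist(0).error   # not found
```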

View File

@@ -11,7 +11,7 @@
 {.push raises: [].}
 
 import
-  std/strutils,
+  std/strformat,
   results,
   ../../common/common,
   ../../db/ledger,
@@ -48,12 +48,8 @@ proc commitOrRollbackDependingOnGasUsed(
   # an early stop. It would rather detect differing values for the block
   # header `gasUsed` and the `vmState.cumulativeGasUsed` at a later stage.
   if header.gasLimit < vmState.cumulativeGasUsed + gasBurned:
-    try:
-      vmState.stateDB.rollback(accTx)
-      return err("invalid tx: block header gasLimit reached. gasLimit=$1, gasUsed=$2, addition=$3" % [
-        $header.gasLimit, $vmState.cumulativeGasUsed, $gasBurned])
-    except ValueError as ex:
-      return err(ex.msg)
+    vmState.stateDB.rollback(accTx)
+    return err(&"invalid tx: block header gasLimit reached. gasLimit={header.gasLimit}, gasUsed={vmState.cumulativeGasUsed}, addition={gasBurned}")
   else:
     # Accept transaction and collect mining fee.
     vmState.stateDB.commit(accTx)
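The import swap from `std/strutils` to `std/strformat` trades runtime formatting for compile-time formatting: `%` parses its format string when the line executes and raises `ValueError` on a placeholder mismatch, whereas `&` resolves `{}` placeholders during compilation, so there is no failure left to handle. A side-by-side sketch:

```nim
import std/[strutils, strformat]

let gasLimit = 30_000_000
let gasUsed = 21_000

# strutils `%`: "$1"/"$2" are interpreted at run time; a mismatch between
# placeholders and arguments raises ValueError only when this executes
echo "gasLimit=$1, gasUsed=$2" % [$gasLimit, $gasUsed]

# strformat `&`: a macro that checks and expands {gasLimit} at compile
# time, so a bad placeholder is a compile error, not an exception
echo &"gasLimit={gasLimit}, gasUsed={gasUsed}"
```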

View File

@@ -32,11 +32,7 @@ proc validateGasLimit(header: BlockHeader; limit: GasInt): Result[void,string] =
   let upperLimit = limit div GAS_LIMIT_ADJUSTMENT_FACTOR
 
   if diff >= upperLimit:
-    try:
-      return err(&"invalid gas limit: have {header.gasLimit}, want {limit} +-= {upperLimit-1}")
-    except ValueError:
-      # TODO deprecate-strformat
-      raiseAssert "strformat cannot fail"
+    return err(&"invalid gas limit: have {header.gasLimit}, want {limit} +-= {upperLimit-1}")
   if header.gasLimit < GAS_LIMIT_MINIMUM:
     return err("invalid gas limit below 5000")
   ok()
@@ -77,14 +73,10 @@ proc verifyEip1559Header(com: CommonRef;
   # Verify the baseFee is correct based on the parent header.
   var expectedBaseFee = com.calcEip1599BaseFee(parent)
   if headerBaseFee != expectedBaseFee:
-    try:
-      return err(&"invalid baseFee: have {expectedBaseFee}, " &
-        &"want {header.baseFeePerGas}, " &
-        &"parent.baseFee {parent.baseFeePerGas}, " &
-        &"parent.gasUsed {parent.gasUsed}")
-    except ValueError:
-      # TODO deprecate-strformat
-      raiseAssert "strformat cannot fail"
+    return err(&"invalid baseFee: have {expectedBaseFee}, " &
+      &"want {header.baseFeePerGas}, " &
+      &"parent.baseFee {parent.baseFeePerGas}, " &
+      &"parent.gasUsed {parent.gasUsed}")
 
   return ok()

View File

@@ -47,7 +47,7 @@ logScope:
 
 # use it as a stack/lifo as the ordering is reversed
 proc insert(xp: TxPoolRef; kq: TxHeadDiffRef; blockHash: Hash256)
-    {.gcsafe,raises: [CatchableError].} =
+    {.raises: [BlockNotFound].} =
   let db = xp.chain.com.db
   for tx in db.getBlockBody(blockHash).transactions:
     if tx.versionedHashes.len > 0:
@@ -59,7 +59,7 @@ proc insert(xp: TxPoolRef; kq: TxHeadDiffRef; blockHash: Hash256)
       kq.addTxs[tx.itemID] = PooledTransaction(tx: tx)
 
 proc remove(xp: TxPoolRef; kq: TxHeadDiffRef; blockHash: Hash256)
-    {.gcsafe,raises: [CatchableError].} =
+    {.gcsafe,raises: [BlockNotFound].} =
   let db = xp.chain.com.db
   for tx in db.getBlockBody(blockHash).transactions:
     kq.remTxs[tx.itemID] = true

View File

@@ -71,7 +71,7 @@ proc validateHeader(
   if header.gasLimit > GAS_LIMIT_MAXIMUM:
     return err("gasLimit exceeds GAS_LIMIT_MAXIMUM")
 
   if com.daoForkSupport and inDAOExtraRange(header.number):
     if header.extraData != daoForkBlockExtraData:
       return err("header extra data should be marked DAO")

View File

@@ -241,8 +241,7 @@ proc markCanonicalChain(
     db: CoreDbRef;
     header: BlockHeader;
     headerHash: Hash256;
-      ): bool
-      {.gcsafe, raises: [RlpError].} =
+      ): bool =
   ## mark this chain as canonical by adding block number to hash lookup
   ## down to forking point
   var
@@ -266,6 +265,13 @@ proc markCanonicalChain(
   if not db.getBlockHeader(currHeader.parentHash, currHeader):
     return false
 
+  template rlpDecodeOrZero(data: openArray[byte]): Hash256 =
+    try:
+      rlp.decode(data, Hash256)
+    except RlpError as exc:
+      warn logTxt "markCanonicalChain()", key, action="put()", error=exc.msg
+      Hash256()
+
   while currHash != Hash256():
     let key = blockNumberToHashKey(currHeader.number)
     let data = kvt.getOrEmpty(key.toOpenArray).valueOr:
@@ -275,7 +281,7 @@ proc markCanonicalChain(
       # not marked, mark it
       kvt.put(key.toOpenArray, rlp.encode(currHash)).isOkOr:
         warn logTxt "markCanonicalChain()", key, action="put()", error=($$error)
-    elif rlp.decode(data, Hash256) != currHash:
+    elif rlpDecodeOrZero(data) != currHash:
       # replace prev chain
       kvt.put(key.toOpenArray, rlp.encode(currHash)).isOkOr:
         warn logTxt "markCanonicalChain()", key, action="put()", error=($$error)
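`rlpDecodeOrZero` confines the one remaining raising call to a template that logs and yields a neutral value, which is what allows `markCanonicalChain` to drop its `raises: [RlpError]` annotation. The same shape with standard-library pieces (hypothetical example, not from the repo):

```nim
import std/strutils

template parseOrZero(s: string): int =
  try:
    parseInt(s)
  except ValueError:
    echo "bad integer: ", s   # stand-in for the `warn` log line
    0

echo parseOrZero("42")     # 42
echo parseOrZero("oops")   # logs, then yields 0
```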
@@ -653,22 +659,22 @@ proc getBlockBody*(
     db: CoreDbRef;
     header: BlockHeader;
     output: var BlockBody;
-      ): bool
-      {.gcsafe, raises: [RlpError].} =
-  output.transactions = db.getTransactions(header.txRoot)
-  output.uncles = db.getUncles(header.ommersHash)
+      ): bool =
+  try:
+    output.transactions = db.getTransactions(header.txRoot)
+    output.uncles = db.getUncles(header.ommersHash)
 
-  if header.withdrawalsRoot.isSome:
-    output.withdrawals = Opt.some(db.getWithdrawals(header.withdrawalsRoot.get))
-  true
+    if header.withdrawalsRoot.isSome:
+      output.withdrawals = Opt.some(db.getWithdrawals(header.withdrawalsRoot.get))
+    true
+  except RlpError:
+    false
 
 proc getBlockBody*(
     db: CoreDbRef;
     blockHash: Hash256;
     output: var BlockBody;
-      ): bool
-      {.gcsafe, raises: [RlpError].} =
+      ): bool =
   var header: BlockHeader
   if db.getBlockHeader(blockHash, header):
     return db.getBlockBody(header, output)
@@ -677,15 +683,15 @@ proc getBlockBody*(
     db: CoreDbRef;
     hash: Hash256;
     ): BlockBody
-      {.gcsafe, raises: [RlpError,ValueError].} =
+      {.gcsafe, raises: [BlockNotFound].} =
   if not db.getBlockBody(hash, result):
-    raise newException(ValueError, "Error when retrieving block body")
+    raise newException(BlockNotFound, "Error when retrieving block body")
 
 proc getEthBlock*(
     db: CoreDbRef;
     hash: Hash256;
     ): EthBlock
-      {.gcsafe, raises: [BlockNotFound, RlpError,ValueError].} =
+      {.gcsafe, raises: [BlockNotFound].} =
   var
     header = db.getBlockHeader(hash)
     blockBody = db.getBlockBody(hash)
@@ -695,7 +701,7 @@ proc getEthBlock*(
     db: CoreDbRef;
     blockNumber: BlockNumber;
     ): EthBlock
-      {.gcsafe, raises: [BlockNotFound, RlpError,ValueError].} =
+      {.gcsafe, raises: [BlockNotFound].} =
   var
     header = db.getBlockHeader(blockNumber)
     headerHash = header.blockHash
@@ -706,7 +712,7 @@ proc getUncleHashes*(
     db: CoreDbRef;
     blockHashes: openArray[Hash256];
     ): seq[Hash256]
-      {.gcsafe, raises: [RlpError,ValueError].} =
+      {.gcsafe, raises: [BlockNotFound].} =
   for blockHash in blockHashes:
     result &= db.getBlockBody(blockHash).uncles.mapIt(it.rlpHash)
@@ -749,8 +755,7 @@ proc headerExists*(db: CoreDbRef; blockHash: Hash256): bool =
 
 proc setHead*(
     db: CoreDbRef;
     blockHash: Hash256;
-      ): bool
-      {.gcsafe, raises: [RlpError].} =
+      ): bool =
   var header: BlockHeader
   if not db.getBlockHeader(blockHash, header):
     return false
@@ -767,8 +772,7 @@ proc setHead*(
     db: CoreDbRef;
     header: BlockHeader;
     writeHeader = false;
-      ): bool
-      {.gcsafe, raises: [RlpError].} =
+      ): bool =
   var headerHash = rlpHash(header)
   let kvt = db.ctx.getKvt()
   if writeHeader:

View File

@@ -13,7 +13,7 @@
 ##
 
 import
-  std/[sequtils, tables],
+  std/tables,
   results,
   ./kvt_desc,
   ./kvt_desc/desc_backend,
@@ -78,7 +78,8 @@ proc deltaPersistent*(
 
   # Store structural single trie entries
   let writeBatch = ? be.putBegFn()
-  be.putKvpFn(writeBatch, db.balancer.sTab.pairs.toSeq)
+  for k,v in db.balancer.sTab:
+    be.putKvpFn(writeBatch, k, v)
   ? be.putEndFn writeBatch
 
   # Done with balancer, all saved to backend

View File

@@ -40,7 +40,7 @@ type
     ## Generic transaction initialisation function
 
   PutKvpFn* =
-    proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) {.gcsafe, raises: [].}
+    proc(hdl: PutHdlRef; k, v: openArray[byte]) {.gcsafe, raises: [].}
       ## Generic backend database bulk storage function.
 
   PutEndFn* =
View File

@@ -101,14 +101,13 @@ proc putBegFn(db: MemBackendRef): PutBegFn =
 
 proc putKvpFn(db: MemBackendRef): PutKvpFn =
   result =
-    proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) =
+    proc(hdl: PutHdlRef; k, v: openArray[byte]) =
       let hdl = hdl.getSession db
       if hdl.error == KvtError(0):
-        for (k,v) in kvps:
-          if k.isValid:
-            hdl.tab[k] = v
-          else:
-            hdl.error = KeyInvalid
+        if k.len > 0:
+          hdl.tab[@k] = @v
+        else:
+          hdl.tab.del @k
 
 proc putEndFn(db: MemBackendRef): PutEndFn =
   result =

View File

@@ -119,14 +119,14 @@ proc putBegFn(db: RdbBackendRef): PutBegFn =
 
 proc putKvpFn(db: RdbBackendRef): PutKvpFn =
   result =
-    proc(hdl: PutHdlRef; kvps: openArray[(Blob,Blob)]) =
+    proc(hdl: PutHdlRef; k, v: openArray[byte]) =
       let hdl = hdl.getSession db
       if hdl.error == KvtError(0):
 
         # Collect batch session arguments
-        db.rdb.put(kvps).isOkOr:
-          hdl.error = error[1]
-          hdl.info = error[2]
+        db.rdb.put(k, v).isOkOr:
+          hdl.error = error[0]
+          hdl.info = error[1]
           return

View File

@@ -68,21 +68,20 @@ proc commit*(rdb: var RdbInst): Result[void,(KvtError,string)] =
 
 proc put*(
     rdb: RdbInst;
-    data: openArray[(Blob,Blob)];
-    ): Result[void,(Blob,KvtError,string)] =
-  for (key,val) in data:
-    if val.len == 0:
-      rdb.session.delete(key, rdb.store[KvtGeneric].handle()).isOkOr:
-        const errSym = RdbBeDriverDelError
-        when extraTraceMessages:
-          trace logTxt "del", key, error=errSym, info=error
-        return err((key,errSym,error))
-    else:
-      rdb.session.put(key, val, rdb.store[KvtGeneric].handle()).isOkOr:
-        const errSym = RdbBeDriverPutError
-        when extraTraceMessages:
-          trace logTxt "put", key, error=errSym, info=error
-        return err((key,errSym,error))
+    key, val: openArray[byte];
+    ): Result[void,(KvtError,string)] =
+  if val.len == 0:
+    rdb.session.delete(key, rdb.store[KvtGeneric].handle()).isOkOr:
+      const errSym = RdbBeDriverDelError
+      when extraTraceMessages:
+        trace logTxt "del", key, error=errSym, info=error
+      return err((errSym,error))
+  else:
+    rdb.session.put(key, val, rdb.store[KvtGeneric].handle()).isOkOr:
+      const errSym = RdbBeDriverPutError
+      when extraTraceMessages:
+        trace logTxt "put", key, error=errSym, info=error
+      return err((errSym,error))
   ok()
# ------------------------------------------------------------------------------
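With the key passed in by the caller, the error tuple shrinks from `(Blob,KvtError,string)` to `(KvtError,string)`. The `isOkOr:` blocks are nim-results' inline failure handler, with `error` injected into the branch; a toy version of the pattern:

```nim
import results

proc lowLevelPut(k: int): Result[void, (int, string)] =
  if k < 0: err((1, "negative key")) else: ok()

proc put(k: int): Result[void, (int, string)] =
  lowLevelPut(k).isOkOr:        # `error` is available inside this block
    echo "put failed: ", error[1]
    return err(error)
  ok()

discard put(-1)   # prints: put failed: negative key
```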

View File

@@ -224,7 +224,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
   # First it sets the initial lower bound to `firstSlotAfterMerge` + number of blocks after Era1
   # Then it iterates over the slots to find the current slot number, along with reducing the
   # search space by calculating the difference between the `blockNumber` and the `block_number` from the executionPayload
-  # of the slot, then adding the difference to the importedSlot. This pushes the lower bound more, 
+  # of the slot, then adding the difference to the importedSlot. This pushes the lower bound more,
   # making the search way smaller
   template updateLastImportedSlot(
       era: EraDB,
@@ -232,7 +232,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       historical_summaries: openArray[HistoricalSummary],
   ) =
     if blockNumber > 1:
-      # Setting the initial lower bound 
+      # Setting the initial lower bound
       importedSlot = (blockNumber - lastEra1Block) + firstSlotAfterMerge
       notice "Finding slot number after resuming import", importedSlot
@@ -240,13 +240,13 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       var clNum = 0'u64
       while clNum < blockNumber:
-        let clBlock = getBlockFromEra(
+        let blk = getEthBlockFromEra(
           era, historical_roots, historical_summaries, Slot(importedSlot), clConfig.cfg
         ).valueOr:
           importedSlot += 1
           continue
 
-        clNum = getEth1BlockNumber(clBlock)
+        clNum = blk.header.number
         # decreasing the lower bound with each iteration
         importedSlot += blockNumber - clNum
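The narrowing works because a slot carries at most one execution block, so jumping ahead by the remaining block-number gap can never overshoot the target. A toy model of the search (the `elBlockNumber` stand-in fakes an era lookup where some slots are empty):

```nim
proc elBlockNumber(slot: uint64): uint64 =
  slot - slot div 32            # pretend one slot in 32 missed its block

proc findSlot(target: uint64): uint64 =
  var slot = target             # block number never exceeds slot number
  while elBlockNumber(slot) < target:
    slot += target - elBlockNumber(slot)   # jump by the remaining gap
  slot

echo findSlot(1000)   # converges in a handful of probes, not 1000
```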
@@ -273,7 +273,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
         break
 
       imported += 1
-      blocks.add move(blk)
+      blocks.add blk
 
       if blocks.lenu64 mod conf.chunkSize == 0:
         process()
@@ -304,7 +304,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       importedSlot = firstSlotAfterMerge
 
     while running and imported < conf.maxBlocks and importedSlot < endSlot:
-      let clblock = getBlockFromEra(
+      var blk = getEthBlockFromEra(
         eraDB,
         historical_roots.asSeq(),
         historical_summaries.asSeq(),
@@ -314,7 +314,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
         importedSlot += 1
         continue
 
-      blocks.add getEth1Block(clblock)
+      blocks.add blk
       imported += 1
       importedSlot += 1

View File

@@ -519,7 +519,7 @@ proc setupEthRpc*(
       hash: Hash256,
       header: BlockHeader,
       opts: FilterOptions): seq[FilterLog]
-      {.gcsafe, raises: [RlpError,ValueError].} =
+      {.gcsafe, raises: [RlpError,BlockNotFound].} =
     if headerBloomFilter(header, opts.address, opts.topics):
       let blockBody = chain.getBlockBody(hash)
       let receipts = chain.getReceipts(header.receiptsRoot)
@@ -538,7 +538,7 @@ proc setupEthRpc*(
       start: common.BlockNumber,
       finish: common.BlockNumber,
       opts: FilterOptions): seq[FilterLog]
-      {.gcsafe, raises: [RlpError,ValueError].} =
+      {.gcsafe, raises: [RlpError,BlockNotFound].} =
     var logs = newSeq[FilterLog]()
     var i = start
     while i <= finish:

View File

@@ -405,19 +405,16 @@ method getBlockBodies*(ctx: EthWireRef,
     hashes: openArray[Hash256]):
     Result[seq[BlockBody], string]
     {.gcsafe.} =
-  try:
-    let db = ctx.db
-    var body: BlockBody
-    var list: seq[BlockBody]
-    for blockHash in hashes:
-      if db.getBlockBody(blockHash, body):
-        list.add body
-      else:
-        list.add BlockBody()
-        trace "handlers.getBlockBodies: blockBody not found", blockHash
-    return ok(list)
-  except RlpError as exc:
-    return err(exc.msg)
+  let db = ctx.db
+  var body: BlockBody
+  var list: seq[BlockBody]
+  for blockHash in hashes:
+    if db.getBlockBody(blockHash, body):
+      list.add body
+    else:
+      list.add BlockBody()
+      trace "handlers.getBlockBodies: blockBody not found", blockHash
+  return ok(list)
 
 method getBlockHeaders*(ctx: EthWireRef,
     req: BlocksRequest):

View File

@@ -95,23 +95,6 @@ proc loadHistoricalRootsFromEra*(
     )
   )
 
-proc getBlockFromEra*(
-    db: EraDB,
-    historical_roots: openArray[Eth2Digest],
-    historical_summaries: openArray[HistoricalSummary],
-    slot: Slot,
-    cfg: RuntimeConfig,
-): Opt[ForkedTrustedSignedBeaconBlock] =
-  let fork = cfg.consensusForkAtEpoch(slot.epoch)
-  result.ok(ForkedTrustedSignedBeaconBlock(kind: fork))
-  withBlck(result.get()):
-    type T = type(forkyBlck)
-    forkyBlck = db.getBlock(
-      historical_roots, historical_summaries, slot, Opt[Eth2Digest].err(), T
-    ).valueOr:
-      result.err()
-      return
-
 proc getTxs*(txs: seq[bellatrix.Transaction]): seq[common.Transaction] =
   var transactions = newSeqOfCap[common.Transaction](txs.len)
   for tx in txs:
@@ -134,17 +117,7 @@ proc getWithdrawals*(x: seq[capella.Withdrawal]): seq[common.Withdrawal] =
     )
   return withdrawals
 
-# This is a helper function to get the eth1 block number
-# from a beacon block in slot finding process
-proc getEth1BlockNumber*(blck: ForkedTrustedSignedBeaconBlock): uint64 =
-  ## Get the eth1 block number from a beacon block.
-  ## This does not check for pre/post merge, despite having only
-  ## post merge meaning
-  withBlck(blck):
-    when consensusFork >= ConsensusFork.Bellatrix:
-      return forkyBlck.message.body.execution_payload.block_number
-
-proc getEth1Block*(blck: ForkedTrustedSignedBeaconBlock): EthBlock =
+proc getEthBlock*(blck: ForkedTrustedSignedBeaconBlock): Opt[EthBlock] =
   ## Convert a beacon block to an eth1 block.
   withBlck(blck):
     when consensusFork >= ConsensusFork.Bellatrix:
@@ -199,6 +172,23 @@ proc getEthBlock*(blck: ForkedTrustedSignedBeaconBlock): Opt[EthBlock] =
         excessBlobGas: excessBlobGas,
         parentBeaconBlockRoot: parentBeaconBlockRoot,
       )
-      return EthBlock(
+      return Opt.some EthBlock(
         header: header, transactions: txs, uncles: @[], withdrawals: ethWithdrawals
       )
+
+proc getEthBlockFromEra*(
+    db: EraDB,
+    historical_roots: openArray[Eth2Digest],
+    historical_summaries: openArray[HistoricalSummary],
+    slot: Slot,
+    cfg: RuntimeConfig,
+): Opt[EthBlock] =
+  let fork = cfg.consensusForkAtEpoch(slot.epoch)
+  var tmp = ForkedTrustedSignedBeaconBlock(kind: fork)
+  withBlck(tmp):
+    type T = type(forkyBlck)
+    forkyBlck = db.getBlock(
+      historical_roots, historical_summaries, slot, Opt[Eth2Digest].err(), T
+    ).valueOr:
+      return Opt.none(EthBlock)
+  getEthBlock(tmp)
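Returning `Opt[EthBlock]` straight from the era reader keeps the bulky `ForkedTrustedSignedBeaconBlock` container local to this proc: forked types are sized for their largest branch, so every frame that previously returned one paid for the full variant on its stack. A schematic illustration (sizes invented):

```nim
type
  ForkedStub = object         # stand-in for a forked container, sized
    raw: array[4096, byte]    # for its largest variant branch
  EthBlockStub = object
    number: uint64

proc decodeStub(slot: uint64): ForkedStub =
  ForkedStub()                # pretend this decodes an era file entry

proc blockNumber(slot: uint64): uint64 =
  # the 4 KiB temporary lives and dies in this frame; only the small
  # EthBlockStub-sized result travels up the call chain
  let big = decodeStub(slot)
  EthBlockStub(number: uint64(big.raw[0])).number

echo blockNumber(123)
```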