mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-11 14:54:12 +00:00
Store finalized block roots in database (3s startup) (#3320)
* Store finalized block roots in database (3s startup) When the chain has finalized a checkpoint, the history from that point onwards becomes linear - this is exploited in `.era` files to allow constant-time by-slot lookups. In the database, we can do the same by storing finalized block roots in a simple sparse table indexed by slot, bringing the two representations closer to each other in terms of conceptual layout and performance. Doing so has a number of interesting effects: * mainnet startup time is improved 3-5x (3s on my laptop) * the _first_ startup might take slightly longer as the new index is being built - ~10s on the same laptop * we no longer rely on the beacon block summaries to load the full dag - this is a lot faster because we no longer have to look up each block by parent root * a collateral benefit is that we no longer need to load the full summaries table into memory - we get the RSS benefits of #3164 without the CPU hit. Other random stuff: * simplify forky block generics * fix withManyWrites multiple evaluation * fix validator key cache not being updated properly in chaindag read-only mode * drop pre-altair summaries from `kvstore` * recreate missing summaries from altair+ blocks as well (in case database has lost some to an involuntary restart) * print database startup timings in chaindag load log * avoid allocating superfluos state at startup * use a recursive sql query to load the summaries of the unfinalized blocks
This commit is contained in:
parent
0051af430b
commit
d583e8e4ac
@ -152,6 +152,11 @@ OK: 3/3 Fail: 0/3 Skip: 0/3
|
||||
+ addExitMessage/getVoluntaryExitMessage OK
|
||||
```
|
||||
OK: 3/3 Fail: 0/3 Skip: 0/3
|
||||
## FinalizedBlocks [Preset: mainnet]
|
||||
```diff
|
||||
+ Basic ops [Preset: mainnet] OK
|
||||
```
|
||||
OK: 1/1 Fail: 0/1 Skip: 0/1
|
||||
## Fork Choice + Finality [Preset: mainnet]
|
||||
```diff
|
||||
+ fork_choice - testing finality #01 OK
|
||||
@ -457,4 +462,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
|
||||
OK: 1/1 Fail: 0/1 Skip: 0/1
|
||||
|
||||
---TOTAL---
|
||||
OK: 251/255 Fail: 0/255 Skip: 4/255
|
||||
OK: 252/256 Fail: 0/256 Skip: 4/256
|
||||
|
@ -29,6 +29,15 @@ type
|
||||
selectStmt: SqliteStmt[int64, openArray[byte]]
|
||||
recordCount: int64
|
||||
|
||||
FinalizedBlocks* = object
|
||||
# A sparse version of DbSeq - can have holes but not duplicate entries
|
||||
insertStmt: SqliteStmt[(int64, array[32, byte]), void]
|
||||
selectStmt: SqliteStmt[int64, array[32, byte]]
|
||||
selectAllStmt: SqliteStmt[NoParams, (int64, array[32, byte])]
|
||||
|
||||
low*: Opt[Slot]
|
||||
high*: Opt[Slot]
|
||||
|
||||
DepositsSeq = DbSeq[DepositData]
|
||||
|
||||
DepositContractSnapshot* = object
|
||||
@ -113,6 +122,8 @@ type
|
||||
|
||||
summaries: KvStoreRef # BlockRoot -> BeaconBlockSummary
|
||||
|
||||
finalizedBlocks*: FinalizedBlocks
|
||||
|
||||
DbKeyKind = enum
|
||||
kHashToState
|
||||
kHashToBlock
|
||||
@ -264,6 +275,82 @@ proc get*[T](s: DbSeq[T], idx: int64): T =
|
||||
let found = queryRes.expectDb()
|
||||
if not found: panic()
|
||||
|
||||
proc init*(T: type FinalizedBlocks, db: SqStoreRef, name: string): KvResult[T] =
|
||||
? db.exec("""
|
||||
CREATE TABLE IF NOT EXISTS """ & name & """(
|
||||
id INTEGER PRIMARY KEY,
|
||||
value BLOB NOT NULL
|
||||
);
|
||||
""")
|
||||
|
||||
let
|
||||
insertStmt = db.prepareStmt(
|
||||
"REPLACE INTO " & name & "(id, value) VALUES (?, ?);",
|
||||
(int64, array[32, byte]), void, managed = false).expect("this is a valid statement")
|
||||
|
||||
selectStmt = db.prepareStmt(
|
||||
"SELECT value FROM " & name & " WHERE id = ?;",
|
||||
int64, array[32, byte], managed = false).expect("this is a valid statement")
|
||||
selectAllStmt = db.prepareStmt(
|
||||
"SELECT id, value FROM " & name & " ORDER BY id;",
|
||||
NoParams, (int64, array[32, byte]), managed = false).expect("this is a valid statement")
|
||||
|
||||
maxIdStmt = db.prepareStmt(
|
||||
"SELECT MAX(id) FROM " & name & ";",
|
||||
NoParams, Option[int64], managed = false).expect("this is a valid statement")
|
||||
|
||||
minIdStmt = db.prepareStmt(
|
||||
"SELECT MIN(id) FROM " & name & ";",
|
||||
NoParams, Option[int64], managed = false).expect("this is a valid statement")
|
||||
|
||||
var
|
||||
low, high: Opt[Slot]
|
||||
tmp: Option[int64]
|
||||
|
||||
for rowRes in minIdStmt.exec(tmp):
|
||||
expectDb rowRes
|
||||
if tmp.isSome():
|
||||
low.ok(Slot(tmp.get()))
|
||||
|
||||
for rowRes in maxIdStmt.exec(tmp):
|
||||
expectDb rowRes
|
||||
if tmp.isSome():
|
||||
high.ok(Slot(tmp.get()))
|
||||
|
||||
maxIdStmt.dispose()
|
||||
minIdStmt.dispose()
|
||||
|
||||
ok(T(insertStmt: insertStmt,
|
||||
selectStmt: selectStmt,
|
||||
selectAllStmt: selectAllStmt,
|
||||
low: low,
|
||||
high: high))
|
||||
|
||||
proc close*(s: FinalizedBlocks) =
|
||||
s.insertStmt.dispose()
|
||||
s.selectStmt.dispose()
|
||||
s.selectAllStmt.dispose()
|
||||
|
||||
proc insert*(s: var FinalizedBlocks, slot: Slot, val: Eth2Digest) =
|
||||
doAssert slot.uint64 < int64.high.uint64, "Only reasonable slots supported"
|
||||
s.insertStmt.exec((slot.int64, val.data)).expectDb()
|
||||
s.low.ok(min(slot, s.low.get(slot)))
|
||||
s.high.ok(max(slot, s.high.get(slot)))
|
||||
|
||||
proc get*(s: FinalizedBlocks, idx: Slot): Opt[Eth2Digest] =
|
||||
var row: s.selectStmt.Result
|
||||
for rowRes in s.selectStmt.exec(int64(idx), row):
|
||||
expectDb rowRes
|
||||
return ok(Eth2Digest(data: row))
|
||||
|
||||
err()
|
||||
|
||||
iterator pairs*(s: FinalizedBlocks): (Slot, Eth2Digest) =
|
||||
var row: s.selectAllStmt.Result
|
||||
for rowRes in s.selectAllStmt.exec(row):
|
||||
expectDb rowRes
|
||||
yield (Slot(row[0]), Eth2Digest(data: row[1]))
|
||||
|
||||
proc loadImmutableValidators(vals: DbSeq[ImmutableValidatorDataDb2]): seq[ImmutableValidatorData2] =
|
||||
result = newSeqOfCap[ImmutableValidatorData2](vals.len())
|
||||
for i in 0 ..< vals.len:
|
||||
@ -272,23 +359,23 @@ proc loadImmutableValidators(vals: DbSeq[ImmutableValidatorDataDb2]): seq[Immuta
|
||||
pubkey: tmp.pubkey.loadValid(),
|
||||
withdrawal_credentials: tmp.withdrawal_credentials)
|
||||
|
||||
template withManyWrites*(db: BeaconChainDB, body: untyped) =
|
||||
template withManyWrites*(dbParam: BeaconChainDB, body: untyped) =
|
||||
# We don't enforce strong ordering or atomicity requirements in the beacon
|
||||
# chain db in general, relying instead on readers to be able to deal with
|
||||
# minor inconsistencies - however, putting writes in a transaction is orders
|
||||
# of magnitude faster when doing many small writes, so we use this as an
|
||||
# optimization technique and the templace is named accordingly.
|
||||
mixin expectDb
|
||||
db.db.exec("BEGIN TRANSACTION;").expectDb()
|
||||
let db = dbParam
|
||||
expectDb db.db.exec("BEGIN TRANSACTION;")
|
||||
var commit = false
|
||||
try:
|
||||
body
|
||||
commit = true
|
||||
finally:
|
||||
if commit:
|
||||
db.db.exec("COMMIT TRANSACTION;").expectDb()
|
||||
expectDb db.db.exec("COMMIT TRANSACTION;")
|
||||
else:
|
||||
db.db.exec("ROLLBACK TRANSACTION;").expectDb()
|
||||
expectDb db.db.exec("ROLLBACK TRANSACTION;")
|
||||
|
||||
proc new*(T: type BeaconChainDB,
|
||||
dir: string,
|
||||
@ -339,6 +426,7 @@ proc new*(T: type BeaconChainDB,
|
||||
mergeStatesNoVal = kvStore db.openKvStore("merge_state_no_validators").expectDb()
|
||||
stateDiffs = kvStore db.openKvStore("state_diffs").expectDb()
|
||||
summaries = kvStore db.openKvStore("beacon_block_summaries", true).expectDb()
|
||||
finalizedBlocks = FinalizedBlocks.init(db, "finalized_blocks").expectDb()
|
||||
|
||||
# `immutable_validators` stores validator keys in compressed format - this is
|
||||
# slow to load and has been superceded by `immutable_validators2` which uses
|
||||
@ -378,6 +466,7 @@ proc new*(T: type BeaconChainDB,
|
||||
mergeStatesNoVal: mergeStatesNoVal,
|
||||
stateDiffs: stateDiffs,
|
||||
summaries: summaries,
|
||||
finalizedBlocks: finalizedBlocks,
|
||||
)
|
||||
|
||||
proc decodeSSZ[T](data: openArray[byte], output: var T): bool =
|
||||
@ -484,6 +573,7 @@ proc close*(db: BeaconchainDB) =
|
||||
if db.db == nil: return
|
||||
|
||||
# Close things in reverse order
|
||||
db.finalizedBlocks.close()
|
||||
discard db.summaries.close()
|
||||
discard db.stateDiffs.close()
|
||||
discard db.mergeStatesNoVal.close()
|
||||
@ -494,6 +584,7 @@ proc close*(db: BeaconchainDB) =
|
||||
discard db.altairBlocks.close()
|
||||
discard db.blocks.close()
|
||||
discard db.keyValues.close()
|
||||
|
||||
db.immutableValidatorsDb.close()
|
||||
db.genesisDeposits.close()
|
||||
db.v0.close()
|
||||
@ -501,7 +592,7 @@ proc close*(db: BeaconchainDB) =
|
||||
|
||||
db.db = nil
|
||||
|
||||
func toBeaconBlockSummary*(v: SomeSomeBeaconBlock): BeaconBlockSummary =
|
||||
func toBeaconBlockSummary*(v: SomeForkyBeaconBlock): BeaconBlockSummary =
|
||||
BeaconBlockSummary(
|
||||
slot: v.slot,
|
||||
parent_root: v.parent_root,
|
||||
@ -535,9 +626,10 @@ proc updateImmutableValidators*(
|
||||
while db.immutableValidators.len() < numValidators:
|
||||
let immutableValidator =
|
||||
getImmutableValidatorData(validators[db.immutableValidators.len()])
|
||||
db.immutableValidatorsDb.add ImmutableValidatorDataDb2(
|
||||
pubkey: immutableValidator.pubkey.toUncompressed(),
|
||||
withdrawal_credentials: immutableValidator.withdrawal_credentials)
|
||||
if not db.db.readOnly:
|
||||
db.immutableValidatorsDb.add ImmutableValidatorDataDb2(
|
||||
pubkey: immutableValidator.pubkey.toUncompressed(),
|
||||
withdrawal_credentials: immutableValidator.withdrawal_credentials)
|
||||
db.immutableValidators.add immutableValidator
|
||||
|
||||
template toBeaconStateNoImmutableValidators(state: phase0.BeaconState):
|
||||
@ -571,6 +663,7 @@ proc putState*(db: BeaconChainDB, state: ForkyHashedBeaconState) =
|
||||
db.withManyWrites:
|
||||
db.putStateRoot(state.latest_block_root(), state.data.slot, state.root)
|
||||
db.putState(state.root, state.data)
|
||||
|
||||
# For testing rollback
|
||||
proc putCorruptPhase0State*(db: BeaconChainDB, key: Eth2Digest) =
|
||||
db.statesNoVal.putSnappySSZ(key.data, Validator())
|
||||
@ -597,15 +690,17 @@ proc putStateDiff*(db: BeaconChainDB, root: Eth2Digest, value: BeaconStateDiff)
|
||||
db.stateDiffs.putSnappySSZ(root.data, value)
|
||||
|
||||
proc delBlock*(db: BeaconChainDB, key: Eth2Digest) =
|
||||
db.blocks.del(key.data).expectDb()
|
||||
db.altairBlocks.del(key.data).expectDb()
|
||||
db.mergeBlocks.del(key.data).expectDb()
|
||||
db.summaries.del(key.data).expectDb()
|
||||
db.withManyWrites:
|
||||
db.blocks.del(key.data).expectDb()
|
||||
db.altairBlocks.del(key.data).expectDb()
|
||||
db.mergeBlocks.del(key.data).expectDb()
|
||||
db.summaries.del(key.data).expectDb()
|
||||
|
||||
proc delState*(db: BeaconChainDB, key: Eth2Digest) =
|
||||
db.statesNoVal.del(key.data).expectDb()
|
||||
db.altairStatesNoVal.del(key.data).expectDb()
|
||||
db.mergeStatesNoVal.del(key.data).expectDb()
|
||||
db.withManyWrites:
|
||||
db.statesNoVal.del(key.data).expectDb()
|
||||
db.altairStatesNoVal.del(key.data).expectDb()
|
||||
db.mergeStatesNoVal.del(key.data).expectDb()
|
||||
|
||||
proc delStateRoot*(db: BeaconChainDB, root: Eth2Digest, slot: Slot) =
|
||||
db.stateRoots.del(stateRootKey(root, slot)).expectDb()
|
||||
@ -915,6 +1010,14 @@ iterator getAncestors*(db: BeaconChainDB, root: Eth2Digest):
|
||||
yield res
|
||||
root = res.message.parent_root
|
||||
|
||||
proc getBeaconBlockSummary*(db: BeaconChainDB, root: Eth2Digest):
|
||||
Opt[BeaconBlockSummary] =
|
||||
var summary: BeaconBlockSummary
|
||||
if db.summaries.getSSZ(root.data, summary) == GetResult.found:
|
||||
ok(summary)
|
||||
else:
|
||||
err()
|
||||
|
||||
proc loadSummaries*(db: BeaconChainDB): Table[Eth2Digest, BeaconBlockSummary] =
|
||||
# Load summaries into table - there's no telling what order they're in so we
|
||||
# load them all - bugs in nim prevent this code from living in the iterator.
|
||||
@ -934,60 +1037,80 @@ proc loadSummaries*(db: BeaconChainDB): Table[Eth2Digest, BeaconBlockSummary] =
|
||||
type RootedSummary = tuple[root: Eth2Digest, summary: BeaconBlockSummary]
|
||||
iterator getAncestorSummaries*(db: BeaconChainDB, root: Eth2Digest):
|
||||
RootedSummary =
|
||||
## Load a chain of ancestors for blck - returns a list of blocks with the
|
||||
## oldest block last (blck will be at result[0]).
|
||||
## Load a chain of ancestors for blck - iterates over the block starting from
|
||||
## root and moving parent by parent
|
||||
##
|
||||
## The search will go on until the ancestor cannot be found.
|
||||
## The search will go on until an ancestor cannot be found.
|
||||
|
||||
# Summaries are loaded from the dedicated summaries table. For backwards
|
||||
# compatibility, we also load from `kvstore` and finally, if no summaries
|
||||
# can be found, by loading the blocks instead.
|
||||
|
||||
# First, load the full summary table into memory in one query - this makes
|
||||
# initial startup very fast.
|
||||
var
|
||||
summaries = db.loadSummaries()
|
||||
res: RootedSummary
|
||||
blck: phase0.TrustedSignedBeaconBlock
|
||||
newSummaries: seq[RootedSummary]
|
||||
|
||||
res.root = root
|
||||
|
||||
# Yield summaries in reverse chain order by walking the parent references.
|
||||
# If a summary is missing, try loading it from the older version or create one
|
||||
# from block data.
|
||||
|
||||
const summariesQuery = """
|
||||
WITH RECURSIVE
|
||||
next(v) as (
|
||||
SELECT value FROM beacon_block_summaries
|
||||
WHERE `key` == ?
|
||||
|
||||
UNION ALL
|
||||
SELECT value FROM beacon_block_summaries
|
||||
INNER JOIN next ON `key` == substr(v, 9, 32)
|
||||
)
|
||||
SELECT v FROM next;
|
||||
"""
|
||||
let
|
||||
stmt = expectDb db.db.prepareStmt(
|
||||
summariesQuery, array[32, byte],
|
||||
array[sizeof(BeaconBlockSummary), byte],
|
||||
managed = false)
|
||||
|
||||
defer: # in case iteration is stopped along the way
|
||||
# Write the newly found summaries in a single transaction - on first migration
|
||||
# from the old format, this brings down the write from minutes to seconds
|
||||
stmt.dispose()
|
||||
|
||||
if newSummaries.len() > 0:
|
||||
db.withManyWrites:
|
||||
for s in newSummaries:
|
||||
db.putBeaconBlockSummary(s.root, s.summary)
|
||||
|
||||
if false:
|
||||
# When the current version has been online for a bit, we can safely remove
|
||||
# summaries from kvstore by enabling this little snippet - if users were
|
||||
# to downgrade after the summaries have been purged, the old versions that
|
||||
# use summaries can also recreate them on the fly from blocks.
|
||||
db.db.exec(
|
||||
"DELETE FROM kvstore WHERE key >= ? and key < ?",
|
||||
([byte ord(kHashToBlockSummary)], [byte ord(kHashToBlockSummary) + 1])).expectDb()
|
||||
|
||||
# Yield summaries in reverse chain order by walking the parent references.
|
||||
# If a summary is missing, try loading it from the older version or create one
|
||||
# from block data.
|
||||
while true:
|
||||
summaries.withValue(res.root, summary) do:
|
||||
res.summary = summary[]
|
||||
# Clean up pre-altair summaries - by now, we will have moved them to the
|
||||
# new table
|
||||
db.db.exec(
|
||||
"DELETE FROM kvstore WHERE key >= ? and key < ?",
|
||||
([byte ord(kHashToBlockSummary)], [byte ord(kHashToBlockSummary) + 1])).expectDb()
|
||||
var row: stmt.Result
|
||||
for rowRes in exec(stmt, root.data, row):
|
||||
expectDb rowRes
|
||||
if decodeSSZ(row, res.summary):
|
||||
yield res
|
||||
do: # Summary was not found in summary table, look elsewhere
|
||||
if db.v0.backend.getSnappySSZ(subkey(BeaconBlockSummary, res.root), res.summary) == GetResult.found:
|
||||
yield res
|
||||
elif db.v0.backend.getSnappySSZ(
|
||||
subkey(phase0.SignedBeaconBlock, res.root), blck) == GetResult.found:
|
||||
res.summary = blck.message.toBeaconBlockSummary()
|
||||
yield res
|
||||
else:
|
||||
break
|
||||
# Next time, load them from the right place
|
||||
newSummaries.add(res)
|
||||
res.root = res.summary.parent_root
|
||||
|
||||
# Backwards compat for reading old databases, or those that for whatever
|
||||
# reason lost a summary along the way..
|
||||
while true:
|
||||
if db.v0.backend.getSnappySSZ(
|
||||
subkey(BeaconBlockSummary, res.root), res.summary) == GetResult.found:
|
||||
discard # Just yield below
|
||||
elif (let blck = db.getPhase0Block(res.root); blck.isSome()):
|
||||
res.summary = blck.get().message.toBeaconBlockSummary()
|
||||
elif (let blck = db.getAltairBlock(res.root); blck.isSome()):
|
||||
res.summary = blck.get().message.toBeaconBlockSummary()
|
||||
elif (let blck = db.getMergeBlock(res.root); blck.isSome()):
|
||||
res.summary = blck.get().message.toBeaconBlockSummary()
|
||||
else:
|
||||
break
|
||||
|
||||
yield res
|
||||
|
||||
# Next time, load them from the right place
|
||||
newSummaries.add(res)
|
||||
|
||||
res.root = res.summary.parent_root
|
||||
|
||||
|
@ -355,6 +355,7 @@ proc addBackfillBlock*(
|
||||
|
||||
dag.backfillBlocks[blck.slot.int] = blockRoot
|
||||
dag.backfill = blck.toBeaconBlockSummary()
|
||||
dag.db.finalizedBlocks.insert(blck.slot, blockRoot)
|
||||
|
||||
let putBlockTick = Moment.now
|
||||
debug "Block backfilled",
|
||||
|
@ -65,7 +65,7 @@ func init*(T: type BlockRef, root: Eth2Digest, slot: Slot): BlockRef =
|
||||
bid: BlockId(root: root, slot: slot)
|
||||
)
|
||||
|
||||
func init*(T: type BlockRef, root: Eth2Digest, blck: SomeSomeBeaconBlock):
|
||||
func init*(T: type BlockRef, root: Eth2Digest, blck: SomeForkyBeaconBlock):
|
||||
BlockRef =
|
||||
BlockRef.init(root, blck.slot)
|
||||
|
||||
|
@ -105,6 +105,27 @@ proc updateValidatorKeys*(dag: ChainDAGRef, validators: openArray[Validator]) =
|
||||
# data (but no earlier than that the whole block as been validated)
|
||||
dag.db.updateImmutableValidators(validators)
|
||||
|
||||
proc updateFinalizedBlocks*(dag: ChainDAGRef) =
|
||||
template update(s: Slot) =
|
||||
if s < dag.tail.slot:
|
||||
if dag.backfillBlocks[s.int] != Eth2Digest():
|
||||
dag.db.finalizedBlocks.insert(s, dag.backfillBlocks[s.int])
|
||||
else:
|
||||
let dagIndex = int(s - dag.tail.slot)
|
||||
if not isNil(dag.finalizedBlocks[dagIndex]):
|
||||
dag.db.finalizedBlocks.insert(s, dag.finalizedBlocks[dagIndex].root)
|
||||
|
||||
if not dag.db.db.readOnly: # TODO abstraction leak - where to put this?
|
||||
dag.db.withManyWrites:
|
||||
if dag.db.finalizedBlocks.low.isNone():
|
||||
for s in dag.backfill.slot .. dag.finalizedHead.slot:
|
||||
update(s)
|
||||
else:
|
||||
for s in dag.backfill.slot ..< dag.db.finalizedBlocks.low.get():
|
||||
update(s)
|
||||
for s in dag.db.finalizedBlocks.high.get() + 1 .. dag.finalizedHead.slot:
|
||||
update(s)
|
||||
|
||||
func validatorKey*(
|
||||
dag: ChainDAGRef, index: ValidatorIndex or uint64): Option[CookedPubKey] =
|
||||
## Returns the validator pubkey for the index, assuming it's been observed
|
||||
@ -433,19 +454,59 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||
withBlck(genesisBlock): BlockRef.init(genesisBlockRoot, blck.message)
|
||||
|
||||
var
|
||||
headRef: BlockRef
|
||||
backfillBlocks = newSeq[Eth2Digest](tailRef.slot.int)
|
||||
backfill = BeaconBlockSummary(slot: GENESIS_SLOT)
|
||||
midRef: BlockRef
|
||||
backRoot: Option[Eth2Digest]
|
||||
startTick = Moment.now()
|
||||
|
||||
# Loads blocks in the forward direction - these may or may not be available
|
||||
# in the database
|
||||
for slot, root in db.finalizedBlocks:
|
||||
if slot < tailRef.slot:
|
||||
backfillBlocks[slot.int] = root
|
||||
if backRoot.isNone():
|
||||
backRoot = some(root)
|
||||
elif slot == tailRef.slot:
|
||||
midRef = tailRef
|
||||
elif slot > tailRef.slot:
|
||||
let next = BlockRef.init(root, slot)
|
||||
link(midRef, next)
|
||||
midRef = next
|
||||
|
||||
let finalizedTick = Moment.now()
|
||||
|
||||
var
|
||||
backfillBlocks = newSeq[Eth2Digest](tailRef.slot.int)
|
||||
headRef: BlockRef
|
||||
curRef: BlockRef
|
||||
backfill = BeaconBlockSummary(slot: GENESIS_SLOT)
|
||||
|
||||
# Now load the part from head to finalized in the other direction - these
|
||||
# should meet at the midpoint if we loaded any finalized blocks
|
||||
for blck in db.getAncestorSummaries(headRoot):
|
||||
if midRef != nil and blck.summary.slot == midRef.slot:
|
||||
if midRef.root != blck.root:
|
||||
fatal "Finalized block table does not match ancestor summaries, database corrupt?"
|
||||
quit 1
|
||||
|
||||
if curRef == nil:
|
||||
# When starting from checkpoint, head == tail and there won't be any
|
||||
# blocks in between
|
||||
headRef = tailRef
|
||||
else:
|
||||
link(midRef, curRef)
|
||||
|
||||
# The finalized blocks form a linear history by definition - we can skip
|
||||
# straight to the tail
|
||||
curRef = tailRef
|
||||
break
|
||||
|
||||
if blck.summary.slot < tailRef.slot:
|
||||
backfillBlocks[blck.summary.slot.int] = blck.root
|
||||
backfill = blck.summary
|
||||
if backRoot.isNone():
|
||||
backfill = blck.summary
|
||||
elif blck.summary.slot == tailRef.slot:
|
||||
backfill = blck.summary
|
||||
if backRoot.isNone():
|
||||
backfill = blck.summary
|
||||
|
||||
if curRef == nil:
|
||||
curRef = tailRef
|
||||
@ -454,14 +515,6 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||
link(tailRef, curRef)
|
||||
curRef = curRef.parent
|
||||
else:
|
||||
if curRef == nil:
|
||||
# When the database has been written with a pre-fork version of the
|
||||
# software, it may happen that blocks produced using an "unforked"
|
||||
# chain get written to the database - we need to skip such blocks
|
||||
# when loading the database with a fork-compatible version
|
||||
if not containsBlock(cfg, db, blck.summary.slot, blck.root):
|
||||
continue
|
||||
|
||||
let newRef = BlockRef.init(blck.root, blck.summary.slot)
|
||||
if curRef == nil:
|
||||
curRef = newRef
|
||||
@ -472,61 +525,37 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||
|
||||
trace "Populating block dag", key = curRef.root, val = curRef
|
||||
|
||||
if backRoot.isSome():
|
||||
backfill = db.getBeaconBlockSummary(backRoot.get()).expect(
|
||||
"Backfill block must have a summary")
|
||||
|
||||
let summariesTick = Moment.now()
|
||||
|
||||
if curRef != tailRef:
|
||||
fatal "Head block does not lead to tail - database corrupt?",
|
||||
genesisRef, tailRef, headRef, curRef, tailRoot, headRoot
|
||||
|
||||
quit 1
|
||||
|
||||
while not containsBlock(cfg, db, headRef.slot, headRef.root):
|
||||
# When the database has been written with a pre-fork version of the
|
||||
# software, it may happen that blocks produced using an "unforked"
|
||||
# chain get written to the database - we need to skip such blocks
|
||||
# when loading the database with a fork-compatible version
|
||||
if isNil(headRef.parent):
|
||||
fatal "Cannot find block for head root - database corrupt?",
|
||||
headRef = shortLog(headRef)
|
||||
|
||||
headRef = headRef.parent
|
||||
|
||||
# Because of incorrect hardfork check, there might be no head block, in which
|
||||
# case it's equivalent to the tail block
|
||||
if headRef == nil:
|
||||
headRef = tailRef
|
||||
|
||||
var
|
||||
cur = headRef.atSlot()
|
||||
tmpState = (ref StateData)()
|
||||
|
||||
# Now that we have a head block, we need to find the most recent state that
|
||||
# we have saved in the database
|
||||
while cur.blck != nil and
|
||||
not getStateData(db, cfg, tmpState[], cur, noRollback):
|
||||
cur = cur.parentOrSlot()
|
||||
|
||||
if tmpState.blck == nil:
|
||||
fatal "No state found in head history, database corrupt?",
|
||||
genesisRef, tailRef, headRef, tailRoot, headRoot
|
||||
# TODO Potentially we could recover from here instead of crashing - what
|
||||
# would be a good recovery model?
|
||||
quit 1
|
||||
|
||||
case tmpState.data.kind
|
||||
of BeaconStateFork.Phase0:
|
||||
if tmpState.data.phase0Data.data.fork != genesisFork(cfg):
|
||||
error "State from database does not match network, check --network parameter",
|
||||
genesisRef, tailRef, headRef, tailRoot, headRoot,
|
||||
stateFork = tmpState.data.phase0Data.data.fork,
|
||||
configFork = genesisFork(cfg)
|
||||
quit 1
|
||||
of BeaconStateFork.Altair:
|
||||
if tmpState.data.altairData.data.fork != altairFork(cfg):
|
||||
error "State from database does not match network, check --network parameter",
|
||||
genesisRef, tailRef, headRef, tailRoot, headRoot,
|
||||
stateFork = tmpState.data.altairData.data.fork,
|
||||
configFork = altairFork(cfg)
|
||||
quit 1
|
||||
of BeaconStateFork.Bellatrix:
|
||||
if tmpState.data.bellatrixData.data.fork != bellatrixFork(cfg):
|
||||
error "State from database does not match network, check --network parameter",
|
||||
genesisRef, tailRef, headRef, tailRoot, headRoot,
|
||||
stateFork = tmpState.data.bellatrixData.data.fork,
|
||||
configFork = bellatrixFork(cfg)
|
||||
quit 1
|
||||
|
||||
let dag = ChainDAGRef(
|
||||
db: db,
|
||||
validatorMonitor: validatorMonitor,
|
||||
backfillBlocks: backfillBlocks,
|
||||
genesis: genesisRef,
|
||||
tail: tailRef,
|
||||
backfill: backfill,
|
||||
@ -535,25 +564,56 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||
# Tail is implicitly finalized - we'll adjust it below when computing the
|
||||
# head state
|
||||
heads: @[headRef],
|
||||
headState: tmpState[],
|
||||
epochRefState: tmpState[],
|
||||
clearanceState: tmpState[],
|
||||
|
||||
# The only allowed flag right now is verifyFinalization, as the others all
|
||||
# allow skipping some validation.
|
||||
updateFlags: {verifyFinalization} * updateFlags,
|
||||
cfg: cfg,
|
||||
|
||||
forkDigests: newClone ForkDigests.init(
|
||||
cfg,
|
||||
getStateField(tmpState.data, genesis_validators_root)),
|
||||
|
||||
onBlockAdded: onBlockCb,
|
||||
onHeadChanged: onHeadCb,
|
||||
onReorgHappened: onReorgCb,
|
||||
onFinHappened: onFinCb
|
||||
)
|
||||
|
||||
block: # Initialize dag states
|
||||
var
|
||||
cur = headRef.atSlot()
|
||||
|
||||
# Now that we have a head block, we need to find the most recent state that
|
||||
# we have saved in the database
|
||||
while cur.blck != nil and
|
||||
not getStateData(db, cfg, dag.headState, cur, noRollback):
|
||||
cur = cur.parentOrSlot()
|
||||
|
||||
if dag.headState.blck == nil:
|
||||
fatal "No state found in head history, database corrupt?",
|
||||
genesisRef, tailRef, headRef, tailRoot, headRoot
|
||||
# TODO Potentially we could recover from here instead of crashing - what
|
||||
# would be a good recovery model?
|
||||
quit 1
|
||||
|
||||
let
|
||||
configFork = case dag.headState.data.kind
|
||||
of BeaconStateFork.Phase0: genesisFork(cfg)
|
||||
of BeaconStateFork.Altair: altairFork(cfg)
|
||||
of BeaconStateFork.Bellatrix: bellatrixFork(cfg)
|
||||
statefork = getStateField(dag.headState.data, fork)
|
||||
|
||||
if stateFork != configFork:
|
||||
error "State from database does not match network, check --network parameter",
|
||||
genesisRef, tailRef, headRef, tailRoot, headRoot, stateFork, configFork
|
||||
quit 1
|
||||
|
||||
assign(dag.clearanceState, dag.headState)
|
||||
assign(dag.epochRefState, dag.headState)
|
||||
|
||||
dag.forkDigests = newClone ForkDigests.init(
|
||||
cfg,
|
||||
getStateField(dag.headState.data, genesis_validators_root))
|
||||
|
||||
swap(dag.backfillBlocks, backfillBlocks) # avoid allocating a full copy
|
||||
|
||||
let forkVersions =
|
||||
[cfg.GENESIS_FORK_VERSION, cfg.ALTAIR_FORK_VERSION,
|
||||
cfg.BELLATRIX_FORK_VERSION, cfg.SHARDING_FORK_VERSION]
|
||||
@ -596,27 +656,31 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
|
||||
dag.finalizedBlocks[(tmp.slot - dag.tail.slot).int] = tmp
|
||||
tmp = tmp.parent
|
||||
|
||||
let stateTick = Moment.now()
|
||||
|
||||
dag.clearanceState = dag.headState
|
||||
|
||||
# Pruning metadata
|
||||
dag.lastPrunePoint = dag.finalizedHead
|
||||
|
||||
if not dag.db.db.readOnly:
|
||||
# Fill validator key cache in case we're loading an old database that doesn't
|
||||
# have a cache
|
||||
dag.updateValidatorKeys(getStateField(dag.headState.data, validators).asSeq())
|
||||
# Fill validator key cache in case we're loading an old database that doesn't
|
||||
# have a cache
|
||||
dag.updateValidatorKeys(getStateField(dag.headState.data, validators).asSeq())
|
||||
dag.updateFinalizedBlocks()
|
||||
|
||||
withState(dag.headState.data):
|
||||
when stateFork >= BeaconStateFork.Altair:
|
||||
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)
|
||||
|
||||
info "Block dag initialized",
|
||||
info "Block DAG initialized",
|
||||
head = shortLog(dag.head),
|
||||
finalizedHead = shortLog(dag.finalizedHead),
|
||||
tail = shortLog(dag.tail),
|
||||
finalizedBlocks = dag.finalizedBlocks.len(),
|
||||
forkBlocks = dag.forkBlocks.len(),
|
||||
backfill = (dag.backfill.slot, shortLog(dag.backfill.parent_root))
|
||||
backfill = (dag.backfill.slot, shortLog(dag.backfill.parent_root)),
|
||||
finalizedDur = finalizedTick - startTick,
|
||||
summariesDur = summariesTick - finalizedTick,
|
||||
stateDur = stateTick - summariesTick,
|
||||
indexDur = Moment.now() - stateTick
|
||||
|
||||
dag
|
||||
|
||||
@ -1438,6 +1502,8 @@ proc updateHead*(
|
||||
|
||||
dag.finalizedHead = finalizedHead
|
||||
|
||||
dag.updateFinalizedBlocks()
|
||||
|
||||
beacon_finalized_epoch.set(getStateField(
|
||||
dag.headState.data, finalized_checkpoint).epoch.toGaugeValue)
|
||||
beacon_finalized_root.set(getStateField(
|
||||
|
@ -327,8 +327,6 @@ proc init*(T: type BeaconNode,
|
||||
# Doesn't use std/random directly, but dependencies might
|
||||
randomize(rng[].rand(high(int)))
|
||||
|
||||
info "Loading block dag from database", path = config.databaseDir
|
||||
|
||||
let
|
||||
validatorMonitor = newClone(ValidatorMonitor.init(
|
||||
config.validatorMonitorAuto, config.validatorMonitorTotals))
|
||||
@ -336,6 +334,8 @@ proc init*(T: type BeaconNode,
|
||||
for key in config.validatorMonitorPubkeys:
|
||||
validatorMonitor[].addMonitor(key, none(ValidatorIndex))
|
||||
|
||||
info "Loading block DAG from database", path = config.databaseDir
|
||||
|
||||
let
|
||||
chainDagFlags = if config.verifyFinalization: {verifyFinalization}
|
||||
else: {}
|
||||
|
@ -314,13 +314,6 @@ type
|
||||
SomeBeaconBlock* = BeaconBlock | SigVerifiedBeaconBlock | TrustedBeaconBlock
|
||||
SomeBeaconBlockBody* = BeaconBlockBody | SigVerifiedBeaconBlockBody | TrustedBeaconBlockBody
|
||||
|
||||
# TODO why does this fail?
|
||||
#SomeSomeBeaconBlock* = SomeBeaconBlock | phase0.SomeBeaconBlock
|
||||
SomeSomeBeaconBlock* =
|
||||
BeaconBlock | SigVerifiedBeaconBlock | TrustedBeaconBlock |
|
||||
altair.BeaconBlock | altair.SigVerifiedBeaconBlock | altair.TrustedBeaconBlock |
|
||||
phase0.BeaconBlock | phase0.SigVerifiedBeaconBlock | phase0.TrustedBeaconBlock
|
||||
|
||||
# TODO see above, re why does it fail
|
||||
SomeSomeBeaconBlockBody* =
|
||||
BeaconBlockBody | SigVerifiedBeaconBlockBody | TrustedBeaconBlockBody |
|
||||
|
@ -76,6 +76,11 @@ type
|
||||
altair.TrustedBeaconBlock |
|
||||
bellatrix.TrustedBeaconBlock
|
||||
|
||||
SomeForkyBeaconBlock* =
|
||||
ForkyBeaconBlock |
|
||||
ForkySigVerifiedBeaconBlock |
|
||||
ForkyTrustedBeaconBlock
|
||||
|
||||
ForkedBeaconBlock* = object
|
||||
case kind*: BeaconBlockFork
|
||||
of BeaconBlockFork.Phase0: phase0Data*: phase0.BeaconBlock
|
||||
|
@ -86,7 +86,7 @@ proc verify_epoch_signature*(
|
||||
|
||||
func compute_block_signing_root*(
|
||||
fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot,
|
||||
blck: Eth2Digest | SomeSomeBeaconBlock | BeaconBlockHeader): Eth2Digest =
|
||||
blck: Eth2Digest | SomeForkyBeaconBlock | BeaconBlockHeader): Eth2Digest =
|
||||
let
|
||||
epoch = epoch(slot)
|
||||
domain = get_domain(
|
||||
@ -104,7 +104,7 @@ func get_block_signature*(
|
||||
|
||||
proc verify_block_signature*(
|
||||
fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot,
|
||||
blck: Eth2Digest | SomeSomeBeaconBlock | BeaconBlockHeader,
|
||||
blck: Eth2Digest | SomeForkyBeaconBlock | BeaconBlockHeader,
|
||||
pubkey: ValidatorPubKey | CookedPubKey, signature: SomeSig): bool =
|
||||
withTrust(signature):
|
||||
let
|
||||
|
@ -149,7 +149,7 @@ proc epoch_signature_set*(
|
||||
# See also: verify_block_signature
|
||||
proc block_signature_set*(
|
||||
fork: Fork, genesis_validators_root: Eth2Digest, slot: Slot,
|
||||
blck: Eth2Digest | SomeSomeBeaconBlock | BeaconBlockHeader,
|
||||
blck: Eth2Digest | SomeForkyBeaconBlock | BeaconBlockHeader,
|
||||
pubkey: CookedPubKey, signature: CookedSig): SignatureSet =
|
||||
let signing_root = compute_block_signing_root(
|
||||
fork, genesis_validators_root, slot, blck)
|
||||
|
@ -30,7 +30,7 @@ export extras, phase0, altair
|
||||
|
||||
# https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/phase0/beacon-chain.md#block-header
|
||||
func process_block_header*(
|
||||
state: var ForkyBeaconState, blck: SomeSomeBeaconBlock, flags: UpdateFlags,
|
||||
state: var ForkyBeaconState, blck: SomeForkyBeaconBlock, flags: UpdateFlags,
|
||||
cache: var StateCache): Result[void, cstring] =
|
||||
# Verify that the slots match
|
||||
if not (blck.slot == state.slot):
|
||||
|
@ -414,6 +414,7 @@ suite "Beacon chain DB" & preset():
|
||||
|
||||
doAssert toSeq(db.getAncestorSummaries(a0.root)).len == 0
|
||||
doAssert toSeq(db.getAncestorSummaries(a2.root)).len == 0
|
||||
doAssert db.getBeaconBlockSummary(a2.root).isNone()
|
||||
|
||||
db.putBlock(a2)
|
||||
|
||||
@ -422,6 +423,7 @@ suite "Beacon chain DB" & preset():
|
||||
|
||||
doAssert toSeq(db.getAncestorSummaries(a0.root)).len == 0
|
||||
doAssert toSeq(db.getAncestorSummaries(a2.root)).len == 1
|
||||
doAssert db.getBeaconBlockSummary(a2.root).get().slot == a2.message.slot
|
||||
|
||||
db.putBlock(a1)
|
||||
|
||||
@ -481,3 +483,32 @@ suite "Beacon chain DB" & preset():
|
||||
|
||||
check:
|
||||
hash_tree_root(state2[]) == root
|
||||
|
||||
suite "FinalizedBlocks" & preset():
|
||||
test "Basic ops" & preset():
|
||||
var
|
||||
db = SqStoreRef.init("", "test", inMemory = true).expect(
|
||||
"working database (out of memory?)")
|
||||
|
||||
var s = FinalizedBlocks.init(db, "finalized_blocks").get()
|
||||
|
||||
check:
|
||||
s.low.isNone
|
||||
s.high.isNone
|
||||
|
||||
s.insert(Slot 0, Eth2Digest())
|
||||
check:
|
||||
s.low.get() == Slot 0
|
||||
s.high.get() == Slot 0
|
||||
|
||||
s.insert(Slot 5, Eth2Digest())
|
||||
check:
|
||||
s.low.get() == Slot 0
|
||||
s.high.get() == Slot 5
|
||||
|
||||
var items = 0
|
||||
for k, v in s:
|
||||
check: k in [Slot 0, Slot 5]
|
||||
items += 1
|
||||
|
||||
check: items == 2
|
||||
|
2
vendor/nim-eth
vendored
2
vendor/nim-eth
vendored
@ -1 +1 @@
|
||||
Subproject commit 5655bd035cfd7319c6f2e60b7fdefef65a057939
|
||||
Subproject commit 49d1d29fdf4ce090d0e2aa3d6037f0ded324b114
|
Loading…
x
Reference in New Issue
Block a user