cache empty slot state root (#961)

When replaying state transitions, for the slots that have a block, the
state root is taken from the block. For slots that lack a block, it was
until now recalculated using `hash_tree_root`, which is expensive.

Caching the empty slot state roots helps us avoid recalculating this
hash, meaning that for replay, hashes are never calculated. This turns
blocks into fairly lightweight "state-diffs"!

* avoid re-saving state when replaying blocks
* advance empty slots slot-by-slot and save root
* fix sim randomness
* fix sim genesis filename
* introduce `isEpoch` to check if a slot is an epoch slot
This commit is contained in:
Jacek Sieka 2020-05-03 19:44:04 +02:00 committed by GitHub
parent c3cdb399c0
commit 2449d4b479
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 124 additions and 71 deletions

3
.gitignore vendored
View File

@ -24,6 +24,9 @@ build/
# State sim # TODO - move in another folder # State sim # TODO - move in another folder
0000-*.json 0000-*.json
*.ssz
*.log
*.sqlite3
/local_testnet_data /local_testnet_data

View File

@ -247,7 +247,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
# time to do so? # time to do so?
network.initBeaconSync(blockPool, enrForkId.forkDigest, network.initBeaconSync(blockPool, enrForkId.forkDigest,
proc(signedBlock: SignedBeaconBlock) = proc(signedBlock: SignedBeaconBlock) =
if signedBlock.message.slot mod SLOTS_PER_EPOCH == 0: if signedBlock.message.slot.isEpoch:
# TODO this is a hack to make sure that lmd ghost is run regularly # TODO this is a hack to make sure that lmd ghost is run regularly
# while syncing blocks - it's poor form to keep it here though - # while syncing blocks - it's poor form to keep it here though -
# the logic should be moved elsewhere # the logic should be moved elsewhere

View File

@ -195,19 +195,13 @@ proc init*(T: type BlockPool, db: BeaconChainDB): BlockPool =
let root = db.getStateRoot(bs.blck.root, bs.slot) let root = db.getStateRoot(bs.blck.root, bs.slot)
if root.isSome(): if root.isSome():
# TODO load StateData from BeaconChainDB # TODO load StateData from BeaconChainDB
let loaded = db.getState(root.get(), tmpState.data.data, noRollback) # We save state root separately for empty slots which means we might
if not loaded: # sometimes not find a state even though we saved its state root
# TODO We don't write state root and state atomically, so we need to be if db.getState(root.get(), tmpState.data.data, noRollback):
# lenient here in case of dirty shutdown - transactions would be tmpState.data.root = root.get()
# nice! tmpState.blck = bs.blck
warn "State root, but no state - database corrupt?",
stateRoot = root.get(), blockRoot = bs.blck.root, blockSlot = bs.slot
continue
tmpState.data.root = root.get() break
tmpState.blck = bs.blck
break
bs = bs.parent() # Iterate slot by slot in case there's a gap! bs = bs.parent() # Iterate slot by slot in case there's a gap!
@ -355,8 +349,17 @@ proc putState(pool: BlockPool, state: HashedBeaconState, blck: BlockRef) =
# we could easily see a state explosion # we could easily see a state explosion
logScope: pcs = "save_state_at_epoch_start" logScope: pcs = "save_state_at_epoch_start"
var rootWritten = false
if state.data.slot != blck.slot:
# This is a state that was produced by a skip slot for which there is no
# block - we'll save the state root in the database in case we need to
# replay the skip
pool.db.putStateRoot(blck.root, state.data.slot, state.root)
rootWritten = true
let epochParity = state.data.slot.compute_epoch_at_slot.uint64 mod 2 let epochParity = state.data.slot.compute_epoch_at_slot.uint64 mod 2
if state.data.slot mod SLOTS_PER_EPOCH == 0:
if state.data.slot.isEpoch:
if not pool.db.containsState(state.root): if not pool.db.containsState(state.root):
info "Storing state", info "Storing state",
blck = shortLog(blck), blck = shortLog(blck),
@ -364,8 +367,8 @@ proc putState(pool: BlockPool, state: HashedBeaconState, blck: BlockRef) =
stateRoot = shortLog(state.root), stateRoot = shortLog(state.root),
cat = "caching" cat = "caching"
pool.db.putState(state.root, state.data) pool.db.putState(state.root, state.data)
# TODO this should be atomic with the above write.. if not rootWritten:
pool.db.putStateRoot(blck.root, state.data.slot, state.root) pool.db.putStateRoot(blck.root, state.data.slot, state.root)
# Because state.data.slot mod SLOTS_PER_EPOCH == 0, wrap back to last # Because state.data.slot mod SLOTS_PER_EPOCH == 0, wrap back to last
# time this was the case i.e. last currentCache. The opposite parity, # time this was the case i.e. last currentCache. The opposite parity,
@ -462,7 +465,8 @@ proc add*(
# TODO if the block is from the future, we should not be resolving it (yet), # TODO if the block is from the future, we should not be resolving it (yet),
# but maybe we should use it as a hint that our clock is wrong? # but maybe we should use it as a hint that our clock is wrong?
updateStateData(pool, pool.tmpState, BlockSlot(blck: parent, slot: blck.slot - 1)) updateStateData(
pool, pool.tmpState, BlockSlot(blck: parent, slot: blck.slot - 1))
let let
poolPtr = unsafeAddr pool # safe because restore is short-lived poolPtr = unsafeAddr pool # safe because restore is short-lived
@ -639,17 +643,24 @@ func checkMissing*(pool: var BlockPool): seq[FetchRecord] =
proc skipAndUpdateState( proc skipAndUpdateState(
pool: BlockPool, pool: BlockPool,
state: var HashedBeaconState, blck: BlockRef, slot: Slot) = state: var HashedBeaconState, blck: BlockRef, slot: Slot, save: bool) =
while state.data.slot < slot: while state.data.slot < slot:
# Process slots one at a time in case afterUpdate needs to see empty states # Process slots one at a time in case afterUpdate needs to see empty states
process_slots(state, state.data.slot + 1) # TODO when replaying, we already do this query when loading the ancestors -
pool.putState(state, blck) # save and reuse
# TODO possibly we should keep this in memory for the hot blocks
let nextStateRoot = pool.db.getStateRoot(blck.root, state.data.slot + 1)
advance_slot(state, nextStateRoot)
if save:
pool.putState(state, blck)
proc skipAndUpdateState( proc skipAndUpdateState(
pool: BlockPool, pool: BlockPool,
state: var StateData, blck: BlockData, flags: UpdateFlags): bool = state: var StateData, blck: BlockData, flags: UpdateFlags, save: bool): bool =
pool.skipAndUpdateState(state.data, blck.refs, blck.data.message.slot - 1) pool.skipAndUpdateState(
state.data, blck.refs, blck.data.message.slot - 1, save)
var statePtr = unsafeAddr state # safe because `rollback` is locally scoped var statePtr = unsafeAddr state # safe because `rollback` is locally scoped
proc rollback(v: var HashedBeaconState) = proc rollback(v: var HashedBeaconState) =
@ -657,7 +668,7 @@ proc skipAndUpdateState(
statePtr[] = pool.headState statePtr[] = pool.headState
let ok = state_transition(state.data, blck.data, flags, rollback) let ok = state_transition(state.data, blck.data, flags, rollback)
if ok: if ok and save:
pool.putState(state.data, blck.refs) pool.putState(state.data, blck.refs)
ok ok
@ -678,16 +689,15 @@ proc rewindState(pool: BlockPool, state: var StateData, bs: BlockSlot):
# successive parent block and checking if we can find the corresponding state # successive parent block and checking if we can find the corresponding state
# in the database. # in the database.
var var
stateRoot = pool.db.getStateRoot(bs.blck.root, bs.slot) stateRoot = block:
let tmp = pool.db.getStateRoot(bs.blck.root, bs.slot)
if tmp.isSome() and pool.db.containsState(tmp.get()):
tmp
else:
# State roots are sometimes kept in database even though state is not
err(Opt[Eth2Digest])
curBs = bs curBs = bs
# TODO this can happen when state root is saved but state is gone - this would
# indicate a corrupt database, but since we're not atomically
# writing and deleting state+root mappings in a single transaction, it's
# likely to happen and we guard against it here.
if stateRoot.isSome() and not pool.db.containsState(stateRoot.get()):
stateRoot.err()
while stateRoot.isNone(): while stateRoot.isNone():
let parBs = curBs.parent() let parBs = curBs.parent()
if parBs.blck.isNil: if parBs.blck.isNil:
@ -783,8 +793,7 @@ proc getStateDataCached(pool: BlockPool, state: var StateData, bs: BlockSlot): b
# In-memory caches didn't hit. Try main blockpool database. This is slower # In-memory caches didn't hit. Try main blockpool database. This is slower
# than the caches due to SSZ (de)serializing and disk I/O, so prefer them. # than the caches due to SSZ (de)serializing and disk I/O, so prefer them.
if (let tmp = pool.db.getStateRoot(bs.blck.root, bs.slot); tmp.isSome()): if (let tmp = pool.db.getStateRoot(bs.blck.root, bs.slot); tmp.isSome()):
doAssert pool.getState(pool.db, tmp.get(), bs.blck, state) return pool.getState(pool.db, tmp.get(), bs.blck, state)
return true
false false
@ -800,7 +809,7 @@ proc updateStateData*(pool: BlockPool, state: var StateData, bs: BlockSlot) =
if state.blck.root == bs.blck.root and state.data.data.slot <= bs.slot: if state.blck.root == bs.blck.root and state.data.data.slot <= bs.slot:
if state.data.data.slot != bs.slot: if state.data.data.slot != bs.slot:
# Might be that we're moving to the same block but later slot # Might be that we're moving to the same block but later slot
pool.skipAndUpdateState(state.data, bs.blck, bs.slot) pool.skipAndUpdateState(state.data, bs.blck, bs.slot, true)
return # State already at the right spot return # State already at the right spot
@ -818,13 +827,22 @@ proc updateStateData*(pool: BlockPool, state: var StateData, bs: BlockSlot) =
# it's the one that we found the state with, and it has already been # it's the one that we found the state with, and it has already been
# applied. Pathologically quadratic in slot number, naïvely. # applied. Pathologically quadratic in slot number, naïvely.
for i in countdown(ancestors.len - 1, 0): for i in countdown(ancestors.len - 1, 0):
# Because the ancestors are in the database, there's no need to persist them
# again. Also, because we're applying blocks that were loaded from the
# database, we can skip certain checks that have already been performed
# before adding the block to the database. In particular, this means that
# no state root calculation will take place here, because we can load
# the final state root from the block itself.
let ok = let ok =
pool.skipAndUpdateState( pool.skipAndUpdateState(
state, ancestors[i], state, ancestors[i],
{skipBlsValidation, skipMerkleValidation, skipStateRootValidation}) {skipBlsValidation, skipMerkleValidation, skipStateRootValidation},
false)
doAssert ok, "Blocks in database should never fail to apply.." doAssert ok, "Blocks in database should never fail to apply.."
pool.skipAndUpdateState(state.data, bs.blck, bs.slot) # We save states here - blocks were guaranteed to have passed through the save
# function once at least, but not so for empty slots!
pool.skipAndUpdateState(state.data, bs.blck, bs.slot, true)
state.blck = bs.blck state.blck = bs.blck
@ -839,7 +857,6 @@ proc delState(pool: BlockPool, bs: BlockSlot) =
# Delete state and mapping for a particular block+slot # Delete state and mapping for a particular block+slot
if (let root = pool.db.getStateRoot(bs.blck.root, bs.slot); root.isSome()): if (let root = pool.db.getStateRoot(bs.blck.root, bs.slot); root.isSome()):
pool.db.delState(root.get()) pool.db.delState(root.get())
pool.db.delStateRoot(bs.blck.root, bs.slot)
proc updateHead*(pool: BlockPool, newHead: BlockRef) = proc updateHead*(pool: BlockPool, newHead: BlockRef) =
## Update what we consider to be the current head, as given by the fork ## Update what we consider to be the current head, as given by the fork

View File

@ -38,6 +38,9 @@ func compute_epoch_at_slot*(slot: Slot|uint64): Epoch =
template epoch*(slot: Slot): Epoch = template epoch*(slot: Slot): Epoch =
compute_epoch_at_slot(slot) compute_epoch_at_slot(slot)
template isEpoch*(slot: Slot): bool =
  ## True when `slot` is an exact multiple of `SLOTS_PER_EPOCH`.
  slot mod SLOTS_PER_EPOCH == 0
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#compute_start_slot_at_epoch # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#compute_start_slot_at_epoch
func compute_start_slot_at_epoch*(epoch: Epoch): Slot = func compute_start_slot_at_epoch*(epoch: Epoch): Slot =
# Return the start slot of ``epoch``. # Return the start slot of ``epoch``.

View File

@ -29,6 +29,7 @@
import import
chronicles, chronicles,
stew/results,
./extras, ./ssz, metrics, ./extras, ./ssz, metrics,
./spec/[datatypes, crypto, digest, helpers, validator], ./spec/[datatypes, crypto, digest, helpers, validator],
./spec/[state_transition_block, state_transition_epoch], ./spec/[state_transition_block, state_transition_epoch],
@ -121,6 +122,32 @@ func process_slot*(state: var HashedBeaconState) {.nbench.} =
state.data.block_roots[state.data.slot mod SLOTS_PER_HISTORICAL_ROOT] = state.data.block_roots[state.data.slot mod SLOTS_PER_HISTORICAL_ROOT] =
hash_tree_root(state.data.latest_block_header) hash_tree_root(state.data.latest_block_header)
# https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#beacon-chain-state-transition-function
proc advance_slot*(state: var HashedBeaconState, nextStateRoot: Opt[Eth2Digest]) =
  ## Move `state` forward by exactly one slot - a single-step special case of
  ## `process_slots`.
  ##
  ## If `nextStateRoot` holds a value, it is stored as the root of the advanced
  ## state as-is; otherwise the root is computed with `hash_tree_root`. Passing
  ## an already-known root (for example when replaying existing slots) thus
  ## avoids the hash calculation.
  process_slot(state)

  # Evaluated before the slot counter is bumped: true when the slot we are
  # about to enter is an epoch slot.
  let crossesEpoch = (state.data.slot + 1).isEpoch

  if crossesEpoch:
    # Note: Genesis epoch = 0, no need to test if before Genesis
    try:
      beacon_previous_validators.set(get_epoch_validator_count(state.data))
    except Exception as e: # TODO https://github.com/status-im/nim-metrics/pull/22
      # A failed metrics update is only logged, never propagated
      trace "Couldn't update metrics", msg = e.msg
    process_epoch(state.data)

  state.data.slot += 1

  if crossesEpoch:
    try:
      beacon_current_validators.set(get_epoch_validator_count(state.data))
    except Exception as e: # TODO https://github.com/status-im/nim-metrics/pull/22
      trace "Couldn't update metrics", msg = e.msg

  state.root =
    if nextStateRoot.isSome: nextStateRoot.get()
    else: hash_tree_root(state.data)
# https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#beacon-chain-state-transition-function # https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#beacon-chain-state-transition-function
proc process_slots*(state: var HashedBeaconState, slot: Slot) {.nbench.} = proc process_slots*(state: var HashedBeaconState, slot: Slot) {.nbench.} =
# TODO: Eth specs strongly assert that state.data.slot <= slot # TODO: Eth specs strongly assert that state.data.slot <= slot
@ -129,6 +156,11 @@ proc process_slots*(state: var HashedBeaconState, slot: Slot) {.nbench.} =
# but it may be an artifact of the test case # but it may be an artifact of the test case
# as this was not triggered in the testnet1 # as this was not triggered in the testnet1
# after an hour # after an hour
# TODO this function is not _really_ necessary: when replaying states, we
# advance slots one by one before calling `state_transition` - this way,
# we avoid the state root calculation - as such, instead of advancing
# slots "automatically" in `state_transition`, perhaps it would be better
# to keep a pre-condition that state must be at the right slot already?
if state.data.slot > slot: if state.data.slot > slot:
notice( notice(
"Unusual request for a slot in the past", "Unusual request for a slot in the past",
@ -139,22 +171,7 @@ proc process_slots*(state: var HashedBeaconState, slot: Slot) {.nbench.} =
# Catch up to the target slot # Catch up to the target slot
while state.data.slot < slot: while state.data.slot < slot:
process_slot(state) advance_slot(state, err(Opt[Eth2Digest]))
let is_epoch_transition = (state.data.slot + 1) mod SLOTS_PER_EPOCH == 0
if is_epoch_transition:
# Note: Genesis epoch = 0, no need to test if before Genesis
try:
beacon_previous_validators.set(get_epoch_validator_count(state.data))
except Exception as e: # TODO https://github.com/status-im/nim-metrics/pull/22
trace "Couldn't update metrics", msg = e.msg
process_epoch(state.data)
state.data.slot += 1
if is_epoch_transition:
try:
beacon_current_validators.set(get_epoch_validator_count(state.data))
except Exception as e: # TODO https://github.com/status-im/nim-metrics/pull/22
trace "Couldn't update metrics", msg = e.msg
state.root = hash_tree_root(state.data)
# TODO remove this once callers gone # TODO remove this once callers gone
proc process_slots*(state: var BeaconState, slot: Slot) {.deprecated: "Use HashedBeaconState version".} = proc process_slots*(state: var BeaconState, slot: Slot) {.deprecated: "Use HashedBeaconState version".} =

View File

@ -33,13 +33,15 @@ type Timers = enum
tEpoch = "Process epoch slot with block" tEpoch = "Process epoch slot with block"
tHashBlock = "Tree-hash block" tHashBlock = "Tree-hash block"
tSignBlock = "Sign block" tSignBlock = "Sign block"
tShuffle = "Retrieve committee once using get_beacon_committee" tAttest = "Have committee attest to block"
tAttest = "Combine committee attestations" tReplay = "Replay all produced blocks"
# TODO confutils is an impenetrable black box. how can a help text be added here? # TODO confutils is an impenetrable black box. how can a help text be added here?
cli do(slots = SLOTS_PER_EPOCH * 6, cli do(slots = SLOTS_PER_EPOCH * 6,
validators = SLOTS_PER_EPOCH * 100, # One per shard is minimum validators = SLOTS_PER_EPOCH * 100, # One per shard is minimum
attesterRatio {.desc: "ratio of validators that attest in each round"} = 0.73): attesterRatio {.desc: "ratio of validators that attest in each round"} = 0.73,
blockRatio {.desc: "ratio of slots with blocks"} = 1.0,
replay = true):
let let
state = loadGenesis(validators, true) state = loadGenesis(validators, true)
genesisBlock = get_initial_beacon_block(state[]) genesisBlock = get_initial_beacon_block(state[])
@ -56,11 +58,12 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
attPool = AttestationPool.init(blockPool) attPool = AttestationPool.init(blockPool)
timers: array[Timers, RunningStat] timers: array[Timers, RunningStat]
attesters: RunningStat attesters: RunningStat
r: Rand r = initRand(1)
proc handleAttestations() = let replayState = newClone(blockPool.headState)
proc handleAttestations(slot: Slot) =
let let
slot = blockPool.head.blck.slot
attestationHead = blockPool.head.blck.atSlot(slot) attestationHead = blockPool.head.blck.atSlot(slot)
blockPool.withState(blockPool.tmpState, attestationHead): blockPool.withState(blockPool.tmpState, attestationHead):
@ -72,7 +75,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
state, slot, committee_index.CommitteeIndex, cache) state, slot, committee_index.CommitteeIndex, cache)
for index_in_committee, validatorIdx in committee: for index_in_committee, validatorIdx in committee:
if (rand(r, high(int)).float * attesterRatio).int <= high(int): if rand(r, 1.0) <= attesterRatio:
let let
data = makeAttestationData(state, slot, committee_index, blck.root) data = makeAttestationData(state, slot, committee_index, blck.root)
sig = sig =
@ -89,10 +92,12 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
signature: sig signature: sig
)) ))
proc proposeBlock() = proc proposeBlock(slot: Slot) =
if rand(r, 1.0) > blockRatio:
return
let let
head = blockPool.head.blck head = blockPool.head.blck
slot = blockPool.head.blck.slot + 1
blockPool.withState(blockPool.tmpState, head.atSlot(slot)): blockPool.withState(blockPool.tmpState, head.atSlot(slot)):
var cache = get_empty_per_epoch_cache() var cache = get_empty_per_epoch_cache()
@ -130,18 +135,21 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
for i in 0..<slots: for i in 0..<slots:
let let
slot = blockPool.headState.data.data.slot + 1 slot = Slot(i + 1)
t = t =
if slot mod SLOTS_PER_EPOCH == 0: tEpoch if slot.isEpoch: tEpoch
else: tBlock else: tBlock
withTimer(timers[t]): if blockRatio > 0.0:
proposeBlock() withTimer(timers[t]):
proposeBlock(slot)
if attesterRatio > 0.0: if attesterRatio > 0.0:
withTimer(timers[tAttest]): withTimer(timers[tAttest]):
handleAttestations() handleAttestations(slot)
verifyConsensus(blockPool.headState.data.data, attesterRatio) # TODO if attestation pool was smarter, it would include older attestations
# too!
verifyConsensus(blockPool.headState.data.data, attesterRatio * blockRatio)
if t == tEpoch: if t == tEpoch:
echo &". slot: {shortLog(slot)} ", echo &". slot: {shortLog(slot)} ",
@ -150,6 +158,11 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
write(stdout, ".") write(stdout, ".")
flushFile(stdout) flushFile(stdout)
if replay:
withTimer(timers[tReplay]):
blockPool.updateStateData(
replayState[], blockPool.head.blck.atSlot(Slot(slots)))
echo "Done!" echo "Done!"
printTimers(blockPool.headState.data.data, attesters, true, timers) printTimers(blockPool.headState.data.data, attesters, true, timers)

View File

@ -40,7 +40,7 @@ func verifyConsensus*(state: BeaconState, attesterRatio: auto) =
doAssert state.finalized_checkpoint.epoch + 2 >= current_epoch doAssert state.finalized_checkpoint.epoch + 2 >= current_epoch
proc loadGenesis*(validators: int, validate: bool): ref BeaconState = proc loadGenesis*(validators: int, validate: bool): ref BeaconState =
let fn = &"genesim_{const_preset}_{validators}" let fn = &"genesim_{const_preset}_{validators}.ssz"
if fileExists(fn): if fileExists(fn):
let res = newClone(SSZ.loadFile(fn, BeaconState)) let res = newClone(SSZ.loadFile(fn, BeaconState))
if res.slot != GENESIS_SLOT: if res.slot != GENESIS_SLOT:

View File

@ -51,7 +51,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
latest_block_root = hash_tree_root(genesisBlock.message) latest_block_root = hash_tree_root(genesisBlock.message)
timers: array[Timers, RunningStat] timers: array[Timers, RunningStat]
attesters: RunningStat attesters: RunningStat
r: Rand r = initRand(1)
signedBlock: SignedBeaconBlock signedBlock: SignedBeaconBlock
cache = get_empty_per_epoch_cache() cache = get_empty_per_epoch_cache()
@ -89,7 +89,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
let t = let t =
if (state.slot > GENESIS_SLOT and if (state.slot > GENESIS_SLOT and
(state.slot + 1) mod SLOTS_PER_EPOCH == 0): tEpoch (state.slot + 1).isEpoch): tEpoch
else: tBlock else: tBlock
withTimer(timers[t]): withTimer(timers[t]):