Blob storage (#4454)

* Blob storage

* fix indentation

* Fix build (none->Opt.none)

* putBlobs -> putBlobsSidecar

* getBlobs -> getBlobsSidecar

* Check blob correctness when storing a backfill block

* Blobs table: rename and conditionally create

* Check block<->blob match in storeBackfillBlock

* Use when .. toFork() to condition on type

* Check blob viability in block_processor.storeBlock()

* Fix build

* Review feedback
This commit is contained in:
henridf 2023-01-09 19:42:10 +01:00 committed by GitHub
parent b1acae67c3
commit 64878888bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 127 additions and 37 deletions

View File

@ -20,12 +20,13 @@ import
eth2_ssz_serialization,
eth2_merkleization,
forks,
presets,
state_transition],
./spec/datatypes/[phase0, altair, bellatrix],
"."/[beacon_chain_db_light_client, filepath]
from ./spec/datatypes/capella import BeaconState
from ./spec/datatypes/eip4844 import TrustedSignedBeaconBlock
from ./spec/datatypes/eip4844 import TrustedSignedBeaconBlock, BlobsSidecar
export
phase0, altair, eth2_ssz_serialization, eth2_merkleization, kvstore,
@ -115,6 +116,8 @@ type
keyValues: KvStoreRef # Random stuff using DbKeyKind - suitable for small values mainly!
blocks: array[BeaconBlockFork, KvStoreRef] # BlockRoot -> TrustedSignedBeaconBlock
blobs: KvStoreRef # (BlockRoot -> BlobsSidecar)
stateRoots: KvStoreRef # (Slot, BlockRoot) -> StateRoot
statesNoVal: array[BeaconStateFork, KvStoreRef] # StateRoot -> ForkBeaconStateNoImmutableValidators
@ -478,7 +481,8 @@ proc new*(T: type BeaconChainDBV0,
)
proc new*(T: type BeaconChainDB,
db: SqStoreRef
db: SqStoreRef,
cfg: RuntimeConfig = defaultRuntimeConfig
): BeaconChainDB =
if not db.readOnly:
# Remove the deposits table we used before we switched
@ -523,6 +527,10 @@ proc new*(T: type BeaconChainDB,
altairBestUpdates: "lc_altair_best_updates",
sealedPeriods: "lc_sealed_periods")).expectDb()
var blobs : KvStoreRef
if cfg.EIP4844_FORK_EPOCH != FAR_FUTURE_EPOCH:
blobs = kvStore db.openKvStore("eip4844_blobs").expectDb()
# Versions prior to 1.4.0 (altair) stored validators in `immutable_validators`
# which stores validator keys in compressed format - this is
# slow to load and has been superceded by `immutable_validators2` which uses
@ -557,6 +565,7 @@ proc new*(T: type BeaconChainDB,
checkpoint: proc() = db.checkpoint(),
keyValues: keyValues,
blocks: blocks,
blobs: blobs,
stateRoots: stateRoots,
statesNoVal: statesNoVal,
stateDiffs: stateDiffs,
@ -567,6 +576,7 @@ proc new*(T: type BeaconChainDB,
proc new*(T: type BeaconChainDB,
dir: string,
cfg: RuntimeConfig = defaultRuntimeConfig,
inMemory = false,
readOnly = false
): BeaconChainDB =
@ -582,7 +592,7 @@ proc new*(T: type BeaconChainDB,
SqStoreRef.init(
dir, "nbc", readOnly = readOnly, manualCheckpoint = true).expectDb()
BeaconChainDB.new(db)
BeaconChainDB.new(db, cfg)
template getLightClientDataDB*(db: BeaconChainDB): LightClientDataDB =
db.lcData
@ -723,6 +733,8 @@ proc close*(db: BeaconChainDB) =
if db.db == nil: return
# Close things roughly in reverse order
if not isNil(db.blobs):
discard db.blobs.close()
db.lcData.close()
db.finalizedBlocks.close()
discard db.summaries.close()
@ -768,6 +780,11 @@ proc putBlock*(
db.blocks[type(value).toFork].putSZSSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())
proc putBlobsSidecar*(
db: BeaconChainDB,
value: BlobsSidecar) =
db.blobs.putSZSSZ(value.beacon_block_root.data, value)
proc updateImmutableValidators*(
db: BeaconChainDB, validators: openArray[Validator]) =
# Must be called before storing a state that references the new validators
@ -957,6 +974,12 @@ proc getBlock*[
else:
result.err()
proc getBlobsSidecar*(db: BeaconChainDB, key: Eth2Digest): Opt[BlobsSidecar] =
var blobs: BlobsSidecar
result.ok(blobs)
if db.blobs.getSZSSZ(key.data, result.get) != GetResult.found:
result.err()
proc getPhase0BlockSSZ(
db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = addr data # Short-lived

View File

@ -13,7 +13,8 @@ else:
import
std/[tables],
stew/bitops2,
../spec/forks
../spec/forks,
../spec/datatypes/eip4844
export tables, forks
@ -41,7 +42,8 @@ type
##
## Trivially invalid blocks may be dropped before reaching this stage.
orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock]
orphans*: Table[(Eth2Digest, ValidatorSig),
(ForkedSignedBeaconBlock, Opt[eip4844.BlobsSidecar])]
## Blocks that we don't have a parent for - when we resolve the parent, we
## can proceed to resolving the block as well - we index this by root and
## signature such that a block with invalid signature won't cause a block
@ -150,7 +152,7 @@ func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
while toCheck.len > 0:
let root = toCheck.pop()
for k, v in quarantine.orphans.mpairs():
if getForkedBlockField(v, parent_root) == root:
if getForkedBlockField(v[0], parent_root) == root:
toCheck.add(k[0])
toRemove.add(k)
elif k[0] == root:
@ -168,7 +170,7 @@ func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[(Eth2Digest, ValidatorSig)]
for k, v in quarantine.orphans:
if not isViableOrphan(finalizedSlot, v):
if not isViableOrphan(finalizedSlot, v[0]):
toDel.add k
for k in toDel:
@ -193,7 +195,8 @@ func clearAfterReorg*(quarantine: var Quarantine) =
# likely imminent arrival.
func addOrphan*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: ForkedSignedBeaconBlock): bool =
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar]): bool =
## Adds block to quarantine's `orphans` and `missing` lists.
if not isViableOrphan(finalizedSlot, signedBlock):
quarantine.addUnviable(signedBlock.root)
@ -215,13 +218,13 @@ func addOrphan*(
return false
quarantine.orphans[(signedBlock.root, signedBlock.signature)] =
signedBlock
(signedBlock, blobs)
quarantine.missing.del(signedBlock.root)
true
iterator pop*(quarantine: var Quarantine, root: Eth2Digest):
ForkedSignedBeaconBlock =
(ForkedSignedBeaconBlock, Opt[eip4844.BlobsSidecar]) =
# Pop orphans whose parent is the block identified by `root`
var toRemove: seq[(Eth2Digest, ValidatorSig)]
@ -230,6 +233,6 @@ iterator pop*(quarantine: var Quarantine, root: Eth2Digest):
quarantine.orphans.del k
for k, v in quarantine.orphans.mpairs():
if getForkedBlockField(v, parent_root) == root:
if getForkedBlockField(v[0], parent_root) == root:
toRemove.add(k)
yield v

View File

@ -30,6 +30,8 @@ from ../consensus_object_pools/block_quarantine import
from ../validators/validator_monitor import
MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
registerSyncAggregateInBlock
from ../spec/datatypes/eip4844 import BlobsSidecar
from ../spec/state_transition_block import validate_blobs_sidecar
export sszdump, signatures_batch
@ -43,6 +45,7 @@ declareHistogram beacon_store_block_duration_seconds,
type
BlockEntry* = object
blck*: ForkedSignedBeaconBlock
blobs*: Opt[eip4844.BlobsSidecar]
resfut*: Future[Result[void, VerifierError]]
queueTick*: Moment # Moment when block was enqueued
validationDur*: Duration # Time it took to perform gossip validation
@ -98,6 +101,7 @@ type
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar],
resfut: Future[Result[void, VerifierError]] = nil,
validationDur = Duration())
@ -151,14 +155,31 @@ proc dumpBlock[T](
from ../consensus_object_pools/block_clearance import
addBackfillBlock, addHeadBlock
from ../beacon_chain_db import putBlobsSidecar
proc storeBackfillBlock(
self: var BlockProcessor,
signedBlock: ForkySignedBeaconBlock): Result[void, VerifierError] =
signedBlock: ForkySignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar]): Result[void, VerifierError] =
# The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root)
# Establish blob viability before calling addbackfillBlock to avoid
# writing the block in case of blob error.
let blobsOk =
when typeof(signedBlock).toFork() >= BeaconBlockFork.EIP4844:
blobs.isSome() and not
validate_blobs_sidecar(signedBlock.message.slot,
hash_tree_root(signedBlock.message),
signedBlock.message
.body.blob_kzg_commitments.asSeq,
blobs.get()).isOk()
else:
true
if not blobsOk:
return err(VerifierError.Invalid)
let res = self.consensusManager.dag.addBackfillBlock(signedBlock)
if res.isErr():
@ -175,9 +196,14 @@ proc storeBackfillBlock(
# Track unviables so that descendants can be discarded properly
self.consensusManager.quarantine[].addUnviable(signedBlock.root)
else: discard
return res
if blobs.isSome():
# Only store blobs after successfully establishing block viability.
self.consensusManager.dag.db.putBlobsSidecar(blobs.get())
res
from web3/engine_api_types import PayloadExecutionStatus, PayloadStatusV1
from ../eth1/eth1_monitor import
Eth1Monitor, asEngineExecutionPayload, ensureDataProvider, newPayload
@ -345,8 +371,9 @@ proc getExecutionValidity(
proc storeBlock*(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock, queueTick: Moment = Moment.now(),
validationDur = Duration()):
signedBlock: ForkySignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar],
queueTick: Moment = Moment.now(), validationDur = Duration()):
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
## storeBlock is the main entry point for unvalidated blocks - all untrusted
## blocks, regardless of origin, pass through here. When storing a block,
@ -393,6 +420,17 @@ proc storeBlock*(
# be re-added later
self.consensusManager.quarantine[].removeOrphan(signedBlock)
# Establish blob viability before calling addHeadBlock to avoid
# writing the block in case of blob error.
when typeof(signedBlock).toFork() >= BeaconBlockFork.EIP4844:
if blobs.isSome() and not
validate_blobs_sidecar(signedBlock.message.slot,
hash_tree_root(signedBlock.message),
signedBlock.message
.body.blob_kzg_commitments.asSeq,
blobs.get()).isOk():
return err((VerifierError.Invalid, ProcessingStatus.completed))
type Trusted = typeof signedBlock.asTrusted()
let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do (
blckRef: BlockRef, trustedBlock: Trusted,
@ -434,7 +472,7 @@ proc storeBlock*(
return err((VerifierError.UnviableFork, ProcessingStatus.completed))
if not self.consensusManager.quarantine[].addOrphan(
dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock)):
dag.finalizedHead.slot, ForkedSignedBeaconBlock.init(signedBlock), blobs):
debug "Block quarantine full",
blockRoot = shortLog(signedBlock.root),
blck = shortLog(signedBlock.message),
@ -446,6 +484,10 @@ proc storeBlock*(
return err((blck.error, ProcessingStatus.completed))
# write blobs now that block has been written.
if blobs.isSome():
self.consensusManager.dag.db.putBlobsSidecar(blobs.get())
let storeBlockTick = Moment.now()
# Eagerly update head: the incoming block "should" get selected.
@ -541,7 +583,7 @@ proc storeBlock*(
for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
# Process the blocks that had the newly accepted block as parent
self[].addBlock(MsgSource.gossip, quarantined)
self[].addBlock(MsgSource.gossip, quarantined[0], quarantined[1])
return Result[BlockRef, (VerifierError, ProcessingStatus)].ok blck.get
@ -550,7 +592,7 @@ proc storeBlock*(
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
resfut: Future[Result[void, VerifierError]] = nil,
blobs: Opt[eip4844.BlobsSidecar], resfut: Future[Result[void, VerifierError]] = nil,
validationDur = Duration()) =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
@ -565,8 +607,7 @@ proc addBlock*(
if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
# let backfill blocks skip the queue - these are always "fast" to process
# because there are no state rewinds to deal with
let res = self.storeBackfillBlock(blck)
let res = self.storeBackfillBlock(blck, blobs)
if resfut != nil:
resfut.complete(res)
return
@ -574,6 +615,7 @@ proc addBlock*(
try:
self.blockQueue.addLastNoWait(BlockEntry(
blck: blck,
blobs: blobs,
resfut: resfut, queueTick: Moment.now(),
validationDur: validationDur,
src: src))
@ -598,7 +640,8 @@ proc processBlock(
let res = withBlck(entry.blck):
await self.storeBlock(
entry.src, wallTime, blck, entry.queueTick, entry.validationDur)
entry.src, wallTime, blck, entry.blobs, entry.queueTick,
entry.validationDur)
if res.isErr and res.error[1] == ProcessingStatus.notCompleted:
# When an execution engine returns an error or fails to respond to a
@ -609,8 +652,7 @@ proc processBlock(
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-alpha.2/sync/optimistic.md#execution-engine-errors
await sleepAsync(chronos.seconds(1))
self[].addBlock(
entry.src, entry.blck, entry.resfut, entry.validationDur)
entry.src, entry.blck, entry.blobs, entry.resfut, entry.validationDur)
# To ensure backpressure on the sync manager, do not complete these futures.
return

View File

@ -225,6 +225,7 @@ proc processSignedBeaconBlock*(
self.blockProcessor[].addBlock(
src, ForkedSignedBeaconBlock.init(signedBlock),
Opt.none(eip4844.BlobsSidecar),
validationDur = nanoseconds(
(self.getCurrentBeaconTime() - wallTime).nanoseconds))
@ -248,6 +249,7 @@ proc processSignedBeaconBlockAndBlobsSidecar*(
(afterGenesis, wallSlot) = wallTime.toSlot()
template signedBlock: auto = signedBlockAndBlobsSidecar.beacon_block
template blobs: auto = signedBlockAndBlobsSidecar.blobs_sidecar
logScope:
blockRoot = shortLog(signedBlock.root)
@ -267,7 +269,8 @@ proc processSignedBeaconBlockAndBlobsSidecar*(
debug "Block received", delay
let blockRes =
self.dag.validateBeaconBlock(self.quarantine, signedBlock, wallTime, {})
self.dag.validateBeaconBlock(self.quarantine, signedBlock, Opt.some(blobs),
wallTime, {})
if blockRes.isErr():
debug "Dropping block", error = blockRes.error()
self.blockProcessor[].dumpInvalidBlock(signedBlock)
@ -289,6 +292,7 @@ proc processSignedBeaconBlockAndBlobsSidecar*(
trace "Block validated"
self.blockProcessor[].addBlock(
src, ForkedSignedBeaconBlock.init(signedBlock),
Opt.some(blobs),
validationDur = nanoseconds(
(self.getCurrentBeaconTime() - wallTime).nanoseconds))

View File

@ -230,11 +230,16 @@ proc validateBeaconBlock*(
signed_beacon_block: phase0.SignedBeaconBlock | altair.SignedBeaconBlock |
bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
eip4844.SignedBeaconBlock,
blobs: Opt[eip4844.BlobsSidecar],
wallTime: BeaconTime, flags: UpdateFlags): Result[void, ValidationError] =
# In general, checks are ordered from cheap to expensive. Especially, crypto
# verification could be quite a bit more expensive than the rest. This is an
# externally easy-to-invoke function by tossing network packets at the node.
# We should enforce this statically via the type system... but for now, assert.
when typeof(signed_beacon_block).toFork() < BeaconBlockFork.EIP4844:
doAssert blobs.isNone(), "Blobs with pre-EIP4844 block"
# [IGNORE] The block is not from a future slot (with a
# MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. validate that
# signed_beacon_block.message.slot <= current_slot (a client MAY queue future
@ -326,7 +331,7 @@ proc validateBeaconBlock*(
# in the quarantine for later processing
if not quarantine[].addOrphan(
dag.finalizedHead.slot,
ForkedSignedBeaconBlock.init(signed_beacon_block)):
ForkedSignedBeaconBlock.init(signed_beacon_block), blobs):
debug "Block quarantine full"
return errIgnore("BeaconBlock: Parent not found")
@ -391,6 +396,15 @@ proc validateBeaconBlock*(
ok()
proc validateBeaconBlock*(
dag: ChainDAGRef, quarantine: ref Quarantine,
signed_beacon_block: phase0.SignedBeaconBlock | altair.SignedBeaconBlock |
bellatrix.SignedBeaconBlock | capella.SignedBeaconBlock |
eip4844.SignedBeaconBlock,
wallTime: BeaconTime, flags: UpdateFlags): Result[void, ValidationError] =
dag.validateBeaconBlock(quarantine, signed_beacon_block,
Opt.none(eip4844.BlobsSidecar), wallTime, flags)
proc validateBeaconBlockAndBlobsSidecar*(signedBlock: SignedBeaconBlockAndBlobsSidecar):
Result[void, ValidationError] =
# TODO

View File

@ -311,7 +311,8 @@ proc initFullNode(
# taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
blockProcessor[].addBlock(MsgSource.gossip, signedBlock, resfut)
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(eip4844.BlobsSidecar), resfut)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
@ -424,7 +425,7 @@ proc init*(T: type BeaconNode,
exitQueue: newAsyncEventQueue[SignedVoluntaryExit](),
finalQueue: newAsyncEventQueue[FinalizationInfoObject]()
)
db = BeaconChainDB.new(config.databaseDir, inMemory = false)
db = BeaconChainDB.new(config.databaseDir, cfg, inMemory = false)
if config.finalizedCheckpointBlock.isSome:
warn "--finalized-checkpoint-block has been deprecated, ignoring"

View File

@ -71,7 +71,7 @@ proc doTrustedNodeSync*(
quit 1
let
db = BeaconChainDB.new(databaseDir, inMemory = false)
db = BeaconChainDB.new(databaseDir, cfg, inMemory = false)
defer:
db.close()

View File

@ -11,6 +11,7 @@ else:
{.push raises: [].}
import
stew/results,
std/sequtils,
chronicles,
metrics,
@ -18,7 +19,8 @@ import
../consensus_object_pools/spec_cache,
../gossip_processing/eth2_processor,
../networking/eth2_network,
./activity_metrics
./activity_metrics,
../spec/datatypes/eip4844
export eth2_processor, eth2_network
@ -122,7 +124,7 @@ proc routeSignedBeaconBlock*(
let
newBlockRef = await router[].blockProcessor.storeBlock(
MsgSource.api, sendTime, blck)
MsgSource.api, sendTime, blck, Opt.none(eip4844.BlobsSidecar))
# The boolean we return tells the caller whether the block was integrated
# into the chain

View File

@ -855,7 +855,7 @@ proc insertValidators(db: SqStoreRef, state: ForkedHashedBeaconState,
proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
# Create a database with performance information for every epoch
info "Opening database..."
let db = BeaconChainDB.new(conf.databaseDir.string, false, true)
let db = BeaconChainDB.new(conf.databaseDir.string, readOnly = true)
defer: db.close()
if (let v = ChainDAGRef.isInitialized(db); v.isErr()):

View File

@ -14,6 +14,7 @@ import
eth/keys, taskpools,
../beacon_chain/beacon_clock,
../beacon_chain/spec/[beaconstate, forks, helpers, state_transition],
../beacon_chain/spec/datatypes/eip4844,
../beacon_chain/gossip_processing/block_processor,
../beacon_chain/consensus_object_pools/[
attestation_pool, blockchain_dag, block_quarantine, block_clearance,
@ -59,7 +60,7 @@ suite "Block processor" & preset():
asyncTest "Reverse order block add & get" & preset():
let missing = await processor.storeBlock(
MsgSource.gossip, b2.message.slot.start_beacon_time(), b2)
MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, Opt.none(BlobsSidecar))
check: missing.error[0] == VerifierError.MissingParent
check:
@ -69,7 +70,7 @@ suite "Block processor" & preset():
let
status = await processor.storeBlock(
MsgSource.gossip, b2.message.slot.start_beacon_time(), b1)
MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, Opt.none(BlobsSidecar))
b1Get = dag.getBlockRef(b1.root)
check:

View File

@ -10,7 +10,7 @@
import
unittest2,
../beacon_chain/spec/forks,
../beacon_chain/spec/datatypes/phase0,
../beacon_chain/spec/datatypes/[phase0,eip4844],
../beacon_chain/consensus_object_pools/block_quarantine
func makeBlock(slot: Slot, parent: Eth2Digest): ForkedSignedBeaconBlock =
@ -35,13 +35,13 @@ suite "Block quarantine":
check:
FetchRecord(root: b1.root) in quarantine.checkMissing()
quarantine.addOrphan(Slot 0, b1)
quarantine.addOrphan(Slot 0, b1, Opt.none(BlobsSidecar))
FetchRecord(root: b1.root) notin quarantine.checkMissing()
quarantine.addOrphan(Slot 0, b2)
quarantine.addOrphan(Slot 0, b3)
quarantine.addOrphan(Slot 0, b4)
quarantine.addOrphan(Slot 0, b2, Opt.none(BlobsSidecar))
quarantine.addOrphan(Slot 0, b3, Opt.none(BlobsSidecar))
quarantine.addOrphan(Slot 0, b4, Opt.none(BlobsSidecar))
(b4.root, ValidatorSig()) in quarantine.orphans