use snappy-framed format for compressing bellatrix+ database entries (#3551)

`.era` files and Req/Resp protocols use framed formats - aligning the
database with these makes for less recompression work overall as gossip
is sent only once while req/resp repeats (potentially) - this also
allows efficient pruning-to-era where snappy-recompression is the major
cycle thief.
This commit is contained in:
Jacek Sieka 2022-03-29 13:33:06 +02:00 committed by GitHub
parent 2c5f725543
commit 5092fc41c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 208 additions and 31 deletions

View File

@ -10,7 +10,7 @@
import
std/[typetraits, tables],
stew/[arrayops, assign2, byteutils, endians2, io2, objects, results],
serialization, chronicles, snappy,
serialization, chronicles, snappy, snappy/framing,
eth/db/[kvstore, kvstore_sqlite3],
./networking/network_metadata, ./beacon_chain_db_immutable,
./spec/[eth2_ssz_serialization, eth2_merkleization, forks, state_transition],
@ -69,6 +69,9 @@ type
## 1.3 creates `kvstore` with rowid, making it quite fast, but doesn't do
## anything about existing databases. Versions after that use a separate
## file instead (V1)
##
## Starting with bellatrix, we store blocks and states using snappy framed
## encoding so as to match the `Req`/`Resp` protocols and era files ("SZ").
backend: KvStoreRef # kvstore
stateStore: KvStoreRef # state_no_validators
@ -521,6 +524,18 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool =
err = e.msg, typ = name(T), dataLen = data.len
false
proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool =
try:
let decompressed = framingFormatUncompress(data)
readSszBytes(decompressed, output, updateRoot = false)
true
except CatchableError as e:
# If the data can't be deserialized, it could be because it's from a
# version of the software that uses a different SSZ encoding
warn "Unable to deserialize data, old database?",
err = e.msg, typ = name(T), dataLen = data.len
false
proc encodeSSZ(v: auto): seq[byte] =
try:
SSZ.encode(v)
@ -534,6 +549,14 @@ proc encodeSnappySSZ(v: auto): seq[byte] =
# In-memory encode shouldn't fail!
raiseAssert err.msg
proc encodeSZSSZ(v: auto): seq[byte] =
# https://github.com/google/snappy/blob/main/framing_format.txt
try:
framingFormatCompress(SSZ.encode(v))
except CatchableError as err:
# In-memory encode shouldn't fail!
raiseAssert err.msg
proc getRaw(db: KvStoreRef, key: openArray[byte], T: type Eth2Digest): Opt[T] =
var res: Opt[T]
proc decode(data: openArray[byte]) =
@ -561,9 +584,7 @@ type GetResult = enum
proc getSSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
var status = GetResult.notFound
# TODO address is needed because there's no way to express lifetimes in nim
# we'll use unsafeAddr to find the code later
var outputPtr = unsafeAddr output # callback is local, ptr wont escape
var outputPtr = addr output # callback is local, ptr wont escape
proc decode(data: openArray[byte]) =
status =
if decodeSSZ(data, outputPtr[]): GetResult.found
@ -579,9 +600,7 @@ proc putSSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
proc getSnappySSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
var status = GetResult.notFound
# TODO address is needed because there's no way to express lifetimes in nim
# we'll use unsafeAddr to find the code later
var outputPtr = unsafeAddr output # callback is local, ptr wont escape
var outputPtr = addr output # callback is local, ptr wont escape
proc decode(data: openArray[byte]) =
status =
if decodeSnappySSZ(data, outputPtr[]): GetResult.found
@ -594,6 +613,22 @@ proc getSnappySSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetRe
proc putSnappySSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
db.put(key, encodeSnappySSZ(v)).expectDb()
proc getSZSSZ[T](db: KvStoreRef, key: openArray[byte], output: var T): GetResult =
var status = GetResult.notFound
var outputPtr = addr output # callback is local, ptr wont escape
proc decode(data: openArray[byte]) =
status =
if decodeSZSSZ(data, outputPtr[]): GetResult.found
else: GetResult.corrupted
discard db.get(key, decode).expectDb()
status
proc putSZSSZ(db: KvStoreRef, key: openArray[byte], v: auto) =
db.put(key, encodeSZSSZ(v)).expectDb()
proc close*(db: BeaconChainDBV0) =
discard db.stateStore.close()
discard db.backend.close()
@ -628,11 +663,20 @@ proc putBeaconBlockSummary*(
# Summaries are too simple / small to compress, store them as plain SSZ
db.summaries.putSSZ(root.data, value)
proc putBlock*(db: BeaconChainDB, value: ForkyTrustedSignedBeaconBlock) =
proc putBlock*(
db: BeaconChainDB,
value: phase0.TrustedSignedBeaconBlock | altair.TrustedSignedBeaconBlock) =
db.withManyWrites:
db.blocks[type(value).toFork].putSnappySSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())
proc putBlock*(
db: BeaconChainDB,
value: bellatrix.TrustedSignedBeaconBlock) =
db.withManyWrites:
db.blocks[type(value).toFork].putSZSSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())
proc updateImmutableValidators*(
db: BeaconChainDB, validators: openArray[Validator]) =
# Must be called before storing a state that references the new validators
@ -659,11 +703,18 @@ template toBeaconStateNoImmutableValidators(state: bellatrix.BeaconState):
BellatrixBeaconStateNoImmutableValidators =
isomorphicCast[BellatrixBeaconStateNoImmutableValidators](state)
proc putState*(db: BeaconChainDB, key: Eth2Digest, value: ForkyBeaconState) =
proc putState*(
db: BeaconChainDB, key: Eth2Digest,
value: phase0.BeaconState | altair.BeaconState) =
db.updateImmutableValidators(value.validators.asSeq())
db.statesNoVal[type(value).toFork()].putSnappySSZ(
key.data, toBeaconStateNoImmutableValidators(value))
proc putState*(db: BeaconChainDB, key: Eth2Digest, value: bellatrix.BeaconState) =
db.updateImmutableValidators(value.validators.asSeq())
db.statesNoVal[type(value).toFork()].putSZSSZ(
key.data, toBeaconStateNoImmutableValidators(value))
proc putState*(db: BeaconChainDB, state: ForkyHashedBeaconState) =
db.withManyWrites:
db.putStateRoot(state.latest_block_root, state.data.slot, state.root)
@ -740,10 +791,9 @@ proc getBlock*(
# set root after deserializing (so it doesn't get zeroed)
result.get().root = key
proc getBlock*[
X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock](
proc getBlock*(
db: BeaconChainDB, key: Eth2Digest,
T: type X): Opt[T] =
T: type altair.TrustedSignedBeaconBlock): Opt[T] =
# We only store blocks that we trust in the database
result.ok(default(T))
if db.blocks[T.toFork].getSnappySSZ(key.data, result.get) == GetResult.found:
@ -752,19 +802,33 @@ proc getBlock*[
else:
result.err()
proc getPhase0BlockSSZ(db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = unsafeAddr data # Short-lived
proc getBlock*[
X: bellatrix.TrustedSignedBeaconBlock](
db: BeaconChainDB, key: Eth2Digest,
T: type X): Opt[T] =
# We only store blocks that we trust in the database
result.ok(default(T))
if db.blocks[T.toFork].getSZSSZ(key.data, result.get) == GetResult.found:
# set root after deserializing (so it doesn't get zeroed)
result.get().root = key
else:
result.err()
proc getPhase0BlockSSZ(
db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and success
db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and
success
# SSZ implementations are separate so as to avoid unnecessary data copies
proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type phase0.TrustedSignedBeaconBlock): bool =
let dataPtr = unsafeAddr data # Short-lived
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
@ -772,17 +836,26 @@ proc getBlockSSZ*(
db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
db.v0.getPhase0BlockSSZ(key, data)
proc getBlockSSZ*[
X: altair.TrustedSignedBeaconBlock | bellatrix.TrustedSignedBeaconBlock](
proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type X): bool =
let dataPtr = unsafeAddr data # Short-lived
T: type altair.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success
proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type bellatrix.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatUncompress(data)
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success
proc getBlockSSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
fork: BeaconBlockFork): bool =
@ -794,9 +867,53 @@ proc getBlockSSZ*(
of BeaconBlockFork.Bellatrix:
getBlockSSZ(db, key, data, bellatrix.TrustedSignedBeaconBlock)
proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type phase0.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[BeaconBlockFork.Phase0].get(key.data, decode).expectDb() and success or
db.v0.getPhase0BlockSSZ(key, data)
proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type altair.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = framingFormatCompress(
snappy.decode(data, maxDecompressedDbRecordSize))
except CatchableError: success = false
db.blocks[T.toFork].get(key.data, decode).expectDb() and success
proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
T: type bellatrix.TrustedSignedBeaconBlock): bool =
let dataPtr = addr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
assign(dataPtr[], data)
db.blocks[T.toFork].get(key.data, decode).expectDb() and success
proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],
fork: BeaconBlockFork): bool =
case fork
of BeaconBlockFork.Phase0:
getBlockSZ(db, key, data, phase0.TrustedSignedBeaconBlock)
of BeaconBlockFork.Altair:
getBlockSZ(db, key, data, altair.TrustedSignedBeaconBlock)
of BeaconBlockFork.Bellatrix:
getBlockSZ(db, key, data, bellatrix.TrustedSignedBeaconBlock)
proc getStateOnlyMutableValidators(
immutableValidators: openArray[ImmutableValidatorData2],
store: KvStoreRef, key: openArray[byte], output: var ForkyBeaconState,
store: KvStoreRef, key: openArray[byte],
output: var (phase0.BeaconState | altair.BeaconState),
rollback: RollbackProc): bool =
## Load state into `output` - BeaconState is large so we want to avoid
## re-allocating it if possible
@ -835,6 +952,47 @@ proc getStateOnlyMutableValidators(
rollback()
false
proc getStateOnlyMutableValidators(
immutableValidators: openArray[ImmutableValidatorData2],
store: KvStoreRef, key: openArray[byte], output: var bellatrix.BeaconState,
rollback: RollbackProc): bool =
## Load state into `output` - BeaconState is large so we want to avoid
## re-allocating it if possible
## Return `true` iff the entry was found in the database and `output` was
## overwritten.
## Rollback will be called only if output was partially written - if it was
## not found at all, rollback will not be called
# TODO rollback is needed to deal with bug - use `noRollback` to ignore:
# https://github.com/nim-lang/Nim/issues/14126
# TODO RVO is inefficient for large objects:
# https://github.com/nim-lang/Nim/issues/13879
case store.getSZSSZ(key, toBeaconStateNoImmutableValidators(output))
of GetResult.found:
let numValidators = output.validators.len
doAssert immutableValidators.len >= numValidators
for i in 0 ..< numValidators:
let
# Bypass hash cache invalidation
dstValidator = addr output.validators.data[i]
assign(
dstValidator.pubkey,
immutableValidators[i].pubkey.toPubKey())
assign(
dstValidator.withdrawal_credentials,
immutableValidators[i].withdrawal_credentials)
output.validators.resetCache()
true
of GetResult.notFound:
false
of GetResult.corrupted:
rollback()
false
proc getState(
db: BeaconChainDBV0,
immutableValidators: openArray[ImmutableValidatorData2],

View File

@ -478,8 +478,6 @@ proc getBlock*(
proc getBlockSSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool =
# Load the SSZ-encoded data of a block into `bytes`, overwriting the existing
# content
# careful: there are two snappy encodings in use, with and without framing!
# Returns true if the block is found, false if not
let fork = dag.cfg.blockForkAtEpoch(bid.slot.epoch)
dag.db.getBlockSSZ(bid.root, bytes, fork) or
(bid.slot <= dag.finalizedHead.slot and
@ -487,6 +485,18 @@ proc getBlockSSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool =
dag.era, getStateField(dag.headState, historical_roots).asSeq,
bid.slot, bytes).isOk)
proc getBlockSZ*(dag: ChainDAGRef, bid: BlockId, bytes: var seq[byte]): bool =
# Load the snappy-frame-compressed ("SZ") SSZ-encoded data of a block into
# `bytes`, overwriting the existing content
# careful: there are two snappy encodings in use, with and without framing!
# Returns true if the block is found, false if not
let fork = dag.cfg.blockForkAtEpoch(bid.slot.epoch)
dag.db.getBlockSZ(bid.root, bytes, fork) or
(bid.slot <= dag.finalizedHead.slot and
getBlockSZ(
dag.era, getStateField(dag.headState, historical_roots).asSeq,
bid.slot, bytes).isOk)
proc getForkedBlock*(
dag: ChainDAGRef, bid: BlockId): Opt[ForkedTrustedSignedBeaconBlock] =

View File

@ -243,11 +243,11 @@ proc init*(T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, st
else: 0
))))
proc update*(g: var EraGroup, f: IoHandle, slot: Slot, sszBytes: openArray[byte]): Result[void, string] =
proc update*(g: var EraGroup, f: IoHandle, slot: Slot, szBytes: openArray[byte]): Result[void, string] =
doAssert slot >= g.slotIndex.startSlot
g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] =
try:
? f.appendRecord(SnappyBeaconBlock, framingFormatCompress(sszBytes))
? f.appendRecord(SnappyBeaconBlock, szBytes)
except CatchableError as e: raiseAssert e.msg # TODO fix snappy
ok()

View File

@ -487,7 +487,7 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
withTimer(timers[tBlocks]):
var blocks: array[SLOTS_PER_HISTORICAL_ROOT.int, BlockId]
for i in dag.getBlockRange(firstSlot.get(), 1, blocks)..<blocks.len:
if dag.getBlockSSZ(blocks[i], tmp):
if dag.getBlockSZ(blocks[i], tmp):
group.update(e2, blocks[i].slot, tmp).get()
withTimer(timers[tState]):

View File

@ -14,7 +14,7 @@ import
../beacon_chain/spec/[beaconstate, forks, state_transition],
../beacon_chain/spec/datatypes/[phase0, altair, bellatrix],
../beacon_chain/consensus_object_pools/blockchain_dag,
eth/db/kvstore,
eth/db/kvstore, snappy/framing,
# test utilies
./testutil, ./testdbutil, ./testblockutil, ./teststateutil
@ -100,7 +100,7 @@ suite "Beacon chain DB" & preset():
db.putBlock(signedBlock)
var tmp: seq[byte]
var tmp, tmp2: seq[byte]
check:
db.containsBlock(root)
db.containsBlock(root, phase0.TrustedSignedBeaconBlock)
@ -108,7 +108,9 @@ suite "Beacon chain DB" & preset():
not db.containsBlock(root, bellatrix.TrustedSignedBeaconBlock)
db.getBlock(root, phase0.TrustedSignedBeaconBlock).get() == signedBlock
db.getBlockSSZ(root, tmp, phase0.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, phase0.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
db.delBlock(root)
check:
@ -118,6 +120,7 @@ suite "Beacon chain DB" & preset():
not db.containsBlock(root, bellatrix.TrustedSignedBeaconBlock)
db.getBlock(root, phase0.TrustedSignedBeaconBlock).isErr()
not db.getBlockSSZ(root, tmp, phase0.TrustedSignedBeaconBlock)
not db.getBlockSZ(root, tmp2, phase0.TrustedSignedBeaconBlock)
db.putStateRoot(root, signedBlock.message.slot, root)
var root2 = root
@ -139,7 +142,7 @@ suite "Beacon chain DB" & preset():
db.putBlock(signedBlock)
var tmp: seq[byte]
var tmp, tmp2: seq[byte]
check:
db.containsBlock(root)
not db.containsBlock(root, phase0.TrustedSignedBeaconBlock)
@ -147,7 +150,9 @@ suite "Beacon chain DB" & preset():
not db.containsBlock(root, bellatrix.TrustedSignedBeaconBlock)
db.getBlock(root, altair.TrustedSignedBeaconBlock).get() == signedBlock
db.getBlockSSZ(root, tmp, altair.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, altair.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
db.delBlock(root)
check:
@ -157,6 +162,7 @@ suite "Beacon chain DB" & preset():
not db.containsBlock(root, bellatrix.TrustedSignedBeaconBlock)
db.getBlock(root, altair.TrustedSignedBeaconBlock).isErr()
not db.getBlockSSZ(root, tmp, altair.TrustedSignedBeaconBlock)
not db.getBlockSZ(root, tmp2, altair.TrustedSignedBeaconBlock)
db.putStateRoot(root, signedBlock.message.slot, root)
var root2 = root
@ -178,7 +184,7 @@ suite "Beacon chain DB" & preset():
db.putBlock(signedBlock)
var tmp: seq[byte]
var tmp, tmp2: seq[byte]
check:
db.containsBlock(root)
not db.containsBlock(root, phase0.TrustedSignedBeaconBlock)
@ -186,7 +192,9 @@ suite "Beacon chain DB" & preset():
db.containsBlock(root, bellatrix.TrustedSignedBeaconBlock)
db.getBlock(root, bellatrix.TrustedSignedBeaconBlock).get() == signedBlock
db.getBlockSSZ(root, tmp, bellatrix.TrustedSignedBeaconBlock)
db.getBlockSZ(root, tmp2, bellatrix.TrustedSignedBeaconBlock)
tmp == SSZ.encode(signedBlock)
tmp2 == framingFormatCompress(tmp)
db.delBlock(root)
check:
@ -196,6 +204,7 @@ suite "Beacon chain DB" & preset():
not db.containsBlock(root, bellatrix.TrustedSignedBeaconBlock)
db.getBlock(root, bellatrix.TrustedSignedBeaconBlock).isErr()
not db.getBlockSSZ(root, tmp, bellatrix.TrustedSignedBeaconBlock)
not db.getBlockSZ(root, tmp2, bellatrix.TrustedSignedBeaconBlock)
db.putStateRoot(root, signedBlock.message.slot, root)
var root2 = root

2
vendor/nim-snappy vendored

@ -1 +1 @@
Subproject commit 2256d6efb266dd1f65fc24077fb148377edb5b8c
Subproject commit 6537b10600686f3a698aa5f4f71cc12a40154eb9