update era file documentation / impl (#3226)

Overhaul of era files, including documentation and reference
implementations

* store blocks, then state, then slot indices for easy lookup at low
cost
* document era file rationale
* altair+ support in era writer
This commit is contained in:
Jacek Sieka 2022-01-07 11:13:19 +01:00 committed by GitHub
parent 52fc646fbd
commit ba99c8fe4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 584 additions and 176 deletions

View File

@ -633,6 +633,40 @@ proc getMergeBlock*(db: BeaconChainDB, key: Eth2Digest):
else:
result.err()
proc getPhase0BlockSSZ(db: BeaconChainDBV0, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = unsafeAddr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.backend.get(subkey(phase0.SignedBeaconBlock, key), decode).expectDb() and success
proc getPhase0BlockSSZ*(db: BeaconChainDB, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = unsafeAddr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.blocks.get(key.data, decode).expectDb() and success or
db.v0.getPhase0BlockSSZ(key, data)
proc getAltairBlockSSZ*(db: BeaconChainDB, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = unsafeAddr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.altairBlocks.get(key.data, decode).expectDb() and success
proc getMergeBlockSSZ*(db: BeaconChainDB, key: Eth2Digest, data: var seq[byte]): bool =
let dataPtr = unsafeAddr data # Short-lived
var success = true
proc decode(data: openArray[byte]) =
try: dataPtr[] = snappy.decode(data, maxDecompressedDbRecordSize)
except CatchableError: success = false
db.mergeBlocks.get(key.data, decode).expectDb() and success
proc getStateOnlyMutableValidators(
immutableValidators: openArray[ImmutableValidatorData2],
store: KvStoreRef, key: openArray[byte], output: var ForkyBeaconState,

View File

@ -1661,3 +1661,16 @@ proc aggregateAll*(
err("aggregate: no attesting keys")
else:
ok(finish(aggregateKey))
proc getBlockSSZ*(dag: ChainDAGRef, id: BlockId, bytes: var seq[byte]): bool =
# Load the SSZ-encoded data of a block into `bytes`, overwriting the existing
# content
# careful: there are two snappy encodings in use, with and without framing!
# Returns true if the block is found, false if not
case dag.cfg.blockForkAtEpoch(id.slot.epoch)
of BeaconBlockFork.Phase0:
dag.db.getPhase0BlockSSZ(id.root, bytes)
of BeaconBlockFork.Altair:
dag.db.getAltairBlockSSZ(id.root, bytes)
of BeaconBlockFork.Bellatrix:
dag.db.getMergeBlockSSZ(id.root, bytes)

View File

@ -23,8 +23,7 @@ proc installConfigApiHandlers*(router: var RestRouter, node: BeaconNode) =
RestApiResponse.prepareJsonResponse(
(
# https://github.com/ethereum/consensus-specs/blob/v1.0.1/configs/mainnet/phase0.yaml
CONFIG_NAME:
const_preset,
CONFIG_NAME: node.dag.cfg.name(),
# https://github.com/ethereum/consensus-specs/blob/v1.1.3/presets/mainnet/phase0.yaml
MAX_COMMITTEES_PER_SLOT:

View File

@ -36,6 +36,8 @@ type
PRESET_BASE*: string
CONFIG_NAME*: string
# Transition
TERMINAL_TOTAL_DIFFICULTY*: UInt256
TERMINAL_BLOCK_HASH*: BlockHash
@ -148,8 +150,6 @@ const
"DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF",
"DOMAIN_CONTRIBUTION_AND_PROOF",
"CONFIG_NAME",
"TRANSITION_TOTAL_DIFFICULTY", # Name that appears in some altair alphas, obsolete, remove when no more testnets
]
@ -444,3 +444,9 @@ proc readRuntimeConfig*(
msg: "Config not compatible with binary, compile with -d:const_preset=" & cfg.PRESET_BASE)
(cfg, unknowns)
template name*(cfg: RuntimeConfig): string =
if cfg.CONFIG_NAME.len() > 0:
cfg.CONFIG_NAME
else:
const_preset

View File

@ -32,34 +32,39 @@ The following python code can be used to read an e2 file:
```python
import sys, struct
with open(sys.argv[1], "rb") as f:
def read_entry(f):
header = f.read(8)
typ = header[0:2] # First 2 bytes for type
if not header: return None
if typ != b"e2":
raise RuntimeError("this is not an e2store file")
typ = header[0:2] # 2 bytes of type
dlen = struct.unpack("<q", header[2:8] + b"\0\0")[0] # 6 bytes of little-endian length
while True:
header = f.read(8) # Header is 8 bytes
if not header: break
data = f.read(dlen)
typ = header[0:2] # First 2 bytes for type
dlen = struct.unpack("<q", header[2:8] + b"\0\0")[0] # 6 bytes of little-endian length
return (typ, data)
print("typ:", "".join("{:02x}".format(x) for x in typ), "len:", dlen)
def print_stats(name):
with open(name, "rb") as f:
sizes = {}
entries = 0
data = f.read(dlen)
if len(data) != dlen: # Don't trust the given length, specially when pre-allocating
print("Missing data", len(data), dlen)
break
while True:
(typ, data) = read_entry(f)
if typ == b"i2":
print("Index header")
break
elif typ == b"e2":
print("e2 header") # May appear
if not typ:
break
entries += 1
old = sizes.get(typ, (0, 0))
sizes[typ] = (old[0] + len(data), old[1] + 1)
print("Entries", entries)
for k, v in dict(sorted(sizes.items())).items():
print("type", k.hex(), "bytes", v[0], "count", v[1], "average", v[0] / v[1])
```
## Writing
`e2s` files are by design intended to be append-only, making them suitable for cold storage of finalized chain data.
@ -100,61 +105,183 @@ type: [0x00, 0x00]
The `Empty` type contains no data, but may have a length. The corresponding amount of data should be skiped while reading the file.
# Slot Index files
Index files are files that store indices to linear histories of entries. They consist of offsets that point the the beginning of the corresponding record. Index files start with an 8-byte header and a starting offset followed by a series of `uint64` encoded as little endian bytes. An index of 0 idicates that there is no data for the given slot.
Each entry in the slot index is fixed-length, meaning that the entry for slot `N` can be found at index `(N * 8) + 16` in the index file. Index files only support linear histories.
By convention, slot index files have the name `.e2i`.
```
header | starting-slot | index | index | index ...
```
## IndexVersion
## SlotIndex
```
type: [0x69, 0x32]
data: starting-slot | index | index | index ... | count
```
The `version` header of an index file consists of the bytes `[0x69, 0x32, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]`.
`SlotIndex` records store offsets, in bytes, from the beginning of the index record to the beginning of the corresponding data at that slot. An offset of `0` indicates that no data is present for the given slot.
## Reading
Each entry in the slot index is a fixed-length 8-byte signed integer, meaning that the entry for slot `N` can be found at index `(N * 8) + 16` in the index. The length of a `SlotIndex` record can be computed as `count * 8 + 24` - one entry for every slot and 8 bytes each for type header, starting slot and count. In particular, knowing where the slot index ends allows finding its beginning as well.
Only one entry per slot is supported, meaning that only one canonical history can be indexed this way.
A `SlotIndex` record may appear in a stand-alone file which by convention ends with `.e2i` - in this case, the offset is counted as if the index was appened to its corresponding data file - offsets are thus negative and counted from the end of the data file. In particular, if the index is simply appended to the data file, it does not change in contents.
### Reading
```python
def find_offset(name, slot):
# Find the offset of a given slot
with open(name, "rb") as f:
header = f.read(8)
typ = header[0:2] # First 2 bytes for type
def read_slot_index(f):
# Read a slot index, assuming `f` is positioned at the end of the record
record_end = f.tell()
f.seek(-8, 1) # Relative seek to get count
if typ != b"i2":
raise RuntimeError("this is not an e2store file")
count = struct.unpack("<q", f.read(8))[0]
start_slot = struct.unpack("<q", f.read(8))[0]
record_start = record_end - (8 * count + 24)
if record_start < 0:
raise RuntimeError("Record count out of bounds")
f.seek(8 * (slot - start_slot) + 16)
f.seek(record_start) # Absolute seek
return struct.unpack("<q", f.read(8))[0]
(typ, data) = read_entry(f)
if typ != b"i2":
raise RuntimeError("this is not an e2store index record")
start_slot = struct.unpack("<q", data[0:8])[0]
# Convert slot indices to absolute file offsets
slot_entries = (data[(i+1) * 8:(i+2)*8] for i in range(0, (len(data)//8 - 2)))
slot_offsets = [struct.unpack("<q", entry)[0] for entry in slot_entries]
return (start_slot, record_start, slot_offsets)
```
# Era files
`.era` files are special instances of `.e2s` files that follow a more strict content format optimised for reading and long-term storage and distribution. Era files contain groups consisting of a state and the blocks that led up to it, limited to `SLOTS_PER_HISTORICAL_ROOT` slots each, allowing quick verification of the data contained in the file.
Each era is identified by when it ends. Thus, the genesis era is era 0, followed by era 1 which ends before slot 8192 etc.
Each era is identified by when it ends. Thus, the genesis era is era 0, followed by era 1 which ends when slot 8192 has been processed, but the block that potentially exists at slot 8192 has not yet been applied.
`.era` files MAY follow a simple naming convention: `eth2-<network>-<era-number>-<era-count>.era` with era and count hex-encoded to 8 digits.
## File name
`.era` file names follow a simple convention: `<config-name>-<era-number>-<era-count>-<short-historical-root>.era`:
* `config-name` is the `CONFIG_NAME` field of the runtime configation (`mainnet`, `prater`, etc)
* `era-number` is the number of the _last_ era stored in the file - for example, the genesis era file has number 0 - as a 5-digit 0-filled decimal integer
* `era-count` is the number of eras stored in the file, as a 5-digit 0-filled decimal integer
* `short-historical-root` is the first 4 bytes of the last historical root in the last state in the era file, lower-case hex-encoded (8 characters), except the genesis era which instead uses the `genesis_validators_root` field from the genesis state.
* The root is available as `state.historical_roots[era - 1]` except genesis, whose historical root is all `0`
An era file containing the mainnet genesis is thus named `mainnet-00000000-0000-0001.era`, and the era after that `mainnet-40cf2f3c-0001-0001.era`.
## Structure
An `.era` file is structured in the following way:
```
era := group+
group := canonical-state | blocks*
group := Version | block* | canonical-state | other-entries* | slot-index(block)? | slot-index(state)
block := CompressedSignedBeaconBlock
canonical-state := CompressedBeaconState
```
The `canonical-state` is the state of the slot that immediately follows the end of the era without applying blocks from the next era. For example, for the era that covers the first 8192 slots will have all blocks applied up to slot 8191 and will `process_slots` up to 8192. The genesis group contains only the genesis state but no blocks.
The `block` entries of a group include all blocks pertaining to an era. For example, the group representing era one will have all blocks from slot 0 up to and including block 8191.
The `canonical-state` is the state of the slot that immediately follows the end of the era without applying blocks from the next era. For example, era 1 that covers the first 8192 slots will have all blocks applied up to slot 8191 and will `process_slots` up to 8192. The genesis group contains only the genesis state but no blocks.
`slot-index(state)` is a `SlotIndex` entry with `count = 1` for the `CompressedBeaconState` entry of that era, pointing out the offset where the state entry begins. (TODO: consider count > 1 for files that cover multiple eras - breaks trivial composability of each era snippet but allows instant lookup in multi-era files)
`slot-index(block)` is a `SlotIndex` entry with `count = SLOTS_PER_HISTORICAL_ROOT` for the `CompressedSignedBeaconBlock` entries in that era, pointing out the offsets of each block in the era. It is omitted for the genesis era.
`other-entries` is the extension point for future record types in the era file. The positioning of these allows the indices to continue to be looked up from the back.
The structure of the era file gives it the following properties:
* the indices at the end are fixed-length: they can be used to discover the beginning of an era if the end of it is known
* the start slot field of the state slot index idenfifies which era the group pertains to
* the state in the era file is the end state after having applied all the blocks in the era - the `block_roots` entries in the state can be used to discover the digest of the blocks - either to verify the intergrity of the era file or to quickly load block roots without computing them
* each group in the era file is full, indendent era file - eras can freely be split and combined
## Reading era files
```python
def read_era_file(name):
# Print contents of an era file, backwards
with open(name, "rb") as f:
# Seek to end of file to figure out the indices of the state and blocks
f.seek(0, 2)
groups = 0
while True:
if f.tell() < 8:
break
(start_slot, state_index_start, state_slot_offsets) = read_slot_index(f)
print(
"State slot:", start_slot,
"state index start:", state_index_start,
"offsets", state_slot_offsets)
# The start of the state index record is the end of the block index record, if any
f.seek(state_index_start)
# This can underflow! Python should complain when seeking - ymmv
prev_group = state_index_start + state_slot_offsets[0] - 8
if start_slot > 0:
(block_slot, block_index_start, block_slot_offsets) = read_slot_index(f)
print(
"Block start slot:", block_slot,
"block index start:", block_index_start,
"offsets", len(block_slot_offsets))
if any((x for x in block_slot_offsets if x != 0)):
# This can underflow! Python should complain when seeking - ymmv
prev_group = block_index_start + [x for x in block_slot_offsets if x != 0][0] - 8
print("Previous group starts at:", prev_group)
# The beginning of the first block (or the state, if there are no blocks)
# is the end of the previous group
f.seek(prev_group) # Skip header
groups += 1
print("Groups in file:", groups)
```
# FAQ
## Why snappy framed compression?
* The networking protocol uses snappy framed compression, avoiding the need to re-compress data to serve blocks
* Each entry can be decompressed separately
* It's fast and compresses decently - some compression stats for the first 100 eras:
* Uncompressed: 8.4gb
* Snappy compression: 4.7gb
* `xz` of uncompressed: 3.8gb
## Why SLOTS_PER_HISTORICAL_ROOT blocks per state?
The state stores the block root of the latest `SLOTS_PER_HISTORICAL_ROOT` blocks - storing one state per that many blocks allows verifying the integrity of the blocks easily against the given state, and ensures that all block and state root information remains available, for example to validate states and blocks against `historical_roots`.
## Why include the state at all?
This is a tradeoff between being able to access state data such as validator keys and balances directly vs and recreating it by applying each block one by one from from genesis. Given an era file, you can always start processing the chain from there onwards.
## Why the weird file name?
Historical roots for the entire beacon chain history are stored in the state - thus, with a recent state one can quickly judge if an era file is part of the same history - this is useful for example when performing checkpoint sync.
The genesis era file uses the genesis validators root for two reasons: it allows disambiguating otherwise similar chains and the genesis state does not yet have a historical root to use.
The era numbers are zero-filled so that they trivially can be sorted - 5 digits is enough for 99999 eras or ~312 years
## How long is an era?
An era is typically 8192 slots, or roughly 27.3 hours - a bit more than a day.
## What happens after the merge?
Era files will store execution block contents, but not execution states (these are too large) - a full era history thus gives the full ethereum history from the merge onwards, for convenient cold storage.
## What is a "canonical state" and why use it?
The state transition function in ethereum does 3 things: slot processing, epoch processing and block processing, in that order. In particular, the slot and epoch processing is done for every slot and epoch, but the block processing may be skipped. When epoch processing is done, all the epoch-related fields in the state have been written, and a new epoch can begin - it's thus reasonable to say that the epoch processing is the last thing that happens in an epoch and the block processing happens in the context of the new epoch.
Storing the "canonical state" without the block applied means that any block from the new epoch can be applied to it - if two histories exist, one that skips the first block in the epoch and one that includes it, one can use the same canonical state in both cases.
Era files place the state first for a number of reasons: the state is then guaranteed to contain all public keys and block roots needed to verify the blocks in the file. A special case is the genesis era file - this file contains only the genesis state.

View File

@ -1,96 +1,240 @@
{.push raises: [Defect].}
import
stew/[endians2, results],
std/strformat,
stew/[arrayops, endians2, io2, results],
snappy, snappy/framing,
../beacon_chain/spec/datatypes/phase0,
../beacon_chain/spec/forks,
../beacon_chain/spec/eth2_ssz_serialization
const
E2Version = [byte 0x65, 0x32]
E2Index = [byte 0x69, 0x32]
SnappyBeaconBlock = [byte 0x01, 0x00]
SnappyBeaconState = [byte 0x02, 0x00]
E2Version* = [byte 0x65, 0x32]
E2Index* = [byte 0x69, 0x32]
SnappyBeaconBlock* = [byte 0x01, 0x00]
SnappyBeaconState* = [byte 0x02, 0x00]
TypeFieldLen = 2
LengthFieldLen = 6
HeaderFieldLen = TypeFieldLen + LengthFieldLen
type
E2Store* = object
data: File
index: File
slot: Slot
Type* = array[2, byte]
Header* = object
typ*: array[2, byte]
len*: uint64
typ*: Type
len*: int
proc append(f: File, data: openArray[byte]): Result[void, string] =
EraFile* = object
handle: IoHandle
start: Slot
Index* = object
startSlot*: Slot
offsets*: seq[int64] # Absolute positions in file
proc toString(v: IoErrorCode): string =
try: ioErrorMsg(v)
except Exception as e: raiseAssert e.msg
func eraFileName*(cfg: RuntimeConfig, state: ForkyBeaconState, era: uint64): string =
try:
if writeBytes(f, data, 0, data.len()) != data.len:
err("Cannot write to file")
else:
ok()
except CatchableError as exc:
err(exc.msg)
let
historicalRoot =
if era == 0: state.genesis_validators_root
elif era > state.historical_roots.lenu64(): Eth2Digest()
else: state.historical_roots.asSeq()[era - 1]
proc readHeader(f: File): Result[Header, string] =
try:
var buf: array[8, byte]
if system.readBuffer(f, addr buf[0], 8) != 8:
return err("Not enough bytes for header")
except CatchableError as e:
return err("Cannot read header")
&"{cfg.name()}-{era.int:05}-{1:05}-{shortLog(historicalRoot)}.era"
except ValueError as exc:
raiseAssert exc.msg
proc appendRecord(f: File, typ: array[2, byte], data: openArray[byte]): Result[int64, string] =
try:
let start = getFilePos(f)
let dlen = toBytesLE(data.len().uint64)
proc append(f: IoHandle, data: openArray[byte]): Result[void, string] =
if (? writeFile(f, data).mapErr(toString)) != data.len.uint:
return err("could not write data")
ok()
? append(f, typ)
? append(f, dlen.toOpenArray(0, 5))
? append(f, data)
ok(start)
except CatchableError as e:
err(e.msg)
proc appendHeader(f: IoHandle, typ: Type, dataLen: int): Result[int64, string] =
let start = ? getFilePos(f).mapErr(toString)
proc open*(T: type E2Store, path: string, name: string, firstSlot: Slot): Result[E2Store, string] =
let
data =
try: open(path / name & ".e2s", fmWrite)
except CatchableError as e: return err(e.msg)
index =
try: system.open(path / name & ".e2i", fmWrite)
except CatchableError as e:
close(data)
return err(e.msg)
discard ? appendRecord(data, E2Version, [])
discard ? appendRecord(index, E2Index, [])
? append(index, toBytesLE(firstSlot.uint64))
? append(f, typ)
? append(f, toBytesLE(dataLen.uint64).toOpenArray(0, 5))
ok(E2Store(data: data, index: index, slot: firstSlot))
ok(start)
func close*(store: var E2Store) =
store.data.close()
store.index.close()
proc appendRecord*(f: IoHandle, typ: Type, data: openArray[byte]): Result[int64, string] =
let start = ? appendHeader(f, typ, data.len())
? append(f, data)
ok(start)
proc toCompressedBytes(item: auto): seq[byte] =
try:
let
payload = SSZ.encode(item)
framingFormatCompress(payload)
framingFormatCompress(SSZ.encode(item))
except CatchableError as exc:
raiseAssert exc.msg # shouldn't happen
proc appendRecord*(store: var E2Store, v: phase0.TrustedSignedBeaconBlock): Result[void, string] =
if v.message.slot < store.slot:
return err("Blocks must be written in order")
let start = store.data.appendRecord(SnappyBeaconBlock, toCompressedBytes(v)).get()
while store.slot < v.message.slot:
? append(store.index, toBytesLE(0'u64))
store.slot += 1
? append(store.index, toBytesLE(start.uint64))
store.slot += 1
proc appendRecord*(f: IoHandle, v: ForkyTrustedSignedBeaconBlock): Result[int64, string] =
f.appendRecord(SnappyBeaconBlock, toCompressedBytes(v))
proc appendRecord*(f: IoHandle, v: ForkyBeaconState): Result[int64, string] =
f.appendRecord(SnappyBeaconState, toCompressedBytes(v))
proc appendIndex*(f: IoHandle, startSlot: Slot, offsets: openArray[int64]): Result[int64, string] =
let
len = offsets.len() * sizeof(int64) + 16
pos = ? f.appendHeader(E2Index, len)
? f.append(startSlot.uint64.toBytesLE())
for v in offsets:
? f.append(cast[uint64](v - pos).toBytesLE())
? f.append(offsets.lenu64().toBytesLE())
ok(pos)
proc appendRecord(f: IoHandle, index: Index): Result[int64, string] =
f.appendIndex(index.startSlot, index.offsets)
proc checkBytesLeft(f: IoHandle, expected: int64): Result[void, string] =
let size = ? getFileSize(f).mapErr(toString)
if expected > size:
return err("Record extends past end of file")
let pos = ? getFilePos(f).mapErr(toString)
if expected > size - pos:
return err("Record extends past end of file")
ok()
proc appendRecord*(store: var E2Store, v: phase0.BeaconState): Result[void, string] =
discard ? store.data.appendRecord(SnappyBeaconState, toCompressedBytes(v))
proc readFileExact(f: IoHandle, buf: var openArray[byte]): Result[void, string] =
if (? f.readFile(buf).mapErr(toString)) != buf.len().uint:
return err("missing data")
ok()
proc readHeader(f: IoHandle): Result[Header, string] =
var buf: array[10, byte]
? readFileExact(f, buf.toOpenArray(0, 7))
var
typ: Type
discard typ.copyFrom(buf)
# Cast safe because we had only 6 bytes of length data
let
len = cast[int64](uint64.fromBytesLE(buf.toOpenArray(2, 9)))
# No point reading these..
if len > int.high(): return err("header length exceeds int.high")
# Must have at least that much data, or header is invalid
? f.checkBytesLeft(len)
ok(Header(typ: typ, len: int(len)))
proc readRecord(f: IoHandle, data: var seq[byte]): Result[Header, string] =
let header = ? readHeader(f)
if header.len > 0:
? f.checkBytesLeft(header.len)
data.setLen(header.len)
? readFileExact(f, data)
ok(header)
proc readIndexCount*(f: IoHandle): Result[int, string] =
var bytes: array[8, byte]
? f.readFileExact(bytes)
let count = uint64.fromBytesLE(bytes)
if count > (int.high() div 8) - 3: return err("count: too large")
let size = uint64(? f.getFileSize().mapErr(toString))
# Need to have at least this much data in the file to read an index with
# this count
if count > (size div 8 + 3): return err("count: too large")
ok(int(count)) # Sizes checked against int above
proc findIndexStartOffset*(f: IoHandle): Result[int64, string] =
? f.setFilePos(-8, SeekPosition.SeekCurrent).mapErr(toString)
let
count = ? f.readIndexCount() # Now we're back at the end of the index
bytes = count.int64 * 8 + 24
ok(-bytes)
proc readIndex*(f: IoHandle): Result[Index, string] =
let
startPos = ? f.getFilePos().mapErr(toString)
fileSize = ? f.getFileSize().mapErr(toString)
header = ? f.readHeader()
if header.typ != E2Index: return err("not an index")
if header.len < 16: return err("index entry too small")
if header.len mod 8 != 0: return err("index length invalid")
var buf: array[8, byte]
? f.readFileExact(buf)
let
slot = uint64.fromBytesLE(buf)
count = header.len div 8 - 2
var offsets = newSeqUninitialized[int64](count)
for i in 0..<count:
? f.readFileExact(buf)
let offset = uint64.fromBytesLE(buf)
# Wrapping math is actually convenient here
let absolute = cast[int64](cast[uint64](startPos) + offset)
if absolute < 0 or absolute > fileSize: return err("Invalid offset")
offsets[i] = absolute
? f.readFileExact(buf)
if uint64(count) != uint64.fromBytesLE(buf): return err("invalid count")
# technically not an error, but we'll throw this sanity check in here..
if slot > int32.high().uint64: return err("fishy slot")
ok(Index(startSlot: Slot(slot), offsets: offsets))
type
EraGroup* = object
eraStart: int64
slotIndex*: Index
proc init*(T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, string] =
let eraStart = ? f.appendHeader(E2Version, 0)
ok(EraGroup(
eraStart: eraStart,
slotIndex: Index(
startSlot: startSlot.get(Slot(0)),
offsets: newSeq[int64](
if startSlot.isSome(): SLOTS_PER_HISTORICAL_ROOT.int
else: 0
))))
proc update*(g: var EraGroup, f: IoHandle, slot: Slot, sszBytes: openArray[byte]): Result[void, string] =
doAssert slot >= g.slotIndex.startSlot
g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] =
try:
? f.appendRecord(SnappyBeaconBlock, framingFormatCompress(sszBytes))
except CatchableError as e: raiseAssert e.msg # TODO fix snappy
ok()
proc finish*(g: var EraGroup, f: IoHandle, state: ForkyBeaconState): Result[void, string] =
let
statePos = ? f.appendRecord(state)
if state.slot > Slot(0):
discard ? f.appendRecord(g.slotIndex)
discard ? f.appendIndex(state.slot, [statePos])
ok()

View File

@ -1,49 +1,113 @@
import sys, struct
def read_e2store(name):
with open(name, "rb") as f:
header = f.read(8)
typ = header[0:2] # First 2 bytes for type
def read_entry(f):
header = f.read(8)
if not header: return (None, None)
if typ != b"e2":
raise RuntimeError("this is not an e2store file")
typ = header[0:2] # 2 bytes of type
dlen = struct.unpack("<q", header[2:8] + b"\0\0")[0] # 6 bytes of little-endian length
data = f.read(dlen)
return (typ, data)
def read_slot_index(f):
# Read a slot index, assuming `f` is positioned at the end of the record
record_end = f.tell()
f.seek(-8, 1) # Relative seek to get count
count = struct.unpack("<q", f.read(8))[0]
record_start = record_end - (8 * count + 24)
if record_start < 0:
raise RuntimeError("Record count out of bounds")
f.seek(record_start) # Absolute seek
(typ, data) = read_entry(f)
if typ != b"i2":
raise RuntimeError("this is not an e2store index record")
start_slot = struct.unpack("<q", data[0:8])[0]
# Convert slot indices to absolute file offsets
slot_entries = (data[(i+1) * 8:(i+2)*8] for i in range(0, (len(data)//8 - 2)))
slot_offsets = [struct.unpack("<q", entry)[0] for entry in slot_entries]
return (start_slot, record_start, slot_offsets)
def read_era_file(name):
# Print contents of an era file, backwards
with open(name, "rb") as f:
# Seek to end of file to figure out the indices of the state and blocks
f.seek(0, 2)
groups = 0
while True:
if f.tell() < 8:
break
(start_slot, state_index_start, state_slot_offsets) = read_slot_index(f)
print(
"State slot:", start_slot,
"state index start:", state_index_start,
"offsets", state_slot_offsets)
# The start of the state index record is the end of the block index record, if any
f.seek(state_index_start)
# This can underflow! Python should complain when seeking - ymmv
prev_group = state_index_start + state_slot_offsets[0] - 8
if start_slot > 0:
(block_slot, block_index_start, block_slot_offsets) = read_slot_index(f)
print(
"Block start slot:", block_slot,
"block index start:", block_index_start,
"offsets", len(block_slot_offsets))
if any((x for x in block_slot_offsets if x != 0)):
# This can underflow! Python should complain when seeking - ymmv
prev_group = block_index_start + [x for x in block_slot_offsets if x != 0][0] - 8
print("Previous group starts at:", prev_group)
# The beginning of the first block (or the state, if there are no blocks)
# is the end of the previous group
f.seek(prev_group) # Skip header
groups += 1
print("Groups in file:", groups)
def print_stats(name):
with open(name, "rb") as f:
sizes = {}
entries = 0
while True:
header = f.read(8) # Header is 8 bytes
if not header: break
(typ, data) = read_entry(f)
typ = header[0:2] # First 2 bytes for type
dlen = struct.unpack("<q", header[2:8] + b"\0\0")[0] # 6 bytes of little-endian length
if not typ:
break
entries += 1
data = f.read(dlen)
if len(data) != dlen: # Don't trust the given length, specially when pre-allocating
raise RuntimeError("File is missing data")
old = sizes.get(typ, (0, 0))
sizes[typ] = (old[0] + len(data), old[1] + 1)
if typ == b"i2":
raise RuntimeError("Cannot switch to index mode")
elif typ == b"e2":
pass # Ignore extra headers
print("Entries", entries)
yield (typ, data)
for k, v in dict(sorted(sizes.items())).items():
print("type", k.hex(), "bytes", v[0], "count", v[1], "average", v[0] / v[1])
def find_offset(name, slot):
# Find the offset of a given slot
with open(name, "rb") as f:
header = f.read(8)
typ = header[0:2] # First 2 bytes for type
def print_help():
print(sys.argv[0], "stats|era filename")
exit(1)
if typ != b"i2":
raise RuntimeError("this is not an e2store file")
if len(sys.argv) != 3:
print_help()
start_slot = struct.unpack("<q", f.read(8))[0]
f.seek(8 * (slot - start_slot) + 16)
return struct.unpack("<q", f.read(8))[0]
name = sys.argv[1]
if name.endswith(".e2i"):
print(find_offset(name, int(sys.argv[2])))
else:
for typ, data in read_e2store(name):
print("typ", typ, "data", len(data))
if sys.argv[1] == "stats": print_stats(sys.argv[2])
elif sys.argv[1] == "era": read_era_file(sys.argv[2])
else: print_help()

View File

@ -506,44 +506,53 @@ proc cmdExportEra(conf: DbConf, cfg: RuntimeConfig) =
echo "Database not initialized: ", v.error()
quit 1
type Timers = enum
tState
tBlocks
echo "Initializing block pool..."
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = init(ChainDAGRef, cfg, db, validatorMonitor, {})
let tmpState = assignClone(dag.headState)
var
tmp: seq[byte]
timers: array[Timers, RunningStat]
for era in conf.era..<conf.era + conf.eraCount:
let
firstSlot = if era == 0: Slot(0) else: Slot((era - 1) * SLOTS_PER_HISTORICAL_ROOT)
firstSlot =
if era == 0: none(Slot)
else: some(Slot((era - 1) * SLOTS_PER_HISTORICAL_ROOT))
endSlot = Slot(era * SLOTS_PER_HISTORICAL_ROOT)
slotCount = endSlot - firstSlot
name = &"ethereum2-mainnet-{era.int:08x}-{1:08x}"
canonical = dag.head.atCanonicalSlot(endSlot)
if endSlot > dag.head.slot:
echo "Written all complete eras"
break
var e2s = E2Store.open(".", name, firstSlot).get()
defer: e2s.close()
let name = withState(dag.headState.data): eraFileName(cfg, state.data, era)
echo "Writing ", name
dag.withUpdatedState(tmpState[], canonical) do:
e2s.appendRecord(stateData.data.phase0Data.data).get()
do: raiseAssert "withUpdatedState failed"
let e2 = openFile(name, {OpenFlags.Write, OpenFlags.Create}).get()
defer: discard closeFile(e2)
var
ancestors: seq[BlockRef]
cur = canonical.blck
if era != 0:
while cur != nil and cur.slot >= firstSlot:
ancestors.add(cur)
cur = cur.parent
var group = EraGroup.init(e2, firstSlot).get()
if firstSlot.isSome():
withTimer(timers[tBlocks]):
var blocks: array[SLOTS_PER_HISTORICAL_ROOT.int, BlockId]
for i in dag.getBlockRange(firstSlot.get(), 1, blocks)..<blocks.len:
if dag.getBlockSSZ(blocks[i], tmp):
group.update(e2, blocks[i].slot, tmp).get()
for i in 0..<ancestors.len():
let
ancestor = ancestors[ancestors.len - 1 - i]
e2s.appendRecord(db.getPhase0Block(ancestor.root).get()).get()
withTimer(timers[tState]):
dag.withUpdatedState(tmpState[], canonical) do:
withState(stateData.data):
group.finish(e2, state.data).get()
do: raiseAssert "withUpdatedState failed"
printTimers(true, timers)
type
# Validator performance metrics tool based on

View File

@ -100,12 +100,15 @@ suite "Beacon chain DB" & preset():
db.putBlock(signedBlock)
var tmp: seq[byte]
check:
db.containsBlock(root)
db.containsBlockPhase0(root)
not db.containsBlockAltair(root)
not db.containsBlockMerge(root)
db.getPhase0Block(root).get() == signedBlock
db.getPhase0BlockSSZ(root, tmp)
tmp == SSZ.encode(signedBlock)
db.delBlock(root)
check:
@ -114,6 +117,7 @@ suite "Beacon chain DB" & preset():
not db.containsBlockAltair(root)
not db.containsBlockMerge(root)
db.getPhase0Block(root).isErr()
not db.getPhase0BlockSSZ(root, tmp)
db.putStateRoot(root, signedBlock.message.slot, root)
var root2 = root
@ -135,12 +139,15 @@ suite "Beacon chain DB" & preset():
db.putBlock(signedBlock)
var tmp: seq[byte]
check:
db.containsBlock(root)
not db.containsBlockPhase0(root)
db.containsBlockAltair(root)
not db.containsBlockMerge(root)
db.getAltairBlock(root).get() == signedBlock
db.getAltairBlockSSZ(root, tmp)
tmp == SSZ.encode(signedBlock)
db.delBlock(root)
check:
@ -149,6 +156,7 @@ suite "Beacon chain DB" & preset():
not db.containsBlockAltair(root)
not db.containsBlockMerge(root)
db.getAltairBlock(root).isErr()
not db.getAltairBlockSSZ(root, tmp)
db.putStateRoot(root, signedBlock.message.slot, root)
var root2 = root
@ -170,12 +178,15 @@ suite "Beacon chain DB" & preset():
db.putBlock(signedBlock)
var tmp: seq[byte]
check:
db.containsBlock(root)
not db.containsBlockPhase0(root)
not db.containsBlockAltair(root)
db.containsBlockMerge(root)
db.getMergeBlock(root).get() == signedBlock
db.getMergeBlockSSZ(root, tmp)
tmp == SSZ.encode(signedBlock)
db.delBlock(root)
check:
@ -184,6 +195,7 @@ suite "Beacon chain DB" & preset():
not db.containsBlockAltair(root)
not db.containsBlockMerge(root)
db.getMergeBlock(root).isErr()
not db.getMergeBlockSSZ(root, tmp)
db.putStateRoot(root, signedBlock.message.slot, root)
var root2 = root