# Copyright (c) 2018-2023 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. import std/os, chronicles, stew/results, snappy, taskpools, ../ncli/e2store, ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[beaconstate, forks, signatures_batch], ./consensus_object_pools/block_dag # TODO move to somewhere else to avoid circular deps export results, forks, e2store type EraFile* = ref object handle: Opt[IoHandle] stateIdx: Index blockIdx: Index EraDB* = ref object ## The Era database manages a collection of era files that together make up ## a linear history of beacon chain data. cfg: RuntimeConfig path: string genesis_validators_root: Eth2Digest files: seq[EraFile] proc open*(_: type EraFile, name: string): Result[EraFile, string] = var f = Opt[IoHandle].ok(? openFile(name, {OpenFlags.Read}).mapErr(ioErrorMsg)) defer: if f.isSome(): discard closeFile(f[]) # Indices can be found at the end of each era file - we only support # single-era files for now ? f[].setFilePos(0, SeekPosition.SeekEnd).mapErr(ioErrorMsg) # Last in the file is the state index let stateIdxPos = ? f[].findIndexStartOffset() ? f[].setFilePos(stateIdxPos, SeekPosition.SeekCurrent).mapErr(ioErrorMsg) let stateIdx = ? f[].readIndex() if stateIdx.offsets.len() != 1: return err("State index length invalid") ? f[].setFilePos(stateIdxPos, SeekPosition.SeekCurrent).mapErr(ioErrorMsg) # The genesis era file does not contain a block index let blockIdx = if stateIdx.startSlot > 0: let blockIdxPos = ? f[].findIndexStartOffset() ? f[].setFilePos(blockIdxPos, SeekPosition.SeekCurrent).mapErr(ioErrorMsg) let idx = ? f[].readIndex() if idx.offsets.lenu64() != SLOTS_PER_HISTORICAL_ROOT: return err("Block index length invalid") idx else: Index() let res = EraFile(handle: f, stateIdx: stateIdx, blockIdx: blockIdx) reset(f) ok res proc close(f: EraFile) = if f.handle.isSome(): discard closeFile(f.handle.get()) reset(f.handle) proc getBlockSZ*( f: EraFile, slot: Slot, bytes: var seq[byte]): Result[void, string] = ## Get a snappy-frame-compressed version of the block data - may overwrite ## `bytes` on error ## ## Sets `bytes` to an empty seq and returns success if there is no block at ## the given slot, according to the index # Block content for the blocks of an era is found in the file for the _next_ # era doAssert not isNil(f) and f[].handle.isSome let pos = f[].blockIdx.offsets[slot - f[].blockIdx.startSlot] if pos == 0: bytes = @[] return ok() ? f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg) let header = ? f[].handle.get().readRecord(bytes) if header.typ != SnappyBeaconBlock: return err("Invalid era file: didn't find block at index position") ok() proc getBlockSSZ*( f: EraFile, slot: Slot, bytes: var seq[byte]): Result[void, string] = var tmp: seq[byte] ? f.getBlockSZ(slot, tmp) let len = uncompressedLenFramed(tmp).valueOr: return err("Cannot read uncompressed length, era file corrupt?") if len > int.high.uint64: return err("Invalid uncompressed size") bytes = newSeqUninitialized[byte](len) # Where it matters, we will integrity-check the data with SSZ - no # need to waste cycles on crc32 discard uncompressFramed(tmp, bytes, checkIntegrity = false).valueOr: return err("Block failed to decompress, era file corrupt?") ok() proc getStateSZ*( f: EraFile, slot: Slot, bytes: var seq[byte]): Result[void, string] = ## Get a snappy-frame-compressed version of the state data - may overwrite ## `bytes` on error ## https://github.com/google/snappy/blob/8dd58a519f79f0742d4c68fbccb2aed2ddb651e8/framing_format.txt#L34 doAssert not isNil(f) and f[].handle.isSome # TODO consider multi-era files if f[].stateIdx.startSlot != slot: return err("State not found in era file") let pos = f[].stateIdx.offsets[0] if pos == 0: return err("No state at given slot") ? f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg) let header = ? f[].handle.get().readRecord(bytes) if header.typ != SnappyBeaconState: return err("Invalid era file: didn't find state at index position") ok() proc getStateSSZ*( f: EraFile, slot: Slot, bytes: var seq[byte], partial = Opt.none(int)): Result[void, string] = var tmp: seq[byte] ? f.getStateSZ(slot, tmp) let len = uncompressedLenFramed(tmp).valueOr: return err("Cannot read uncompressed length, era file corrupt?") wanted = if partial.isSome(): min(len, partial.get().uint64 + maxUncompressedFrameDataLen - 1) else: len bytes = newSeqUninitialized[byte](wanted) # Where it matters, we will integrity-check the data with SSZ - no # need to waste cycles on crc32 discard uncompressFramed(tmp, bytes, checkIntegrity = false).valueOr: return err("State failed to decompress, era file corrupt?") ok() proc verify*(f: EraFile, cfg: RuntimeConfig): Result[Eth2Digest, string] = ## Verify that an era file is internally consistent, returning the state root ## Verification is dominated by block signature checks - about 4-10s on ## decent hardware. # We'll load the full state and compute its root - then we'll load the blocks # and make sure that they match the state and that their signatures check out let startSlot = f.stateIdx.startSlot era = startSlot.era rng = HmacDrbgContext.new() taskpool = Taskpool.new() var verifier = BatchVerifier.init(rng, taskpool) var tmp: seq[byte] ? f.getStateSSZ(startSlot, tmp) let state = try: newClone(readSszForkedHashedBeaconState(cfg, tmp)) except CatchableError as exc: return err("Unable to read state: " & exc.msg) if era > 0: var sigs: seq[SignatureSet] for slot in (era - 1).start_slot().. 0: let blck = try: newClone(readSszForkedSignedBeaconBlock(cfg, tmp)) except CatchableError as exc: return err("Unable to read block: " & exc.msg) if getForkedBlockField(blck[], slot) != slot: return err("Block slot does not match era index") if blck[].root != state[].get_block_root_at_slot(getForkedBlockField(blck[], slot)): return err("Block does not match state") if slot > GENESIS_SLOT: let proposer = getForkedBlockField(blck[], proposer_index) key = withState(state[]): if proposer >= forkyState.data.validators.lenu64: return err("Invalid proposer in block") forkyState.data.validators.item(proposer).pubkey cooked = key.load() sig = blck[].signature.load() if cooked.isNone(): return err("Cannot load proposer key") if sig.isNone(): warn "Signature invalid", sig = blck[].signature, blck = shortLog(blck[]) return err("Cannot load block signature") # Batch-verification more than doubles total verification speed sigs.add block_signature_set( cfg.forkAtEpoch(slot.epoch), getStateField(state[], genesis_validators_root), slot, blck[].root, cooked.get(), sig.get()) else: # slot == GENESIS_SLOT: if blck[].signature != default(type(blck[].signature)): return err("Genesis slot signature not empty") if not batchVerify(verifier, sigs): return err("Invalid block signature") ok(getStateRoot(state[])) proc getEraFile( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], era: Era): Result[EraFile, string] = for f in db.files: if f.stateIdx.startSlot.era == era: return ok(f) let eraRoot = eraRoot( db.genesis_validators_root, historical_roots, historical_summaries, era).valueOr: return err("Era outside of known history") name = eraFileName(db.cfg, era, eraRoot) path = db.path / name if not isFile(path): return err("No such era file") let f = EraFile.open(path).valueOr: # TODO allow caller to differentiate between invalid and missing era file, # then move logging elsewhere warn "Failed to open era file", path, error = error return err(error) if db.files.len > 16: # TODO LRU close(db.files[0]) db.files.delete(0) db.files.add(f) ok(f) proc getBlockSZ*( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], slot: Slot, bytes: var seq[byte]): Result[void, string] = ## Get a snappy-frame-compressed version of the block data - may overwrite ## `bytes` on error ## ## Sets `bytes` to an empty seq and returns success if there is no block at ## the given slot, according to the index # Block content for the blocks of an era is found in the file for the _next_ # era let f = ? db.getEraFile(historical_roots, historical_summaries, slot.era + 1) f.getBlockSZ(slot, bytes) proc getBlockSSZ*( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], slot: Slot, bytes: var seq[byte]): Result[void, string] = let f = ? db.getEraFile(historical_roots, historical_summaries, slot.era + 1) f.getBlockSSZ(slot, bytes) proc getBlock*( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], slot: Slot, root: Opt[Eth2Digest], T: type ForkyTrustedSignedBeaconBlock): Opt[T] = var tmp: seq[byte] ? db.getBlockSSZ( historical_roots, historical_summaries, slot, tmp).mapErr(proc(x: auto) = discard) result.ok(default(T)) try: readSszBytes(tmp, result.get(), updateRoot = root.isNone) if root.isSome(): result.get().root = root.get() except CatchableError: result.err() proc getStateSZ*( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], slot: Slot, bytes: var seq[byte]): Result[void, string] = ## Get a snappy-frame-compressed version of the state data - may overwrite ## `bytes` on error ## https://github.com/google/snappy/blob/8dd58a519f79f0742d4c68fbccb2aed2ddb651e8/framing_format.txt#L34 # Block content for the blocks of an era is found in the file for the _next_ # era let f = ? db.getEraFile(historical_roots, historical_summaries, slot.era) f.getStateSZ(slot, bytes) proc getStateSSZ*( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], slot: Slot, bytes: var seq[byte], partial = Opt.none(int)): Result[void, string] = let f = ? db.getEraFile(historical_roots, historical_summaries, slot.era) f.getStateSSZ(slot, bytes, partial) proc getState*( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], slot: Slot, state: var ForkedHashedBeaconState): Result[void, string] = var bytes: seq[byte] ? db.getStateSSZ(historical_roots, historical_summaries, slot, bytes) try: state = readSszForkedHashedBeaconState(db.cfg, slot, bytes) ok() except CatchableError as exc: err(exc.msg) type PartialBeaconState = object # The first bytes of a beacon state object are (for now) shared between all # forks - we exploit this to speed up loading # Versioning genesis_time*: uint64 genesis_validators_root*: Eth2Digest slot*: Slot fork*: Fork # History latest_block_header*: BeaconBlockHeader ##\ ## `latest_block_header.state_root == ZERO_HASH` temporarily block_roots*: HashArray[Limit SLOTS_PER_HISTORICAL_ROOT, Eth2Digest] ##\ ## Needed to process attestations, older to newer proc getPartialState( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], slot: Slot, output: var PartialBeaconState): bool = static: doAssert isFixedSize(PartialBeaconState) const partialBytes = fixedPortionSize(PartialBeaconState) # TODO we don't need to read all bytes: ideally we could use something like # faststreams to read uncompressed bytes up to a limit and it would take care # of reading the minimal number of bytes from disk var tmp: seq[byte] if (let e = db.getStateSSZ( historical_roots, historical_summaries, slot, tmp, Opt[int].ok(partialBytes)); e.isErr): return false try: readSszBytes(tmp.toOpenArray(0, partialBytes - 1), output) true except CatchableError: # TODO log? false iterator getBlockIds*( db: EraDB, historical_roots: openArray[Eth2Digest], historical_summaries: openArray[HistoricalSummary], start_slot: Slot, prev_root: Eth2Digest): BlockId = ## Iterate over block roots starting from the given slot - `prev_root` must ## point out the last block added to the chain before `start_slot` such that ## empty slots can be filtered out correctly var state = (ref PartialBeaconState)() # avoid stack overflow slot = start_slot prev_root = prev_root while true: # `case` ensures we're on a fork for which the `PartialBeaconState` # definition is consistent case db.cfg.consensusForkAtEpoch(slot.epoch) of ConsensusFork.Phase0 .. ConsensusFork.Deneb: let stateSlot = (slot.era() + 1).start_slot() if not getPartialState( db, historical_roots, historical_summaries, stateSlot, state[]): state = nil # No `return` in iterators if state == nil: break let x = slot.uint64 mod state[].block_roots.lenu64 for i in x..