diff --git a/beacon_chain/era_db.nim b/beacon_chain/era_db.nim index 79f8b90ee..2eb983115 100644 --- a/beacon_chain/era_db.nim +++ b/beacon_chain/era_db.nim @@ -8,12 +8,12 @@ import std/os, chronicles, results, snappy, taskpools, - ../ncli/e2store, + ../ncli/era, ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[beaconstate, forks, signatures_batch], ./consensus_object_pools/block_dag # TODO move to somewhere else to avoid circular deps -export results, forks, e2store +export results, forks, era type EraFile* = ref object diff --git a/ncli/e2store.nim b/ncli/e2store.nim index 0dcb70045..b1e8f3df3 100644 --- a/ncli/e2store.nim +++ b/ncli/e2store.nim @@ -8,18 +8,11 @@ {.push raises: [].} import - std/strformat, results, - stew/[arrayops, endians2, io2], - snappy, - ../beacon_chain/spec/[beacon_time, forks], - ../beacon_chain/spec/eth2_ssz_serialization + stew/[arrayops, endians2, io2] export io2 -type - Era* = distinct uint64 # Time unit, similar to slot - const E2Version* = [byte 0x65, 0x32] E2Index* = [byte 0x69, 0x32] @@ -27,8 +20,6 @@ const SnappyBeaconBlock* = [byte 0x01, 0x00] SnappyBeaconState* = [byte 0x02, 0x00] - FAR_FUTURE_ERA* = Era(not 0'u64) - type Type* = array[2, byte] @@ -36,51 +27,16 @@ type typ*: Type len*: int - Index* = object - startSlot*: Slot - offsets*: seq[int64] # Absolute positions in file - -ethTimeUnit Era - -func era*(s: Slot): Era = - if s == FAR_FUTURE_SLOT: FAR_FUTURE_ERA - else: Era(s div SLOTS_PER_HISTORICAL_ROOT) - -func start_slot*(e: Era): Slot = - const maxEra = Era(FAR_FUTURE_SLOT div SLOTS_PER_HISTORICAL_ROOT) - if e >= maxEra: FAR_FUTURE_SLOT - else: Slot(e.uint64 * SLOTS_PER_HISTORICAL_ROOT) - -proc toString(v: IoErrorCode): string = +proc toString*(v: IoErrorCode): string = try: ioErrorMsg(v) except Exception as e: raiseAssert e.msg -func eraRoot*( - genesis_validators_root: Eth2Digest, - historical_roots: openArray[Eth2Digest], - historical_summaries: openArray[HistoricalSummary], - era: Era): Opt[Eth2Digest] = - if era == Era(0): ok(genesis_validators_root) - elif era <= historical_roots.lenu64(): - ok(historical_roots[int(uint64(era) - 1)]) - elif era <= historical_roots.lenu64() + historical_summaries.lenu64(): - ok(hash_tree_root( - historical_summaries[int(uint64(era) - 1) - historical_roots.len()])) - else: err() - -func eraFileName*( - cfg: RuntimeConfig, era: Era, eraRoot: Eth2Digest): string = - try: - &"{cfg.name()}-{era.uint64:05}-{shortLog(eraRoot)}.era" - except ValueError as exc: - raiseAssert exc.msg - -proc append(f: IoHandle, data: openArray[byte]): Result[void, string] = +proc append*(f: IoHandle, data: openArray[byte]): Result[void, string] = if (? writeFile(f, data).mapErr(toString)) != data.len.uint: return err("could not write data") ok() -proc appendHeader(f: IoHandle, typ: Type, dataLen: int): Result[int64, string] = +proc appendHeader*(f: IoHandle, typ: Type, dataLen: int): Result[int64, string] = if dataLen.uint64 > uint32.high: return err("entry does not fit 32-bit length") @@ -98,35 +54,6 @@ proc appendRecord*( ? append(f, data) ok(start) -proc toCompressedBytes(item: auto): seq[byte] = - snappy.encodeFramed(SSZ.encode(item)) - -proc appendRecord*( - f: IoHandle, v: ForkyTrustedSignedBeaconBlock): Result[int64, string] = - f.appendRecord(SnappyBeaconBlock, toCompressedBytes(v)) - -proc appendRecord*(f: IoHandle, v: ForkyBeaconState): Result[int64, string] = - f.appendRecord(SnappyBeaconState, toCompressedBytes(v)) - -proc appendIndex*( - f: IoHandle, startSlot: Slot, offsets: openArray[int64]): - Result[int64, string] = - let - len = offsets.len() * sizeof(int64) + 16 - pos = ? f.appendHeader(E2Index, len) - - ? f.append(startSlot.uint64.toBytesLE()) - - for v in offsets: - ? f.append(cast[uint64](v - pos).toBytesLE()) - - ? f.append(offsets.lenu64().toBytesLE()) - - ok(pos) - -proc appendRecord(f: IoHandle, index: Index): Result[int64, string] = - f.appendIndex(index.startSlot, index.offsets) - proc checkBytesLeft(f: IoHandle, expected: int64): Result[void, string] = let size = ? getFileSize(f).mapErr(toString) if expected > size: @@ -138,12 +65,12 @@ proc checkBytesLeft(f: IoHandle, expected: int64): Result[void, string] = ok() -proc readFileExact(f: IoHandle, buf: var openArray[byte]): Result[void, string] = +proc readFileExact*(f: IoHandle, buf: var openArray[byte]): Result[void, string] = if (? f.readFile(buf).mapErr(toString)) != buf.len().uint: return err("missing data") ok() -proc readHeader(f: IoHandle): Result[Header, string] = +proc readHeader*(f: IoHandle): Result[Header, string] = var buf: array[10, byte] ? readFileExact(f, buf.toOpenArray(0, 7)) @@ -197,81 +124,3 @@ proc findIndexStartOffset*(f: IoHandle): Result[int64, string] = ok(-bytes) -proc readIndex*(f: IoHandle): Result[Index, string] = - let - startPos = ? f.getFilePos().mapErr(toString) - fileSize = ? f.getFileSize().mapErr(toString) - header = ? f.readHeader() - - if header.typ != E2Index: return err("not an index") - if header.len < 16: return err("index entry too small") - if header.len mod 8 != 0: return err("index length invalid") - - var buf: array[8, byte] - ? f.readFileExact(buf) - let - slot = uint64.fromBytesLE(buf) - count = header.len div 8 - 2 - - var offsets = newSeqUninitialized[int64](count) - for i in 0.. fileSize: return err("Invalid offset") - offsets[i] = absolute - - ? f.readFileExact(buf) - if uint64(count) != uint64.fromBytesLE(buf): return err("invalid count") - - # technically not an error, but we'll throw this sanity check in here.. - if slot > int32.high().uint64: return err("fishy slot") - - ok(Index(startSlot: Slot(slot), offsets: offsets)) - -type - EraGroup* = object - slotIndex*: Index - -proc init*( - T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, string] = - discard ? f.appendHeader(E2Version, 0) - - ok(EraGroup( - slotIndex: Index( - startSlot: startSlot.get(Slot(0)), - offsets: newSeq[int64]( - if startSlot.isSome(): SLOTS_PER_HISTORICAL_ROOT.int - else: 0 - )))) - -proc update*( - g: var EraGroup, f: IoHandle, slot: Slot, szBytes: openArray[byte]): - Result[void, string] = - doAssert slot >= g.slotIndex.startSlot - # doAssert slot < g.slotIndex.startSlot + g.slotIndex.offsets.len - - g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] = - ? f.appendRecord(SnappyBeaconBlock, szBytes) - - ok() - -proc finish*( - g: var EraGroup, f: IoHandle, state: ForkyBeaconState): - Result[void, string] = - let - statePos = ? f.appendRecord(state) - - if state.slot > Slot(0): - discard ? f.appendRecord(g.slotIndex) - - discard ? f.appendIndex(state.slot, [statePos]) - - ok() diff --git a/ncli/era.nim b/ncli/era.nim new file mode 100644 index 000000000..e7dba5e93 --- /dev/null +++ b/ncli/era.nim @@ -0,0 +1,171 @@ +# beacon_chain +# Copyright (c) 2021-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + std/strformat, + results, + stew/[endians2, io2], + snappy, + ../beacon_chain/spec/[beacon_time, forks], + ../beacon_chain/spec/eth2_ssz_serialization, + ./e2store + +export + io2, + e2store.readRecord, e2store.findIndexStartOffset, + e2store.SnappyBeaconBlock, e2store.SnappyBeaconState + +type + Era* = distinct uint64 # Time unit, similar to slot + + Index* = object + startSlot*: Slot + offsets*: seq[int64] # Absolute positions in file + +const + FAR_FUTURE_ERA* = Era(not 0'u64) + +ethTimeUnit Era + +func era*(s: Slot): Era = + if s == FAR_FUTURE_SLOT: FAR_FUTURE_ERA + else: Era(s div SLOTS_PER_HISTORICAL_ROOT) + +func start_slot*(e: Era): Slot = + const maxEra = Era(FAR_FUTURE_SLOT div SLOTS_PER_HISTORICAL_ROOT) + if e >= maxEra: FAR_FUTURE_SLOT + else: Slot(e.uint64 * SLOTS_PER_HISTORICAL_ROOT) + +func eraRoot*( + genesis_validators_root: Eth2Digest, + historical_roots: openArray[Eth2Digest], + historical_summaries: openArray[HistoricalSummary], + era: Era): Opt[Eth2Digest] = + if era == Era(0): ok(genesis_validators_root) + elif era <= historical_roots.lenu64(): + ok(historical_roots[int(uint64(era) - 1)]) + elif era <= historical_roots.lenu64() + historical_summaries.lenu64(): + ok(hash_tree_root( + historical_summaries[int(uint64(era) - 1) - historical_roots.len()])) + else: err() + +func eraFileName*( + cfg: RuntimeConfig, era: Era, eraRoot: Eth2Digest): string = + try: + &"{cfg.name()}-{era.uint64:05}-{shortLog(eraRoot)}.era" + except ValueError as exc: + raiseAssert exc.msg + +proc toCompressedBytes(item: auto): seq[byte] = + snappy.encodeFramed(SSZ.encode(item)) + +proc appendRecord*( + f: IoHandle, v: ForkyTrustedSignedBeaconBlock): Result[int64, string] = + f.appendRecord(SnappyBeaconBlock, toCompressedBytes(v)) + +proc appendRecord*(f: IoHandle, v: ForkyBeaconState): Result[int64, string] = + f.appendRecord(SnappyBeaconState, toCompressedBytes(v)) + +proc appendIndex*( + f: IoHandle, startSlot: Slot, offsets: openArray[int64]): + Result[int64, string] = + let + len = offsets.len() * sizeof(int64) + 16 + pos = ? f.appendHeader(E2Index, len) + + ? f.append(startSlot.uint64.toBytesLE()) + + for v in offsets: + ? f.append(cast[uint64](v - pos).toBytesLE()) + + ? f.append(offsets.lenu64().toBytesLE()) + + ok(pos) + +proc appendRecord(f: IoHandle, index: Index): Result[int64, string] = + f.appendIndex(index.startSlot, index.offsets) + +proc readIndex*(f: IoHandle): Result[Index, string] = + let + startPos = ? f.getFilePos().mapErr(toString) + fileSize = ? f.getFileSize().mapErr(toString) + header = ? f.readHeader() + + if header.typ != E2Index: return err("not an index") + if header.len < 16: return err("index entry too small") + if header.len mod 8 != 0: return err("index length invalid") + + var buf: array[8, byte] + ? f.readFileExact(buf) + let + slot = uint64.fromBytesLE(buf) + count = header.len div 8 - 2 + + var offsets = newSeqUninitialized[int64](count) + for i in 0.. fileSize: return err("Invalid offset") + offsets[i] = absolute + + ? f.readFileExact(buf) + if uint64(count) != uint64.fromBytesLE(buf): return err("invalid count") + + # technically not an error, but we'll throw this sanity check in here.. + if slot > int32.high().uint64: return err("fishy slot") + + ok(Index(startSlot: Slot(slot), offsets: offsets)) + +type + EraGroup* = object + slotIndex*: Index + +proc init*( + T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, string] = + discard ? f.appendHeader(E2Version, 0) + + ok(EraGroup( + slotIndex: Index( + startSlot: startSlot.get(Slot(0)), + offsets: newSeq[int64]( + if startSlot.isSome(): SLOTS_PER_HISTORICAL_ROOT.int + else: 0 + )))) + +proc update*( + g: var EraGroup, f: IoHandle, slot: Slot, szBytes: openArray[byte]): + Result[void, string] = + doAssert slot >= g.slotIndex.startSlot + # doAssert slot < g.slotIndex.startSlot + g.slotIndex.offsets.len + + g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] = + ? f.appendRecord(SnappyBeaconBlock, szBytes) + + ok() + +proc finish*( + g: var EraGroup, f: IoHandle, state: ForkyBeaconState): + Result[void, string] = + let + statePos = ? f.appendRecord(state) + + if state.slot > Slot(0): + discard ? f.appendRecord(g.slotIndex) + + discard ? f.appendIndex(state.slot, [statePos]) + + ok() diff --git a/ncli/ncli_db.nim b/ncli/ncli_db.nim index a807bb0e1..3fb028c20 100644 --- a/ncli/ncli_db.nim +++ b/ncli/ncli_db.nim @@ -17,7 +17,7 @@ import ssz_codec], ../beacon_chain/sszdump, ../research/simutils, - ./e2store, ./ncli_common, ./validator_db_aggregator + ./era, ./ncli_common, ./validator_db_aggregator when defined(posix): import system/ansi_c