Split era specific code from e2s specific code (#5866)
This commit is contained in:
parent
a746063a61
commit
dca444bea7
|
@ -8,12 +8,12 @@ import
|
||||||
std/os,
|
std/os,
|
||||||
chronicles,
|
chronicles,
|
||||||
results, snappy, taskpools,
|
results, snappy, taskpools,
|
||||||
../ncli/e2store,
|
../ncli/era,
|
||||||
./spec/datatypes/[altair, bellatrix, phase0],
|
./spec/datatypes/[altair, bellatrix, phase0],
|
||||||
./spec/[beaconstate, forks, signatures_batch],
|
./spec/[beaconstate, forks, signatures_batch],
|
||||||
./consensus_object_pools/block_dag # TODO move to somewhere else to avoid circular deps
|
./consensus_object_pools/block_dag # TODO move to somewhere else to avoid circular deps
|
||||||
|
|
||||||
export results, forks, e2store
|
export results, forks, era
|
||||||
|
|
||||||
type
|
type
|
||||||
EraFile* = ref object
|
EraFile* = ref object
|
||||||
|
|
163
ncli/e2store.nim
163
ncli/e2store.nim
|
@ -8,18 +8,11 @@
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/strformat,
|
|
||||||
results,
|
results,
|
||||||
stew/[arrayops, endians2, io2],
|
stew/[arrayops, endians2, io2]
|
||||||
snappy,
|
|
||||||
../beacon_chain/spec/[beacon_time, forks],
|
|
||||||
../beacon_chain/spec/eth2_ssz_serialization
|
|
||||||
|
|
||||||
export io2
|
export io2
|
||||||
|
|
||||||
type
|
|
||||||
Era* = distinct uint64 # Time unit, similar to slot
|
|
||||||
|
|
||||||
const
|
const
|
||||||
E2Version* = [byte 0x65, 0x32]
|
E2Version* = [byte 0x65, 0x32]
|
||||||
E2Index* = [byte 0x69, 0x32]
|
E2Index* = [byte 0x69, 0x32]
|
||||||
|
@ -27,8 +20,6 @@ const
|
||||||
SnappyBeaconBlock* = [byte 0x01, 0x00]
|
SnappyBeaconBlock* = [byte 0x01, 0x00]
|
||||||
SnappyBeaconState* = [byte 0x02, 0x00]
|
SnappyBeaconState* = [byte 0x02, 0x00]
|
||||||
|
|
||||||
FAR_FUTURE_ERA* = Era(not 0'u64)
|
|
||||||
|
|
||||||
type
|
type
|
||||||
Type* = array[2, byte]
|
Type* = array[2, byte]
|
||||||
|
|
||||||
|
@ -36,51 +27,16 @@ type
|
||||||
typ*: Type
|
typ*: Type
|
||||||
len*: int
|
len*: int
|
||||||
|
|
||||||
Index* = object
|
proc toString*(v: IoErrorCode): string =
|
||||||
startSlot*: Slot
|
|
||||||
offsets*: seq[int64] # Absolute positions in file
|
|
||||||
|
|
||||||
ethTimeUnit Era
|
|
||||||
|
|
||||||
func era*(s: Slot): Era =
|
|
||||||
if s == FAR_FUTURE_SLOT: FAR_FUTURE_ERA
|
|
||||||
else: Era(s div SLOTS_PER_HISTORICAL_ROOT)
|
|
||||||
|
|
||||||
func start_slot*(e: Era): Slot =
|
|
||||||
const maxEra = Era(FAR_FUTURE_SLOT div SLOTS_PER_HISTORICAL_ROOT)
|
|
||||||
if e >= maxEra: FAR_FUTURE_SLOT
|
|
||||||
else: Slot(e.uint64 * SLOTS_PER_HISTORICAL_ROOT)
|
|
||||||
|
|
||||||
proc toString(v: IoErrorCode): string =
|
|
||||||
try: ioErrorMsg(v)
|
try: ioErrorMsg(v)
|
||||||
except Exception as e: raiseAssert e.msg
|
except Exception as e: raiseAssert e.msg
|
||||||
|
|
||||||
func eraRoot*(
|
proc append*(f: IoHandle, data: openArray[byte]): Result[void, string] =
|
||||||
genesis_validators_root: Eth2Digest,
|
|
||||||
historical_roots: openArray[Eth2Digest],
|
|
||||||
historical_summaries: openArray[HistoricalSummary],
|
|
||||||
era: Era): Opt[Eth2Digest] =
|
|
||||||
if era == Era(0): ok(genesis_validators_root)
|
|
||||||
elif era <= historical_roots.lenu64():
|
|
||||||
ok(historical_roots[int(uint64(era) - 1)])
|
|
||||||
elif era <= historical_roots.lenu64() + historical_summaries.lenu64():
|
|
||||||
ok(hash_tree_root(
|
|
||||||
historical_summaries[int(uint64(era) - 1) - historical_roots.len()]))
|
|
||||||
else: err()
|
|
||||||
|
|
||||||
func eraFileName*(
|
|
||||||
cfg: RuntimeConfig, era: Era, eraRoot: Eth2Digest): string =
|
|
||||||
try:
|
|
||||||
&"{cfg.name()}-{era.uint64:05}-{shortLog(eraRoot)}.era"
|
|
||||||
except ValueError as exc:
|
|
||||||
raiseAssert exc.msg
|
|
||||||
|
|
||||||
proc append(f: IoHandle, data: openArray[byte]): Result[void, string] =
|
|
||||||
if (? writeFile(f, data).mapErr(toString)) != data.len.uint:
|
if (? writeFile(f, data).mapErr(toString)) != data.len.uint:
|
||||||
return err("could not write data")
|
return err("could not write data")
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc appendHeader(f: IoHandle, typ: Type, dataLen: int): Result[int64, string] =
|
proc appendHeader*(f: IoHandle, typ: Type, dataLen: int): Result[int64, string] =
|
||||||
if dataLen.uint64 > uint32.high:
|
if dataLen.uint64 > uint32.high:
|
||||||
return err("entry does not fit 32-bit length")
|
return err("entry does not fit 32-bit length")
|
||||||
|
|
||||||
|
@ -98,35 +54,6 @@ proc appendRecord*(
|
||||||
? append(f, data)
|
? append(f, data)
|
||||||
ok(start)
|
ok(start)
|
||||||
|
|
||||||
proc toCompressedBytes(item: auto): seq[byte] =
|
|
||||||
snappy.encodeFramed(SSZ.encode(item))
|
|
||||||
|
|
||||||
proc appendRecord*(
|
|
||||||
f: IoHandle, v: ForkyTrustedSignedBeaconBlock): Result[int64, string] =
|
|
||||||
f.appendRecord(SnappyBeaconBlock, toCompressedBytes(v))
|
|
||||||
|
|
||||||
proc appendRecord*(f: IoHandle, v: ForkyBeaconState): Result[int64, string] =
|
|
||||||
f.appendRecord(SnappyBeaconState, toCompressedBytes(v))
|
|
||||||
|
|
||||||
proc appendIndex*(
|
|
||||||
f: IoHandle, startSlot: Slot, offsets: openArray[int64]):
|
|
||||||
Result[int64, string] =
|
|
||||||
let
|
|
||||||
len = offsets.len() * sizeof(int64) + 16
|
|
||||||
pos = ? f.appendHeader(E2Index, len)
|
|
||||||
|
|
||||||
? f.append(startSlot.uint64.toBytesLE())
|
|
||||||
|
|
||||||
for v in offsets:
|
|
||||||
? f.append(cast[uint64](v - pos).toBytesLE())
|
|
||||||
|
|
||||||
? f.append(offsets.lenu64().toBytesLE())
|
|
||||||
|
|
||||||
ok(pos)
|
|
||||||
|
|
||||||
proc appendRecord(f: IoHandle, index: Index): Result[int64, string] =
|
|
||||||
f.appendIndex(index.startSlot, index.offsets)
|
|
||||||
|
|
||||||
proc checkBytesLeft(f: IoHandle, expected: int64): Result[void, string] =
|
proc checkBytesLeft(f: IoHandle, expected: int64): Result[void, string] =
|
||||||
let size = ? getFileSize(f).mapErr(toString)
|
let size = ? getFileSize(f).mapErr(toString)
|
||||||
if expected > size:
|
if expected > size:
|
||||||
|
@ -138,12 +65,12 @@ proc checkBytesLeft(f: IoHandle, expected: int64): Result[void, string] =
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc readFileExact(f: IoHandle, buf: var openArray[byte]): Result[void, string] =
|
proc readFileExact*(f: IoHandle, buf: var openArray[byte]): Result[void, string] =
|
||||||
if (? f.readFile(buf).mapErr(toString)) != buf.len().uint:
|
if (? f.readFile(buf).mapErr(toString)) != buf.len().uint:
|
||||||
return err("missing data")
|
return err("missing data")
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc readHeader(f: IoHandle): Result[Header, string] =
|
proc readHeader*(f: IoHandle): Result[Header, string] =
|
||||||
var buf: array[10, byte]
|
var buf: array[10, byte]
|
||||||
? readFileExact(f, buf.toOpenArray(0, 7))
|
? readFileExact(f, buf.toOpenArray(0, 7))
|
||||||
|
|
||||||
|
@ -197,81 +124,3 @@ proc findIndexStartOffset*(f: IoHandle): Result[int64, string] =
|
||||||
|
|
||||||
ok(-bytes)
|
ok(-bytes)
|
||||||
|
|
||||||
proc readIndex*(f: IoHandle): Result[Index, string] =
|
|
||||||
let
|
|
||||||
startPos = ? f.getFilePos().mapErr(toString)
|
|
||||||
fileSize = ? f.getFileSize().mapErr(toString)
|
|
||||||
header = ? f.readHeader()
|
|
||||||
|
|
||||||
if header.typ != E2Index: return err("not an index")
|
|
||||||
if header.len < 16: return err("index entry too small")
|
|
||||||
if header.len mod 8 != 0: return err("index length invalid")
|
|
||||||
|
|
||||||
var buf: array[8, byte]
|
|
||||||
? f.readFileExact(buf)
|
|
||||||
let
|
|
||||||
slot = uint64.fromBytesLE(buf)
|
|
||||||
count = header.len div 8 - 2
|
|
||||||
|
|
||||||
var offsets = newSeqUninitialized[int64](count)
|
|
||||||
for i in 0..<count:
|
|
||||||
? f.readFileExact(buf)
|
|
||||||
|
|
||||||
let
|
|
||||||
offset = uint64.fromBytesLE(buf)
|
|
||||||
absolute =
|
|
||||||
if offset == 0: 0'i64
|
|
||||||
else:
|
|
||||||
# Wrapping math is actually convenient here
|
|
||||||
cast[int64](cast[uint64](startPos) + offset)
|
|
||||||
|
|
||||||
if absolute < 0 or absolute > fileSize: return err("Invalid offset")
|
|
||||||
offsets[i] = absolute
|
|
||||||
|
|
||||||
? f.readFileExact(buf)
|
|
||||||
if uint64(count) != uint64.fromBytesLE(buf): return err("invalid count")
|
|
||||||
|
|
||||||
# technically not an error, but we'll throw this sanity check in here..
|
|
||||||
if slot > int32.high().uint64: return err("fishy slot")
|
|
||||||
|
|
||||||
ok(Index(startSlot: Slot(slot), offsets: offsets))
|
|
||||||
|
|
||||||
type
|
|
||||||
EraGroup* = object
|
|
||||||
slotIndex*: Index
|
|
||||||
|
|
||||||
proc init*(
|
|
||||||
T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, string] =
|
|
||||||
discard ? f.appendHeader(E2Version, 0)
|
|
||||||
|
|
||||||
ok(EraGroup(
|
|
||||||
slotIndex: Index(
|
|
||||||
startSlot: startSlot.get(Slot(0)),
|
|
||||||
offsets: newSeq[int64](
|
|
||||||
if startSlot.isSome(): SLOTS_PER_HISTORICAL_ROOT.int
|
|
||||||
else: 0
|
|
||||||
))))
|
|
||||||
|
|
||||||
proc update*(
|
|
||||||
g: var EraGroup, f: IoHandle, slot: Slot, szBytes: openArray[byte]):
|
|
||||||
Result[void, string] =
|
|
||||||
doAssert slot >= g.slotIndex.startSlot
|
|
||||||
# doAssert slot < g.slotIndex.startSlot + g.slotIndex.offsets.len
|
|
||||||
|
|
||||||
g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] =
|
|
||||||
? f.appendRecord(SnappyBeaconBlock, szBytes)
|
|
||||||
|
|
||||||
ok()
|
|
||||||
|
|
||||||
proc finish*(
|
|
||||||
g: var EraGroup, f: IoHandle, state: ForkyBeaconState):
|
|
||||||
Result[void, string] =
|
|
||||||
let
|
|
||||||
statePos = ? f.appendRecord(state)
|
|
||||||
|
|
||||||
if state.slot > Slot(0):
|
|
||||||
discard ? f.appendRecord(g.slotIndex)
|
|
||||||
|
|
||||||
discard ? f.appendIndex(state.slot, [statePos])
|
|
||||||
|
|
||||||
ok()
|
|
||||||
|
|
|
@ -0,0 +1,171 @@
|
||||||
|
# beacon_chain
|
||||||
|
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/strformat,
|
||||||
|
results,
|
||||||
|
stew/[endians2, io2],
|
||||||
|
snappy,
|
||||||
|
../beacon_chain/spec/[beacon_time, forks],
|
||||||
|
../beacon_chain/spec/eth2_ssz_serialization,
|
||||||
|
./e2store
|
||||||
|
|
||||||
|
export
|
||||||
|
io2,
|
||||||
|
e2store.readRecord, e2store.findIndexStartOffset,
|
||||||
|
e2store.SnappyBeaconBlock, e2store.SnappyBeaconState
|
||||||
|
|
||||||
|
type
|
||||||
|
Era* = distinct uint64 # Time unit, similar to slot
|
||||||
|
|
||||||
|
Index* = object
|
||||||
|
startSlot*: Slot
|
||||||
|
offsets*: seq[int64] # Absolute positions in file
|
||||||
|
|
||||||
|
const
|
||||||
|
FAR_FUTURE_ERA* = Era(not 0'u64)
|
||||||
|
|
||||||
|
ethTimeUnit Era
|
||||||
|
|
||||||
|
func era*(s: Slot): Era =
|
||||||
|
if s == FAR_FUTURE_SLOT: FAR_FUTURE_ERA
|
||||||
|
else: Era(s div SLOTS_PER_HISTORICAL_ROOT)
|
||||||
|
|
||||||
|
func start_slot*(e: Era): Slot =
|
||||||
|
const maxEra = Era(FAR_FUTURE_SLOT div SLOTS_PER_HISTORICAL_ROOT)
|
||||||
|
if e >= maxEra: FAR_FUTURE_SLOT
|
||||||
|
else: Slot(e.uint64 * SLOTS_PER_HISTORICAL_ROOT)
|
||||||
|
|
||||||
|
func eraRoot*(
|
||||||
|
genesis_validators_root: Eth2Digest,
|
||||||
|
historical_roots: openArray[Eth2Digest],
|
||||||
|
historical_summaries: openArray[HistoricalSummary],
|
||||||
|
era: Era): Opt[Eth2Digest] =
|
||||||
|
if era == Era(0): ok(genesis_validators_root)
|
||||||
|
elif era <= historical_roots.lenu64():
|
||||||
|
ok(historical_roots[int(uint64(era) - 1)])
|
||||||
|
elif era <= historical_roots.lenu64() + historical_summaries.lenu64():
|
||||||
|
ok(hash_tree_root(
|
||||||
|
historical_summaries[int(uint64(era) - 1) - historical_roots.len()]))
|
||||||
|
else: err()
|
||||||
|
|
||||||
|
func eraFileName*(
|
||||||
|
cfg: RuntimeConfig, era: Era, eraRoot: Eth2Digest): string =
|
||||||
|
try:
|
||||||
|
&"{cfg.name()}-{era.uint64:05}-{shortLog(eraRoot)}.era"
|
||||||
|
except ValueError as exc:
|
||||||
|
raiseAssert exc.msg
|
||||||
|
|
||||||
|
proc toCompressedBytes(item: auto): seq[byte] =
|
||||||
|
snappy.encodeFramed(SSZ.encode(item))
|
||||||
|
|
||||||
|
proc appendRecord*(
|
||||||
|
f: IoHandle, v: ForkyTrustedSignedBeaconBlock): Result[int64, string] =
|
||||||
|
f.appendRecord(SnappyBeaconBlock, toCompressedBytes(v))
|
||||||
|
|
||||||
|
proc appendRecord*(f: IoHandle, v: ForkyBeaconState): Result[int64, string] =
|
||||||
|
f.appendRecord(SnappyBeaconState, toCompressedBytes(v))
|
||||||
|
|
||||||
|
proc appendIndex*(
|
||||||
|
f: IoHandle, startSlot: Slot, offsets: openArray[int64]):
|
||||||
|
Result[int64, string] =
|
||||||
|
let
|
||||||
|
len = offsets.len() * sizeof(int64) + 16
|
||||||
|
pos = ? f.appendHeader(E2Index, len)
|
||||||
|
|
||||||
|
? f.append(startSlot.uint64.toBytesLE())
|
||||||
|
|
||||||
|
for v in offsets:
|
||||||
|
? f.append(cast[uint64](v - pos).toBytesLE())
|
||||||
|
|
||||||
|
? f.append(offsets.lenu64().toBytesLE())
|
||||||
|
|
||||||
|
ok(pos)
|
||||||
|
|
||||||
|
proc appendRecord(f: IoHandle, index: Index): Result[int64, string] =
|
||||||
|
f.appendIndex(index.startSlot, index.offsets)
|
||||||
|
|
||||||
|
proc readIndex*(f: IoHandle): Result[Index, string] =
|
||||||
|
let
|
||||||
|
startPos = ? f.getFilePos().mapErr(toString)
|
||||||
|
fileSize = ? f.getFileSize().mapErr(toString)
|
||||||
|
header = ? f.readHeader()
|
||||||
|
|
||||||
|
if header.typ != E2Index: return err("not an index")
|
||||||
|
if header.len < 16: return err("index entry too small")
|
||||||
|
if header.len mod 8 != 0: return err("index length invalid")
|
||||||
|
|
||||||
|
var buf: array[8, byte]
|
||||||
|
? f.readFileExact(buf)
|
||||||
|
let
|
||||||
|
slot = uint64.fromBytesLE(buf)
|
||||||
|
count = header.len div 8 - 2
|
||||||
|
|
||||||
|
var offsets = newSeqUninitialized[int64](count)
|
||||||
|
for i in 0..<count:
|
||||||
|
? f.readFileExact(buf)
|
||||||
|
|
||||||
|
let
|
||||||
|
offset = uint64.fromBytesLE(buf)
|
||||||
|
absolute =
|
||||||
|
if offset == 0: 0'i64
|
||||||
|
else:
|
||||||
|
# Wrapping math is actually convenient here
|
||||||
|
cast[int64](cast[uint64](startPos) + offset)
|
||||||
|
|
||||||
|
if absolute < 0 or absolute > fileSize: return err("Invalid offset")
|
||||||
|
offsets[i] = absolute
|
||||||
|
|
||||||
|
? f.readFileExact(buf)
|
||||||
|
if uint64(count) != uint64.fromBytesLE(buf): return err("invalid count")
|
||||||
|
|
||||||
|
# technically not an error, but we'll throw this sanity check in here..
|
||||||
|
if slot > int32.high().uint64: return err("fishy slot")
|
||||||
|
|
||||||
|
ok(Index(startSlot: Slot(slot), offsets: offsets))
|
||||||
|
|
||||||
|
type
|
||||||
|
EraGroup* = object
|
||||||
|
slotIndex*: Index
|
||||||
|
|
||||||
|
proc init*(
|
||||||
|
T: type EraGroup, f: IoHandle, startSlot: Option[Slot]): Result[T, string] =
|
||||||
|
discard ? f.appendHeader(E2Version, 0)
|
||||||
|
|
||||||
|
ok(EraGroup(
|
||||||
|
slotIndex: Index(
|
||||||
|
startSlot: startSlot.get(Slot(0)),
|
||||||
|
offsets: newSeq[int64](
|
||||||
|
if startSlot.isSome(): SLOTS_PER_HISTORICAL_ROOT.int
|
||||||
|
else: 0
|
||||||
|
))))
|
||||||
|
|
||||||
|
proc update*(
|
||||||
|
g: var EraGroup, f: IoHandle, slot: Slot, szBytes: openArray[byte]):
|
||||||
|
Result[void, string] =
|
||||||
|
doAssert slot >= g.slotIndex.startSlot
|
||||||
|
# doAssert slot < g.slotIndex.startSlot + g.slotIndex.offsets.len
|
||||||
|
|
||||||
|
g.slotIndex.offsets[int(slot - g.slotIndex.startSlot)] =
|
||||||
|
? f.appendRecord(SnappyBeaconBlock, szBytes)
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
proc finish*(
|
||||||
|
g: var EraGroup, f: IoHandle, state: ForkyBeaconState):
|
||||||
|
Result[void, string] =
|
||||||
|
let
|
||||||
|
statePos = ? f.appendRecord(state)
|
||||||
|
|
||||||
|
if state.slot > Slot(0):
|
||||||
|
discard ? f.appendRecord(g.slotIndex)
|
||||||
|
|
||||||
|
discard ? f.appendIndex(state.slot, [statePos])
|
||||||
|
|
||||||
|
ok()
|
|
@ -17,7 +17,7 @@ import
|
||||||
ssz_codec],
|
ssz_codec],
|
||||||
../beacon_chain/sszdump,
|
../beacon_chain/sszdump,
|
||||||
../research/simutils,
|
../research/simutils,
|
||||||
./e2store, ./ncli_common, ./validator_db_aggregator
|
./era, ./ncli_common, ./validator_db_aggregator
|
||||||
|
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
import system/ansi_c
|
import system/ansi_c
|
||||||
|
|
Loading…
Reference in New Issue