From 2fb90cfb430ecb3f75870062f07a3125233db799 Mon Sep 17 00:00:00 2001
From: Kim De Mey
Date: Fri, 9 Feb 2024 11:13:12 +0100
Subject: [PATCH] Initial implementation of era1 + export + verify (#1998)

* Initial implementation of era1 + export + verify

  Initial implementation of era1 + export and verify command in the
  existing eth_data_exporter.

* Quick verify version to test with geth-created era1 files

* Add Era1File object and getter for specific block tuple

* Add getAccumulatorRoot and update AccumulatorRoot type name + clean-up
  of some comments

* Implement BlockIndex-based verify + additional helper calls + fix bug
  for reading the last (incomplete) era1 file

* Fix init of BlockIndex offsets for the last era + extra check on reading

* Remove duplicated e2store code
---
 fluffy/eth_data/era1.nim                      | 392 ++++++++++++++++++
 fluffy/eth_data/history_data_ssz_e2s.nim      |   6 +-
 fluffy/network/history/accumulator.nim        |   9 +-
 fluffy/tools/eth_data_exporter.nim            | 118 +++++-
 .../tools/eth_data_exporter/exporter_conf.nim |  14 +
 vendor/nimbus-eth2                            |   2 +-
 6 files changed, 532 insertions(+), 9 deletions(-)
 create mode 100644 fluffy/eth_data/era1.nim

diff --git a/fluffy/eth_data/era1.nim b/fluffy/eth_data/era1.nim
new file mode 100644
index 000000000..693636aea
--- /dev/null
+++ b/fluffy/eth_data/era1.nim
@@ -0,0 +1,392 @@
# fluffy
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
  std/[strformat, typetraits],
  results, stew/[endians2, io2, byteutils, arrayops],
  stint, snappy,
  eth/common/eth_types_rlp,
  beacon_chain/spec/beacon_time,
  ssz_serialization,
  ncli/e2store,
  ../network/history/accumulator

from nimcrypto/hash import fromHex
from ../../nimbus/utils/utils import calcTxRoot, calcReceiptRoot

export e2store.readRecord

# Implementation of the era1 file format as currently described in:
# https://github.com/ethereum/go-ethereum/pull/26621

# era1 := Version | block-tuple* | other-entries* | Accumulator | BlockIndex
# block-tuple := CompressedHeader | CompressedBody | CompressedReceipts | TotalDifficulty

# block-index := starting-number | index | index | index ... | count

# CompressedHeader   = { type: 0x03,   data: snappyFramed(rlp(header)) }
# CompressedBody     = { type: 0x04,   data: snappyFramed(rlp(body)) }
# CompressedReceipts = { type: 0x05,   data: snappyFramed(rlp(receipts)) }
# TotalDifficulty    = { type: 0x06,   data: uint256(header.total_difficulty) }
# AccumulatorRoot    = { type: 0x07,   data: hash_tree_root(List(HeaderRecord, 8192)) }
# BlockIndex         = { type: 0x3266, data: block-index }

# Important note:
# Snappy does not give the same compression result as the implementation used
# by go-ethereum for some block headers and block bodies. This means that we
# cannot rely on the secondary verification mechanism that is based on doing
# the sha256sum of the full era1 files.
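# For orientation: each entry above is a plain e2store (e2s) record, i.e. an
# 8-byte header - 2-byte type, 4-byte little-endian length, 2 reserved zero
# bytes - followed by `length` bytes of data. A minimal sketch of that header
# layout follows; `encodeE2Header` is a hypothetical helper for illustration
# only, the real work below is done by `appendHeader` from ncli/e2store.
func encodeE2Header(typ: array[2, byte], dataLen: uint32): array[8, byte] =
  # 2-byte record type
  result[0] = typ[0]
  result[1] = typ[1]
  # 4-byte little-endian length of the data that follows the header
  let lenBytes = dataLen.toBytesLE()
  for i in 0 ..< 4:
    result[2 + i] = lenBytes[i]
  # bytes 6..7 remain zero (reserved)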
const
  # Note: When the specification is more official, these could go with the
  # other E2S types.
  CompressedHeader* = [byte 0x03, 0x00]
  CompressedBody* = [byte 0x04, 0x00]
  CompressedReceipts* = [byte 0x05, 0x00]
  TotalDifficulty* = [byte 0x06, 0x00]
  AccumulatorRoot* = [byte 0x07, 0x00]
  E2BlockIndex* = [byte 0x66, 0x32]

  MaxEra1Size* = 8192

type
  BlockIndex* = object
    startNumber*: uint64
    offsets*: seq[int64] # Absolute positions in the file

  Era1* = distinct uint64 # Period of 8192 blocks (not an exact time unit)

  Era1Group* = object
    blockIndex*: BlockIndex

# As stated, not really a time unit, but we nevertheless need the borrows
ethTimeUnit Era1

template lenu64(x: untyped): untyped =
  uint64(len(x))

# Note: appendIndex, appendRecord and readBlockIndex for BlockIndex are very
# similar to their consensus layer counterparts. The differences lie in the
# naming (slots vs block numbers) and in the behavior for the first era
# (first slot) and the last era (era1 ends at the merge block).

proc appendIndex*(
    f: IoHandle, startNumber: uint64, offsets: openArray[int64]):
    Result[int64, string] =
  let
    len = offsets.len() * sizeof(int64) + 16
    pos = ? f.appendHeader(E2BlockIndex, len)

  ? f.append(startNumber.uint64.toBytesLE())

  for v in offsets:
    ? f.append(cast[uint64](v - pos).toBytesLE())

  ? f.append(offsets.lenu64().toBytesLE())

  ok(pos)

proc appendRecord(f: IoHandle, index: BlockIndex): Result[int64, string] =
  f.appendIndex(index.startNumber, index.offsets)

proc readBlockIndex*(f: IoHandle): Result[BlockIndex, string] =
  let
    startPos = ? f.getFilePos().mapErr(toString)
    fileSize = ? f.getFileSize().mapErr(toString)
    header = ? f.readHeader()

  if header.typ != E2BlockIndex: return err("not an index")
  if header.len < 16: return err("index entry too small")
  if header.len mod 8 != 0: return err("index length invalid")

  var buf: array[8, byte]
  ? f.readFileExact(buf)
  let
    blockNumber = uint64.fromBytesLE(buf)
    count = header.len div 8 - 2

  var offsets = newSeqUninitialized[int64](count)
  for i in 0..<count:
    ? f.readFileExact(buf)

    let
      offset = uint64.fromBytesLE(buf)
      absolute =
        if offset == 0: 0'i64
        else:
          # Offsets are stored relative to the start of the index record;
          # wrapping math is fine here
          cast[int64](uint64(startPos) + offset)

    if absolute < 0 or absolute > fileSize: return err("Invalid offset")
    offsets[i] = absolute

  ? f.readFileExact(buf)
  if uint64(count) != uint64.fromBytesLE(buf): return err("invalid count")

  # Technically not an error, but we'll throw this sanity check in here..
  if blockNumber > int32.high().uint64: return err("fishy block number")

  ok(BlockIndex(startNumber: blockNumber, offsets: offsets))

func startNumber*(era: Era1): uint64 =
  era * MaxEra1Size

func endNumber*(era: Era1): uint64 =
  if (era + 1) * MaxEra1Size - 1'u64 >= mergeBlockNumber:
    # The incomplete era just before the merge
    mergeBlockNumber - 1'u64
  else:
    (era + 1) * MaxEra1Size - 1'u64

func endNumber*(blockIdx: BlockIndex): uint64 =
  blockIdx.startNumber + blockIdx.offsets.lenu64() - 1

func era*(blockNumber: uint64): Era1 =
  Era1(blockNumber div MaxEra1Size)

func offsetsLen(startNumber: uint64): int =
  # For the era in which the merge happens, the era1 file only holds the
  # blocks up until the merge block, so the offsets length needs to be
  # adapted accordingly.
  if startNumber.era() >= mergeBlockNumber.era():
    int(mergeBlockNumber mod MaxEra1Size)
  else:
    MaxEra1Size
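# To make the last-era math concrete, with mainnet's mergeBlockNumber of
# 15537394 (the first PoS block, defined in accumulator.nim): the last
# pre-merge block is 15537393, which falls in era 15537393 div 8192 = 1896
# with startNumber 15532032, and offsetsLen for that era yields
# 15537394 mod 8192 = 5362 offsets instead of the full 8192.
static:
  doAssert 15537393'u64 div 8192 == 1896
  doAssert 1896'u64 * 8192 == 15532032
  doAssert 15537394'u64 mod 8192 == 5362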
proc toCompressedRlpBytes(item: auto): seq[byte] =
  snappy.encodeFramed(rlp.encode(item))

proc fromCompressedRlpBytes(bytes: openArray[byte], T: type): Result[T, string] =
  try:
    ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), T))
  except RlpError as e:
    err("Invalid compressed RLP data: " & e.msg)

proc init*(
    T: type Era1Group, f: IoHandle, startNumber: uint64
    ): Result[T, string] =
  discard ? f.appendHeader(E2Version, 0)

  ok(Era1Group(
    blockIndex: BlockIndex(
      startNumber: startNumber,
      offsets: newSeq[int64](startNumber.offsetsLen())
    )))

proc update*(
    g: var Era1Group, f: IoHandle, blockNumber: uint64,
    header, body, receipts, totalDifficulty: openArray[byte]
    ): Result[void, string] =
  doAssert blockNumber >= g.blockIndex.startNumber

  g.blockIndex.offsets[int(blockNumber - g.blockIndex.startNumber)] =
    ? f.appendRecord(CompressedHeader, header)
  discard ? f.appendRecord(CompressedBody, body)
  discard ? f.appendRecord(CompressedReceipts, receipts)
  discard ? f.appendRecord(TotalDifficulty, totalDifficulty)

  ok()

proc update*(
    g: var Era1Group, f: IoHandle, blockNumber: uint64,
    header: BlockHeader, body: BlockBody, receipts: seq[Receipt],
    totalDifficulty: UInt256
    ): Result[void, string] =
  g.update(
    f, blockNumber,
    toCompressedRlpBytes(header),
    toCompressedRlpBytes(body),
    toCompressedRlpBytes(receipts),
    totalDifficulty.toBytesLE()
  )

proc finish*(
    g: var Era1Group, f: IoHandle, accumulatorRoot: Digest, lastBlockNumber: uint64
    ): Result[void, string] =
  let accumulatorRootPos = ? f.appendRecord(AccumulatorRoot, accumulatorRoot.data)

  if lastBlockNumber > 0:
    discard ? f.appendRecord(g.blockIndex)

  # TODO:
  # This is not something added in the current specification of era1, but
  # perhaps we want to be able to quickly jump to the accumulator root.
  # discard ? f.appendIndex(lastBlockNumber, [accumulatorRootPos])
  discard accumulatorRootPos

  ok()

func shortLog*(x: Digest): string =
  x.data.toOpenArray(0, 3).toHex()

func era1FileName*(network: string, era: Era1, eraRoot: Digest): string =
  try:
    &"{network}-{era.uint64:05}-{shortLog(eraRoot)}.era1"
  except ValueError as exc:
    raiseAssert exc.msg
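# For example (the root value here is illustrative only):
#   era1FileName("mainnet", Era1(1896), root)
# yields something like "mainnet-01896-5ec1ffb8.era1": the era number
# zero-padded to five digits, followed by the first four bytes of the
# accumulator root.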
# Helpers to directly read objects from era1 files
# TODO: Might want to use var parameters to avoid copying, as is done for era
# files.

type
  Era1File* = ref object
    handle: Opt[IoHandle]
    blockIdx: BlockIndex

proc open*(_: type Era1File, name: string): Result[Era1File, string] =
  var
    f = Opt[IoHandle].ok(? openFile(name, {OpenFlags.Read}).mapErr(ioErrorMsg))

  defer:
    if f.isSome(): discard closeFile(f[])

  # Indices can be found at the end of each era file - we only support
  # single-era files for now
  ? f[].setFilePos(0, SeekPosition.SeekEnd).mapErr(ioErrorMsg)

  # Last in the file is the block index
  let blockIdxPos = ? f[].findIndexStartOffset()
  ? f[].setFilePos(blockIdxPos, SeekPosition.SeekCurrent).mapErr(ioErrorMsg)

  let blockIdx = ? f[].readBlockIndex()
  if blockIdx.offsets.len() != blockIdx.startNumber.offsetsLen():
    return err("Block index length invalid")

  let res = Era1File(handle: f, blockIdx: blockIdx)
  reset(f)
  ok res

proc close*(f: Era1File) =
  if f.handle.isSome():
    discard closeFile(f.handle.get())
    reset(f.handle)

proc getBlockHeader(f: Era1File): Result[BlockHeader, string] =
  var bytes: seq[byte]

  let header = ? f[].handle.get().readRecord(bytes)
  if header.typ != CompressedHeader:
    return err("Invalid era file: didn't find block header at index position")

  fromCompressedRlpBytes(bytes, BlockHeader)

proc getBlockBody(f: Era1File): Result[BlockBody, string] =
  var bytes: seq[byte]

  let header = ? f[].handle.get().readRecord(bytes)
  if header.typ != CompressedBody:
    return err("Invalid era file: didn't find block body at index position")

  fromCompressedRlpBytes(bytes, BlockBody)

proc getReceipts(f: Era1File): Result[seq[Receipt], string] =
  var bytes: seq[byte]

  let header = ? f[].handle.get().readRecord(bytes)
  if header.typ != CompressedReceipts:
    return err("Invalid era file: didn't find receipts at index position")

  fromCompressedRlpBytes(bytes, seq[Receipt])

proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =
  var bytes: seq[byte]

  let header = ? f[].handle.get().readRecord(bytes)
  if header.typ != TotalDifficulty:
    return err("Invalid era file: didn't find total difficulty at index position")

  if bytes.len != 32:
    return err("Invalid total difficulty length")

  ok(UInt256.fromBytesLE(bytes))

proc getNextBlockTuple*(
    f: Era1File
    ): Result[(BlockHeader, BlockBody, seq[Receipt], UInt256), string] =
  doAssert not isNil(f) and f[].handle.isSome

  let
    blockHeader = ? getBlockHeader(f)
    blockBody = ? getBlockBody(f)
    receipts = ? getReceipts(f)
    totalDifficulty = ? getTotalDifficulty(f)

  ok((blockHeader, blockBody, receipts, totalDifficulty))

proc getBlockTuple*(
    f: Era1File, blockNumber: uint64
    ): Result[(BlockHeader, BlockBody, seq[Receipt], UInt256), string] =
  doAssert not isNil(f) and f[].handle.isSome
  doAssert(
    blockNumber >= f[].blockIdx.startNumber and
    blockNumber <= f[].blockIdx.endNumber,
    "Wrong era1 file for selected block number")

  let pos = f[].blockIdx.offsets[blockNumber - f[].blockIdx.startNumber]

  ? f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)

  getNextBlockTuple(f)

# TODO: Should we perhaps store this in the Era1File object and grab it in
# open()?
proc getAccumulatorRoot*(f: Era1File): Result[Digest, string] =
  # Get the position of the BlockIndex
  ? f[].handle.get().setFilePos(0, SeekPosition.SeekEnd).mapErr(ioErrorMsg)
  let blockIdxPos = ? f[].handle.get().findIndexStartOffset()

  # The accumulator root is 40 bytes (8-byte e2s header + 32-byte root)
  # before the BlockIndex
  let accumulatorRootPos = blockIdxPos - 40 # 8 + 32
  ? f[].handle.get().setFilePos(accumulatorRootPos, SeekPosition.SeekCurrent).mapErr(ioErrorMsg)

  var bytes: seq[byte]
  let header = ? f[].handle.get().readRecord(bytes)

  if header.typ != AccumulatorRoot:
    return err("Invalid era file: didn't find accumulator root at index position")

  if bytes.len != 32:
    return err("invalid accumulator root")

  ok(Digest(data: array[32, byte].initCopyFrom(bytes)))
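when isMainModule:
  # A hedged usage sketch of the reader API above; the file name and block
  # number are illustrative values, not real mainnet ones.
  proc demoRandomAccess() =
    let f = Era1File.open("mainnet-00000-5ec1ffb8.era1").valueOr:
      echo "Cannot open era1 file: ", error
      quit 1
    defer: f.close()

    # Random access through the BlockIndex: seek to one block and decode it
    let (header, body, _, td) = f.getBlockTuple(100'u64).valueOr:
      echo "Cannot read block tuple: ", error
      quit 1
    echo "block ", header.blockNumber,
      " txs=", body.transactions.len, " td=", td

  demoRandomAccess()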
proc verify*(f: Era1File): Result[Digest, string] =
  let
    startNumber = f.blockIdx.startNumber
    endNumber = f.blockIdx.endNumber()

  var headerRecords: seq[HeaderRecord]
  for blockNumber in startNumber..endNumber:
    let
      (blockHeader, blockBody, receipts, totalDifficulty) =
        ? f.getBlockTuple(blockNumber)

      txRoot = calcTxRoot(blockBody.transactions)
      ommersHash = keccakHash(rlp.encode(blockBody.uncles))

    if blockHeader.txRoot != txRoot:
      return err("Invalid transactions root")

    if blockHeader.ommersHash != ommersHash:
      return err("Invalid ommers hash")

    if blockHeader.receiptRoot != calcReceiptRoot(receipts):
      return err("Invalid receipts root")

    headerRecords.add(HeaderRecord(
      blockHash: blockHeader.blockHash(),
      totalDifficulty: totalDifficulty))

  let expectedRoot = ? f.getAccumulatorRoot()
  let accumulatorRoot = getEpochAccumulatorRoot(headerRecords)

  if accumulatorRoot != expectedRoot:
    err("Invalid accumulator root")
  else:
    ok(accumulatorRoot)
diff --git a/fluffy/eth_data/history_data_ssz_e2s.nim b/fluffy/eth_data/history_data_ssz_e2s.nim
index a6d043068..1a34babd6 100644
--- a/fluffy/eth_data/history_data_ssz_e2s.nim
+++ b/fluffy/eth_data/history_data_ssz_e2s.nim
@@ -1,5 +1,5 @@
 # Nimbus - Portal Network
-# Copyright (c) 2022-2023 Status Research & Development GmbH
+# Copyright (c) 2022-2024 Status Research & Development GmbH
 # Licensed and distributed under either of
 #   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -15,10 +15,6 @@ import

 export results

-proc toString*(v: IoErrorCode): string =
-  try: ioErrorMsg(v)
-  except Exception as e: raiseAssert e.msg
-
 # Reading SSZ data from files

 proc readAccumulator*(file: string): Result[FinishedAccumulator, string] =
diff --git a/fluffy/network/history/accumulator.nim b/fluffy/network/history/accumulator.nim
index 602e5d07d..005794a3c 100644
--- a/fluffy/network/history/accumulator.nim
+++ b/fluffy/network/history/accumulator.nim
@@ -1,5 +1,5 @@
 # Nimbus
-# Copyright (c) 2022-2023 Status Research & Development GmbH
+# Copyright (c) 2022-2024 Status Research & Development GmbH
 # Licensed and distributed under either of
 #   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -73,6 +73,13 @@ func init*(T: type Accumulator): T =
     currentEpoch: EpochAccumulator.init(@[])
   )

+func getEpochAccumulatorRoot*(
+    headerRecords: openArray[HeaderRecord]
+    ): Digest =
+  let epochAccumulator = EpochAccumulator.init(@headerRecords)
+
+  hash_tree_root(epochAccumulator)
+
 func updateAccumulator*(
     a: var Accumulator, header: BlockHeader) =
   doAssert(header.blockNumber.truncate(uint64) < mergeBlockNumber,
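The helper added above is what ties era1 files to the pre-merge header
accumulator: the AccumulatorRoot entry of an era1 file must equal the SSZ
hash_tree_root of that era's header records. A minimal sketch of the round
trip, where headerData is a hypothetical sequence of hash/difficulty pairs
(see verify above and cmdExportEra1 below for the real call sites):

  var headerRecords: seq[HeaderRecord]
  for (blockHash, totalDifficulty) in headerData: # hypothetical input
    headerRecords.add(HeaderRecord(
      blockHash: blockHash, totalDifficulty: totalDifficulty))

  # hash_tree_root of List(HeaderRecord, 8192) - this must match the era1
  # AccumulatorRoot entry covering the same block range
  let root = getEpochAccumulatorRoot(headerRecords)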
diff --git a/fluffy/tools/eth_data_exporter.nim b/fluffy/tools/eth_data_exporter.nim
index 85c74cf6e..fedc87fa5 100644
--- a/fluffy/tools/eth_data_exporter.nim
+++ b/fluffy/tools/eth_data_exporter.nim
@@ -1,5 +1,5 @@
 # Fluffy
-# Copyright (c) 2022-2023 Status Research & Development GmbH
+# Copyright (c) 2022-2024 Status Research & Development GmbH
 # Licensed and distributed under either of
 #   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -45,15 +45,17 @@ import
   eth/[common, rlp], chronos,
   eth/common/eth_types_json_serialization,
   json_rpc/rpcclient,
+  snappy,
   ncli/e2store,
   ../database/seed_db,
   ../../premix/[downloader, parser],
   ../network/history/[history_content, accumulator],
-  ../eth_data/[history_data_json_store, history_data_ssz_e2s],
+  ../eth_data/[history_data_json_store, history_data_ssz_e2s, era1],
   eth_data_exporter/[exporter_conf, exporter_common, cl_data_exporter]

 # Need to be selective due to the `Block` type conflict from downloader
 from ../network/history/history_network import encode
+from ../../nimbus/utils/utils import calcTxRoot, calcReceiptRoot

 chronicles.formatIt(IoErrorCode): $it
@@ -185,14 +187,121 @@ proc connectRpcClient(
   of HttpUrl:
     try:
       await RpcHttpClient(client).connect(web3Url.url)
+      ok()
     except CatchableError as e:
       return err(e.msg)
   of WsUrl:
     try:
       await RpcWebSocketClient(client).connect(web3Url.url)
+      ok()
     except CatchableError as e:
       return err(e.msg)

+proc cmdExportEra1(config: ExporterConf) =
+  let client = newRpcClient(config.web3Url)
+  try:
+    let connectRes = waitFor client.connectRpcClient(config.web3Url)
+    if connectRes.isErr():
+      fatal "Failed connecting to JSON-RPC client", error = connectRes.error
+      quit 1
+  except CatchableError as e:
+    # TODO: Add async raises to get rid of this.
+    fatal "Failed connecting to JSON-RPC client", error = e.msg
+    quit 1
+
+  var era = Era1(config.era)
+  while config.eraCount == 0 or era < Era1(config.era) + config.eraCount:
+    defer: era += 1
+
+    let
+      startNumber = era.startNumber()
+      endNumber = era.endNumber()
+
+    if startNumber >= mergeBlockNumber:
+      info "Stopping, era is after the merge"
+      break
+
+    var accumulatorRoot = default(Digest)
+    let tmpName = era1FileName("mainnet", era, default(Digest)) & ".tmp"
+
+    info "Writing era1", tmpName
+
+    var completed = false
+    block writeFileBlock:
+      let e2 = openFile(
+        tmpName, {OpenFlags.Write, OpenFlags.Create, OpenFlags.Truncate}).get()
+      defer: discard closeFile(e2)
+
+      # TODO: Not checking the result of init, update or finish here, as all
+      # error cases are fatal. But maybe we could still throw proper errors.
+      var group = Era1Group.init(e2, startNumber).get()
+
+      # Header records to build the accumulator root
+      var headerRecords: seq[accumulator.HeaderRecord]
+      for blockNumber in startNumber..endNumber:
+        let blck =
+          try:
+            # TODO: Not sure about the errors that can occur here, and the
+            # whole block-request-over-JSON-RPC flow should be reworked here
+            # anyhow (it could then also be used in the bridge)
+            requestBlock(
+              blockNumber.u256, flags = {DownloadReceipts}, client = some(client))
+          except CatchableError as e:
+            error "Failed retrieving block, skipping creation of era1 file",
+              blockNumber, era, error = e.msg
+            break writeFileBlock
+
+        var ttd: UInt256
+        try:
+          blck.jsonData.fromJson "totalDifficulty", ttd
+        except ValueError:
+          break writeFileBlock
+
+        headerRecords.add(accumulator.HeaderRecord(
+          blockHash: blck.header.blockHash(),
+          totalDifficulty: ttd))
+
+        group.update(
+          e2, blockNumber, blck.header, blck.body, blck.receipts, ttd).get()
+
+      accumulatorRoot = getEpochAccumulatorRoot(headerRecords)
+
+      group.finish(e2, accumulatorRoot, endNumber).get()
+      completed = true
+    if completed:
+      let name = era1FileName("mainnet", era, accumulatorRoot)
+      # We cannot check for the exact file any earlier as we need to know the
+      # accumulator root.
+      # TODO: Could scan for a file with the era number in it.
+      if isFile(name):
+        info "Era1 file already exists", era, name
+        if (let e = io2.removeFile(tmpName); e.isErr):
+          warn "Failed to clean up tmp era1 file", tmpName, error = e.error
+        continue
+
+      try:
+        moveFile(tmpName, name)
+      except Exception as e: # TODO
+        warn "Failed to rename era1 file to its final name",
+          name, tmpName, error = e.msg
+
+      info "Writing era1 completed", name
+    else:
+      error "Failed creating the era1 file", era
+      if (let e = io2.removeFile(tmpName); e.isErr):
+        warn "Failed to clean up incomplete era1 file", tmpName, error = e.error
+
+proc cmdVerifyEra1(config: ExporterConf) =
+  let f = Era1File.open(config.era1FileName).valueOr:
+    warn "Failed to open era file", error = error
+    quit 1
+  defer: close(f)
+
+  let root = f.verify.valueOr:
+    warn "Verification of era file failed", error = error
+    quit 1
+
+  notice "Era1 file successfully verified",
+    accumulatorRoot = root.data.to0xHex(),
+    file = config.era1FileName
+
 when isMainModule:
   {.pop.}
   let config = ExporterConf.load()
@@ -539,6 +648,11 @@ when isMainModule:
         fatal "Error occured while closing file", error = e.msg
         quit 1

+    of HistoryCmd.exportEra1:
+      cmdExportEra1(config)
+    of HistoryCmd.verifyEra1:
+      cmdVerifyEra1(config)
+
     of ExporterCmd.beacon:
       let (cfg, forkDigests, _) = getBeaconData()
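A hedged sketch of how the new commands might be invoked once built. The
subcommand layout and flag spellings are assumptions based on the dispatch
above and the config additions below, and the era1 file name is illustrative:

  # Export eras 0..9 from a local execution client, then verify one file
  ./eth_data_exporter history exportEra1 --web3-url:http://127.0.0.1:8545 --era:0 --count:10
  ./eth_data_exporter history verifyEra1 --era1-file-name:mainnet-00000-5ec1ffb8.era1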
diff --git a/fluffy/tools/eth_data_exporter/exporter_conf.nim b/fluffy/tools/eth_data_exporter/exporter_conf.nim
index 1b08dc48b..0d1b8ac1d 100644
--- a/fluffy/tools/eth_data_exporter/exporter_conf.nim
+++ b/fluffy/tools/eth_data_exporter/exporter_conf.nim
@@ -61,6 +61,8 @@ type
       "Export block headers from an Ethereum JSON RPC Execution endpoint to *.e2s files (unlimited amount)"
     exportHeadersWithProof =
       "Export block headers with proof from *.e2s headers file and epochAccumulator files"
+    exportEra1 = "Export historical data to era1 store"
+    verifyEra1 = "Read and verify historical data from era1 store"

   BeaconCmd* = enum
     exportLCBootstrap = "Export Light Client Bootstrap"
@@ -159,6 +161,18 @@ type
       endBlockNumber2* {.
         desc: "Number of the last block header to be exported"
         name: "end-block" .}: uint64
+    of exportEra1:
+      era* {.
+        defaultValue: 0
+        desc: "The era number to write".}: uint64
+      eraCount* {.
+        defaultValue: 0
+        name: "count"
+        desc: "Number of eras to write (0=all)".}: uint64
+    of verifyEra1:
+      era1FileName* {.
+        desc: "Era1 file to read and verify"
+        name: "era1-file-name" .}: string
     of ExporterCmd.beacon:
       restUrl* {.
        desc: "URL of the beacon node REST service"
diff --git a/vendor/nimbus-eth2 b/vendor/nimbus-eth2
index e398078ab..dca444bea 160000
--- a/vendor/nimbus-eth2
+++ b/vendor/nimbus-eth2
@@ -1 +1 @@
-Subproject commit e398078abcfb81dbce1804fa9920d2e92e493c64
+Subproject commit dca444bea7565a1d02b64aa6c9b588627187c345