nimbus-eth1/fluffy/tools/eth_data_exporter.nim

601 lines
21 KiB
Nim

# Nimbus
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# Tool to download chain history data from local node, and save it to the json
# file or sqlite database.
# In case of json:
# Block data is stored as it gets transmitted over the wire and as defined here:
# https://github.com/ethereum/portal-network-specs/blob/master/history-network.md#content-keys-and-values
#
# Json file has following format:
# {
# "hexEncodedBlockHash": {
# "header": "the rlp encoded block header as a hex string"
# "body": "the SSZ encoded container of transactions and uncles as a hex string"
# "receipts": "The SSZ encoded list of the receipts as a hex string"
# "number": "block number"
# },
# ...,
# ...,
# }
# In case of sqlite:
# Data is saved in a format friendly to history network i.e one table with 3
# columns: contentid, contentkey, content.
# Such format enables queries to quickly find content in range of some node
# which makes it possible to offer content to nodes in bulk.
#
# When using geth as client to download receipts from, be aware that you will
# have to set the number of blocks to maintain the transaction index for to
# unlimited if you want access to all transactions/receipts.
# e.g: `./build/bin/geth --ws --txlookuplimit=0`
#
{.push raises: [Defect].}
import
std/[json, typetraits, strutils, strformat, os],
confutils,
stew/[byteutils, io2],
json_serialization,
faststreams, chronicles,
eth/[common, rlp], chronos,
eth/common/eth_types_json_serialization,
json_rpc/rpcclient,
ncli/e2store,
../seed_db,
../../premix/[downloader, parser],
../network/history/[history_content, accumulator],
../data/history_data_parser
# Need to be selective due to the `Block` type conflict from downloader
from ../network/history/history_network import encode
proc defaultDataDir*(): string =
  ## Returns the default directory for exported data: a platform specific
  ## relative path appended to the home directory of the current user.
  let relativeDir =
    when defined(windows):
      "AppData" / "Roaming" / "EthData"
    elif defined(macosx):
      "Library" / "Application Support" / "EthData"
    else:
      ".cache" / "eth-data"

  result = getHomeDir() / relativeDir
const
  # Default data directory as evaluated at compile time; used as the
  # description of the default value of the `data-dir` option.
  defaultDataDirDesc = defaultDataDir()

  # Default file name, without extension, for the block data export.
  defaultBlockFileName = "eth-block-data"

  # Default file name of the SSZ serialized master accumulator.
  defaultAccumulatorFileName = "mainnet-master-accumulator.ssz"
type
  # Sub-commands of the exporter tool. Each member carries a string value
  # describing the command (presumably used by confutils as help text -
  # confirm).
  ExporterCmd* = enum
    # TODO: Need to use the ugly """ multi-line strings, as string
    # concatenation with & doesn't work well together with confutils.
    # Note: the text lines are flush left on purpose, they are part of the
    # string literal and any leading whitespace would end up in the value.
    exportBlockData =
      """
Export block data (headers, bodies and receipts) to a json format or a
database. Some of this functionality is likely to get deprecated"""
    exportHeaders =
      """
Export block headers from an Ethereum JSON RPC Execution endpoint to
*.e2s files arranged per epoch (8192 blocks)"""
    verifyHeaders =
      """
Verify *.e2s files containing block headers. Verify currently only
means being able to RLP decode the block headers"""
    exportAccumulatorData =
      """
Build and export the master accumulator and historical epoch
accumulators. Requires *.e2s block header files generated with the
exportHeaders command up until the merge block"""
    printAccumulatorData =
      """
Print the root hash of the master accumulator and of all historical
epoch accumulators. Requires data generated by exportAccumulatorData
command"""

  # Output backend for the `exportBlockData` command.
  StorageMode* = enum
    Json, Db

  # Command line configuration. The first four options are shared by all
  # commands, the remaining ones belong to the selected `cmd`.
  ExporterConf* = object
    logLevel* {.
      defaultValue: LogLevel.INFO
      defaultValueDesc: $LogLevel.INFO
      desc: "Sets the log level"
      name: "log-level" .}: LogLevel
    initialBlock* {.
      desc: "Number of the first block which should be downloaded"
      defaultValue: 0
      name: "initial-block" .}: uint64
    endBlock* {.
      desc: "Number of the last block which should be downloaded"
      defaultValue: 0
      name: "end-block" .}: uint64
    dataDir* {.
      desc: "The directory where generated data files will be exported to"
      defaultValue: defaultDataDir()
      defaultValueDesc: $defaultDataDirDesc
      name: "data-dir" .}: OutDir
    case cmd* {.
      command
      defaultValue: exportBlockData .}: ExporterCmd
    of exportBlockData:
      fileName* {.
        desc: "File name (minus extension) where block data will be exported to"
        defaultValue: defaultBlockFileName
        defaultValueDesc: $defaultBlockFileName
        name: "file-name" .}: string
      storageMode* {.
        desc: "Storage mode of block data export"
        defaultValue: Json
        name: "storage-mode" .}: StorageMode
      headersOnly* {.
        desc: "Only export the headers instead of full blocks and receipts"
        defaultValue: false
        name: "headers-only" .}: bool
    of exportHeaders:
      startEpoch* {.
        desc: "Number of the first epoch which should be downloaded"
        defaultValue: 0
        name: "start-epoch" .}: uint64
      endEpoch* {.
        desc: "Number of the last epoch which should be downloaded"
        defaultValue: 1896
        name: "end-epoch" .}: uint64
    # TODO:
    # Although the options are the same as for exportHeaders, we can't put
    # them under the same `of` branch, as confutils does not agree with that.
    of verifyHeaders:
      # These options select epoch files to verify; nothing is downloaded by
      # this command, hence the descriptions differ from exportHeaders.
      startEpochVerify* {.
        desc: "Number of the first epoch which should be verified"
        defaultValue: 0
        name: "start-epoch" .}: uint64
      endEpochVerify* {.
        desc: "Number of the last epoch which should be verified"
        defaultValue: 1896
        name: "end-epoch" .}: uint64
    of exportAccumulatorData:
      accumulatorFileName* {.
        desc: "File to which the serialized accumulator is written"
        defaultValue: defaultAccumulatorFileName
        defaultValueDesc: $defaultAccumulatorFileName
        name: "accumulator-file-name" .}: string
      writeEpochAccumulators* {.
        desc: "Write also the SSZ encoded epoch accumulators to specific files"
        defaultValue: false
        name: "write-epoch-accumulators" .}: bool
    of printAccumulatorData:
      accumulatorFileNamePrint* {.
        desc: "File from which the serialized accumulator is read"
        defaultValue: defaultAccumulatorFileName
        defaultValueDesc: $defaultAccumulatorFileName
        name: "accumulator-file-name" .}: string

  # JSON value written per block when only headers are exported.
  HeaderRecord = object
    header: string # 0x-prefixed hex of the RLP encoded block header
    number: uint64 # block number

  # JSON value written per block when full block data is exported.
  BlockRecord = object
    header: string # 0x-prefixed hex of the RLP encoded block header
    body: string # 0x-prefixed hex of the encoded block body
    receipts: string # 0x-prefixed hex of the encoded receipts
    number: uint64 # block number
proc parseCmdArg*(T: type StorageMode, p: TaintedString): T
    {.raises: [Defect, ConfigurationError].} =
  ## Converts the command line value `p` into a `StorageMode`.
  ##
  ## `json` and `db` are accepted in any letter case. Every other value
  ## raises a `ConfigurationError` naming the offending input.
  case toLowerAscii(string(p))
  of "db":
    return Db
  of "json":
    return Json
  else:
    let msg = "Provided mode: " & p & " is not valid. Should be `json` or `db`"
    raise newException(ConfigurationError, msg)
proc completeCmdArg*(T: type StorageMode, val: TaintedString): seq[string] =
  ## Always returns an empty seq, i.e. no completion candidates are offered
  ## for `StorageMode` values.
  result = @[]
proc writeHeaderRecord(
    writer: var JsonWriter, header: BlockHeader)
    {.raises: [IOError, Defect].} =
  ## Writes one field to `writer`: the key is the 0x-prefixed hex of the
  ## header hash, the value a `HeaderRecord` holding the hex of the RLP
  ## encoded header and the block number.
  let record = HeaderRecord(
    header: to0xHex(rlp.encode(header)),
    number: truncate(header.blockNumber, uint64))
  let key = rlpHash(header).data.to0xHex()

  writer.writeField(key, record)
proc writeBlockRecord(
    writer: var JsonWriter, blck: Block)
    {.raises: [IOError, Defect].} =
  ## Writes one field to `writer`: the key is the 0x-prefixed hex of the
  ## header hash, the value a `BlockRecord` holding the hex of the encoded
  ## header, body and receipts together with the block number.
  let record = BlockRecord(
    header: to0xHex(rlp.encode(blck.header)),
    body: to0xHex(encode(blck.body)),
    receipts: to0xHex(encode(blck.receipts)),
    number: truncate(blck.header.blockNumber, uint64))
  let key = rlpHash(blck.header).data.to0xHex()

  writer.writeField(key, record)
proc downloadHeader(client: RpcClient, i: uint64): BlockHeader =
  ## Requests the header of block number `i` through `client` and returns it
  ## parsed. Any `CatchableError` is logged as fatal and ends the process
  ## with exit code 1.
  try:
    result = parseBlockHeader(requestHeader(i.u256, some(client)))
  except CatchableError as e:
    fatal "Error while requesting BlockHeader", error = e.msg, number = i
    quit 1
proc downloadBlock(i: uint64, client: RpcClient): Block =
  ## Requests block number `i`, including its receipts, through `client`.
  ## Any `CatchableError` is logged as fatal and ends the process with exit
  ## code 1.
  try:
    result = requestBlock(
      i.u256, flags = {DownloadReceipts}, client = some(client))
  except CatchableError as e:
    fatal "Error while requesting Block", error = e.msg, number = i
    quit 1
proc createAndOpenFile(dataDir: string, fileName: string): OutputStreamHandle =
  ## Creates `dataDir` if needed and opens `fileName` inside it for output,
  ## appending `.json` to the name when it lacks that extension.
  ##
  ## The program is aborted (exit code 1) when the file already exists, to
  ## avoid losing data, and also when the directory or the file cannot be
  ## created.
  let
    jsonName =
      if fileName.endsWith(".json"): fileName
      else: fileName & ".json"
    filePath = dataDir / jsonName

  if isFile(filePath):
    fatal "File under provided path already exists and would be overwritten",
      path = filePath
    quit 1

  let created = createPath(dataDir)
  if created.isErr():
    fatal "Error occurred while creating directory",
      error = ioErrorMsg(created.error)
    quit 1

  try:
    result = fileOutput(filePath)
  except IOError as e:
    fatal "Error occurred while opening the file", error = e.msg
    quit 1
proc writeHeadersToJson(config: ExporterConf, client: RpcClient) =
  ## Downloads the headers of blocks `initialBlock..endBlock` and writes them
  ## as one pretty printed JSON object to the configured file. IO failures
  ## while writing or closing are logged as fatal and end the process.
  let fh = createAndOpenFile(string config.dataDir, string config.fileName)

  try:
    var writer = JsonWriter[DefaultFlavor].init(fh.s, pretty = true)
    writer.beginRecord()
    for i in config.initialBlock..config.endBlock:
      writer.writeHeaderRecord(client.downloadHeader(i))
      # Progress report after every 8192 headers, not for the first one.
      let offset = i - config.initialBlock
      if offset != 0 and offset mod 8192 == 0:
        info "Downloaded 8192 new block headers", currentHeader = i
    writer.endRecord()
    # NOTE(review): the logged path lacks the `.json` extension that
    # `createAndOpenFile` adds when the file name doesn't end with it.
    info "File successfully written", path = config.dataDir / config.fileName
  except IOError as e:
    fatal "Error occured while writing to file", error = e.msg
    quit 1
  finally:
    try:
      fh.close()
    except IOError as e:
      fatal "Error occured while closing file", error = e.msg
      quit 1
proc writeBlocksToJson(config: ExporterConf, client: RpcClient) =
  ## Downloads blocks `initialBlock..endBlock` (with receipts) and writes
  ## them as one pretty printed JSON object to the configured file. IO
  ## failures while writing or closing are logged as fatal and end the
  ## process.
  let fh = createAndOpenFile(string config.dataDir, string config.fileName)

  try:
    var writer = JsonWriter[DefaultFlavor].init(fh.s, pretty = true)
    writer.beginRecord()
    for i in config.initialBlock..config.endBlock:
      writer.writeBlockRecord(downloadBlock(i, client))
      # Progress report after every 8192 blocks, not for the first one.
      let offset = i - config.initialBlock
      if offset != 0 and offset mod 8192 == 0:
        info "Downloaded 8192 new blocks", currentBlock = i
    writer.endRecord()
    # NOTE(review): the logged path lacks the `.json` extension that
    # `createAndOpenFile` adds when the file name doesn't end with it.
    info "File successfully written", path = config.dataDir / config.fileName
  except IOError as e:
    fatal "Error occured while writing to file", error = e.msg
    quit 1
  finally:
    try:
      fh.close()
    except IOError as e:
      fatal "Error occured while closing file", error = e.msg
      quit 1
proc writeBlocksToDb(config: ExporterConf, client: RpcClient) =
  ## Downloads blocks `initialBlock..endBlock` and stores header, body and
  ## receipts in the seed database, each under its content id and encoded
  ## content key. Empty bodies and empty receipt lists are not stored.
  let db = SeedDb.new(distinctBase(config.dataDir), config.fileName)
  defer:
    db.close()

  for i in config.initialBlock..config.endBlock:
    let
      blck = downloadBlock(i, client)
      blockKey = BlockKey(blockHash: blck.header.blockHash())
      headerContentKey = encode(ContentKey(
        contentType: blockHeader, blockHeaderKey: blockKey))
      bodyContentKey = encode(ContentKey(
        contentType: blockBody, blockBodyKey: blockKey))
      receiptsContentKey = encode(ContentKey(
        contentType: receipts, receiptsKey: blockKey))

    db.put(
      headerContentKey.toContentId(), headerContentKey.asSeq(),
      rlp.encode(blck.header))

    # No need to seed empty lists into database
    if len(blck.body.transactions) > 0 or len(blck.body.uncles) > 0:
      db.put(
        bodyContentKey.toContentId(), bodyContentKey.asSeq(),
        encode(blck.body))

    if len(blck.receipts) > 0:
      db.put(
        receiptsContentKey.toContentId(), receiptsContentKey.asSeq(),
        encode(blck.receipts))

  info "Data successfuly written to db"
proc exportBlocks(config: ExporterConf, client: RpcClient) =
  ## Dispatches the block data export on the configured storage mode and the
  ## `headersOnly` flag. Exporting only headers to the database is not
  ## available and ends the process with exit code 1.
  case config.storageMode
  of Db:
    if config.headersOnly:
      fatal "Db mode not available for headers only"
      quit 1
    writeBlocksToDb(config, client)
  of Json:
    if not config.headersOnly:
      writeBlocksToJson(config, client)
    else:
      writeHeadersToJson(config, client)
const
  # Record type used for block headers in the exported *.e2s files.
  #
  # The e2s format is used to store data, but without the specific structure
  # of an era file, as that is currently not really needed.
  # See: https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md
  # Only this one type is added for now, its numbers are not formally
  # specified.
  #
  # Note:
  # Snappy compression for `ExecutionBlockHeaderRecord` only helps for the
  # first ~1M (?) block headers, after that there is no gain so it is not
  # done.
  ExecutionBlockHeaderRecord = [byte 0xFF, 0x00]
when isMainModule:
  # Exception tracking is lifted for loading the configuration only
  # (presumably because `load` is not annotated for it - confirm).
  {.pop.}
  let config = ExporterConf.load()
  {.push raises: [Defect].}

  setLogLevel(config.logLevel)

  if (config.endBlock < config.initialBlock):
    fatal "Initial block number should be smaller than end block number",
      initialBlock = config.initialBlock,
      endBlock = config.endBlock
    quit 1

  # Make sure the data directory exists before any command uses it.
  let dataDir = config.dataDir.string
  if not isDir(dataDir):
    let res = createPath(dataDir)
    if res.isErr():
      fatal "Error occurred while creating data directory",
        dir = dataDir, error = ioErrorMsg(res.error)
      quit 1

  # Only the commands that download data use the JSON-RPC connection. The
  # other commands work purely on local files, so they must neither require
  # a running node nor leave an unused connection open. For those commands
  # `client` keeps its default value and is never accessed.
  var client: RpcClient
  if config.cmd in {ExporterCmd.exportBlockData, ExporterCmd.exportHeaders}:
    try:
      let c = newRpcWebSocketClient()
      # TODO: Hardcoded to the default geth ws address. This should become
      # a configurable cli option
      waitFor c.connect("ws://127.0.0.1:8546")
      client = c
    except CatchableError as e:
      fatal "Error while connecting to data provider", error = e.msg
      quit 1
case config.cmd
of ExporterCmd.exportBlockData:
  # Export according to the configured storage mode. The connection is
  # closed in `finally`, so also when the export does not return normally.
  try:
    exportBlocks(config, client)
  finally:
    waitFor client.close()
of ExporterCmd.exportHeaders:
  proc exportEpochHeaders(file: string, epoch: uint64): Result[void, string] =
    ## Downloads the `epochSize` block headers of `epoch` from the JSON-RPC
    ## endpoint and writes them to `file` as `ExecutionBlockHeaderRecord`
    ## records.
    ##
    ## Returns an error when the file cannot be opened or a record cannot be
    ## written. In the latter case the incomplete file is removed again, as
    ## the export loop below skips every epoch whose file already exists and
    ## would otherwise never complete it. The loop only calls this proc for
    ## files that do not exist yet, so no earlier export gets deleted.
    info "Requesting epoch headers", epoch
    var headers: seq[BlockHeader]
    for j in 0..<epochSize.uint64:
      # Log the actual block number, not the offset within the epoch.
      let blockNumber = epoch*epochSize + j
      debug "Requesting block", number = blockNumber
      headers.add(client.downloadHeader(blockNumber))

    let fh = ? openFile(file, {OpenFlags.Write, OpenFlags.Create}).mapErr(toString)

    info "Writing headers to file", file
    for header in headers:
      let written =
        fh.appendRecord(ExecutionBlockHeaderRecord, rlp.encode(header))
      if written.isErr():
        # Close before removing; removal is best effort only.
        discard closeFile(fh)
        discard io2.removeFile(file)
        return err(written.error)

    discard closeFile(fh)
    ok()

  # TODO: Could make the JSON-RPC requests concurrent per epoch.
  # Batching would also be nice but our json-rpc does not support that:
  # https://geth.ethereum.org/docs/rpc/batch
  for i in config.startEpoch..config.endEpoch:
    let file = dataDir / &"mainnet-headers-epoch-{i.uint64:05}.e2s"

    if isFile(file):
      notice "Skipping epoch headers, file already exists", file
    else:
      let res = exportEpochHeaders(file, i)
      if res.isErr():
        error "Failed exporting epoch headers", file, error = res.error

  waitFor client.close()
of ExporterCmd.verifyHeaders:
  proc verifyEpochHeaders(file: string, epoch: uint64): Result[void, string] =
    ## Reads the records of `file` one by one and RLP decodes every record of
    ## type `ExecutionBlockHeaderRecord`. Records of another type are skipped
    ## with a warning.
    ##
    ## Returns an error when the file cannot be opened or a header does not
    ## decode. The `epoch` parameter is not used by the body.
    let fh = ? openFile(file, {OpenFlags.Read}).mapErr(toString)
    defer: discard closeFile(fh)

    var data: seq[byte]
    while true:
      # NOTE(review): every `readRecord` failure ends the loop, so a read
      # error is not distinguished from reaching the end of the file here.
      let recordRes = readRecord(fh, data)
      if recordRes.isErr():
        break
      let header = recordRes.get()

      if header.typ != ExecutionBlockHeaderRecord:
        warn "Skipping record, not a block header", typ = toHex(header.typ)
        continue

      let
        blockHeader =
          try:
            rlp.decode(data, BlockHeader)
          except RlpError as e:
            return err("Invalid block header: " & e.msg)
        headerHash = to0xHex(rlpHash(blockHeader).data)
      debug "Header decoded successfully",
        hash = headerHash, blockNumber = blockHeader.blockNumber

    ok()

  for i in config.startEpochVerify..config.endEpochVerify:
    let file = dataDir / &"mainnet-headers-epoch-{i.uint64:05}.e2s"
    let res = verifyEpochHeaders(file, i)
    if res.isOk():
      info "Successfully decoded epoch headers", file
    else:
      error "Failed verifying epoch headers", file, error = res.error
of ExporterCmd.exportAccumulatorData:
  # Builds the master accumulator from the per-epoch header files in the
  # data directory and writes it SSZ encoded to the accumulator file. The
  # epoch accumulators are written to files as well when requested; these
  # can be re-used for creation of headers with proofs.

  # First check whether the accumulator file already exists, before starting
  # to build it.
  let accumulatorFile = dataDir / config.accumulatorFileName
  if isFile(accumulatorFile):
    notice "Not building accumulator, file already exists",
      file = accumulatorFile
    quit 1

  # Verify that the necessary files exist before starting to build the
  # accumulator.
  for i in 0..<preMergeEpochs:
    let file = dataDir / &"mainnet-headers-epoch-{i.uint64:05}.e2s"
    if not isFile(file):
      fatal "Required epoch headers file does not exist", file
      quit 1

  proc buildAccumulator(dataDir: string, writeEpochAccumulators = false):
      Result[FinishedAccumulator, string] =
    ## Feeds the headers of all epoch files in `dataDir`, in order, into an
    ## accumulator and returns the finished accumulator once the header of
    ## block `mergeBlockNumber - 1` has been processed.
    ##
    ## Returns an error when a file cannot be opened, a header does not
    ## decode or the files run out before that block. A header with an
    ## unexpected block number ends the process with exit code 1.
    var accumulator: Accumulator
    for i in 0..<preMergeEpochs:
      let file =
        try: dataDir / &"mainnet-headers-epoch-{i.uint64:05}.e2s"
        except ValueError as e: raiseAssert e.msg

      let fh = ? openFile(file, {OpenFlags.Read}).mapErr(toString)
      defer: discard closeFile(fh)

      var data: seq[byte]
      var count = 0'u64 # headers processed from this file so far
      while true:
        let recordRes = readRecord(fh, data)
        if recordRes.isErr():
          break
        let header = recordRes.get()

        if header.typ != ExecutionBlockHeaderRecord:
          warn "Skipping record, not a block header", typ = toHex(header.typ)
          continue

        let blockHeader =
          try:
            rlp.decode(data, BlockHeader)
          except RlpError as e:
            return err("Invalid block header in " & file & ": " & e.msg)
        let number = blockHeader.blockNumber.truncate(uint64)

        # Quick sanity check
        if number != i*epochSize + count:
          fatal "Incorrect block headers in file", file = file,
            blockNumber = blockHeader.blockNumber,
            expectedBlockNumber = i*epochSize + count
          quit 1

        updateAccumulator(accumulator, blockHeader)

        # Note: writing away of epoch accumulators occurs 1 iteration before
        # updating the epoch accumulator, as the latter happens when passed
        # a header for the next epoch (or on finishing the epoch).
        if writeEpochAccumulators and
            (accumulator.currentEpoch.len() == epochSize or
              number == mergeBlockNumber - 1):
          let epochAccumulatorFile =
            try: dataDir / &"mainnet-epoch-accumulator-{i.uint64:05}.ssz"
            except ValueError as e: raiseAssert e.msg
          let writeRes = io2.writeFile(
            epochAccumulatorFile, SSZ.encode(accumulator.currentEpoch))
          if writeRes.isErr():
            error "Failed writing epoch accumulator to file",
              file = epochAccumulatorFile, error = writeRes.error
          else:
            notice "Succesfully wrote epoch accumulator to file",
              file = epochAccumulatorFile

        if count == epochSize - 1:
          info "Updated an epoch", epoch = i
        count.inc()

        if number == mergeBlockNumber - 1:
          let finishedAccumulator = finishAccumulator(accumulator)
          info "Updated last epoch, finished building master accumulator",
            epoch = i
          return ok(finishedAccumulator)

    err("Not enough headers provided to finish the accumulator")

  let accumulatorRes = buildAccumulator(dataDir, config.writeEpochAccumulators)
  if accumulatorRes.isErr():
    fatal "Could not build accumulator", error = accumulatorRes.error
    quit 1
  let accumulator = accumulatorRes.get()

  let res = io2.writeFile(accumulatorFile, SSZ.encode(accumulator))
  if res.isErr():
    error "Failed writing accumulator to file",
      file = accumulatorFile, error = res.error
    quit 1

  notice "Succesfully wrote master accumulator to file",
    file = accumulatorFile
of ExporterCmd.printAccumulatorData:
  # Reads the accumulator file from the data directory and prints its hash
  # tree root followed by the roots of all historical epochs. Failing to
  # read the file ends the process with exit code 1.
  let file = dataDir / config.accumulatorFileNamePrint

  let res = readAccumulator(file)
  if res.isErr():
    fatal "Failed reading accumulator from file", error = res.error, file
    quit 1

  let
    accumulator = res.get()
    masterRoot = hash_tree_root(accumulator)

  info "Accumulator decoded successfully",
    root = masterRoot

  echo "Master Accumulator:"
  echo "-------------------"
  echo &"Root: {masterRoot}"
  echo ""
  echo "Historical Epochs:"
  echo "------------------"
  echo "Epoch Root"
  for i, epochRoot in accumulator.historicalEpochs:
    echo &"{i.uint64:05} 0x{epochRoot.toHex()}"