parent
5374874552
commit
489b975189
|
@ -0,0 +1,137 @@
|
|||
# Nimbus
|
||||
# Copyright (c) 2022 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
std/options,
|
||||
eth/db/kvstore,
|
||||
eth/db/kvstore_sqlite3,
|
||||
stint
|
||||
|
||||
export kvstore_sqlite3
|
||||
|
||||
type
  ## Row shape returned by point lookups: the stored id, key and payload.
  ContentData = tuple
    contentId: array[32, byte]
    contentKey: seq[byte]
    content: seq[byte]

  ## Row shape returned by range queries; adds the xor distance between the
  ## queried node id and the row's content id (computed in SQL).
  ContentDataDist = tuple
    contentId: array[32, byte]
    contentKey: seq[byte]
    content: seq[byte]
    distance: array[32, byte]

  ## Handle to the seed database: the underlying SQLite store plus the
  ## prepared statements created in `SeedDb.new`.
  SeedDb* = ref object
    store: SqStoreRef
    putStmt: SqliteStmt[(array[32, byte], seq[byte], seq[byte]), void]
    getStmt: SqliteStmt[array[32, byte], ContentData]
    getInRangeStmt: SqliteStmt[(array[32, byte], array[32, byte], int64), ContentDataDist]
|
||||
|
||||
func xorDistance(
  a: openArray[byte],
  b: openArray[byte]
): Result[seq[byte], cstring] {.cdecl.} =
  ## Byte-wise xor of two 32-byte blobs, used as a custom SQLite scalar
  ## function so range queries can compute Kademlia-style distances in SQL.
  ## Returns an error if either blob is not exactly 32 bytes.
  ## `cdecl` is required for registration via `registerCustomScalarFunction`.
  if len(a) != 32 or len(b) != 32:
    return err("Blobs should have 32 byte length")

  # Allocate only after validation so the error path does no work.
  var s = newSeq[byte](32)
  for i in 0 ..< 32:
    s[i] = a[i] xor b[i]

  ok(s)
|
||||
|
||||
template expectDb(x: auto): untyped =
  ## Unwraps a database operation `Result`, panicking with a uniform message
  ## on failure.
  # There's no meaningful error handling implemented for a corrupt database or
  # full disk - this requires manual intervention, so we'll panic for now
  x.expect("working database (disk broken/full?)")
|
||||
|
||||
proc new*(T: type SeedDb, path: string, name: string, inMemory = false): SeedDb =
  ## Opens (or creates) the seed database at `path`/`name`, creates the
  ## `seed_data` table when writable, registers the custom `xorDistance`
  ## SQL function and prepares all statements used by the accessors below.
  ## When `inMemory` is true a throwaway in-memory database is used instead.
  let db =
    if inMemory:
      SqStoreRef.init("", "seed-db-test", inMemory = true).expect(
        "working database (out of memory?)")
    else:
      SqStoreRef.init(path, name).expectDb()

  # Schema creation is skipped for read-only stores, which cannot run DDL.
  if not db.readOnly:
    let createSql = """
      CREATE TABLE IF NOT EXISTS seed_data (
        contentid BLOB PRIMARY KEY,
        contentkey BLOB,
        content BLOB
      );"""

    db.exec(createSql).expectDb()

  let putStmt =
    db.prepareStmt(
      "INSERT OR REPLACE INTO seed_data (contentid, contentkey, content) VALUES (?, ?, ?);",
      (array[32, byte], seq[byte], seq[byte]),
      void).get()

  let getStmt =
    db.prepareStmt(
      "SELECT contentid, contentkey, content FROM seed_data WHERE contentid = ?;",
      array[32, byte],
      ContentData
    ).get()

  # Needed before preparing getInRangeStmt, which calls xorDistance in SQL.
  db.registerCustomScalarFunction("xorDistance", xorDistance)
    .expect("Couldn't register custom xor function")

  # Blob comparison (`distance <= ?`) relies on SQLite's memcmp ordering of
  # BLOBs, which matches numeric order for fixed-size big-endian byte arrays.
  let getInRangeStmt =
    db.prepareStmt(
      """
        SELECT contentid, contentkey, content, xorDistance(?, contentid) as distance
        FROM seed_data
        WHERE distance <= ?
        LIMIT ?;
      """,
      (array[32, byte], array[32, byte], int64),
      ContentDataDist
    ).get()

  SeedDb(
    store: db,
    putStmt: putStmt,
    getStmt: getStmt,
    getInRangeStmt: getInRangeStmt
  )
|
||||
|
||||
proc put*(db: SeedDb, contentId: array[32, byte], contentKey: seq[byte], content: seq[byte]): void =
  ## Inserts a row into `seed_data`, replacing any existing row with the
  ## same content id. Panics on database failure.
  let insertRes = db.putStmt.exec((contentId, contentKey, content))
  insertRes.expectDb()
|
||||
|
||||
proc put*(db: SeedDb, contentId: UInt256, contentKey: seq[byte], content: seq[byte]): void =
  ## Convenience overload: converts the UInt256 id to its big-endian byte
  ## representation and delegates to the array-based `put`.
  let idBytes = contentId.toByteArrayBE()
  db.put(idBytes, contentKey, content)
|
||||
|
||||
proc get*(db: SeedDb, contentId: array[32, byte]): Option[ContentData] =
  ## Looks up a single row by content id. Returns `none` when no row with
  ## that id exists; panics on database failure.
  var found = none[ContentData]()
  let onRow = proc (row: ContentData) =
    found = some(row)
  discard db.getStmt.exec(contentId, onRow).expectDb()
  found
|
||||
|
||||
proc get*(db: SeedDb, contentId: UInt256): Option[ContentData] =
  ## Convenience overload: converts the UInt256 id to big-endian bytes and
  ## delegates to the array-based `get`.
  let idBytes = contentId.toByteArrayBE()
  return db.get(idBytes)
|
||||
|
||||
proc getContentInRange*(
    db: SeedDb,
    nodeId: UInt256,
    nodeRadius: UInt256,
    max: int64): seq[ContentDataDist] =
  ## Returns up to `max` rows whose xor distance from `nodeId` is at most
  ## `nodeRadius`, each with its computed distance. Panics on database
  ## failure. Returns an empty seq when nothing is in range.
  # `row` is the out-parameter the statement iterator fills for each result.
  var row: ContentDataDist
  # The implicit `result` starts out as an empty seq, so the separate
  # `var res: seq[ContentDataDist] = @[]` accumulator was redundant.
  for e in db.getInRangeStmt.exec(
      (nodeId.toByteArrayBE(), nodeRadius.toByteArrayBE(), max), row):
    result.add(row)
|
||||
|
||||
proc close*(db: SeedDb) =
  ## Closes the underlying SQLite store; the SeedDb must not be used after.
  db.store.close()
|
|
@ -6,8 +6,9 @@
|
|||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
# Tool to download chain history data from local node, and save it to the json
|
||||
# file.
|
||||
# Data of each block is rlp encoded list of:
|
||||
# file or sqlite database.
|
||||
# In case of json:
|
||||
# Data of each block is rlp encoded list of:
|
||||
# [blockHeader, [block_transactions, block_uncles], block_receipts]
|
||||
# Json file has following format:
|
||||
# {
|
||||
|
@ -18,7 +19,11 @@
|
|||
# ...,
|
||||
# ...,
|
||||
# }
|
||||
#
|
||||
# In case of sqlite:
|
||||
# Data is saved in a format friendly to history network i.e one table with 3
|
||||
# columns: contentid, contentkey, content.
|
||||
# Such format enables queries to quickly find content in range of some node
|
||||
# which makes it possible to offer content to nodes in bulk.
|
||||
#
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
@ -31,7 +36,9 @@ import
|
|||
faststreams, chronicles,
|
||||
eth/[common, rlp], chronos,
|
||||
eth/common/eth_types_json_serialization,
|
||||
../../premix/downloader
|
||||
../seed_db,
|
||||
../../premix/downloader,
|
||||
../network/history/history_content
|
||||
|
||||
proc defaultDataDir*(): string =
|
||||
let dataDir = when defined(windows):
|
||||
|
@ -45,9 +52,12 @@ proc defaultDataDir*(): string =
|
|||
|
||||
const
|
||||
defaultDataDirDesc = defaultDataDir()
|
||||
defaultFileName = "eth-history-data.json"
|
||||
defaultFileName = "eth-history-data"
|
||||
|
||||
type
|
||||
StorageMode* = enum
|
||||
Json, Db
|
||||
|
||||
ExporterConf* = object
|
||||
logLevel* {.
|
||||
defaultValue: LogLevel.INFO
|
||||
|
@ -68,16 +78,34 @@ type
|
|||
defaultValueDesc: $defaultDataDirDesc
|
||||
name: "data-dir" .}: OutDir
|
||||
filename* {.
|
||||
desc: "default name of the file with history data"
|
||||
desc: "File name (minus extension) where history data will be exported to"
|
||||
defaultValue: defaultFileName
|
||||
defaultValueDesc: $defaultFileName
|
||||
name: "filename" .}: string
|
||||
storageMode* {.
|
||||
desc: "Storage mode of data export"
|
||||
defaultValue: Json
|
||||
name: "storage-mode" .}: StorageMode
|
||||
|
||||
DataRecord = object
|
||||
rlp: string
|
||||
number: uint64
|
||||
|
||||
proc parseCmdArg*(T: type StorageMode, p: TaintedString): T
    {.raises: [Defect, ConfigurationError].} =
  ## Parses the --storage-mode command-line value: "db" -> Db, "json" -> Json.
  ## Raises ConfigurationError for any other value.
  if p == "db":
    return Db
  elif p == "json":
    return Json
  else:
    # Fixed the broken grammar of the error message
    # ("is not a valid." -> "is not a valid storage mode.").
    let msg = "Provided mode: " & p & " is not a valid storage mode. Should be `json` or `db`"
    raise newException(ConfigurationError, msg)
|
||||
|
||||
proc completeCmdArg*(T: type StorageMode, val: TaintedString): seq[string] =
  ## Shell-completion hook for StorageMode: no suggestions are offered.
  result = @[]
|
||||
|
||||
proc writeBlock(writer: var JsonWriter, blck: Block) {.raises: [IOError, Defect].} =
|
||||
let
|
||||
let
|
||||
enc = rlp.encodeList(blck.header, blck.body, blck.receipts)
|
||||
asHex = to0xHex(enc)
|
||||
dataRecord = DataRecord(rlp: asHex, number: cast[uint64](blck.header.blockNumber))
|
||||
|
@ -92,14 +120,20 @@ proc downloadBlock(i: uint64): Block =
|
|||
# which is default port of geth json rpc server
|
||||
return requestBlock(num, flags = {DownloadReceipts})
|
||||
except CatchableError as e:
|
||||
fatal "Error while requesting Block", error = e.msg
|
||||
fatal "Error while requesting Block", error = e.msg, number = i
|
||||
quit 1
|
||||
|
||||
proc createAndOpenFile(config: ExporterConf): OutputStreamHandle =
|
||||
# Creates directory and file specified in config, if file already exists
|
||||
# Creates directory and file specified in config, if file already exists
|
||||
# program is aborted with info to user, to avoid losing data
|
||||
|
||||
let filePath = config.dataDir / config.filename
|
||||
let fileName: string =
|
||||
if not config.filename.endsWith(".json"):
|
||||
config.filename & ".json"
|
||||
else:
|
||||
config.filename
|
||||
|
||||
let filePath = config.dataDir / fileName
|
||||
|
||||
if isFile(filePath):
|
||||
fatal "File under provided path already exists and would be overwritten",
|
||||
|
@ -120,7 +154,7 @@ proc createAndOpenFile(config: ExporterConf): OutputStreamHandle =
|
|||
fatal "Error occurred while opening the file", error = e.msg
|
||||
quit 1
|
||||
|
||||
proc run(config: ExporterConf) =
|
||||
proc writeToJson(config: ExporterConf) =
|
||||
let fh = createAndOpenFile(config)
|
||||
|
||||
try:
|
||||
|
@ -141,6 +175,38 @@ proc run(config: ExporterConf) =
|
|||
fatal "Error occoured while closing file", error = e.msg
|
||||
quit 1
|
||||
|
||||
proc writeToDb(config: ExporterConf) =
  ## Downloads blocks [initialBlock, endBlock] from the local node and seeds
  ## header, body and receipts content into the SeedDb under `dataDir`, keyed
  ## by the history-network content id.
  let db = SeedDb.new(distinctBase(config.dataDir), config.filename)
  defer:
    db.close()

  for i in config.initialBlock..config.endBlock:
    let
      blck = downloadBlock(i)
      blockHash = blck.header.blockHash()
      # chainId 1 = mainnet — presumably the only supported network here;
      # TODO confirm whether this should come from config.
      contentKeyType = BlockKey(chainId: 1, blockHash: blockHash)
      headerKey = encode(ContentKey(contentType: blockHeader, blockHeaderKey: contentKeyType))
      bodyKey = encode(ContentKey(contentType: blockBody, blockBodyKey: contentKeyType))
      receiptsKey = encode(ContentKey(contentType: receipts, receiptsKey: contentKeyType))

    db.put(headerKey.toContentId(), headerKey.asSeq(), rlp.encode[BlockHeader](blck.header))

    # No need to seed empty stuff into database
    if len(blck.body.transactions) > 0 or len(blck.body.uncles) > 0:
      db.put(bodyKey.toContentId(), bodyKey.asSeq(), rlp.encode[BlockBody](blck.body))

    if len(blck.receipts) > 0:
      db.put(receiptsKey.toContentId(), receiptsKey.asSeq(), rlp.encode[seq[Receipt]](blck.receipts))

  # Fixed misspelling in the log message ("successfuly" -> "successfully").
  info "Data successfully written to db"
|
||||
|
||||
proc run(config: ExporterConf) =
  ## Dispatches the export according to the configured storage mode.
  # Exhaustive case over the StorageMode enum — adding a new mode becomes
  # a compile error here.
  case config.storageMode
  of Db:
    writeToDb(config)
  of Json:
    writeToJson(config)
|
||||
|
||||
when isMainModule:
|
||||
{.pop.}
|
||||
let config = ExporterConf.load()
|
||||
|
|
Loading…
Reference in New Issue