mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-12 05:14:14 +00:00
Remove unused seed_db and related code (#2471)
This commit is contained in:
parent
94340037bf
commit
4a20756e6b
@ -1,153 +0,0 @@
|
||||
# Fluffy
|
||||
# Copyright (c) 2022-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, os],
|
||||
strutils,
|
||||
eth/db/kvstore,
|
||||
eth/db/kvstore_sqlite3,
|
||||
stint,
|
||||
./content_db_custom_sql_functions
|
||||
|
||||
export kvstore_sqlite3
|
||||
|
||||
type
|
||||
ContentData =
|
||||
tuple[contentId: array[32, byte], contentKey: seq[byte], content: seq[byte]]
|
||||
|
||||
ContentDataDist* =
|
||||
tuple[
|
||||
contentId: array[32, byte],
|
||||
contentKey: seq[byte],
|
||||
content: seq[byte],
|
||||
distance: array[32, byte],
|
||||
]
|
||||
|
||||
SeedDb* = ref object
|
||||
store: SqStoreRef
|
||||
putStmt: SqliteStmt[(array[32, byte], seq[byte], seq[byte]), void]
|
||||
getStmt: SqliteStmt[array[32, byte], ContentData]
|
||||
getInRangeStmt:
|
||||
SqliteStmt[(array[32, byte], array[32, byte], int64, int64), ContentDataDist]
|
||||
|
||||
template expectDb(x: auto): untyped =
|
||||
# There's no meaningful error handling implemented for a corrupt database or
|
||||
# full disk - this requires manual intervention, so we'll panic for now
|
||||
x.expect("working database (disk broken/full?)")
|
||||
|
||||
proc getDbBasePathAndName*(path: string): Option[(string, string)] =
|
||||
let (basePath, name) = splitPath(path)
|
||||
if len(basePath) > 0 and len(name) > 0 and name.endsWith(".sqlite3"):
|
||||
let nameAndExt = rsplit(name, ".", 1)
|
||||
|
||||
if len(nameAndExt) < 2 and len(nameAndExt[0]) == 0:
|
||||
return none((string, string))
|
||||
|
||||
return some((basePath, nameAndExt[0]))
|
||||
else:
|
||||
return none((string, string))
|
||||
|
||||
proc new*(T: type SeedDb, path: string, name: string, inMemory = false): SeedDb =
|
||||
let db =
|
||||
if inMemory:
|
||||
SqStoreRef.init("", "seed-db-test", inMemory = true).expect(
|
||||
"working database (out of memory?)"
|
||||
)
|
||||
else:
|
||||
SqStoreRef.init(path, name).expectDb()
|
||||
|
||||
if not db.readOnly:
|
||||
let createSql =
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS seed_data (
|
||||
contentid BLOB PRIMARY KEY,
|
||||
contentkey BLOB,
|
||||
content BLOB
|
||||
);"""
|
||||
|
||||
db.exec(createSql).expectDb()
|
||||
|
||||
let putStmt = db.prepareStmt(
|
||||
"INSERT OR REPLACE INTO seed_data (contentid, contentkey, content) VALUES (?, ?, ?);",
|
||||
(array[32, byte], seq[byte], seq[byte]),
|
||||
void,
|
||||
)[]
|
||||
|
||||
let getStmt = db.prepareStmt(
|
||||
"SELECT contentid, contentkey, content FROM seed_data WHERE contentid = ?;",
|
||||
array[32, byte],
|
||||
ContentData,
|
||||
)[]
|
||||
|
||||
db.createCustomFunction("xorDistance", 2, xorDistance).expect(
|
||||
"Custom function xorDistance creation OK"
|
||||
)
|
||||
|
||||
let getInRangeStmt = db.prepareStmt(
|
||||
"""
|
||||
SELECT contentid, contentkey, content, xorDistance(?, contentid) as distance
|
||||
FROM seed_data
|
||||
WHERE distance <= ?
|
||||
LIMIT ?
|
||||
OFFSET ?;
|
||||
""",
|
||||
(array[32, byte], array[32, byte], int64, int64),
|
||||
ContentDataDist,
|
||||
)[]
|
||||
|
||||
SeedDb(store: db, putStmt: putStmt, getStmt: getStmt, getInRangeStmt: getInRangeStmt)
|
||||
|
||||
proc put*(
|
||||
db: SeedDb, contentId: array[32, byte], contentKey: seq[byte], content: seq[byte]
|
||||
): void =
|
||||
db.putStmt.exec((contentId, contentKey, content)).expectDb()
|
||||
|
||||
proc put*(
|
||||
db: SeedDb, contentId: UInt256, contentKey: seq[byte], content: seq[byte]
|
||||
): void =
|
||||
db.put(contentId.toBytesBE(), contentKey, content)
|
||||
|
||||
proc get*(db: SeedDb, contentId: array[32, byte]): Option[ContentData] =
|
||||
var res = none[ContentData]()
|
||||
discard db.getStmt
|
||||
.exec(
|
||||
contentId,
|
||||
proc(v: ContentData) =
|
||||
res = some(v)
|
||||
,
|
||||
)
|
||||
.expectDb()
|
||||
return res
|
||||
|
||||
proc get*(db: SeedDb, contentId: UInt256): Option[ContentData] =
|
||||
db.get(contentId.toBytesBE())
|
||||
|
||||
proc getContentInRange*(
|
||||
db: SeedDb, nodeId: UInt256, nodeRadius: UInt256, max: int64, offset: int64
|
||||
): seq[ContentDataDist] =
|
||||
## Return `max` amount of content in `nodeId` range, starting from `offset` position
|
||||
## i.e using `offset` 0 will return `max` closest items, using `offset` `10` will
|
||||
## will retrun `max` closest items except first 10
|
||||
|
||||
var res: seq[ContentDataDist] = @[]
|
||||
var cd: ContentDataDist
|
||||
for e in db.getInRangeStmt.exec(
|
||||
(nodeId.toBytesBE(), nodeRadius.toBytesBE(), max, offset), cd
|
||||
):
|
||||
res.add(cd)
|
||||
return res
|
||||
|
||||
proc getContentInRange*(
|
||||
db: SeedDb, nodeId: UInt256, nodeRadius: UInt256, max: int64
|
||||
): seq[ContentDataDist] =
|
||||
## Return `max` amount of content in `nodeId` range, starting from closest content
|
||||
return db.getContentInRange(nodeId, nodeRadius, max, 0)
|
||||
|
||||
proc close*(db: SeedDb) =
|
||||
db.store.close()
|
@ -1,278 +0,0 @@
|
||||
# Fluffy
|
||||
# Copyright (c) 2022-2024 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/math,
|
||||
chronos,
|
||||
eth/p2p/discoveryv5/[node, random2],
|
||||
./wire/portal_protocol,
|
||||
./history/[history_content, history_network],
|
||||
../database/seed_db
|
||||
|
||||
# Experimental module which implements different content seeding strategies.
|
||||
# Module is oblivious to content stored in seed database as all content related
|
||||
# parameters should be available in seed db i.e (contentId, contentKey, content)
|
||||
# One thing which might need to be parameterized per network basis in the future is
|
||||
# the distance function.
|
||||
# TODO: At this point all calls are one shot calls but we can also experiment with
|
||||
# approaches which start some process which continuously seeds data.
|
||||
# This would require creation of separate object which would manage started task
|
||||
# like:
|
||||
# type NetworkSeedingManager = ref object
|
||||
# seedTask: Future[void]
|
||||
# and creating few procs which would start/stop given seedTask or even few
|
||||
# seed tasks
|
||||
|
||||
const
|
||||
#TODO currently we are using value for history network, but this should be
|
||||
#caluculated per netowork basis
|
||||
maxItemsPerOfferBySize = getMaxOfferedContentKeys(
|
||||
uint32(len(PortalProtocolId)), uint32(history_content.maxContentKeySize)
|
||||
)
|
||||
|
||||
# Offering is restricted to max 64 items
|
||||
maxItemPerOfferByLen = 64
|
||||
|
||||
maxItemsPerOffer = min(maxItemsPerOfferBySize, maxItemPerOfferByLen)
|
||||
|
||||
proc depthContentPropagate*(
|
||||
p: PortalProtocol, seedDbPath: string, maxClosestNodes: uint32
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
## Choses `maxClosestNodes` closest known nodes with known radius and tries to
|
||||
## offer as much content as possible in their range from seed db. Offers are made conccurently
|
||||
## with at most one offer per peer at the time.
|
||||
|
||||
const batchSize = maxItemsPerOffer
|
||||
|
||||
var gossipWorkers: seq[Future[void]]
|
||||
|
||||
# TODO improve peer selection strategy, to be sure more network is covered, although
|
||||
# it still does not need to be perfect as nodes which receive content will still
|
||||
# propagate it further by neighbour gossip
|
||||
let closestWithRadius =
|
||||
p.getNClosestNodesWithRadius(p.localNode.id, int(maxClosestNodes), seenOnly = true)
|
||||
|
||||
proc worker(
|
||||
p: PortalProtocol, db: SeedDb, node: Node, radius: UInt256
|
||||
): Future[void] {.async.} =
|
||||
var offset = 0
|
||||
while true:
|
||||
let content = db.getContentInRange(node.id, radius, batchSize, offset)
|
||||
|
||||
if len(content) == 0:
|
||||
break
|
||||
|
||||
var contentKV: seq[ContentKV]
|
||||
for e in content:
|
||||
let info =
|
||||
ContentKV(contentKey: ByteList.init(e.contentKey), content: e.content)
|
||||
contentKV.add(info)
|
||||
|
||||
let offerResult = await p.offer(node, contentKV)
|
||||
|
||||
if offerResult.isErr() or len(content) < batchSize:
|
||||
# peer failed or we reached end of database stop offering more content
|
||||
break
|
||||
|
||||
offset = offset + batchSize
|
||||
|
||||
proc saveDataToLocalDb(p: PortalProtocol, db: SeedDb) =
|
||||
let localBatchSize = 10000
|
||||
|
||||
var offset = 0
|
||||
while true:
|
||||
let content =
|
||||
db.getContentInRange(p.localNode.id, p.dataRadius, localBatchSize, offset)
|
||||
|
||||
if len(content) == 0:
|
||||
break
|
||||
|
||||
for e in content:
|
||||
p.storeContent(
|
||||
ByteList.init(e.contentKey), UInt256.fromBytesBE(e.contentId), e.content
|
||||
)
|
||||
|
||||
if len(content) < localBatchSize:
|
||||
# got to the end of db.
|
||||
break
|
||||
|
||||
offset = offset + localBatchSize
|
||||
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let
|
||||
(dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
|
||||
for n in closestWithRadius:
|
||||
gossipWorkers.add(p.worker(db, n[0], n[1]))
|
||||
|
||||
p.saveDataToLocalDb(db)
|
||||
|
||||
await allFutures(gossipWorkers)
|
||||
|
||||
db.close()
|
||||
|
||||
return ok()
|
||||
|
||||
func contentDataToKeys(
|
||||
contentData: seq[ContentDataDist]
|
||||
): (Opt[NodeId], ContentKeysList, seq[seq[byte]]) =
|
||||
var contentKeys: seq[ByteList]
|
||||
var content: seq[seq[byte]]
|
||||
for cd in contentData:
|
||||
contentKeys.add(ByteList.init(cd.contentKey))
|
||||
content.add(cd.content)
|
||||
return (Opt.none(NodeId), ContentKeysList(contentKeys), content)
|
||||
|
||||
proc breadthContentPropagate*(
|
||||
p: PortalProtocol, seedDbPath: string
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
## Iterates over whole seed database, and offer batches of content to different
|
||||
## set of nodes
|
||||
|
||||
const concurrentGossips = 20
|
||||
|
||||
const gossipsPerBatch = 5
|
||||
|
||||
var gossipQueue =
|
||||
newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](concurrentGossips)
|
||||
|
||||
var gossipWorkers: seq[Future[void]]
|
||||
|
||||
proc gossipWorker(p: PortalProtocol) {.async.} =
|
||||
while true:
|
||||
let (srcNodeId, keys, content) = await gossipQueue.popFirst()
|
||||
|
||||
discard await p.neighborhoodGossip(srcNodeId, keys, content)
|
||||
|
||||
for i in 0 ..< concurrentGossips:
|
||||
gossipWorkers.add(gossipWorker(p))
|
||||
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let
|
||||
(dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
batchSize = maxItemsPerOffer
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
target = p.localNode.id
|
||||
|
||||
var offset = 0
|
||||
|
||||
while true:
|
||||
# Setting radius to `UInt256.high` and using batchSize and offset, means
|
||||
# we will iterate over whole database in batches of `maxItemsPerOffer` items
|
||||
var contentData = db.getContentInRange(target, UInt256.high, batchSize, offset)
|
||||
|
||||
if len(contentData) == 0:
|
||||
break
|
||||
|
||||
for cd in contentData:
|
||||
p.storeContent(
|
||||
ByteList.init(cd.contentKey), UInt256.fromBytesBE(cd.contentId), cd.content
|
||||
)
|
||||
|
||||
# TODO this a bit hacky way to make sure we will engage more valid peers for each
|
||||
# batch of data. This maybe removed after improving neighborhoodGossip
|
||||
# to better chose peers based on propagated content
|
||||
for i in 0 ..< gossipsPerBatch:
|
||||
p.baseProtocol.rng[].shuffle(contentData)
|
||||
let keysWithContent = contentDataToKeys(contentData)
|
||||
await gossipQueue.put(keysWithContent)
|
||||
|
||||
if len(contentData) < batchSize:
|
||||
break
|
||||
|
||||
offset = offset + batchSize
|
||||
|
||||
db.close()
|
||||
|
||||
return ok()
|
||||
|
||||
proc offerContentInNodeRange*(
|
||||
p: PortalProtocol, seedDbPath: string, nodeId: NodeId, max: uint32, starting: uint32
|
||||
): Future[PortalResult[int]] {.async.} =
|
||||
## Offers `max` closest elements starting from `starting` index to peer
|
||||
## with given `nodeId`.
|
||||
## Maximum value of `max` is 64 , as this is limit for single offer. Although
|
||||
## `starting` argument is needed as seed_db is read only, so if there is
|
||||
## more content in peer range than max, then to offer 64 closest elements
|
||||
## it needs to be set to 0. To offer next 64 elements it need to be set to
|
||||
## 64 etc.
|
||||
## Return number of items really offered to remote peer.
|
||||
|
||||
let numberToToOffer = min(int(max), maxItemsPerOffer)
|
||||
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let (dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
|
||||
let maybeNodeAndRadius = await p.resolveWithRadius(nodeId)
|
||||
|
||||
if maybeNodeAndRadius.isNone():
|
||||
return err("Could not find node with provided nodeId")
|
||||
|
||||
let
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
(node, radius) = maybeNodeAndRadius.unsafeGet()
|
||||
content =
|
||||
db.getContentInRange(node.id, radius, int64(numberToToOffer), int64(starting))
|
||||
|
||||
# We got all we wanted from seed_db, it can be closed now.
|
||||
db.close()
|
||||
|
||||
var ci: seq[ContentKV]
|
||||
|
||||
for cont in content:
|
||||
let k = ByteList.init(cont.contentKey)
|
||||
let info = ContentKV(contentKey: k, content: cont.content)
|
||||
ci.add(info)
|
||||
|
||||
# waiting for offer result, by the end of this call remote node should
|
||||
# have received offered content
|
||||
let offerResult = await p.offer(node, ci)
|
||||
|
||||
if offerResult.isOk():
|
||||
return ok(len(content))
|
||||
else:
|
||||
return err(offerResult.error)
|
||||
|
||||
proc storeContentInNodeRange*(
|
||||
p: PortalProtocol, seedDbPath: string, max: uint32, starting: uint32
|
||||
): PortalResult[void] =
|
||||
let maybePathAndDbName = getDbBasePathAndName(seedDbPath)
|
||||
|
||||
if maybePathAndDbName.isNone():
|
||||
return err("Provided path is not valid sqlite database path")
|
||||
|
||||
let (dbPath, dbName) = maybePathAndDbName.unsafeGet()
|
||||
|
||||
let
|
||||
localRadius = p.dataRadius
|
||||
db = SeedDb.new(path = dbPath, name = dbName)
|
||||
localId = p.localNode.id
|
||||
contentInRange =
|
||||
db.getContentInRange(localId, localRadius, int64(max), int64(starting))
|
||||
|
||||
db.close()
|
||||
|
||||
for contentData in contentInRange:
|
||||
let cid = UInt256.fromBytesBE(contentData.contentId)
|
||||
p.storeContent(ByteList.init(contentData.contentKey), cid, contentData.content)
|
||||
|
||||
return ok()
|
@ -11,9 +11,8 @@ import
|
||||
json_rpc/[rpcproxy, rpcserver],
|
||||
stew/byteutils,
|
||||
../network/wire/portal_protocol,
|
||||
../network/network_seed,
|
||||
../eth_data/history_data_seeding,
|
||||
../database/[content_db, seed_db]
|
||||
../database/content_db
|
||||
|
||||
export rpcserver
|
||||
|
||||
@ -98,49 +97,3 @@ proc installPortalDebugApiHandlers*(
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, $res.error)
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_storeContentInNodeRange") do(
|
||||
dbPath: string, max: uint32, starting: uint32
|
||||
) -> bool:
|
||||
let storeResult = p.storeContentInNodeRange(dbPath, max, starting)
|
||||
|
||||
if storeResult.isOk():
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, $storeResult.error)
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_offerContentInNodeRange") do(
|
||||
dbPath: string, nodeId: NodeId, max: uint32, starting: uint32
|
||||
) -> int:
|
||||
# waiting for offer result, by the end of this call remote node should
|
||||
# have received offered content
|
||||
let offerResult = await p.offerContentInNodeRange(dbPath, nodeId, max, starting)
|
||||
|
||||
if offerResult.isOk():
|
||||
return offerResult.get()
|
||||
else:
|
||||
raise newException(ValueError, $offerResult.error)
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_depthContentPropagate") do(
|
||||
dbPath: string, max: uint32
|
||||
) -> bool:
|
||||
# TODO Consider making this call asynchronously without waiting for result
|
||||
# as for big seed db size it could take a loot of time.
|
||||
let propagateResult = await p.depthContentPropagate(dbPath, max)
|
||||
|
||||
if propagateResult.isOk():
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, $propagateResult.error)
|
||||
|
||||
rpcServer.rpc("portal_" & network & "_breadthContentPropagate") do(
|
||||
dbPath: string
|
||||
) -> bool:
|
||||
# TODO Consider making this call asynchronously without waiting for result
|
||||
# as for big seed db size it could take a loot of time.
|
||||
let propagateResult = await p.breadthContentPropagate(dbPath)
|
||||
|
||||
if propagateResult.isOk():
|
||||
return true
|
||||
else:
|
||||
raise newException(ValueError, $propagateResult.error)
|
||||
|
@ -20,7 +20,6 @@ import
|
||||
../rpc/eth_rpc_client,
|
||||
../eth_data/[history_data_seeding, history_data_json_store, history_data_ssz_e2s],
|
||||
../network/history/[history_content, accumulator],
|
||||
../database/seed_db,
|
||||
../tests/test_history_util
|
||||
|
||||
type
|
||||
@ -333,200 +332,3 @@ procSuite "Portal testnet tests":
|
||||
# discard
|
||||
|
||||
await client.close()
|
||||
|
||||
asyncTest "Portal History - Propagate content from seed db":
|
||||
# Skipping this as it seems to fail now at offerContentInNodeRange, likely
|
||||
# due to not being possibly to validate block bodies. This would mean the
|
||||
# test is flawed and block headers should be offered before bodies and
|
||||
# receipts.
|
||||
# TODO: Split this up and activate test
|
||||
skip()
|
||||
|
||||
# const
|
||||
# headerFile = "./vendor/portal-spec-tests/tests/mainnet/history/headers/1000011-1000030.e2s"
|
||||
# accumulatorFile = "./vendor/portal-spec-tests/tests/mainnet/history/accumulator/epoch-accumulator-00122.ssz"
|
||||
# blockDataFile = "./fluffy/tests/blocks/mainnet_blocks_1000011_1000030.json"
|
||||
|
||||
# # Path for the temporary db. A separate dir is used as sqlite usually also
|
||||
# # creates wal files.
|
||||
# tempDbPath = "./fluffy/tests/blocks/tempDir/mainnet_blocks_1000011_1000030.sqlite3"
|
||||
|
||||
# let
|
||||
# blockHeaders = readBlockHeaders(headerFile).valueOr:
|
||||
# raiseAssert "Invalid header file: " & headerFile
|
||||
# epochAccumulator = readEpochAccumulatorCached(accumulatorFile).valueOr:
|
||||
# raiseAssert "Invalid epoch accumulator file: " & accumulatorFile
|
||||
# blockHeadersWithProof =
|
||||
# buildHeadersWithProof(blockHeaders, epochAccumulator).valueOr:
|
||||
# raiseAssert "Could not build headers with proof"
|
||||
# blockData =
|
||||
# readJsonType(blockDataFile, BlockDataTable).valueOr:
|
||||
# raiseAssert "Invalid block data file" & blockDataFile
|
||||
|
||||
# clients = await connectToRpcServers(config)
|
||||
|
||||
# var nodeInfos: seq[NodeInfo]
|
||||
# for client in clients:
|
||||
# let nodeInfo = await client.portal_history_nodeInfo()
|
||||
# await client.close()
|
||||
# nodeInfos.add(nodeInfo)
|
||||
|
||||
# let (dbFile, dbName) = getDbBasePathAndName(tempDbPath).unsafeGet()
|
||||
# createDir(dbFile)
|
||||
# let db = SeedDb.new(path = dbFile, name = dbName)
|
||||
# defer:
|
||||
# db.close()
|
||||
# removeDir(dbFile)
|
||||
|
||||
# # Fill seed db with block headers with proof
|
||||
# for (content, contentKey) in blockHeadersWithProof:
|
||||
# let contentId = history_content.toContentId(ByteList(contentKey))
|
||||
# db.put(contentId, contentKey, content)
|
||||
|
||||
# # Fill seed db with block bodies and receipts
|
||||
# for t in blocksContent(blockData, false):
|
||||
# db.put(t[0], t[1], t[2])
|
||||
|
||||
# let lastNodeIdx = len(nodeInfos) - 1
|
||||
|
||||
# # Store content in node 0 database
|
||||
# check (await clients[0].portal_history_storeContentInNodeRange(
|
||||
# tempDbPath, 100, 0))
|
||||
# await clients[0].close()
|
||||
|
||||
# # Offer content to node 1..63
|
||||
# for i in 1..lastNodeIdx:
|
||||
# let recipientId = nodeInfos[i].nodeId
|
||||
# let offerResponse = await retryUntil(
|
||||
# proc (): Future[int] {.async.} =
|
||||
# try:
|
||||
# let res = await clients[0].portal_history_offerContentInNodeRange(
|
||||
# tempDbPath, recipientId, 64, 0)
|
||||
# await clients[0].close()
|
||||
# return res
|
||||
# except CatchableError as exc:
|
||||
# await clients[0].close()
|
||||
# raise exc
|
||||
# ,
|
||||
# proc (os: int): bool = return true,
|
||||
# "Offer failed",
|
||||
# i
|
||||
# )
|
||||
# check:
|
||||
# offerResponse > 0
|
||||
|
||||
# for i, client in clients:
|
||||
# # Note: Once there is the Canonical Indices Network, we don't need to
|
||||
# # access this file anymore here for the block hashes.
|
||||
# for hash in blockData.blockHashes():
|
||||
# let content = await retryUntil(
|
||||
# proc (): Future[Option[BlockObject]] {.async.} =
|
||||
# try:
|
||||
# let res = await client.eth_getBlockByHash(hash.ethHashStr(), true)
|
||||
# await client.close()
|
||||
# return res
|
||||
# except CatchableError as exc:
|
||||
# await client.close()
|
||||
# raise exc
|
||||
# ,
|
||||
# proc (mc: Option[BlockObject]): bool = return mc.isSome(),
|
||||
# "Did not receive expected Block with hash " & hash.data.toHex(),
|
||||
# i
|
||||
# )
|
||||
# check content.isSome()
|
||||
|
||||
# let blockObj = content.get()
|
||||
# check blockObj.hash.get() == hash
|
||||
|
||||
# for tx in blockObj.transactions:
|
||||
# var txObj: TransactionObject
|
||||
# tx.fromJson("tx", txObj)
|
||||
# check txObj.blockHash.get() == hash
|
||||
|
||||
# await client.close()
|
||||
|
||||
asyncTest "Portal History - Propagate content from seed db in depth first fashion":
|
||||
# Skipping this test as it is flawed considering block headers should be
|
||||
# offered before bodies and receipts.
|
||||
# TODO: Split this up and activate test
|
||||
skip()
|
||||
|
||||
# const
|
||||
# headerFile = "./vendor/portal-spec-tests/tests/mainnet/history/headers/1000011-1000030.e2s"
|
||||
# accumulatorFile = "./vendor/portal-spec-tests/tests/mainnet/history/accumulator/epoch-accumulator-00122.ssz"
|
||||
# # Different set of data for each test as tests are statefull so previously
|
||||
# # propagated content is still in the network
|
||||
# blockDataFile = "./fluffy/tests/blocks/mainnet_blocks_1000040_1000050.json"
|
||||
|
||||
# # Path for the temporary db. A separate dir is used as sqlite usually also
|
||||
# # creates wal files.
|
||||
# tempDbPath = "./fluffy/tests/blocks/tempDir/mainnet_blocks_1000040_100050.sqlite3"
|
||||
|
||||
# let
|
||||
# blockHeaders = readBlockHeaders(headerFile).valueOr:
|
||||
# raiseAssert "Invalid header file: " & headerFile
|
||||
# epochAccumulator = readEpochAccumulatorCached(accumulatorFile).valueOr:
|
||||
# raiseAssert "Invalid epoch accumulator file: " & accumulatorFile
|
||||
# blockHeadersWithProof =
|
||||
# buildHeadersWithProof(blockHeaders, epochAccumulator).valueOr:
|
||||
# raiseAssert "Could not build headers with proof"
|
||||
# blockData =
|
||||
# readJsonType(blockDataFile, BlockDataTable).valueOr:
|
||||
# raiseAssert "Invalid block data file" & blockDataFile
|
||||
|
||||
# clients = await connectToRpcServers(config)
|
||||
|
||||
# var nodeInfos: seq[NodeInfo]
|
||||
# for client in clients:
|
||||
# let nodeInfo = await client.portal_history_nodeInfo()
|
||||
# await client.close()
|
||||
# nodeInfos.add(nodeInfo)
|
||||
|
||||
# let (dbFile, dbName) = getDbBasePathAndName(tempDbPath).unsafeGet()
|
||||
# createDir(dbFile)
|
||||
# let db = SeedDb.new(path = dbFile, name = dbName)
|
||||
# defer:
|
||||
# db.close()
|
||||
# removeDir(dbFile)
|
||||
|
||||
# # Fill seed db with block headers with proof
|
||||
# for (content, contentKey) in blockHeadersWithProof:
|
||||
# let contentId = history_content.toContentId(ByteList(contentKey))
|
||||
# db.put(contentId, contentKey, content)
|
||||
|
||||
# # Fill seed db with block bodies and receipts
|
||||
# for t in blocksContent(blockData, false):
|
||||
# db.put(t[0], t[1], t[2])
|
||||
|
||||
# check (await clients[0].portal_history_depthContentPropagate(tempDbPath, 64))
|
||||
# await clients[0].close()
|
||||
|
||||
# for i, client in clients:
|
||||
# # Note: Once there is the Canonical Indices Network, we don't need to
|
||||
# # access this file anymore here for the block hashes.
|
||||
# for hash in blockData.blockHashes():
|
||||
# let content = await retryUntil(
|
||||
# proc (): Future[Option[BlockObject]] {.async.} =
|
||||
# try:
|
||||
# let res = await client.eth_getBlockByHash(hash.ethHashStr(), true)
|
||||
# await client.close()
|
||||
# return res
|
||||
# except CatchableError as exc:
|
||||
# await client.close()
|
||||
# raise exc
|
||||
# ,
|
||||
# proc (mc: Option[BlockObject]): bool = return mc.isSome(),
|
||||
# "Did not receive expected Block with hash " & hash.data.toHex(),
|
||||
# i
|
||||
# )
|
||||
# check content.isSome()
|
||||
|
||||
# let blockObj = content.get()
|
||||
# check blockObj.hash.get() == hash
|
||||
|
||||
# for tx in blockObj.transactions:
|
||||
# var txObj: TransactionObject
|
||||
# tx.fromJson("tx", txObj)
|
||||
# check txObj.blockHash.get() == hash
|
||||
|
||||
# await client.close()
|
||||
|
@ -49,7 +49,6 @@ import
|
||||
json_rpc/rpcclient,
|
||||
snappy,
|
||||
ncli/e2store,
|
||||
../database/seed_db,
|
||||
../../premix/[downloader, parser],
|
||||
../network/history/[history_content, accumulator],
|
||||
../eth_data/[history_data_json_store, history_data_ssz_e2s, era1],
|
||||
@ -123,49 +122,11 @@ proc writeBlocksToJson(config: ExporterConf, client: RpcClient) =
|
||||
fatal "Error occured while closing file", error = e.msg
|
||||
quit 1
|
||||
|
||||
proc writeBlocksToDb(config: ExporterConf, client: RpcClient) =
|
||||
let db = SeedDb.new(distinctBase(config.dataDir), config.fileName)
|
||||
|
||||
defer:
|
||||
db.close()
|
||||
|
||||
for i in config.startBlock .. config.endBlock:
|
||||
let
|
||||
blck = downloadBlock(i, client)
|
||||
blockHash = blck.header.blockHash()
|
||||
contentKeyType = BlockKey(blockHash: blockHash)
|
||||
headerKey =
|
||||
encode(ContentKey(contentType: blockHeader, blockHeaderKey: contentKeyType))
|
||||
bodyKey = encode(ContentKey(contentType: blockBody, blockBodyKey: contentKeyType))
|
||||
receiptsKey =
|
||||
encode(ContentKey(contentType: receipts, receiptsKey: contentKeyType))
|
||||
|
||||
db.put(headerKey.toContentId(), headerKey.asSeq(), rlp.encode(blck.header))
|
||||
|
||||
# No need to seed empty lists into database
|
||||
if len(blck.body.transactions) > 0 or len(blck.body.uncles) > 0:
|
||||
let body = encode(blck.body)
|
||||
db.put(bodyKey.toContentId(), bodyKey.asSeq(), body)
|
||||
|
||||
if len(blck.receipts) > 0:
|
||||
let receipts = encode(blck.receipts)
|
||||
db.put(receiptsKey.toContentId(), receiptsKey.asSeq(), receipts)
|
||||
|
||||
info "Data successfuly written to db"
|
||||
|
||||
proc exportBlocks(config: ExporterConf, client: RpcClient) =
|
||||
case config.storageMode
|
||||
of JsonStorage:
|
||||
if config.headersOnly:
|
||||
writeHeadersToJson(config, client)
|
||||
else:
|
||||
writeBlocksToJson(config, client)
|
||||
of DbStorage:
|
||||
if config.headersOnly:
|
||||
fatal "Db mode not available for headers only"
|
||||
quit 1
|
||||
else:
|
||||
writeBlocksToDb(config, client)
|
||||
if config.headersOnly:
|
||||
writeHeadersToJson(config, client)
|
||||
else:
|
||||
writeBlocksToJson(config, client)
|
||||
|
||||
proc newRpcClient(web3Url: Web3Url): RpcClient =
|
||||
# TODO: I don't like this API. I think the creation of the RPC clients should
|
||||
|
@ -29,10 +29,6 @@ type
|
||||
kind*: Web3UrlKind
|
||||
url*: string
|
||||
|
||||
StorageMode* = enum
|
||||
JsonStorage
|
||||
DbStorage
|
||||
|
||||
const
|
||||
defaultDataDirDesc* = defaultDataDir()
|
||||
defaultBlockFileName* = "eth-block-data"
|
||||
@ -111,11 +107,6 @@ type
|
||||
defaultValueDesc: $defaultBlockFileName,
|
||||
name: "file-name"
|
||||
.}: string
|
||||
storageMode* {.
|
||||
desc: "Storage mode of block data export",
|
||||
defaultValue: JsonStorage,
|
||||
name: "storage-mode"
|
||||
.}: StorageMode
|
||||
headersOnly* {.
|
||||
desc: "Only export the headers instead of full blocks and receipts",
|
||||
defaultValue: false,
|
||||
@ -236,18 +227,6 @@ proc parseCmdArg*(T: type Web3Url, p: string): T {.raises: [ValueError].} =
|
||||
proc completeCmdArg*(T: type Web3Url, val: string): seq[string] =
|
||||
return @[]
|
||||
|
||||
proc parseCmdArg*(T: type StorageMode, p: string): T {.raises: [ValueError].} =
|
||||
if p == "db":
|
||||
return DbStorage
|
||||
elif p == "json":
|
||||
return JsonStorage
|
||||
else:
|
||||
let msg = "Provided mode: " & p & " is not a valid. Should be `json` or `db`"
|
||||
raise newException(ValueError, msg)
|
||||
|
||||
proc completeCmdArg*(T: type StorageMode, val: string): seq[string] =
|
||||
return @[]
|
||||
|
||||
func parseCmdArg*(
|
||||
T: type Eth2Digest, input: string
|
||||
): T {.raises: [ValueError, Defect].} =
|
||||
|
Loading…
x
Reference in New Issue
Block a user