Split parsing and seeding functionality for history data (#1218)

Kim De Mey 2022-09-09 21:21:48 +02:00 committed by GitHub
parent c138a4913c
commit 011f44abea
8 changed files with 188 additions and 170 deletions

@@ -9,19 +9,17 @@
 import
   json_serialization, json_serialization/std/tables,
-  stew/[byteutils, io2, results], chronos, chronicles,
+  stew/[byteutils, io2, results], chronicles,
   eth/[rlp, common/eth_types],
   # TODO: `NetworkId` should not be in these private types
   eth/p2p/private/p2p_types,
-  ../nimbus/[chain_config, genesis],
-  "."/[content_db, seed_db],
-  ./network/wire/portal_protocol,
-  ./network/history/[history_content, accumulator]
+  ../../nimbus/[chain_config, genesis],
+  ../network/history/[history_content, accumulator]
 
 export results, tables
 
-# Helper calls to, offline, populate the database with the current existing json
-# files with block data. Might move to some other storage format later on.
+# Helper calls to parse history data from json files. Format currently
+# unspecified and likely to change.
 # Perhaps https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md
 # can be interesting here too.
@@ -72,7 +70,7 @@ iterator blockHashes*(blockData: BlockDataTable): BlockHash =
 
     yield blockHash
 
-func readBlockData(
+func readBlockData*(
     hash: string, blockData: BlockData, verify = false):
    Result[seq[(ContentKey, seq[byte])], string] =
   var res: seq[(ContentKey, seq[byte])]
@@ -163,38 +161,6 @@ proc getGenesisHeader*(id: NetworkId = MainNet): BlockHeader =
   except RlpError:
     raise (ref Defect)(msg: "Genesis should be valid")
 
-proc buildAccumulator*(dataFile: string): Result[Accumulator, string] =
-  let blockData = ? readJsonType(dataFile, BlockDataTable)
-
-  var headers: seq[BlockHeader]
-  # Len of headers from blockdata + genesis header
-  headers.setLen(blockData.len() + 1)
-
-  headers[0] = getGenesisHeader()
-
-  for k, v in blockData.pairs:
-    let header = ? v.readBlockHeader()
-    headers[header.blockNumber.truncate(int)] = header
-
-  ok(buildAccumulator(headers))
-
-proc buildAccumulatorData*(
-    dataFile: string):
-    Result[seq[(ContentKey, EpochAccumulator)], string] =
-  let blockData = ? readJsonType(dataFile, BlockDataTable)
-
-  var headers: seq[BlockHeader]
-  # Len of headers from blockdata + genesis header
-  headers.setLen(blockData.len() + 1)
-
-  headers[0] = getGenesisHeader()
-
-  for k, v in blockData.pairs:
-    let header = ? v.readBlockHeader()
-    headers[header.blockNumber.truncate(int)] = header
-
-  ok(buildAccumulatorData(headers))
-
 proc readAccumulator*(dataFile: string): Result[Accumulator, string] =
   let res = ? readJsonType(dataFile, AccumulatorObject)
@@ -222,126 +188,3 @@ proc readEpochAccumulator*(dataFile: string): Result[EpochAccumulator, string] =
     ok(SSZ.decode(encodedAccumulator, EpochAccumulator))
   except SszError as e:
     err("Decoding epoch accumulator failed: " & e.msg)
-
-proc historyStore*(
-    p: PortalProtocol, dataFile: string, verify = false):
-    Result[void, string] =
-  let blockData = ? readJsonType(dataFile, BlockDataTable)
-
-  for b in blocks(blockData, verify):
-    for value in b:
-      # Note: This is the slowest part due to the hashing that takes place.
-      p.storeContent(history_content.toContentId(value[0]), value[1])
-
-  ok()
-
-proc propagateAccumulatorData*(
-    p: PortalProtocol, dataFile: string):
-    Future[Result[void, string]] {.async.} =
-  ## Propagate all epoch accumulators created when building the accumulator
-  ## from the block headers.
-  ## dataFile holds block data
-  let epochAccumulators = buildAccumulatorData(dataFile)
-  if epochAccumulators.isErr():
-    return err(epochAccumulators.error)
-  else:
-    for (key, epochAccumulator) in epochAccumulators.get():
-      let content = SSZ.encode(epochAccumulator)
-
-      p.storeContent(
-        history_content.toContentId(key), content)
-      await p.neighborhoodGossip(
-        ContentKeysList(@[encode(key)]), @[content])
-
-    return ok()
-
-proc propagateEpochAccumulator*(
-    p: PortalProtocol, dataFile: string):
-    Future[Result[void, string]] {.async.} =
-  ## Propagate a specific epoch accumulator into the network.
-  ## dataFile holds the SSZ serialized epoch accumulator
-  let epochAccumulatorRes = readEpochAccumulator(dataFile)
-  if epochAccumulatorRes.isErr():
-    return err(epochAccumulatorRes.error)
-  else:
-    let
-      accumulator = epochAccumulatorRes.get()
-      rootHash = accumulator.hash_tree_root()
-      key = ContentKey(
-        contentType: epochAccumulator,
-        epochAccumulatorKey: EpochAccumulatorKey(
-          epochHash: rootHash))
-
-    p.storeContent(
-      history_content.toContentId(key), SSZ.encode(accumulator))
-    await p.neighborhoodGossip(
-      ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)])
-
-    return ok()
-
-proc historyPropagate*(
-    p: PortalProtocol, dataFile: string, verify = false):
-    Future[Result[void, string]] {.async.} =
-  const concurrentGossips = 20
-
-  var gossipQueue =
-    newAsyncQueue[(ContentKeysList, seq[byte])](concurrentGossips)
-  var gossipWorkers: seq[Future[void]]
-
-  proc gossipWorker(p: PortalProtocol) {.async.} =
-    while true:
-      let (keys, content) = await gossipQueue.popFirst()
-
-      await p.neighborhoodGossip(keys, @[content])
-
-  for i in 0 ..< concurrentGossips:
-    gossipWorkers.add(gossipWorker(p))
-
-  let blockData = readJsonType(dataFile, BlockDataTable)
-  if blockData.isOk():
-    for b in blocks(blockData.get(), verify):
-      for value in b:
-        # Only sending non empty data, e.g. empty receipts are not send
-        # TODO: Could do a similar thing for a combination of empty
-        # txs and empty uncles, as then the serialization is always the same.
-        if value[1].len() > 0:
-          info "Seeding block content into the network", contentKey = value[0]
-          # Note: This is the slowest part due to the hashing that takes place.
-          let contentId = history_content.toContentId(value[0])
-          p.storeContent(contentId, value[1])
-
-          await gossipQueue.addLast(
-            (ContentKeysList(@[encode(value[0])]), value[1]))
-
-    return ok()
-  else:
-    return err(blockData.error)
-
-proc historyPropagateBlock*(
-    p: PortalProtocol, dataFile: string, blockHash: string, verify = false):
-    Future[Result[void, string]] {.async.} =
-  let blockDataTable = readJsonType(dataFile, BlockDataTable)
-
-  if blockDataTable.isOk():
-    let b =
-      try:
-        blockDataTable.get()[blockHash]
-      except KeyError:
-        return err("Block hash not found in block data file")
-
-    let blockDataRes = readBlockData(blockHash, b)
-    if blockDataRes.isErr:
-      return err(blockDataRes.error)
-
-    let blockData = blockDataRes.get()
-    for value in blockData:
-      info "Seeding block content into the network", contentKey = value[0]
-      let contentId = history_content.toContentId(value[0])
-      p.storeContent(contentId, value[1])
-
-      await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]])
-
-    return ok()
-  else:
-    return err(blockDataTable.error)
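
With the seeding procs removed, the parser module can be used stand-alone, without pulling in the Portal wire protocol. Below is a minimal sketch of reading and iterating a block data file with the calls shown above; the file name "blocks.json" is a placeholder and the import path assumes the new fluffy/data layout:

import
  stew/results,
  ./data/history_data_parser

# Parse the json file into a BlockDataTable; readJsonType returns a Result.
let res = readJsonType("blocks.json", BlockDataTable)  # placeholder file name
if res.isOk():
  # The blocks iterator yields, per block, the (ContentKey, encoded content)
  # pairs, verifying them against the block hash when verify = true.
  for b in blocks(res.get(), verify = true):
    for value in b:
      echo "Parsed content value of ", value[1].len(), " bytes"
else:
  echo "Failed reading block data: ", res.error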

@@ -0,0 +1,174 @@
+# Nimbus - Portal Network
+# Copyright (c) 2022 Status Research & Development GmbH
+# Licensed and distributed under either of
+#  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+{.push raises: [Defect].}
+
+import
+  stew/results, chronos, chronicles,
+  eth/common/eth_types,
+  ../network/wire/portal_protocol,
+  ../network/history/[history_content, accumulator],
+  ./history_data_parser
+
+export results
+
+### Helper calls to seed the local database and/or the network
+
+proc buildAccumulator*(dataFile: string): Result[Accumulator, string] =
+  let blockData = ? readJsonType(dataFile, BlockDataTable)
+
+  var headers: seq[BlockHeader]
+  # Len of headers from blockdata + genesis header
+  headers.setLen(blockData.len() + 1)
+
+  headers[0] = getGenesisHeader()
+
+  for k, v in blockData.pairs:
+    let header = ? v.readBlockHeader()
+    headers[header.blockNumber.truncate(int)] = header
+
+  ok(buildAccumulator(headers))
+
+proc buildAccumulatorData*(
+    dataFile: string):
+    Result[seq[(ContentKey, EpochAccumulator)], string] =
+  let blockData = ? readJsonType(dataFile, BlockDataTable)
+
+  var headers: seq[BlockHeader]
+  # Len of headers from blockdata + genesis header
+  headers.setLen(blockData.len() + 1)
+
+  headers[0] = getGenesisHeader()
+
+  for k, v in blockData.pairs:
+    let header = ? v.readBlockHeader()
+    headers[header.blockNumber.truncate(int)] = header
+
+  ok(buildAccumulatorData(headers))
+
+proc historyStore*(
+    p: PortalProtocol, dataFile: string, verify = false):
+    Result[void, string] =
+  let blockData = ? readJsonType(dataFile, BlockDataTable)
+
+  for b in blocks(blockData, verify):
+    for value in b:
+      # Note: This is the slowest part due to the hashing that takes place.
+      p.storeContent(history_content.toContentId(value[0]), value[1])
+
+  ok()
+
+proc propagateAccumulatorData*(
+    p: PortalProtocol, dataFile: string):
+    Future[Result[void, string]] {.async.} =
+  ## Propagate all epoch accumulators created when building the accumulator
+  ## from the block headers.
+  ## dataFile holds block data
+  let epochAccumulators = buildAccumulatorData(dataFile)
+  if epochAccumulators.isErr():
+    return err(epochAccumulators.error)
+  else:
+    for (key, epochAccumulator) in epochAccumulators.get():
+      let content = SSZ.encode(epochAccumulator)
+
+      p.storeContent(
+        history_content.toContentId(key), content)
+      await p.neighborhoodGossip(
+        ContentKeysList(@[encode(key)]), @[content])
+
+    return ok()
+
+proc propagateEpochAccumulator*(
+    p: PortalProtocol, dataFile: string):
+    Future[Result[void, string]] {.async.} =
+  ## Propagate a specific epoch accumulator into the network.
+  ## dataFile holds the SSZ serialized epoch accumulator
+  let epochAccumulatorRes = readEpochAccumulator(dataFile)
+  if epochAccumulatorRes.isErr():
+    return err(epochAccumulatorRes.error)
+  else:
+    let
+      accumulator = epochAccumulatorRes.get()
+      rootHash = accumulator.hash_tree_root()
+      key = ContentKey(
+        contentType: epochAccumulator,
+        epochAccumulatorKey: EpochAccumulatorKey(
+          epochHash: rootHash))
+
+    p.storeContent(
+      history_content.toContentId(key), SSZ.encode(accumulator))
+    await p.neighborhoodGossip(
+      ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)])
+
+    return ok()
+
+proc historyPropagate*(
+    p: PortalProtocol, dataFile: string, verify = false):
+    Future[Result[void, string]] {.async.} =
+  const concurrentGossips = 20
+
+  var gossipQueue =
+    newAsyncQueue[(ContentKeysList, seq[byte])](concurrentGossips)
+  var gossipWorkers: seq[Future[void]]
+
+  proc gossipWorker(p: PortalProtocol) {.async.} =
+    while true:
+      let (keys, content) = await gossipQueue.popFirst()
+
+      await p.neighborhoodGossip(keys, @[content])
+
+  for i in 0 ..< concurrentGossips:
+    gossipWorkers.add(gossipWorker(p))
+
+  let blockData = readJsonType(dataFile, BlockDataTable)
+  if blockData.isOk():
+    for b in blocks(blockData.get(), verify):
+      for value in b:
+        # Only sending non empty data, e.g. empty receipts are not send
+        # TODO: Could do a similar thing for a combination of empty
+        # txs and empty uncles, as then the serialization is always the same.
+        if value[1].len() > 0:
+          info "Seeding block content into the network", contentKey = value[0]
+          # Note: This is the slowest part due to the hashing that takes place.
+          let contentId = history_content.toContentId(value[0])
+          p.storeContent(contentId, value[1])
+
+          await gossipQueue.addLast(
+            (ContentKeysList(@[encode(value[0])]), value[1]))
+
+    return ok()
+  else:
+    return err(blockData.error)
+
+proc historyPropagateBlock*(
+    p: PortalProtocol, dataFile: string, blockHash: string, verify = false):
+    Future[Result[void, string]] {.async.} =
+  let blockDataTable = readJsonType(dataFile, BlockDataTable)
+
+  if blockDataTable.isOk():
+    let b =
+      try:
+        blockDataTable.get()[blockHash]
+      except KeyError:
+        return err("Block hash not found in block data file")
+
+    let blockDataRes = readBlockData(blockHash, b)
+    if blockDataRes.isErr:
+      return err(blockDataRes.error)
+
+    let blockData = blockDataRes.get()
+    for value in blockData:
+      info "Seeding block content into the network", contentKey = value[0]
+      let contentId = history_content.toContentId(value[0])
+      p.storeContent(contentId, value[1])
+
+      await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]])
+
+    return ok()
+  else:
+    return err(blockDataTable.error)
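
After the split, the seeding calls compose on top of an already set-up PortalProtocol, mirroring how fluffy wires the two modules together in the import changes below. A minimal sketch of a combined store-then-gossip flow; the proc name seedHistory and the pre-existing protocol instance are illustrative assumptions, not part of this commit:

import
  stew/results, chronos, chronicles,
  ./network/wire/portal_protocol,
  ./data/[history_data_seeding, history_data_parser]

proc seedHistory*(p: PortalProtocol, dataFile: string) {.async.} =
  # Hypothetical driver: first store all block content from the file
  # in the local content database, verifying it while parsing.
  let storeRes = historyStore(p, dataFile, verify = true)
  if storeRes.isErr():
    error "Failed storing history data", error = storeRes.error
    return

  # Then gossip the same content into the network, using the
  # concurrent gossip workers that historyPropagate sets up.
  let propagateRes = await p.historyPropagate(dataFile)
  if propagateRes.isErr():
    error "Failed propagating history data", error = propagateRes.error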

@@ -20,7 +20,8 @@ import
   ./network/state/[state_network, state_content],
   ./network/history/[history_network, history_content],
   ./network/wire/[portal_stream, portal_protocol_config],
-  "."/[content_db, populate_db]
+  ./data/[history_data_seeding, history_data_parser],
+  ./content_db
 
 proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] =
   try:

@@ -13,7 +13,7 @@ import
   eth/common/eth_types,
   ssz_serialization, ssz_serialization/[proofs, merkleization],
   ../../common/common_types,
-  ../../populate_db,
+  ../../data/history_data_seeding,
   "."/[history_content, accumulator]
 
 type

@@ -15,8 +15,7 @@ import
   ../../content_db,
   ../../../nimbus/constants,
   ../wire/[portal_protocol, portal_stream, portal_protocol_config],
-  "."/[history_content, accumulator],
-  ../../populate_db
+  "."/[history_content, accumulator]
 
 logScope:
   topics = "portal_hist"

@@ -14,7 +14,8 @@ import
   ../../nimbus/rpc/[hexstrings, rpc_types],
   ../rpc/portal_rpc_client,
   ../rpc/eth_rpc_client,
-  ".."/[populate_db, seed_db]
+  ../data/[history_data_seeding, history_data_parser],
+  ../seed_db
 
 type
   FutureCallback[A] = proc (): Future[A] {.gcsafe, raises: [Defect].}

@@ -12,7 +12,7 @@
 
 import
   unittest2, stint, stew/byteutils,
   eth/common/eth_types_rlp,
-  ../populate_db,
+  ../data/history_data_parser,
   ../network/history/[history_content, accumulator]
 
 func buildProof(

@@ -14,7 +14,7 @@ import
   stew/[byteutils, results],
   eth/[common/eth_types, rlp],
   ../common/common_types,
-  ../populate_db,
+  ../data/history_data_parser,
   ../network/history/history_network
 
 const