diff --git a/fluffy/populate_db.nim b/fluffy/data/history_data_parser.nim similarity index 50% rename from fluffy/populate_db.nim rename to fluffy/data/history_data_parser.nim index 342f502fa..b25f9e863 100644 --- a/fluffy/populate_db.nim +++ b/fluffy/data/history_data_parser.nim @@ -9,19 +9,17 @@ import json_serialization, json_serialization/std/tables, - stew/[byteutils, io2, results], chronos, chronicles, + stew/[byteutils, io2, results], chronicles, eth/[rlp, common/eth_types], # TODO: `NetworkId` should not be in these private types eth/p2p/private/p2p_types, - ../nimbus/[chain_config, genesis], - "."/[content_db, seed_db], - ./network/wire/portal_protocol, - ./network/history/[history_content, accumulator] + ../../nimbus/[chain_config, genesis], + ../network/history/[history_content, accumulator] export results, tables -# Helper calls to, offline, populate the database with the current existing json -# files with block data. Might move to some other storage format later on. +# Helper calls to parse history data from json files. Format currently +# unspecified and likely to change. # Perhaps https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md # can be interesting here too. @@ -72,7 +70,7 @@ iterator blockHashes*(blockData: BlockDataTable): BlockHash = yield blockHash -func readBlockData( +func readBlockData*( hash: string, blockData: BlockData, verify = false): Result[seq[(ContentKey, seq[byte])], string] = var res: seq[(ContentKey, seq[byte])] @@ -163,38 +161,6 @@ proc getGenesisHeader*(id: NetworkId = MainNet): BlockHeader = except RlpError: raise (ref Defect)(msg: "Genesis should be valid") -proc buildAccumulator*(dataFile: string): Result[Accumulator, string] = - let blockData = ? readJsonType(dataFile, BlockDataTable) - - var headers: seq[BlockHeader] - # Len of headers from blockdata + genesis header - headers.setLen(blockData.len() + 1) - - headers[0] = getGenesisHeader() - - for k, v in blockData.pairs: - let header = ? v.readBlockHeader() - headers[header.blockNumber.truncate(int)] = header - - ok(buildAccumulator(headers)) - -proc buildAccumulatorData*( - dataFile: string): - Result[seq[(ContentKey, EpochAccumulator)], string] = - let blockData = ? readJsonType(dataFile, BlockDataTable) - - var headers: seq[BlockHeader] - # Len of headers from blockdata + genesis header - headers.setLen(blockData.len() + 1) - - headers[0] = getGenesisHeader() - - for k, v in blockData.pairs: - let header = ? v.readBlockHeader() - headers[header.blockNumber.truncate(int)] = header - - ok(buildAccumulatorData(headers)) - proc readAccumulator*(dataFile: string): Result[Accumulator, string] = let res = ? readJsonType(dataFile, AccumulatorObject) @@ -222,126 +188,3 @@ proc readEpochAccumulator*(dataFile: string): Result[EpochAccumulator, string] = ok(SSZ.decode(encodedAccumulator, EpochAccumulator)) except SszError as e: err("Decoding epoch accumulator failed: " & e.msg) - -proc historyStore*( - p: PortalProtocol, dataFile: string, verify = false): - Result[void, string] = - let blockData = ? readJsonType(dataFile, BlockDataTable) - - for b in blocks(blockData, verify): - for value in b: - # Note: This is the slowest part due to the hashing that takes place. - p.storeContent(history_content.toContentId(value[0]), value[1]) - - ok() - -proc propagateAccumulatorData*( - p: PortalProtocol, dataFile: string): - Future[Result[void, string]] {.async.} = - ## Propagate all epoch accumulators created when building the accumulator - ## from the block headers. - ## dataFile holds block data - let epochAccumulators = buildAccumulatorData(dataFile) - if epochAccumulators.isErr(): - return err(epochAccumulators.error) - else: - for (key, epochAccumulator) in epochAccumulators.get(): - let content = SSZ.encode(epochAccumulator) - - p.storeContent( - history_content.toContentId(key), content) - await p.neighborhoodGossip( - ContentKeysList(@[encode(key)]), @[content]) - - return ok() - -proc propagateEpochAccumulator*( - p: PortalProtocol, dataFile: string): - Future[Result[void, string]] {.async.} = - ## Propagate a specific epoch accumulator into the network. - ## dataFile holds the SSZ serialized epoch accumulator - let epochAccumulatorRes = readEpochAccumulator(dataFile) - if epochAccumulatorRes.isErr(): - return err(epochAccumulatorRes.error) - else: - let - accumulator = epochAccumulatorRes.get() - rootHash = accumulator.hash_tree_root() - key = ContentKey( - contentType: epochAccumulator, - epochAccumulatorKey: EpochAccumulatorKey( - epochHash: rootHash)) - - p.storeContent( - history_content.toContentId(key), SSZ.encode(accumulator)) - await p.neighborhoodGossip( - ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)]) - - return ok() - -proc historyPropagate*( - p: PortalProtocol, dataFile: string, verify = false): - Future[Result[void, string]] {.async.} = - const concurrentGossips = 20 - - var gossipQueue = - newAsyncQueue[(ContentKeysList, seq[byte])](concurrentGossips) - var gossipWorkers: seq[Future[void]] - - proc gossipWorker(p: PortalProtocol) {.async.} = - while true: - let (keys, content) = await gossipQueue.popFirst() - - await p.neighborhoodGossip(keys, @[content]) - - for i in 0 ..< concurrentGossips: - gossipWorkers.add(gossipWorker(p)) - - let blockData = readJsonType(dataFile, BlockDataTable) - if blockData.isOk(): - for b in blocks(blockData.get(), verify): - for value in b: - # Only sending non empty data, e.g. empty receipts are not send - # TODO: Could do a similar thing for a combination of empty - # txs and empty uncles, as then the serialization is always the same. - if value[1].len() > 0: - info "Seeding block content into the network", contentKey = value[0] - # Note: This is the slowest part due to the hashing that takes place. - let contentId = history_content.toContentId(value[0]) - p.storeContent(contentId, value[1]) - - await gossipQueue.addLast( - (ContentKeysList(@[encode(value[0])]), value[1])) - - return ok() - else: - return err(blockData.error) - -proc historyPropagateBlock*( - p: PortalProtocol, dataFile: string, blockHash: string, verify = false): - Future[Result[void, string]] {.async.} = - let blockDataTable = readJsonType(dataFile, BlockDataTable) - - if blockDataTable.isOk(): - let b = - try: - blockDataTable.get()[blockHash] - except KeyError: - return err("Block hash not found in block data file") - - let blockDataRes = readBlockData(blockHash, b) - if blockDataRes.isErr: - return err(blockDataRes.error) - - let blockData = blockDataRes.get() - - for value in blockData: - info "Seeding block content into the network", contentKey = value[0] - let contentId = history_content.toContentId(value[0]) - p.storeContent(contentId, value[1]) - - await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]]) - - return ok() - else: - return err(blockDataTable.error) diff --git a/fluffy/data/history_data_seeding.nim b/fluffy/data/history_data_seeding.nim new file mode 100644 index 000000000..9f130fde7 --- /dev/null +++ b/fluffy/data/history_data_seeding.nim @@ -0,0 +1,174 @@ +# # Nimbus - Portal Network +# # Copyright (c) 2022 Status Research & Development GmbH +# # Licensed and distributed under either of +# # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# # at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + stew/results, chronos, chronicles, + eth/common/eth_types, + ../network/wire/portal_protocol, + ../network/history/[history_content, accumulator], + ./history_data_parser + +export results + +### Helper calls to seed the local database and/or the network + +proc buildAccumulator*(dataFile: string): Result[Accumulator, string] = + let blockData = ? readJsonType(dataFile, BlockDataTable) + + var headers: seq[BlockHeader] + # Len of headers from blockdata + genesis header + headers.setLen(blockData.len() + 1) + + headers[0] = getGenesisHeader() + + for k, v in blockData.pairs: + let header = ? v.readBlockHeader() + headers[header.blockNumber.truncate(int)] = header + + ok(buildAccumulator(headers)) + +proc buildAccumulatorData*( + dataFile: string): + Result[seq[(ContentKey, EpochAccumulator)], string] = + let blockData = ? readJsonType(dataFile, BlockDataTable) + + var headers: seq[BlockHeader] + # Len of headers from blockdata + genesis header + headers.setLen(blockData.len() + 1) + + headers[0] = getGenesisHeader() + + for k, v in blockData.pairs: + let header = ? v.readBlockHeader() + headers[header.blockNumber.truncate(int)] = header + + ok(buildAccumulatorData(headers)) + +proc historyStore*( + p: PortalProtocol, dataFile: string, verify = false): + Result[void, string] = + let blockData = ? readJsonType(dataFile, BlockDataTable) + + for b in blocks(blockData, verify): + for value in b: + # Note: This is the slowest part due to the hashing that takes place. + p.storeContent(history_content.toContentId(value[0]), value[1]) + + ok() + +proc propagateAccumulatorData*( + p: PortalProtocol, dataFile: string): + Future[Result[void, string]] {.async.} = + ## Propagate all epoch accumulators created when building the accumulator + ## from the block headers. + ## dataFile holds block data + let epochAccumulators = buildAccumulatorData(dataFile) + if epochAccumulators.isErr(): + return err(epochAccumulators.error) + else: + for (key, epochAccumulator) in epochAccumulators.get(): + let content = SSZ.encode(epochAccumulator) + + p.storeContent( + history_content.toContentId(key), content) + await p.neighborhoodGossip( + ContentKeysList(@[encode(key)]), @[content]) + + return ok() + +proc propagateEpochAccumulator*( + p: PortalProtocol, dataFile: string): + Future[Result[void, string]] {.async.} = + ## Propagate a specific epoch accumulator into the network. + ## dataFile holds the SSZ serialized epoch accumulator + let epochAccumulatorRes = readEpochAccumulator(dataFile) + if epochAccumulatorRes.isErr(): + return err(epochAccumulatorRes.error) + else: + let + accumulator = epochAccumulatorRes.get() + rootHash = accumulator.hash_tree_root() + key = ContentKey( + contentType: epochAccumulator, + epochAccumulatorKey: EpochAccumulatorKey( + epochHash: rootHash)) + + p.storeContent( + history_content.toContentId(key), SSZ.encode(accumulator)) + await p.neighborhoodGossip( + ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)]) + + return ok() + +proc historyPropagate*( + p: PortalProtocol, dataFile: string, verify = false): + Future[Result[void, string]] {.async.} = + const concurrentGossips = 20 + + var gossipQueue = + newAsyncQueue[(ContentKeysList, seq[byte])](concurrentGossips) + var gossipWorkers: seq[Future[void]] + + proc gossipWorker(p: PortalProtocol) {.async.} = + while true: + let (keys, content) = await gossipQueue.popFirst() + + await p.neighborhoodGossip(keys, @[content]) + + for i in 0 ..< concurrentGossips: + gossipWorkers.add(gossipWorker(p)) + + let blockData = readJsonType(dataFile, BlockDataTable) + if blockData.isOk(): + for b in blocks(blockData.get(), verify): + for value in b: + # Only sending non empty data, e.g. empty receipts are not send + # TODO: Could do a similar thing for a combination of empty + # txs and empty uncles, as then the serialization is always the same. + if value[1].len() > 0: + info "Seeding block content into the network", contentKey = value[0] + # Note: This is the slowest part due to the hashing that takes place. + let contentId = history_content.toContentId(value[0]) + p.storeContent(contentId, value[1]) + + await gossipQueue.addLast( + (ContentKeysList(@[encode(value[0])]), value[1])) + + return ok() + else: + return err(blockData.error) + +proc historyPropagateBlock*( + p: PortalProtocol, dataFile: string, blockHash: string, verify = false): + Future[Result[void, string]] {.async.} = + let blockDataTable = readJsonType(dataFile, BlockDataTable) + + if blockDataTable.isOk(): + let b = + try: + blockDataTable.get()[blockHash] + except KeyError: + return err("Block hash not found in block data file") + + let blockDataRes = readBlockData(blockHash, b) + if blockDataRes.isErr: + return err(blockDataRes.error) + + let blockData = blockDataRes.get() + + for value in blockData: + info "Seeding block content into the network", contentKey = value[0] + let contentId = history_content.toContentId(value[0]) + p.storeContent(contentId, value[1]) + + await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), @[value[1]]) + + return ok() + else: + return err(blockDataTable.error) diff --git a/fluffy/fluffy.nim b/fluffy/fluffy.nim index bb9ef726f..a10b3612a 100644 --- a/fluffy/fluffy.nim +++ b/fluffy/fluffy.nim @@ -20,7 +20,8 @@ import ./network/state/[state_network, state_content], ./network/history/[history_network, history_content], ./network/wire/[portal_stream, portal_protocol_config], - "."/[content_db, populate_db] + ./data/[history_data_seeding, history_data_parser], + ./content_db proc initializeBridgeClient(maybeUri: Option[string]): Option[BridgeClient] = try: diff --git a/fluffy/network/history/accumulator_db.nim b/fluffy/network/history/accumulator_db.nim index 998c83317..e77c84a03 100644 --- a/fluffy/network/history/accumulator_db.nim +++ b/fluffy/network/history/accumulator_db.nim @@ -13,7 +13,7 @@ import eth/common/eth_types, ssz_serialization, ssz_serialization/[proofs, merkleization], ../../common/common_types, - ../../populate_db, + ../../data/history_data_seeding, "."/[history_content, accumulator] type diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 08114f765..d107b4b67 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -15,8 +15,7 @@ import ../../content_db, ../../../nimbus/constants, ../wire/[portal_protocol, portal_stream, portal_protocol_config], - "."/[history_content, accumulator], - ../../populate_db + "."/[history_content, accumulator] logScope: topics = "portal_hist" diff --git a/fluffy/scripts/test_portal_testnet.nim b/fluffy/scripts/test_portal_testnet.nim index 1cb951abd..010cc76c5 100644 --- a/fluffy/scripts/test_portal_testnet.nim +++ b/fluffy/scripts/test_portal_testnet.nim @@ -14,7 +14,8 @@ import ../../nimbus/rpc/[hexstrings, rpc_types], ../rpc/portal_rpc_client, ../rpc/eth_rpc_client, - ".."/[populate_db, seed_db] + ../data/[history_data_seeding, history_data_parser], + ../seed_db type FutureCallback[A] = proc (): Future[A] {.gcsafe, raises: [Defect].} diff --git a/fluffy/tests/test_accumulator.nim b/fluffy/tests/test_accumulator.nim index 2784a0c01..b660a11ac 100644 --- a/fluffy/tests/test_accumulator.nim +++ b/fluffy/tests/test_accumulator.nim @@ -12,7 +12,7 @@ import unittest2, stint, stew/byteutils, eth/common/eth_types_rlp, - ../populate_db, + ../data/history_data_parser, ../network/history/[history_content, accumulator] func buildProof( diff --git a/fluffy/tests/test_history_validation.nim b/fluffy/tests/test_history_validation.nim index 3efcd15ad..0734bddd3 100644 --- a/fluffy/tests/test_history_validation.nim +++ b/fluffy/tests/test_history_validation.nim @@ -14,7 +14,7 @@ import stew/[byteutils, results], eth/[common/eth_types, rlp], ../common/common_types, - ../populate_db, + ../data/history_data_parser, ../network/history/history_network const