diff --git a/fluffy/data/history_data_parser.nim b/fluffy/data/history_data_parser.nim
index bbfe73eb8..072338603 100644
--- a/fluffy/data/history_data_parser.nim
+++ b/fluffy/data/history_data_parser.nim
@@ -206,14 +206,8 @@ proc readAccumulator*(file: string): Result[FinishedAccumulator, string] =
     err("Failed decoding accumulator: " & e.msg)
 
-proc readEpochAccumulator*(dataFile: string): Result[EpochAccumulator, string] =
-  let res = ? readJsonType(dataFile, EpochAccumulatorObject)
-
-  let encodedAccumulator =
-    try:
-      res.epochAccumulator.hexToSeqByte()
-    except ValueError as e:
-      return err("Invalid hex data for accumulator: " & e.msg)
+proc readEpochAccumulator*(file: string): Result[EpochAccumulator, string] =
+  let encodedAccumulator = ? readAllFile(file).mapErr(toString)
 
   try:
     ok(SSZ.decode(encodedAccumulator, EpochAccumulator))
diff --git a/fluffy/data/history_data_seeding.nim b/fluffy/data/history_data_seeding.nim
index 92d8e7217..5bb439f76 100644
--- a/fluffy/data/history_data_seeding.nim
+++ b/fluffy/data/history_data_seeding.nim
@@ -8,6 +8,7 @@
 {.push raises: [Defect].}
 
 import
+  std/[strformat, os],
   stew/results, chronos, chronicles,
   eth/common/eth_types,
   ../network/wire/portal_protocol,
@@ -18,38 +19,6 @@ export results
 
 ### Helper calls to seed the local database and/or the network
 
-proc buildAccumulator*(dataFile: string): Result[FinishedAccumulator, string] =
-  let blockData = ? readJsonType(dataFile, BlockDataTable)
-
-  var headers: seq[BlockHeader]
-  # Len of headers from blockdata + genesis header
-  headers.setLen(blockData.len() + 1)
-
-  headers[0] = getGenesisHeader()
-
-  for k, v in blockData.pairs:
-    let header = ? v.readBlockHeader()
-    headers[header.blockNumber.truncate(int)] = header
-
-  buildAccumulator(headers)
-
-proc buildAccumulatorData*(
-    dataFile: string):
-    Result[seq[(ContentKey, EpochAccumulator)], string] =
-  let blockData = ? readJsonType(dataFile, BlockDataTable)
-
-  var headers: seq[BlockHeader]
-  # Len of headers from blockdata + genesis header
-  headers.setLen(blockData.len() + 1)
-
-  headers[0] = getGenesisHeader()
-
-  for k, v in blockData.pairs:
-    let header = ? v.readBlockHeader()
-    headers[header.blockNumber.truncate(int)] = header
-
-  ok(buildAccumulatorData(headers))
-
 proc historyStore*(
     p: PortalProtocol, dataFile: string, verify = false):
     Result[void, string] =
@@ -62,32 +31,12 @@ proc historyStore*(
 
   ok()
 
-proc propagateAccumulatorData*(
-    p: PortalProtocol, dataFile: string):
-    Future[Result[void, string]] {.async.} =
-  ## Propagate all epoch accumulators created when building the accumulator
-  ## from the block headers.
-  ## dataFile holds block data
-  let epochAccumulators = buildAccumulatorData(dataFile)
-  if epochAccumulators.isErr():
-    return err(epochAccumulators.error)
-  else:
-    for (key, epochAccumulator) in epochAccumulators.get():
-      let content = SSZ.encode(epochAccumulator)
-
-      p.storeContent(
-        history_content.toContentId(key), content)
-      discard await p.neighborhoodGossip(
-        ContentKeysList(@[encode(key)]), @[content])
-
-    return ok()
-
 proc propagateEpochAccumulator*(
-    p: PortalProtocol, dataFile: string):
+    p: PortalProtocol, file: string):
     Future[Result[void, string]] {.async.} =
   ## Propagate a specific epoch accumulator into the network.
-  ## dataFile holds the SSZ serialized epoch accumulator
-  let epochAccumulatorRes = readEpochAccumulator(dataFile)
+  ## file holds the SSZ serialized epoch accumulator.
+  let epochAccumulatorRes = readEpochAccumulator(file)
 
   if epochAccumulatorRes.isErr():
     return err(epochAccumulatorRes.error)
   else:
@@ -99,13 +48,35 @@ proc propagateEpochAccumulator*(
         epochAccumulatorKey: EpochAccumulatorKey(
           epochHash: rootHash))
 
+      # Note: The file actually holds the SSZ encoded accumulator, but we need
+      # to decode as we need the root for the content key.
+      encodedAccumulator = SSZ.encode(accumulator)
+
+    info "Gossiping epoch accumulator", rootHash
+
     p.storeContent(
-      history_content.toContentId(key), SSZ.encode(accumulator))
+      history_content.toContentId(key), encodedAccumulator)
     discard await p.neighborhoodGossip(
-      ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)])
+      ContentKeysList(@[encode(key)]), @[encodedAccumulator])
 
     return ok()
 
+proc propagateEpochAccumulators*(
+    p: PortalProtocol, path: string):
+    Future[Result[void, string]] {.async.} =
+  ## Propagate all epoch accumulators created when building the accumulator
+  ## from the block headers.
+  ## path is a directory that holds all SSZ encoded epoch accumulator files.
+  for i in 0..<preMergeEpochs:
+    let file =
+      try: path / &"mainnet-epoch-accumulator-{i.uint64:05}.ssz"
+      except ValueError as e: raiseAssert e.msg
+
+    let res = await p.propagateEpochAccumulator(file)
+    if res.isErr():
+      return err(res.error)
+
+  return ok()
diff --git a/fluffy/rpc/rpc_portal_debug_api.nim b/fluffy/rpc/rpc_portal_debug_api.nim
--- a/fluffy/rpc/rpc_portal_debug_api.nim
+++ b/fluffy/rpc/rpc_portal_debug_api.nim
@@ ... @@
-  rpcServer.rpc("portal_" & network & "_propagateAccumulatorData") do(
-      dataFile: string) -> bool:
-    let res = await p.propagateAccumulatorData(dataFile)
-    if res.isOk():
-      return true
-    else:
-      raise newException(ValueError, $res.error)
-
   rpcServer.rpc("portal_" & network & "_propagateEpochAccumulator") do(
       dataFile: string) -> bool:
     let res = await p.propagateEpochAccumulator(dataFile)
     if res.isOk():
       return true
     else:
-      echo $res.error
+      raise newException(ValueError, $res.error)
+
+  rpcServer.rpc("portal_" & network & "_propagateEpochAccumulators") do(
+      path: string) -> bool:
+    let res = await p.propagateEpochAccumulators(path)
+    if res.isOk():
+      return true
+    else:
       raise newException(ValueError, $res.error)
 
   rpcServer.rpc("portal_" & network & "_storeContentInNodeRange") do(
diff --git a/fluffy/tests/test_accumulator.nim b/fluffy/tests/test_accumulator.nim
index 28174f174..f0cb437a4 100644
--- a/fluffy/tests/test_accumulator.nim
+++ b/fluffy/tests/test_accumulator.nim
@@ -13,15 +13,15 @@ import
   unittest2, stint,
   eth/common/eth_types_rlp,
   ../data/history_data_parser,
-  ../network/history/[history_content, accumulator]
+  ../network/history/[history_content, accumulator],
+  ./test_helpers
 
 func buildProof(
-    epochAccumulators: seq[(ContentKey, EpochAccumulator)],
-    header: BlockHeader):
+    epochAccumulators: seq[EpochAccumulator], header: BlockHeader):
     Result[seq[Digest], string] =
   let
     epochIndex = getEpochIndex(header)
-    epochAccumulator = epochAccumulators[epochIndex][1]
+    epochAccumulator = epochAccumulators[epochIndex]
 
     headerRecordIndex = getHeaderRecordIndex(header, epochIndex)
 
     gIndex = GeneralizedIndex(epochSize*2*2 + (headerRecordIndex*2))
@@ -54,13 +54,9 @@ suite "Header Accumulator":
       headers.add(BlockHeader(
         blockNumber: i.stuint(256), difficulty: 1.stuint(256)))
 
-    let
-      accumulatorRes = buildAccumulator(headers)
-      epochAccumulators = buildAccumulatorData(headers)
-
+    let accumulatorRes = buildAccumulatorData(headers)
     check accumulatorRes.isOk()
-
-    let accumulator = accumulatorRes.get()
+    let (accumulator, epochAccumulators) = accumulatorRes.get()
 
     block: # Test valid headers
       for i in headersToTest:
diff --git a/fluffy/tests/test_helpers.nim b/fluffy/tests/test_helpers.nim
index cb700d1f9..c4b9bf81d 100644
--- a/fluffy/tests/test_helpers.nim
+++ b/fluffy/tests/test_helpers.nim
@@ -9,7 +9,8 @@ import
   stew/shims/net, eth/keys,
   eth/p2p/discoveryv5/[enr, node, routing_table],
-  eth/p2p/discoveryv5/protocol as discv5_protocol
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  ../network/history/accumulator
 
 proc localAddress*(port: int): Address =
   Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
@@ -43,3 +44,31 @@ proc genByteSeq*(length: int): seq[byte] =
     resultSeq[i] = byte(i)
     inc i
   return resultSeq
+
+func buildAccumulator*(
+    headers: seq[BlockHeader]): Result[FinishedAccumulator, string] =
+  var accumulator: Accumulator
+  for header in headers:
+    updateAccumulator(accumulator, header)
+
+    if header.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
+      return ok(finishAccumulator(accumulator))
+
+  err("Not enough headers provided to finish the accumulator")
+
+func buildAccumulatorData*(headers: seq[BlockHeader]):
+    Result[(FinishedAccumulator, seq[EpochAccumulator]), string] =
+  var accumulator: Accumulator
+  var epochAccumulators: seq[EpochAccumulator]
+  for header in headers:
+    updateAccumulator(accumulator, header)
+
+    if accumulator.currentEpoch.len() == epochSize:
+      epochAccumulators.add(accumulator.currentEpoch)
+
+    if header.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
+      epochAccumulators.add(accumulator.currentEpoch)
+
+      return ok((finishAccumulator(accumulator), epochAccumulators))
+
+  err("Not enough headers provided to finish the accumulator")
diff --git a/fluffy/tests/test_history_network.nim b/fluffy/tests/test_history_network.nim
index 291a60d73..7ade3d1a1 100644
--- a/fluffy/tests/test_history_network.nim
+++ b/fluffy/tests/test_history_network.nim
@@ -8,7 +8,8 @@
 import
   std/os,
   testutils/unittests, chronos,
-  eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
   eth/common/eth_types_rlp, eth/rlp,
   ../network/wire/[portal_protocol, portal_stream, portal_protocol_config],
@@ -89,10 +90,12 @@ procSuite "History Content Network":
       epochSize*3 + 1,
       int(lastBlockNumber)]
 
+    let headers = createEmptyHeaders(0, int(lastBlockNumber))
+    let accumulatorRes = buildAccumulatorData(headers)
+    check accumulatorRes.isOk()
+
     let
-      headers = createEmptyHeaders(0, int(lastBlockNumber))
-      masterAccumulator = buildAccumulator(headers).get()
-      epochAccumulators = buildAccumulatorData(headers)
+      (masterAccumulator, epochAccumulators) = accumulatorRes.get()
 
       historyNode1 = newHistoryNode(rng, 20302, masterAccumulator)
       historyNode2 = newHistoryNode(rng, 20303, masterAccumulator)
@@ -107,8 +110,15 @@ procSuite "History Content Network":
         headerEncoded = rlp.encode(h)
       historyNode2.portalProtocol().storeContent(contentId, headerEncoded)
 
-    for (contentKey, epochAccumulator) in epochAccumulators:
-      let contentId = toContentId(contentKey)
+    # Need to store the epoch accumulators to be able to do the block to hash
+    # mapping
+    for epochAccumulator in epochAccumulators:
+      let
+        rootHash = epochAccumulator.hash_tree_root()
+        contentKey = ContentKey(
+          contentType: ContentType.epochAccumulator,
+          epochAccumulatorKey: EpochAccumulatorKey(epochHash: rootHash))
+        contentId = toContentId(contentKey)
       historyNode2.portalProtocol().storeContent(
         contentId, SSZ.encode(epochAccumulator))
 
@@ -139,11 +149,12 @@ procSuite "History Content Network":
     # Need to provide enough headers to have the accumulator "finished".
     const lastBlockNumber = int(mergeBlockNumber - 1)
 
-    let
-      headers = createEmptyHeaders(0, lastBlockNumber)
-      masterAccumulator = buildAccumulator(headers).get()
-      epochAccumulators = buildAccumulatorData(headers)
+    let headers = createEmptyHeaders(0, lastBlockNumber)
+    let accumulatorRes = buildAccumulatorData(headers)
+    check accumulatorRes.isOk()
 
+    let
+      (masterAccumulator, epochAccumulators) = accumulatorRes.get()
       historyNode1 = newHistoryNode(rng, 20302, masterAccumulator)
       historyNode2 = newHistoryNode(rng, 20303, masterAccumulator)
@@ -163,8 +174,13 @@ procSuite "History Content Network":
 
     # One of the nodes needs to have the epochAccumulator to build proofs from
     # for the offered headers.
-    for (contentKey, epochAccumulator) in epochAccumulators:
-      let contentId = toContentId(contentKey)
+    for epochAccumulator in epochAccumulators:
+      let
+        rootHash = epochAccumulator.hash_tree_root()
+        contentKey = ContentKey(
+          contentType: ContentType.epochAccumulator,
+          epochAccumulatorKey: EpochAccumulatorKey(epochHash: rootHash))
+        contentId = toContentId(contentKey)
       historyNode2.portalProtocol().storeContent(
         contentId, SSZ.encode(epochAccumulator))
 
@@ -222,11 +238,12 @@ procSuite "History Content Network":
       lastBlockNumber - 1,
       lastBlockNumber]
 
-    let
-      headers = createEmptyHeaders(0, lastBlockNumber)
-      masterAccumulator = buildAccumulator(headers).get()
-      epochAccumulators = buildAccumulatorData(headers)
+    let headers = createEmptyHeaders(0, lastBlockNumber)
+    let accumulatorRes = buildAccumulatorData(headers)
+    check accumulatorRes.isOk()
 
+    let
+      (masterAccumulator, epochAccumulators) = accumulatorRes.get()
       historyNode1 = newHistoryNode(rng, 20302, masterAccumulator)
       historyNode2 = newHistoryNode(rng, 20303, masterAccumulator)
@@ -239,8 +256,13 @@ procSuite "History Content Network":
 
     # Need to store the epochAccumulators, because else the headers can't be
     # verified if being part of the canonical chain currently
-    for (contentKey, epochAccumulator) in epochAccumulators:
-      let contentId = toContentId(contentKey)
+    for epochAccumulator in epochAccumulators:
+      let
+        rootHash = epochAccumulator.hash_tree_root()
+        contentKey = ContentKey(
+          contentType: ContentType.epochAccumulator,
+          epochAccumulatorKey: EpochAccumulatorKey(epochHash: rootHash))
+        contentId = toContentId(contentKey)
       historyNode1.portalProtocol.storeContent(
         contentId, SSZ.encode(epochAccumulator))
 
diff --git a/fluffy/tools/eth_data_exporter.nim b/fluffy/tools/eth_data_exporter.nim
index 6d1f6b409..efd33256a 100644
--- a/fluffy/tools/eth_data_exporter.nim
+++ b/fluffy/tools/eth_data_exporter.nim
@@ -162,6 +162,10 @@ type
       defaultValue: defaultAccumulatorFileName
       defaultValueDesc: $defaultAccumulatorFileName
       name: "accumulator-file-name" .}: string
+    writeEpochAccumulators* {.
+      desc: "Write also the SSZ encoded epoch accumulators to specific files"
+      defaultValue: false
+      name: "write-epoch-accumulators" .}: bool
   of printAccumulatorData:
     accumulatorFileNamePrint* {.
       desc: "File from which the serialized accumulator is read"
@@ -492,7 +496,7 @@ when isMainModule:
         fatal "Required epoch headers file does not exist", file
         quit 1
 
-    proc buildAccumulator(dataDir: string):
+    proc buildAccumulator(dataDir: string, writeEpochAccumulators = false):
         Result[FinishedAccumulator, string] =
       var accumulator: Accumulator
       for i in 0..
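
Note (reviewer addition, not part of the patch): the patch repeats one pattern in several places: read the SSZ encoded epoch accumulator from a file, decode it because the content key is built from the accumulator's hash_tree_root, then derive the content id and the encoded content bytes. A minimal sketch of that pattern, written in terms of the modules the patch already imports; the proc name is hypothetical:

import
  stew/results,
  ../data/history_data_parser,
  ../network/history/[history_content, accumulator]

proc contentForEpochAccumulatorFile(
    file: string): Result[(ContentId, seq[byte]), string] =
  # The file holds the SSZ encoded accumulator, but it must be decoded as
  # the content key needs the accumulator's root hash.
  let accumulator = ? readEpochAccumulator(file)

  let contentKey = ContentKey(
    contentType: ContentType.epochAccumulator,
    epochAccumulatorKey: EpochAccumulatorKey(
      epochHash: accumulator.hash_tree_root()))

  # storeContent/neighborhoodGossip then take the content id and the
  # re-encoded accumulator, as in propagateEpochAccumulator above.
  ok((toContentId(contentKey), SSZ.encode(accumulator)))

With the new debug endpoints, the same flow can be driven over JSON-RPC, either per file (portal_<network>_propagateEpochAccumulator) or for a whole directory of exported accumulators (portal_<network>_propagateEpochAccumulators), matching what eth_data_exporter produces with --write-epoch-accumulators.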