Optionally write all epoch accumulators when building accumulator

- Epoch accumulators can now be written to files with eth_data_exporter
- RPC requests to gossip epoch accumulators now use these files
instead of building them on the fly (sketched below)
- The other build accumulator calls are adjusted; they are only used
for tests and are thus moved to the testing folder
kdeme 2022-10-18 13:07:32 +02:00 committed by zah
parent 09766ef283
commit e6d8bb4f2f
9 changed files with 138 additions and 155 deletions
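
A minimal sketch of the new per-file flow on the RPC side, assuming the module layout in this diff (the relative import paths and the helper name readAndKeyEpochAccumulator are illustrative): the SSZ-encoded epoch accumulator is read back from disk and decoded only to derive the content key under which the raw bytes are stored and gossiped, mirroring readEpochAccumulator plus the key construction in propagateEpochAccumulator below.

import
  stew/[io2, results],
  ssz_serialization, ssz_serialization/merkleization,
  ./network/history/[history_content, accumulator]

proc readAndKeyEpochAccumulator(
    file: string): Result[(ContentKey, seq[byte]), string] =
  # The file holds the SSZ-encoded accumulator; keep the raw bytes for gossip.
  let readRes = readAllFile(file)
  if readRes.isErr():
    return err("Failed reading epoch accumulator file: " &
      ioErrorMsg(readRes.error))
  let encoded = readRes.get()

  # Decode only because the content key needs the hash_tree_root.
  let epochAcc =
    try:
      SSZ.decode(encoded, EpochAccumulator)
    except CatchableError as e: # broad catch for the sketch
      return err("Failed decoding epoch accumulator: " & e.msg)

  let key = ContentKey(
    contentType: ContentType.epochAccumulator,
    epochAccumulatorKey: EpochAccumulatorKey(
      epochHash: hash_tree_root(epochAcc)))
  ok((key, encoded))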

View File

@@ -206,14 +206,8 @@ proc readAccumulator*(file: string): Result[FinishedAccumulator, string] =
err("Failed decoding accumulator: " & e.msg)
proc readEpochAccumulator*(dataFile: string): Result[EpochAccumulator, string] =
let res = ? readJsonType(dataFile, EpochAccumulatorObject)
let encodedAccumulator =
try:
res.epochAccumulator.hexToSeqByte()
except ValueError as e:
return err("Invalid hex data for accumulator: " & e.msg)
proc readEpochAccumulator*(file: string): Result[EpochAccumulator, string] =
let encodedAccumulator = ? readAllFile(file).mapErr(toString)
try:
ok(SSZ.decode(encodedAccumulator, EpochAccumulator))

View File

@@ -8,6 +8,7 @@
{.push raises: [Defect].}
import
std/[strformat, os],
stew/results, chronos, chronicles,
eth/common/eth_types,
../network/wire/portal_protocol,
@@ -18,38 +19,6 @@ export results
### Helper calls to seed the local database and/or the network
proc buildAccumulator*(dataFile: string): Result[FinishedAccumulator, string] =
let blockData = ? readJsonType(dataFile, BlockDataTable)
var headers: seq[BlockHeader]
# Len of headers from blockdata + genesis header
headers.setLen(blockData.len() + 1)
headers[0] = getGenesisHeader()
for k, v in blockData.pairs:
let header = ? v.readBlockHeader()
headers[header.blockNumber.truncate(int)] = header
buildAccumulator(headers)
proc buildAccumulatorData*(
dataFile: string):
Result[seq[(ContentKey, EpochAccumulator)], string] =
let blockData = ? readJsonType(dataFile, BlockDataTable)
var headers: seq[BlockHeader]
# Len of headers from blockdata + genesis header
headers.setLen(blockData.len() + 1)
headers[0] = getGenesisHeader()
for k, v in blockData.pairs:
let header = ? v.readBlockHeader()
headers[header.blockNumber.truncate(int)] = header
ok(buildAccumulatorData(headers))
proc historyStore*(
p: PortalProtocol, dataFile: string, verify = false):
Result[void, string] =
@@ -62,32 +31,12 @@ proc historyStore*(
ok()
proc propagateAccumulatorData*(
p: PortalProtocol, dataFile: string):
Future[Result[void, string]] {.async.} =
## Propagate all epoch accumulators created when building the accumulator
## from the block headers.
## dataFile holds block data
let epochAccumulators = buildAccumulatorData(dataFile)
if epochAccumulators.isErr():
return err(epochAccumulators.error)
else:
for (key, epochAccumulator) in epochAccumulators.get():
let content = SSZ.encode(epochAccumulator)
p.storeContent(
history_content.toContentId(key), content)
discard await p.neighborhoodGossip(
ContentKeysList(@[encode(key)]), @[content])
return ok()
proc propagateEpochAccumulator*(
p: PortalProtocol, dataFile: string):
p: PortalProtocol, file: string):
Future[Result[void, string]] {.async.} =
## Propagate a specific epoch accumulator into the network.
## dataFile holds the SSZ serialized epoch accumulator
let epochAccumulatorRes = readEpochAccumulator(dataFile)
## file holds the SSZ serialized epoch accumulator.
let epochAccumulatorRes = readEpochAccumulator(file)
if epochAccumulatorRes.isErr():
return err(epochAccumulatorRes.error)
else:
@@ -99,13 +48,35 @@ proc propagateEpochAccumulator*(
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: rootHash))
# Note: The file actually holds the SSZ encoded accumulator, but we need
# to decode as we need the root for the content key.
encodedAccumulator = SSZ.encode(accumulator)
info "Gossiping epoch accumulator", rootHash
p.storeContent(
history_content.toContentId(key), SSZ.encode(accumulator))
history_content.toContentId(key), encodedAccumulator)
discard await p.neighborhoodGossip(
ContentKeysList(@[encode(key)]), @[SSZ.encode(accumulator)])
ContentKeysList(@[encode(key)]), @[encodedAccumulator])
return ok()
proc propagateEpochAccumulators*(
p: PortalProtocol, path: string):
Future[Result[void, string]] {.async.} =
## Propagate all epoch accumulators created when building the accumulator
## from the block headers.
## path is a directory that holds all SSZ encoded epoch accumulator files.
for i in 0..<preMergeEpochs:
let file =
try: path / &"mainnet-epoch-accumulator-{i.uint64:05}.ssz"
except ValueError as e: raiseAssert e.msg
let res = await p.propagateEpochAccumulator(file)
if res.isErr():
return err(res.error)
return ok()
proc historyPropagate*(
p: PortalProtocol, dataFile: string, verify = false):
Future[Result[void, string]] {.async.} =

View File

@@ -102,51 +102,6 @@ func finishAccumulator*(a: var Accumulator): FinishedAccumulator =
FinishedAccumulator(historicalEpochs: a.historicalEpochs)
func buildAccumulator*(
headers: seq[BlockHeader]): Result[FinishedAccumulator, string] =
var accumulator: Accumulator
for header in headers:
updateAccumulator(accumulator, header)
if header.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
return ok(finishAccumulator(accumulator))
err("Not enough headers provided to finish the accumulator")
func buildAccumulatorData*(headers: seq[BlockHeader]):
seq[(ContentKey, EpochAccumulator)] =
var accumulator: Accumulator
var epochAccumulators: seq[(ContentKey, EpochAccumulator)]
for header in headers:
updateAccumulator(accumulator, header)
# TODO: By allowing updateAccumulator and finishAccumulator to return
# optionally the finished epoch accumulators we would avoid double
# hash_tree_root computations.
if accumulator.currentEpoch.len() == epochSize:
let
rootHash = accumulator.currentEpoch.hash_tree_root()
key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: rootHash))
epochAccumulators.add((key, accumulator.currentEpoch))
if header.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
let
rootHash = accumulator.currentEpoch.hash_tree_root()
key = ContentKey(
contentType: epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(
epochHash: rootHash))
epochAccumulators.add((key, accumulator.currentEpoch))
discard finishAccumulator(accumulator)
epochAccumulators
## Calls and helper calls for building header proofs and verifying headers
## against the Accumulator and the header proofs.

View File

@@ -4,10 +4,8 @@ proc portal_history_storeContent(dataFile: string): bool
proc portal_history_propagate(dataFile: string): bool
proc portal_history_propagateHeaders(dataFile: string): bool
proc portal_history_propagateBlock(dataFile: string, blockHash: string): bool
proc portal_history_propagateAccumulatorData(
dataFile: string): bool
proc portal_history_propagateEpochAccumulator(
dataFile: string): bool
proc portal_history_propagateEpochAccumulator(dataFile: string): bool
proc portal_history_propagateEpochAccumulators(path: string): bool
proc portal_history_storeContentInNodeRange(
dbPath: string, max: uint32, starting: uint32): bool
proc portal_history_offerContentInNodeRange(
@@ -16,4 +14,3 @@ proc portal_history_depthContentPropagate(
dbPath: string, max: uint32): bool
proc portal_history_breadthContentPropagate(
dbPath: string): bool
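
These signatures also double as a client definition; a minimal sketch, assuming nim-json-rpc's createRpcSigs and an illustrative path to this signature file, of how the new call can be driven from Nim:

import json_rpc/rpcclient

# Generate client-side procs matching the signatures above; the file path
# is an assumption for this sketch.
createRpcSigs(RpcClient, "fluffy/rpc/rpc_calls/rpc_portal_debug_calls.nim")

# Usage from an async context, with `client` connected to a fluffy node:
#   let gossiped = await client.portal_history_propagateEpochAccumulators(
#     "./data/epoch-accumulators")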

View File

@@ -64,21 +64,20 @@ proc installPortalDebugApiHandlers*(
else:
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_propagateAccumulatorData") do(
dataFile: string) -> bool:
let res = await p.propagateAccumulatorData(dataFile)
if res.isOk():
return true
else:
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_propagateEpochAccumulator") do(
dataFile: string) -> bool:
let res = await p.propagateEpochAccumulator(dataFile)
if res.isOk():
return true
else:
echo $res.error
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_propagateEpochAccumulators") do(
path: string) -> bool:
let res = await p.propagateEpochAccumulators(path)
if res.isOk():
return true
else:
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_storeContentInNodeRange") do(

View File

@@ -13,15 +13,15 @@ import
unittest2, stint,
eth/common/eth_types_rlp,
../data/history_data_parser,
../network/history/[history_content, accumulator]
../network/history/[history_content, accumulator],
./test_helpers
func buildProof(
epochAccumulators: seq[(ContentKey, EpochAccumulator)],
header: BlockHeader):
epochAccumulators: seq[EpochAccumulator], header: BlockHeader):
Result[seq[Digest], string] =
let
epochIndex = getEpochIndex(header)
epochAccumulator = epochAccumulators[epochIndex][1]
epochAccumulator = epochAccumulators[epochIndex]
headerRecordIndex = getHeaderRecordIndex(header, epochIndex)
gIndex = GeneralizedIndex(epochSize*2*2 + (headerRecordIndex*2))
@@ -54,13 +54,9 @@ suite "Header Accumulator":
headers.add(BlockHeader(
blockNumber: i.stuint(256), difficulty: 1.stuint(256)))
let
accumulatorRes = buildAccumulator(headers)
epochAccumulators = buildAccumulatorData(headers)
let accumulatorRes = buildAccumulatorData(headers)
check accumulatorRes.isOk()
let accumulator = accumulatorRes.get()
let (accumulator, epochAccumulators) = accumulatorRes.get()
block: # Test valid headers
for i in headersToTest:

View File

@@ -9,7 +9,8 @@ import
stew/shims/net,
eth/keys,
eth/p2p/discoveryv5/[enr, node, routing_table],
eth/p2p/discoveryv5/protocol as discv5_protocol
eth/p2p/discoveryv5/protocol as discv5_protocol,
../network/history/accumulator
proc localAddress*(port: int): Address =
Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
@@ -43,3 +44,31 @@ proc genByteSeq*(length: int): seq[byte] =
resultSeq[i] = byte(i)
inc i
return resultSeq
func buildAccumulator*(
headers: seq[BlockHeader]): Result[FinishedAccumulator, string] =
var accumulator: Accumulator
for header in headers:
updateAccumulator(accumulator, header)
if header.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
return ok(finishAccumulator(accumulator))
err("Not enough headers provided to finish the accumulator")
func buildAccumulatorData*(headers: seq[BlockHeader]):
Result[(FinishedAccumulator, seq[EpochAccumulator]), string] =
var accumulator: Accumulator
var epochAccumulators: seq[EpochAccumulator]
for header in headers:
updateAccumulator(accumulator, header)
if accumulator.currentEpoch.len() == epochSize:
epochAccumulators.add(accumulator.currentEpoch)
if header.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
epochAccumulators.add(accumulator.currentEpoch)
return ok((finishAccumulator(accumulator), epochAccumulators))
err("Not enough headers provided to finish the accumulator")

View File

@@ -8,7 +8,8 @@
import
std/os,
testutils/unittests, chronos,
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
eth/p2p/discoveryv5/protocol as discv5_protocol,
eth/p2p/discoveryv5/routing_table,
eth/common/eth_types_rlp,
eth/rlp,
../network/wire/[portal_protocol, portal_stream, portal_protocol_config],
@@ -89,10 +90,12 @@ procSuite "History Content Network":
epochSize*3 + 1,
int(lastBlockNumber)]
let headers = createEmptyHeaders(0, int(lastBlockNumber))
let accumulatorRes = buildAccumulatorData(headers)
check accumulatorRes.isOk()
let
headers = createEmptyHeaders(0, int(lastBlockNumber))
masterAccumulator = buildAccumulator(headers).get()
epochAccumulators = buildAccumulatorData(headers)
(masterAccumulator, epochAccumulators) = accumulatorRes.get()
historyNode1 = newHistoryNode(rng, 20302, masterAccumulator)
historyNode2 = newHistoryNode(rng, 20303, masterAccumulator)
@@ -107,8 +110,15 @@ procSuite "History Content Network":
headerEncoded = rlp.encode(h)
historyNode2.portalProtocol().storeContent(contentId, headerEncoded)
for (contentKey, epochAccumulator) in epochAccumulators:
let contentId = toContentId(contentKey)
# Need to store the epoch accumulators to be able to do the block to hash
# mapping
for epochAccumulator in epochAccumulators:
let
rootHash = epochAccumulator.hash_tree_root()
contentKey = ContentKey(
contentType: ContentType.epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(epochHash: rootHash))
contentId = toContentId(contentKey)
historyNode2.portalProtocol().storeContent(
contentId, SSZ.encode(epochAccumulator))
@@ -139,11 +149,12 @@ procSuite "History Content Network":
# Need to provide enough headers to have the accumulator "finished".
const lastBlockNumber = int(mergeBlockNumber - 1)
let
headers = createEmptyHeaders(0, lastBlockNumber)
masterAccumulator = buildAccumulator(headers).get()
epochAccumulators = buildAccumulatorData(headers)
let headers = createEmptyHeaders(0, lastBlockNumber)
let accumulatorRes = buildAccumulatorData(headers)
check accumulatorRes.isOk()
let
(masterAccumulator, epochAccumulators) = accumulatorRes.get()
historyNode1 = newHistoryNode(rng, 20302, masterAccumulator)
historyNode2 = newHistoryNode(rng, 20303, masterAccumulator)
@@ -163,8 +174,13 @@ procSuite "History Content Network":
# One of the nodes needs to have the epochAccumulator to build proofs from
# for the offered headers.
for (contentKey, epochAccumulator) in epochAccumulators:
let contentId = toContentId(contentKey)
for epochAccumulator in epochAccumulators:
let
rootHash = epochAccumulator.hash_tree_root()
contentKey = ContentKey(
contentType: ContentType.epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(epochHash: rootHash))
contentId = toContentId(contentKey)
historyNode2.portalProtocol().storeContent(
contentId, SSZ.encode(epochAccumulator))
@@ -222,11 +238,12 @@ procSuite "History Content Network":
lastBlockNumber - 1,
lastBlockNumber]
let
headers = createEmptyHeaders(0, lastBlockNumber)
masterAccumulator = buildAccumulator(headers).get()
epochAccumulators = buildAccumulatorData(headers)
let headers = createEmptyHeaders(0, lastBlockNumber)
let accumulatorRes = buildAccumulatorData(headers)
check accumulatorRes.isOk()
let
(masterAccumulator, epochAccumulators) = accumulatorRes.get()
historyNode1 = newHistoryNode(rng, 20302, masterAccumulator)
historyNode2 = newHistoryNode(rng, 20303, masterAccumulator)
@@ -239,8 +256,13 @@ procSuite "History Content Network":
# Need to store the epochAccumulators, because otherwise the headers can't
# currently be verified as being part of the canonical chain
for (contentKey, epochAccumulator) in epochAccumulators:
let contentId = toContentId(contentKey)
for epochAccumulator in epochAccumulators:
let
rootHash = epochAccumulator.hash_tree_root()
contentKey = ContentKey(
contentType: ContentType.epochAccumulator,
epochAccumulatorKey: EpochAccumulatorKey(epochHash: rootHash))
contentId = toContentId(contentKey)
historyNode1.portalProtocol.storeContent(
contentId, SSZ.encode(epochAccumulator))

View File

@@ -162,6 +162,10 @@ type
defaultValue: defaultAccumulatorFileName
defaultValueDesc: $defaultAccumulatorFileName
name: "accumulator-file-name" .}: string
writeEpochAccumulators* {.
desc: "Write also the SSZ encoded epoch accumulators to specific files"
defaultValue: false
name: "write-epoch-accumulators" .}: bool
of printAccumulatorData:
accumulatorFileNamePrint* {.
desc: "File from which the serialized accumulator is read"
@@ -492,7 +496,7 @@ when isMainModule:
fatal "Required epoch headers file does not exist", file
quit 1
proc buildAccumulator(dataDir: string):
proc buildAccumulator(dataDir: string, writeEpochAccumulators = false):
Result[FinishedAccumulator, string] =
var accumulator: Accumulator
for i in 0..<preMergeEpochs:
@@ -525,6 +529,22 @@ when isMainModule:
updateAccumulator(accumulator, blockHeader)
# Note: writing of an epoch accumulator occurs one iteration before
# updating the epoch accumulator, as the latter happens when passed
# a header for the next epoch (or on finishing the epoch).
if writeEpochAccumulators:
if accumulator.currentEpoch.len() == epochSize or
blockHeader.blockNumber.truncate(uint64) == mergeBlockNumber - 1:
let file =
try: dataDir / &"mainnet-epoch-accumulator-{i.uint64:05}.ssz"
except ValueError as e: raiseAssert e.msg
let res = io2.writeFile(file, SSZ.encode(accumulator.currentEpoch))
if res.isErr():
error "Failed writing epoch accumulator to file",
file, error = res.error
else:
notice "Succesfully wrote epoch accumulator to file", file
if count == epochSize - 1:
info "Updated an epoch", epoch = i
count.inc()
@@ -539,7 +559,7 @@ when isMainModule:
err("Not enough headers provided to finish the accumulator")
let accumulatorRes = buildAccumulator(dataDir)
let accumulatorRes = buildAccumulator(dataDir, config.writeEpochAccumulators)
if accumulatorRes.isErr():
fatal "Could not build accumulator", error = accumulatorRes.error
quit 1
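
Note that the exporter and the seeding call have to agree on the per-epoch file name; a minimal sketch of that convention as used above (the helper name epochAccumulatorFileName is illustrative):

import std/[os, strformat]

proc epochAccumulatorFileName(dir: string, epoch: uint64): string =
  # Epoch 0 maps to dir / "mainnet-epoch-accumulator-00000.ssz", and so on,
  # one file per epoch for 0 ..< preMergeEpochs.
  dir / &"mainnet-epoch-accumulator-{epoch:05}.ssz"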