Add Era1 based gossip calls + Era1 helpers (#2029)

- Add Era1 helpers to be able to iterate fast over block tuples
and individual total difficulties
- Add buildAccumulator from Era1 file
- Add Era1 based BlockHeader with proof + bodies/receipts gossip
calls
- Add new JSON-RPC debug methods to be able to test the above
with a standalone fluffy node
This commit is contained in:
Kim De Mey 2024-02-15 16:49:22 +01:00 committed by GitHub
parent 3305c02856
commit 9378774b9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 210 additions and 6 deletions

View File

@ -135,6 +135,13 @@ proc readBlockIndex*(f: IoHandle): Result[BlockIndex, string] =
ok(BlockIndex(startNumber: blockNumber, offsets: offsets)) ok(BlockIndex(startNumber: blockNumber, offsets: offsets))
proc skipRecord*(f: IoHandle): Result[void, string] =
let header = ? readHeader(f)
if header.len > 0:
? f.setFilePos(header.len, SeekPosition.SeekCurrent).mapErr(ioErrorMsg)
ok()
func startNumber*(era: Era1): uint64 = func startNumber*(era: Era1): uint64 =
era * MaxEra1Size era * MaxEra1Size
@ -239,6 +246,9 @@ type
handle: Opt[IoHandle] handle: Opt[IoHandle]
blockIdx: BlockIndex blockIdx: BlockIndex
BlockTuple* =
tuple[header: BlockHeader, body: BlockBody, receipts: seq[Receipt], td: UInt256]
proc open*(_: type Era1File, name: string): Result[Era1File, string] = proc open*(_: type Era1File, name: string): Result[Era1File, string] =
var var
f = Opt[IoHandle].ok(? openFile(name, {OpenFlags.Read}).mapErr(ioErrorMsg)) f = Opt[IoHandle].ok(? openFile(name, {OpenFlags.Read}).mapErr(ioErrorMsg))
@ -268,6 +278,11 @@ proc close*(f: Era1File) =
discard closeFile(f.handle.get()) discard closeFile(f.handle.get())
reset(f.handle) reset(f.handle)
proc skipRecord*(f: Era1File): Result[void, string] =
doAssert f[].handle.isSome()
f[].handle.get().skipRecord()
proc getBlockHeader(f: Era1File): Result[BlockHeader, string] = proc getBlockHeader(f: Era1File): Result[BlockHeader, string] =
var bytes: seq[byte] var bytes: seq[byte]
@ -309,7 +324,7 @@ proc getTotalDifficulty(f: Era1File): Result[UInt256, string] =
proc getNextBlockTuple*( proc getNextBlockTuple*(
f: Era1File f: Era1File
): Result[(BlockHeader, BlockBody, seq[Receipt], UInt256), string] = ): Result[BlockTuple, string] =
doAssert not isNil(f) and f[].handle.isSome doAssert not isNil(f) and f[].handle.isSome
let let
@ -322,7 +337,7 @@ proc getNextBlockTuple*(
proc getBlockTuple*( proc getBlockTuple*(
f: Era1File, blockNumber: uint64 f: Era1File, blockNumber: uint64
): Result[(BlockHeader, BlockBody, seq[Receipt], UInt256), string] = ): Result[BlockTuple, string] =
doAssert not isNil(f) and f[].handle.isSome doAssert not isNil(f) and f[].handle.isSome
doAssert( doAssert(
blockNumber >= f[].blockIdx.startNumber and blockNumber >= f[].blockIdx.startNumber and
@ -335,6 +350,39 @@ proc getBlockTuple*(
getNextBlockTuple(f) getNextBlockTuple(f)
proc getBlockHeader*(
f: Era1File, blockNumber: uint64
): Result[BlockHeader, string] =
doAssert not isNil(f) and f[].handle.isSome
doAssert(
blockNumber >= f[].blockIdx.startNumber and
blockNumber <= f[].blockIdx.endNumber,
"Wrong era1 file for selected block number")
let pos = f[].blockIdx.offsets[blockNumber - f[].blockIdx.startNumber]
? f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
getBlockHeader(f)
proc getTotalDifficulty*(
f: Era1File, blockNumber: uint64
): Result[UInt256, string] =
doAssert not isNil(f) and f[].handle.isSome
doAssert(
blockNumber >= f[].blockIdx.startNumber and
blockNumber <= f[].blockIdx.endNumber,
"Wrong era1 file for selected block number")
let pos = f[].blockIdx.offsets[blockNumber - f[].blockIdx.startNumber]
? f[].handle.get().setFilePos(pos, SeekPosition.SeekBegin).mapErr(ioErrorMsg)
?skipRecord(f) # BlockHeader
?skipRecord(f) # BlockBody
?skipRecord(f) # Receipts
getTotalDifficulty(f)
# TODO: Should we add this perhaps in the Era1File object and grab it in open()? # TODO: Should we add this perhaps in the Era1File object and grab it in open()?
proc getAccumulatorRoot*(f: Era1File): Result[Digest, string] = proc getAccumulatorRoot*(f: Era1File): Result[Digest, string] =
# Get position of BlockIndex # Get position of BlockIndex
@ -356,6 +404,23 @@ proc getAccumulatorRoot*(f: Era1File): Result[Digest, string] =
ok(Digest(data: array[32, byte].initCopyFrom(bytes))) ok(Digest(data: array[32, byte].initCopyFrom(bytes)))
proc buildAccumulator*(f: Era1File): Result[EpochAccumulatorCached, string] =
let
startNumber = f.blockIdx.startNumber
endNumber = f.blockIdx.endNumber()
var headerRecords: seq[HeaderRecord]
for blockNumber in startNumber..endNumber:
let
blockHeader = ? f.getBlockHeader(blockNumber)
totalDifficulty = ? f.getTotalDifficulty(blockNumber)
headerRecords.add(HeaderRecord(
blockHash: blockHeader.blockHash(),
totalDifficulty: totalDifficulty))
ok(EpochAccumulatorCached.init(@headerRecords))
proc verify*(f: Era1File): Result[Digest, string] = proc verify*(f: Era1File): Result[Digest, string] =
let let
startNumber = f.blockIdx.startNumber startNumber = f.blockIdx.startNumber
@ -390,3 +455,23 @@ proc verify*(f: Era1File): Result[Digest, string] =
err("Invalid accumulator root") err("Invalid accumulator root")
else: else:
ok(accumulatorRoot) ok(accumulatorRoot)
iterator era1BlockHeaders*(f: Era1File): BlockHeader =
let
startNumber = f.blockIdx.startNumber
endNumber = f.blockIdx.endNumber()
for blockNumber in startNumber..endNumber:
let header = f.getBlockHeader(blockNumber).valueOr:
raiseAssert("Failed to read block header")
yield header
iterator era1BlockTuples*(f: Era1File): BlockTuple =
let
startNumber = f.blockIdx.startNumber
endNumber = f.blockIdx.endNumber()
for blockNumber in startNumber..endNumber:
let blockTuple = f.getBlockTuple(blockNumber).valueOr:
raiseAssert("Failed to read block header")
yield blockTuple

View File

@ -1,5 +1,5 @@
# # Nimbus - Portal Network # # Nimbus - Portal Network
# # Copyright (c) 2022-2023 Status Research & Development GmbH # # Copyright (c) 2022-2024 Status Research & Development GmbH
# # Licensed and distributed under either of # # Licensed and distributed under either of
# # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -9,11 +9,11 @@
import import
std/[strformat, os], std/[strformat, os],
stew/results, chronos, chronicles, results, chronos, chronicles,
eth/common/eth_types, eth/rlp, eth/common/eth_types, eth/rlp,
../network/wire/portal_protocol, ../network/wire/portal_protocol,
../network/history/[history_content, history_network, accumulator], ../network/history/[history_content, history_network, accumulator],
"."/[history_data_json_store, history_data_ssz_e2s] "."/[era1, history_data_json_store, history_data_ssz_e2s]
export results export results
@ -249,3 +249,101 @@ proc historyPropagateHeaders*(
return ok() return ok()
else: else:
return err(blockData.error) return err(blockData.error)
##
## Era1 based iterators that encode to Portal content
##
# Note: these iterators + the era1 iterators will assert on error. These asserts
# would indicate corrupt/invalid era1 files. We might want to instead break,
# raise an exception or return a Result type instead, but the latter does not
# have great support for usage in iterators.
iterator headersWithProof*(
f: Era1File, epochAccumulator: EpochAccumulatorCached
): (ByteList, seq[byte]) =
for blockHeader in f.era1BlockHeaders:
doAssert blockHeader.isPreMerge()
let
contentKey = ContentKey(
contentType: blockHeader,
blockHeaderKey: BlockKey(blockHash: blockHeader.blockHash())
).encode()
headerWithProof = buildHeaderWithProof(blockHeader, epochAccumulator).valueOr:
raiseAssert "Failed to build header with proof: " & $blockHeader.blockNumber
contentValue = SSZ.encode(headerWithProof)
yield (contentKey, contentValue)
iterator blockContent*(f: Era1File): (ByteList, seq[byte]) =
for (header, body, receipts, _) in f.era1BlockTuples:
let blockHash = header.blockHash()
block: # block body
let
contentKey = ContentKey(
contentType: blockBody,
blockBodyKey: BlockKey(blockHash: blockHash)
).encode()
contentValue = encode(body)
yield (contentKey, contentValue)
block: # receipts
let
contentKey = ContentKey(
contentType: receipts,
receiptsKey: BlockKey(blockHash: blockHash)
).encode()
contentValue = encode(receipts)
yield (contentKey, contentValue)
##
## Era1 based Gossip calls
##
proc historyGossipHeadersWithProof*(
p: PortalProtocol, era1File: string, epochAccumulatorFile: Opt[string],
verifyEra = false
): Future[Result[void, string]] {.async.} =
let f = ?Era1File.open(era1File)
if verifyEra:
let _ = ?f.verify()
# Note: building the accumulator takes about 150ms vs 10ms for reading it,
# so it is probably not really worth using the read version considering the
# UX hassle it adds to provide the accumulator ssz files.
let epochAccumulator =
if epochAccumulatorFile.isNone:
?f.buildAccumulator()
else:
?readEpochAccumulatorCached(epochAccumulatorFile.get())
for (contentKey, contentValue) in f.headersWithProof(epochAccumulator):
let peers = await p.neighborhoodGossip(
Opt.none(NodeId), ContentKeysList(@[contentKey]), @[contentValue])
info "Gossiped block header", contentKey, peers
ok()
proc historyGossipBlockContent*(
p: PortalProtocol, era1File: string, verifyEra = false
): Future[Result[void, string]] {.async.} =
let f = ?Era1File.open(era1File)
if verifyEra:
let _ = ?f.verify()
for (contentKey, contentValue) in f.blockContent():
let peers = await p.neighborhoodGossip(
Opt.none(NodeId), ContentKeysList(@[contentKey]), @[contentValue])
info "Gossiped block content", contentKey, peers
ok()

View File

@ -16,10 +16,31 @@ import
export rpcserver export rpcserver
# Non-spec-RPCs that are (currently) useful for testing & debugging # Non-spec-RPCs that are used for testing, debugging and seeding data without a
# bridge.
proc installPortalDebugApiHandlers*( proc installPortalDebugApiHandlers*(
rpcServer: RpcServer|RpcProxy, p: PortalProtocol, network: static string) = rpcServer: RpcServer|RpcProxy, p: PortalProtocol, network: static string) =
## Portal debug API calls related to storage and seeding from Era1 files.
rpcServer.rpc("portal_" & network & "GossipHeaders") do(
era1File: string, epochAccumulatorFile: Opt[string]) -> bool:
let res = await p.historyGossipHeadersWithProof(
era1File, epochAccumulatorFile)
if res.isOk():
return true
else:
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "GossipBlockContent") do(
era1File: string) -> bool:
let res = await p.historyGossipBlockContent(era1File)
if res.isOk():
return true
else:
raise newException(ValueError, $res.error)
## Portal debug API calls related to storage and seeding
## TODO: To be removed/replaced with the Era1 versions where applicable.
rpcServer.rpc("portal_" & network & "_storeContent") do( rpcServer.rpc("portal_" & network & "_storeContent") do(
dataFile: string) -> bool: dataFile: string) -> bool:
let res = p.historyStore(dataFile) let res = p.historyStore(dataFile)