Add an audit mode to portal_bridge history backfill (#2109)

This commit is contained in:
Kim De Mey 2024-03-26 22:27:31 +01:00 committed by GitHub
parent 547f198ace
commit 8e4368195a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 233 additions and 11 deletions

View File

@ -0,0 +1,66 @@
# fluffy
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import std/os, stew/io2, results, ../network/history/accumulator, ../eth_data/era1
type Era1DB* = ref object
## The Era1 database manages a collection of era files that together make up
## a linear history of pre-merge execution chain data.
path: string
network: string
accumulator: FinishedAccumulator
files: seq[Era1File]
proc getEra1File(db: Era1DB, era: Era1): Result[Era1File, string] =
for f in db.files:
if f.blockIdx.startNumber.era == era:
return ok(f)
if era > mergeBlockNumber.era():
return err("Selected era1 past pre-merge data")
let
root = db.accumulator.historicalEpochs[era.int]
name = era1FileName(db.network, era, Digest(data: root))
path = db.path / name
if not isFile(path):
return err("No such era file")
# TODO: The open call does not do full verification. It is assumed here that
# trusted files are used. We might want to add a full validation option.
let f = Era1File.open(path).valueOr:
return err(error)
if db.files.len > 16: # TODO LRU
close(db.files[0])
db.files.delete(0)
db.files.add(f)
ok(f)
proc new*(
T: type Era1DB, path: string, network: string, accumulator: FinishedAccumulator
): Era1DB =
Era1DB(path: path, network: network, accumulator: accumulator)
proc getBlockTuple*(db: Era1DB, blockNumber: uint64): Result[BlockTuple, string] =
let f = ?db.getEra1File(blockNumber.era)
f.getBlockTuple(blockNumber)
proc getAccumulator*(
db: Era1DB, blockNumber: uint64
): Result[EpochAccumulatorCached, string] =
## Get the Epoch Accumulator that the block with `blockNumber` is part of.
# TODO: Probably want this `EpochAccumulatorCached` also actually cached in
# the Era1File or EraDB object.
let f = ?db.getEra1File(blockNumber.era)
f.buildAccumulator()

View File

@ -260,7 +260,7 @@ func era1FileName*(network: string, era: Era1, eraRoot: Digest): string =
type
Era1File* = ref object
handle: Opt[IoHandle]
blockIdx: BlockIndex
blockIdx*: BlockIndex
BlockTuple* =
tuple[header: BlockHeader, body: BlockBody, receipts: seq[Receipt], td: UInt256]

View File

@ -136,7 +136,7 @@ func fromPortalReceipts*(
# forks but not for the EL BlockBody (usage of Option) does not play so well
# together.
func fromBlockBody(T: type PortalBlockBodyLegacy, body: BlockBody): T =
func fromBlockBody*(T: type PortalBlockBodyLegacy, body: BlockBody): T =
var transactions: Transactions
for tx in body.transactions:
discard transactions.add(TransactionByteList(rlp.encode(tx)))
@ -145,7 +145,7 @@ func fromBlockBody(T: type PortalBlockBodyLegacy, body: BlockBody): T =
PortalBlockBodyLegacy(transactions: transactions, uncles: uncles)
func fromBlockBody(T: type PortalBlockBodyShanghai, body: BlockBody): T =
func fromBlockBody*(T: type PortalBlockBodyShanghai, body: BlockBody): T =
var transactions: Transactions
for tx in body.transactions:
discard transactions.add(TransactionByteList(rlp.encode(tx)))

View File

@ -25,7 +25,7 @@ createRpcSigsFromNim(RpcClient):
proc portal_stateFindContent(enr: Record, contentKey: string): JsonNode
proc portal_stateOffer(enr: Record, contentKey: string, contentValue: string): string
proc portal_stateRecursiveFindNodes(nodeId: NodeId): seq[Record]
proc portal_stateRecursiveFindContent(contentKey: string): string
proc portal_stateRecursiveFindContent(contentKey: string): ContentInfo
proc portal_stateStore(contentKey: string, contentValue: string): bool
proc portal_stateLocalContent(contentKey: string): string
proc portal_stateGossip(contentKey: string, contentValue: string): int
@ -46,7 +46,7 @@ createRpcSigsFromNim(RpcClient):
): string
proc portal_historyRecursiveFindNodes(nodeId: NodeId): seq[Record]
proc portal_historyRecursiveFindContent(contentKey: string): string
proc portal_historyRecursiveFindContent(contentKey: string): ContentInfo
proc portal_historyStore(contentKey: string, contentValue: string): bool
proc portal_historyLocalContent(contentKey: string): string
proc portal_historyGossip(contentKey: string, contentValue: string): int
@ -64,7 +64,7 @@ createRpcSigsFromNim(RpcClient):
proc portal_beaconFindContent(enr: Record, contentKey: string): JsonNode
proc portal_beaconOffer(enr: Record, contentKey: string, contentValue: string): string
proc portal_beaconRecursiveFindNodes(nodeId: NodeId): seq[Record]
proc portal_beaconRecursiveFindContent(contentKey: string): string
proc portal_beaconRecursiveFindContent(contentKey: string): ContentInfo
proc portal_beaconStore(contentKey: string, contentValue: string): bool
proc portal_beaconLocalContent(contentKey: string): string
proc portal_beaconGossip(contentKey: string, contentValue: string): int

View File

@ -26,9 +26,14 @@ type
PingResult* = tuple[enrSeq: uint64, dataRadius: UInt256]
ContentInfo* = object
content*: string
utpTransfer*: bool
NodeInfo.useDefaultSerializationIn JrpcConv
RoutingTableInfo.useDefaultSerializationIn JrpcConv
(string, string).useDefaultSerializationIn JrpcConv
ContentInfo.useDefaultSerializationIn JrpcConv
func getNodeInfo*(r: RoutingTable): NodeInfo =
NodeInfo(enr: r.localNode.record, nodeId: r.localNode.id)

View File

@ -45,11 +45,11 @@ proc checkAccumulators(client: RpcClient) {.async.} =
let contentKey = ContentKey.init(epochAccumulator, root)
try:
let content = await client.portal_historyRecursiveFindContent(
let contentInfo = await client.portal_historyRecursiveFindContent(
contentKey.encode.asSeq().toHex()
)
let res = decodeSsz(hexToSeqByte(content), EpochAccumulator)
let res = decodeSsz(hexToSeqByte(contentInfo.content), EpochAccumulator)
if res.isErr():
echo "[Invalid] EpochAccumulator number " & $i & ": " & $root & " error: " &
res.error

View File

@ -108,11 +108,18 @@ type
backfill* {.
desc:
"Randomly backfill block headers, bodies and receipts into the network from the era1 files",
"Randomly backfill pre-merge block headers, bodies and receipts into the network from the era1 files",
defaultValue: false,
name: "backfill"
.}: bool
audit* {.
desc:
"Run pre-merge backfill in audit mode, which will only gossip content that if failed to fetch from the network",
defaultValue: false,
name: "audit"
.}: bool
era1Dir* {.
desc: "The directory where all era1 files are stored",
defaultValue: defaultEra1DataDir(),

View File

@ -22,6 +22,7 @@ import
../../network/history/[history_content, history_network],
../../network_metadata,
../../eth_data/[era1, history_data_ssz_e2s, history_data_seeding],
../../database/era1_db,
./portal_bridge_conf
from stew/objects import checkedEnumAssign
@ -170,7 +171,9 @@ proc gossipBlockHeader(
return ok()
proc gossipBlockBody(
client: RpcClient, hash: common_types.BlockHash, body: PortalBlockBodyShanghai
client: RpcClient,
hash: common_types.BlockHash,
body: PortalBlockBodyLegacy | PortalBlockBodyShanghai,
): Future[Result[void, string]] {.async: (raises: []).} =
let
contentKey = history_content.ContentKey.init(blockBody, hash)
@ -402,6 +405,142 @@ proc runBackfillLoop(
info "Succesfully gossiped era1 file", eraFile
proc runBackfillLoopAuditMode(
portalClient: RpcClient, web3Client: RpcClient, era1Dir: string
) {.async: (raises: [CancelledError]).} =
let
rng = newRng()
db = Era1DB.new(era1Dir, "mainnet", loadAccumulator())
while true:
let
# Grab a random blockNumber to audit and potentially gossip
blockNumber = rng[].rand(network_metadata.mergeBlockNumber - 1).uint64
(header, body, receipts, _) = db.getBlockTuple(blockNumber).valueOr:
error "Failed to get block tuple", error, blockNumber
continue
blockHash = header.blockHash()
var headerSuccess, bodySuccess, receiptsSuccess = false
logScope:
blockNumber = blockNumber
# header
block headerBlock:
let
contentKey = ContentKey.init(blockHeader, blockHash)
contentHex =
try:
(
await portalClient.portal_historyRecursiveFindContent(
contentKey.encode.asSeq().toHex()
)
).content
except CatchableError as e:
error "Failed to find block header content", error = e.msg
break headerBlock
content =
try:
hexToSeqByte(contentHex)
except ValueError as e:
error "Invalid hex for block header content", error = e.msg
break headerBlock
headerWithProof = decodeSsz(content, BlockHeaderWithProof).valueOr:
error "Failed to decode block header content", error
break headerBlock
if keccakHash(headerWithProof.header.asSeq()) != blockHash:
error "Block hash mismatch", blockNumber
break headerBlock
info "Retrieved block header from Portal network"
headerSuccess = true
# body
block bodyBlock:
let
contentKey = ContentKey.init(blockBody, blockHash)
contentHex =
try:
(
await portalClient.portal_historyRecursiveFindContent(
contentKey.encode.asSeq().toHex()
)
).content
except CatchableError as e:
error "Failed to find block body content", error = e.msg
break bodyBlock
content =
try:
hexToSeqByte(contentHex)
except ValueError as e:
error "Invalid hex for block body content", error = e.msg
break bodyBlock
validateBlockBodyBytes(content, header).isOkOr:
error "Block body is invalid", error
break bodyBlock
info "Retrieved block body from Portal network"
bodySuccess = true
# receipts
block receiptsBlock:
let
contentKey = ContentKey.init(ContentType.receipts, blockHash)
contentHex =
try:
(
await portalClient.portal_historyRecursiveFindContent(
contentKey.encode.asSeq().toHex()
)
).content
except CatchableError as e:
error "Failed to find block receipts content", error = e.msg
break receiptsBlock
content =
try:
hexToSeqByte(contentHex)
except ValueError as e:
error "Invalid hex for block receipts content", error = e.msg
break receiptsBlock
validateReceiptsBytes(content, header.receiptRoot).isOkOr:
error "Block receipts are invalid", error
break receiptsBlock
info "Retrieved block receipts from Portal network"
receiptsSuccess = true
# Gossip missing content
if not headerSuccess:
let
epochAccumulator = db.getAccumulator(blockNumber).valueOr:
raiseAssert "Failed to get accumulator from EraDB: " & error
headerWithProof = buildHeaderWithProof(header, epochAccumulator).valueOr:
raiseAssert "Failed to build header with proof: " & error
(await portalClient.gossipBlockHeader(blockHash, headerWithProof)).isOkOr:
error "Failed to gossip block header", error
if not bodySuccess:
(
await portalClient.gossipBlockBody(
blockHash, PortalBlockBodyLegacy.fromBlockBody(body)
)
).isOkOr:
error "Failed to gossip block body", error
if not receiptsSuccess:
(
await portalClient.gossipReceipts(
blockHash, PortalReceipts.fromReceipts(receipts)
)
).isOkOr:
error "Failed to gossip receipts", error
await sleepAsync(2.seconds)
proc runHistory*(config: PortalBridgeConf) =
let
portalClient = newRpcHttpClient()
@ -427,7 +566,12 @@ proc runHistory*(config: PortalBridgeConf) =
asyncSpawn runLatestLoop(portalClient, web3Client, config.blockVerify)
if config.backfill:
asyncSpawn runBackfillLoop(portalClient, web3Client, config.era1Dir.string)
if config.audit:
asyncSpawn runBackfillLoopAuditMode(
portalClient, web3Client, config.era1Dir.string
)
else:
asyncSpawn runBackfillLoop(portalClient, web3Client, config.era1Dir.string)
while true:
poll()