diff --git a/fluffy/database/era1_db.nim b/fluffy/database/era1_db.nim
new file mode 100644
index 000000000..3ea7c8f8a
--- /dev/null
+++ b/fluffy/database/era1_db.nim
@@ -0,0 +1,66 @@
+# fluffy
+# Copyright (c) 2024 Status Research & Development GmbH
+# Licensed and distributed under either of
+#  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+{.push raises: [].}
+
+import std/os, stew/io2, results, ../network/history/accumulator, ../eth_data/era1
+
+type Era1DB* = ref object
+  ## The Era1 database manages a collection of era files that together make up
+  ## a linear history of pre-merge execution chain data.
+  path: string
+  network: string
+  accumulator: FinishedAccumulator
+  files: seq[Era1File]
+
+proc getEra1File(db: Era1DB, era: Era1): Result[Era1File, string] =
+  for f in db.files:
+    if f.blockIdx.startNumber.era == era:
+      return ok(f)
+
+  if era > mergeBlockNumber.era():
+    return err("Selected era1 past pre-merge data")
+
+  let
+    root = db.accumulator.historicalEpochs[era.int]
+    name = era1FileName(db.network, era, Digest(data: root))
+    path = db.path / name
+
+  if not isFile(path):
+    return err("No such era file")
+
+  # TODO: The open call does not do full verification. It is assumed here that
+  # trusted files are used. We might want to add a full validation option.
+  let f = Era1File.open(path).valueOr:
+    return err(error)
+
+  if db.files.len > 16: # TODO LRU
+    close(db.files[0])
+    db.files.delete(0)
+
+  db.files.add(f)
+  ok(f)
+
+proc new*(
+    T: type Era1DB, path: string, network: string, accumulator: FinishedAccumulator
+): Era1DB =
+  Era1DB(path: path, network: network, accumulator: accumulator)
+
+proc getBlockTuple*(db: Era1DB, blockNumber: uint64): Result[BlockTuple, string] =
+  let f = ?db.getEra1File(blockNumber.era)
+
+  f.getBlockTuple(blockNumber)
+
+proc getAccumulator*(
+    db: Era1DB, blockNumber: uint64
+): Result[EpochAccumulatorCached, string] =
+  ## Get the Epoch Accumulator that the block with `blockNumber` is part of.
+  # TODO: Probably want this `EpochAccumulatorCached` also actually cached in
+  # the Era1File or EraDB object.
+  let f = ?db.getEra1File(blockNumber.era)
+
+  f.buildAccumulator()
diff --git a/fluffy/eth_data/era1.nim b/fluffy/eth_data/era1.nim
index 6ef561164..225e67c70 100644
--- a/fluffy/eth_data/era1.nim
+++ b/fluffy/eth_data/era1.nim
@@ -260,7 +260,7 @@ func era1FileName*(network: string, era: Era1, eraRoot: Digest): string =
 type
   Era1File* = ref object
     handle: Opt[IoHandle]
-    blockIdx: BlockIndex
+    blockIdx*: BlockIndex
 
   BlockTuple* =
     tuple[header: BlockHeader, body: BlockBody, receipts: seq[Receipt], td: UInt256]
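A minimal usage sketch of the new Era1DB API follows (illustrative only, not part of the diff). The era1 directory path and block number are placeholders, and loadAccumulator() is the existing helper that the audit loop further down also uses to load the pre-merge FinishedAccumulator.

# Illustrative only; assumes `import results` plus the era1_db module above.
let
  db = Era1DB.new("/path/to/era1", "mainnet", loadAccumulator())
  # BlockTuple unpacks to (header, body, receipts, td)
  (header, body, receipts, td) = db.getBlockTuple(1_000_000'u64).valueOr:
    quit("Failed to read block from era1 data: " & error)
  # EpochAccumulatorCached covering the epoch that contains this block
  epochAccumulator = db.getAccumulator(1_000_000'u64).valueOr:
    quit("Failed to build the epoch accumulator: " & error)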
diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim
index 9a4abbef3..5816125c8 100644
--- a/fluffy/network/history/history_network.nim
+++ b/fluffy/network/history/history_network.nim
@@ -136,7 +136,7 @@ func fromPortalReceipts*(
 # forks but not for the EL BlockBody (usage of Option) does not play so well
 # together.
 
-func fromBlockBody(T: type PortalBlockBodyLegacy, body: BlockBody): T =
+func fromBlockBody*(T: type PortalBlockBodyLegacy, body: BlockBody): T =
   var transactions: Transactions
   for tx in body.transactions:
     discard transactions.add(TransactionByteList(rlp.encode(tx)))
@@ -145,7 +145,7 @@ func fromBlockBody(T: type PortalBlockBodyLegacy, body: BlockBody): T =
 
   PortalBlockBodyLegacy(transactions: transactions, uncles: uncles)
 
-func fromBlockBody(T: type PortalBlockBodyShanghai, body: BlockBody): T =
+func fromBlockBody*(T: type PortalBlockBodyShanghai, body: BlockBody): T =
   var transactions: Transactions
   for tx in body.transactions:
     discard transactions.add(TransactionByteList(rlp.encode(tx)))
diff --git a/fluffy/rpc/rpc_calls/rpc_portal_calls.nim b/fluffy/rpc/rpc_calls/rpc_portal_calls.nim
index 5579b165f..6eb4b8142 100644
--- a/fluffy/rpc/rpc_calls/rpc_portal_calls.nim
+++ b/fluffy/rpc/rpc_calls/rpc_portal_calls.nim
@@ -25,7 +25,7 @@ createRpcSigsFromNim(RpcClient):
   proc portal_stateFindContent(enr: Record, contentKey: string): JsonNode
   proc portal_stateOffer(enr: Record, contentKey: string, contentValue: string): string
   proc portal_stateRecursiveFindNodes(nodeId: NodeId): seq[Record]
-  proc portal_stateRecursiveFindContent(contentKey: string): string
+  proc portal_stateRecursiveFindContent(contentKey: string): ContentInfo
   proc portal_stateStore(contentKey: string, contentValue: string): bool
   proc portal_stateLocalContent(contentKey: string): string
   proc portal_stateGossip(contentKey: string, contentValue: string): int
@@ -46,7 +46,7 @@ createRpcSigsFromNim(RpcClient):
   ): string
 
   proc portal_historyRecursiveFindNodes(nodeId: NodeId): seq[Record]
-  proc portal_historyRecursiveFindContent(contentKey: string): string
+  proc portal_historyRecursiveFindContent(contentKey: string): ContentInfo
   proc portal_historyStore(contentKey: string, contentValue: string): bool
   proc portal_historyLocalContent(contentKey: string): string
   proc portal_historyGossip(contentKey: string, contentValue: string): int
@@ -64,7 +64,7 @@ createRpcSigsFromNim(RpcClient):
   proc portal_beaconFindContent(enr: Record, contentKey: string): JsonNode
   proc portal_beaconOffer(enr: Record, contentKey: string, contentValue: string): string
   proc portal_beaconRecursiveFindNodes(nodeId: NodeId): seq[Record]
-  proc portal_beaconRecursiveFindContent(contentKey: string): string
+  proc portal_beaconRecursiveFindContent(contentKey: string): ContentInfo
   proc portal_beaconStore(contentKey: string, contentValue: string): bool
   proc portal_beaconLocalContent(contentKey: string): string
   proc portal_beaconGossip(contentKey: string, contentValue: string): int
diff --git a/fluffy/rpc/rpc_types.nim b/fluffy/rpc/rpc_types.nim
index b0dc67cd9..54c484bb3 100644
--- a/fluffy/rpc/rpc_types.nim
+++ b/fluffy/rpc/rpc_types.nim
@@ -26,9 +26,14 @@ type
 
   PingResult* = tuple[enrSeq: uint64, dataRadius: UInt256]
 
+  ContentInfo* = object
+    content*: string
+    utpTransfer*: bool
+
 NodeInfo.useDefaultSerializationIn JrpcConv
 RoutingTableInfo.useDefaultSerializationIn JrpcConv
 (string, string).useDefaultSerializationIn JrpcConv
+ContentInfo.useDefaultSerializationIn JrpcConv
 
 func getNodeInfo*(r: RoutingTable): NodeInfo =
   NodeInfo(enr: r.localNode.record, nodeId: r.localNode.id)
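Since the portal_*RecursiveFindContent endpoints now return a JSON object rather than a bare hex string, callers unwrap the payload through the new ContentInfo fields. A hedged caller-side sketch (fetchContent is a hypothetical helper, assuming the usual chronos, json_rpc and stew/byteutils imports plus the generated portal RPC signatures):

# Illustrative only, not part of the diff.
proc fetchContent(client: RpcClient, contentKeyHex: string) {.async.} =
  let contentInfo = await client.portal_historyRecursiveFindContent(contentKeyHex)
  # The payload is still hex encoded, now carried in the `content` field ...
  let contentBytes = hexToSeqByte(contentInfo.content)
  # ... while `utpTransfer` tells whether it was delivered via a uTP transfer.
  echo "received ", contentBytes.len, " bytes, utpTransfer=", contentInfo.utpTransfer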
diff --git a/fluffy/tools/content_verifier.nim b/fluffy/tools/content_verifier.nim
index 6fa4090c1..817d39e85 100644
--- a/fluffy/tools/content_verifier.nim
+++ b/fluffy/tools/content_verifier.nim
@@ -45,11 +45,11 @@ proc checkAccumulators(client: RpcClient) {.async.} =
     let contentKey = ContentKey.init(epochAccumulator, root)
 
     try:
-      let content = await client.portal_historyRecursiveFindContent(
+      let contentInfo = await client.portal_historyRecursiveFindContent(
         contentKey.encode.asSeq().toHex()
       )
 
-      let res = decodeSsz(hexToSeqByte(content), EpochAccumulator)
+      let res = decodeSsz(hexToSeqByte(contentInfo.content), EpochAccumulator)
       if res.isErr():
         echo "[Invalid] EpochAccumulator number " & $i & ": " & $root & " error: " &
           res.error
diff --git a/fluffy/tools/portal_bridge/portal_bridge_conf.nim b/fluffy/tools/portal_bridge/portal_bridge_conf.nim
index 4832ea019..f72294424 100644
--- a/fluffy/tools/portal_bridge/portal_bridge_conf.nim
+++ b/fluffy/tools/portal_bridge/portal_bridge_conf.nim
@@ -108,11 +108,18 @@ type
 
     backfill* {.
      desc:
-        "Randomly backfill block headers, bodies and receipts into the network from the era1 files",
+        "Randomly backfill pre-merge block headers, bodies and receipts into the network from the era1 files",
      defaultValue: false,
      name: "backfill"
    .}: bool
 
+    audit* {.
+      desc:
+        "Run pre-merge backfill in audit mode, which will only gossip content that it failed to fetch from the network",
+      defaultValue: false,
+      name: "audit"
+    .}: bool
+
     era1Dir* {.
      desc: "The directory where all era1 files are stored",
      defaultValue: defaultEra1DataDir(),
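For orientation before the large portal_bridge_history.nim hunk below: in audit mode each iteration picks a random pre-merge block, tries to fetch its header, body and receipts from the Portal network, and gossips only the pieces it could not retrieve or validate. A condensed, illustrative sketch of that per-block decision (fetchAndValidateHeader/Body/Receipts are hypothetical stand-ins for the inlined fetch-and-validate steps of the real loop):

# Illustrative condensation only, not part of the diff.
proc auditBlock(portalClient: RpcClient, db: Era1DB, blockNumber: uint64) {.async.} =
  let
    (header, body, receipts, _) = db.getBlockTuple(blockNumber).valueOr:
      return
    blockHash = header.blockHash()

  # Gossip only the pieces that the network could not serve.
  if not (await portalClient.fetchAndValidateHeader(blockHash)):
    let
      epochAccumulator = db.getAccumulator(blockNumber).expect("era1 data")
      headerWithProof = buildHeaderWithProof(header, epochAccumulator).expect("proof")
    discard await portalClient.gossipBlockHeader(blockHash, headerWithProof)
  if not (await portalClient.fetchAndValidateBody(blockHash, header)):
    discard await portalClient.gossipBlockBody(
      blockHash, PortalBlockBodyLegacy.fromBlockBody(body)
    )
  if not (await portalClient.fetchAndValidateReceipts(blockHash, header)):
    discard await portalClient.gossipReceipts(
      blockHash, PortalReceipts.fromReceipts(receipts)
    )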
diff --git a/fluffy/tools/portal_bridge/portal_bridge_history.nim b/fluffy/tools/portal_bridge/portal_bridge_history.nim
index 9bf875303..246ce4b40 100644
--- a/fluffy/tools/portal_bridge/portal_bridge_history.nim
+++ b/fluffy/tools/portal_bridge/portal_bridge_history.nim
@@ -22,6 +22,7 @@ import
   ../../network/history/[history_content, history_network],
   ../../network_metadata,
   ../../eth_data/[era1, history_data_ssz_e2s, history_data_seeding],
+  ../../database/era1_db,
   ./portal_bridge_conf
 
 from stew/objects import checkedEnumAssign
@@ -170,7 +171,9 @@ proc gossipBlockHeader(
   return ok()
 
 proc gossipBlockBody(
-    client: RpcClient, hash: common_types.BlockHash, body: PortalBlockBodyShanghai
+    client: RpcClient,
+    hash: common_types.BlockHash,
+    body: PortalBlockBodyLegacy | PortalBlockBodyShanghai,
 ): Future[Result[void, string]] {.async: (raises: []).} =
   let
     contentKey = history_content.ContentKey.init(blockBody, hash)
@@ -402,6 +405,142 @@ proc runBackfillLoop(
 
     info "Succesfully gossiped era1 file", eraFile
 
+proc runBackfillLoopAuditMode(
+    portalClient: RpcClient, web3Client: RpcClient, era1Dir: string
+) {.async: (raises: [CancelledError]).} =
+  let
+    rng = newRng()
+    db = Era1DB.new(era1Dir, "mainnet", loadAccumulator())
+
+  while true:
+    let
+      # Grab a random blockNumber to audit and potentially gossip
+      blockNumber = rng[].rand(network_metadata.mergeBlockNumber - 1).uint64
+      (header, body, receipts, _) = db.getBlockTuple(blockNumber).valueOr:
+        error "Failed to get block tuple", error, blockNumber
+        continue
+      blockHash = header.blockHash()
+
+    var headerSuccess, bodySuccess, receiptsSuccess = false
+
+    logScope:
+      blockNumber = blockNumber
+
+    # header
+    block headerBlock:
+      let
+        contentKey = ContentKey.init(blockHeader, blockHash)
+        contentHex =
+          try:
+            (
+              await portalClient.portal_historyRecursiveFindContent(
+                contentKey.encode.asSeq().toHex()
+              )
+            ).content
+          except CatchableError as e:
+            error "Failed to find block header content", error = e.msg
+            break headerBlock
+        content =
+          try:
+            hexToSeqByte(contentHex)
+          except ValueError as e:
+            error "Invalid hex for block header content", error = e.msg
+            break headerBlock
+
+        headerWithProof = decodeSsz(content, BlockHeaderWithProof).valueOr:
+          error "Failed to decode block header content", error
+          break headerBlock
+
+      if keccakHash(headerWithProof.header.asSeq()) != blockHash:
+        error "Block hash mismatch", blockNumber
+        break headerBlock
+
+      info "Retrieved block header from Portal network"
+      headerSuccess = true
+
+    # body
+    block bodyBlock:
+      let
+        contentKey = ContentKey.init(blockBody, blockHash)
+        contentHex =
+          try:
+            (
+              await portalClient.portal_historyRecursiveFindContent(
+                contentKey.encode.asSeq().toHex()
+              )
+            ).content
+          except CatchableError as e:
+            error "Failed to find block body content", error = e.msg
+            break bodyBlock
+        content =
+          try:
+            hexToSeqByte(contentHex)
+          except ValueError as e:
+            error "Invalid hex for block body content", error = e.msg
+            break bodyBlock
+
+      validateBlockBodyBytes(content, header).isOkOr:
+        error "Block body is invalid", error
+        break bodyBlock
+
+      info "Retrieved block body from Portal network"
+      bodySuccess = true
+
+    # receipts
+    block receiptsBlock:
+      let
+        contentKey = ContentKey.init(ContentType.receipts, blockHash)
+        contentHex =
+          try:
+            (
+              await portalClient.portal_historyRecursiveFindContent(
+                contentKey.encode.asSeq().toHex()
+              )
+            ).content
+          except CatchableError as e:
+            error "Failed to find block receipts content", error = e.msg
+            break receiptsBlock
+        content =
+          try:
+            hexToSeqByte(contentHex)
+          except ValueError as e:
+            error "Invalid hex for block receipts content", error = e.msg
+            break receiptsBlock
+
+      validateReceiptsBytes(content, header.receiptRoot).isOkOr:
+        error "Block receipts are invalid", error
+        break receiptsBlock
+
+      info "Retrieved block receipts from Portal network"
+      receiptsSuccess = true
+
+    # Gossip missing content
+    if not headerSuccess:
+      let
+        epochAccumulator = db.getAccumulator(blockNumber).valueOr:
+          raiseAssert "Failed to get accumulator from EraDB: " & error
+        headerWithProof = buildHeaderWithProof(header, epochAccumulator).valueOr:
+          raiseAssert "Failed to build header with proof: " & error
+
+      (await portalClient.gossipBlockHeader(blockHash, headerWithProof)).isOkOr:
+        error "Failed to gossip block header", error
+    if not bodySuccess:
+      (
+        await portalClient.gossipBlockBody(
+          blockHash, PortalBlockBodyLegacy.fromBlockBody(body)
+        )
+      ).isOkOr:
+        error "Failed to gossip block body", error
+    if not receiptsSuccess:
+      (
+        await portalClient.gossipReceipts(
+          blockHash, PortalReceipts.fromReceipts(receipts)
+        )
+      ).isOkOr:
+        error "Failed to gossip receipts", error
+
+    await sleepAsync(2.seconds)
+
 proc runHistory*(config: PortalBridgeConf) =
   let
     portalClient = newRpcHttpClient()
@@ -427,7 +566,12 @@ proc runHistory*(config: PortalBridgeConf) =
     asyncSpawn runLatestLoop(portalClient, web3Client, config.blockVerify)
 
   if config.backfill:
-    asyncSpawn runBackfillLoop(portalClient, web3Client, config.era1Dir.string)
+    if config.audit:
+      asyncSpawn runBackfillLoopAuditMode(
+        portalClient, web3Client, config.era1Dir.string
+      )
+    else:
+      asyncSpawn runBackfillLoop(portalClient, web3Client, config.era1Dir.string)
 
   while true:
     poll()
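Finally, a sketch of how the pieces exposed here compose outside the bridge loop: era1 data read through Era1DB, converted with the now-exported fromBlockBody, and pushed through the existing portal_historyGossip endpoint. Illustrative only; seedBlockBody is a hypothetical helper and the SSZ/hex encoding mirrors what the bridge's gossip helpers are assumed to do.

# Hypothetical helper, not part of the diff.
proc seedBlockBody(client: RpcClient, db: Era1DB, blockNumber: uint64) {.async.} =
  let
    (header, body, _, _) = db.getBlockTuple(blockNumber).expect("era1 block")
    blockHash = header.blockHash()
    contentKey = history_content.ContentKey.init(blockBody, blockHash)
    # fromBlockBody is exported by this change so callers outside
    # history_network can build the Portal SSZ representation of the body.
    contentValue = SSZ.encode(PortalBlockBodyLegacy.fromBlockBody(body))
    peers = await client.portal_historyGossip(
      contentKey.encode.asSeq().toHex(), contentValue.toHex()
    )
  echo "gossiped body of block ", blockNumber, " to ", peers, " peers"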