From 3ea5c531d163ba561f554f115c0534899753021f Mon Sep 17 00:00:00 2001 From: Kim De Mey <7857583+kdeme@users.noreply.github.com> Date: Wed, 20 Nov 2024 20:07:36 +0700 Subject: [PATCH] portal_bridge history: fully support gossip of block by number (#2853) + some minor cleanup & reuse of code --- fluffy/eth_data/era1.nim | 8 +- .../portal_bridge/portal_bridge_conf.nim | 11 ++ .../portal_bridge/portal_bridge_history.nim | 112 ++++++++++-------- vendor/nim-eth | 2 +- 4 files changed, 80 insertions(+), 53 deletions(-) diff --git a/fluffy/eth_data/era1.nim b/fluffy/eth_data/era1.nim index 060fe746e..0469bbeed 100644 --- a/fluffy/eth_data/era1.nim +++ b/fluffy/eth_data/era1.nim @@ -13,7 +13,7 @@ import stew/[endians2, io2, byteutils, arrayops], stint, snappy, - eth/common/[headers, blocks_rlp, receipts_rlp], + eth/common/[headers_rlp, blocks_rlp, receipts_rlp], beacon_chain/spec/beacon_time, ssz_serialization, ncli/e2store, @@ -184,7 +184,7 @@ proc fromCompressedRlpBytes(bytes: openArray[byte], T: type): Result[T, string] try: ok(rlp.decode(decodeFramed(bytes, checkIntegrity = false), T)) except RlpError as e: - err("Invalid Compressed RLP data" & e.msg) + err("Invalid compressed RLP data for " & $T & ": " & e.msg) proc init*(T: type Era1Group, f: IoHandle, startNumber: uint64): Result[T, string] = discard ?f.appendHeader(E2Version, 0) @@ -498,7 +498,7 @@ iterator era1BlockHeaders*(f: Era1File): headers.Header = for blockNumber in startNumber .. endNumber: let header = f.getBlockHeader(blockNumber).valueOr: - raiseAssert("Failed to read block header") + raiseAssert("Failed to read block header: " & error) yield header iterator era1BlockTuples*(f: Era1File): BlockTuple = @@ -508,5 +508,5 @@ iterator era1BlockTuples*(f: Era1File): BlockTuple = for blockNumber in startNumber .. endNumber: let blockTuple = f.getBlockTuple(blockNumber).valueOr: - raiseAssert("Failed to read block header") + raiseAssert("Failed to read block tuple: " & error) yield blockTuple diff --git a/fluffy/tools/portal_bridge/portal_bridge_conf.nim b/fluffy/tools/portal_bridge/portal_bridge_conf.nim index 935ac32ab..9c5c37337 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_conf.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_conf.nim @@ -12,6 +12,8 @@ import confutils, confutils/std/net, nimcrypto/hash, + ../../network_metadata, + ../../eth_data/era1, ../../[conf, logging] export net @@ -36,6 +38,8 @@ proc defaultPortalBridgeStateDir*(): string = else: defaultDataDir() / "bridge" / "state" +const defaultEndEra* = uint64(era(network_metadata.mergeBlockNumber - 1)) + type TrustedDigest* = MDigest[32 * 8] @@ -117,6 +121,13 @@ type name: "backfill" .}: bool + startEra* {.desc: "The era to start from", defaultValue: 0, name: "start-era".}: + uint64 + + endEra* {. + desc: "The era to stop at", defaultValue: defaultEndEra, name: "end-era" + .}: uint64 + audit* {. desc: "Run pre-merge backfill in audit mode, which will only gossip content that if failed to fetch from the network", diff --git a/fluffy/tools/portal_bridge/portal_bridge_history.nim b/fluffy/tools/portal_bridge/portal_bridge_history.nim index df7777be7..ddfe5c989 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_history.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_history.nim @@ -140,7 +140,7 @@ proc getBlockReceipts( proc gossipBlockHeader( client: RpcClient, id: Hash32 | uint64, headerWithProof: BlockHeaderWithProof -): Future[Result[void, string]] {.async: (raises: []).} = +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = let contentKey = blockHeaderContentKey(id) encodedContentKeyHex = contentKey.encode.asSeq().toHex() @@ -154,13 +154,13 @@ proc gossipBlockHeader( return err("JSON-RPC portal_historyGossip failed: " & $e.msg) info "Block header gossiped", peers, contentKey = encodedContentKeyHex - return ok() + ok() proc gossipBlockBody( client: RpcClient, hash: Hash32, body: PortalBlockBodyLegacy | PortalBlockBodyShanghai, -): Future[Result[void, string]] {.async: (raises: []).} = +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = let contentKey = blockBodyContentKey(hash) encodedContentKeyHex = contentKey.encode.asSeq().toHex() @@ -174,11 +174,11 @@ proc gossipBlockBody( return err("JSON-RPC portal_historyGossip failed: " & $e.msg) info "Block body gossiped", peers, contentKey = encodedContentKeyHex - return ok() + ok() proc gossipReceipts( client: RpcClient, hash: Hash32, receipts: PortalReceipts -): Future[Result[void, string]] {.async: (raises: []).} = +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = let contentKey = receiptsContentKey(hash) encodedContentKeyHex = contentKey.encode.asSeq().toHex() @@ -285,7 +285,7 @@ proc gossipHeadersWithProof( era1File: string, epochRecordFile: Opt[string] = Opt.none(string), verifyEra = false, -): Future[Result[void, string]] {.async: (raises: []).} = +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = let f = ?Era1File.open(era1File) if verifyEra: @@ -296,54 +296,72 @@ proc gossipHeadersWithProof( # UX hassle it adds to provide the accumulator ssz files. let epochRecord = if epochRecordFile.isNone: + info "Building accumulator from era1 file", era1File ?f.buildAccumulator() else: ?readEpochRecordCached(epochRecordFile.get()) - for (contentKey, contentValue) in f.headersWithProof(epochRecord): - let peers = - try: - await portalClient.portal_historyGossip( - contentKey.asSeq.toHex(), contentValue.toHex() - ) - except CatchableError as e: - return err("JSON-RPC portal_historyGossip failed: " & $e.msg) - info "Block header gossiped", peers, contentKey + info "Gossip headers from era1 file", era1File + for blockHeader in f.era1BlockHeaders: + doAssert blockHeader.isPreMerge() + + let + headerWithProof = buildHeaderWithProof(blockHeader, epochRecord).valueOr: + raiseAssert "Failed to build header with proof: " & $blockHeader.number + blockHash = blockHeader.rlpHash() + + # gossip block header by hash + ?(await portalClient.gossipBlockHeader(blockHash, headerWithProof)) + # gossip block header by number + ?(await portalClient.gossipBlockHeader(blockHeader.number, headerWithProof)) + + info "Succesfully gossiped headers from era1 file", era1File ok() proc gossipBlockContent( portalClient: RpcClient, era1File: string, verifyEra = false -): Future[Result[void, string]] {.async: (raises: []).} = +): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = let f = ?Era1File.open(era1File) if verifyEra: let _ = ?f.verify() - for (contentKey, contentValue) in f.blockContent(): - let peers = - try: - await portalClient.portal_historyGossip( - contentKey.asSeq.toHex(), contentValue.toHex() - ) - except CatchableError as e: - return err("JSON-RPC portal_historyGossip failed: " & $e.msg) - info "Block content gossiped", peers, contentKey + info "Gossip bodies and receipts from era1 file", era1File + for (header, body, receipts, _) in f.era1BlockTuples: + let blockHash = header.rlpHash() + + # gossip block body + ?( + await portalClient.gossipBlockBody( + blockHash, PortalBlockBodyLegacy.fromBlockBody(body) + ) + ) + + # gossip receipts + ?( + await portalClient.gossipReceipts( + blockHash, PortalReceipts.fromReceipts(receipts) + ) + ) + + info "Succesfully gossiped bodies and receipts from era1 file", era1File ok() proc runBackfillLoop( - portalClient: RpcClient, web3Client: RpcClient, era1Dir: string + portalClient: RpcClient, + web3Client: RpcClient, + era1Dir: string, + startEra: uint64, + endEra: uint64, ) {.async: (raises: [CancelledError]).} = - let - rng = newRng() - accumulator = loadAccumulator() - while true: + let accumulator = loadAccumulator() + + for era in startEra .. endEra: let - # Grab a random era1 to backfill - era = rng[].rand(int(era(network_metadata.mergeBlockNumber - 1))) root = accumulator.historicalEpochs[era] - eraFile = era1Dir / era1FileName("mainnet", Era1(era), Digest(data: root)) + era1File = era1Dir / era1FileName("mainnet", Era1(era), Digest(data: root)) # Note: # There are two design options here: @@ -360,40 +378,36 @@ proc runBackfillLoop( # new era1 can be gossiped (might need another custom json-rpc that checks # the offer queue) when false: - info "Gossip headers from era1 file", eraFile + info "Gossip headers from era1 file", era1File let headerRes = try: - await portalClient.portal_debug_historyGossipHeaders(eraFile) + await portalClient.portal_debug_historyGossipHeaders(era1File) except CatchableError as e: error "JSON-RPC portal_debug_historyGossipHeaders failed", error = e.msg false if headerRes: - info "Gossip block content from era1 file", eraFile + info "Gossip block content from era1 file", era1File let res = try: - await portalClient.portal_debug_historyGossipBlockContent(eraFile) + await portalClient.portal_debug_historyGossipBlockContent(era1File) except CatchableError as e: error "JSON-RPC portal_debug_historyGossipBlockContent failed", error = e.msg false if res: - error "Failed to gossip block content from era1 file", eraFile + error "Failed to gossip block content from era1 file", era1File else: - error "Failed to gossip headers from era1 file", eraFile + error "Failed to gossip headers from era1 file", era1File else: - info "Gossip headers from era1 file", eraFile - (await portalClient.gossipHeadersWithProof(eraFile)).isOkOr: - error "Failed to gossip headers from era1 file", error, eraFile + (await portalClient.gossipHeadersWithProof(era1File)).isOkOr: + error "Failed to gossip headers from era1 file", error, era1File continue - info "Gossip block content from era1 file", eraFile - (await portalClient.gossipBlockContent(eraFile)).isOkOr: - error "Failed to gossip block content from era1 file", error, eraFile + (await portalClient.gossipBlockContent(era1File)).isOkOr: + error "Failed to gossip block content from era1 file", error, era1File continue - info "Succesfully gossiped era1 file", eraFile - proc runBackfillLoopAuditMode( portalClient: RpcClient, web3Client: RpcClient, era1Dir: string ) {.async: (raises: [CancelledError]).} = @@ -548,7 +562,9 @@ proc runHistory*(config: PortalBridgeConf) = portalClient, web3Client, config.era1Dir.string ) else: - asyncSpawn runBackfillLoop(portalClient, web3Client, config.era1Dir.string) + asyncSpawn runBackfillLoop( + portalClient, web3Client, config.era1Dir.string, config.startEra, config.endEra + ) while true: poll() diff --git a/vendor/nim-eth b/vendor/nim-eth index 88e4be4dc..dc092ca39 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit 88e4be4dc40e044834dca68f8b69d144744bf145 +Subproject commit dc092ca39303b030b42aa405e8d5f2f44f21b457