From 1159b0114e188844ef47fce4916335e3c3c365de Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Mon, 11 Mar 2024 18:20:29 +0100 Subject: [PATCH] Add portal_bridge history mode (#2067) Portal bridge mode for following latest and injecting the latest block data into the Portal network. --- .../docs/history-content-bridging.md | 50 ++- fluffy/network/history/history_network.nim | 4 +- fluffy/tools/portal_bridge/portal_bridge.nim | 16 +- .../portal_bridge/portal_bridge_conf.nim | 36 +- .../portal_bridge/portal_bridge_history.nim | 312 ++++++++++++++++++ .../nodocker/engine/engine_client.nim | 6 +- 6 files changed, 399 insertions(+), 25 deletions(-) create mode 100644 fluffy/tools/portal_bridge/portal_bridge_history.nim diff --git a/fluffy/docs/the_fluffy_book/docs/history-content-bridging.md b/fluffy/docs/the_fluffy_book/docs/history-content-bridging.md index 9db309526..258f0a171 100644 --- a/fluffy/docs/the_fluffy_book/docs/history-content-bridging.md +++ b/fluffy/docs/the_fluffy_book/docs/history-content-bridging.md @@ -2,7 +2,43 @@ ## From content bridges -### Seeding post-merge block data through the `beacon_lc_bridge` +### Seeding history data with the `portal_bridge` + +#### Step 1: Run a Portal client + +Run a Portal client with the Portal JSON-RPC API enabled, e.g. fluffy: + +```bash +./build/fluffy --rpc +``` + +#### Step 2: Run an EL client + +The portal_bridge needs access to the EL JSON-RPC API, either through a local +Ethereum client or via a web3 provider. + +#### Step 3: Run the Portal bridge in history mode + +Build & run the `portal_bridge`: +```bash +make portal_bridge + +WEB3_URL="http://127.0.0.1:8546" # Replace with your provider. +./build/portal_bridge history --web3-url:${WEB3_URL} +``` + +### Seeding post-merge history data with the `beacon_lc_bridge` + +The `beacon_lc_bridge` is more of a standalone bridge that does not require access to a full node with its EL JSON-RPC API. However it is also more limited in the functions it provides. +It will start with the consensus light client sync and follow beacon block gossip. Once it is synced, the execution payload of new beacon blocks will be extracted and injected in the Portal network as execution headers +and blocks. + +> Note: The execution headers will come without a proof. + +The injection into the Portal network is done via the +`portal_historyGossip` JSON-RPC endpoint of the running Fluffy node. + +> Note: Backfilling of block bodies and headers is not yet supported. Run a Fluffy node with the JSON-RPC API enabled. @@ -18,18 +54,6 @@ TRUSTED_BLOCK_ROOT=0x12345678901234567890123456789012345678901234567890123456789 ./build/beacon_lc_bridge --trusted-block-root=${TRUSTED_BLOCK_ROOT} ``` -The `beacon_lc_bridge` will start with the consensus light client sync follow -beacon block gossip. Once it is synced, the execution payload of new beacon -blocks will be extracted and injected in the Portal network as execution headers -and blocks. - -> Note: The execution headers will come without a proof for now. - -The injection into the Portal network is done via the -`portal_historyGossip` JSON-RPC endpoint of the running Fluffy node. - -> Note: Backfilling of block bodies and headers is not yet supported. - ## From locally stored block data ### Building and seeding epoch accumulators diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index a02e97d2a..9a4abbef3 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -228,7 +228,7 @@ func validateBlockHeaderBytes*( else: ok(header) -proc validateBlockBody( +proc validateBlockBody*( body: PortalBlockBodyLegacy, header: BlockHeader ): Result[void, string] = ## Validate the block body against the txRoot and ommersHash from the header. @@ -245,7 +245,7 @@ proc validateBlockBody( ok() -proc validateBlockBody( +proc validateBlockBody*( body: PortalBlockBodyShanghai, header: BlockHeader ): Result[void, string] = ## Validate the block body against the txRoot, ommersHash and withdrawalsRoot diff --git a/fluffy/tools/portal_bridge/portal_bridge.nim b/fluffy/tools/portal_bridge/portal_bridge.nim index fb390a310..0e280ca75 100644 --- a/fluffy/tools/portal_bridge/portal_bridge.nim +++ b/fluffy/tools/portal_bridge/portal_bridge.nim @@ -13,7 +13,7 @@ # # Beacon Network: # -# For the beacon network a consensus full node is require on one side, +# For the beacon network a consensus full node is required on one side, # making use of the Beacon Node REST-API, and a Portal node on the other side, # making use of the Portal JSON-RPC API. # @@ -26,10 +26,17 @@ # # Updates, optimistic updates and finality updates are injected as they become # available. +# Updating these as better updates come available is not yet implemented. # # History network: # -# To be implemented +# For the history network a execution client is required on one side, making use +# of the EL JSON-RPC API, and a Portal node on the other side, making use of the +# Portal JSON-RPC API. +# +# Portal Network <-> Portal Client (e.g. fluffy) <--Portal JSON-RPC--> bridge <--EL JSON-RPC--> execution client / web3 provider +# +# Backfilling is not yet implemented. Backfilling will make use of Era1 files. # # State network: # @@ -51,8 +58,7 @@ import ../../rpc/portal_rpc_client, ../../logging, ../eth_data_exporter/cl_data_exporter, - ./portal_bridge_conf, - ./portal_bridge_beacon + ./[portal_bridge_conf, portal_bridge_beacon, portal_bridge_history] proc runBeacon(config: PortalBridgeConf) {.raises: [CatchableError].} = notice "Launching Fluffy beacon chain bridge", cmdParams = commandLineParams() @@ -240,6 +246,6 @@ when isMainModule: of PortalBridgeCmd.beacon: runBeacon(config) of PortalBridgeCmd.history: - notice "Functionality not yet implemented" + runHistory(config) of PortalBridgeCmd.state: notice "Functionality not yet implemented" diff --git a/fluffy/tools/portal_bridge/portal_bridge_conf.nim b/fluffy/tools/portal_bridge/portal_bridge_conf.nim index 0c0c7ff4d..0404357ff 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_conf.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_conf.nim @@ -7,13 +7,21 @@ {.push raises: [].} -import confutils, confutils/std/net, nimcrypto/hash, ../../logging +import std/uri, confutils, confutils/std/net, nimcrypto/hash, ../../logging export net type TrustedDigest* = MDigest[32 * 8] + Web3UrlKind* = enum + HttpUrl + WsUrl + + Web3Url* = object + kind*: Web3UrlKind + url*: string + PortalBridgeCmd* = enum beacon = "Run a Portal bridge for the beacon network" history = "Run a Portal bridge for the history network" @@ -68,7 +76,13 @@ type name: "trusted-block-root" .}: Option[TrustedDigest] of PortalBridgeCmd.history: - discard + web3Url* {.desc: "Execution layer JSON-RPC API URL", name: "web3-url".}: Web3Url + + blockVerify* {. + desc: "Verify the block header, body and receipts", + defaultValue: false, + name: "block-verify" + .}: bool of PortalBridgeCmd.state: discard @@ -77,3 +91,21 @@ func parseCmdArg*(T: type TrustedDigest, input: string): T {.raises: [ValueError func completeCmdArg*(T: type TrustedDigest, input: string): seq[string] = return @[] + +proc parseCmdArg*(T: type Web3Url, p: string): T {.raises: [ValueError].} = + let + url = parseUri(p) + normalizedScheme = url.scheme.toLowerAscii() + + if (normalizedScheme == "http" or normalizedScheme == "https"): + Web3Url(kind: HttpUrl, url: p) + elif (normalizedScheme == "ws" or normalizedScheme == "wss"): + Web3Url(kind: WsUrl, url: p) + else: + raise newException( + ValueError, + "The Web3 URL must specify one of following protocols: http/https/ws/wss", + ) + +proc completeCmdArg*(T: type Web3Url, val: string): seq[string] = + return @[] diff --git a/fluffy/tools/portal_bridge/portal_bridge_history.nim b/fluffy/tools/portal_bridge/portal_bridge_history.nim new file mode 100644 index 000000000..b20adbf69 --- /dev/null +++ b/fluffy/tools/portal_bridge/portal_bridge_history.nim @@ -0,0 +1,312 @@ +# Fluffy +# Copyright (c) 2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + chronos, + chronicles, + web3/[eth_api, eth_api_types], + results, + stew/byteutils, + eth/common/[eth_types, eth_types_rlp], + ../../../nimbus/beacon/web3_eth_conv, + ../../../hive_integration/nodocker/engine/engine_client, + ../../rpc/[portal_rpc_client], + ../../network/history/[history_content, history_network], + ./portal_bridge_conf + +from stew/objects import checkedEnumAssign + +const newHeadPollInterval = 6.seconds # Slot with potential block is every 12s + +## Conversion functions for Block and Receipts + +func asEthBlock(blockObject: BlockObject): EthBlock = + let + header = blockObject.toBlockHeader() + transactions = toTransactions(blockObject.transactions) + withdrawals = toWithdrawals(blockObject.withdrawals) + + EthBlock(header: header, txs: transactions, withdrawals: withdrawals) + +func asPortalBlock( + ethBlock: EthBlock +): (BlockHeaderWithProof, PortalBlockBodyShanghai) = + var transactions: Transactions + for tx in ethBlock.txs: + discard transactions.add(TransactionByteList(rlp.encode(tx))) + + var withdrawals: Withdrawals + doAssert ethBlock.withdrawals.isSome() # TODO: always the case? also when empty? + for w in ethBlock.withdrawals.get(): + discard withdrawals.add(WithdrawalByteList(rlp.encode(w))) + + let + headerWithProof = BlockHeaderWithProof( + header: ByteList(rlp.encode(ethBlock.header)), proof: BlockHeaderProof.init() + ) + portalBody = PortalBlockBodyShanghai( + transactions: transactions, uncles: Uncles(@[byte 0xc0]), withdrawals: withdrawals + ) + + (headerWithProof, portalBody) + +func asTxType(quantity: Option[Quantity]): Result[TxType, string] = + let value = quantity.get(0.Quantity).uint8 + var txType: TxType + if not checkedEnumAssign(txType, value): + err("Invalid data for TxType: " & $value) + else: + ok(txType) + +func asReceipt(receiptObject: ReceiptObject): Result[Receipt, string] = + let receiptType = asTxType(receiptObject.`type`).valueOr: + return err("Failed conversion to TxType" & error) + + var logs: seq[Log] + if receiptObject.logs.len > 0: + for log in receiptObject.logs: + var topics: seq[eth_types.Topic] + for topic in log.topics: + topics.add(eth_types.Topic(topic)) + + logs.add(Log(address: ethAddr log.address, data: log.data, topics: topics)) + + let cumulativeGasUsed = receiptObject.cumulativeGasUsed.GasInt + if receiptObject.status.isSome(): + let status = receiptObject.status.get().int + ok( + Receipt( + receiptType: receiptType, + isHash: false, + status: status == 1, + cumulativeGasUsed: cumulativeGasUsed, + bloom: BloomFilter(receiptObject.logsBloom), + logs: logs, + ) + ) + elif receiptObject.root.isSome(): + ok( + Receipt( + receiptType: receiptType, + isHash: true, + hash: ethHash receiptObject.root.get(), + cumulativeGasUsed: cumulativeGasUsed, + bloom: BloomFilter(receiptObject.logsBloom), + logs: logs, + ) + ) + else: + err("No root nor status field in the JSON receipt object") + +func asReceipts(receiptObjects: seq[ReceiptObject]): Result[seq[Receipt], string] = + var receipts: seq[Receipt] + for receiptObject in receiptObjects: + let receipt = asReceipt(receiptObject).valueOr: + return err(error) + receipts.add(receipt) + + ok(receipts) + +## EL JSON-RPC API helper calls for requesting block and receipts + +proc getBlockByNumber( + client: RpcClient, blockTag: RtBlockIdentifier, fullTransactions: bool = true +): Future[Result[BlockObject, string]] {.async: (raises: []).} = + let blck = + try: + let res = await client.eth_getBlockByNumber(blockTag, fullTransactions) + if res.isNil: + return err("failed to get latest blockHeader") + + res + except CatchableError as e: + return err("JSON-RPC eth_getBlockByNumber failed: " & e.msg) + + return ok(blck) + +proc getBlockReceipts( + client: RpcClient, blockNumber: uint64 +): Future[Result[seq[ReceiptObject], string]] {.async: (raises: []).} = + let res = + try: + await client.eth_getBlockReceipts(blockNumber) + except CatchableError as e: + return err("JSON-RPC eth_getBlockReceipts failed: " & e.msg) + if res.isNone(): + err("Failed getting receipts") + else: + ok(res.get()) + +## Portal JSON-RPC API helper calls for pushing block and receipts + +proc gossipBlockHeader( + client: RpcClient, + hash: common_types.BlockHash, + headerWithProof: BlockHeaderWithProof, +): Future[Result[void, string]] {.async: (raises: []).} = + let + contentKey = history_content.ContentKey.init(blockHeader, hash) + encodedContentKeyHex = contentKey.encode.asSeq().toHex() + + peers = + try: + await client.portal_historyGossip( + encodedContentKeyHex, SSZ.encode(headerWithProof).toHex() + ) + except CatchableError as e: + return err("JSON-RPC error: " & $e.msg) + + info "Block header gossiped", peers, contentKey = encodedContentKeyHex + return ok() + +proc gossipBlockBody( + client: RpcClient, hash: common_types.BlockHash, body: PortalBlockBodyShanghai +): Future[Result[void, string]] {.async: (raises: []).} = + let + contentKey = history_content.ContentKey.init(blockBody, hash) + encodedContentKeyHex = contentKey.encode.asSeq().toHex() + + peers = + try: + await client.portal_historyGossip( + encodedContentKeyHex, SSZ.encode(body).toHex() + ) + except CatchableError as e: + return err("JSON-RPC error: " & $e.msg) + + info "Block body gossiped", peers, contentKey = encodedContentKeyHex + return ok() + +proc gossipReceipts( + client: RpcClient, hash: common_types.BlockHash, receipts: PortalReceipts +): Future[Result[void, string]] {.async: (raises: []).} = + let + contentKey = + history_content.ContentKey.init(history_content.ContentType.receipts, hash) + encodedContentKeyHex = contentKey.encode.asSeq().toHex() + + peers = + try: + await client.portal_historyGossip( + encodedContentKeyHex, SSZ.encode(receipts).toHex() + ) + except CatchableError as e: + return err("JSON-RPC error: " & $e.msg) + + info "Receipts gossiped", peers, contentKey = encodedContentKeyHex + return ok() + +proc runLatestLoop( + portalClient: RpcClient, web3Client: RpcClient, validate = false +) {.async: (raises: [CancelledError]).} = + ## Loop that requests the latest block + receipts and pushes them into the + ## Portal network. + ## Current strategy is to poll for the latest block and receipts, and then + ## convert the data (optionally verify it) and push it into the Portal network. + ## If the EL JSON-RPC API calls fail, 1 second is waited before retrying. + ## If the Portal JSON-RPC API calls fail, the error is logged and the loop + ## continues. + ## TODO: Might want to add retries on Portal JSON-RPC API call failures too. + ## TODO: Investigate Portal side JSON-RPC error seen: + ## "JSON-RPC error: Request Entity Too Large" + let blockId = blockId("latest") + var lastBlockNumber = 0'u64 + while true: + let t0 = Moment.now() + let blockObject = (await getBlockByNumber(web3Client, blockId)).valueOr: + error "Failed to get latest block", error + await sleepAsync(1.seconds) + continue + + let blockNumber = distinctBase(blockObject.number) + if blockNumber > lastBlockNumber: + let receiptObjects = (await web3Client.getBlockReceipts(blockNumber)).valueOr: + error "Failed to get latest receipts", error + await sleepAsync(1.seconds) + continue + + let + ethBlock = blockObject.asEthBlock() + (headerWithProof, body) = ethBlock.asPortalBlock() + + receipts = receiptObjects.asReceipts().valueOr: + # Note: this failure should not occur. It would mean invalid encoded + # receipts by provider + error "Error converting JSON RPC receipt objects", error + await sleepAsync(1.seconds) + continue + portalReceipts = PortalReceipts.fromReceipts(receipts) + + lastBlockNumber = blockNumber + + let hash = common_types.BlockHash(data: distinctBase(blockObject.hash)) + if validate: + if validateBlockHeaderBytes(headerWithProof.header.asSeq(), hash).isErr(): + error "Block header is invalid" + continue + if validateBlockBody(body, ethBlock.header).isErr(): + error "Block body is invalid" + continue + if validateReceipts(portalReceipts, ethBlock.header.receiptRoot).isErr(): + error "Receipts root is invalid" + continue + + # gossip block header + (await portalClient.gossipBlockHeader(hash, headerWithProof)).isOkOr: + error "Failed to gossip block header", error + + # For bodies & receipts to get verified, the header needs to be available + # on the network. Wait a little to get the headers propagated through + # the network. + await sleepAsync(2.seconds) + + # gossip block body + (await portalClient.gossipBlockBody(hash, body)).isOkOr: + error "Failed to gossip block body", error + + # gossip receipts + (await portalClient.gossipReceipts(hash, portalReceipts)).isOkOr: + error "Failed to gossip receipts", error + + # Making sure here that we poll enough times not to miss a block. + # We could also do some work without awaiting it, e.g. the gossiping or + # the requesting the receipts during the sleep time. But we also want to + # avoid creating a backlog of requests or gossip. + let t1 = Moment.now() + let elapsed = t1 - t0 + if elapsed < newHeadPollInterval: + await sleepAsync(newHeadPollInterval - elapsed) + else: + warn "Block gossip took longer than the poll interval" + +proc runHistory*(config: PortalBridgeConf) = + let + portalClient = newRpcHttpClient() + # TODO: Use Web3 object? + web3Client: RpcClient = + case config.web3Url.kind + of HttpUrl: + newRpcHttpClient() + of WsUrl: + newRpcWebSocketClient() + try: + waitFor portalClient.connect(config.rpcAddress, Port(config.rpcPort), false) + except CatchableError as e: + error "Failed to connect to portal RPC", error = $e.msg + + if config.web3Url.kind == HttpUrl: + try: + waitFor (RpcHttpClient(web3Client)).connect(config.web3Url.url) + except CatchableError as e: + error "Failed to connect to web3 RPC", error = $e.msg + + asyncSpawn runLatestLoop(portalClient, web3Client, config.blockVerify) + + while true: + poll() diff --git a/hive_integration/nodocker/engine/engine_client.nim b/hive_integration/nodocker/engine/engine_client.nim index 45457d2fa..03e8d0abb 100644 --- a/hive_integration/nodocker/engine/engine_client.nim +++ b/hive_integration/nodocker/engine/engine_client.nim @@ -236,7 +236,7 @@ proc maybeInt(n: Option[Quantity]): Option[int] = return none(int) some(n.get.int) -proc toBlockHeader(bc: BlockObject): common.BlockHeader = +proc toBlockHeader*(bc: BlockObject): common.BlockHeader = common.BlockHeader( blockNumber : toBlockNumber(bc.number), parentHash : ethHash bc.parentHash, @@ -299,7 +299,7 @@ proc toTransaction(tx: TransactionObject): Transaction = S : tx.s, ) -proc toTransactions(txs: openArray[TxOrHash]): seq[Transaction] = +proc toTransactions*(txs: openArray[TxOrHash]): seq[Transaction] = for x in txs: doAssert x.kind == tohTx result.add toTransaction(x.tx) @@ -317,7 +317,7 @@ proc toWithdrawals(list: seq[WithdrawalObject]): seq[Withdrawal] = for wd in list: result.add toWithdrawal(wd) -proc toWithdrawals(list: Option[seq[WithdrawalObject]]): Option[seq[Withdrawal]] = +proc toWithdrawals*(list: Option[seq[WithdrawalObject]]): Option[seq[Withdrawal]] = if list.isNone: return none(seq[Withdrawal]) some(toWithdrawals(list.get))