Add portal_bridge history mode (#2067)

Portal bridge mode for following latest and injecting the latest
block data into the Portal network.
This commit is contained in:
Kim De Mey 2024-03-11 18:20:29 +01:00 committed by GitHub
parent 332ec75d5a
commit 1159b0114e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 399 additions and 25 deletions

View File

@ -2,7 +2,43 @@
## From content bridges
### Seeding post-merge block data through the `beacon_lc_bridge`
### Seeding history data with the `portal_bridge`
#### Step 1: Run a Portal client
Run a Portal client with the Portal JSON-RPC API enabled, e.g. fluffy:
```bash
./build/fluffy --rpc
```
#### Step 2: Run an EL client
The portal_bridge needs access to the EL JSON-RPC API, either through a local
Ethereum client or via a web3 provider.
#### Step 3: Run the Portal bridge in history mode
Build & run the `portal_bridge`:
```bash
make portal_bridge
WEB3_URL="http://127.0.0.1:8546" # Replace with your provider.
./build/portal_bridge history --web3-url:${WEB3_URL}
```
### Seeding post-merge history data with the `beacon_lc_bridge`
The `beacon_lc_bridge` is more of a standalone bridge that does not require access to a full node with its EL JSON-RPC API. However it is also more limited in the functions it provides.
It will start with the consensus light client sync and follow beacon block gossip. Once it is synced, the execution payload of new beacon blocks will be extracted and injected in the Portal network as execution headers
and blocks.
> Note: The execution headers will come without a proof.
The injection into the Portal network is done via the
`portal_historyGossip` JSON-RPC endpoint of the running Fluffy node.
> Note: Backfilling of block bodies and headers is not yet supported.
Run a Fluffy node with the JSON-RPC API enabled.
@ -18,18 +54,6 @@ TRUSTED_BLOCK_ROOT=0x12345678901234567890123456789012345678901234567890123456789
./build/beacon_lc_bridge --trusted-block-root=${TRUSTED_BLOCK_ROOT}
```
The `beacon_lc_bridge` will start with the consensus light client sync follow
beacon block gossip. Once it is synced, the execution payload of new beacon
blocks will be extracted and injected in the Portal network as execution headers
and blocks.
> Note: The execution headers will come without a proof for now.
The injection into the Portal network is done via the
`portal_historyGossip` JSON-RPC endpoint of the running Fluffy node.
> Note: Backfilling of block bodies and headers is not yet supported.
## From locally stored block data
### Building and seeding epoch accumulators

View File

@ -228,7 +228,7 @@ func validateBlockHeaderBytes*(
else:
ok(header)
proc validateBlockBody(
proc validateBlockBody*(
body: PortalBlockBodyLegacy, header: BlockHeader
): Result[void, string] =
## Validate the block body against the txRoot and ommersHash from the header.
@ -245,7 +245,7 @@ proc validateBlockBody(
ok()
proc validateBlockBody(
proc validateBlockBody*(
body: PortalBlockBodyShanghai, header: BlockHeader
): Result[void, string] =
## Validate the block body against the txRoot, ommersHash and withdrawalsRoot

View File

@ -13,7 +13,7 @@
#
# Beacon Network:
#
# For the beacon network a consensus full node is require on one side,
# For the beacon network a consensus full node is required on one side,
# making use of the Beacon Node REST-API, and a Portal node on the other side,
# making use of the Portal JSON-RPC API.
#
@ -26,10 +26,17 @@
#
# Updates, optimistic updates and finality updates are injected as they become
# available.
# Updating these as better updates come available is not yet implemented.
#
# History network:
#
# To be implemented
# For the history network a execution client is required on one side, making use
# of the EL JSON-RPC API, and a Portal node on the other side, making use of the
# Portal JSON-RPC API.
#
# Portal Network <-> Portal Client (e.g. fluffy) <--Portal JSON-RPC--> bridge <--EL JSON-RPC--> execution client / web3 provider
#
# Backfilling is not yet implemented. Backfilling will make use of Era1 files.
#
# State network:
#
@ -51,8 +58,7 @@ import
../../rpc/portal_rpc_client,
../../logging,
../eth_data_exporter/cl_data_exporter,
./portal_bridge_conf,
./portal_bridge_beacon
./[portal_bridge_conf, portal_bridge_beacon, portal_bridge_history]
proc runBeacon(config: PortalBridgeConf) {.raises: [CatchableError].} =
notice "Launching Fluffy beacon chain bridge", cmdParams = commandLineParams()
@ -240,6 +246,6 @@ when isMainModule:
of PortalBridgeCmd.beacon:
runBeacon(config)
of PortalBridgeCmd.history:
notice "Functionality not yet implemented"
runHistory(config)
of PortalBridgeCmd.state:
notice "Functionality not yet implemented"

View File

@ -7,13 +7,21 @@
{.push raises: [].}
import confutils, confutils/std/net, nimcrypto/hash, ../../logging
import std/uri, confutils, confutils/std/net, nimcrypto/hash, ../../logging
export net
type
TrustedDigest* = MDigest[32 * 8]
Web3UrlKind* = enum
HttpUrl
WsUrl
Web3Url* = object
kind*: Web3UrlKind
url*: string
PortalBridgeCmd* = enum
beacon = "Run a Portal bridge for the beacon network"
history = "Run a Portal bridge for the history network"
@ -68,7 +76,13 @@ type
name: "trusted-block-root"
.}: Option[TrustedDigest]
of PortalBridgeCmd.history:
discard
web3Url* {.desc: "Execution layer JSON-RPC API URL", name: "web3-url".}: Web3Url
blockVerify* {.
desc: "Verify the block header, body and receipts",
defaultValue: false,
name: "block-verify"
.}: bool
of PortalBridgeCmd.state:
discard
@ -77,3 +91,21 @@ func parseCmdArg*(T: type TrustedDigest, input: string): T {.raises: [ValueError
func completeCmdArg*(T: type TrustedDigest, input: string): seq[string] =
return @[]
proc parseCmdArg*(T: type Web3Url, p: string): T {.raises: [ValueError].} =
let
url = parseUri(p)
normalizedScheme = url.scheme.toLowerAscii()
if (normalizedScheme == "http" or normalizedScheme == "https"):
Web3Url(kind: HttpUrl, url: p)
elif (normalizedScheme == "ws" or normalizedScheme == "wss"):
Web3Url(kind: WsUrl, url: p)
else:
raise newException(
ValueError,
"The Web3 URL must specify one of following protocols: http/https/ws/wss",
)
proc completeCmdArg*(T: type Web3Url, val: string): seq[string] =
return @[]

View File

@ -0,0 +1,312 @@
# Fluffy
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
chronos,
chronicles,
web3/[eth_api, eth_api_types],
results,
stew/byteutils,
eth/common/[eth_types, eth_types_rlp],
../../../nimbus/beacon/web3_eth_conv,
../../../hive_integration/nodocker/engine/engine_client,
../../rpc/[portal_rpc_client],
../../network/history/[history_content, history_network],
./portal_bridge_conf
from stew/objects import checkedEnumAssign
const newHeadPollInterval = 6.seconds # Slot with potential block is every 12s
## Conversion functions for Block and Receipts
func asEthBlock(blockObject: BlockObject): EthBlock =
let
header = blockObject.toBlockHeader()
transactions = toTransactions(blockObject.transactions)
withdrawals = toWithdrawals(blockObject.withdrawals)
EthBlock(header: header, txs: transactions, withdrawals: withdrawals)
func asPortalBlock(
ethBlock: EthBlock
): (BlockHeaderWithProof, PortalBlockBodyShanghai) =
var transactions: Transactions
for tx in ethBlock.txs:
discard transactions.add(TransactionByteList(rlp.encode(tx)))
var withdrawals: Withdrawals
doAssert ethBlock.withdrawals.isSome() # TODO: always the case? also when empty?
for w in ethBlock.withdrawals.get():
discard withdrawals.add(WithdrawalByteList(rlp.encode(w)))
let
headerWithProof = BlockHeaderWithProof(
header: ByteList(rlp.encode(ethBlock.header)), proof: BlockHeaderProof.init()
)
portalBody = PortalBlockBodyShanghai(
transactions: transactions, uncles: Uncles(@[byte 0xc0]), withdrawals: withdrawals
)
(headerWithProof, portalBody)
func asTxType(quantity: Option[Quantity]): Result[TxType, string] =
let value = quantity.get(0.Quantity).uint8
var txType: TxType
if not checkedEnumAssign(txType, value):
err("Invalid data for TxType: " & $value)
else:
ok(txType)
func asReceipt(receiptObject: ReceiptObject): Result[Receipt, string] =
let receiptType = asTxType(receiptObject.`type`).valueOr:
return err("Failed conversion to TxType" & error)
var logs: seq[Log]
if receiptObject.logs.len > 0:
for log in receiptObject.logs:
var topics: seq[eth_types.Topic]
for topic in log.topics:
topics.add(eth_types.Topic(topic))
logs.add(Log(address: ethAddr log.address, data: log.data, topics: topics))
let cumulativeGasUsed = receiptObject.cumulativeGasUsed.GasInt
if receiptObject.status.isSome():
let status = receiptObject.status.get().int
ok(
Receipt(
receiptType: receiptType,
isHash: false,
status: status == 1,
cumulativeGasUsed: cumulativeGasUsed,
bloom: BloomFilter(receiptObject.logsBloom),
logs: logs,
)
)
elif receiptObject.root.isSome():
ok(
Receipt(
receiptType: receiptType,
isHash: true,
hash: ethHash receiptObject.root.get(),
cumulativeGasUsed: cumulativeGasUsed,
bloom: BloomFilter(receiptObject.logsBloom),
logs: logs,
)
)
else:
err("No root nor status field in the JSON receipt object")
func asReceipts(receiptObjects: seq[ReceiptObject]): Result[seq[Receipt], string] =
var receipts: seq[Receipt]
for receiptObject in receiptObjects:
let receipt = asReceipt(receiptObject).valueOr:
return err(error)
receipts.add(receipt)
ok(receipts)
## EL JSON-RPC API helper calls for requesting block and receipts
proc getBlockByNumber(
client: RpcClient, blockTag: RtBlockIdentifier, fullTransactions: bool = true
): Future[Result[BlockObject, string]] {.async: (raises: []).} =
let blck =
try:
let res = await client.eth_getBlockByNumber(blockTag, fullTransactions)
if res.isNil:
return err("failed to get latest blockHeader")
res
except CatchableError as e:
return err("JSON-RPC eth_getBlockByNumber failed: " & e.msg)
return ok(blck)
proc getBlockReceipts(
client: RpcClient, blockNumber: uint64
): Future[Result[seq[ReceiptObject], string]] {.async: (raises: []).} =
let res =
try:
await client.eth_getBlockReceipts(blockNumber)
except CatchableError as e:
return err("JSON-RPC eth_getBlockReceipts failed: " & e.msg)
if res.isNone():
err("Failed getting receipts")
else:
ok(res.get())
## Portal JSON-RPC API helper calls for pushing block and receipts
proc gossipBlockHeader(
client: RpcClient,
hash: common_types.BlockHash,
headerWithProof: BlockHeaderWithProof,
): Future[Result[void, string]] {.async: (raises: []).} =
let
contentKey = history_content.ContentKey.init(blockHeader, hash)
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
peers =
try:
await client.portal_historyGossip(
encodedContentKeyHex, SSZ.encode(headerWithProof).toHex()
)
except CatchableError as e:
return err("JSON-RPC error: " & $e.msg)
info "Block header gossiped", peers, contentKey = encodedContentKeyHex
return ok()
proc gossipBlockBody(
client: RpcClient, hash: common_types.BlockHash, body: PortalBlockBodyShanghai
): Future[Result[void, string]] {.async: (raises: []).} =
let
contentKey = history_content.ContentKey.init(blockBody, hash)
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
peers =
try:
await client.portal_historyGossip(
encodedContentKeyHex, SSZ.encode(body).toHex()
)
except CatchableError as e:
return err("JSON-RPC error: " & $e.msg)
info "Block body gossiped", peers, contentKey = encodedContentKeyHex
return ok()
proc gossipReceipts(
client: RpcClient, hash: common_types.BlockHash, receipts: PortalReceipts
): Future[Result[void, string]] {.async: (raises: []).} =
let
contentKey =
history_content.ContentKey.init(history_content.ContentType.receipts, hash)
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
peers =
try:
await client.portal_historyGossip(
encodedContentKeyHex, SSZ.encode(receipts).toHex()
)
except CatchableError as e:
return err("JSON-RPC error: " & $e.msg)
info "Receipts gossiped", peers, contentKey = encodedContentKeyHex
return ok()
proc runLatestLoop(
portalClient: RpcClient, web3Client: RpcClient, validate = false
) {.async: (raises: [CancelledError]).} =
## Loop that requests the latest block + receipts and pushes them into the
## Portal network.
## Current strategy is to poll for the latest block and receipts, and then
## convert the data (optionally verify it) and push it into the Portal network.
## If the EL JSON-RPC API calls fail, 1 second is waited before retrying.
## If the Portal JSON-RPC API calls fail, the error is logged and the loop
## continues.
## TODO: Might want to add retries on Portal JSON-RPC API call failures too.
## TODO: Investigate Portal side JSON-RPC error seen:
## "JSON-RPC error: Request Entity Too Large"
let blockId = blockId("latest")
var lastBlockNumber = 0'u64
while true:
let t0 = Moment.now()
let blockObject = (await getBlockByNumber(web3Client, blockId)).valueOr:
error "Failed to get latest block", error
await sleepAsync(1.seconds)
continue
let blockNumber = distinctBase(blockObject.number)
if blockNumber > lastBlockNumber:
let receiptObjects = (await web3Client.getBlockReceipts(blockNumber)).valueOr:
error "Failed to get latest receipts", error
await sleepAsync(1.seconds)
continue
let
ethBlock = blockObject.asEthBlock()
(headerWithProof, body) = ethBlock.asPortalBlock()
receipts = receiptObjects.asReceipts().valueOr:
# Note: this failure should not occur. It would mean invalid encoded
# receipts by provider
error "Error converting JSON RPC receipt objects", error
await sleepAsync(1.seconds)
continue
portalReceipts = PortalReceipts.fromReceipts(receipts)
lastBlockNumber = blockNumber
let hash = common_types.BlockHash(data: distinctBase(blockObject.hash))
if validate:
if validateBlockHeaderBytes(headerWithProof.header.asSeq(), hash).isErr():
error "Block header is invalid"
continue
if validateBlockBody(body, ethBlock.header).isErr():
error "Block body is invalid"
continue
if validateReceipts(portalReceipts, ethBlock.header.receiptRoot).isErr():
error "Receipts root is invalid"
continue
# gossip block header
(await portalClient.gossipBlockHeader(hash, headerWithProof)).isOkOr:
error "Failed to gossip block header", error
# For bodies & receipts to get verified, the header needs to be available
# on the network. Wait a little to get the headers propagated through
# the network.
await sleepAsync(2.seconds)
# gossip block body
(await portalClient.gossipBlockBody(hash, body)).isOkOr:
error "Failed to gossip block body", error
# gossip receipts
(await portalClient.gossipReceipts(hash, portalReceipts)).isOkOr:
error "Failed to gossip receipts", error
# Making sure here that we poll enough times not to miss a block.
# We could also do some work without awaiting it, e.g. the gossiping or
# the requesting the receipts during the sleep time. But we also want to
# avoid creating a backlog of requests or gossip.
let t1 = Moment.now()
let elapsed = t1 - t0
if elapsed < newHeadPollInterval:
await sleepAsync(newHeadPollInterval - elapsed)
else:
warn "Block gossip took longer than the poll interval"
proc runHistory*(config: PortalBridgeConf) =
let
portalClient = newRpcHttpClient()
# TODO: Use Web3 object?
web3Client: RpcClient =
case config.web3Url.kind
of HttpUrl:
newRpcHttpClient()
of WsUrl:
newRpcWebSocketClient()
try:
waitFor portalClient.connect(config.rpcAddress, Port(config.rpcPort), false)
except CatchableError as e:
error "Failed to connect to portal RPC", error = $e.msg
if config.web3Url.kind == HttpUrl:
try:
waitFor (RpcHttpClient(web3Client)).connect(config.web3Url.url)
except CatchableError as e:
error "Failed to connect to web3 RPC", error = $e.msg
asyncSpawn runLatestLoop(portalClient, web3Client, config.blockVerify)
while true:
poll()

View File

@ -236,7 +236,7 @@ proc maybeInt(n: Option[Quantity]): Option[int] =
return none(int)
some(n.get.int)
proc toBlockHeader(bc: BlockObject): common.BlockHeader =
proc toBlockHeader*(bc: BlockObject): common.BlockHeader =
common.BlockHeader(
blockNumber : toBlockNumber(bc.number),
parentHash : ethHash bc.parentHash,
@ -299,7 +299,7 @@ proc toTransaction(tx: TransactionObject): Transaction =
S : tx.s,
)
proc toTransactions(txs: openArray[TxOrHash]): seq[Transaction] =
proc toTransactions*(txs: openArray[TxOrHash]): seq[Transaction] =
for x in txs:
doAssert x.kind == tohTx
result.add toTransaction(x.tx)
@ -317,7 +317,7 @@ proc toWithdrawals(list: seq[WithdrawalObject]): seq[Withdrawal] =
for wd in list:
result.add toWithdrawal(wd)
proc toWithdrawals(list: Option[seq[WithdrawalObject]]): Option[seq[Withdrawal]] =
proc toWithdrawals*(list: Option[seq[WithdrawalObject]]): Option[seq[Withdrawal]] =
if list.isNone:
return none(seq[Withdrawal])
some(toWithdrawals(list.get))