Move portal bridge beacon code and add skeleton for state bridge (#2101)

This commit is contained in:
Kim De Mey 2024-03-22 19:01:22 +01:00 committed by GitHub
parent 889a1165b0
commit 1c98733d4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 236 additions and 193 deletions

View File

@ -46,194 +46,13 @@
{.push raises: [].} {.push raises: [].}
import import
std/os,
chronos, chronos,
confutils, confutils,
confutils/std/net, confutils/std/net,
chronicles,
chronicles/topics_registry,
json_rpc/clients/httpclient,
beacon_chain/spec/eth2_apis/rest_beacon_client,
../../network/beacon/beacon_content,
../../rpc/portal_rpc_client,
../../logging, ../../logging,
../eth_data_exporter/cl_data_exporter, ./[
./[portal_bridge_conf, portal_bridge_beacon, portal_bridge_history] portal_bridge_conf, portal_bridge_beacon, portal_bridge_history, portal_bridge_state
]
proc runBeacon(config: PortalBridgeConf) {.raises: [CatchableError].} =
notice "Launching Fluffy beacon chain bridge", cmdParams = commandLineParams()
let
(cfg, forkDigests, beaconClock) = getBeaconData()
getBeaconTime = beaconClock.getBeaconTimeFn()
portalRpcClient = newRpcHttpClient()
restClient = RestClientRef.new(config.restUrl).valueOr:
fatal "Cannot connect to server", error = $error
quit QuitFailure
proc backfill(
beaconRestClient: RestClientRef,
rpcAddress: string,
rpcPort: Port,
backfillAmount: uint64,
trustedBlockRoot: Option[TrustedDigest],
) {.async.} =
# Bootstrap backfill, currently just one bootstrap selected by
# trusted-block-root, could become a selected list, or some other way.
if trustedBlockRoot.isSome():
await portalRpcClient.connect(rpcAddress, rpcPort, false)
let res = await gossipLCBootstrapUpdate(
beaconRestClient, portalRpcClient, trustedBlockRoot.get(), cfg, forkDigests
)
if res.isErr():
warn "Error gossiping LC bootstrap", error = res.error
await portalRpcClient.close()
# Updates backfill, selected by backfillAmount
# Might want to alter this to default backfill to the
# `MIN_EPOCHS_FOR_BLOCK_REQUESTS`.
# TODO: This can be up to 128, but our JSON-RPC requests fail with a value
# higher than 16. TBI
const updatesPerRequest = 16
let
wallSlot = getBeaconTime().slotOrZero()
currentPeriod = wallSlot div (SLOTS_PER_EPOCH * EPOCHS_PER_SYNC_COMMITTEE_PERIOD)
requestAmount = backfillAmount div updatesPerRequest
leftOver = backfillAmount mod updatesPerRequest
for i in 0 ..< requestAmount:
await portalRpcClient.connect(rpcAddress, rpcPort, false)
let res = await gossipLCUpdates(
beaconRestClient,
portalRpcClient,
currentPeriod - updatesPerRequest * (i + 1) + 1,
updatesPerRequest,
cfg,
forkDigests,
)
if res.isErr():
warn "Error gossiping LC updates", error = res.error
await portalRpcClient.close()
if leftOver > 0:
await portalRpcClient.connect(rpcAddress, rpcPort, false)
let res = await gossipLCUpdates(
beaconRestClient,
portalRpcClient,
currentPeriod - updatesPerRequest * requestAmount - leftOver + 1,
leftOver,
cfg,
forkDigests,
)
if res.isErr():
warn "Error gossiping LC updates", error = res.error
await portalRpcClient.close()
var
lastOptimisticUpdateSlot = Slot(0)
lastFinalityUpdateEpoch = epoch(lastOptimisticUpdateSlot)
lastUpdatePeriod = sync_committee_period(lastOptimisticUpdateSlot)
proc onSlotGossip(wallTime: BeaconTime, lastSlot: Slot) {.async.} =
let
wallSlot = wallTime.slotOrZero()
wallEpoch = epoch(wallSlot)
wallPeriod = sync_committee_period(wallSlot)
notice "Slot start info",
slot = wallSlot,
epoch = wallEpoch,
period = wallPeriod,
lastOptimisticUpdateSlot,
lastFinalityUpdateEpoch,
lastUpdatePeriod,
slotsTillNextEpoch = SLOTS_PER_EPOCH - (wallSlot mod SLOTS_PER_EPOCH),
slotsTillNextPeriod =
SLOTS_PER_SYNC_COMMITTEE_PERIOD - (wallSlot mod SLOTS_PER_SYNC_COMMITTEE_PERIOD)
if wallSlot > lastOptimisticUpdateSlot + 1:
# TODO: If this turns out to be too tricky to not gossip old updates,
# then an alternative could be to verify in the gossip calls if the actual
# slot number received is the correct one, before gossiping into Portal.
# And/or look into possibly using eth/v1/events for
# light_client_finality_update and light_client_optimistic_update if that
# is something that works.
# Or basically `lightClientOptimisticUpdateSlotOffset`
await sleepAsync((SECONDS_PER_SLOT div INTERVALS_PER_SLOT).int.seconds)
await portalRpcClient.connect(config.rpcAddress, Port(config.rpcPort), false)
let res =
await gossipLCOptimisticUpdate(restClient, portalRpcClient, cfg, forkDigests)
if res.isErr():
warn "Error gossiping LC optimistic update", error = res.error
else:
if wallEpoch > lastFinalityUpdateEpoch + 2 and wallSlot > start_slot(wallEpoch):
let res =
await gossipLCFinalityUpdate(restClient, portalRpcClient, cfg, forkDigests)
if res.isErr():
warn "Error gossiping LC finality update", error = res.error
else:
lastFinalityUpdateEpoch = epoch(res.get())
if wallPeriod > lastUpdatePeriod and wallSlot > start_slot(wallEpoch):
# TODO: Need to delay timing here also with one slot?
let res = await gossipLCUpdates(
restClient,
portalRpcClient,
sync_committee_period(wallSlot).uint64,
1,
cfg,
forkDigests,
)
if res.isErr():
warn "Error gossiping LC update", error = res.error
else:
lastUpdatePeriod = wallPeriod
lastOptimisticUpdateSlot = res.get()
proc runOnSlotLoop() {.async.} =
var
curSlot = getBeaconTime().slotOrZero()
nextSlot = curSlot + 1
timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()
while true:
await sleepAsync(timeToNextSlot)
let
wallTime = getBeaconTime()
wallSlot = wallTime.slotOrZero()
await onSlotGossip(wallTime, curSlot)
curSlot = wallSlot
nextSlot = wallSlot + 1
timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()
waitFor backfill(
restClient, config.rpcAddress, config.rpcPort, config.backfillAmount,
config.trustedBlockRoot,
)
asyncSpawn runOnSlotLoop()
while true:
poll()
when isMainModule: when isMainModule:
{.pop.} {.pop.}
@ -248,4 +67,4 @@ when isMainModule:
of PortalBridgeCmd.history: of PortalBridgeCmd.history:
runHistory(config) runHistory(config)
of PortalBridgeCmd.state: of PortalBridgeCmd.state:
notice "Functionality not yet implemented" runState(config)

View File

@ -8,6 +8,7 @@
{.push raises: [].} {.push raises: [].}
import import
std/os,
chronos, chronos,
chronicles, chronicles,
chronicles/topics_registry, chronicles/topics_registry,
@ -17,15 +18,16 @@ import
beacon_chain/spec/eth2_apis/rest_beacon_client, beacon_chain/spec/eth2_apis/rest_beacon_client,
../../network/beacon/beacon_content, ../../network/beacon/beacon_content,
../../rpc/portal_rpc_client, ../../rpc/portal_rpc_client,
../eth_data_exporter/cl_data_exporter ../eth_data_exporter/cl_data_exporter,
./portal_bridge_conf
const restRequestsTimeout = 30.seconds const restRequestsTimeout = 30.seconds
# TODO: From nimbus_binary_common, but we don't want to import that. # TODO: From nimbus_binary_common, but we don't want to import that.
proc sleepAsync*(t: TimeDiff): Future[void] = proc sleepAsync(t: TimeDiff): Future[void] =
sleepAsync(nanoseconds(if t.nanoseconds < 0: 0'i64 else: t.nanoseconds)) sleepAsync(nanoseconds(if t.nanoseconds < 0: 0'i64 else: t.nanoseconds))
proc gossipLCBootstrapUpdate*( proc gossipLCBootstrapUpdate(
restClient: RestClientRef, restClient: RestClientRef,
portalRpcClient: RpcHttpClient, portalRpcClient: RpcHttpClient,
trustedBlockRoot: Eth2Digest, trustedBlockRoot: Eth2Digest,
@ -71,7 +73,7 @@ proc gossipLCBootstrapUpdate*(
else: else:
return err("No LC bootstraps pre Altair") return err("No LC bootstraps pre Altair")
proc gossipLCUpdates*( proc gossipLCUpdates(
restClient: RestClientRef, restClient: RestClientRef,
portalRpcClient: RpcHttpClient, portalRpcClient: RpcHttpClient,
startPeriod: uint64, startPeriod: uint64,
@ -131,7 +133,7 @@ proc gossipLCUpdates*(
# error at all and perhaps return the updates.len. # error at all and perhaps return the updates.len.
return err("No updates downloaded") return err("No updates downloaded")
proc gossipLCFinalityUpdate*( proc gossipLCFinalityUpdate(
restClient: RestClientRef, restClient: RestClientRef,
portalRpcClient: RpcHttpClient, portalRpcClient: RpcHttpClient,
cfg: RuntimeConfig, cfg: RuntimeConfig,
@ -178,7 +180,7 @@ proc gossipLCFinalityUpdate*(
else: else:
return err("No LC updates pre Altair") return err("No LC updates pre Altair")
proc gossipLCOptimisticUpdate*( proc gossipLCOptimisticUpdate(
restClient: RestClientRef, restClient: RestClientRef,
portalRpcClient: RpcHttpClient, portalRpcClient: RpcHttpClient,
cfg: RuntimeConfig, cfg: RuntimeConfig,
@ -225,3 +227,178 @@ proc gossipLCOptimisticUpdate*(
return err(res.error) return err(res.error)
else: else:
return err("No LC updates pre Altair") return err("No LC updates pre Altair")
proc runBeacon*(config: PortalBridgeConf) {.raises: [CatchableError].} =
notice "Launching Fluffy beacon chain bridge", cmdParams = commandLineParams()
let
(cfg, forkDigests, beaconClock) = getBeaconData()
getBeaconTime = beaconClock.getBeaconTimeFn()
portalRpcClient = newRpcHttpClient()
restClient = RestClientRef.new(config.restUrl).valueOr:
fatal "Cannot connect to server", error = $error
quit QuitFailure
proc backfill(
beaconRestClient: RestClientRef,
rpcAddress: string,
rpcPort: Port,
backfillAmount: uint64,
trustedBlockRoot: Option[TrustedDigest],
) {.async.} =
# Bootstrap backfill, currently just one bootstrap selected by
# trusted-block-root, could become a selected list, or some other way.
if trustedBlockRoot.isSome():
await portalRpcClient.connect(rpcAddress, rpcPort, false)
let res = await gossipLCBootstrapUpdate(
beaconRestClient, portalRpcClient, trustedBlockRoot.get(), cfg, forkDigests
)
if res.isErr():
warn "Error gossiping LC bootstrap", error = res.error
await portalRpcClient.close()
# Updates backfill, selected by backfillAmount
# Might want to alter this to default backfill to the
# `MIN_EPOCHS_FOR_BLOCK_REQUESTS`.
# TODO: This can be up to 128, but our JSON-RPC requests fail with a value
# higher than 16. TBI
const updatesPerRequest = 16
let
wallSlot = getBeaconTime().slotOrZero()
currentPeriod = wallSlot div (SLOTS_PER_EPOCH * EPOCHS_PER_SYNC_COMMITTEE_PERIOD)
requestAmount = backfillAmount div updatesPerRequest
leftOver = backfillAmount mod updatesPerRequest
for i in 0 ..< requestAmount:
await portalRpcClient.connect(rpcAddress, rpcPort, false)
let res = await gossipLCUpdates(
beaconRestClient,
portalRpcClient,
currentPeriod - updatesPerRequest * (i + 1) + 1,
updatesPerRequest,
cfg,
forkDigests,
)
if res.isErr():
warn "Error gossiping LC updates", error = res.error
await portalRpcClient.close()
if leftOver > 0:
await portalRpcClient.connect(rpcAddress, rpcPort, false)
let res = await gossipLCUpdates(
beaconRestClient,
portalRpcClient,
currentPeriod - updatesPerRequest * requestAmount - leftOver + 1,
leftOver,
cfg,
forkDigests,
)
if res.isErr():
warn "Error gossiping LC updates", error = res.error
await portalRpcClient.close()
var
lastOptimisticUpdateSlot = Slot(0)
lastFinalityUpdateEpoch = epoch(lastOptimisticUpdateSlot)
lastUpdatePeriod = sync_committee_period(lastOptimisticUpdateSlot)
proc onSlotGossip(wallTime: BeaconTime, lastSlot: Slot) {.async.} =
let
wallSlot = wallTime.slotOrZero()
wallEpoch = epoch(wallSlot)
wallPeriod = sync_committee_period(wallSlot)
notice "Slot start info",
slot = wallSlot,
epoch = wallEpoch,
period = wallPeriod,
lastOptimisticUpdateSlot,
lastFinalityUpdateEpoch,
lastUpdatePeriod,
slotsTillNextEpoch = SLOTS_PER_EPOCH - (wallSlot mod SLOTS_PER_EPOCH),
slotsTillNextPeriod =
SLOTS_PER_SYNC_COMMITTEE_PERIOD - (wallSlot mod SLOTS_PER_SYNC_COMMITTEE_PERIOD)
if wallSlot > lastOptimisticUpdateSlot + 1:
# TODO: If this turns out to be too tricky to not gossip old updates,
# then an alternative could be to verify in the gossip calls if the actual
# slot number received is the correct one, before gossiping into Portal.
# And/or look into possibly using eth/v1/events for
# light_client_finality_update and light_client_optimistic_update if that
# is something that works.
# Or basically `lightClientOptimisticUpdateSlotOffset`
await sleepAsync((SECONDS_PER_SLOT div INTERVALS_PER_SLOT).int.seconds)
await portalRpcClient.connect(config.rpcAddress, Port(config.rpcPort), false)
let res =
await gossipLCOptimisticUpdate(restClient, portalRpcClient, cfg, forkDigests)
if res.isErr():
warn "Error gossiping LC optimistic update", error = res.error
else:
if wallEpoch > lastFinalityUpdateEpoch + 2 and wallSlot > start_slot(wallEpoch):
let res =
await gossipLCFinalityUpdate(restClient, portalRpcClient, cfg, forkDigests)
if res.isErr():
warn "Error gossiping LC finality update", error = res.error
else:
lastFinalityUpdateEpoch = epoch(res.get())
if wallPeriod > lastUpdatePeriod and wallSlot > start_slot(wallEpoch):
# TODO: Need to delay timing here also with one slot?
let res = await gossipLCUpdates(
restClient,
portalRpcClient,
sync_committee_period(wallSlot).uint64,
1,
cfg,
forkDigests,
)
if res.isErr():
warn "Error gossiping LC update", error = res.error
else:
lastUpdatePeriod = wallPeriod
lastOptimisticUpdateSlot = res.get()
proc runOnSlotLoop() {.async.} =
var
curSlot = getBeaconTime().slotOrZero()
nextSlot = curSlot + 1
timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()
while true:
await sleepAsync(timeToNextSlot)
let
wallTime = getBeaconTime()
wallSlot = wallTime.slotOrZero()
await onSlotGossip(wallTime, curSlot)
curSlot = wallSlot
nextSlot = wallSlot + 1
timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()
waitFor backfill(
restClient, config.rpcAddress, config.rpcPort, config.backfillAmount,
config.trustedBlockRoot,
)
asyncSpawn runOnSlotLoop()
while true:
poll()

View File

@ -7,7 +7,8 @@
{.push raises: [].} {.push raises: [].}
import std/[os, uri], confutils, confutils/std/net, nimcrypto/hash, ../../logging import
std/[strutils, os, uri], confutils, confutils/std/net, nimcrypto/hash, ../../logging
export net export net
@ -119,7 +120,8 @@ type
name: "era1-dir" name: "era1-dir"
.}: InputDir .}: InputDir
of PortalBridgeCmd.state: of PortalBridgeCmd.state:
discard web3UrlState* {.desc: "Execution layer JSON-RPC API URL", name: "web3-url".}:
Web3Url
func parseCmdArg*(T: type TrustedDigest, input: string): T {.raises: [ValueError].} = func parseCmdArg*(T: type TrustedDigest, input: string): T {.raises: [ValueError].} =
TrustedDigest.fromHex(input) TrustedDigest.fromHex(input)

View File

@ -0,0 +1,45 @@
# Fluffy
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import chronos, chronicles, ../../rpc/portal_rpc_client, ./portal_bridge_conf
proc runState*(config: PortalBridgeConf) =
let
portalClient = newRpcHttpClient()
# TODO: Use Web3 object?
web3Client: RpcClient =
case config.web3UrlState.kind
of HttpUrl:
newRpcHttpClient()
of WsUrl:
newRpcWebSocketClient()
try:
waitFor portalClient.connect(config.rpcAddress, Port(config.rpcPort), false)
except CatchableError as e:
error "Failed to connect to portal RPC", error = $e.msg
if config.web3UrlState.kind == HttpUrl:
try:
waitFor (RpcHttpClient(web3Client)).connect(config.web3UrlState.url)
except CatchableError as e:
error "Failed to connect to web3 RPC", error = $e.msg
# TODO:
# Here we'd want to implement initially a loop that backfills the state
# content. Secondly, a loop that follows the head and injects the latest
# state changes too.
#
# The first step would probably be the easier one to start with, as one
# can start from genesis state.
# It could be implemented by using the `exp_getProofsByBlockNumber` JSON-RPC
# method from nimbus-eth1.
# It could also be implemented by having the whole state execution happening
# inside the bridge, and getting the blocks from era1 files.
notice "State bridge functionality not yet implemented"
quit QuitSuccess