nimbus-eth1/fluffy/tools/portal_bridge/portal_bridge_beacon.nim
Kim De Mey 7e2a636717
Add basic validation for LC bootstraps + portal_bridge changes (#2527)
- Add basic validation for LC bootstrap gossip, validating either
by trusted block root (only 1) when not synced, or by comparing
with the header of the latest finality update when synced.

- Update portal_bridge beacon to also gossip bootstraps into the
network on each end of epoch.
2024-07-25 20:15:26 +02:00

457 lines
16 KiB
Nim

# Fluffy
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/os,
chronos,
chronicles,
chronicles/topics_registry,
stew/byteutils,
eth/async_utils,
json_rpc/clients/httpclient,
beacon_chain/spec/eth2_apis/rest_beacon_client,
../../network/beacon/beacon_content,
../../rpc/portal_rpc_client,
../eth_data_exporter/cl_data_exporter,
./[portal_bridge_conf, portal_bridge_common]
const
  # Timeout applied to the regular (light client) REST downloads.
  restRequestsTimeout = 30.seconds
  # For downloading large items such as states.
  largeRequestsTimeout = 120.seconds
# TODO: From nimbus_binary_common, but we don't want to import that.
proc sleepAsync(t: TimeDiff): Future[void] =
  ## `sleepAsync` overload taking a `TimeDiff`. A negative `t` is clamped to
  ## zero nanoseconds before being handed to the underlying `sleepAsync`.
  let clampedNanos = max(t.nanoseconds, 0'i64)
  sleepAsync(nanoseconds(clampedNanos))
proc gossipLCBootstrapUpdate(
    restClient: RestClientRef,
    portalRpcClient: RpcClient,
    trustedBlockRoot: Eth2Digest,
    cfg: RuntimeConfig,
    forkDigests: ref ForkDigests,
): Future[Result[void, string]] {.async.} =
  ## Downloads the light client bootstrap for `trustedBlockRoot` through
  ## `restClient` and gossips it via `portal_beaconRandomGossip` on
  ## `portalRpcClient`.
  ##
  ## Returns an error string when the download times out or raises, when the
  ## downloaded object has no light client data fork
  ## (`LightClientDataFork.None`), or when the JSON-RPC call raises.
  var bootstrap =
    try:
      info "Downloading LC bootstrap"
      awaitWithTimeout(
        restClient.getLightClientBootstrap(trustedBlockRoot, cfg, forkDigests),
        restRequestsTimeout,
      ):
        return err("Attempt to download LC bootstrap timed out")
    except CatchableError as downloadExc:
      return err("Unable to download LC bootstrap: " & downloadExc.msg)

  withForkyObject(bootstrap):
    when lcDataFork > LightClientDataFork.None:
      let
        slot = forkyObject.header.beacon.slot
        contentKey = encode(bootstrapContentKey(trustedBlockRoot))
        # The fork digest is selected by the epoch of the bootstrap header slot.
        forkDigest = forkDigestAtEpoch(forkDigests[], epoch(slot), cfg)
        content = encodeBootstrapForked(forkDigest, bootstrap)

      proc gossipContent(): Future[Result[void, string]] {.async.} =
        # Any exception from the JSON-RPC call is converted into an error
        # string.
        try:
          let
            keyHex = contentKey.asSeq().toHex()
            peers =
              await portalRpcClient.portal_beaconRandomGossip(keyHex, content.toHex())
          info "Beacon LC bootstrap gossiped", peers, contentKey = keyHex
          return ok()
        except CatchableError as rpcExc:
          return err("JSON-RPC error: " & $rpcExc.msg)

      let gossipRes = await gossipContent()
      if gossipRes.isErr():
        return err(gossipRes.error)
      return ok()
    else:
      return err("No LC bootstraps pre Altair")
proc gossipLCUpdates(
    restClient: RestClientRef,
    portalRpcClient: RpcClient,
    startPeriod: uint64,
    count: uint64,
    cfg: RuntimeConfig,
    forkDigests: ref ForkDigests,
): Future[Result[void, string]] {.async.} =
  ## Downloads `count` light client updates starting at sync committee period
  ## `startPeriod` through `restClient` and gossips them as a single content
  ## item via `portal_beaconRandomGossip` on `portalRpcClient`.
  ##
  ## The content key is built from the sync committee period of the first
  ## downloaded update and the requested `count`.
  ##
  ## Returns an error string when the download times out or raises, when no
  ## updates were returned, when the first update has no light client data
  ## fork (`LightClientDataFork.None`), or when the JSON-RPC call raises.
  var updates =
    try:
      info "Downloading LC updates", count
      awaitWithTimeout(
        restClient.getLightClientUpdatesByRange(
          SyncCommitteePeriod(startPeriod), count, cfg, forkDigests
        ),
        restRequestsTimeout,
      ):
        return err("Attempt to download LC updates timed out")
    except CatchableError as downloadExc:
      return err("Unable to download LC updates: " & downloadExc.msg)

  if updates.len() == 0:
    # TODO:
    # currently only error if no updates at all found. This might be due
    # to selecting future period or too old period.
    # Might want to error here in case count != updates.len or might not want to
    # error at all and perhaps return the updates.len.
    return err("No updates downloaded")

  withForkyObject(updates[0]):
    when lcDataFork > LightClientDataFork.None:
      let
        slot = forkyObject.attested_header.beacon.slot
        period = slot.sync_committee_period
        contentKey = encode(updateContentKey(period.uint64, count))
        forkDigest = forkDigestAtEpoch(forkDigests[], epoch(slot), cfg)
        content = encodeLightClientUpdatesForked(forkDigest, updates)

      proc gossipContent(): Future[Result[void, string]] {.async.} =
        # Any exception from the JSON-RPC call is converted into an error
        # string.
        try:
          let
            keyHex = contentKey.asSeq().toHex()
            peers =
              await portalRpcClient.portal_beaconRandomGossip(keyHex, content.toHex())
          info "Beacon LC update gossiped", peers, contentKey = keyHex, period, count
          return ok()
        except CatchableError as rpcExc:
          return err("JSON-RPC error: " & $rpcExc.msg)

      let gossipRes = await gossipContent()
      if gossipRes.isErr():
        return err(gossipRes.error)
      return ok()
    else:
      return err("No LC updates pre Altair")
proc gossipLCFinalityUpdate(
    restClient: RestClientRef,
    portalRpcClient: RpcClient,
    cfg: RuntimeConfig,
    forkDigests: ref ForkDigests,
): Future[Result[(Slot, Eth2Digest), string]] {.async.} =
  ## Downloads the latest light client finality update through `restClient`
  ## and gossips it via `portal_beaconRandomGossip` on `portalRpcClient`.
  ##
  ## On success returns the slot of the finalized header together with the
  ## hash tree root of that finalized beacon header.
  ##
  ## Returns an error string when the download times out or raises, when the
  ## update has no light client data fork (`LightClientDataFork.None`), or
  ## when the JSON-RPC call raises.
  var update =
    try:
      info "Downloading LC finality update"
      awaitWithTimeout(
        restClient.getLightClientFinalityUpdate(cfg, forkDigests), restRequestsTimeout
      ):
        return err("Attempt to download LC finality update timed out")
    except CatchableError as downloadExc:
      return err("Unable to download LC finality update: " & downloadExc.msg)

  withForkyObject(update):
    when lcDataFork > LightClientDataFork.None:
      let
        finalizedSlot = forkyObject.finalized_header.beacon.slot
        blockRoot = hash_tree_root(forkyObject.finalized_header.beacon)
        # The content key uses the finalized slot, while the fork digest is
        # selected by the epoch of the attested header slot.
        contentKey = encode(finalityUpdateContentKey(finalizedSlot.uint64))
        forkDigest = forkDigestAtEpoch(
          forkDigests[], epoch(forkyObject.attested_header.beacon.slot), cfg
        )
        content = encodeFinalityUpdateForked(forkDigest, update)

      proc gossipContent(): Future[Result[void, string]] {.async.} =
        # Any exception from the JSON-RPC call is converted into an error
        # string.
        try:
          let
            keyHex = contentKey.asSeq().toHex()
            peers =
              await portalRpcClient.portal_beaconRandomGossip(keyHex, content.toHex())
          info "Beacon LC finality update gossiped",
            peers, contentKey = keyHex, finalizedSlot
          return ok()
        except CatchableError as rpcExc:
          return err("JSON-RPC error: " & $rpcExc.msg)

      let gossipRes = await gossipContent()
      if gossipRes.isErr():
        return err(gossipRes.error)
      return ok((finalizedSlot, blockRoot))
    else:
      return err("No LC updates pre Altair")
proc gossipLCOptimisticUpdate(
    restClient: RestClientRef,
    portalRpcClient: RpcClient,
    cfg: RuntimeConfig,
    forkDigests: ref ForkDigests,
): Future[Result[Slot, string]] {.async.} =
  ## Downloads the latest light client optimistic update through `restClient`
  ## and gossips it via `portal_beaconRandomGossip` on `portalRpcClient`.
  ##
  ## On success returns the `signature_slot` of the gossiped update.
  ##
  ## Returns an error string when the download times out or raises, when the
  ## update has no light client data fork (`LightClientDataFork.None`), or
  ## when the JSON-RPC call raises.
  var update =
    try:
      info "Downloading LC optimistic update"
      awaitWithTimeout(
        restClient.getLightClientOptimisticUpdate(cfg, forkDigests), restRequestsTimeout
      ):
        return err("Attempt to download LC optimistic update timed out")
    except CatchableError as downloadExc:
      return err("Unable to download LC optimistic update: " & downloadExc.msg)

  withForkyObject(update):
    when lcDataFork > LightClientDataFork.None:
      let
        slot = forkyObject.signature_slot
        # The content key uses the signature slot, while the fork digest is
        # selected by the epoch of the attested header slot.
        contentKey = encode(optimisticUpdateContentKey(slot.uint64))
        forkDigest = forkDigestAtEpoch(
          forkDigests[], epoch(forkyObject.attested_header.beacon.slot), cfg
        )
        content = encodeOptimisticUpdateForked(forkDigest, update)

      proc gossipContent(): Future[Result[void, string]] {.async.} =
        # Any exception from the JSON-RPC call is converted into an error
        # string.
        try:
          let
            keyHex = contentKey.asSeq().toHex()
            peers =
              await portalRpcClient.portal_beaconRandomGossip(keyHex, content.toHex())
          info "Beacon LC optimistic update gossiped", peers, contentKey = keyHex, slot
          return ok()
        except CatchableError as rpcExc:
          return err("JSON-RPC error: " & $rpcExc.msg)

      let gossipRes = await gossipContent()
      if gossipRes.isErr():
        return err(gossipRes.error)
      return ok(slot)
    else:
      return err("No LC updates pre Altair")
proc gossipHistoricalSummaries(
    restClient: RestClientRef,
    portalRpcClient: RpcClient,
    cfg: RuntimeConfig,
    forkDigests: ref ForkDigests,
): Future[Result[void, string]] {.async.} =
  ## Downloads the finalized beacon state through `restClient`, wraps its
  ## `historical_summaries` together with the proof from `buildProof`, and
  ## gossips the result via `portal_beaconRandomGossip` on `portalRpcClient`.
  ##
  ## Returns an error string when the state download times out, raises or
  ## yields `nil`, when `buildProof` fails, when the state is from a fork
  ## before Capella, or when the JSON-RPC call raises.
  let state =
    try:
      notice "Downloading beacon state"
      awaitWithTimeout(
        restClient.getStateV2(StateIdent.init(StateIdentType.Finalized), cfg),
        largeRequestsTimeout,
      ):
        return err("Attempt to download beacon state timed out")
    except CatchableError as downloadExc:
      return err("Unable to download beacon state: " & downloadExc.msg)

  if state.isNil():
    return err("No beacon state found")

  withState(state[]):
    when consensusFork >= ConsensusFork.Capella:
      let
        # `?` returns early with the error of `buildProof` on failure.
        proof = ?buildProof(state[])
        epoch = forkyState.data.slot.epoch()
        forkDigest = forkDigestAtEpoch(forkDigests[], epoch, cfg)
        summariesWithProof = HistoricalSummariesWithProof(
          epoch: epoch,
          historical_summaries: forkyState.data.historical_summaries,
          proof: proof,
        )
        contentKey = encode(historicalSummariesContentKey(epoch.uint64))
        content = encodeSsz(summariesWithProof, forkDigest)

      try:
        let
          keyHex = contentKey.asSeq().toHex()
          contentHex = content.toHex()
          peers = await portalRpcClient.portal_beaconRandomGossip(keyHex, contentHex)
        info "Beacon historical_summaries gossiped", peers, epoch
        return ok()
      except CatchableError as rpcExc:
        return err("JSON-RPC error: " & $rpcExc.msg)
    else:
      return err("No historical_summaries pre Capella")
proc runBeacon*(config: PortalBridgeConf) {.raises: [CatchableError].} =
  ## Entry point of the beacon chain bridge.
  ##
  ## Creates the Portal JSON-RPC client and the beacon REST client from
  ## `config`, runs the one-off backfill to completion, then spawns the
  ## per-slot gossip loop and polls the event loop forever, so this proc does
  ## not return. Quits with `QuitFailure` when the REST client cannot be
  ## created.
  notice "Launching Fluffy beacon chain bridge", cmdParams = commandLineParams()

  let
    (cfg, forkDigests, beaconClock) = getBeaconData()
    getBeaconTime = beaconClock.getBeaconTimeFn()
    portalRpcClient = newRpcClientConnect(config.portalRpcUrl)

    restClient = RestClientRef.new(config.restUrl).valueOr:
      fatal "Cannot connect to server", error = $error
      quit QuitFailure

  proc backfill(
      beaconRestClient: RestClientRef,
      portalRpcClient: RpcClient,
      backfillAmount: uint64,
      trustedBlockRoot: Option[TrustedDigest],
  ) {.async.} =
    ## Gossips the bootstrap for `trustedBlockRoot` (when set) and the LC
    ## updates of the last `backfillAmount` sync committee periods, counted
    ## back from the current wall-clock period. Failures are only logged.
    # Bootstrap backfill, currently just one bootstrap selected by
    # trusted-block-root, could become a selected list, or some other way.
    if trustedBlockRoot.isSome():
      let bootstrapRes = await gossipLCBootstrapUpdate(
        beaconRestClient, portalRpcClient, trustedBlockRoot.get(), cfg, forkDigests
      )

      if bootstrapRes.isErr():
        warn "Error gossiping LC bootstrap", error = bootstrapRes.error

      # NOTE(review): the client is closed here yet used again below and in
      # the slot loop — confirm this is intended.
      await portalRpcClient.close()

    # Updates backfill, selected by backfillAmount
    # Might want to alter this to default backfill to the
    # `MIN_EPOCHS_FOR_BLOCK_REQUESTS`.
    # TODO: This can be up to 128, but our JSON-RPC requests fail with a value
    # higher than 16. TBI
    const updatesPerRequest = 16

    let
      wallSlot = getBeaconTime().slotOrZero()
      currentPeriod = wallSlot div (SLOTS_PER_EPOCH * EPOCHS_PER_SYNC_COMMITTEE_PERIOD)
      requestAmount = backfillAmount div updatesPerRequest
      leftOver = backfillAmount mod updatesPerRequest

    # NOTE(review): the start period computations below look like they would
    # underflow if backfillAmount exceeds currentPeriod — confirm the caller
    # bounds it.
    for i in 0 ..< requestAmount:
      # Full batches, walking backwards from the current period.
      let updatesRes = await gossipLCUpdates(
        beaconRestClient,
        portalRpcClient,
        currentPeriod - updatesPerRequest * (i + 1) + 1,
        updatesPerRequest,
        cfg,
        forkDigests,
      )

      if updatesRes.isErr():
        warn "Error gossiping LC updates", error = updatesRes.error

      await portalRpcClient.close()

    if leftOver > 0:
      # Remaining periods that did not fill a whole batch.
      let updatesRes = await gossipLCUpdates(
        beaconRestClient,
        portalRpcClient,
        currentPeriod - updatesPerRequest * requestAmount - leftOver + 1,
        leftOver,
        cfg,
        forkDigests,
      )

      if updatesRes.isErr():
        warn "Error gossiping LC updates", error = updatesRes.error

      await portalRpcClient.close()

  # Progress markers shared between invocations of `onSlotGossip`.
  var
    lastOptimisticUpdateSlot = Slot(0)
    lastFinalityUpdateEpoch = epoch(lastOptimisticUpdateSlot)
    lastUpdatePeriod = sync_committee_period(lastOptimisticUpdateSlot)

  proc onSlotGossip(wallTime: BeaconTime, lastSlot: Slot) {.async.} =
    ## Per-slot work: gossips an optimistic update and, when that succeeded,
    ## conditionally a finality update (followed by a bootstrap and the
    ## historical summaries) and the LC update of the current period.
    ## Failures are only logged.
    let
      wallSlot = wallTime.slotOrZero()
      wallEpoch = epoch(wallSlot)
      wallPeriod = sync_committee_period(wallSlot)

    notice "Slot start info",
      slot = wallSlot,
      epoch = wallEpoch,
      period = wallPeriod,
      lastOptimisticUpdateSlot,
      lastFinalityUpdateEpoch,
      lastUpdatePeriod,
      slotsTillNextEpoch = SLOTS_PER_EPOCH - (wallSlot mod SLOTS_PER_EPOCH),
      slotsTillNextPeriod =
        SLOTS_PER_SYNC_COMMITTEE_PERIOD - (wallSlot mod SLOTS_PER_SYNC_COMMITTEE_PERIOD)

    if wallSlot > lastOptimisticUpdateSlot + 1:
      # TODO: If this turns out to be too tricky to not gossip old updates,
      # then an alternative could be to verify in the gossip calls if the actual
      # slot number received is the correct one, before gossiping into Portal.
      # And/or look into possibly using eth/v1/events for
      # light_client_finality_update and light_client_optimistic_update if that
      # is something that works.

      # Or basically `lightClientOptimisticUpdateSlotOffset`
      await sleepAsync((SECONDS_PER_SLOT div INTERVALS_PER_SLOT).int.seconds)

      let optimisticRes =
        await gossipLCOptimisticUpdate(restClient, portalRpcClient, cfg, forkDigests)

      if optimisticRes.isErr():
        warn "Error gossiping LC optimistic update", error = optimisticRes.error
      else:
        if wallEpoch > lastFinalityUpdateEpoch + 2 and wallSlot > start_slot(wallEpoch):
          let finalityRes =
            await gossipLCFinalityUpdate(restClient, portalRpcClient, cfg, forkDigests)

          if finalityRes.isErr():
            warn "Error gossiping LC finality update", error = finalityRes.error
          else:
            let (slot, blockRoot) = finalityRes.value()
            lastFinalityUpdateEpoch = epoch(slot)

            # Gossip the bootstrap that belongs to the newly finalized block.
            let bootstrapRes = await gossipLCBootstrapUpdate(
              restClient, portalRpcClient, blockRoot, cfg, forkDigests
            )

            if bootstrapRes.isErr():
              warn "Error gossiping LC bootstrap", error = bootstrapRes.error

            let summariesRes = await gossipHistoricalSummaries(
              restClient, portalRpcClient, cfg, forkDigests
            )
            if summariesRes.isErr():
              # Log the error of the historical summaries call itself; reading
              # the error of the (possibly successful) bootstrap result here
              # would be a defect.
              warn "Error gossiping historical summaries", error = summariesRes.error

        if wallPeriod > lastUpdatePeriod and wallSlot > start_slot(wallEpoch):
          # TODO: Need to delay timing here also with one slot?
          let updateRes = await gossipLCUpdates(
            restClient,
            portalRpcClient,
            sync_committee_period(wallSlot).uint64,
            1,
            cfg,
            forkDigests,
          )

          if updateRes.isErr():
            warn "Error gossiping LC update", error = updateRes.error
          else:
            lastUpdatePeriod = wallPeriod

        lastOptimisticUpdateSlot = optimisticRes.get()

  proc runOnSlotLoop() {.async.} =
    ## Sleeps until the start of each next slot and then runs `onSlotGossip`.
    var
      curSlot = getBeaconTime().slotOrZero()
      nextSlot = curSlot + 1
      timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()
    while true:
      await sleepAsync(timeToNextSlot)

      let
        wallTime = getBeaconTime()
        wallSlot = wallTime.slotOrZero()

      await onSlotGossip(wallTime, curSlot)

      # Recompute the wait from the current time, after the gossip work ran.
      curSlot = wallSlot
      nextSlot = wallSlot + 1
      timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()

  waitFor backfill(
    restClient, portalRpcClient, config.backfillAmount, config.trustedBlockRoot
  )

  asyncSpawn runOnSlotLoop()

  while true:
    poll()