# beacon_chain
# Copyright (c) 2022-2024 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
  std/os,
  chronicles, chronos, stew/io2,
  eth/db/kvstore_sqlite3,
  ./el/el_manager,
  ./gossip_processing/optimistic_processor,
  ./networking/[topic_params, network_metadata_downloads],
  ./spec/beaconstate,
  ./spec/datatypes/[phase0, altair, bellatrix, capella, deneb],
  "."/[filepath, light_client, light_client_db, nimbus_binary_common, version]

from ./gossip_processing/block_processor import newExecutionPayload
from ./gossip_processing/eth2_processor import toValidationResult

# this needs to be global, so it can be set in the Ctrl+C signal handler
var globalRunning = true

programMain:
  ## Ctrl+C handling
  proc controlCHandler() {.noconv.} =
    when defined(windows):
      # workaround for https://github.com/nim-lang/Nim/issues/4057
      try:
        setupForeignThreadGc()
      except Exception as exc: raiseAssert exc.msg # shouldn't happen
    notice "Shutting down after having received SIGINT"
    globalRunning = false
  try:
    setControlCHook(controlCHandler)
  except Exception as exc: # TODO Exception
    warn "Cannot set ctrl-c handler", msg = exc.msg

  var config = makeBannerAndConfig(
    "Nimbus light client " & fullVersionStr, LightClientConf)
  setupLogging(config.logLevel, config.logStdout, config.logFile)

  notice "Launching light client",
    version = fullVersionStr, cmdParams = commandLineParams(), config
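
  # Light client store: persists the latest finalized header and the sync
  # committee for each period, so syncing can resume across restarts
  # (see `onFinalizedHeader` and the restore logic further below).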
  let dbDir = config.databaseDir
  if (let res = secureCreatePath(dbDir); res.isErr):
    fatal "Failed to create database directory",
      path = dbDir, err = ioErrorMsg(res.error)
    quit 1
  let backend = SqStoreRef.init(dbDir, "nlc").expect("Database OK")
  defer: backend.close()
  let db = backend.initLightClientDB(LightClientDBNames(
    legacyAltairHeaders: "altair_lc_headers",
    headers: "lc_headers",
    altairSyncCommittees: "altair_sync_committees")).expect("Database OK")
  defer: db.close()

  let metadata = loadEth2Network(config.eth2Network)
  for node in metadata.bootstrapNodes:
    config.bootstrapNodes.add node
  template cfg(): auto = metadata.cfg

  let
    genesisBytes =
      try:
        waitFor metadata.fetchGenesisBytes()
      except CatchableError as err:
        error "Failed to obtain genesis state",
          source = metadata.genesis.sourceDesc,
          err = err.msg
        quit 1
    genesisState =
      try:
        newClone(readSszForkedHashedBeaconState(cfg, genesisBytes))
      except CatchableError as err:
        raiseAssert "Invalid baked-in state: " & err.msg

    genesisTime = getStateField(genesisState[], genesis_time)
    beaconClock = BeaconClock.init(genesisTime).valueOr:
      error "Invalid genesis time in state", genesisTime
      quit 1
    getBeaconTime = beaconClock.getBeaconTimeFn()

    genesis_validators_root =
      getStateField(genesisState[], genesis_validators_root)
    forkDigests = newClone ForkDigests.init(cfg, genesis_validators_root)

    genesisBlockRoot = get_initial_beacon_block(genesisState[]).root

    rng = HmacDrbgContext.new()
    netKeys = getRandomNetKeys(rng[])
    network = createEth2Node(
      rng, config, netKeys, cfg,
      forkDigests, getBeaconTime, genesis_validators_root)
    engineApiUrls = config.engineApiUrls
    elManager =
      if engineApiUrls.len > 0:
        ELManager.new(
          cfg,
          metadata.depositContractBlock,
          metadata.depositContractBlockHash,
          db = nil,
          engineApiUrls,
          metadata.eth1Network)
      else:
        nil
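
    # Handler invoked by the optimistic processor: for gossiped blocks that
    # carry an execution payload with a non-zero block hash, hand the payload
    # to the EL via `newPayload` (a no-op when no EL is attached).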
    optimisticHandler = proc(
        signedBlock: ForkedSignedBeaconBlock
    ): Future[void] {.async: (raises: [CancelledError]).} =
      withBlck(signedBlock):
        when consensusFork >= ConsensusFork.Bellatrix:
          if forkyBlck.message.is_execution_block:
            template payload(): auto = forkyBlck.message.body.execution_payload
            if elManager != nil and not payload.block_hash.isZero:
              discard await elManager.newExecutionPayload(forkyBlck.message)
        else: discard
    optimisticProcessor = initOptimisticProcessor(
      getBeaconTime, optimisticHandler)

    lightClient = createLightClient(
      network, rng, config, cfg, forkDigests, getBeaconTime,
      genesis_validators_root, LightClientFinalizationMode.Optimistic)

  # Run `exchangeTransitionConfiguration` loop
  if elManager != nil:
    elManager.start(syncChain = false)

  info "Listening to incoming network requests"
  network.registerProtocol(
    PeerSync, PeerSync.NetworkState.init(
      cfg, forkDigests, genesisBlockRoot, getBeaconTime))
  withAll(ConsensusFork):
    let forkDigest = forkDigests[].atConsensusFork(consensusFork)
    network.addValidator(
      getBeaconBlocksTopic(forkDigest), proc (
        signedBlock: consensusFork.SignedBeaconBlock
      ): ValidationResult =
        toValidationResult(
          optimisticProcessor.processSignedBeaconBlock(signedBlock)))
  lightClient.installMessageValidators()
  waitFor network.startListening()
  waitFor network.start()

  func isSynced(optimisticSlot: Slot, wallSlot: Slot): bool =
    # Check whether light client has synced sufficiently close to wall slot
    const maxAge = 2 * SLOTS_PER_EPOCH
    optimisticSlot >= max(wallSlot, maxAge.Slot) - maxAge
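    # Example (mainnet): SLOTS_PER_EPOCH = 32, so maxAge = 64; at wall slot
    # 1000 the client counts as synced once its optimistic header reaches
    # slot 936 or later. `max(wallSlot, maxAge.Slot)` prevents Slot underflow
    # while the wall clock is still within the first `maxAge` slots.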

  proc onFinalizedHeader(
      lightClient: LightClient, finalizedHeader: ForkedLightClientHeader) =
    withForkyHeader(finalizedHeader):
      when lcDataFork > LightClientDataFork.None:
        info "New LC finalized header",
          finalized_header = shortLog(forkyHeader)
        let
          period = forkyHeader.beacon.slot.sync_committee_period
          syncCommittee = lightClient.finalizedSyncCommittee.expect("Init OK")
        db.putSyncCommittee(period, syncCommittee)
        db.putLatestFinalizedHeader(finalizedHeader)
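
  # At most one `forkchoiceUpdated` request is in flight at a time:
  # `onOptimisticHeader` returns early while a request is pending, and the
  # completion callback resets this future to nil.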
  var optimisticFcuFut: Future[(PayloadExecutionStatus, Opt[BlockHash])]
    .Raising([CancelledError])
  proc onOptimisticHeader(
      lightClient: LightClient, optimisticHeader: ForkedLightClientHeader) =
    if optimisticFcuFut != nil:
      return
    withForkyHeader(optimisticHeader):
      when lcDataFork > LightClientDataFork.None:
        logScope: optimistic_header = shortLog(forkyHeader)
        when lcDataFork >= LightClientDataFork.Capella:
          let
            bid = forkyHeader.beacon.toBlockId()
            consensusFork = cfg.consensusForkAtEpoch(bid.slot.epoch)
            blockHash = forkyHeader.execution.block_hash

          info "New LC optimistic header"
          if elManager == nil or blockHash.isZero or
              not isSynced(bid.slot, getBeaconTime().slotOrZero()):
            return

          withConsensusFork(consensusFork):
            when lcDataForkAtConsensusFork(consensusFork) == lcDataFork:
              optimisticFcuFut = elManager.forkchoiceUpdated(
                headBlockHash = blockHash,
                safeBlockHash = blockHash, # stub value
                finalizedBlockHash = ZERO_HASH,
                payloadAttributes = Opt.none(consensusFork.PayloadAttributes))
              optimisticFcuFut.addCallback do (future: pointer):
                optimisticFcuFut = nil
        else:
          info "Ignoring new LC optimistic header until Capella"

  lightClient.onFinalizedHeader = onFinalizedHeader
  lightClient.onOptimisticHeader = onOptimisticHeader
  lightClient.trustedBlockRoot = some config.trustedBlockRoot

  let latestHeader = db.getLatestFinalizedHeader()
  withForkyHeader(latestHeader):
    when lcDataFork > LightClientDataFork.None:
      let
        period = forkyHeader.beacon.slot.sync_committee_period
        syncCommittee = db.getSyncCommittee(period)
      if syncCommittee.isErr:
        error "LC store lacks sync committee", finalized_header = forkyHeader
      else:
        lightClient.resetToFinalizedHeader(latestHeader, syncCommittee.get)

  # Full blocks gossip is required to portably drive an EL client:
  # - EL clients may not sync when only driven with `forkChoiceUpdated`,
  #   e.g., Geth: "Forkchoice requested unknown head"
  # - `newPayload` requires the full `ExecutionPayload` (most of block content)
  # - `ExecutionPayload` block hash is not available in
  #   `altair.LightClientHeader`, so won't be exchanged via light client gossip
  #
  # Future `ethereum/consensus-specs` versions may remove the need for full
  # blocks. Therefore, the current mechanism is to be seen as temporary; it is
  # not optimized for reducing code duplication, e.g., with `nimbus_beacon_node`.

  func isSynced(wallSlot: Slot): bool =
    let optimisticHeader = lightClient.optimisticHeader
    withForkyHeader(optimisticHeader):
      when lcDataFork > LightClientDataFork.None:
        isSynced(forkyHeader.beacon.slot, wallSlot)
      else:
        false

  func shouldSyncOptimistically(wallSlot: Slot): bool =
    # Check whether an EL is connected
    if elManager == nil:
      return false

    isSynced(wallSlot)
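
  # `GossipState` is a set of consensus forks; `updateBlocksGossipStatus`
  # reconciles subscriptions by set difference, so only the topics of forks
  # that were added or removed are touched.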
  var blocksGossipState: GossipState = {}
  proc updateBlocksGossipStatus(slot: Slot) =
    let
      isBehind = not shouldSyncOptimistically(slot)

      targetGossipState = getTargetGossipState(
        slot.epoch, cfg.ALTAIR_FORK_EPOCH, cfg.BELLATRIX_FORK_EPOCH,
        cfg.CAPELLA_FORK_EPOCH, cfg.DENEB_FORK_EPOCH, FAR_FUTURE_EPOCH,
        isBehind)

    template currentGossipState(): auto = blocksGossipState
    if currentGossipState == targetGossipState:
      return

    if currentGossipState.card == 0 and targetGossipState.card > 0:
      debug "Enabling blocks topic subscriptions",
        wallSlot = slot, targetGossipState
    elif currentGossipState.card > 0 and targetGossipState.card == 0:
      debug "Disabling blocks topic subscriptions",
        wallSlot = slot
    else:
      # Individual forks added / removed
      discard

    let
      newGossipForks = targetGossipState - currentGossipState
      oldGossipForks = currentGossipState - targetGossipState

    for gossipFork in oldGossipForks:
      let forkDigest = forkDigests[].atConsensusFork(gossipFork)
      network.unsubscribe(getBeaconBlocksTopic(forkDigest))

    for gossipFork in newGossipForks:
      let forkDigest = forkDigests[].atConsensusFork(gossipFork)
      network.subscribe(
        getBeaconBlocksTopic(forkDigest), blocksTopicParams,
        enableTopicMetrics = true)

    blocksGossipState = targetGossipState
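
  # Per-slot status log; `delay` measures how far past the nominal slot start
  # the wall clock is when processing begins.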
  proc onSlot(wallTime: BeaconTime, lastSlot: Slot) =
    let
      wallSlot = wallTime.slotOrZero()
      expectedSlot = lastSlot + 1
      delay = wallTime - expectedSlot.start_beacon_time()

      finalizedHeader = lightClient.finalizedHeader
      optimisticHeader = lightClient.optimisticHeader

      finalizedBid = withForkyHeader(finalizedHeader):
        when lcDataFork > LightClientDataFork.None:
          forkyHeader.beacon.toBlockId()
        else:
          BlockId(root: genesisBlockRoot, slot: GENESIS_SLOT)
      optimisticBid = withForkyHeader(optimisticHeader):
        when lcDataFork > LightClientDataFork.None:
          forkyHeader.beacon.toBlockId()
        else:
          BlockId(root: genesisBlockRoot, slot: GENESIS_SLOT)

      syncStatus =
        if optimisticHeader.kind == LightClientDataFork.None:
          "bootstrapping(" & $config.trustedBlockRoot & ")"
        elif not isSynced(wallSlot):
          "syncing"
        else:
          "synced"

    info "Slot start",
      slot = shortLog(wallSlot),
      epoch = shortLog(wallSlot.epoch),
      sync = syncStatus,
      peers = len(network.peerPool),
      head = shortLog(optimisticBid),
      finalized = shortLog(finalizedBid),
      delay = shortLog(delay)

  proc runOnSlotLoop() {.async.} =
    var
      curSlot = getBeaconTime().slotOrZero()
      nextSlot = curSlot + 1
      timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()
    while true:
      await sleepAsync(timeToNextSlot)

      let
        wallTime = getBeaconTime()
        wallSlot = wallTime.slotOrZero()

      onSlot(wallTime, curSlot)

      curSlot = wallSlot
      nextSlot = wallSlot + 1
      timeToNextSlot = nextSlot.start_beacon_time() - getBeaconTime()
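
  # Gossip status is evaluated for `wallSlot + 1`, adjusting subscriptions
  # ahead of the next slot (and of any fork transition it may begin).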
  proc onSecond(time: Moment) =
    let wallSlot = getBeaconTime().slotOrZero()
    if checkIfShouldStopAtEpoch(wallSlot, config.stopAtEpoch):
      quit(0)

    updateBlocksGossipStatus(wallSlot + 1)
    lightClient.updateGossipStatus(wallSlot + 1)

  proc runOnSecondLoop() {.async.} =
    let sleepTime = chronos.seconds(1)
    while true:
      let start = chronos.now(chronos.Moment)
      await chronos.sleepAsync(sleepTime)
      let afterSleep = chronos.now(chronos.Moment)
      let sleepTime = afterSleep - start # shadows the constant: time actually slept
      onSecond(start)
      let finished = chronos.now(chronos.Moment)
      let processingTime = finished - afterSleep
      trace "onSecond task completed", sleepTime, processingTime

  onSecond(Moment.now())
  lightClient.start()

  asyncSpawn runOnSlotLoop()
  asyncSpawn runOnSecondLoop()
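
  # Drive the chronos event loop until the Ctrl+C handler clears
  # `globalRunning`.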
  while globalRunning:
    poll()

  notice "Exiting light client"