nimbus-eth1/fluffy/portal_node.nim

252 lines
8.0 KiB
Nim
Raw Normal View History

# Fluffy
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
results,
chronos,
eth/p2p/discoveryv5/protocol,
beacon_chain/spec/forks,
./network_metadata,
./eth_data/history_data_ssz_e2s,
./database/content_db,
./network/wire/[portal_stream, portal_protocol_config],
./network/beacon/[beacon_init_loader, beacon_light_client],
./network/history/[history_network, history_content],
./network/state/[state_network, state_content]
export
beacon_light_client, history_network, state_network, portal_protocol_config, forks
type
  # Lifecycle marker kept in `PortalNode.state`. `Starting` is the first member
  # and thus the value a node holds until `start` switches it to `Running`.
  PortalNodeState* = enum
    Starting, Running, Stopping

  # Settings consumed by `PortalNode.new`.
  PortalNodeConfig* = object
    # When set, the SSZ encoded accumulator is read from this file; otherwise
    # `loadAccumulator()` is used.
    accumulatorFile*: Opt[string]
    # The negation of this flag is forwarded to `StateNetwork.new`.
    disableStateRootValidation*: bool
    # Forwarded to `BeaconNetwork.new`.
    trustedBlockRoot*: Opt[Digest]
    # Forwarded to every sub-network; its `radiusConfig` is also given to the
    # content database.
    portalConfig*: PortalProtocolConfig
    # Root directory under which the databases are placed.
    dataDir*: string
    # Passed as `storageCapacity` to `ContentDB.new`.
    storageCapacity*: uint64
    # Forwarded to the history and state networks.
    contentRequestRetries*: int

  # A Portal node: the discovery protocol, the content database, the stream
  # manager and whichever sub-networks were requested at construction time.
  PortalNode* = ref object
    state*: PortalNodeState
    discovery: protocol.Protocol
    contentDB: ContentDB
    streamManager: StreamManager
    # `Opt.none` for each sub-network that was not requested in `new`.
    beaconNetwork*: Opt[BeaconNetwork]
    historyNetwork*: Opt[HistoryNetwork]
    stateNetwork*: Opt[StateNetwork]
    # Only present when `beaconNetwork` is present.
    beaconLightClient*: Opt[LightClient]
    # Future of the periodic status logger; assigned by `start` and reset to
    # nil by `stop`.
    statusLogLoop: Future[void]
# Application callbacks for the beacon light client. They are invoked when a
# new finalized header or a new optimistic header becomes available.

proc onFinalizedHeader(
    lightClient: LightClient, finalizedHeader: ForkedLightClientHeader
) =
  ## Logs the new finalized light client header at info level. Nothing is
  ## logged unless the header's data fork is greater than
  ## `LightClientDataFork.None`. `lightClient` is not used.
  withForkyHeader(finalizedHeader):
    when lcDataFork > LightClientDataFork.None:
      info "New LC finalized header", finalized_header = forkyHeader.shortLog()
proc onOptimisticHeader(
    lightClient: LightClient, optimisticHeader: ForkedLightClientHeader
) =
  ## Logs the new optimistic light client header at info level. Nothing is
  ## logged unless the header's data fork is greater than
  ## `LightClientDataFork.None`. `lightClient` is not used.
  withForkyHeader(optimisticHeader):
    when lcDataFork > LightClientDataFork.None:
      info "New LC optimistic header", optimistic_header = forkyHeader.shortLog()
proc getDbDirectory*(network: PortalNetwork): string =
  ## Returns the name of the database sub-directory for `network`: plain
  ## `"db"` for mainnet, and `"db_"` followed by the enum symbol name for any
  ## other network.
  case network
  of PortalNetwork.mainnet:
    "db"
  else:
    "db_" & network.symbolName()
proc new*(
    T: type PortalNode,
    network: PortalNetwork,
    config: PortalNodeConfig,
    discovery: protocol.Protocol,
    subnetworks: set[PortalSubnetwork],
    bootstrapRecords: openArray[Record] = [],
    rng = newRng(),
): T =
  ## Builds a `PortalNode` on top of an existing discovery protocol instance.
  ##
  ## - `network`: selects the database directory and is handed to each
  ##   sub-network.
  ## - `config`: paths, storage limits and protocol settings.
  ## - `discovery`: discovery v5 protocol shared by all sub-networks.
  ## - `subnetworks`: which of beacon / history / state to instantiate; the
  ##   others end up as `Opt.none`.
  ## - `bootstrapRecords`: forwarded to every instantiated sub-network.
  ## - `rng`: handed to the beacon light client.
  ##
  ## A light client is created, and wired to the beacon network's processor,
  ## only when the beacon sub-network is requested. Nothing is started here;
  ## the `state` field keeps its default value.

  # The content database directory name carries a hex prefix of the node id
  # because the stored content depends on the `NodeId` and the selected
  # `Radius`. `toOpenArray(0, 8)` has inclusive bounds, so nine bytes of the
  # id end up in the name.
  let
    localId = discovery.localNode.id
    idPrefixHex = localId.toBytesBE().toOpenArray(0, 8).toHex()
    contentDbPath =
      (config.dataDir / network.getDbDirectory() / "contentdb_") & idPrefixHex
    contentDB = ContentDB.new(
      contentDbPath,
      storageCapacity = config.storageCapacity,
      radiusConfig = config.portalConfig.radiusConfig,
      localId = localId,
    )

  let
    # TODO: Portal works only over mainnet data currently
    networkData = loadNetworkData("mainnet")
    streamManager = StreamManager.new(discovery)

  # Building an accumulator from header epoch files takes > 2m30s, which is
  # not really viable at start-up. So either read it from a user supplied file
  # holding the SSZ encoded accumulator, or fall back to `loadAccumulator()`.
  let accumulator =
    if config.accumulatorFile.isSome():
      let parsed = readAccumulator(config.accumulatorFile.value)
      parsed.expect("Need a file with a valid SSZ encoded accumulator")
    else:
      loadAccumulator()

  let beaconNetwork =
    if PortalSubnetwork.beacon notin subnetworks:
      Opt.none(BeaconNetwork)
    else:
      # NOTE(review): this path uses a fixed "db" directory rather than
      # `network.getDbDirectory()` as the content database does - confirm
      # that sharing the beacon database between networks is intended.
      let beaconDb = BeaconDb.new(networkData, config.dataDir / "db" / "beacon_db")
      Opt.some(
        BeaconNetwork.new(
          network,
          discovery,
          beaconDb,
          streamManager,
          networkData.forks,
          networkData.clock.getBeaconTimeFn(),
          networkData.metadata.cfg,
          config.trustedBlockRoot,
          bootstrapRecords = bootstrapRecords,
          portalConfig = config.portalConfig,
        )
      )

  let historyNetwork =
    if PortalSubnetwork.history notin subnetworks:
      Opt.none(HistoryNetwork)
    else:
      Opt.some(
        HistoryNetwork.new(
          network,
          discovery,
          contentDB,
          streamManager,
          accumulator,
          bootstrapRecords = bootstrapRecords,
          portalConfig = config.portalConfig,
          contentRequestRetries = config.contentRequestRetries,
        )
      )

  let stateNetwork =
    if PortalSubnetwork.state notin subnetworks:
      Opt.none(StateNetwork)
    else:
      Opt.some(
        StateNetwork.new(
          network,
          discovery,
          contentDB,
          streamManager,
          bootstrapRecords = bootstrapRecords,
          portalConfig = config.portalConfig,
          historyNetwork = historyNetwork,
          # Positional on purpose: the state root validation flag is the
          # negation of the "disable" option from the config.
          not config.disableStateRootValidation,
          contentRequestRetries = config.contentRequestRetries,
        )
      )

  let beaconLightClient =
    if beaconNetwork.isSome():
      let lc = LightClient.new(
        beaconNetwork.value, rng, networkData, LightClientFinalizationMode.Optimistic
      )
      lc.onFinalizedHeader = onFinalizedHeader
      lc.onOptimisticHeader = onOptimisticHeader

      # TODO:
      # Quite dirty. Use register validate callbacks instead. Or, revisit
      # the object relationships regarding the beacon light client.
      beaconNetwork.value.processor = lc.processor

      Opt.some(lc)
    else:
      Opt.none(LightClient)

  PortalNode(
    discovery: discovery,
    contentDB: contentDB,
    streamManager: streamManager,
    beaconNetwork: beaconNetwork,
    historyNetwork: historyNetwork,
    stateNetwork: stateNetwork,
    beaconLightClient: beaconLightClient,
  )
proc statusLogLoop(n: PortalNode) {.async: (raises: []).} =
  ## Logs the content database radius and size once every 60 seconds, until
  ## the future is cancelled.

  # One percent of the maximum radius; dividing the radius by this gives the
  # data radius as an integer percentage of full storage.
  let onePercent = UInt256.high() div u256(100)

  try:
    while true:
      # Note that the percentage drops quickly on the logbase2 scale, namely
      # `/ 2` per 1 logarithmic radius drop.
      # TODO: Get some float precision calculus?
      let
        currentRadius = n.contentDB.dataRadius
        percentOfMax = currentRadius div onePercent

      info "Portal node status",
        radiusPercentage = percentOfMax.toString(10) & "%",
        radius = currentRadius.toHex(),
        dbSize = $(n.contentDB.size() div 1000) & "kb"

      await sleepAsync(60.seconds)
  except CancelledError:
    trace "statusLogLoop canceled"
proc start*(n: PortalNode) =
  ## Starts discovery, then every sub-network that is present (beacon,
  ## history, state - in that order), then the beacon light client when
  ## present. Finally launches the periodic status logger and marks the node
  ## as `Running`.

  # Starts an optional component only when it holds a value.
  template startIfPresent(component: untyped) =
    if component.isSome():
      component.value.start()

  debug "Starting Portal node"

  n.discovery.start()

  startIfPresent(n.beaconNetwork)
  startIfPresent(n.historyNetwork)
  startIfPresent(n.stateNetwork)
  startIfPresent(n.beaconLightClient)

  n.statusLogLoop = statusLogLoop(n)
  n.state = PortalNodeState.Running
proc stop*(n: PortalNode) {.async: (raises: []).} =
  ## Shuts the node down: stops every sub-network and the light client that is
  ## present, cancels the status logger, then closes discovery and the content
  ## database.
  ##
  ## The node is marked as `Stopping` on entry so that `state` no longer
  ## reports `Running` when `stop` is called directly.
  debug "Stopping Portal node"

  n.state = PortalNodeState.Stopping

  # Collect all shutdown futures first so they can progress concurrently.
  var futures: seq[Future[void]]

  if n.beaconNetwork.isSome():
    futures.add(n.beaconNetwork.value.stop())
  if n.historyNetwork.isSome():
    futures.add(n.historyNetwork.value.stop())
  if n.stateNetwork.isSome():
    futures.add(n.stateNetwork.value.stop())
  if n.beaconLightClient.isSome():
    futures.add(n.beaconLightClient.value.stop())
  # The status logger only exists once `start` has run.
  if not n.statusLogLoop.isNil():
    futures.add(n.statusLogLoop.cancelAndWait())

  # `noCancel` keeps the shutdown going even if this future gets cancelled.
  await noCancel(allFutures(futures))

  # Discovery and the database are closed only after everything that uses
  # them has been stopped.
  await n.discovery.closeWait()
  n.contentDB.close()
  n.statusLogLoop = nil