# Fluffy # Copyright (c) 2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import results, chronos, eth/p2p/discoveryv5/protocol, beacon_chain/spec/forks, stew/byteutils, ./network_metadata, ./eth_data/history_data_ssz_e2s, ./database/content_db, ./network/wire/[portal_stream, portal_protocol_config], ./network/beacon/[beacon_init_loader, beacon_light_client], ./network/history/[history_network, history_content], ./network/state/[state_network, state_content] export beacon_light_client, history_network, state_network, portal_protocol_config, forks type PortalNodeState* = enum Starting Running Stopping PortalNodeConfig* = object accumulatorFile*: Opt[string] disableStateRootValidation*: bool trustedBlockRoot*: Opt[Digest] portalConfig*: PortalProtocolConfig dataDir*: string storageCapacity*: uint64 contentRequestRetries*: int PortalNode* = ref object state*: PortalNodeState discovery: protocol.Protocol contentDB: ContentDB streamManager: StreamManager beaconNetwork*: Opt[BeaconNetwork] historyNetwork*: Opt[HistoryNetwork] stateNetwork*: Opt[StateNetwork] beaconLightClient*: Opt[LightClient] statusLogLoop: Future[void] # Beacon light client application callbacks triggered when new finalized header # or optimistic header is available. proc onFinalizedHeader( lightClient: LightClient, finalizedHeader: ForkedLightClientHeader ) = withForkyHeader(finalizedHeader): when lcDataFork > LightClientDataFork.None: info "New LC finalized header", finalized_header = shortLog(forkyHeader) proc onOptimisticHeader( lightClient: LightClient, optimisticHeader: ForkedLightClientHeader ) = withForkyHeader(optimisticHeader): when lcDataFork > LightClientDataFork.None: info "New LC optimistic header", optimistic_header = shortLog(forkyHeader) proc getDbDirectory*(network: PortalNetwork): string = if network == PortalNetwork.mainnet: "db" else: "db_" & network.symbolName() proc new*( T: type PortalNode, network: PortalNetwork, config: PortalNodeConfig, discovery: protocol.Protocol, subnetworks: set[PortalSubnetwork], bootstrapRecords: openArray[Record] = [], rng = newRng(), ): T = let # Store the database at contentdb prefixed with the first 8 chars of node id. # This is done because the content in the db is dependant on the `NodeId` and # the selected `Radius`. contentDB = ContentDB.new( config.dataDir / network.getDbDirectory() / "contentdb_" & discovery.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(), storageCapacity = config.storageCapacity, radiusConfig = config.portalConfig.radiusConfig, localId = discovery.localNode.id, ) # TODO: Portal works only over mainnet data currently networkData = loadNetworkData("mainnet") streamManager = StreamManager.new(discovery) accumulator = # Building an accumulator from header epoch files takes > 2m30s and is # thus not really a viable option at start-up. # Options are: # - Start with baked-in accumulator # - Start with file containing SSZ encoded accumulator if config.accumulatorFile.isSome: readAccumulator(config.accumulatorFile.value).expect( "Need a file with a valid SSZ encoded accumulator" ) else: # Get it from binary file containing SSZ encoded accumulator loadAccumulator() beaconNetwork = if PortalSubnetwork.beacon in subnetworks: let beaconDb = BeaconDb.new(networkData, config.dataDir / "db" / "beacon_db") beaconNetwork = BeaconNetwork.new( network, discovery, beaconDb, streamManager, networkData.forks, networkData.clock.getBeaconTimeFn(), networkData.metadata.cfg, config.trustedBlockRoot, bootstrapRecords = bootstrapRecords, portalConfig = config.portalConfig, ) Opt.some(beaconNetwork) else: Opt.none(BeaconNetwork) historyNetwork = if PortalSubnetwork.history in subnetworks: Opt.some( HistoryNetwork.new( network, discovery, contentDB, streamManager, accumulator, bootstrapRecords = bootstrapRecords, portalConfig = config.portalConfig, contentRequestRetries = config.contentRequestRetries, ) ) else: Opt.none(HistoryNetwork) stateNetwork = if PortalSubnetwork.state in subnetworks: Opt.some( StateNetwork.new( network, discovery, contentDB, streamManager, bootstrapRecords = bootstrapRecords, portalConfig = config.portalConfig, historyNetwork = historyNetwork, not config.disableStateRootValidation, contentRequestRetries = config.contentRequestRetries, ) ) else: Opt.none(StateNetwork) beaconLightClient = if beaconNetwork.isSome(): let beaconLightClient = LightClient.new( beaconNetwork.value, rng, networkData, LightClientFinalizationMode.Optimistic ) beaconLightClient.onFinalizedHeader = onFinalizedHeader beaconLightClient.onOptimisticHeader = onOptimisticHeader # TODO: # Quite dirty. Use register validate callbacks instead. Or, revisit # the object relationships regarding the beacon light client. beaconNetwork.value.processor = beaconLightClient.processor Opt.some(beaconLightClient) else: Opt.none(LightClient) PortalNode( discovery: discovery, contentDB: contentDB, streamManager: streamManager, beaconNetwork: beaconNetwork, historyNetwork: historyNetwork, stateNetwork: stateNetwork, beaconLightClient: beaconLightClient, ) proc statusLogLoop(n: PortalNode) {.async: (raises: []).} = try: while true: # This is the data radius percentage compared to full storage. This will # drop a lot when using the logbase2 scale, namely `/ 2` per 1 logaritmic # radius drop. # TODO: Get some float precision calculus? let radiusPercentage = n.contentDB.dataRadius div (UInt256.high() div u256(100)) info "Portal node status", radiusPercentage = radiusPercentage.toString(10) & "%", radius = n.contentDB.dataRadius.toHex(), dbSize = $(n.contentDB.size() div 1000) & "kb" await sleepAsync(60.seconds) except CancelledError: trace "statusLogLoop canceled" proc start*(n: PortalNode) = debug "Starting Portal node" n.discovery.start() if n.beaconNetwork.isSome(): n.beaconNetwork.value.start() if n.historyNetwork.isSome(): n.historyNetwork.value.start() if n.stateNetwork.isSome(): n.stateNetwork.value.start() if n.beaconLightClient.isSome(): n.beaconLightClient.value.start() n.statusLogLoop = statusLogLoop(n) n.state = PortalNodeState.Running proc stop*(n: PortalNode) {.async: (raises: []).} = debug "Stopping Portal node" var futures: seq[Future[void]] if n.beaconNetwork.isSome(): futures.add(n.beaconNetwork.value.stop()) if n.historyNetwork.isSome(): futures.add(n.historyNetwork.value.stop()) if n.stateNetwork.isSome(): futures.add(n.stateNetwork.value.stop()) if n.beaconLightClient.isSome(): futures.add(n.beaconLightClient.value.stop()) if not n.statusLogLoop.isNil(): futures.add(n.statusLogLoop.cancelAndWait()) await noCancel(allFutures(futures)) await n.discovery.closeWait() n.contentDB.close() n.statusLogLoop = nil