# Fluffy # Copyright (c) 2021-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import std/[os, enumutils], confutils, confutils/std/net, chronicles, chronicles/topics_registry, chronos, metrics, metrics/chronos_httpserver, json_rpc/clients/httpclient, json_rpc/rpcproxy, results, stew/[byteutils, io2], eth/keys, eth/net/nat, eth/p2p/discoveryv5/protocol as discv5_protocol, beacon_chain/beacon_clock, beacon_chain/spec/forks, beacon_chain/spec/datatypes/altair, beacon_chain/gossip_processing/light_client_processor, ./conf, ./network_metadata, ./common/common_utils, ./rpc/ [rpc_web3_api, rpc_eth_api, rpc_discovery_api, rpc_portal_api, rpc_portal_debug_api], ./network/state/[state_network, state_content], ./network/history/[history_network, history_content], ./network/beacon/[beacon_init_loader, beacon_light_client], ./network/wire/[portal_stream, portal_protocol_config, portal_protocol], ./eth_data/history_data_ssz_e2s, ./database/content_db, ./version, ./logging chronicles.formatIt(IoErrorCode): $it # Application callbacks used when new finalized header or optimistic header is # available. 
proc onFinalizedHeader(
    lightClient: LightClient, finalizedHeader: ForkedLightClientHeader
) =
  ## Light client callback: logs the new finalized header at info level.
  ## Nothing is logged when the header's data fork is `None`. The
  ## `lightClient` argument is not used.
  withForkyHeader(finalizedHeader):
    when lcDataFork > LightClientDataFork.None:
      info "New LC finalized header", finalized_header = shortLog(forkyHeader)

proc onOptimisticHeader(
    lightClient: LightClient, optimisticHeader: ForkedLightClientHeader
) =
  ## Light client callback: logs the new optimistic header at info level.
  ## Nothing is logged when the header's data fork is `None`. The
  ## `lightClient` argument is not used.
  withForkyHeader(optimisticHeader):
    when lcDataFork > LightClientDataFork.None:
      info "New LC optimistic header", optimistic_header = shortLog(forkyHeader)

proc getDbDirectory(network: PortalNetwork): string =
  ## Name of the database directory for `network`: plain "db" for mainnet,
  ## "db_" followed by the enum symbol name for every other network.
  result = "db"
  if network != PortalNetwork.mainnet:
    result.add('_')
    result.add(network.symbolName())

proc run(config: PortalConf) {.raises: [CatchableError].} =
  ## Application entry point: sets up logging, the data directory, discovery,
  ## the content database and the configured Portal subnetworks, optionally
  ## starts the metrics and JSON-RPC servers, and then runs the event loop
  ## forever. Quits the process on unrecoverable set-up failures.
  setupLogging(config.logLevel, config.logStdout)

  notice "Launching Fluffy", version = fullVersionStr, cmdParams = commandLineParams()

  # The data directory must exist before anything is stored in it.
  let dataDirRes = createPath(config.dataDir.string)
  if dataDirRes.isErr():
    fatal "Failed to create data directory",
      dataDir = config.dataDir, error = dataDirRes.error
    quit QuitFailure

  let
    rng = newRng()
    bindIp = config.listenAddress
    udpPort = Port(config.udpPort)
    # TODO: allow for no TCP port mapping!
    (extIp, _, extUdpPort) =
      try:
        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "fluffy")
      except CatchableError as exc:
        raise exc
      # TODO: Ideally we don't have the Exception here
      except Exception as exc:
        raiseAssert exc.msg
    # A key passed on the command line is always treated as a new key.
    (netkey, newNetKey) =
      if config.networkKey.isNone():
        getPersistentNetKey(rng[], config.networkKeyFile)
      else:
        (config.networkKey.get(), true)

    enrFilePath = config.dataDir / "fluffy_node.enr"
    # The stored ENR is only looked up when the network key is not new.
    previousEnr =
      if newNetKey:
        Opt.none(enr.Record)
      else:
        getPersistentEnr(enrFilePath)

  var bootstrapRecords: seq[Record]
  loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
  bootstrapRecords.add(config.bootstrapNodes)

  # The deprecated flags, when set, take precedence over their replacements.
  let portalNetwork =
    if config.portalNetworkDeprecated.isSome():
      warn "DEPRECATED: The --portal-network flag will be removed in the future, " &
        "please use the drop in replacement --network flag instead"
      config.portalNetworkDeprecated.get()
    else:
      config.network

  let portalSubnetworks =
    if config.networksDeprecated != {}:
      warn "DEPRECATED: The --networks flag will be removed in the future, " &
        "please use the drop in replacement --portal-subnetworks flag instead"
      config.networksDeprecated
    else:
      config.portalSubnetworks

  # Add the bootstrap nodes of the selected network. Entries that fail to
  # parse as an ENR URI are skipped.
  case portalNetwork
  of PortalNetwork.none:
    discard # don't connect to any network bootstrap nodes
  of PortalNetwork.mainnet:
    for enrURI in mainnetBootstrapNodes:
      let record = enr.Record.fromURI(enrURI).valueOr:
        continue
      bootstrapRecords.add(record)
  of PortalNetwork.angelfood:
    for enrURI in angelfoodBootstrapNodes:
      let record = enr.Record.fromURI(enrURI).valueOr:
        continue
      bootstrapRecords.add(record)

  let
    discoveryConfig =
      DiscoveryConfig.init(config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop)
    d = newProtocol(
      netkey,
      extIp,
      Opt.none(Port),
      extUdpPort,
      # Note: The addition of default clientInfo to the ENR is a temporary
      # measure to easily identify & debug the clients used in the testnet.
      # Might make this into a, default off, cli option.
      localEnrFields = {"c": enrClientInfoShort},
      bootstrapRecords = bootstrapRecords,
      # `previousEnr` is already an `Opt[enr.Record]`; pass it through as-is.
      previousRecord = previousEnr,
      bindIp = bindIp,
      bindPort = udpPort,
      enrAutoUpdate = config.enrAutoUpdate,
      config = discoveryConfig,
      rng = rng,
    )

  d.open()

  # The content database lives in a directory named `contentdb_` followed by
  # the hex encoding of bytes 0 through 8 of the big-endian node id. The node
  # id is part of the name because the content in the db is dependent on the
  # `NodeId` and the selected `Radius`.
  let contentDbPath =
    config.dataDir / portalNetwork.getDbDirectory() / "contentdb_" &
    d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex()

  # Force pruning
  if config.forcePrune:
    let pruneDb = ContentDB.new(
      contentDbPath,
      storageCapacity = config.storageCapacityMB * 1_000_000,
      manualCheckpoint = true,
    )

    let radius =
      if config.radiusConfig.kind == Static:
        UInt256.fromLogRadius(config.radiusConfig.logRadius)
      else:
        let oldRadiusApproximation = pruneDb.getLargestDistance(d.localNode.id)
        pruneDb.estimateNewRadius(oldRadiusApproximation)

    # Note: In the case of dynamical radius this is all an approximation that
    # heavily relies on uniformly distributed content and thus will always
    # have an error margin, either down or up of the requested capacity.
    # TODO I: Perhaps we want to add an offset to counter the latter.
    # TODO II: Perhaps for dynamical radius, we want to also apply the vacuum
    # without the forcePrune flag and purely by checking the amount of free
    # space versus the pruning fraction. The problem with this is that the
    # vacuum will temporarily double the space usage (WAL + DB) and thus to do
    # this automatically without user requesting it could be dangerous.
    # TODO III: Adding Radius metadata to the db could be yet another way to
    # decide whether or not to force prune, instead of this flag.
    pruneDb.forcePrune(d.localNode.id, radius)
    pruneDb.close()

  let
    db = ContentDB.new(
      contentDbPath, storageCapacity = config.storageCapacityMB * 1_000_000
    )

    portalConfig = PortalProtocolConfig.init(
      config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop,
      config.radiusConfig, config.disablePoke,
    )
    streamManager = StreamManager.new(d)

    accumulator =
      # Building an accumulator from header epoch files takes > 2m30s and is
      # thus not really a viable option at start-up.
      # Options are:
      # - Start with baked-in accumulator
      # - Start with file containing SSZ encoded accumulator
      if config.accumulatorFile.isSome():
        readAccumulator(string config.accumulatorFile.get()).expect(
          "Need a file with a valid SSZ encoded accumulator"
        )
      else:
        # Get it from binary file containing SSZ encoded accumulator
        loadAccumulator()

    historyNetwork =
      if PortalSubnetwork.history in portalSubnetworks:
        Opt.some(
          HistoryNetwork.new(
            portalNetwork,
            d,
            db,
            streamManager,
            accumulator,
            bootstrapRecords = bootstrapRecords,
            portalConfig = portalConfig,
          )
        )
      else:
        Opt.none(HistoryNetwork)

    stateNetwork =
      if PortalSubnetwork.state in portalSubnetworks:
        Opt.some(
          StateNetwork.new(
            portalNetwork,
            d,
            db,
            streamManager,
            bootstrapRecords = bootstrapRecords,
            portalConfig = portalConfig,
            historyNetwork = historyNetwork,
            not config.disableStateRootValidation,
          )
        )
      else:
        Opt.none(StateNetwork)

    beaconLightClient =
      # TODO: Currently disabled by default as it is not sufficiently polished.
      # Eventually this should be always-on functionality.
      if PortalSubnetwork.beacon in portalSubnetworks and
          config.trustedBlockRoot.isSome():
        let
          # Portal works only over mainnet data currently
          networkData = loadNetworkData("mainnet")
          beaconDb = BeaconDb.new(networkData, config.dataDir / "db" / "beacon_db")
          beaconNetwork = BeaconNetwork.new(
            portalNetwork,
            d,
            beaconDb,
            streamManager,
            networkData.forks,
            bootstrapRecords = bootstrapRecords,
            portalConfig = portalConfig,
          )

        let lc = LightClient.new(
          beaconNetwork, rng, networkData, LightClientFinalizationMode.Optimistic
        )

        lc.onFinalizedHeader = onFinalizedHeader
        lc.onOptimisticHeader = onOptimisticHeader
        lc.trustedBlockRoot = config.trustedBlockRoot

        # TODO:
        # Quite dirty. Use register validate callbacks instead. Or, revisit
        # the object relationships regarding the beacon light client.
        beaconNetwork.processor = lc.processor

        Opt.some(lc)
      else:
        Opt.none(LightClient)

  # TODO: If no new network key is generated then we should first check if an
  # enr file exists, and in the case it does read out the seqNum from it and
  # reuse that.
  # The record is written to the same path the previous ENR is read from.
  if io2.writeFile(enrFilePath, d.localNode.record.toURI()).isErr:
    fatal "Failed to write the enr file", file = enrFilePath
    quit QuitFailure

  ## Start metrics HTTP server
  if config.metricsEnabled:
    let
      address = config.metricsAddress
      port = config.metricsPort
      url = "http://" & $address & ":" & $port & "/metrics"

      server = MetricsHttpServerRef.new($address, port).valueOr:
        error "Could not instantiate metrics HTTP server", url, error
        quit QuitFailure

    info "Starting metrics HTTP server", url
    try:
      waitFor server.start()
    except MetricsError as exc:
      fatal "Could not start metrics HTTP server",
        url, error_msg = exc.msg, error_name = exc.name
      quit QuitFailure

  ## Starting the different networks.
  d.start()
  if stateNetwork.isSome():
    stateNetwork.get().start()
  if historyNetwork.isSome():
    historyNetwork.get().start()
  if beaconLightClient.isSome():
    let startedLc = beaconLightClient.get()
    startedLc.network.start()
    startedLc.start()

    proc onSecond(time: Moment) =
      discard
      # TODO:
      # Figure out what to do with this one.
      # let wallSlot = lc.getBeaconTime().slotOrZero()
      # lc.updateGossipStatus(wallSlot + 1)

    proc runOnSecondLoop() {.async.} =
      ## Calls `onSecond` after every one-second sleep and traces how long
      ## the sleep and the callback took.
      let tick = chronos.seconds(1)
      while true:
        let start = chronos.now(chronos.Moment)
        await chronos.sleepAsync(tick)
        let afterSleep = chronos.now(chronos.Moment)
        let slept = afterSleep - start
        onSecond(start)
        let finished = chronos.now(chronos.Moment)
        let processingTime = finished - afterSleep
        # The measured sleep is logged under the `sleepTime` key.
        trace "onSecond task completed", sleepTime = slept, processingTime

    onSecond(Moment.now())

    asyncSpawn runOnSecondLoop()

  ## Starting the JSON-RPC APIs
  if config.rpcEnabled:
    let listenAddress = initTAddress(config.rpcAddress, config.rpcPort)

    let httpServer = RpcHttpServer.new()
    # Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
    # that reach that limit (in hex, for gossip method).
    httpServer.addHttpServer(listenAddress, maxRequestBodySize = 4 * 1_048_576)
    var rpcServer = RpcProxy.new(httpServer, config.proxyUri)

    rpcServer.installDiscoveryApiHandlers(d)
    rpcServer.installWeb3ApiHandlers()
    if stateNetwork.isSome():
      rpcServer.installPortalApiHandlers(stateNetwork.get().portalProtocol, "state")
    if historyNetwork.isSome():
      # The eth API is only installed when the history network is enabled.
      let history = historyNetwork.get()
      rpcServer.installEthApiHandlers(history, beaconLightClient, stateNetwork)
      rpcServer.installPortalApiHandlers(history.portalProtocol, "history")
      rpcServer.installPortalDebugApiHandlers(history.portalProtocol, "history")
    if beaconLightClient.isSome():
      rpcServer.installPortalApiHandlers(
        beaconLightClient.get().network.portalProtocol, "beacon"
      )
    # TODO: Test proxy with remote node over HTTPS
    waitFor rpcServer.start()

  runForever()

when isMainModule:
  # Config loading runs outside of the module-wide `raises: []` push.
  {.pop.}
  let config = PortalConf.load(
    version = clientName & " " & fullVersionStr & "\p\p" & nimBanner,
    copyrightBanner = copyrightBanner,
  )
  {.push raises: [].}

  case config.cmd
  of PortalCmd.noCommand:
    run(config)