# nimbus-eth1/fluffy/fluffy.nim

# 405 lines
# 14 KiB
# Nim

# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/[os, enumutils],
confutils,
confutils/std/net,
chronicles,
chronicles/topics_registry,
chronos,
metrics,
metrics/chronos_httpserver,
json_rpc/clients/httpclient,
json_rpc/rpcproxy,
results,
stew/[byteutils, io2],
eth/keys,
eth/net/nat,
eth/p2p/discoveryv5/protocol as discv5_protocol,
beacon_chain/beacon_clock,
beacon_chain/spec/forks,
beacon_chain/spec/datatypes/altair,
beacon_chain/gossip_processing/light_client_processor,
./conf,
./network_metadata,
./common/common_utils,
./rpc/
[rpc_web3_api, rpc_eth_api, rpc_discovery_api, rpc_portal_api, rpc_portal_debug_api],
./network/state/[state_network, state_content],
./network/history/[history_network, history_content],
./network/beacon/[beacon_init_loader, beacon_light_client],
./network/wire/[portal_stream, portal_protocol_config, portal_protocol],
./eth_data/history_data_ssz_e2s,
./database/content_db,
./version,
./logging
# Log `IoErrorCode` values through chronicles using their `$` string form.
chronicles.formatIt(IoErrorCode):
  $it
# Application callbacks used when a new finalized header or a new optimistic
# header is available.
proc onFinalizedHeader(
    lightClient: LightClient, finalizedHeader: ForkedLightClientHeader
) =
  ## Light client callback: logs the newly finalized header at info level.
  ##
  ## `lightClient` is not used in the body; it is part of the callback
  ## signature. Nothing is logged when the header's data fork is `None`.
  finalizedHeader.withForkyHeader:
    when LightClientDataFork.None < lcDataFork:
      info "New LC finalized header", finalized_header = forkyHeader.shortLog
proc onOptimisticHeader(
    lightClient: LightClient, optimisticHeader: ForkedLightClientHeader
) =
  ## Light client callback: logs the new optimistic header at info level.
  ##
  ## `lightClient` is not used in the body; it is part of the callback
  ## signature. Nothing is logged when the header's data fork is `None`.
  optimisticHeader.withForkyHeader:
    when LightClientDataFork.None < lcDataFork:
      info "New LC optimistic header", optimistic_header = forkyHeader.shortLog
proc getDbDirectory(network: PortalNetwork): string =
  ## Returns the name of the database directory for `network`: plain "db" for
  ## mainnet, and "db_" followed by the enum symbol name for any other network.
  case network
  of PortalNetwork.mainnet:
    "db"
  else:
    "db_" & symbolName(network)
proc run(config: PortalConf) {.raises: [CatchableError].} =
  ## Starts a Fluffy node from `config` and ends by calling `runForever()`.
  ##
  ## In order: sets up logging and the data directory, resolves the external
  ## address and the network key, collects bootstrap records, opens the discv5
  ## protocol, optionally force-prunes the content database, instantiates the
  ## enabled Portal subnetworks (history, state, beacon light client), writes
  ## the ENR file, optionally starts the metrics HTTP server, starts the
  ## networks and optionally the JSON-RPC server.
  ##
  ## Failures to create the data directory, write the ENR file or bring up the
  ## metrics server are logged and end the process through `quit` instead of
  ## raising.
  setupLogging(config.logLevel, config.logStdout)
  notice "Launching Fluffy", version = fullVersionStr, cmdParams = commandLineParams()
  # Make sure dataDir exists
  let pathExists = createPath(config.dataDir.string)
  if pathExists.isErr():
    fatal "Failed to create data directory",
      dataDir = config.dataDir, error = pathExists.error
    quit 1
  let
    rng = newRng()
    bindIp = config.listenAddress
    udpPort = Port(config.udpPort)
    # TODO: allow for no TCP port mapping!
    # The UDP port is passed for both port arguments of `setupAddress`; the
    # second element of the returned tuple is ignored.
    (extIp, _, extUdpPort) =
      try:
        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "fluffy")
      except CatchableError as exc:
        raise exc # TODO: Ideally we don't have the Exception here
      except Exception as exc:
        raiseAssert exc.msg
    # A key passed through the config is always flagged as new (`true`), so no
    # previous ENR is loaded for it below.
    (netkey, newNetKey) =
      if config.networkKey.isSome():
        (config.networkKey.get(), true)
      else:
        getPersistentNetKey(rng[], config.networkKeyFile)
    enrFilePath = config.dataDir / "fluffy_node.enr"
    # Only look for a previously stored ENR when the network key is not new.
    previousEnr =
      if not newNetKey:
        getPersistentEnr(enrFilePath)
      else:
        Opt.none(enr.Record)
  # Bootstrap records are gathered from three places: the bootstrap file, the
  # records given in the config and the built-in list of the selected network.
  var bootstrapRecords: seq[Record]
  loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
  bootstrapRecords.add(config.bootstrapNodes)
  # When the deprecated option is set it takes precedence over `config.network`.
  let portalNetwork =
    if config.portalNetworkDeprecated.isNone():
      config.network
    else:
      warn "DEPRECATED: The --portal-network flag will be removed in the future, " &
        "please use the drop in replacement --network flag instead"
      config.portalNetworkDeprecated.get()
  # A non-empty deprecated set takes precedence over `config.portalSubnetworks`.
  let portalSubnetworks =
    if config.networksDeprecated == {}:
      config.portalSubnetworks
    else:
      warn "DEPRECATED: The --networks flag will be removed in the future, " &
        "please use the drop in replacement --portal-subnetworks flag instead"
      config.networksDeprecated
  # Built-in bootstrap ENR URIs for which `fromURI` fails are skipped without
  # logging.
  case portalNetwork
  of PortalNetwork.none:
    discard # don't connect to any network bootstrap nodes
  of PortalNetwork.mainnet:
    for enrURI in mainnetBootstrapNodes:
      var record: Record
      if fromURI(record, enrURI):
        bootstrapRecords.add(record)
  of PortalNetwork.angelfood:
    for enrURI in angelfoodBootstrapNodes:
      var record: Record
      if fromURI(record, enrURI):
        bootstrapRecords.add(record)
  let
    discoveryConfig =
      DiscoveryConfig.init(config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop)
    d = newProtocol(
      netkey,
      extIp,
      Opt.none(Port),
      extUdpPort,
      # Note: The addition of default clientInfo to the ENR is a temporary
      # measure to easily identify & debug the clients used in the testnet.
      # Might make this into a, default off, cli option.
      localEnrFields = {"c": enrClientInfoShort},
      bootstrapRecords = bootstrapRecords,
      previousRecord =
        # TODO: discv5/enr code still uses Option, to be changed.
        # NOTE(review): `previousEnr` already appears to be an
        # `Opt[enr.Record]` (see its `else` branch above), so this looks like
        # an identity conversion - confirm and simplify.
        if previousEnr.isSome():
          Opt.some(previousEnr.get())
        else:
          Opt.none(enr.Record)
      ,
      bindIp = bindIp,
      bindPort = udpPort,
      enrAutoUpdate = config.enrAutoUpdate,
      config = discoveryConfig,
      rng = rng,
    )
  d.open()
  # Force pruning
  # This uses its own ContentDB instance (created with `manualCheckpoint = true`)
  # which is closed again before the database is created a second time below.
  if config.forcePrune:
    let db = ContentDB.new(
      config.dataDir / portalNetwork.getDbDirectory() / "contentdb_" &
        d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(),
      storageCapacity = config.storageCapacityMB * 1_000_000,
      manualCheckpoint = true,
    )
    # A static radius is taken from the config; otherwise a new radius is
    # estimated from the largest distance currently stored in the database.
    let radius =
      if config.radiusConfig.kind == Static:
        UInt256.fromLogRadius(config.radiusConfig.logRadius)
      else:
        let oldRadiusApproximation = db.getLargestDistance(d.localNode.id)
        db.estimateNewRadius(oldRadiusApproximation)
    # Note: In the case of dynamical radius this is all an approximation that
    # heavily relies on uniformly distributed content and thus will always
    # have an error margin, either down or up of the requested capacity.
    # TODO I: Perhaps we want to add an offset to counter the latter.
    # TODO II: Perhaps for dynamical radius, we want to also apply the vacuum
    # without the forcePrune flag and purely by checking the amount of free
    # space versus the pruning fraction. The problem with this is that the
    # vacuum will temporarily double the space usage (WAL + DB) and thus to do
    # this automatically without user requesting it could be dangerous.
    # TODO III: Adding Radius metadata to the db could be yet another way to
    # decide whether or not to force prune, instead of this flag.
    db.forcePrune(d.localNode.id, radius)
    db.close()
  # Store the database at contentdb prefixed with the first 8 chars of node id.
  # This is done because the content in the db is dependent on the `NodeId` and
  # the selected `Radius`.
  # NOTE(review): `toOpenArray(0, 8)` has inclusive bounds, so this looks like
  # 9 bytes (18 hex characters) rather than 8 chars. Changing it would change
  # the on-disk path, so confirm before touching it.
  let
    db = ContentDB.new(
      config.dataDir / portalNetwork.getDbDirectory() / "contentdb_" &
        d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(),
      storageCapacity = config.storageCapacityMB * 1_000_000,
    )
    portalConfig = PortalProtocolConfig.init(
      config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig,
      config.disablePoke,
    )
    streamManager = StreamManager.new(d)
    accumulator =
      # Building an accumulator from header epoch files takes > 2m30s and is
      # thus not really a viable option at start-up.
      # Options are:
      # - Start with baked-in accumulator
      # - Start with file containing SSZ encoded accumulator
      if config.accumulatorFile.isSome():
        readAccumulator(string config.accumulatorFile.get()).expect(
          "Need a file with a valid SSZ encoded accumulator"
        )
      else:
        # Get it from binary file containing SSZ encoded accumulator
        loadAccumulator()
    # Each subnetwork is only instantiated when it is part of
    # `portalSubnetworks`; otherwise the corresponding `Opt` stays empty.
    historyNetwork =
      if PortalSubnetwork.history in portalSubnetworks:
        Opt.some(
          HistoryNetwork.new(
            portalNetwork,
            d,
            db,
            streamManager,
            accumulator,
            bootstrapRecords = bootstrapRecords,
            portalConfig = portalConfig,
          )
        )
      else:
        Opt.none(HistoryNetwork)
    # The state network is handed the (possibly empty) `historyNetwork`.
    stateNetwork =
      if PortalSubnetwork.state in portalSubnetworks:
        Opt.some(
          StateNetwork.new(
            portalNetwork,
            d,
            db,
            streamManager,
            bootstrapRecords = bootstrapRecords,
            portalConfig = portalConfig,
            historyNetwork = historyNetwork,
            not config.disableStateRootValidation,
          )
        )
      else:
        Opt.none(StateNetwork)
    beaconLightClient =
      # TODO: Currently disabled by default as it is not sufficiently polished.
      # Eventually this should be always-on functionality.
      if PortalSubnetwork.beacon in portalSubnetworks and
          config.trustedBlockRoot.isSome():
        let
          # Portal works only over mainnet data currently
          networkData = loadNetworkData("mainnet")
          # NOTE(review): this path uses a fixed "db" directory rather than
          # `portalNetwork.getDbDirectory()` - confirm this is intended.
          beaconDb = BeaconDb.new(networkData, config.dataDir / "db" / "beacon_db")
          beaconNetwork = BeaconNetwork.new(
            portalNetwork,
            d,
            beaconDb,
            streamManager,
            networkData.forks,
            bootstrapRecords = bootstrapRecords,
            portalConfig = portalConfig,
          )
        let beaconLightClient = LightClient.new(
          beaconNetwork, rng, networkData, LightClientFinalizationMode.Optimistic
        )
        beaconLightClient.onFinalizedHeader = onFinalizedHeader
        beaconLightClient.onOptimisticHeader = onOptimisticHeader
        beaconLightClient.trustedBlockRoot = config.trustedBlockRoot
        # TODO:
        # Quite dirty. Use register validate callbacks instead. Or, revisit
        # the object relationships regarding the beacon light client.
        beaconNetwork.processor = beaconLightClient.processor
        Opt.some(beaconLightClient)
      else:
        Opt.none(LightClient)
  # TODO: If no new network key is generated then we should first check if an
  # enr file exists, and in the case it does read out the seqNum from it and
  # reuse that.
  # This is the same path as `enrFilePath` above.
  let enrFile = config.dataDir / "fluffy_node.enr"
  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
    fatal "Failed to write the enr file", file = enrFile
    quit 1
  ## Start metrics HTTP server
  if config.metricsEnabled:
    let
      address = config.metricsAddress
      port = config.metricsPort
      url = "http://" & $address & ":" & $port & "/metrics"
      server = MetricsHttpServerRef.new($address, port).valueOr:
        error "Could not instantiate metrics HTTP server", url, error
        quit QuitFailure
    info "Starting metrics HTTP server", url
    try:
      waitFor server.start()
    except MetricsError as exc:
      fatal "Could not start metrics HTTP server",
        url, error_msg = exc.msg, error_name = exc.name
      quit QuitFailure
  ## Starting the different networks.
  d.start()
  if stateNetwork.isSome():
    stateNetwork.get().start()
  if historyNetwork.isSome():
    historyNetwork.get().start()
  if beaconLightClient.isSome():
    let lc = beaconLightClient.get()
    lc.network.start()
    lc.start()
    # Currently a no-op placeholder for a once-per-second task.
    proc onSecond(time: Moment) =
      discard
      # TODO:
      # Figure out what to do with this one.
      # let wallSlot = lc.getBeaconTime().slotOrZero()
      # lc.updateGossipStatus(wallSlot + 1)
    # Calls `onSecond` after every one second sleep and traces how long the
    # sleep and the call took.
    proc runOnSecondLoop() {.async.} =
      let sleepTime = chronos.seconds(1)
      while true:
        let start = chronos.now(chronos.Moment)
        await chronos.sleepAsync(sleepTime)
        let afterSleep = chronos.now(chronos.Moment)
        # Shadows the outer `sleepTime` with the measured sleep duration, which
        # is the value reported by the trace below.
        let sleepTime = afterSleep - start
        onSecond(start)
        let finished = chronos.now(chronos.Moment)
        let processingTime = finished - afterSleep
        trace "onSecond task completed", sleepTime, processingTime
    # Run once immediately, then keep running in the background loop.
    onSecond(Moment.now())
    asyncSpawn runOnSecondLoop()
  ## Starting the JSON-RPC APIs
  if config.rpcEnabled:
    let ta = initTAddress(config.rpcAddress, config.rpcPort)
    let rpcHttpServer = RpcHttpServer.new()
    # Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
    # that reach that limit (in hex, for gossip method).
    rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)
    var rpcHttpServerWithProxy = RpcProxy.new(rpcHttpServer, config.proxyUri)
    # Discovery and web3 handlers are always installed; the Portal and eth
    # handlers are only installed for the subnetworks that were instantiated.
    rpcHttpServerWithProxy.installDiscoveryApiHandlers(d)
    rpcHttpServerWithProxy.installWeb3ApiHandlers()
    if stateNetwork.isSome():
      rpcHttpServerWithProxy.installPortalApiHandlers(
        stateNetwork.get().portalProtocol, "state"
      )
    if historyNetwork.isSome():
      rpcHttpServerWithProxy.installEthApiHandlers(
        historyNetwork.get(), beaconLightClient, stateNetwork
      )
      rpcHttpServerWithProxy.installPortalApiHandlers(
        historyNetwork.get().portalProtocol, "history"
      )
      rpcHttpServerWithProxy.installPortalDebugApiHandlers(
        historyNetwork.get().portalProtocol, "history"
      )
    if beaconLightClient.isSome():
      rpcHttpServerWithProxy.installPortalApiHandlers(
        beaconLightClient.get().network.portalProtocol, "beacon"
      )
    # TODO: Test proxy with remote node over HTTPS
    waitFor rpcHttpServerWithProxy.start()
  runForever()
when isMainModule:
  # The module-wide `raises: []` push is popped around the config loading and
  # pushed again right after it (presumably because `load` may raise).
  {.pop.}
  let
    versionBanner = clientName & " " & fullVersionStr & "\p\p" & nimBanner
    config = PortalConf.load(
      version = versionBanner, copyrightBanner = copyrightBanner
    )
  {.push raises: [].}

  # Dispatch on the parsed command; `noCommand` starts the node.
  case config.cmd
  of PortalCmd.noCommand:
    run(config)