Move Portal wire and networks setup to new portal_node module (#2464)

This commit is contained in:
Kim De Mey 2024-07-09 19:22:25 +02:00 committed by GitHub
parent 25b5f01357
commit 54e3fd1a94
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 238 additions and 177 deletions

View File

@ -106,9 +106,6 @@ proc getPersistentEnr*(enrFilePath: string): Opt[enr.Record] =
let enrUri = readResult.get() let enrUri = readResult.get()
var record: enr.Record
# TODO: This old API of var passing is very error prone and should be
# changed in nim-eth.
let res = enr.Record.fromURI(enrUri) let res = enr.Record.fromURI(enrUri)
if res.isErr(): if res.isErr():
warn "Could not decode ENR from ENR file" warn "Could not decode ENR from ENR file"

View File

@ -23,54 +23,32 @@ import
eth/keys, eth/keys,
eth/net/nat, eth/net/nat,
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/protocol as discv5_protocol,
beacon_chain/beacon_clock,
beacon_chain/spec/forks,
beacon_chain/spec/datatypes/altair,
beacon_chain/gossip_processing/light_client_processor,
./conf, ./conf,
./network_metadata, ./network_metadata,
./common/common_utils, ./common/common_utils,
./rpc/ ./rpc/
[rpc_web3_api, rpc_eth_api, rpc_discovery_api, rpc_portal_api, rpc_portal_debug_api], [rpc_web3_api, rpc_eth_api, rpc_discovery_api, rpc_portal_api, rpc_portal_debug_api],
./network/state/[state_network, state_content],
./network/history/[history_network, history_content],
./network/beacon/[beacon_init_loader, beacon_light_client],
./network/wire/[portal_stream, portal_protocol_config, portal_protocol],
./eth_data/history_data_ssz_e2s,
./database/content_db, ./database/content_db,
./portal_node,
./version, ./version,
./logging ./logging
chronicles.formatIt(IoErrorCode): chronicles.formatIt(IoErrorCode):
$it $it
# Application callbacks used when new finalized header or optimistic header is func optionToOpt[T](o: Option[T]): Opt[T] =
# available. if o.isSome():
proc onFinalizedHeader( Opt.some(o.unsafeGet())
lightClient: LightClient, finalizedHeader: ForkedLightClientHeader
) =
withForkyHeader(finalizedHeader):
when lcDataFork > LightClientDataFork.None:
info "New LC finalized header", finalized_header = shortLog(forkyHeader)
proc onOptimisticHeader(
lightClient: LightClient, optimisticHeader: ForkedLightClientHeader
) =
withForkyHeader(optimisticHeader):
when lcDataFork > LightClientDataFork.None:
info "New LC optimistic header", optimistic_header = shortLog(forkyHeader)
proc getDbDirectory(network: PortalNetwork): string =
if network == PortalNetwork.mainnet:
"db"
else: else:
"db_" & network.symbolName() Opt.none(T)
proc run(config: PortalConf) {.raises: [CatchableError].} = proc run(config: PortalConf) {.raises: [CatchableError].} =
setupLogging(config.logLevel, config.logStdout) setupLogging(config.logLevel, config.logStdout)
notice "Launching Fluffy", version = fullVersionStr, cmdParams = commandLineParams() notice "Launching Fluffy", version = fullVersionStr, cmdParams = commandLineParams()
let rng = newRng()
# Make sure dataDir exists # Make sure dataDir exists
let pathExists = createPath(config.dataDir.string) let pathExists = createPath(config.dataDir.string)
if pathExists.isErr(): if pathExists.isErr():
@ -78,8 +56,8 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
dataDir = config.dataDir, error = pathExists.error dataDir = config.dataDir, error = pathExists.error
quit 1 quit 1
## Network configuration
let let
rng = newRng()
bindIp = config.listenAddress bindIp = config.listenAddress
udpPort = Port(config.udpPort) udpPort = Port(config.udpPort)
# TODO: allow for no TCP port mapping! # TODO: allow for no TCP port mapping!
@ -137,6 +115,7 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
if res.isOk(): if res.isOk():
bootstrapRecords.add(res.value) bootstrapRecords.add(res.value)
## Discovery v5 protocol setup
let let
discoveryConfig = discoveryConfig =
DiscoveryConfig.init(config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop) DiscoveryConfig.init(config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop)
@ -150,13 +129,7 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
# Might make this into a, default off, cli option. # Might make this into a, default off, cli option.
localEnrFields = {"c": enrClientInfoShort}, localEnrFields = {"c": enrClientInfoShort},
bootstrapRecords = bootstrapRecords, bootstrapRecords = bootstrapRecords,
previousRecord = previousRecord = previousEnr,
# TODO: discv5/enr code still uses Option, to be changed.
if previousEnr.isSome():
Opt.some(previousEnr.get())
else:
Opt.none(enr.Record)
,
bindIp = bindIp, bindIp = bindIp,
bindPort = udpPort, bindPort = udpPort,
enrAutoUpdate = config.enrAutoUpdate, enrAutoUpdate = config.enrAutoUpdate,
@ -166,7 +139,7 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
d.open() d.open()
# Force pruning ## Force pruning - optional
if config.forcePrune: if config.forcePrune:
let db = ContentDB.new( let db = ContentDB.new(
config.dataDir / portalNetwork.getDbDirectory() / "contentdb_" & config.dataDir / portalNetwork.getDbDirectory() / "contentdb_" &
@ -196,104 +169,33 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
db.forcePrune(d.localNode.id, radius) db.forcePrune(d.localNode.id, radius)
db.close() db.close()
# Store the database at contentdb prefixed with the first 8 chars of node id. ## Portal node setup
# This is done because the content in the db is dependant on the `NodeId` and
# the selected `Radius`.
let let
db = ContentDB.new( portalProtocolConfig = PortalProtocolConfig.init(
config.dataDir / portalNetwork.getDbDirectory() / "contentdb_" &
d.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(),
storageCapacity = config.storageCapacityMB * 1_000_000,
)
portalConfig = PortalProtocolConfig.init(
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig, config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.radiusConfig,
config.disablePoke, config.disablePoke,
) )
streamManager = StreamManager.new(d)
accumulator = portalNodeConfig = PortalNodeConfig(
# Building an accumulator from header epoch files takes > 2m30s and is accumulatorFile: config.accumulatorFile.optionToOpt().map(
# thus not really a viable option at start-up. proc(v: InputFile): string =
# Options are: $v
# - Start with baked-in accumulator ),
# - Start with file containing SSZ encoded accumulator disableStateRootValidation: config.disableStateRootValidation,
if config.accumulatorFile.isSome(): trustedBlockRoot: config.trustedBlockRoot.optionToOpt(),
readAccumulator(string config.accumulatorFile.get()).expect( portalConfig: portalProtocolConfig,
"Need a file with a valid SSZ encoded accumulator" dataDir: string config.dataDir,
storageCapacity: config.storageCapacityMB * 1_000_000,
) )
else:
# Get it from binary file containing SSZ encoded accumulator
loadAccumulator()
historyNetwork = node = PortalNode.new(
if PortalSubnetwork.history in portalSubnetworks:
Opt.some(
HistoryNetwork.new(
portalNetwork, portalNetwork,
portalNodeConfig,
d, d,
db, portalSubnetworks,
streamManager,
accumulator,
bootstrapRecords = bootstrapRecords, bootstrapRecords = bootstrapRecords,
portalConfig = portalConfig, rng = rng,
) )
)
else:
Opt.none(HistoryNetwork)
stateNetwork =
if PortalSubnetwork.state in portalSubnetworks:
Opt.some(
StateNetwork.new(
portalNetwork,
d,
db,
streamManager,
bootstrapRecords = bootstrapRecords,
portalConfig = portalConfig,
historyNetwork = historyNetwork,
not config.disableStateRootValidation,
)
)
else:
Opt.none(StateNetwork)
beaconLightClient =
# TODO: Currently disabled by default as it is not sufficiently polished.
# Eventually this should be always-on functionality.
if PortalSubnetwork.beacon in portalSubnetworks and
config.trustedBlockRoot.isSome():
let
# Portal works only over mainnet data currently
networkData = loadNetworkData("mainnet")
beaconDb = BeaconDb.new(networkData, config.dataDir / "db" / "beacon_db")
beaconNetwork = BeaconNetwork.new(
portalNetwork,
d,
beaconDb,
streamManager,
networkData.forks,
bootstrapRecords = bootstrapRecords,
portalConfig = portalConfig,
)
let beaconLightClient = LightClient.new(
beaconNetwork, rng, networkData, LightClientFinalizationMode.Optimistic
)
beaconLightClient.onFinalizedHeader = onFinalizedHeader
beaconLightClient.onOptimisticHeader = onOptimisticHeader
beaconLightClient.trustedBlockRoot = config.trustedBlockRoot
# TODO:
# Quite dirty. Use register validate callbacks instead. Or, revisit
# the object relationships regarding the beacon light client.
beaconNetwork.processor = beaconLightClient.processor
Opt.some(beaconLightClient)
else:
Opt.none(LightClient)
# TODO: If no new network key is generated then we should first check if an # TODO: If no new network key is generated then we should first check if an
# enr file exists, and in the case it does read out the seqNum from it and # enr file exists, and in the case it does read out the seqNum from it and
@ -322,41 +224,11 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
url, error_msg = exc.msg, error_name = exc.name url, error_msg = exc.msg, error_name = exc.name
quit QuitFailure quit QuitFailure
## Starting the different networks. ## Start discovery v5 protocol and the Portal node.
d.start() d.start()
if stateNetwork.isSome(): node.start()
stateNetwork.get().start()
if historyNetwork.isSome():
historyNetwork.get().start()
if beaconLightClient.isSome():
let lc = beaconLightClient.get()
lc.network.start()
lc.start()
proc onSecond(time: Moment) = ## Start the JSON-RPC APIs
discard
# TODO:
# Figure out what to do with this one.
# let wallSlot = lc.getBeaconTime().slotOrZero()
# lc.updateGossipStatus(wallSlot + 1)
proc runOnSecondLoop() {.async.} =
let sleepTime = chronos.seconds(1)
while true:
let start = chronos.now(chronos.Moment)
await chronos.sleepAsync(sleepTime)
let afterSleep = chronos.now(chronos.Moment)
let sleepTime = afterSleep - start
onSecond(start)
let finished = chronos.now(chronos.Moment)
let processingTime = finished - afterSleep
trace "onSecond task completed", sleepTime, processingTime
onSecond(Moment.now())
asyncSpawn runOnSecondLoop()
## Starting the JSON-RPC APIs
if config.rpcEnabled: if config.rpcEnabled:
let ta = initTAddress(config.rpcAddress, config.rpcPort) let ta = initTAddress(config.rpcAddress, config.rpcPort)
@ -368,23 +240,23 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
rpcHttpServerWithProxy.installDiscoveryApiHandlers(d) rpcHttpServerWithProxy.installDiscoveryApiHandlers(d)
rpcHttpServerWithProxy.installWeb3ApiHandlers() rpcHttpServerWithProxy.installWeb3ApiHandlers()
if stateNetwork.isSome(): if node.stateNetwork.isSome():
rpcHttpServerWithProxy.installPortalApiHandlers( rpcHttpServerWithProxy.installPortalApiHandlers(
stateNetwork.get().portalProtocol, "state" node.stateNetwork.value.portalProtocol, "state"
) )
if historyNetwork.isSome(): if node.historyNetwork.isSome():
rpcHttpServerWithProxy.installEthApiHandlers( rpcHttpServerWithProxy.installEthApiHandlers(
historyNetwork.get(), beaconLightClient, stateNetwork node.historyNetwork.value, node.beaconLightClient, node.stateNetwork
) )
rpcHttpServerWithProxy.installPortalApiHandlers( rpcHttpServerWithProxy.installPortalApiHandlers(
historyNetwork.get().portalProtocol, "history" node.historyNetwork.value.portalProtocol, "history"
) )
rpcHttpServerWithProxy.installPortalDebugApiHandlers( rpcHttpServerWithProxy.installPortalDebugApiHandlers(
historyNetwork.get().portalProtocol, "history" node.historyNetwork.value.portalProtocol, "history"
) )
if beaconLightClient.isSome(): if node.beaconNetwork.isSome():
rpcHttpServerWithProxy.installPortalApiHandlers( rpcHttpServerWithProxy.installPortalApiHandlers(
beaconLightClient.get().network.portalProtocol, "beacon" node.beaconNetwork.value.portalProtocol, "beacon"
) )
# TODO: Test proxy with remote node over HTTPS # TODO: Test proxy with remote node over HTTPS
waitFor rpcHttpServerWithProxy.start() waitFor rpcHttpServerWithProxy.start()

View File

@ -34,7 +34,7 @@ type
processor*: ref LightClientProcessor processor*: ref LightClientProcessor
manager: LightClientManager manager: LightClientManager
onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback onFinalizedHeader*, onOptimisticHeader*: LightClientHeaderCallback
trustedBlockRoot*: Option[Eth2Digest] trustedBlockRoot*: Opt[Eth2Digest]
func getFinalizedHeader*(lightClient: LightClient): ForkedLightClientHeader = func getFinalizedHeader*(lightClient: LightClient): ForkedLightClientHeader =
withForkyStore(lightClient.store[]): withForkyStore(lightClient.store[]):
@ -75,7 +75,11 @@ proc new*(
) )
func getTrustedBlockRoot(): Option[Eth2Digest] = func getTrustedBlockRoot(): Option[Eth2Digest] =
lightClient.trustedBlockRoot # TODO: use Opt in LC processor
if lightClient.trustedBlockRoot.isSome():
some(lightClient.trustedBlockRoot.value)
else:
none(Eth2Digest)
proc onStoreInitialized() = proc onStoreInitialized() =
discard discard

188
fluffy/portal_node.nim Normal file
View File

@ -0,0 +1,188 @@
# Fluffy
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
results,
eth/p2p/discoveryv5/protocol,
beacon_chain/spec/forks,
./network_metadata,
./eth_data/history_data_ssz_e2s,
./database/content_db,
./network/wire/[portal_stream, portal_protocol_config],
./network/beacon/[beacon_init_loader, beacon_light_client],
./network/history/[history_network, history_content],
./network/state/[state_network, state_content]
export
beacon_light_client, history_network, state_network, portal_protocol_config, forks
type
PortalNodeConfig* = object
accumulatorFile*: Opt[string]
disableStateRootValidation*: bool
trustedBlockRoot*: Opt[Digest]
portalConfig*: PortalProtocolConfig
dataDir*: string
storageCapacity*: uint64
PortalNode* = ref object
discovery: protocol.Protocol
contentDB: ContentDB
streamManager: StreamManager
beaconNetwork*: Opt[BeaconNetwork]
historyNetwork*: Opt[HistoryNetwork]
stateNetwork*: Opt[StateNetwork]
beaconLightClient*: Opt[LightClient]
# Beacon light client application callbacks triggered when new finalized header
# or optimistic header is available.
proc onFinalizedHeader(
lightClient: LightClient, finalizedHeader: ForkedLightClientHeader
) =
withForkyHeader(finalizedHeader):
when lcDataFork > LightClientDataFork.None:
info "New LC finalized header", finalized_header = shortLog(forkyHeader)
proc onOptimisticHeader(
lightClient: LightClient, optimisticHeader: ForkedLightClientHeader
) =
withForkyHeader(optimisticHeader):
when lcDataFork > LightClientDataFork.None:
info "New LC optimistic header", optimistic_header = shortLog(forkyHeader)
proc getDbDirectory*(network: PortalNetwork): string =
if network == PortalNetwork.mainnet:
"db"
else:
"db_" & network.symbolName()
proc new*(
T: type PortalNode,
network: PortalNetwork,
config: PortalNodeConfig,
discovery: protocol.Protocol,
subnetworks: set[PortalSubnetwork],
bootstrapRecords: openArray[Record] = [],
rng = newRng(),
): T =
let
# Store the database at contentdb prefixed with the first 8 chars of node id.
# This is done because the content in the db is dependant on the `NodeId` and
# the selected `Radius`.
contentDB = ContentDB.new(
config.dataDir / network.getDbDirectory() / "contentdb_" &
discovery.localNode.id.toBytesBE().toOpenArray(0, 8).toHex(),
storageCapacity = config.storageCapacity,
)
# TODO: Portal works only over mainnet data currently
networkData = loadNetworkData("mainnet")
streamManager = StreamManager.new(discovery)
accumulator =
# Building an accumulator from header epoch files takes > 2m30s and is
# thus not really a viable option at start-up.
# Options are:
# - Start with baked-in accumulator
# - Start with file containing SSZ encoded accumulator
if config.accumulatorFile.isSome:
readAccumulator(config.accumulatorFile.value).expect(
"Need a file with a valid SSZ encoded accumulator"
)
else:
# Get it from binary file containing SSZ encoded accumulator
loadAccumulator()
beaconNetwork =
if PortalSubnetwork.beacon in subnetworks and config.trustedBlockRoot.isSome():
let
beaconDb = BeaconDb.new(networkData, config.dataDir / "db" / "beacon_db")
beaconNetwork = BeaconNetwork.new(
network,
discovery,
beaconDb,
streamManager,
networkData.forks,
bootstrapRecords = bootstrapRecords,
portalConfig = config.portalConfig,
)
Opt.some(beaconNetwork)
else:
Opt.none(BeaconNetwork)
historyNetwork =
if PortalSubnetwork.history in subnetworks:
Opt.some(
HistoryNetwork.new(
network,
discovery,
contentDB,
streamManager,
accumulator,
bootstrapRecords = bootstrapRecords,
portalConfig = config.portalConfig,
)
)
else:
Opt.none(HistoryNetwork)
stateNetwork =
if PortalSubnetwork.state in subnetworks:
Opt.some(
StateNetwork.new(
network,
discovery,
contentDB,
streamManager,
bootstrapRecords = bootstrapRecords,
portalConfig = config.portalConfig,
historyNetwork = historyNetwork,
not config.disableStateRootValidation,
)
)
else:
Opt.none(StateNetwork)
beaconLightClient =
if beaconNetwork.isSome():
let beaconLightClient = LightClient.new(
beaconNetwork.value, rng, networkData, LightClientFinalizationMode.Optimistic
)
beaconLightClient.onFinalizedHeader = onFinalizedHeader
beaconLightClient.onOptimisticHeader = onOptimisticHeader
beaconLightClient.trustedBlockRoot = config.trustedBlockRoot
# TODO:
# Quite dirty. Use register validate callbacks instead. Or, revisit
# the object relationships regarding the beacon light client.
beaconNetwork.value.processor = beaconLightClient.processor
Opt.some(beaconLightClient)
else:
Opt.none(LightClient)
PortalNode(
discovery: discovery,
contentDB: contentDB,
streamManager: streamManager,
beaconNetwork: beaconNetwork,
historyNetwork: historyNetwork,
stateNetwork: stateNetwork,
beaconLightClient: beaconLightClient,
)
proc start*(n: PortalNode) =
if n.beaconNetwork.isSome():
n.beaconNetwork.value.start()
if n.historyNetwork.isSome():
n.historyNetwork.value.start()
if n.stateNetwork.isSome():
n.stateNetwork.value.start()
if n.beaconLightClient.isSome():
n.beaconLightClient.value.start()

View File

@ -76,7 +76,7 @@ procSuite "Portal Beacon Light Client":
lc.onFinalizedHeader = headerCallback(finalizedHeaders) lc.onFinalizedHeader = headerCallback(finalizedHeaders)
lc.onOptimisticHeader = headerCallback(optimisticHeaders) lc.onOptimisticHeader = headerCallback(optimisticHeaders)
lc.trustedBlockRoot = some bootstrapHeaderHash lc.trustedBlockRoot = Opt.some bootstrapHeaderHash
# When running start the beacon light client will first try to retrieve the # When running start the beacon light client will first try to retrieve the
# bootstrap for given trustedBlockRoot # bootstrap for given trustedBlockRoot