From 361fe2cdc7c48e458a9cd3bb1b38df28f3cda65e Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Sun, 3 Mar 2024 02:59:53 +0200 Subject: [PATCH] chore: moving node initialization code to node_factory.nim (#2479) --- apps/chat2/chat2.nim | 1 + apps/chat2bridge/chat2bridge.nim | 1 + apps/networkmonitor/networkmonitor.nim | 1 + apps/wakucanary/wakucanary.nim | 3 +- apps/wakunode2/app.nim | 352 +--------------- apps/wakunode2/wakunode2.nim | 2 +- examples/publisher.nim | 3 +- examples/subscriber.nim | 3 +- .../requests/node_lifecycle_request.nim | 2 +- tests/testlib/wakunode.nim | 5 +- tests/wakunode2/test_validators.nim | 4 +- tools/rln_db_inspector/rln_db_inspector.nim | 2 +- .../rln_keystore_generator.nim | 2 +- waku/{node => factory}/builder.nim | 7 +- .../factory}/external_config.nim | 12 +- .../factory}/internal_config.nim | 12 +- waku/factory/node_factory.nim | 375 ++++++++++++++++++ .../factory/validator_signed.nim | 6 +- waku/waku_node.nim | 2 - 19 files changed, 413 insertions(+), 382 deletions(-) rename waku/{node => factory}/builder.nim (98%) rename {apps/wakunode2 => waku/factory}/external_config.nim (98%) rename {apps/wakunode2 => waku/factory}/internal_config.nim (97%) create mode 100644 waku/factory/node_factory.nim rename apps/wakunode2/wakunode2_validator_signed.nim => waku/factory/validator_signed.nim (96%) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index c03653650..9cc2228c8 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -33,6 +33,7 @@ import ../../waku/waku_node, ../../waku/node/waku_metrics, ../../waku/node/peer_manager, + ../../waku/factory/builder, ../../waku/common/utils/nat, ./config_chat2 diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 38177118b..dba70be83 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -21,6 +21,7 @@ import ../../waku/waku_filter, ../../waku/waku_filter_v2, ../../waku/waku_store, + ../../waku/factory/builder, # Chat 2 imports ../chat2/chat2, # Common cli config diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index ee46338ac..0171001eb 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -28,6 +28,7 @@ import ../../waku/waku_discv5, ../../waku/waku_dnsdisc, ../../waku/waku_rln_relay, + ../../waku/factory/builder, ../wakunode2/networks_config, ./networkmonitor_metrics, ./networkmonitor_config, diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim index 6d29a0adc..55d6e86d1 100644 --- a/apps/wakucanary/wakucanary.nim +++ b/apps/wakucanary/wakucanary.nim @@ -15,7 +15,8 @@ import ../../waku/waku_enr, ../../waku/node/peer_manager, ../../waku/waku_core, - ../../waku/waku_node + ../../waku/waku_node, + ../../waku/factory/builder # protocols and their tag const ProtocolsTable = { diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 092574f22..de6627901 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -29,7 +29,6 @@ import ../../waku/node/waku_metrics, ../../waku/node/peer_manager, ../../waku/node/peer_manager/peer_store/waku_peer_storage, - ../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../../waku/waku_api/message_cache, ../../waku/waku_api/handlers, ../../waku/waku_api/rest/server, @@ -51,9 +50,9 @@ import ../../waku/waku_lightpush/common, ../../waku/waku_filter, ../../waku/waku_filter_v2, - ./wakunode2_validator_signed, - ./internal_config, - ./external_config + ../../waku/factory/node_factory, + ../../waku/factory/internal_config, + ../../waku/factory/external_config logScope: topics = "wakunode app" @@ -135,21 +134,6 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T = node: nil ) - -## Peer persistence - -const PeerPersistenceDbUrl = "peers.db" -proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] = - let db = ? SqliteDatabase.new(PeerPersistenceDbUrl) - - ? peer_store_sqlite_migrations.migrate(db) - - let res = WakuPeerStorage.new(db) - if res.isErr(): - return err("failed to init peer store" & res.error) - - ok(some(res.value)) - proc setupPeerPersistence*(app: var App): AppResult[void] = if not app.conf.peerPersistence: return ok() @@ -162,38 +146,6 @@ proc setupPeerPersistence*(app: var App): AppResult[void] = ok() -## Retrieve dynamic bootstrap nodes (DNS discovery) - -proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, - dnsDiscoveryUrl: string, - dnsDiscoveryNameServers: seq[IpAddress]): - AppResult[seq[RemotePeerInfo]] = - - if dnsDiscovery and dnsDiscoveryUrl != "": - # DNS discovery - debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl - - var nameServers: seq[TransportAddress] - for ip in dnsDiscoveryNameServers: - nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - - let dnsResolver = DnsResolver.new(nameServers) - - proc resolver(domain: string): Future[string] {.async, gcsafe.} = - trace "resolving", domain=domain - let resolved = await dnsResolver.resolveTxt(domain) - return resolved[0] # Use only first answer - - var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) - if wakuDnsDiscovery.isOk(): - return wakuDnsDiscovery.get().findPeers() - .mapErr(proc (e: cstring): string = $e) - else: - warn "Failed to init Waku DNS discovery" - - debug "No method for retrieving dynamic bootstrap nodes specified." - ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default - proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] = let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(app.conf.dnsDiscovery, app.conf.dnsDiscoveryUrl, @@ -243,58 +195,6 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 = app.node.topicSubscriptionQueue, ) -## Init waku node instance - -proc initNode(conf: WakuNodeConf, - netConfig: NetConfig, - rng: ref HmacDrbgContext, - nodeKey: crypto.PrivateKey, - record: enr.Record, - peerStore: Option[WakuPeerStorage], - dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): AppResult[WakuNode] = - - ## Setup a basic Waku v2 node based on a supplied configuration - ## file. Optionally include persistent peer storage. - ## No protocols are mounted yet. - - var dnsResolver: DnsResolver - if conf.dnsAddrs: - # Support for DNS multiaddrs - var nameServers: seq[TransportAddress] - for ip in conf.dnsAddrsNameServers: - nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - - dnsResolver = DnsResolver.new(nameServers) - - var node: WakuNode - - let pStorage = if peerStore.isNone(): nil - else: peerStore.get() - - # Build waku node instance - var builder = WakuNodeBuilder.init() - builder.withRng(rng) - builder.withNodeKey(nodekey) - builder.withRecord(record) - builder.withNetworkConfiguration(netConfig) - builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity) - builder.withSwitchConfiguration( - maxConnections = some(conf.maxConnections.int), - secureKey = some(conf.websocketSecureKeyPath), - secureCert = some(conf.websocketSecureCertPath), - nameResolver = dnsResolver, - sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled - agentString = some(conf.agentString) - ) - builder.withColocationLimit(conf.colocationLimit) - builder.withPeerManagerConfig( - maxRelayPeers = conf.maxRelayPeers, - shardAware = conf.relayShardedPeerManagement,) - - node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err) - - ok(node) - proc setupWakuApp*(app: var App): AppResult[void] = ## Waku node let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes) @@ -379,209 +279,6 @@ proc updateApp(app: var App): AppResult[void] = return ok() -## Mount protocols - -proc setupProtocols(node: WakuNode, - conf: WakuNodeConf, - nodeKey: crypto.PrivateKey): - Future[AppResult[void]] {.async.} = - ## Setup configured protocols on an existing Waku v2 node. - ## Optionally include persistent message storage. - ## No protocols are started yet. - - node.mountMetadata(conf.clusterId).isOkOr: - return err("failed to mount waku metadata protocol: " & error) - - # Mount relay on all nodes - var peerExchangeHandler = none(RoutingRecordsHandler) - if conf.relayPeerExchange: - proc handlePeerExchange(peer: PeerId, topic: string, - peers: seq[RoutingRecordsPair]) {.gcsafe.} = - ## Handle peers received via gossipsub peer exchange - # TODO: Only consider peers on pubsub topics we subscribe to - let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records - .mapIt(toRemotePeerInfo(it.record.get())) - - debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len - - # asyncSpawn, as we don't want to block here - asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange") - - peerExchangeHandler = some(handlePeerExchange) - - if conf.relay: - let pubsubTopics = - if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0: - # TODO autoshard content topics only once. - # Already checked for errors in app.init - let shards = conf.contentTopics.mapIt(getShard(it).expect("Valid Shard")) - conf.pubsubTopics & shards - else: - conf.topics - - let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: - return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) - - debug "Setting max message size", num_bytes=parsedMaxMsgSize - - try: - await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler, - int(parsedMaxMsgSize)) - except CatchableError: - return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) - - # Add validation keys to protected topics - var subscribedProtectedTopics : seq[ProtectedTopic] - for topicKey in conf.protectedTopics: - if topicKey.topic notin pubsubTopics: - warn "protected topic not in subscribed pubsub topics, skipping adding validator", - protectedTopic=topicKey.topic, subscribedTopics=pubsubTopics - continue - subscribedProtectedTopics.add(topicKey) - notice "routing only signed traffic", protectedTopic=topicKey.topic, publicKey=topicKey.key - node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics) - - # Enable Rendezvous Discovery protocol when Relay is enabled - try: - await mountRendezvous(node) - except CatchableError: - return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) - - # Keepalive mounted on all nodes - try: - await mountLibp2pPing(node) - except CatchableError: - return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) - - var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = - ## Action to be taken when an internal error occurs during the node run. - ## e.g. the connection with the database is lost and not recovered. - error "Unrecoverable error occurred", error = msg - quit(QuitFailure) - - if conf.rlnRelay: - when defined(rln_v2): - let rlnConf = WakuRlnConfig( - rlnRelayDynamic: conf.rlnRelayDynamic, - rlnRelayCredIndex: conf.rlnRelayCredIndex, - rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, - rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), - rlnRelayCredPath: conf.rlnRelayCredPath, - rlnRelayCredPassword: conf.rlnRelayCredPassword, - rlnRelayTreePath: conf.rlnRelayTreePath, - rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit, - rlnEpochSizeSec: conf.rlnEpochSizeSec, - onFatalErrorAction: onFatalErrorAction, - ) - else: - let rlnConf = WakuRlnConfig( - rlnRelayDynamic: conf.rlnRelayDynamic, - rlnRelayCredIndex: conf.rlnRelayCredIndex, - rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, - rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), - rlnRelayCredPath: conf.rlnRelayCredPath, - rlnRelayCredPassword: conf.rlnRelayCredPassword, - rlnRelayTreePath: conf.rlnRelayTreePath, - rlnEpochSizeSec: conf.rlnEpochSizeSec, - onFatalErrorAction: onFatalErrorAction, - ) - - try: - waitFor node.mountRlnRelay(rlnConf) - except CatchableError: - return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) - - if conf.store: - # Archive setup - let archiveDriverRes = waitFor ArchiveDriver.new(conf.storeMessageDbUrl, - conf.storeMessageDbVacuum, - conf.storeMessageDbMigration, - conf.storeMaxNumDbConnections, - onFatalErrorAction) - if archiveDriverRes.isErr(): - return err("failed to setup archive driver: " & archiveDriverRes.error) - - let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy) - if retPolicyRes.isErr(): - return err("failed to create retention policy: " & retPolicyRes.error) - - let mountArcRes = node.mountArchive(archiveDriverRes.get(), - retPolicyRes.get()) - if mountArcRes.isErr(): - return err("failed to mount waku archive protocol: " & mountArcRes.error) - - # Store setup - try: - await mountStore(node) - except CatchableError: - return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) - - mountStoreClient(node) - if conf.storenode != "": - let storeNode = parsePeerInfo(conf.storenode) - if storeNode.isOk(): - node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec) - else: - return err("failed to set node waku store peer: " & storeNode.error) - - # NOTE Must be mounted after relay - if conf.lightpush: - try: - await mountLightPush(node) - except CatchableError: - return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) - - if conf.lightpushnode != "": - let lightPushNode = parsePeerInfo(conf.lightpushnode) - if lightPushNode.isOk(): - mountLightPushClient(node) - node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) - else: - return err("failed to set node waku lightpush peer: " & lightPushNode.error) - - # Filter setup. NOTE Must be mounted after relay - if conf.filter: - try: - await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) - except CatchableError: - return err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg()) - - try: - await mountFilter(node, - subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout), - maxFilterPeers = conf.filterMaxPeersToServe, - maxFilterCriteriaPerPeer = conf.filterMaxCriteria) - except CatchableError: - return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) - - if conf.filternode != "": - let filterNode = parsePeerInfo(conf.filternode) - if filterNode.isOk(): - try: - await node.mountFilterClient() - node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec) - node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) - except CatchableError: - return err("failed to mount waku filter client protocol: " & getCurrentExceptionMsg()) - else: - return err("failed to set node waku filter peer: " & filterNode.error) - - # waku peer exchange setup - if conf.peerExchangeNode != "" or conf.peerExchange: - try: - await mountPeerExchange(node) - except CatchableError: - return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) - - if conf.peerExchangeNode != "": - let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode) - if peerExchangeNode.isOk(): - node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec) - else: - return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) - - return ok() - proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} = return await setupProtocols( app.node, @@ -589,49 +286,6 @@ proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} = app.key ) -## Start node - -proc startNode(node: WakuNode, conf: WakuNodeConf, - dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[AppResult[void]] {.async.} = - ## Start a configured node and all mounted protocols. - ## Connect to static nodes and start - ## keep-alive, if configured. - - # Start Waku v2 node - try: - await node.start() - except CatchableError: - return err("failed to start waku node: " & getCurrentExceptionMsg()) - - # Connect to configured static nodes - if conf.staticnodes.len > 0: - try: - await connectToNodes(node, conf.staticnodes, "static") - except CatchableError: - return err("failed to connect to static nodes: " & getCurrentExceptionMsg()) - - if dynamicBootstrapNodes.len > 0: - info "Connecting to dynamic bootstrap peers" - try: - await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") - except CatchableError: - return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) - - # retrieve px peers and add the to the peer store - if conf.peerExchangeNode != "": - let desiredOutDegree = node.wakuRelay.parameters.d.uint64() - await node.fetchPeerExchangePeers(desiredOutDegree) - - # Start keepalive, if enabled - if conf.keepAlive: - node.startKeepalive() - - # Maintain relay connections - if conf.relay: - node.peerManager.start() - - return ok() - proc startApp*(app: var App): AppResult[void] = let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)) diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index e90eada2a..7f9661873 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -15,7 +15,7 @@ import import ../../tools/rln_keystore_generator/rln_keystore_generator, ../../waku/common/logging, - ./external_config, + ../../waku/factory/external_config, ./networks_config, ./app diff --git a/examples/publisher.nim b/examples/publisher.nim index 526340c44..800ea8932 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -15,7 +15,8 @@ import ../../../waku/waku_core, ../../../waku/waku_node, ../../../waku/waku_enr, - ../../../waku/waku_discv5 + ../../../waku/waku_discv5, + ../../../waku/factory/builder proc now*(): Timestamp = getNanosecondTime(getTime().toUnixFloat()) diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 5b5865647..a415cc419 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -15,7 +15,8 @@ import ../../../waku/waku_core, ../../../waku/waku_node, ../../../waku/waku_enr, - ../../../waku/waku_discv5 + ../../../waku/waku_discv5, + ../../../waku/factory/builder # An accesible bootstrap node. See wakuv2.prod fleets.status.im const bootstrapNode = "enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZl" & diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 8f41c26ed..8396c7cce 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -16,7 +16,6 @@ import ../../../../waku/node/peer_manager/peer_manager, ../../../../waku/waku_core, ../../../../waku/node/waku_node, - ../../../../waku/node/builder, ../../../../waku/node/config, ../../../../waku/waku_archive/driver/builder, ../../../../waku/waku_archive/driver, @@ -24,6 +23,7 @@ import ../../../../waku/waku_archive/retention_policy, ../../../../waku/waku_relay/protocol, ../../../../waku/waku_store, + ../../../../waku/factory/builder, ../../../events/[json_message_event,json_base_event], ../../../alloc, ../../config diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 6122a6d5b..71b4f08c2 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -13,8 +13,9 @@ import ../../../waku/node/peer_manager, ../../../waku/waku_enr, ../../../waku/waku_discv5, - ../../apps/wakunode2/external_config, - ../../apps/wakunode2/internal_config, + ../../../waku/factory/external_config, + ../../../waku/factory/internal_config, + ../../../waku/factory/builder, ./common diff --git a/tests/wakunode2/test_validators.nim b/tests/wakunode2/test_validators.nim index 8fe82bcca..2d6c826d6 100644 --- a/tests/wakunode2/test_validators.nim +++ b/tests/wakunode2/test_validators.nim @@ -13,12 +13,12 @@ import libp2p/multihash, secp256k1 import - ../../apps/wakunode2/external_config, - ../../apps/wakunode2/wakunode2_validator_signed, ../../waku/waku_core, ../../waku/node/peer_manager, ../../waku/waku_node, ../../waku/waku_relay, + ../../waku/factory/external_config, + ../../waku/factory/validator_signed, ../testlib/wakucore, ../testlib/wakunode diff --git a/tools/rln_db_inspector/rln_db_inspector.nim b/tools/rln_db_inspector/rln_db_inspector.nim index 864f50612..d7e076d14 100644 --- a/tools/rln_db_inspector/rln_db_inspector.nim +++ b/tools/rln_db_inspector/rln_db_inspector.nim @@ -11,7 +11,7 @@ import import ../../waku/waku_rln_relay/rln, ../../waku/waku_rln_relay/conversion_utils, - ./external_config + ../../waku/factory/external_config logScope: topics = "rln_db_inspector" diff --git a/tools/rln_keystore_generator/rln_keystore_generator.nim b/tools/rln_keystore_generator/rln_keystore_generator.nim index 14cde6af1..a2fb1cd16 100644 --- a/tools/rln_keystore_generator/rln_keystore_generator.nim +++ b/tools/rln_keystore_generator/rln_keystore_generator.nim @@ -13,7 +13,7 @@ import ../../waku/waku_rln_relay/rln, ../../waku/waku_rln_relay/conversion_utils, ../../waku/waku_rln_relay/group_manager/on_chain, - ../../apps/wakunode2/external_config + ../../waku/factory/external_config logScope: topics = "rln_keystore_generator" diff --git a/waku/node/builder.nim b/waku/factory/builder.nim similarity index 98% rename from waku/node/builder.nim rename to waku/factory/builder.nim index 85cede988..9bc985281 100644 --- a/waku/node/builder.nim +++ b/waku/factory/builder.nim @@ -15,11 +15,8 @@ import import ../waku_enr, ../waku_discv5, - ./config, - ./peer_manager, - ./waku_node, - ./waku_switch - + ../waku_node, + ../node/peer_manager type WakuNodeBuilder* = object diff --git a/apps/wakunode2/external_config.nim b/waku/factory/external_config.nim similarity index 98% rename from apps/wakunode2/external_config.nim rename to waku/factory/external_config.nim index 23be15bb4..514bb09c2 100644 --- a/apps/wakunode2/external_config.nim +++ b/waku/factory/external_config.nim @@ -14,14 +14,14 @@ import nimcrypto/utils, secp256k1 import - ../../waku/common/confutils/envvar/defs as confEnvvarDefs, - ../../waku/common/confutils/envvar/std/net as confEnvvarNet, - ../../waku/common/logging, - ../../waku/waku_enr, - ../../waku/node/peer_manager + ../common/confutils/envvar/defs as confEnvvarDefs, + ../common/confutils/envvar/std/net as confEnvvarNet, + ../common/logging, + ../waku_enr, + ../node/peer_manager include - ../../waku/waku_core/message/default_values + ../waku_core/message/default_values export confTomlDefs, diff --git a/apps/wakunode2/internal_config.nim b/waku/factory/internal_config.nim similarity index 97% rename from apps/wakunode2/internal_config.nim rename to waku/factory/internal_config.nim index 95c2a6c1c..132821060 100644 --- a/apps/wakunode2/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -8,12 +8,12 @@ import stew/results, stew/shims/net import - ../../waku/common/utils/nat, - ../../waku/node/config, - ../../waku/waku_enr/capabilities, - ../../waku/waku_enr, - ../../waku/waku_core, - ./external_config + ./external_config, + ../common/utils/nat, + ../node/config, + ../waku_enr/capabilities, + ../waku_enr, + ../waku_core proc enrConfiguration*(conf: WakuNodeConf, netConfig: NetConfig, key: crypto.PrivateKey): Result[enr.Record, string] = diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim new file mode 100644 index 000000000..6d5308459 --- /dev/null +++ b/waku/factory/node_factory.nim @@ -0,0 +1,375 @@ +import + std/[options, sequtils], + chronicles, + chronos, + libp2p/peerid, + libp2p/protocols/pubsub/gossipsub, + libp2p/nameresolving/dnsresolver, + libp2p/crypto/crypto + +import + ./internal_config, + ./external_config, + ./builder, + ./validator_signed, + ../waku_enr/sharding, + ../waku_node, + ../waku_core, + ../waku_rln_relay, + ../waku_dnsdisc, + ../waku_archive, + ../waku_store, + ../waku_filter, + ../waku_filter_v2, + ../waku_peer_exchange, + ../node/peer_manager, + ../node/peer_manager/peer_store/waku_peer_storage, + ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, + ../waku_lightpush/common, + ../waku_archive/driver/builder, + ../waku_archive/retention_policy/builder, + ../common/utils/parse_size_units + +## Peer persistence + +const PeerPersistenceDbUrl = "peers.db" +proc setupPeerStorage*(): Result[Option[WakuPeerStorage], string] = + let db = ? SqliteDatabase.new(PeerPersistenceDbUrl) + + ? peer_store_sqlite_migrations.migrate(db) + + let res = WakuPeerStorage.new(db) + if res.isErr(): + return err("failed to init peer store" & res.error) + + ok(some(res.value)) + +## Retrieve dynamic bootstrap nodes (DNS discovery) + +proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, + dnsDiscoveryUrl: string, + dnsDiscoveryNameServers: seq[IpAddress]): + Result[seq[RemotePeerInfo], string] = + + if dnsDiscovery and dnsDiscoveryUrl != "": + # DNS discovery + debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl + + var nameServers: seq[TransportAddress] + for ip in dnsDiscoveryNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + proc resolver(domain: string): Future[string] {.async, gcsafe.} = + trace "resolving", domain=domain + let resolved = await dnsResolver.resolveTxt(domain) + return resolved[0] # Use only first answer + + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) + if wakuDnsDiscovery.isOk(): + return wakuDnsDiscovery.get().findPeers() + .mapErr(proc (e: cstring): string = $e) + else: + warn "Failed to init Waku DNS discovery" + + debug "No method for retrieving dynamic bootstrap nodes specified." + ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default + +## Init waku node instance + +proc initNode*(conf: WakuNodeConf, + netConfig: NetConfig, + rng: ref HmacDrbgContext, + nodeKey: crypto.PrivateKey, + record: enr.Record, + peerStore: Option[WakuPeerStorage], + dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): Result[WakuNode, string] = + + ## Setup a basic Waku v2 node based on a supplied configuration + ## file. Optionally include persistent peer storage. + ## No protocols are mounted yet. + + var dnsResolver: DnsResolver + if conf.dnsAddrs: + # Support for DNS multiaddrs + var nameServers: seq[TransportAddress] + for ip in conf.dnsAddrsNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + dnsResolver = DnsResolver.new(nameServers) + + var node: WakuNode + + let pStorage = if peerStore.isNone(): nil + else: peerStore.get() + + # Build waku node instance + var builder = WakuNodeBuilder.init() + builder.withRng(rng) + builder.withNodeKey(nodekey) + builder.withRecord(record) + builder.withNetworkConfiguration(netConfig) + builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity) + builder.withSwitchConfiguration( + maxConnections = some(conf.maxConnections.int), + secureKey = some(conf.websocketSecureKeyPath), + secureCert = some(conf.websocketSecureCertPath), + nameResolver = dnsResolver, + sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled + agentString = some(conf.agentString) + ) + builder.withColocationLimit(conf.colocationLimit) + builder.withPeerManagerConfig( + maxRelayPeers = conf.maxRelayPeers, + shardAware = conf.relayShardedPeerManagement,) + + node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err) + + ok(node) + +## Mount protocols + +proc setupProtocols*(node: WakuNode, + conf: WakuNodeConf, + nodeKey: crypto.PrivateKey): + Future[Result[void, string]] {.async.} = + ## Setup configured protocols on an existing Waku v2 node. + ## Optionally include persistent message storage. + ## No protocols are started yet. + + node.mountMetadata(conf.clusterId).isOkOr: + return err("failed to mount waku metadata protocol: " & error) + + # Mount relay on all nodes + var peerExchangeHandler = none(RoutingRecordsHandler) + if conf.relayPeerExchange: + proc handlePeerExchange(peer: PeerId, topic: string, + peers: seq[RoutingRecordsPair]) {.gcsafe.} = + ## Handle peers received via gossipsub peer exchange + # TODO: Only consider peers on pubsub topics we subscribe to + let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records + .mapIt(toRemotePeerInfo(it.record.get())) + + debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len + + # asyncSpawn, as we don't want to block here + asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange") + + peerExchangeHandler = some(handlePeerExchange) + + if conf.relay: + let pubsubTopics = + if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0: + # TODO autoshard content topics only once. + # Already checked for errors in app.init + let shards = conf.contentTopics.mapIt(getShard(it).expect("Valid Shard")) + conf.pubsubTopics & shards + else: + conf.topics + + let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: + return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) + + debug "Setting max message size", num_bytes=parsedMaxMsgSize + + try: + await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler, + int(parsedMaxMsgSize)) + except CatchableError: + return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) + + # Add validation keys to protected topics + var subscribedProtectedTopics : seq[ProtectedTopic] + for topicKey in conf.protectedTopics: + if topicKey.topic notin pubsubTopics: + warn "protected topic not in subscribed pubsub topics, skipping adding validator", + protectedTopic=topicKey.topic, subscribedTopics=pubsubTopics + continue + subscribedProtectedTopics.add(topicKey) + notice "routing only signed traffic", protectedTopic=topicKey.topic, publicKey=topicKey.key + node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics) + + # Enable Rendezvous Discovery protocol when Relay is enabled + try: + await mountRendezvous(node) + except CatchableError: + return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + + # Keepalive mounted on all nodes + try: + await mountLibp2pPing(node) + except CatchableError: + return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) + + var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = + ## Action to be taken when an internal error occurs during the node run. + ## e.g. the connection with the database is lost and not recovered. + error "Unrecoverable error occurred", error = msg + quit(QuitFailure) + + if conf.rlnRelay: + when defined(rln_v2): + let rlnConf = WakuRlnConfig( + rlnRelayDynamic: conf.rlnRelayDynamic, + rlnRelayCredIndex: conf.rlnRelayCredIndex, + rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, + rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), + rlnRelayCredPath: conf.rlnRelayCredPath, + rlnRelayCredPassword: conf.rlnRelayCredPassword, + rlnRelayTreePath: conf.rlnRelayTreePath, + rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit, + rlnEpochSizeSec: conf.rlnEpochSizeSec, + onFatalErrorAction: onFatalErrorAction, + ) + else: + let rlnConf = WakuRlnConfig( + rlnRelayDynamic: conf.rlnRelayDynamic, + rlnRelayCredIndex: conf.rlnRelayCredIndex, + rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, + rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), + rlnRelayCredPath: conf.rlnRelayCredPath, + rlnRelayCredPassword: conf.rlnRelayCredPassword, + rlnRelayTreePath: conf.rlnRelayTreePath, + rlnEpochSizeSec: conf.rlnEpochSizeSec, + onFatalErrorAction: onFatalErrorAction, + ) + + try: + waitFor node.mountRlnRelay(rlnConf) + except CatchableError: + return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) + + if conf.store: + # Archive setup + let archiveDriverRes = waitFor ArchiveDriver.new(conf.storeMessageDbUrl, + conf.storeMessageDbVacuum, + conf.storeMessageDbMigration, + conf.storeMaxNumDbConnections, + onFatalErrorAction) + if archiveDriverRes.isErr(): + return err("failed to setup archive driver: " & archiveDriverRes.error) + + let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy) + if retPolicyRes.isErr(): + return err("failed to create retention policy: " & retPolicyRes.error) + + let mountArcRes = node.mountArchive(archiveDriverRes.get(), + retPolicyRes.get()) + if mountArcRes.isErr(): + return err("failed to mount waku archive protocol: " & mountArcRes.error) + + # Store setup + try: + await mountStore(node) + except CatchableError: + return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) + + mountStoreClient(node) + if conf.storenode != "": + let storeNode = parsePeerInfo(conf.storenode) + if storeNode.isOk(): + node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec) + else: + return err("failed to set node waku store peer: " & storeNode.error) + + # NOTE Must be mounted after relay + if conf.lightpush: + try: + await mountLightPush(node) + except CatchableError: + return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) + + if conf.lightpushnode != "": + let lightPushNode = parsePeerInfo(conf.lightpushnode) + if lightPushNode.isOk(): + mountLightPushClient(node) + node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) + else: + return err("failed to set node waku lightpush peer: " & lightPushNode.error) + + # Filter setup. NOTE Must be mounted after relay + if conf.filter: + try: + await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) + except CatchableError: + return err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg()) + + try: + await mountFilter(node, + subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout), + maxFilterPeers = conf.filterMaxPeersToServe, + maxFilterCriteriaPerPeer = conf.filterMaxCriteria) + except CatchableError: + return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) + + if conf.filternode != "": + let filterNode = parsePeerInfo(conf.filternode) + if filterNode.isOk(): + try: + await node.mountFilterClient() + node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec) + node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) + except CatchableError: + return err("failed to mount waku filter client protocol: " & getCurrentExceptionMsg()) + else: + return err("failed to set node waku filter peer: " & filterNode.error) + + # waku peer exchange setup + if conf.peerExchangeNode != "" or conf.peerExchange: + try: + await mountPeerExchange(node) + except CatchableError: + return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) + + if conf.peerExchangeNode != "": + let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode) + if peerExchangeNode.isOk(): + node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec) + else: + return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) + + return ok() + +## Start node + +proc startNode*(node: WakuNode, conf: WakuNodeConf, + dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[Result[void, string]] {.async.} = + ## Start a configured node and all mounted protocols. + ## Connect to static nodes and start + ## keep-alive, if configured. + + # Start Waku v2 node + try: + await node.start() + except CatchableError: + return err("failed to start waku node: " & getCurrentExceptionMsg()) + + # Connect to configured static nodes + if conf.staticnodes.len > 0: + try: + await connectToNodes(node, conf.staticnodes, "static") + except CatchableError: + return err("failed to connect to static nodes: " & getCurrentExceptionMsg()) + + if dynamicBootstrapNodes.len > 0: + info "Connecting to dynamic bootstrap peers" + try: + await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") + except CatchableError: + return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) + + # retrieve px peers and add the to the peer store + if conf.peerExchangeNode != "": + let desiredOutDegree = node.wakuRelay.parameters.d.uint64() + await node.fetchPeerExchangePeers(desiredOutDegree) + + # Start keepalive, if enabled + if conf.keepAlive: + node.startKeepalive() + + # Maintain relay connections + if conf.relay: + node.peerManager.start() + + return ok() \ No newline at end of file diff --git a/apps/wakunode2/wakunode2_validator_signed.nim b/waku/factory/validator_signed.nim similarity index 96% rename from apps/wakunode2/wakunode2_validator_signed.nim rename to waku/factory/validator_signed.nim index 91896eda2..14901b70c 100644 --- a/apps/wakunode2/wakunode2_validator_signed.nim +++ b/waku/factory/validator_signed.nim @@ -19,9 +19,9 @@ import const MessageWindowInSec = 5*60 # +- 5 minutes import - ../../waku/waku_relay/protocol, - ../../waku/waku_core, - ./external_config + ./external_config, + ../waku_relay/protocol, + ../waku_core declarePublicCounter waku_msg_validator_signed_outcome, "number of messages for each validation outcome", ["result"] diff --git a/waku/waku_node.nim b/waku/waku_node.nim index 32dbfb174..66cff4521 100644 --- a/waku/waku_node.nim +++ b/waku/waku_node.nim @@ -1,11 +1,9 @@ import ./node/config, - ./node/builder, ./node/waku_switch as switch, ./node/waku_node as node export config, - builder, switch, node