From c8081c885975d5b39803ee086df8d4b59af54f34 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Tue, 25 Apr 2023 15:34:57 +0200 Subject: [PATCH] refactor(wakunode2): split setup logic into app module --- .github/workflows/ci-experimental.yml | 2 +- .github/workflows/ci.yml | 2 +- Makefile | 10 +- apps/wakunode2/app.nim | 771 ++++++++++++++++++++++++ apps/wakunode2/wakunode2.nim | 749 ++--------------------- apps/wakunode2/wakunode2_setup_rest.nim | 15 +- apps/wakunode2/wakunode2_setup_rpc.nim | 17 +- tests/all_tests_wakunode2.nim | 4 + tests/wakunode2/test_app.nim | 82 +++ waku.nimble | 9 +- 10 files changed, 935 insertions(+), 726 deletions(-) create mode 100644 apps/wakunode2/app.nim create mode 100644 tests/all_tests_wakunode2.nim create mode 100644 tests/wakunode2/test_app.nim diff --git a/.github/workflows/ci-experimental.yml b/.github/workflows/ci-experimental.yml index bb07af61c..2081edb8b 100644 --- a/.github/workflows/ci-experimental.yml +++ b/.github/workflows/ci-experimental.yml @@ -122,4 +122,4 @@ jobs: key: ${{ runner.os }}-zerokit-${{ steps.submodules.outputs.zerokit-hash }} - name: Run tests - run: make V=1 LOG_LEVEL=DEBUG test2 + run: make V=1 LOG_LEVEL=DEBUG test2 testwakunode2 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 568536a50..fe274d89e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,7 +117,7 @@ jobs: key: ${{ runner.os }}-nim-${{ steps.submodules.outputs.nim-hash }} - name: Run tests - run: make V=1 LOG_LEVEL=DEBUG test2 + run: make V=1 LOG_LEVEL=DEBUG test2 testwakunode2 build-legacy: diff --git a/Makefile b/Makefile index 296830e54..98220e53c 100644 --- a/Makefile +++ b/Makefile @@ -146,7 +146,7 @@ testcommon: | build deps ############# ## Waku v2 ## ############# -.PHONY: testwaku2 wakunode2 example2 sim2 scripts2 wakubridge chat2 chat2bridge +.PHONY: testwaku2 wakunode2 testwakunode2 example2 sim2 scripts2 wakubridge testbridge chat2 chat2bridge testwaku2: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ @@ -156,6 +156,10 @@ wakunode2: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims +testwakunode2: | build deps librln + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim testwakunode2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims + example2: | build deps echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim example2 $(NIM_PARAMS) waku.nims @@ -174,6 +178,10 @@ wakubridge: | build deps echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim bridge $(NIM_PARAMS) waku.nims +testbridge: | build deps librln + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim testbridge $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims + chat2: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim new file mode 100644 index 000000000..2e9c0d155 --- /dev/null +++ b/apps/wakunode2/app.nim @@ -0,0 +1,771 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[options, strutils, sequtils], + stew/results, + chronicles, + chronos, + libp2p/crypto/crypto, + libp2p/nameresolving/dnsresolver, + libp2p/protocols/pubsub/gossipsub, + libp2p/peerid, + eth/keys, + eth/net/nat, + json_rpc/rpcserver, + presto +import + ../../waku/common/sqlite, + ../../waku/v2/waku_core, + ../../waku/v2/waku_node, + ../../waku/v2/node/waku_metrics, + ../../waku/v2/node/peer_manager, + ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, + ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, + ../../waku/v2/waku_archive, + ../../waku/v2/waku_archive/driver/queue_driver, + ../../waku/v2/waku_archive/driver/sqlite_driver, + ../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations, + ../../waku/v2/waku_archive/retention_policy, + ../../waku/v2/waku_archive/retention_policy/retention_policy_capacity, + ../../waku/v2/waku_archive/retention_policy/retention_policy_time, + ../../waku/v2/waku_dnsdisc, + ../../waku/v2/waku_enr, + ../../waku/v2/waku_discv5, + ../../waku/v2/waku_peer_exchange, + ../../waku/v2/waku_relay/validators, + ../../waku/v2/waku_store, + ../../waku/v2/waku_lightpush, + ../../waku/v2/waku_filter, + ./config + +when defined(rln): + import ../../waku/v2/waku_rln_relay + +logScope: + topics = "wakunode app" + + +# Git version in git describe format (defined at compile time) +const git_version* {.strdefine.} = "n/a" + +type + App* = object + version: string + conf: WakuNodeConf + + rng: ref HmacDrbgContext + peerStore: Option[WakuPeerStorage] + archiveDriver: Option[ArchiveDriver] + archiveRetentionPolicy: Option[RetentionPolicy] + dynamicBootstrapNodes: seq[RemotePeerInfo] + + node: WakuNode + + rpcServer: Option[RpcHttpServer] + restServer: Option[RestServerRef] + + AppResult*[T] = Result[T, string] + + +func node*(app: App): WakuNode = + app.node + +func version*(app: App): string = + app.version + + +## Initialisation + +proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T = + App(version: git_version, conf: conf, rng: rng, node: nil) + + +## SQLite database + +proc setupDatabaseConnection(dbUrl: string): AppResult[Option[SqliteDatabase]] = + ## dbUrl mimics SQLAlchemy Database URL schema + ## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls + if dbUrl == "" or dbUrl == "none": + return ok(none(SqliteDatabase)) + + let dbUrlParts = dbUrl.split("://", 1) + let + engine = dbUrlParts[0] + path = dbUrlParts[1] + + let connRes = case engine + of "sqlite": + # SQLite engine + # See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite + SqliteDatabase.new(path) + + else: + return err("unknown database engine") + + if connRes.isErr(): + return err("failed to init database connection: " & connRes.error) + + ok(some(connRes.value)) + + +## Peer persistence + +const PeerPersistenceDbUrl = "sqlite://peers.db" + +proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] = + let db = ?setupDatabaseConnection(PeerPersistenceDbUrl) + + ?peer_store_sqlite_migrations.migrate(db.get()) + + let res = WakuPeerStorage.new(db.get()) + if res.isErr(): + return err("failed to init peer store" & res.error) + + ok(some(res.value)) + +proc setupPeerPersistence*(app: var App): AppResult[void] = + if not app.conf.peerPersistence: + return ok() + + let peerStoreRes = setupPeerStorage() + if peerStoreRes.isErr(): + return err("failed to setup peer store" & peerStoreRes.error) + + app.peerStore = peerStoreRes.get() + + ok() + + +## Waku archive + +proc gatherSqlitePageStats(db: SqliteDatabase): AppResult[(int64, int64, int64)] = + let + pageSize = ?db.getPageSize() + pageCount = ?db.getPageCount() + freelistCount = ?db.getFreelistCount() + + ok((pageSize, pageCount, freelistCount)) + +proc performSqliteVacuum(db: SqliteDatabase): AppResult[void] = + ## SQLite database vacuuming + # TODO: Run vacuuming conditionally based on database page stats + # if (pageCount > 0 and freelistCount > 0): + + debug "starting sqlite database vacuuming" + + let resVacuum = db.vacuum() + if resVacuum.isErr(): + return err("failed to execute vacuum: " & resVacuum.error) + + debug "finished sqlite database vacuuming" + +proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): AppResult[Option[RetentionPolicy]] = + if retentionPolicy == "" or retentionPolicy == "none": + return ok(none(RetentionPolicy)) + + let rententionPolicyParts = retentionPolicy.split(":", 1) + let + policy = rententionPolicyParts[0] + policyArgs = rententionPolicyParts[1] + + + if policy == "time": + var retentionTimeSeconds: int64 + try: + retentionTimeSeconds = parseInt(policyArgs) + except ValueError: + return err("invalid time retention policy argument") + + let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds) + return ok(some(retPolicy)) + + elif policy == "capacity": + var retentionCapacity: int + try: + retentionCapacity = parseInt(policyArgs) + except ValueError: + return err("invalid capacity retention policy argument") + + let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) + return ok(some(retPolicy)) + + else: + return err("unknown retention policy") + +proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): AppResult[ArchiveDriver] = + let db = ?setupDatabaseConnection(dbUrl) + + if db.isSome(): + # SQLite vacuum + # TODO: Run this only if the database engine is SQLite + let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get()) + debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount + + if vacuum and (pageCount > 0 and freelistCount > 0): + ?performSqliteVacuum(db.get()) + + # Database migration + if migrate: + ?archive_driver_sqlite_migrations.migrate(db.get()) + + if db.isSome(): + debug "setting up sqlite waku archive driver" + let res = SqliteDriver.new(db.get()) + if res.isErr(): + return err("failed to init sqlite archive driver: " & res.error) + + ok(res.value) + + else: + debug "setting up in-memory waku archive driver" + let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages + ok(driver) + +proc setupWakuArchive*(app: var App): AppResult[void] = + if not app.conf.store: + return ok() + + # Message storage + let dbUrlValidationRes = validateDbUrl(app.conf.storeMessageDbUrl) + if dbUrlValidationRes.isErr(): + return err("failed to configure the message store database connection: " & dbUrlValidationRes.error) + + let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(), + vacuum = app.conf.storeMessageDbVacuum, + migrate = app.conf.storeMessageDbMigration) + if archiveDriverRes.isOk(): + app.archiveDriver = some(archiveDriverRes.get()) + else: + return err("failed to configure archive driver: " & archiveDriverRes.error) + + # Message store retention policy + let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(app.conf.storeMessageRetentionPolicy) + if storeMessageRetentionPolicyRes.isErr(): + return err("failed to configure the message retention policy: " & storeMessageRetentionPolicyRes.error) + + let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get()) + if archiveRetentionPolicyRes.isOk(): + app.archiveRetentionPolicy = archiveRetentionPolicyRes.get() + else: + return err("failed to configure the message retention policy: " & archiveRetentionPolicyRes.error) + + # TODO: Move retention policy execution here + # if archiveRetentionPolicy.isSome(): + # executeMessageRetentionPolicy(node) + # startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval) + + ok() + +## Retrieve dynamic bootstrap nodes (DNS discovery) + +proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): AppResult[seq[RemotePeerInfo]] = + + if dnsDiscovery and dnsDiscoveryUrl != "": + # DNS discovery + debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl + + var nameServers: seq[TransportAddress] + for ip in dnsDiscoveryNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + proc resolver(domain: string): Future[string] {.async, gcsafe.} = + trace "resolving", domain=domain + let resolved = await dnsResolver.resolveTxt(domain) + return resolved[0] # Use only first answer + + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) + if wakuDnsDiscovery.isOk(): + return wakuDnsDiscovery.get().findPeers() + .mapErr(proc (e: cstring): string = $e) + else: + warn "Failed to init Waku DNS discovery" + + debug "No method for retrieving dynamic bootstrap nodes specified." + ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default + +proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] = + let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(app.conf.dnsDiscovery, + app.conf.dnsDiscoveryUrl, + app.conf.dnsDiscoveryNameServers) + if dynamicBootstrapNodesRes.isOk(): + app.dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() + else: + warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error + + ok() + + +## Init waku node instance + +proc setupNat(natConf, clientId: string, tcpPort, udpPort: Port): + AppResult[tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]]] {.gcsafe.} = + + let strategy = case natConf.toLowerAscii(): + of "any": NatAny + of "none": NatNone + of "upnp": NatUpnp + of "pmp": NatPmp + else: NatNone + + var endpoint: tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]] + + if strategy != NatNone: + let extIp = getExternalIP(strategy) + if extIP.isSome(): + endpoint.ip = some(ValidIpAddress.init(extIp.get())) + # RedirectPorts in considered a gcsafety violation + # because it obtains the address of a non-gcsafe proc? + var extPorts: Option[(Port, Port)] + try: + extPorts = ({.gcsafe.}: redirectPorts(tcpPort = tcpPort, + udpPort = udpPort, + description = clientId)) + except CatchableError: + # TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now. + error "unable to determine external ports" + extPorts = none((Port, Port)) + + if extPorts.isSome(): + let (extTcpPort, extUdpPort) = extPorts.get() + endpoint.tcpPort = some(extTcpPort) + endpoint.udpPort = some(extUdpPort) + + else: # NatNone + if not natConf.startsWith("extip:"): + return err("not a valid NAT mechanism: " & $natConf) + + try: + # any required port redirection is assumed to be done by hand + endpoint.ip = some(ValidIpAddress.init(natConf[6..^1])) + except ValueError: + return err("not a valid IP address: " & $natConf[6..^1]) + + return ok(endpoint) + +proc initNode(conf: WakuNodeConf, + rng: ref HmacDrbgContext, + peerStore: Option[WakuPeerStorage], + dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): AppResult[WakuNode] = + + ## Setup a basic Waku v2 node based on a supplied configuration + ## file. Optionally include persistent peer storage. + ## No protocols are mounted yet. + + var dnsResolver: DnsResolver + if conf.dnsAddrs: + # Support for DNS multiaddrs + var nameServers: seq[TransportAddress] + for ip in conf.dnsAddrsNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + dnsResolver = DnsResolver.new(nameServers) + + let + nodekey = if conf.nodekey.isSome(): + conf.nodekey.get() + else: + let nodekeyRes = crypto.PrivateKey.random(Secp256k1, rng[]) + if nodekeyRes.isErr(): + return err("failed to generate nodekey: " & $nodekeyRes.error) + nodekeyRes.get() + + + ## `udpPort` is only supplied to satisfy underlying APIs but is not + ## actually a supported transport for libp2p traffic. + let udpPort = conf.tcpPort + let natRes = setupNat(conf.nat, clientId, + Port(uint16(conf.tcpPort) + conf.portsShift), + Port(uint16(udpPort) + conf.portsShift)) + if natRes.isErr(): + return err("failed to setup NAT: " & $natRes.error) + + let (extIp, extTcpPort, _) = natRes.get() + + + let + dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName) + else: none(string) + + discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift)) + else: none(Port) + + ## TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably + ## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual + ## config, the external port is the same as the bind port. + extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone(): + some(Port(uint16(conf.tcpPort) + conf.portsShift)) + else: + extTcpPort + extMultiAddrs = if (conf.extMultiAddrs.len > 0): + let extMultiAddrsValidationRes = validateExtMultiAddrs(conf.extMultiAddrs) + if extMultiAddrsValidationRes.isErr(): + return err("invalid external multiaddress: " & extMultiAddrsValidationRes.error) + else: + extMultiAddrsValidationRes.get() + else: + @[] + + wakuFlags = CapabilitiesBitfield.init( + lightpush = conf.lightpush, + filter = conf.filter, + store = conf.store, + relay = conf.relay + ) + + var node: WakuNode + + let pStorage = if peerStore.isNone(): nil + else: peerStore.get() + + let rng = crypto.newRng() + # Wrap in none because NetConfig does not have a default constructor + # TODO: We could change bindIp in NetConfig to be something less restrictive than ValidIpAddress, + # which doesn't allow default construction + let netConfigRes = NetConfig.init( + bindIp = conf.listenAddress, + bindPort = Port(uint16(conf.tcpPort) + conf.portsShift), + extIp = extIp, + extPort = extPort, + extMultiAddrs = extMultiAddrs, + wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), + wsEnabled = conf.websocketSupport, + wssEnabled = conf.websocketSecureSupport, + dns4DomainName = dns4DomainName, + discv5UdpPort = discv5UdpPort, + wakuFlags = some(wakuFlags), + ) + if netConfigRes.isErr(): + return err("failed to create net config instance: " & netConfigRes.error) + + let netConfig = netConfigRes.get() + var wakuDiscv5 = none(WakuDiscoveryV5) + + if conf.discv5Discovery: + let dynamicBootstrapEnrs = dynamicBootstrapNodes + .filterIt(it.hasUdpPort()) + .mapIt(it.enr.get()) + var discv5BootstrapEnrs: seq[enr.Record] + # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq + for enrUri in conf.discv5BootstrapNodes: + addBootstrapNode(enrUri, discv5BootstrapEnrs) + discv5BootstrapEnrs.add(dynamicBootstrapEnrs) + let discv5Config = DiscoveryConfig.init(conf.discv5TableIpLimit, + conf.discv5BucketIpLimit, + conf.discv5BitsPerHop) + try: + wakuDiscv5 = some(WakuDiscoveryV5.new( + extIp = netConfig.extIp, + extTcpPort = netConfig.extPort, + extUdpPort = netConfig.discv5UdpPort, + bindIp = netConfig.bindIp, + discv5UdpPort = netConfig.discv5UdpPort.get(), + bootstrapEnrs = discv5BootstrapEnrs, + enrAutoUpdate = conf.discv5EnrAutoUpdate, + privateKey = keys.PrivateKey(nodekey.skkey), + flags = netConfig.wakuFlags.get(), + multiaddrs = netConfig.enrMultiaddrs, + rng = rng, + discv5Config = discv5Config, + )) + except CatchableError: + return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg()) + + # Build waku node instance + var builder = WakuNodeBuilder.init() + builder.withRng(rng) + builder.withNodeKey(nodekey) + builder.withNetworkConfiguration(netConfig) + builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity) + builder.withSwitchConfiguration( + maxConnections = some(conf.maxConnections.int), + secureKey = some(conf.websocketSecureKeyPath), + secureCert = some(conf.websocketSecureCertPath), + nameResolver = dnsResolver, + sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled + agentString = some(conf.agentString) + ) + builder.withWakuDiscv5(wakuDiscv5.get(nil)) + + node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err) + + ok(node) + +proc setupWakuNode*(app: var App): AppResult[void] = + ## Waku node + let initNodeRes = initNode(app.conf, app.rng, app.peerStore, app.dynamicBootstrapNodes) + if initNodeRes.isErr(): + return err("failed to init node: " & initNodeRes.error) + + app.node = initNodeRes.get() + ok() + + +## Mount protocols + +proc setupProtocols(node: WakuNode, conf: WakuNodeConf, + archiveDriver: Option[ArchiveDriver], + archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} = + ## Setup configured protocols on an existing Waku v2 node. + ## Optionally include persistent message storage. + ## No protocols are started yet. + + # Mount relay on all nodes + var peerExchangeHandler = none(RoutingRecordsHandler) + if conf.relayPeerExchange: + proc handlePeerExchange(peer: PeerId, topic: string, + peers: seq[RoutingRecordsPair]) {.gcsafe.} = + ## Handle peers received via gossipsub peer exchange + # TODO: Only consider peers on pubsub topics we subscribe to + let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records + .mapIt(toRemotePeerInfo(it.record.get())) + + debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len + + # asyncSpawn, as we don't want to block here + asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange") + + peerExchangeHandler = some(handlePeerExchange) + + if conf.relay: + try: + let pubsubTopics = conf.topics.split(" ") + await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler) + except CatchableError: + return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) + + # TODO: Get this from cli + var topicsPublicKeys = initTable[string, SkPublicKey]() + # Add validation keys to protected topics + for topic, publicKey in topicsPublicKeys.pairs: + info "routing only signed traffic", topic=topic, publicKey=publicKey + node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topic), publicKey) + + + # Keepalive mounted on all nodes + try: + await mountLibp2pPing(node) + except CatchableError: + return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) + + when defined(rln): + if conf.rlnRelay: + + let rlnConf = WakuRlnConfig( + rlnRelayDynamic: conf.rlnRelayDynamic, + rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic, + rlnRelayContentTopic: conf.rlnRelayContentTopic, + rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex), + rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, + rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress, + rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey, + rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress, + rlnRelayCredPath: conf.rlnRelayCredPath, + rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword + ) + + try: + await node.mountRlnRelay(rlnConf) + except CatchableError: + return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) + + if conf.store: + # Archive setup + let messageValidator: MessageValidator = DefaultMessageValidator() + mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy) + + # Store setup + try: + await mountStore(node) + except CatchableError: + return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) + + # TODO: Move this to storage setup phase + if archiveRetentionPolicy.isSome(): + executeMessageRetentionPolicy(node) + startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval) + + mountStoreClient(node) + if conf.storenode != "": + let storeNode = parsePeerInfo(conf.storenode) + if storeNode.isOk(): + node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec) + else: + return err("failed to set node waku store peer: " & storeNode.error) + + # NOTE Must be mounted after relay + if conf.lightpush: + try: + await mountLightPush(node) + except CatchableError: + return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) + + if conf.lightpushnode != "": + let lightPushNode = parsePeerInfo(conf.lightpushnode) + if lightPushNode.isOk(): + mountLightPushClient(node) + node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) + else: + return err("failed to set node waku lightpush peer: " & lightPushNode.error) + + # Filter setup. NOTE Must be mounted after relay + if conf.filter: + try: + await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) + except CatchableError: + return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) + + if conf.filternode != "": + let filterNode = parsePeerInfo(conf.filternode) + if filterNode.isOk(): + await mountFilterClient(node) + node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec) + else: + return err("failed to set node waku filter peer: " & filterNode.error) + + # waku peer exchange setup + if conf.peerExchangeNode != "" or conf.peerExchange: + try: + await mountPeerExchange(node) + except CatchableError: + return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) + + if conf.peerExchangeNode != "": + let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode) + if peerExchangeNode.isOk(): + node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec) + else: + return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) + + return ok() + +proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} = + return await setupProtocols( + app.node, + app.conf, + app.archiveDriver, + app.archiveRetentionPolicy + ) + + +## Start node + +proc startNode(node: WakuNode, conf: WakuNodeConf, + dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[AppResult[void]] {.async.} = + ## Start a configured node and all mounted protocols. + ## Connect to static nodes and start + ## keep-alive, if configured. + + # Start Waku v2 node + try: + await node.start() + except CatchableError: + return err("failed to start waku node: " & getCurrentExceptionMsg()) + + # Start discv5 and connect to discovered nodes + if conf.discv5Discovery: + try: + if not await node.startDiscv5(): + error "could not start Discovery v5" + except CatchableError: + return err("failed to start waku discovery v5: " & getCurrentExceptionMsg()) + + # Connect to configured static nodes + if conf.staticnodes.len > 0: + try: + await connectToNodes(node, conf.staticnodes, "static") + except CatchableError: + return err("failed to connect to static nodes: " & getCurrentExceptionMsg()) + + if dynamicBootstrapNodes.len > 0: + info "Connecting to dynamic bootstrap peers" + try: + await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") + except CatchableError: + return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) + + # retrieve px peers and add the to the peer store + if conf.peerExchangeNode != "": + let desiredOutDegree = node.wakuRelay.parameters.d.uint64() + await node.fetchPeerExchangePeers(desiredOutDegree) + + # Start keepalive, if enabled + if conf.keepAlive: + node.startKeepalive() + + # Maintain relay connections + if conf.relay: + node.peerManager.start() + + return ok() + +proc startNode*(app: App): Future[AppResult[void]] {.async.} = + return await startNode( + app.node, + app.conf, + app.dynamicBootstrapNodes + ) + + +## Monitoring and external interfaces + +# TODO: Merge the `wakunode_setup_*.nim` files here. Once the encapsulating +# type (e.g., App) is implemented. Hold both servers instances to support +# a graceful shutdown. +import + ./wakunode2_setup_rpc, + ./wakunode2_setup_rest + + +proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): AppResult[void] = + startMetricsServer(address, Port(port + portsShift)) + ok() + +proc startMetricsLogging(): AppResult[void] = + startMetricsLog() + ok() + +proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] = + if app.conf.rpc: + let startRpcServerRes = startRpcServer(app.node, app.conf.rpcAddress, app.conf.rpcPort, app.conf.portsShift, app.conf) + if startRpcServerRes.isErr(): + error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error + else: + app.rpcServer = some(startRpcServerRes.value) + + if app.conf.rest: + let startRestServerRes = startRestServer(app.node, app.conf.restAddress, app.conf.restPort, app.conf.portsShift, app.conf) + if startRestServerRes.isErr(): + error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error + else: + app.restServer = some(startRestServerRes.value) + + + if app.conf.metricsServer: + let startMetricsServerRes = startMetricsServer(app.node, app.conf.metricsServerAddress, app.conf.metricsServerPort, app.conf.portsShift) + if startMetricsServerRes.isErr(): + error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error + + if app.conf.metricsLogging: + let startMetricsLoggingRes = startMetricsLogging() + if startMetricsLoggingRes.isErr(): + error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error + + ok() + + +# App shutdown + +proc stop*(app: App): Future[void] {.async.} = + if app.restServer.isSome(): + await app.restServer.get().stop() + + if app.rpcServer.isSome(): + await app.rpcServer.get().stop() + + if not app.node.isNil(): + await app.node.stop() diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index be78a8844..767e97227 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -4,629 +4,21 @@ else: {.push raises: [].} import - std/[options, tables, strutils, sequtils, os], + std/[options, strutils, os], stew/shims/net as stewNet, chronicles, chronos, metrics, libbacktrace, system/ansi_c, - eth/keys, - eth/net/nat, - eth/p2p/discoveryv5/enr, - libp2p/builders, - libp2p/multihash, - libp2p/crypto/crypto, - libp2p/protocols/ping, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/pubsub/rpc/messages, - libp2p/transports/wstransport, - libp2p/nameresolving/dnsresolver + libp2p/crypto/crypto import - ../../waku/common/sqlite, ../../waku/common/logging, - ../../waku/v2/node/peer_manager, - ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, - ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, - ../../waku/v2/waku_core, - ../../waku/v2/waku_node, - ../../waku/v2/node/waku_metrics, - ../../waku/v2/waku_archive, - ../../waku/v2/waku_archive/driver/queue_driver, - ../../waku/v2/waku_archive/driver/sqlite_driver, - ../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations, - ../../waku/v2/waku_archive/retention_policy, - ../../waku/v2/waku_archive/retention_policy/retention_policy_capacity, - ../../waku/v2/waku_archive/retention_policy/retention_policy_time, - ../../waku/v2/waku_store, - ../../waku/v2/waku_filter, - ../../waku/v2/waku_lightpush, - ../../waku/v2/waku_enr, - ../../waku/v2/waku_dnsdisc, - ../../waku/v2/waku_discv5, - ../../waku/v2/waku_peer_exchange, - ../../waku/v2/waku_relay/validators, - ./wakunode2_setup_rest, - ./wakunode2_setup_rpc, - ./config - -when defined(rln): - import - ../../waku/v2/waku_rln_relay - + ./config, + ./app logScope: - topics = "wakunode" - - -type SetupResult[T] = Result[T, string] - - -proc setupDatabaseConnection(dbUrl: string): SetupResult[Option[SqliteDatabase]] = - ## dbUrl mimics SQLAlchemy Database URL schema - ## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls - if dbUrl == "" or dbUrl == "none": - return ok(none(SqliteDatabase)) - - let dbUrlParts = dbUrl.split("://", 1) - let - engine = dbUrlParts[0] - path = dbUrlParts[1] - - let connRes = case engine - of "sqlite": - # SQLite engine - # See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite - SqliteDatabase.new(path) - - else: - return err("unknown database engine") - - if connRes.isErr(): - return err("failed to init database connection: " & connRes.error) - - ok(some(connRes.value)) - -proc gatherSqlitePageStats(db: SqliteDatabase): SetupResult[(int64, int64, int64)] = - let - pageSize = ?db.getPageSize() - pageCount = ?db.getPageCount() - freelistCount = ?db.getFreelistCount() - - ok((pageSize, pageCount, freelistCount)) - -proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] = - ## SQLite database vacuuming - # TODO: Run vacuuming conditionally based on database page stats - # if (pageCount > 0 and freelistCount > 0): - - debug "starting sqlite database vacuuming" - - let resVacuum = db.vacuum() - if resVacuum.isErr(): - return err("failed to execute vacuum: " & resVacuum.error) - - debug "finished sqlite database vacuuming" - - -const PeerPersistenceDbUrl = "sqlite://peers.db" - -proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] = - let db = ?setupDatabaseConnection(PeerPersistenceDbUrl) - - ?peer_store_sqlite_migrations.migrate(db.get()) - - let res = WakuPeerStorage.new(db.get()) - if res.isErr(): - return err("failed to init peer store" & res.error) - - ok(some(res.value)) - - -proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): SetupResult[Option[RetentionPolicy]] = - if retentionPolicy == "" or retentionPolicy == "none": - return ok(none(RetentionPolicy)) - - let rententionPolicyParts = retentionPolicy.split(":", 1) - let - policy = rententionPolicyParts[0] - policyArgs = rententionPolicyParts[1] - - - if policy == "time": - var retentionTimeSeconds: int64 - try: - retentionTimeSeconds = parseInt(policyArgs) - except ValueError: - return err("invalid time retention policy argument") - - let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds) - return ok(some(retPolicy)) - - elif policy == "capacity": - var retentionCapacity: int - try: - retentionCapacity = parseInt(policyArgs) - except ValueError: - return err("invalid capacity retention policy argument") - - let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) - return ok(some(retPolicy)) - - else: - return err("unknown retention policy") - -proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[ArchiveDriver] = - let db = ?setupDatabaseConnection(dbUrl) - - if db.isSome(): - # SQLite vacuum - # TODO: Run this only if the database engine is SQLite - let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get()) - debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount - - if vacuum and (pageCount > 0 and freelistCount > 0): - ?performSqliteVacuum(db.get()) - - # Database migration - if migrate: - ?archive_driver_sqlite_migrations.migrate(db.get()) - - if db.isSome(): - debug "setting up sqlite waku archive driver" - let res = SqliteDriver.new(db.get()) - if res.isErr(): - return err("failed to init sqlite archive driver: " & res.error) - - ok(res.value) - - else: - debug "setting up in-memory waku archive driver" - let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages - ok(driver) - - -proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): SetupResult[seq[RemotePeerInfo]] = - - if dnsDiscovery and dnsDiscoveryUrl != "": - # DNS discovery - debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl - - var nameServers: seq[TransportAddress] - for ip in dnsDiscoveryNameServers: - nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - - let dnsResolver = DnsResolver.new(nameServers) - - proc resolver(domain: string): Future[string] {.async, gcsafe.} = - trace "resolving", domain=domain - let resolved = await dnsResolver.resolveTxt(domain) - return resolved[0] # Use only first answer - - var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) - if wakuDnsDiscovery.isOk(): - return wakuDnsDiscovery.get().findPeers() - .mapErr(proc (e: cstring): string = $e) - else: - warn "Failed to init Waku DNS discovery" - - debug "No method for retrieving dynamic bootstrap nodes specified." - ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default - -proc setupNat(natConf, clientId: string, tcpPort, udpPort: Port): - SetupResult[tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]]] {.gcsafe.} = - - let strategy = case natConf.toLowerAscii(): - of "any": NatAny - of "none": NatNone - of "upnp": NatUpnp - of "pmp": NatPmp - else: NatNone - - var endpoint: tuple[ip: Option[ValidIpAddress], tcpPort: Option[Port], udpPort: Option[Port]] - - if strategy != NatNone: - let extIp = getExternalIP(strategy) - if extIP.isSome(): - endpoint.ip = some(ValidIpAddress.init(extIp.get())) - # RedirectPorts in considered a gcsafety violation - # because it obtains the address of a non-gcsafe proc? - var extPorts: Option[(Port, Port)] - try: - extPorts = ({.gcsafe.}: redirectPorts(tcpPort = tcpPort, - udpPort = udpPort, - description = clientId)) - except CatchableError: - # TODO: nat.nim Error: can raise an unlisted exception: Exception. Isolate here for now. - error "unable to determine external ports" - extPorts = none((Port, Port)) - - if extPorts.isSome(): - let (extTcpPort, extUdpPort) = extPorts.get() - endpoint.tcpPort = some(extTcpPort) - endpoint.udpPort = some(extUdpPort) - - else: # NatNone - if not natConf.startsWith("extip:"): - return err("not a valid NAT mechanism: " & $natConf) - - try: - # any required port redirection is assumed to be done by hand - endpoint.ip = some(ValidIpAddress.init(natConf[6..^1])) - except ValueError: - return err("not a valid IP address: " & $natConf[6..^1]) - - return ok(endpoint) - -proc initNode(conf: WakuNodeConf, - rng: ref HmacDrbgContext, - peerStore: Option[WakuPeerStorage], - dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] = - - ## Setup a basic Waku v2 node based on a supplied configuration - ## file. Optionally include persistent peer storage. - ## No protocols are mounted yet. - - var dnsResolver: DnsResolver - if conf.dnsAddrs: - # Support for DNS multiaddrs - var nameServers: seq[TransportAddress] - for ip in conf.dnsAddrsNameServers: - nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - - dnsResolver = DnsResolver.new(nameServers) - - let - nodekey = if conf.nodekey.isSome(): - conf.nodekey.get() - else: - let nodekeyRes = crypto.PrivateKey.random(Secp256k1, rng[]) - if nodekeyRes.isErr(): - return err("failed to generate nodekey: " & $nodekeyRes.error) - nodekeyRes.get() - - - ## `udpPort` is only supplied to satisfy underlying APIs but is not - ## actually a supported transport for libp2p traffic. - let udpPort = conf.tcpPort - let natRes = setupNat(conf.nat, clientId, - Port(uint16(conf.tcpPort) + conf.portsShift), - Port(uint16(udpPort) + conf.portsShift)) - if natRes.isErr(): - return err("failed to setup NAT: " & $natRes.error) - - let (extIp, extTcpPort, _) = natRes.get() - - - let - dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName) - else: none(string) - - discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift)) - else: none(Port) - - ## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably - ## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual - ## config, the external port is the same as the bind port. - extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone(): - some(Port(uint16(conf.tcpPort) + conf.portsShift)) - else: - extTcpPort - extMultiAddrs = if (conf.extMultiAddrs.len > 0): - let extMultiAddrsValidationRes = validateExtMultiAddrs(conf.extMultiAddrs) - if extMultiAddrsValidationRes.isErr(): - return err("invalid external multiaddress: " & extMultiAddrsValidationRes.error) - else: - extMultiAddrsValidationRes.get() - else: - @[] - - wakuFlags = CapabilitiesBitfield.init( - lightpush = conf.lightpush, - filter = conf.filter, - store = conf.store, - relay = conf.relay - ) - - var node: WakuNode - - let pStorage = if peerStore.isNone(): nil - else: peerStore.get() - - let rng = crypto.newRng() - # Wrap in none because NetConfig does not have a default constructor - # TODO: We could change bindIp in NetConfig to be something less restrictive than ValidIpAddress, - # which doesn't allow default construction - let netConfigRes = NetConfig.init( - bindIp = conf.listenAddress, - bindPort = Port(uint16(conf.tcpPort) + conf.portsShift), - extIp = extIp, - extPort = extPort, - extMultiAddrs = extMultiAddrs, - wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), - wsEnabled = conf.websocketSupport, - wssEnabled = conf.websocketSecureSupport, - dns4DomainName = dns4DomainName, - discv5UdpPort = discv5UdpPort, - wakuFlags = some(wakuFlags), - ) - if netConfigRes.isErr(): - return err("failed to create net config instance: " & netConfigRes.error) - - let netConfig = netConfigRes.get() - var wakuDiscv5 = none(WakuDiscoveryV5) - - if conf.discv5Discovery: - let dynamicBootstrapEnrs = dynamicBootstrapNodes - .filterIt(it.hasUdpPort()) - .mapIt(it.enr.get()) - var discv5BootstrapEnrs: seq[enr.Record] - # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq - for enrUri in conf.discv5BootstrapNodes: - addBootstrapNode(enrUri, discv5BootstrapEnrs) - discv5BootstrapEnrs.add(dynamicBootstrapEnrs) - let discv5Config = DiscoveryConfig.init(conf.discv5TableIpLimit, - conf.discv5BucketIpLimit, - conf.discv5BitsPerHop) - try: - wakuDiscv5 = some(WakuDiscoveryV5.new( - extIp = netConfig.extIp, - extTcpPort = netConfig.extPort, - extUdpPort = netConfig.discv5UdpPort, - bindIp = netConfig.bindIp, - discv5UdpPort = netConfig.discv5UdpPort.get(), - bootstrapEnrs = discv5BootstrapEnrs, - enrAutoUpdate = conf.discv5EnrAutoUpdate, - privateKey = keys.PrivateKey(nodekey.skkey), - flags = netConfig.wakuFlags.get(), - multiaddrs = netConfig.enrMultiaddrs, - rng = rng, - discv5Config = discv5Config, - )) - except CatchableError: - return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg()) - - # Build waku node instance - var builder = WakuNodeBuilder.init() - builder.withRng(rng) - builder.withNodeKey(nodekey) - builder.withNetworkConfiguration(netConfig) - builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity) - builder.withSwitchConfiguration( - maxConnections = some(conf.maxConnections.int), - secureKey = some(conf.websocketSecureKeyPath), - secureCert = some(conf.websocketSecureCertPath), - nameResolver = dnsResolver, - sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled - agentString = some(conf.agentString) - ) - builder.withWakuDiscv5(wakuDiscv5.get(nil)) - - node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err) - - ok(node) - -proc setupProtocols(node: WakuNode, conf: WakuNodeConf, - archiveDriver: Option[ArchiveDriver], - archiveRetentionPolicy: Option[RetentionPolicy]): Future[SetupResult[void]] {.async.} = - ## Setup configured protocols on an existing Waku v2 node. - ## Optionally include persistent message storage. - ## No protocols are started yet. - - # Mount relay on all nodes - var peerExchangeHandler = none(RoutingRecordsHandler) - if conf.relayPeerExchange: - proc handlePeerExchange(peer: PeerId, topic: string, - peers: seq[RoutingRecordsPair]) {.gcsafe.} = - ## Handle peers received via gossipsub peer exchange - # TODO: Only consider peers on pubsub topics we subscribe to - let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records - .mapIt(toRemotePeerInfo(it.record.get())) - - debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len - - # asyncSpawn, as we don't want to block here - asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange") - - peerExchangeHandler = some(handlePeerExchange) - - if conf.relay: - try: - let pubsubTopics = conf.topics.split(" ") - await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler) - except CatchableError: - return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) - - # TODO: Get this from cli - var topicsPublicKeys = initTable[string, SkPublicKey]() - # Add validation keys to protected topics - for topic, publicKey in topicsPublicKeys.pairs: - info "routing only signed traffic", topic=topic, publicKey=publicKey - node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topic), publicKey) - - - # Keepalive mounted on all nodes - try: - await mountLibp2pPing(node) - except CatchableError: - return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) - - when defined(rln): - if conf.rlnRelay: - - let rlnConf = WakuRlnConfig( - rlnRelayDynamic: conf.rlnRelayDynamic, - rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic, - rlnRelayContentTopic: conf.rlnRelayContentTopic, - rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex), - rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, - rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress, - rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey, - rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress, - rlnRelayCredPath: conf.rlnRelayCredPath, - rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword - ) - - try: - await node.mountRlnRelay(rlnConf) - except CatchableError: - return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) - - if conf.store: - # Archive setup - let messageValidator: MessageValidator = DefaultMessageValidator() - mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy) - - # Store setup - try: - await mountStore(node) - except CatchableError: - return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) - - # TODO: Move this to storage setup phase - if archiveRetentionPolicy.isSome(): - executeMessageRetentionPolicy(node) - startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval) - - mountStoreClient(node) - if conf.storenode != "": - let storeNode = parsePeerInfo(conf.storenode) - if storeNode.isOk(): - node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec) - else: - return err("failed to set node waku store peer: " & storeNode.error) - - # NOTE Must be mounted after relay - if conf.lightpush: - try: - await mountLightPush(node) - except CatchableError: - return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) - - if conf.lightpushnode != "": - let lightPushNode = parsePeerInfo(conf.lightpushnode) - if lightPushNode.isOk(): - mountLightPushClient(node) - node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) - else: - return err("failed to set node waku lightpush peer: " & lightPushNode.error) - - # Filter setup. NOTE Must be mounted after relay - if conf.filter: - try: - await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) - except CatchableError: - return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) - - if conf.filternode != "": - let filterNode = parsePeerInfo(conf.filternode) - if filterNode.isOk(): - await mountFilterClient(node) - node.peerManager.addServicePeer(filterNode.value, WakuFilterCodec) - else: - return err("failed to set node waku filter peer: " & filterNode.error) - - # waku peer exchange setup - if (conf.peerExchangeNode != "") or (conf.peerExchange): - try: - await mountPeerExchange(node) - except CatchableError: - return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) - - if conf.peerExchangeNode != "": - let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode) - if peerExchangeNode.isOk(): - node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec) - else: - return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) - - return ok() - -proc startNode(node: WakuNode, conf: WakuNodeConf, - dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[SetupResult[void]] {.async.} = - ## Start a configured node and all mounted protocols. - ## Connect to static nodes and start - ## keep-alive, if configured. - - # Start Waku v2 node - try: - await node.start() - except CatchableError: - return err("failed to start waku node: " & getCurrentExceptionMsg()) - - # Start discv5 and connect to discovered nodes - if conf.discv5Discovery: - try: - if not await node.startDiscv5(): - error "could not start Discovery v5" - except CatchableError: - return err("failed to start waku discovery v5: " & getCurrentExceptionMsg()) - - # Connect to configured static nodes - if conf.staticnodes.len > 0: - try: - await connectToNodes(node, conf.staticnodes, "static") - except CatchableError: - return err("failed to connect to static nodes: " & getCurrentExceptionMsg()) - - if dynamicBootstrapNodes.len > 0: - info "Connecting to dynamic bootstrap peers" - try: - await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") - except CatchableError: - return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) - - # retrieve px peers and add the to the peer store - if conf.peerExchangeNode != "": - let desiredOutDegree = node.wakuRelay.parameters.d.uint64() - await node.fetchPeerExchangePeers(desiredOutDegree) - - # Start keepalive, if enabled - if conf.keepAlive: - node.startKeepalive() - - # Maintain relay connections - if conf.relay: - node.peerManager.start() - - return ok() - -when defined(waku_exp_store_resume): - proc resumeMessageStore(node: WakuNode, address: string): Future[SetupResult[void]] {.async.} = - # Resume historical messages, this has to be called after the node has been started - if address != "": - return err("empty peer multiaddres") - - let remotePeer = parsePeerInfo(address) - if remotePeer.isErr(): - return err("invalid peer multiaddress: " & remotePeer.error) - - try: - await node.resume(some(@[remotePeer.value])) - except CatchableError: - return err("failed to resume messages history: " & getCurrentExceptionMsg()) - - -proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] = - try: - startRpcServer(node, address, Port(port + portsShift), conf) - except CatchableError: - return err("failed to start the json-rpc server: " & getCurrentExceptionMsg()) - - ok() - -proc startRestServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] = - startRestServer(node, address, Port(port + portsShift), conf) - ok() - -proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): SetupResult[void] = - startMetricsServer(address, Port(port + portsShift)) - ok() - - -proc startMetricsLogging(): SetupResult[void] = - startMetricsLog() - ok() + topics = "wakunode main" {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError @@ -639,7 +31,7 @@ when isMainModule: ## 5. Start monitoring tools and external interfaces ## 6. Setup graceful shutdown hooks - const versionString = "version / git commit hash: " & git_version + const versionString = "version / git commit hash: " & app.git_version let rng = crypto.newRng() let confRes = WakuNodeConf.load(version=versionString) @@ -659,6 +51,8 @@ when isMainModule: logging.setupLogFormat(conf.logFormat, color) + var wakunode2 = App.init(rng, conf) + ############## # Node setup # ############## @@ -666,122 +60,57 @@ when isMainModule: debug "1/7 Setting up storage" ## Peer persistence - var peerStore = none(WakuPeerStorage) - - if conf.peerPersistence: - let peerStoreRes = setupPeerStorage(); - if peerStoreRes.isOk(): - peerStore = peerStoreRes.get() - else: - error "failed to setup peer store", error=peerStoreRes.error - waku_node_errors.inc(labelValues = ["init_store_failure"]) + let res1 = wakunode2.setupPeerPersistence() + if res1.isErr(): + error "1/7 Setting up storage failed", error=res1.error + quit(QuitFailure) ## Waku archive - var archiveDriver = none(ArchiveDriver) - var archiveRetentionPolicy = none(RetentionPolicy) - - if conf.store: - # Message storage - let dbUrlValidationRes = validateDbUrl(conf.storeMessageDbUrl) - if dbUrlValidationRes.isErr(): - error "failed to configure the message store database connection", error=dbUrlValidationRes.error - quit(QuitFailure) - - let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(), vacuum=conf.storeMessageDbVacuum, migrate=conf.storeMessageDbMigration) - if archiveDriverRes.isOk(): - archiveDriver = some(archiveDriverRes.get()) - else: - error "failed to configure archive driver", error=archiveDriverRes.error - quit(QuitFailure) - - # Message store retention policy - let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(conf.storeMessageRetentionPolicy) - if storeMessageRetentionPolicyRes.isErr(): - error "invalid store message retention policy configuration", error=storeMessageRetentionPolicyRes.error - quit(QuitFailure) - - let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get()) - if archiveRetentionPolicyRes.isOk(): - archiveRetentionPolicy = archiveRetentionPolicyRes.get() - else: - error "failed to configure the message retention policy", error=archiveRetentionPolicyRes.error - quit(QuitFailure) - - # TODO: Move retention policy execution here - # if archiveRetentionPolicy.isSome(): - # executeMessageRetentionPolicy(node) - # startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval) - + let res2 = wakunode2.setupWakuArchive() + if res2.isErr(): + error "1/7 Setting up storage failed (waku archive)", error=res2.error + quit(QuitFailure) debug "2/7 Retrieve dynamic bootstrap nodes" - var dynamicBootstrapNodes: seq[RemotePeerInfo] - let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf.dnsDiscovery, conf.dnsDiscoveryUrl, conf.dnsDiscoveryNameServers) - if dynamicBootstrapNodesRes.isOk(): - dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() - else: - warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error + let res3 = wakunode2.setupDyamicBootstrapNodes() + if res3.isErr(): + error "2/7 Retrieving dynamic bootstrap nodes failed", error=res3.error + quit(QuitFailure) debug "3/7 Initializing node" - var node: WakuNode # This is the node we're going to setup using the conf - - let initNodeRes = initNode(conf, rng, peerStore, dynamicBootstrapNodes) - if initNodeRes.isok(): - node = initNodeRes.get() - else: - error "3/7 Initializing node failed. Quitting.", error=initNodeRes.error + let res4 = wakunode2.setupWakuNode() + if res4.isErr(): + error "3/7 Initializing node failed", error=res4.error quit(QuitFailure) debug "4/7 Mounting protocols" - let setupProtocolsRes = waitFor setupProtocols(node, conf, archiveDriver, archiveRetentionPolicy) - if setupProtocolsRes.isErr(): - error "4/7 Mounting protocols failed. Continuing in current state.", error=setupProtocolsRes.error + let res5 = waitFor wakunode2.setupAndMountProtocols() + if res5.isErr(): + error "4/7 Mounting protocols failed", error=res5.error + quit(QuitFailure) debug "5/7 Starting node and mounted protocols" - let startNodeRes = waitFor startNode(node, conf, dynamicBootstrapNodes) - if startNodeRes.isErr(): - error "5/7 Starting node and mounted protocols failed. Continuing in current state.", error=startNodeRes.error - - - when defined(waku_exp_store_resume): - # Resume message store on boot - if conf.storeResumePeer != "": - let resumeMessageStoreRes = waitFor resumeMessageStore(node, conf.storeResumePeer) - if resumeMessageStoreRes.isErr(): - error "failed to resume message store from peer node. Continuing in current state", error=resumeMessageStoreRes.error - + let res6 = waitFor wakunode2.startNode() + if res6.isErr(): + error "5/7 Starting node and protocols failed", error=res6.error + quit(QuitFailure) debug "6/7 Starting monitoring and external interfaces" - if conf.rpc: - let startRpcServerRes = startRpcServer(node, conf.rpcAddress, conf.rpcPort, conf.portsShift, conf) - if startRpcServerRes.isErr(): - error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error - - if conf.rest: - let startRestServerRes = startRestServer(node, conf.restAddress, conf.restPort, conf.portsShift, conf) - if startRestServerRes.isErr(): - error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error - - if conf.metricsServer: - let startMetricsServerRes = startMetricsServer(node, conf.metricsServerAddress, conf.metricsServerPort, conf.portsShift) - if startMetricsServerRes.isErr(): - error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error - - if conf.metricsLogging: - let startMetricsLoggingRes = startMetricsLogging() - if startMetricsLoggingRes.isErr(): - error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error - + let res7 = wakunode2.setupMonitoringAndExternalInterfaces() + if res7.isErr(): + error "6/7 Starting monitoring and external interfaces failed", error=res7.error + quit(QuitFailure) debug "7/7 Setting up shutdown hooks" ## Setup shutdown hooks for this process. ## Stop node gracefully on shutdown. - proc asyncStopper(node: WakuNode) {.async.} = + proc asyncStopper(node: App) {.async.} = await node.stop() quit(QuitSuccess) @@ -791,7 +120,7 @@ when isMainModule: # workaround for https://github.com/nim-lang/Nim/issues/4057 setupForeignThreadGc() notice "Shutting down after receiving SIGINT" - asyncSpawn asyncStopper(node) + asyncSpawn asyncStopper(wakunode2) setControlCHook(handleCtrlC) @@ -799,7 +128,7 @@ when isMainModule: when defined(posix): proc handleSigterm(signal: cint) {.noconv.} = notice "Shutting down after receiving SIGTERM" - asyncSpawn asyncStopper(node) + asyncSpawn asyncStopper(wakunode2) c_signal(ansi_c.SIGTERM, handleSigterm) @@ -812,7 +141,7 @@ when isMainModule: # Not available in -d:release mode writeStackTrace() - waitFor node.stop() + waitFor wakunode2.stop() quit(QuitFailure) c_signal(ansi_c.SIGSEGV, handleSigsegv) diff --git a/apps/wakunode2/wakunode2_setup_rest.nim b/apps/wakunode2/wakunode2_setup_rest.nim index 69e83fa56..072fd10f1 100644 --- a/apps/wakunode2/wakunode2_setup_rest.nim +++ b/apps/wakunode2/wakunode2_setup_rest.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + stew/results, stew/shims/net, chronicles, presto @@ -21,13 +22,8 @@ logScope: topics = "wakunode rest" -proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) = - let serverResult = newRestHttpServer(address, port) - if serverResult.isErr(): - notice "REST HTTP server could not be started", address = $address&":" & $port, reason = serverResult.error() - return - - let server = serverResult.get() +proc startRestServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): RestServerResult[RestServerRef] = + let server = ? newRestHttpServer(address, port) ## Debug REST API installDebugApiHandlers(server.router, node) @@ -42,3 +38,8 @@ proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: server.start() info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" + + ok(server) + +proc startRestServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): RestServerResult[RestServerRef] = + return startRestServer(node, address, Port(port + portsShift), conf) diff --git a/apps/wakunode2/wakunode2_setup_rpc.nim b/apps/wakunode2/wakunode2_setup_rpc.nim index 0ab654445..ea174261e 100644 --- a/apps/wakunode2/wakunode2_setup_rpc.nim +++ b/apps/wakunode2/wakunode2_setup_rpc.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + stew/results, stew/shims/net, chronicles, json_rpc/rpcserver @@ -20,13 +21,14 @@ import logScope: topics = "wakunode jsonrpc" +proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf): Result[RpcHttpServer, string] = + let ta = initTAddress(address, port) -proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: WakuNodeConf) - {.raises: [CatchableError].} = - - let - ta = initTAddress(address, port) + var server: RpcHttpServer + try: server = newRpcHttpServer([ta]) + except CatchableError: + return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg()) installDebugApiHandlers(node, server) @@ -49,3 +51,8 @@ proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: Port, conf: server.start() info "RPC Server started", address=ta + + ok(server) + +proc startRpcServer*(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): Result[RpcHttpServer, string] = + return startRpcServer(node, address, Port(port + portsShift), conf) diff --git a/tests/all_tests_wakunode2.nim b/tests/all_tests_wakunode2.nim new file mode 100644 index 000000000..9d5ae6230 --- /dev/null +++ b/tests/all_tests_wakunode2.nim @@ -0,0 +1,4 @@ +## Wakunode2 + +import + ./wakunode2/test_app diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim new file mode 100644 index 000000000..cdfe52da1 --- /dev/null +++ b/tests/wakunode2/test_app.nim @@ -0,0 +1,82 @@ +{.used.} + +import + stew/shims/net, + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto, + libp2p/crypto/secp, + libp2p/multiaddress, + libp2p/switch +import + ../../apps/wakunode2/config, + ../../apps/wakunode2/app, + ../v2/testlib/common, + ../v2/testlib/wakucore + + +proc defaultTestWakuNodeConf(): WakuNodeConf = + WakuNodeConf( + listenAddress: ValidIpAddress.init("127.0.0.1"), + rpcAddress: ValidIpAddress.init("127.0.0.1"), + restAddress: ValidIpAddress.init("127.0.0.1"), + metricsServerAddress: ValidIpAddress.init("127.0.0.1"), + nat: "any", + maxConnections: 50, + ) + + +suite "Wakunode2 - App": + test "compilation version should be reported": + ## Given + let conf = defaultTestWakuNodeConf() + + var wakunode2 = App.init(rng(), conf) + + ## When + let version = wakunode2.version + + ## Then + check: + version == app.git_version + + +suite "Wakunode2 - App initialization": + test "peer persistence setup should be successfully mounted": + ## Given + var conf = defaultTestWakuNodeConf() + conf.peerPersistence = true + + var wakunode2 = App.init(rng(), conf) + + ## When + let res = wakunode2.setupPeerPersistence() + + ## Then + check res.isOk() + + test "node setup is successful with default configuration": + ## Given + let conf = defaultTestWakuNodeConf() + + ## When + var wakunode2 = App.init(rng(), conf) + require wakunode2.setupPeerPersistence().isOk() + require wakunode2.setupWakuArchive().isOk() + require wakunode2.setupDyamicBootstrapNodes().isOk() + require wakunode2.setupWakuNode().isOk() + require isOk(waitFor wakunode2.setupAndMountProtocols()) + require isOk(waitFor wakunode2.startNode()) + require wakunode2.setupMonitoringAndExternalInterfaces().isOk() + + ## Then + let node = wakunode2.node + check: + not node.isNil() + node.wakuArchive.isNil() + node.wakuStore.isNil() + not node.wakuStoreClient.isNil() + + ## Cleanup + waitFor wakunode2.stop() diff --git a/waku.nimble b/waku.nimble index a391b120e..be57ec27e 100644 --- a/waku.nimble +++ b/waku.nimble @@ -70,7 +70,7 @@ task test1, "Build & run Waku v1 tests": ### Waku v2 tasks -task wakunode2, "Build Waku v2 (experimental) cli node": +task wakunode2, "Build Waku v2 cli node": let name = "wakunode2" buildBinary name, "apps/wakunode2/", "-d:chronicles_log_level=TRACE" @@ -78,6 +78,13 @@ task bridge, "Build Waku v1 - v2 bridge": let name = "wakubridge" buildBinary name, "apps/wakubridge/", "-d:chronicles_log_level=TRACE" + +task testwakunode2, "Build & run wakunode2 app tests": + test "all_tests_wakunode2" + +task testbridge, "Build & run wakubridge tests": + test "all_tests_wakubridge" + task test2, "Build & run Waku v2 tests": test "all_tests_v2"