when (NimMajor, NimMinor) < (1, 4): {.push raises: [Defect].} else: {.push raises: [].} import std/[options, strutils, sequtils], stew/results, chronicles, chronos, libp2p/wire, libp2p/multicodec, libp2p/crypto/crypto, libp2p/nameresolving/dnsresolver, libp2p/protocols/pubsub/gossipsub, libp2p/peerid, eth/keys, json_rpc/rpcserver, presto, metrics, metrics/chronos_httpserver import ../../waku/common/utils/nat, ../../waku/common/utils/parse_size_units, ../../waku/common/databases/db_sqlite, ../../waku/waku_archive/driver/builder, ../../waku/waku_archive/retention_policy/builder, ../../waku/waku_core, ../../waku/waku_node, ../../waku/node/waku_metrics, ../../waku/node/peer_manager, ../../waku/node/peer_manager/peer_store/waku_peer_storage, ../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../../waku/waku_api/message_cache, ../../waku/waku_api/handlers, ../../waku/waku_api/rest/server, ../../waku/waku_api/rest/debug/handlers as rest_debug_api, ../../waku/waku_api/rest/relay/handlers as rest_relay_api, ../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api, ../../waku/waku_api/rest/filter/handlers as rest_filter_api, ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api, ../../waku/waku_api/rest/store/handlers as rest_store_api, ../../waku/waku_api/rest/health/handlers as rest_health_api, ../../waku/waku_api/rest/admin/handlers as rest_admin_api, ../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api, ../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api, ../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api, ../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api, ../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api, ../../waku/waku_archive, ../../waku/waku_dnsdisc, ../../waku/waku_enr, ../../waku/waku_discv5, ../../waku/waku_peer_exchange, ../../waku/waku_rln_relay, ../../waku/waku_store, ../../waku/waku_lightpush, ../../waku/waku_filter, ../../waku/waku_filter_v2, ./wakunode2_validator_signed, ./internal_config, ./external_config logScope: topics = "wakunode app" # Git version in git describe format (defined at compile time) const git_version* {.strdefine.} = "n/a" type App* = object version: string conf: WakuNodeConf netConf: NetConfig rng: ref HmacDrbgContext key: crypto.PrivateKey record: Record wakuDiscv5: Option[WakuDiscoveryV5] peerStore: Option[WakuPeerStorage] dynamicBootstrapNodes: seq[RemotePeerInfo] node: WakuNode rpcServer: Option[RpcHttpServer] restServer: Option[RestServerRef] metricsServer: Option[MetricsHttpServerRef] AppResult*[T] = Result[T, string] func node*(app: App): WakuNode = app.node func version*(app: App): string = app.version ## Initialisation proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T = let key = if conf.nodeKey.isSome(): conf.nodeKey.get() else: let keyRes = crypto.PrivateKey.random(Secp256k1, rng[]) if keyRes.isErr(): error "failed to generate key", error=keyRes.error quit(QuitFailure) keyRes.get() let netConfigRes = networkConfiguration(conf, clientId) let netConfig = if netConfigRes.isErr(): error "failed to create internal config", error=netConfigRes.error quit(QuitFailure) else: netConfigRes.get() let recordRes = enrConfiguration(conf, netConfig, key) let record = if recordRes.isErr(): error "failed to create record", error=recordRes.error quit(QuitFailure) else: recordRes.get() # Check the ENR sharding info for matching config cluster id if conf.clusterId != 0: let res = record.toTyped() if res.isErr(): error "ENR setup failed", error = $res.get() quit(QuitFailure) let relayShard = res.get().relaySharding().valueOr: error "no sharding info" quit(QuitFailure) if conf.clusterId != relayShard.clusterId: error "cluster id mismatch" quit(QuitFailure) App( version: git_version, conf: conf, netConf: netConfig, rng: rng, key: key, record: record, node: nil ) ## Peer persistence const PeerPersistenceDbUrl = "peers.db" proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] = let db = ? SqliteDatabase.new(PeerPersistenceDbUrl) ? peer_store_sqlite_migrations.migrate(db) let res = WakuPeerStorage.new(db) if res.isErr(): return err("failed to init peer store" & res.error) ok(some(res.value)) proc setupPeerPersistence*(app: var App): AppResult[void] = if not app.conf.peerPersistence: return ok() let peerStoreRes = setupPeerStorage() if peerStoreRes.isErr(): return err("failed to setup peer store" & peerStoreRes.error) app.peerStore = peerStoreRes.get() ok() ## Retrieve dynamic bootstrap nodes (DNS discovery) proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress]): AppResult[seq[RemotePeerInfo]] = if dnsDiscovery and dnsDiscoveryUrl != "": # DNS discovery debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl var nameServers: seq[TransportAddress] for ip in dnsDiscoveryNameServers: nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 let dnsResolver = DnsResolver.new(nameServers) proc resolver(domain: string): Future[string] {.async, gcsafe.} = trace "resolving", domain=domain let resolved = await dnsResolver.resolveTxt(domain) return resolved[0] # Use only first answer var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) if wakuDnsDiscovery.isOk(): return wakuDnsDiscovery.get().findPeers() .mapErr(proc (e: cstring): string = $e) else: warn "Failed to init Waku DNS discovery" debug "No method for retrieving dynamic bootstrap nodes specified." ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] = let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(app.conf.dnsDiscovery, app.conf.dnsDiscoveryUrl, app.conf.dnsDiscoveryNameServers) if dynamicBootstrapNodesRes.isOk(): app.dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() else: warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error ok() ## Setup DiscoveryV5 proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 = let dynamicBootstrapEnrs = app.dynamicBootstrapNodes .filterIt(it.hasUdpPort()) .mapIt(it.enr.get()) var discv5BootstrapEnrs: seq[enr.Record] # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq for enrUri in app.conf.discv5BootstrapNodes: addBootstrapNode(enrUri, discv5BootstrapEnrs) discv5BootstrapEnrs.add(dynamicBootstrapEnrs) let discv5Config = DiscoveryConfig.init(app.conf.discv5TableIpLimit, app.conf.discv5BucketIpLimit, app.conf.discv5BitsPerHop) let discv5UdpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift) let discv5Conf = WakuDiscoveryV5Config( discv5Config: some(discv5Config), address: app.conf.listenAddress, port: discv5UdpPort, privateKey: keys.PrivateKey(app.key.skkey), bootstrapRecords: discv5BootstrapEnrs, autoupdateRecord: app.conf.discv5EnrAutoUpdate, ) WakuDiscoveryV5.new( app.rng, discv5Conf, some(app.record), some(app.node.peerManager), app.node.topicSubscriptionQueue, ) ## Init waku node instance proc initNode(conf: WakuNodeConf, netConfig: NetConfig, rng: ref HmacDrbgContext, nodeKey: crypto.PrivateKey, record: enr.Record, peerStore: Option[WakuPeerStorage], dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): AppResult[WakuNode] = ## Setup a basic Waku v2 node based on a supplied configuration ## file. Optionally include persistent peer storage. ## No protocols are mounted yet. var dnsResolver: DnsResolver if conf.dnsAddrs: # Support for DNS multiaddrs var nameServers: seq[TransportAddress] for ip in conf.dnsAddrsNameServers: nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 dnsResolver = DnsResolver.new(nameServers) var node: WakuNode let pStorage = if peerStore.isNone(): nil else: peerStore.get() # Build waku node instance var builder = WakuNodeBuilder.init() builder.withRng(rng) builder.withNodeKey(nodekey) builder.withRecord(record) builder.withNetworkConfiguration(netConfig) builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity) builder.withSwitchConfiguration( maxConnections = some(conf.maxConnections.int), secureKey = some(conf.websocketSecureKeyPath), secureCert = some(conf.websocketSecureCertPath), nameResolver = dnsResolver, sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled agentString = some(conf.agentString) ) builder.withColocationLimit(conf.colocationLimit) builder.withPeerManagerConfig(maxRelayPeers = conf.maxRelayPeers) node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err) ok(node) proc setupWakuApp*(app: var App): AppResult[void] = ## Waku node let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes) if initNodeRes.isErr(): return err("failed to init node: " & initNodeRes.error) app.node = initNodeRes.get() ## Discv5 if app.conf.discv5Discovery: app.wakuDiscV5 = some(app.setupDiscoveryV5()) ok() proc getPorts(listenAddrs: seq[MultiAddress]): AppResult[tuple[tcpPort, websocketPort: Option[Port]]] = var tcpPort, websocketPort = none(Port) for a in listenAddrs: if a.isWsAddress(): if websocketPort.isNone(): let wsAddress = initTAddress(a).valueOr: return err("getPorts wsAddr error:" & $error) websocketPort = some(wsAddress.port) elif tcpPort.isNone(): let tcpAddress = initTAddress(a).valueOr: return err("getPorts tcpAddr error:" & $error) tcpPort = some(tcpAddress.port) return ok((tcpPort: tcpPort, websocketPort: websocketPort)) proc updateNetConfig(app: var App): AppResult[void] = var conf = app.conf let (tcpPort, websocketPort) = getPorts(app.node.switch.peerInfo.listenAddrs).valueOr: return err("Could not retrieve ports " & error) if tcpPort.isSome(): conf.tcpPort = tcpPort.get() if websocketPort.isSome(): conf.websocketPort = websocketPort.get() # Rebuild NetConfig with bound port values let netConf = networkConfiguration(conf, clientId).valueOr: return err("Could not update NetConfig: " & error) app.netConf = netConf return ok() proc updateEnr(app: var App): AppResult[void] = let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr: return err("ENR setup failed: " & error) if app.conf.clusterId != 0: let tRecord = record.toTyped().valueOr: return err("ENR setup failed: " & $error) let relayShard = tRecord.relaySharding().valueOr: return err("ENR setup failed: no sharding info") if app.conf.clusterId != relayShard.clusterId: return err("ENR setup failed: cluster id mismatch") app.record = record app.node.enr = record if app.conf.discv5Discovery: app.wakuDiscV5 = some(app.setupDiscoveryV5()) return ok() proc updateApp(app: var App): AppResult[void] = if app.conf.tcpPort == Port(0) or app.conf.websocketPort == Port(0): updateNetConfig(app).isOkOr: return err("error calling updateNetConfig: " & $error) updateEnr(app).isOkOr: return err("error calling updateEnr: " & $error) app.node.announcedAddresses = app.netConf.announcedAddresses printNodeNetworkInfo(app.node) return ok() ## Mount protocols proc setupProtocols(node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey): Future[AppResult[void]] {.async.} = ## Setup configured protocols on an existing Waku v2 node. ## Optionally include persistent message storage. ## No protocols are started yet. node.mountMetadata(conf.clusterId).isOkOr: return err("failed to mount waku metadata protocol: " & error) # Mount relay on all nodes var peerExchangeHandler = none(RoutingRecordsHandler) if conf.relayPeerExchange: proc handlePeerExchange(peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]) {.gcsafe.} = ## Handle peers received via gossipsub peer exchange # TODO: Only consider peers on pubsub topics we subscribe to let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records .mapIt(toRemotePeerInfo(it.record.get())) debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len # asyncSpawn, as we don't want to block here asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange") peerExchangeHandler = some(handlePeerExchange) if conf.relay: let pubsubTopics = if conf.pubsubTopics.len > 0 or conf.contentTopics.len > 0: # TODO autoshard content topics only once. # Already checked for errors in app.init let shards = conf.contentTopics.mapIt(getShard(it).expect("Valid Shard")) conf.pubsubTopics & shards else: conf.topics let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) debug "Setting max message size", num_bytes=parsedMaxMsgSize try: await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize)) except CatchableError: return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) # Add validation keys to protected topics for topicKey in conf.protectedTopics: if topicKey.topic notin pubsubTopics: warn "protected topic not in subscribed pubsub topics, skipping adding validator", protectedTopic=topicKey.topic, subscribedTopics=pubsubTopics continue notice "routing only signed traffic", protectedTopic=topicKey.topic, publicKey=topicKey.key node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topicKey.topic), topicKey.key) # Enable Rendezvous Discovery protocol when Relay is enabled try: await mountRendezvous(node) except CatchableError: return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) # Keepalive mounted on all nodes try: await mountLibp2pPing(node) except CatchableError: return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) if conf.rlnRelay: let rlnConf = WakuRlnConfig( rlnRelayDynamic: conf.rlnRelayDynamic, rlnRelayCredIndex: conf.rlnRelayCredIndex, rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress, rlnRelayCredPath: conf.rlnRelayCredPath, rlnRelayCredPassword: conf.rlnRelayCredPassword, rlnRelayTreePath: conf.rlnRelayTreePath, ) try: waitFor node.mountRlnRelay(rlnConf) except CatchableError: return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) if conf.store: var onErrAction = proc(msg: string) {.gcsafe, closure.} = ## Action to be taken when an internal error occurs during the node run. ## e.g. the connection with the database is lost and not recovered. error "Unrecoverable error occurred", error = msg quit(QuitFailure) # Archive setup let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration, conf.storeMaxNumDbConnections, onErrAction) if archiveDriverRes.isErr(): return err("failed to setup archive driver: " & archiveDriverRes.error) let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy) if retPolicyRes.isErr(): return err("failed to create retention policy: " & retPolicyRes.error) let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get()) if mountArcRes.isErr(): return err("failed to mount waku archive protocol: " & mountArcRes.error) # Store setup try: await mountStore(node) except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) mountStoreClient(node) if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) if storeNode.isOk(): node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec) else: return err("failed to set node waku store peer: " & storeNode.error) # NOTE Must be mounted after relay if conf.lightpush: try: await mountLightPush(node) except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) if conf.lightpushnode != "": let lightPushNode = parsePeerInfo(conf.lightpushnode) if lightPushNode.isOk(): mountLightPushClient(node) node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) else: return err("failed to set node waku lightpush peer: " & lightPushNode.error) # Filter setup. NOTE Must be mounted after relay if conf.filter: try: await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) except CatchableError: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) if conf.filternode != "": let filterNode = parsePeerInfo(conf.filternode) if filterNode.isOk(): try: await node.mountFilterClient() node.peerManager.addServicePeer(filterNode.value, WakuLegacyFilterCodec) node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) except CatchableError: return err("failed to mount waku filter client protocol: " & getCurrentExceptionMsg()) else: return err("failed to set node waku filter peer: " & filterNode.error) # waku peer exchange setup if conf.peerExchangeNode != "" or conf.peerExchange: try: await mountPeerExchange(node) except CatchableError: return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) if conf.peerExchangeNode != "": let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode) if peerExchangeNode.isOk(): node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec) else: return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) return ok() proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} = return await setupProtocols( app.node, app.conf, app.key ) ## Start node proc startNode(node: WakuNode, conf: WakuNodeConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[AppResult[void]] {.async.} = ## Start a configured node and all mounted protocols. ## Connect to static nodes and start ## keep-alive, if configured. # Start Waku v2 node try: await node.start() except CatchableError: return err("failed to start waku node: " & getCurrentExceptionMsg()) # Connect to configured static nodes if conf.staticnodes.len > 0: try: await connectToNodes(node, conf.staticnodes, "static") except CatchableError: return err("failed to connect to static nodes: " & getCurrentExceptionMsg()) if dynamicBootstrapNodes.len > 0: info "Connecting to dynamic bootstrap peers" try: await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") except CatchableError: return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) # retrieve px peers and add the to the peer store if conf.peerExchangeNode != "": let desiredOutDegree = node.wakuRelay.parameters.d.uint64() await node.fetchPeerExchangePeers(desiredOutDegree) # Start keepalive, if enabled if conf.keepAlive: node.startKeepalive() # Maintain relay connections if conf.relay: node.peerManager.start() return ok() proc startApp*(app: var App): AppResult[void] = let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)) if nodeRes.isErr(): return err("exception starting node: " & nodeRes.error.msg) nodeRes.get().isOkOr: return err("exception starting node: " & error) # Update app data that is set dynamically on node start app.updateApp().isOkOr: return err("Error in updateApp: " & $error) if app.wakuDiscv5.isSome(): let wakuDiscv5 = app.wakuDiscv5.get() let catchRes = catch: (waitFor wakuDiscv5.start()) let startRes = catchRes.valueOr: return err("failed to start waku discovery v5: " & catchRes.error.msg) startRes.isOkOr: return err("failed to start waku discovery v5: " & error) return ok() ## Monitoring and external interfaces proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeConf): AppResult[RestServerRef] = # Used to register api endpoints that are not currently installed as keys, # values are holding error messages to be returned to the client var notInstalledTab: Table[string, string] = initTable[string, string]() proc requestErrorHandler(error: RestRequestError, request: HttpRequestRef): Future[HttpResponseRef] {.async.} = case error of RestRequestError.Invalid: return await request.respond(Http400, "Invalid request", HttpTable.init()) of RestRequestError.NotFound: let rootPath = request.rawPath.split("/")[1] if notInstalledTab.hasKey(rootPath): return await request.respond(Http404, notInstalledTab[rootPath], HttpTable.init()) else: return await request.respond(Http400, "Bad request initiated. Invalid path or method used.", HttpTable.init()) of RestRequestError.InvalidContentBody: return await request.respond(Http400, "Invalid content body", HttpTable.init()) of RestRequestError.InvalidContentType: return await request.respond(Http400, "Invalid content type", HttpTable.init()) of RestRequestError.Unexpected: return defaultResponse() return defaultResponse() let server = ? newRestHttpServer(address, port, requestErrorHandler = requestErrorHandler) ## Admin REST API if conf.restAdmin: installAdminApiHandlers(server.router, app.node) ## Debug REST API installDebugApiHandlers(server.router, app.node) ## Health REST API installHealthApiHandler(server.router, app.node) ## Relay REST API if conf.relay: let cache = MessageCache.init(int(conf.restRelayCacheCapacity)) let handler = messageCacheHandler(cache) for pubsubTopic in conf.pubsubTopics: cache.pubsubSubscribe(pubsubTopic) app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) for contentTopic in conf.contentTopics: cache.contentSubscribe(contentTopic) app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) installRelayApiHandlers(server.router, app.node, cache) else: notInstalledTab["relay"] = "/relay endpoints are not available. Please check your configuration: --relay" ## Filter REST API if conf.filternode != "" and app.node.wakuFilterClient != nil and app.node.wakuFilterClientLegacy != nil: let legacyFilterCache = MessageCache.init() rest_legacy_filter_api.installLegacyFilterRestApiHandlers(server.router, app.node, legacyFilterCache) let filterCache = MessageCache.init() let filterDiscoHandler = if app.wakuDiscv5.isSome(): some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter)) else: none(DiscoveryHandler) rest_filter_api.installFilterRestApiHandlers( server.router, app.node, filterCache, filterDiscoHandler, ) else: notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode" ## Store REST API let storeDiscoHandler = if app.wakuDiscv5.isSome(): some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store)) else: none(DiscoveryHandler) installStoreApiHandlers(server.router, app.node, storeDiscoHandler) ## Light push API if conf.lightpushnode != "" and app.node.wakuLightpushClient != nil: let lightDiscoHandler = if app.wakuDiscv5.isSome(): some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush)) else: none(DiscoveryHandler) rest_lightpush_api.installLightPushRequestHandler(server.router, app.node, lightDiscoHandler) else: notInstalledTab["lightpush"] = "/lightpush endpoints are not available. Please check your configuration: --lightpushnode" server.start() info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/" ok(server) proc startRpcServer(app: App, address: IpAddress, port: Port, conf: WakuNodeConf): AppResult[RpcHttpServer] = let ta = initTAddress(address, port) var server: RpcHttpServer try: server = newRpcHttpServer([ta]) except CatchableError: return err("failed to init JSON-RPC server: " & getCurrentExceptionMsg()) installDebugApiHandlers(app.node, server) if conf.relay: let cache = MessageCache.init(capacity=50) let handler = messageCacheHandler(cache) for pubsubTopic in conf.pubsubTopics: cache.pubsubSubscribe(pubsubTopic) app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) for contentTopic in conf.contentTopics: cache.contentSubscribe(contentTopic) app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) installRelayApiHandlers(app.node, server, cache) if conf.filternode != "": let filterMessageCache = MessageCache.init(capacity=50) installFilterApiHandlers(app.node, server, filterMessageCache) installStoreApiHandlers(app.node, server) if conf.rpcAdmin: installAdminApiHandlers(app.node, server) server.start() info "RPC Server started", address=ta ok(server) proc startMetricsServer(serverIp: IpAddress, serverPort: Port): AppResult[MetricsHttpServerRef] = info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort let metricsServerRes = MetricsHttpServerRef.new($serverIp, serverPort) if metricsServerRes.isErr(): return err("metrics HTTP server start failed: " & $metricsServerRes.error) let server = metricsServerRes.value try: waitFor server.start() except CatchableError: return err("metrics HTTP server start failed: " & getCurrentExceptionMsg()) info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort ok(server) proc startMetricsLogging(): AppResult[void] = startMetricsLog() ok() proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] = if app.conf.rpc: let startRpcServerRes = startRpcServer(app, app.conf.rpcAddress, Port(app.conf.rpcPort + app.conf.portsShift), app.conf) if startRpcServerRes.isErr(): error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error else: app.rpcServer = some(startRpcServerRes.value) if app.conf.rest: let startRestServerRes = startRestServer(app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf) if startRestServerRes.isErr(): error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error else: app.restServer = some(startRestServerRes.value) if app.conf.metricsServer: let startMetricsServerRes = startMetricsServer(app.conf.metricsServerAddress, Port(app.conf.metricsServerPort + app.conf.portsShift)) if startMetricsServerRes.isErr(): error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error else: app.metricsServer = some(startMetricsServerRes.value) if app.conf.metricsLogging: let startMetricsLoggingRes = startMetricsLogging() if startMetricsLoggingRes.isErr(): error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error ok() # App shutdown proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} = if app.restServer.isSome(): await app.restServer.get().stop() if app.rpcServer.isSome(): await app.rpcServer.get().stop() if app.metricsServer.isSome(): await app.metricsServer.get().stop() if app.wakuDiscv5.isSome(): await app.wakuDiscv5.get().stop() if not app.node.isNil(): await app.node.stop()