diff --git a/library/events/json_connection_status_change_event.nim b/library/events/json_connection_status_change_event.nim index 9b8a0f9e6..f5af78f24 100644 --- a/library/events/json_connection_status_change_event.nim +++ b/library/events/json_connection_status_change_event.nim @@ -2,7 +2,7 @@ import system, std/json import ./json_base_event -import ../../logos_delivery/waku/api/types +import ../../logos_delivery/api/types type JsonConnectionStatusChangeEvent* = ref object of JsonEvent status*: ConnectionStatus diff --git a/logos_delivery/waku/api/api_conf.nim b/logos_delivery/api/api_conf.nim similarity index 100% rename from logos_delivery/waku/api/api_conf.nim rename to logos_delivery/api/api_conf.nim diff --git a/logos_delivery/waku/api/types.nim b/logos_delivery/api/types.nim similarity index 100% rename from logos_delivery/waku/api/types.nim rename to logos_delivery/api/types.nim diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index 5a7ab24d4..307dc17a4 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -21,7 +21,7 @@ import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/waku_core/topics @@ -135,7 +135,7 @@ proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) = ## and the total number of confirmed + failed segments equals the total expected segments. ## Therefore, the channel-level request is removed from `self.channelReqs` ## and the appropriate final event is emitted. - ## + ## let state = self.channelReqs.getOrDefault(channelReqId) if state.totalExpectedSegments == 0: ## Either already finalized (and removed) or never inserted. diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 887444b19..29feab0b9 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -15,7 +15,7 @@ import brokers/broker_context import logos_delivery/waku/events/message_events as waku_message_events import logos_delivery/messaging/messaging_client -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/waku/waku_core/topics import logos_delivery/waku/persistency/sds_persistency diff --git a/logos_delivery/channels/types.nim b/logos_delivery/channels/types.nim index ec663cf7b..7730c5c58 100644 --- a/logos_delivery/channels/types.nim +++ b/logos_delivery/channels/types.nim @@ -1,7 +1,7 @@ ## Core identifier types for the Reliable Channel API. import std/hashes -import logos_delivery/waku/api/types as api_types +import logos_delivery/api/types as api_types import ./scalable_data_sync/scalable_data_sync diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index 2e8d1959d..1c2682f76 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -22,7 +22,6 @@ export reliable_channel_manager import logos_delivery/waku/factory/waku_conf import logos_delivery/waku/factory/app_callbacks -import logos_delivery/waku/api/[api_conf, types] logScope: topics = "logosdelivery" @@ -82,6 +81,13 @@ proc new*( proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = ## Starts each layer bottom-up: transport first, then messaging, then channels. + if self.waku.isNil(): + return err("Waku node is not initialized") + if self.messagingClient.isNil(): + return err("MessagingClient is not initialized") + if self.reliableChannelManager.isNil(): + return err("ReliableChannelManager is not initialized") + (await self.waku.start()).isOkOr: return err("failed to start Waku: " & error) @@ -102,3 +108,8 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = return err("failed to stop Waku: " & error) return ok() + +proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} = + if self.waku.isNil(): + return err("Waku node is not initialized") + return ok(self.waku.healthMonitor.onlineMonitor.amIOnline()) diff --git a/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim b/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim index 685f9afce..1f76078fe 100644 --- a/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim +++ b/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim @@ -3,7 +3,7 @@ import std/[options, times], chronos import brokers/broker_context import logos_delivery/waku/waku_core, - logos_delivery/waku/api/types, + logos_delivery/api/types, logos_delivery/waku/requests/node_requests type DeliveryState* {.pure.} = enum diff --git a/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim b/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim index 3acc44bb4..dc40b797d 100644 --- a/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim +++ b/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim @@ -4,7 +4,7 @@ import chronos, chronicles import brokers/broker_context import logos_delivery/waku/[waku_core], logos_delivery/waku/waku_lightpush/[common, rpc] import logos_delivery/waku/requests/health_requests -import logos_delivery/waku/api/types +import logos_delivery/api/types import ./[delivery_task, send_processor] logScope: diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 652671e46..a05c94377 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -1,7 +1,7 @@ import results, chronos import chronicles import - logos_delivery/waku/api/types, + logos_delivery/api/types, logos_delivery/waku/node/[waku_node, subscription_manager], logos_delivery/messaging/delivery_service/[recv_service, send_service], logos_delivery/messaging/delivery_service/send_service/delivery_task @@ -43,6 +43,26 @@ proc stop*(self: MessagingClient) {.async.} = await self.recvService.stopRecvService() self.started = false +proc checkApiAvailability(mc: MessagingClient): Result[void, string] = + if mc.isNil(): + return err("MessagingClient is not initialized") + + return ok() + +proc subscribe*( + mc: MessagingClient, contentTopic: ContentTopic +): Future[Result[void, string]] {.async.} = + ?checkApiAvailability(mc) + + return mc.node.subscriptionManager.subscribe(contentTopic) + +proc unsubscribe*( + mc: MessagingClient, contentTopic: ContentTopic +): Result[void, string] = + ?checkApiAvailability(mc) + + return mc.node.subscriptionManager.unsubscribe(contentTopic) + proc send*( self: MessagingClient, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = diff --git a/logos_delivery/waku/api.nim b/logos_delivery/waku/api.nim index a977a062a..1fa25160b 100644 --- a/logos_delivery/waku/api.nim +++ b/logos_delivery/waku/api.nim @@ -1,5 +1,4 @@ -import ./api/[api, api_conf] import ./events/message_events import tools/confutils/entry_nodes -export api, api_conf, entry_nodes, message_events +export entry_nodes, message_events diff --git a/logos_delivery/waku/api/api.nim b/logos_delivery/waku/api/api.nim deleted file mode 100644 index a9f2b6e76..000000000 --- a/logos_delivery/waku/api/api.nim +++ /dev/null @@ -1,48 +0,0 @@ -import logos_delivery/waku/compat/option_valueor -import std/[net, options] - -import chronicles, chronos, libp2p/peerid, results - -import logos_delivery/waku/waku -import logos_delivery/waku/[requests/health_requests, waku_core, waku_node] -import logos_delivery/waku/node/subscription_manager -import libp2p/peerid -import tools/confutils/cli_args -import ./[api_conf, types] - -export cli_args - -logScope: - topics = "api" - -proc createNode*(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} = - let wakuConf = conf.toWakuConf().valueOr: - return err("Failed to handle the configuration: " & error) - - ## We are not defining app callbacks at node creation - let wakuRes = (await Waku.new(wakuConf)).valueOr: - error "waku initialization failed", error = error - return err("Failed setting up Waku: " & $error) - - return ok(wakuRes) - -proc checkApiAvailability(w: Waku): Result[void, string] = - if w.isNil(): - return err("Waku node is not initialized") - - # TODO: Conciliate request-bouncing health checks here with unit testing. - # (For now, better to just allow all sends and rely on retries.) - - return ok() - -proc subscribe*( - w: Waku, contentTopic: ContentTopic -): Future[Result[void, string]] {.async.} = - ?checkApiAvailability(w) - - return w.node.subscriptionManager.subscribe(contentTopic) - -proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] = - ?checkApiAvailability(w) - - return w.node.subscriptionManager.unsubscribe(contentTopic) diff --git a/logos_delivery/waku/api/send_api.md b/logos_delivery/waku/api/send_api.md deleted file mode 100644 index 2a5a2f8a4..000000000 --- a/logos_delivery/waku/api/send_api.md +++ /dev/null @@ -1,46 +0,0 @@ -# SEND API - -**THIS IS TO BE REMOVED BEFORE PR MERGE** - -This document collects logic and todo's around the Send API. - -## Overview - -Send api hides the complex logic of using raw protocols for reliable message delivery. -The delivery method is chosen based on the node configuration and actual availabilities of peers. - -## Delivery task - -Each message send request is bundled into a task that not just holds the composed message but also the state of the delivery. - -## Delivery methods - -Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes: -- P2PReliability validation - checking network store node whether the message is reached at least a store node. -- Simple retry until message is propagated to the network - - Relay says >0 peers as publish result - - LightpushClient returns with success - -Depending on node config: -- Relay -- Lightpush - -These methods are used in combination to achieve the best reliability. -Fallback mechanism is used to switch between methods if the current one fails. - -Relay+StoreCheck -> Relay+simple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error - -Combination is dynamically chosen on node configuration. Levels can be skipped depending on actual connectivity. -Actual connectivity is checked: -- Relay's topic health check - at least dLow peers in the mesh for the topic -- Store nodes availability - at least one store service node is available in peer manager -- Lightpush client availability - at least one lightpush service node is available in peer manager - -## Delivery processing - -At every send request, each task is tried to be delivered right away. -Any further retries and store check is done as a background task in a loop with predefined intervals. -Each task is set for a maximum number of retries and/or maximum time to live. - -In each round of store check and retry send tasks are selected based on their state. -The state is updated based on the result of the delivery method. diff --git a/logos_delivery/waku/events/health_events.nim b/logos_delivery/waku/events/health_events.nim index 4ff6f0c6c..d19de776b 100644 --- a/logos_delivery/waku/events/health_events.nim +++ b/logos_delivery/waku/events/health_events.nim @@ -1,6 +1,6 @@ import brokers/event_broker -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health] import logos_delivery/waku/waku_core/topics diff --git a/logos_delivery/waku/events/message_events.nim b/logos_delivery/waku/events/message_events.nim index 9338fda67..2e4bece80 100644 --- a/logos_delivery/waku/events/message_events.nim +++ b/logos_delivery/waku/events/message_events.nim @@ -1,5 +1,6 @@ import brokers/event_broker -import logos_delivery/waku/[api/types, waku_core/message, waku_core/topics] +import logos_delivery/api/types +import logos_delivery/waku/[waku_core/message, waku_core/topics] export types EventBroker: diff --git a/logos_delivery/waku/factory/waku.nim b/logos_delivery/waku/factory/waku.nim new file mode 100644 index 000000000..a9a649ca8 --- /dev/null +++ b/logos_delivery/waku/factory/waku.nim @@ -0,0 +1,1034 @@ +import logos_delivery/waku/compat/option_valueor +{.push raises: [].} + +import + std/[options, sequtils, strformat, net, strutils], + results, + chronicles, + chronos, + secp256k1, + libp2p/protocols/ping, + libp2p/protocols/connectivity/relay/relay, + libp2p/protocols/connectivity/relay/client, + libp2p/wire, + libp2p/crypto/crypto, + libp2p/protocols/pubsub/gossipsub, + libp2p/services/autorelayservice, + libp2p/services/hpservice, + libp2p/peerid, + eth/keys, + eth/p2p/discoveryv5/enr, + presto, + metrics, + metrics/chronos_httpserver, + brokers/broker_context, + brokers/broker_implement, + logos_delivery/api/types, + logos_delivery/waku/[ + waku_core, + waku_node, + waku_archive, + waku_rln_relay, + waku_store, + waku_filter_v2, + waku_relay/protocol, + waku_enr/sharding, + waku_enr/multiaddr, + common/logging, + node/peer_manager, + node/health_monitor, + net/net_config, + node/waku_metrics, + node/subscription_manager, + rest_api/message_cache, + rest_api/endpoint/server, + rest_api/endpoint/builder as rest_server_builder, + discovery/waku_dnsdisc, + discovery/waku_discv5, + discovery/autonat_service, + requests/health_requests, + factory/node_factory, + factory/internal_config, + factory/app_callbacks, + persistency/persistency, + waku_lightpush_legacy/client, + waku_store/client as waku_store_client, + factory/validator_signed, + ], + logos_delivery/api/kernel_interface, + logos_delivery/channels/reliable_channel_manager, + logos_delivery/messaging/messaging_client, + ./waku_conf, + ./waku_state_info + +logScope: + topics = "wakunode waku" + +# Git version in git describe format (defined at compile time) +const git_version* {.strdefine.} = "n/a" + +type Waku* = ref object of KernelInterface + stateInfo*: WakuStateInfo + conf*: WakuConf + rng*: crypto.Rng + + key: crypto.PrivateKey + + wakuDiscv5*: WakuDiscoveryV5 + dynamicBootstrapNodes*: seq[RemotePeerInfo] + dnsRetryLoopHandle: Future[void] + networkConnLoopHandle: Future[void] + + node*: WakuNode + + # TODO: remove this indirection. Now kept for legacy. + healthMonitor*: NodeHealthMonitor + messagingClient*: MessagingClient + + reliableChannelManager*: ReliableChannelManager + + restServer*: WakuRestServerRef + metricsServer*: MetricsHttpServerRef + appCallbacks*: AppCallbacks + ## `brokerCtx` is inherited from the `KernelInterface` broker base. + +proc setupSwitchServices( + waku: Waku, conf: WakuConf, circuitRelay: Relay, rng: crypto.Rng +) = + proc onReservation(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} = + info "circuit relay handler new reserve event", + addrs_before = $(waku.node.announcedAddresses), addrs = $addresses + + waku.node.announcedAddresses.setLen(0) ## remove previous addresses + waku.node.announcedAddresses.add(addresses) + info "waku node announced addresses updated", + announcedAddresses = waku.node.announcedAddresses + + if not isNil(waku.wakuDiscv5): + waku.wakuDiscv5.updateAnnouncedMultiAddress(addresses).isOkOr: + error "failed to update announced multiaddress", error = $error + + let autonatService = getAutonatService(rng) + if conf.circuitRelayClient: + ## The node is considered to be behind a NAT or firewall and then it + ## should struggle to be reachable and establish connections to other nodes + const MaxNumRelayServers = 2 + let autoRelayService = AutoRelayService.new( + MaxNumRelayServers, RelayClient(circuitRelay), onReservation, rng + ) + let holePunchService = HPService.new(autonatService, autoRelayService) + waku.node.switch.services = @[Service(holePunchService)] + else: + waku.node.switch.services = @[Service(autonatService)] + + # libp2p 2.0.0 split Service.setup out of Service.start: the switch runs setup + # only at build time (SwitchBuilder.setupServices), while switch.start calls + # just start. These services are created and attached post-build, so setup must + # be invoked explicitly here -- otherwise AutonatService.addressMapper stays nil + # and the peerInfo.update() inside start dereferences it (SIGSEGV). + for service in waku.node.switch.services: + try: + service.setup(waku.node.switch) + except ServiceSetupError as e: + error "failed to set up libp2p switch service", error = e.msg + +## Initialisation + +proc newCircuitRelay(isRelayClient: bool): Relay = + # TODO: Does it mean it's a circuit-relay server when it's false? + if isRelayClient: + return RelayClient.new() + return Relay.new() + +proc setupAppCallbacks( + node: WakuNode, + conf: WakuConf, + appCallbacks: AppCallbacks, + healthMonitor: NodeHealthMonitor, +): Result[void, string] = + if appCallbacks.isNil(): + info "No external callbacks to be set" + return ok() + + if not appCallbacks.relayHandler.isNil(): + if node.wakuRelay.isNil(): + return err("Cannot configure relayHandler callback without Relay mounted") + + let autoShards = + if node.wakuAutoSharding.isSome(): + node.getAutoshards(conf.contentTopics).valueOr: + return err("Could not get autoshards: " & error) + else: + @[] + + let confShards = conf.subscribeShards.mapIt( + RelayShard(clusterId: conf.clusterId, shardId: uint16(it)) + ) + let shards = confShards & autoShards + + let uniqueShards = deduplicate(shards) + + for shard in uniqueShards: + let topic = $shard + node.subscribe((kind: PubsubSub, topic: topic), appCallbacks.relayHandler).isOkOr: + return err(fmt"Could not subscribe {topic}: " & $error) + + if not appCallbacks.topicHealthChangeHandler.isNil(): + if node.wakuRelay.isNil(): + return + err("Cannot configure topicHealthChangeHandler callback without Relay mounted") + node.wakuRelay.onTopicHealthChange = appCallbacks.topicHealthChangeHandler + + if not appCallbacks.connectionChangeHandler.isNil(): + if node.peerManager.isNil(): + return + err("Cannot configure connectionChangeHandler callback with empty peer manager") + node.peerManager.onConnectionChange = appCallbacks.connectionChangeHandler + + if not appCallbacks.connectionStatusChangeHandler.isNil(): + if healthMonitor.isNil(): + return + err("Cannot configure connectionStatusChangeHandler with empty health monitor") + + healthMonitor.onConnectionStatusChange = appCallbacks.connectionStatusChangeHandler + + return ok() + +proc getPorts( + listenAddrs: seq[MultiAddress] +): Result[tuple[tcpPort, websocketPort, quicPort: Option[Port]], string] = + var tcpPort, websocketPort, quicPort = none(Port) + + for a in listenAddrs: + if a.isWsAddress(): + if websocketPort.isNone(): + let wsAddress = initTAddress(a).valueOr: + return err("getPorts wsAddr error:" & $error) + websocketPort = some(wsAddress.port) + elif a.isQuicAddress(): + if quicPort.isNone(): + let quicAddress = initTAddress(a).valueOr: + return err("getPorts quicAddr error:" & $error) + quicPort = some(quicAddress.port) + elif tcpPort.isNone(): + let tcpAddress = initTAddress(a).valueOr: + return err("getPorts tcpAddr error:" & $error) + tcpPort = some(tcpAddress.port) + + return ok((tcpPort: tcpPort, websocketPort: websocketPort, quicPort: quicPort)) + +proc getRunningNetConfig(waku: Waku): Future[Result[NetConfig, string]] {.async.} = + let conf = waku.conf + let (tcpPort, websocketPort, quicPort) = getPorts( + waku.node.switch.peerInfo.listenAddrs + ).valueOr: + return err("Could not retrieve ports: " & error) + + if tcpPort.isSome(): + conf.endpointConf.p2pTcpPort = tcpPort.get() + + if websocketPort.isSome() and conf.webSocketConf.isSome(): + conf.webSocketConf.get().port = websocketPort.get() + + if quicPort.isSome() and conf.quicConf.isSome(): + conf.quicConf.get().port = quicPort.get() + + # Rebuild NetConfig with bound port values + let netConf = ( + await networkConfiguration( + conf.clusterId, conf.endpointConf, conf.discv5Conf, conf.webSocketConf, + conf.quicConf, conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId, + ) + ).valueOr: + return err("Could not update NetConfig: " & error) + + return ok(netConf) + +proc updateEnr(waku: Waku): Future[Result[void, string]] {.async.} = + let netConf: NetConfig = (await getRunningNetConfig(waku)).valueOr: + return err("error calling updateNetConfig: " & $error) + let record = enrConfiguration(waku.conf, netConf).valueOr: + return err("ENR setup failed: " & error) + + if isClusterMismatched(record, waku.conf.clusterId): + return err("cluster-id mismatch configured shards") + + waku.node.enr = record + + # If TCP/WS was configured with port 0, node.announcedAddresses was built + # pre-bind with a port value of 0. In any case, the resync is harmless. + waku.node.announcedAddresses = netConf.announcedAddresses + + return ok() + +proc updateAddressInENR(waku: Waku): Result[void, string] = + let addresses: seq[MultiAddress] = waku.node.announcedAddresses + let encodedAddrs = multiaddr.encodeMultiaddrs(addresses) + + ## First update the enr info contained in WakuNode + let keyBytes = waku.key.getRawBytes().valueOr: + return err("failed to retrieve raw bytes from waku key: " & $error) + + let parsedPk = keys.PrivateKey.fromHex(keyBytes.toHex()).valueOr: + return err("failed to parse the private key: " & $error) + + let enrFields = @[toFieldPair(MultiaddrEnrField, encodedAddrs)] + waku.node.enr.update(parsedPk, extraFields = enrFields).isOkOr: + return err("failed to update multiaddress in ENR updateAddressInENR: " & $error) + + info "Waku node ENR updated successfully with new multiaddress", + enr = waku.node.enr.toUri(), record = $(waku.node.enr) + + ## Now update the ENR infor in discv5 + if not waku.wakuDiscv5.isNil(): + waku.wakuDiscv5.protocol.localNode.record = waku.node.enr + let enr = waku.wakuDiscv5.protocol.localNode.record + + info "Waku discv5 ENR updated successfully with new multiaddress", + enr = enr.toUri(), record = $(enr) + + return ok() + +proc updateWaku(waku: Waku): Future[Result[void, string]] {.async.} = + (await updateEnr(waku)).isOkOr: + return err("error calling updateEnr: " & $error) + + ?updateAnnouncedAddrWithPrimaryIpAddr(waku.node) + + ?updateAddressInENR(waku) + + return ok() + +proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} = + while true: + await sleepAsync(30.seconds) + if waku.conf.dnsDiscoveryConf.isSome(): + let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get() + waku.dynamicBootstrapNodes = ( + await waku_dnsdisc.retrieveDynamicBootstrapNodes( + dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers + ) + ).valueOr: + error "Retrieving dynamic bootstrap nodes failed", error = error + continue + + if not waku.wakuDiscv5.isNil(): + let dynamicBootstrapEnrs = + waku.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get().toUri()) + var discv5BootstrapEnrs: seq[enr.Record] + # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq + for enrUri in dynamicBootstrapEnrs: + addBootstrapNode(enrUri, discv5BootstrapEnrs) + + waku.wakuDiscv5.updateBootstrapRecords( + waku.wakuDiscv5.protocol.bootstrapRecords & discv5BootstrapEnrs + ) + + info "Connecting to dynamic bootstrap peers" + try: + await connectToNodes(waku.node, waku.dynamicBootstrapNodes, "dynamic bootstrap") + except CatchableError: + error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() + return + +# Notice this interface to be used only from LogosDelivery, hence not in the interface level. +proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = + if waku.node.started: + warn "start: waku node already started" + return ok() + + info "Retrieve dynamic bootstrap nodes" + let conf = waku.conf + + if conf.dnsDiscoveryConf.isSome(): + let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get() + let dynamicBootstrapNodesRes = + try: + await waku_dnsdisc.retrieveDynamicBootstrapNodes( + dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers + ) + except CatchableError as exc: + Result[seq[RemotePeerInfo], string].err( + "Retrieving dynamic bootstrap nodes failed: " & exc.msg + ) + + if dynamicBootstrapNodesRes.isErr(): + error "Retrieving dynamic bootstrap nodes failed", + error = dynamicBootstrapNodesRes.error + # Start Dns Discovery retry loop + waku.dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop() + else: + waku.dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() + + ## Initialize persistency singleton instance - we don't need the instance itself here, + ## but this ensures it's initialized before any store job starts. + discard Persistency.instance(conf.localStoragePath).valueOr: + error "Failed to initialize persistency instance", error = $error + return err("Failed to initialize persistency instance: " & $error) + + (await startNode(waku.node, waku.conf, waku.dynamicBootstrapNodes)).isOkOr: + return err("error while calling startNode: " & $error) + + let bound = getPorts(waku.node.switch.peerInfo.listenAddrs).valueOr: + return err("failed to read bound ports from switch: " & $error) + waku.node.ports.tcp = bound.tcpPort.get(Port(0)).uint16 + waku.node.ports.webSocket = bound.websocketPort.get(Port(0)).uint16 + waku.node.ports.quic = bound.quicPort.get(Port(0)).uint16 + + ## Discv5 + if conf.discv5Conf.isSome(): + waku.wakuDiscV5 = ( + await waku_discv5.setupAndStartDiscv5( + waku.node.enr, + waku.node.peerManager, + waku.node.topicSubscriptionQueue, + conf.discv5Conf.get(), + waku.dynamicBootstrapNodes, + waku.rng, + conf.nodeKey, + conf.endpointConf.p2pListenAddress, + conf.portsShift, + ) + ).valueOr: + return err("failed to start waku discovery v5: " & error) + + waku.node.ports.discv5Udp = waku.wakuDiscV5.udpPort.uint16 + waku.conf.discv5Conf.get().udpPort = waku.wakuDiscV5.udpPort + + ## Update waku data that is set dynamically on node start + try: + (await updateWaku(waku)).isOkOr: + return err("Error in start: " & $error) + except CatchableError: + return err("Caught exception in start: " & getCurrentExceptionMsg()) + + waku.node.subscriptionManager.subscribeAllAutoshards().isOkOr: + return err("failed to auto-subscribe autosharding shards: " & $error) + + ## Health Monitor + waku.healthMonitor.startHealthMonitor().isOkOr: + return err("failed to start health monitor: " & $error) + + ## Setup RequestConnectionStatus provider + + RequestConnectionStatus.setProvider( + globalBrokerContext(), + proc(): Result[RequestConnectionStatus, string] = + try: + let healthReport = waku.healthMonitor.getSyncNodeHealthReport() + return + ok(RequestConnectionStatus(connectionStatus: healthReport.connectionStatus)) + except CatchableError: + err("Failed to read health report: " & getCurrentExceptionMsg()), + ).isOkOr: + error "Failed to set RequestConnectionStatus provider", error = error + + ## Setup RequestProtocolHealth provider + + RequestProtocolHealth.setProvider( + globalBrokerContext(), + proc( + protocol: WakuProtocol + ): Future[Result[RequestProtocolHealth, string]] {.async.} = + try: + let protocolHealthStatus = + await waku.healthMonitor.getProtocolHealthInfo(protocol) + return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus)) + except CatchableError: + return err("Failed to get protocol health: " & getCurrentExceptionMsg()), + ).isOkOr: + error "Failed to set RequestProtocolHealth provider", error = error + + ## Setup RequestHealthReport provider + + RequestHealthReport.setProvider( + globalBrokerContext(), + proc(): Future[Result[RequestHealthReport, string]] {.async.} = + try: + let report = await waku.healthMonitor.getNodeHealthReport() + return ok(RequestHealthReport(healthReport: report)) + except CatchableError: + return err("Failed to get health report: " & getCurrentExceptionMsg()), + ).isOkOr: + error "Failed to set RequestHealthReport provider", error = error + + if conf.restServerConf.isSome(): + rest_server_builder.startRestServerProtocolSupport( + waku.restServer, + waku.node, + waku.wakuDiscv5, + conf.restServerConf.get(), + conf.relay, + conf.lightPush, + conf.clusterId, + conf.subscribeShards, + conf.contentTopics, + ).isOkOr: + return err ("Starting protocols support REST server failed: " & $error) + + if conf.metricsServerConf.isSome(): + try: + let (server, port) = ( + await waku_metrics.startMetricsServerAndLogging( + conf.metricsServerConf.get(), conf.portsShift + ) + ).valueOr: + return err("Starting monitoring and external interfaces failed: " & error) + waku.metricsServer = server + waku.node.ports.metrics = port.uint16 + waku.conf.metricsServerConf.get().httpPort = port + except CatchableError: + return err( + "Caught exception starting monitoring and external interfaces failed: " & + getCurrentExceptionMsg() + ) + waku.healthMonitor.setOverallHealth(HealthStatus.READY) + + if not waku.messagingClient.isNil(): + waku.messagingClient.start().isOkOr: + return err("failed to start messaging client: " & $error) + + if not waku.reliableChannelManager.isNil(): + waku.reliableChannelManager.start().isOkOr: + return err("failed to start reliable channel manager: " & $error) + + return ok() + +# Notice this interface to be used only from LogosDelivery, hence not in the interface level. +proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = + if not waku.node.started: + warn "stop: attempting to stop node that isn't running" + + try: + waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) + + Persistency.reset() + + if not waku.metricsServer.isNil(): + await waku.metricsServer.stop() + + if not waku.wakuDiscv5.isNil(): + await waku.wakuDiscv5.stop() + + if not waku.reliableChannelManager.isNil(): + await waku.reliableChannelManager.stop() + + if not waku.messagingClient.isNil(): + await waku.messagingClient.stop() + + if not waku.node.isNil(): + await waku.node.stop() + + if not waku.dnsRetryLoopHandle.isNil(): + await waku.dnsRetryLoopHandle.cancelAndWait() + + if not waku.healthMonitor.isNil(): + await waku.healthMonitor.stopHealthMonitor() + + ## Clear RequestConnectionStatus provider + RequestConnectionStatus.clearProvider(waku.brokerCtx) + + if not waku.restServer.isNil(): + await waku.restServer.stop() + except Exception: + error "waku stop failed: " & getCurrentExceptionMsg() + return err("waku stop failed: " & getCurrentExceptionMsg()) + + return ok() + +{.pop.} + # end of `{.push raises: [].}` — kernel impl methods may propagate + # CatchableError (the BrokerImplement provider wrappers catch them). + +const FilterOpTimeout = 5.seconds + +BrokerImplement Waku of KernelInterface: + ## `new` is the BARE constructor (no ctx, no providers). Legacy callers keep + ## using `Waku.new(...)` unchanged — it is re-emitted verbatim by the macro + ## and returns a `globalBrokerContext`-bound node exactly as before, with the + ## kernel request-broker providers left unwired. `Waku.create(...)` / + ## `Waku.createUnderContext(...)` are additionally generated (async `Result` + ## shape) to wire the kernel under a fresh per-instance ctx when needed. + proc new*( + T: type Waku, wakuConf: WakuConf, appCallbacks: AppCallbacks = nil + ): Future[Result[Waku, string]] {.async.} = + let rng = crypto.newRng() + let brokerCtx = globalBrokerContext() + + logging.setupLog(wakuConf.logLevel, wakuConf.logFormat) + + ?wakuConf.validate() + wakuConf.logConf() + + let relay = newCircuitRelay(wakuConf.circuitRelayClient) + + let node = (await setupNode(wakuConf, rng, relay)).valueOr: + error "Failed setting up node", error = $error + return err("Failed setting up node: " & $error) + + let healthMonitor = NodeHealthMonitor.new(node, wakuConf.dnsAddrsNameServers) + + let restServer: WakuRestServerRef = + if wakuConf.restServerConf.isSome(): + let restServer = startRestServerEssentials( + healthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift + ).valueOr: + error "Starting essential REST server failed", error = $error + return err("Failed to start essential REST server in Waku.new: " & $error) + + restServer + else: + nil + + if not restServer.isNil(): + let boundRestPort = restServer.httpServer.address.port + node.ports.rest = boundRestPort.uint16 + wakuConf.restServerConf.get().port = boundRestPort + + # Set the extMultiAddrsOnly flag so the node knows not to replace explicit addresses + node.extMultiAddrsOnly = wakuConf.endpointConf.extMultiAddrsOnly + + node.setupAppCallbacks(wakuConf, appCallbacks, healthMonitor).isOkOr: + error "Failed setting up app callbacks", error = error + return err("Failed setting up app callbacks: " & $error) + + var waku = Waku( + stateInfo: WakuStateInfo.init(node), + conf: wakuConf, + rng: rng, + key: wakuConf.nodeKey, + node: node, + healthMonitor: healthMonitor, + appCallbacks: appCallbacks, + restServer: restServer, + brokerCtx: brokerCtx, + ) + + waku.setupSwitchServices(wakuConf, relay, rng) + + ok(waku) + + # --- topic construction --- + method buildContentTopic( + self: Waku, appName: string, appVersion: uint32, name: string, encoding: string + ): Future[Result[ContentTopic, string]] {.async.} = + try: + return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) + except CatchableError as e: + return err(e.msg) + + method buildPubsubTopic( + self: Waku, topicName: string + ): Future[Result[PubsubTopic, string]] {.async.} = + try: + return ok(PubsubTopic(fmt"/waku/2/{topicName}")) + except CatchableError as e: + return err(e.msg) + + method defaultPubsubTopic(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = + return ok(DefaultPubsubTopic) + + # --- relay --- + method relayPublish( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 + ): Future[Result[int, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPublish: WakuRelay not mounted") + + let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr: + return err($error) + + return ok(numPeers) + except CatchableError as e: + return err(e.msg) + + method relaySubscribe( + self: Waku, pubsubTopic: PubsubTopic + ): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relaySubscribe: WakuRelay not mounted") + + let handler = proc(topic: PubsubTopic, msg: WakuMessage) {.async.} = + ## Bridge inbound relay traffic to the `ReceivedMessage` kernel event + ## (replaces libwaku's set_event_callback message path). + ReceivedMessage.emit( + self.brokerCtx, ReceivedMessage(pubsubTopic: topic, message: msg) + ) + + self.node.subscribe( + (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), + WakuRelayHandler(handler), + ).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + + method relayUnsubscribe( + self: Waku, pubsubTopic: PubsubTopic + ): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayUnsubscribe: WakuRelay not mounted") + + self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + + method relayAddProtectedShard( + self: Waku, clusterId: uint16, shardId: uint16, publicKey: string + ): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayAddProtectedShard: WakuRelay not mounted") + + let pubKey = SkPublicKey.fromHex(publicKey).valueOr: + return err("relayAddProtectedShard: invalid public key: " & $error) + + let protectedShard = ProtectedShard(shard: shardId, key: pubKey) + self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) + return ok(true) + except CatchableError as e: + return err(e.msg) + + method relayConnectedPeers( + self: Waku, pubsubTopic: PubsubTopic + ): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayConnectedPeers: WakuRelay not mounted") + + let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: + return err($error) + + return ok(connPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + + method relayPeersInMesh( + self: Waku, pubsubTopic: PubsubTopic + ): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPeersInMesh: WakuRelay not mounted") + + let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: + return err($error) + + return ok(meshPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + + # --- filter --- + method filterSubscribe( + self: Waku, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, + ): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer) + if not await subFut.withTimeout(FilterOpTimeout): + return err("filter subscription timed out") + subFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + + method filterUnsubscribe( + self: Waku, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, + ): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + + method filterUnsubscribeAll( + self: Waku, peer: string + ): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let unsubFut = self.node.filterUnsubscribeAll(peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription all timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + + # --- lightpush --- + method lightpushPublish( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string + ): Future[Result[string, string]] {.async.} = + try: + if self.node.wakuLegacyLightpushClient.isNil(): + return err("wakuLegacyLightpushClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("lightpushPublish failed to parse peer addr: " & $error) + + let msgHashHex = ( + await self.node.wakuLegacyLightpushClient.publish( + pubsubTopic, message, remotePeer + ) + ).valueOr: + return err($error) + + return ok(msgHashHex) + except CatchableError as e: + return err(e.msg) + + # --- store --- + method storeQuery( + self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int + ): Future[Result[StoreQueryResponse, string]] {.async.} = + try: + if self.node.wakuStoreClient.isNil(): + return err("wakuStoreClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("storeQuery failed to parse peer addr: " & $error) + + let queryFut = self.node.wakuStoreClient.query(request, remotePeer) + if not await queryFut.withTimeout(timeoutMs.milliseconds): + return err("storeQuery timed out") + + let queryResponse = queryFut.read().valueOr: + return err("storeQuery failed: " & $error) + + return ok(queryResponse) + except CatchableError as e: + return err(e.msg) + + # --- peer management --- + method connect( + self: Waku, peers: seq[string], timeoutMs: uint32 + ): Future[Result[bool, string]] {.async.} = + try: + await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") + return ok(true) + except CatchableError as e: + return err(e.msg) + + method disconnectPeerById( + self: Waku, peerId: string + ): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + await self.node.peerManager.disconnectNode(pId) + return ok(true) + except CatchableError as e: + return err(e.msg) + + method disconnectAllPeers(self: Waku): Future[Result[bool, string]] {.async.} = + try: + await self.node.peerManager.disconnectAllPeers() + return ok(true) + except CatchableError as e: + return err(e.msg) + + method dialPeer( + self: Waku, peerAddr: string, protocol: string, timeoutMs: int + ): Future[Result[bool, string]] {.async.} = + try: + let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + + method dialPeerById( + self: Waku, peerId: string, protocol: string, timeoutMs: int + ): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(pId, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + + method peerIdsFromPeerstore( + self: Waku + ): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) + except CatchableError as e: + return err(e.msg) + + method connectedPeersInfo(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers() + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) + + method connectedPeers(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() + return ok(concat(inPeerIds, outPeerIds).mapIt($it)) + except CatchableError as e: + return err(e.msg) + + method peerIdsByProtocol( + self: Waku, protocol: string + ): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers(protocol) + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) + + # --- discovery --- + method dnsDiscovery( + self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int + ): Future[Result[seq[string], string]] {.async.} = + try: + let dnsNameServers = @[parseIpAddress(nameServer)] + let discoveredPeers = ( + await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) + ).valueOr: + return err("failed discovering peers from DNS: " & $error) + + var multiAddresses = newSeq[string]() + for discPeer in discoveredPeers: + for address in discPeer.addrs: + multiAddresses.add($address & "/p2p/" & $discPeer) + + return ok(multiAddresses) + except CatchableError as e: + return err(e.msg) + + method discv5UpdateBootnodes( + self: Waku, bootnodes: seq[string] + ): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]" + self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr: + return err("error in discv5UpdateBootnodes: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + + method startDiscv5(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + (await self.wakuDiscv5.start()).isOkOr: + return err("error starting discv5: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + + method stopDiscv5(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + await self.wakuDiscv5.stop() + return ok(true) + except CatchableError as e: + return err(e.msg) + + method peerExchangeRequest( + self: Waku, numPeers: uint64 + ): Future[Result[int, string]] {.async.} = + try: + let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: + return err("failed peer exchange: " & $error) + return ok(numPeersRecv) + except CatchableError as e: + return err(e.msg) + + # --- debug / info --- + method version(self: Waku): Future[Result[string, string]] {.async.} = + return ok(WakuNodeVersionString) + + method listenAddresses(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.info().listenAddresses) + except CatchableError as e: + return err(e.msg) + + method myEnr(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok(self.node.enr.toURI()) + except CatchableError as e: + return err(e.msg) + + method myPeerId(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok($self.node.peerId()) + except CatchableError as e: + return err(e.msg) + + method metrics(self: Waku): Future[Result[string, string]] {.async.} = + {.gcsafe.}: + try: + return ok(defaultRegistry.toText()) + except CatchableError as e: + return err(e.msg) + + method isOnline(self: Waku): Future[Result[bool, string]] {.async.} = + return ok(self.healthMonitor.onlineMonitor.amIOnline()) + + method pingPeer( + self: Waku, peerAddr: string, timeoutMs: int + ): Future[Result[int64, string]] {.async.} = + try: + let peerInfo = parsePeerInfo(peerAddr).valueOr: + return err("pingPeer failed to parse peer addr: " & $error) + + let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) + defer: + await conn.close() + let pingRTT = await self.node.libp2pPing.ping(conn) + + if pingRTT == 0.nanos: + return err("could not ping peer: rtt-0") + + return ok(pingRTT.nanos) + except CatchableError as e: + return err(e.msg) diff --git a/logos_delivery/waku/node/health_monitor/connection_status.nim b/logos_delivery/waku/node/health_monitor/connection_status.nim index 68ec9d4be..3c1c5cf0f 100644 --- a/logos_delivery/waku/node/health_monitor/connection_status.nim +++ b/logos_delivery/waku/node/health_monitor/connection_status.nim @@ -1,4 +1,5 @@ -import chronos, results, std/strutils, ../../api/types +import chronos, results, std/strutils +import logos_delivery/api/types export ConnectionStatus diff --git a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim index 6e77816f0..465598794 100644 --- a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim +++ b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim @@ -8,10 +8,10 @@ import libp2p/protocols/rendezvous, libp2p/protocols/pubsub, libp2p/protocols/pubsub/rpc/messages, + logos_delivery/api/types, logos_delivery/waku/[ waku_relay, waku_rln_relay, - api/types, events/health_events, events/peer_events, node/waku_node, diff --git a/logos_delivery/waku/requests/health_requests.nim b/logos_delivery/waku/requests/health_requests.nim index 1c9ed4d70..366cdf875 100644 --- a/logos_delivery/waku/requests/health_requests.nim +++ b/logos_delivery/waku/requests/health_requests.nim @@ -1,6 +1,6 @@ import brokers/request_broker -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health, health_report] import logos_delivery/waku/waku_core/topics diff --git a/logos_delivery/waku/rest_api/endpoint/health/types.nim b/logos_delivery/waku/rest_api/endpoint/health/types.nim index 331460e7a..c32a0c778 100644 --- a/logos_delivery/waku/rest_api/endpoint/health/types.nim +++ b/logos_delivery/waku/rest_api/endpoint/health/types.nim @@ -4,7 +4,8 @@ import logos_delivery/waku/compat/option_valueor import results import chronicles, json_serialization, json_serialization/std/options import ../serdes -import logos_delivery/waku/[waku_node, api/types, node/health_monitor] +import logos_delivery/api/types +import logos_delivery/waku/[waku_node, node/health_monitor] #### Serialization and deserialization diff --git a/logos_delivery/waku/waku.nim b/logos_delivery/waku/waku.nim index 188267f07..dd0f4a781 100644 --- a/logos_delivery/waku/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -20,6 +20,7 @@ import metrics, metrics/chronos_httpserver, brokers/broker_context, + logos_delivery/api/types, logos_delivery/waku/[ waku_core, waku_node, @@ -30,7 +31,6 @@ import waku_relay/protocol, waku_enr/sharding, waku_enr/multiaddr, - api/types, common/logging, node/peer_manager, node/health_monitor, @@ -48,6 +48,7 @@ import factory/internal_config, factory/app_callbacks, persistency/persistency, + factory/validator_signed, ], ./factory/waku_conf, ./factory/waku_state_info @@ -611,15 +612,8 @@ proc relaySubscribe*( if self.node.wakuRelay.isNil(): return err("relaySubscribe: WakuRelay not mounted") - let handler = proc(topic: PubsubTopic, msg: WakuMessage) {.async.} = - ## Bridge inbound relay traffic to the `ReceivedMessage` kernel event - ## (replaces libwaku's set_event_callback message path). - ReceivedMessage.emit( - self.brokerCtx, ReceivedMessage(pubsubTopic: topic, message: msg) - ) - self.node.subscribe( - (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler) + (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil) ).isOkOr: return err($error) @@ -969,9 +963,6 @@ proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = except CatchableError as e: return err(e.msg) -proc isOnline*(self: Waku): Future[Result[bool, string]] {.async.} = - return ok(self.healthMonitor.onlineMonitor.amIOnline()) - proc pingPeer*( self: Waku, peerAddr: string, timeoutMs: int ): Future[Result[int64, string]] {.async.} = diff --git a/tests/api/test_node_conf.nim b/tests/api/test_node_conf.nim index a5bff3906..df5b18887 100644 --- a/tests/api/test_node_conf.nim +++ b/tests/api/test_node_conf.nim @@ -5,7 +5,7 @@ import json_serialization, confutils, confutils/std/net import tools/confutils/cli_args, tools/confutils/conf_from_json, - logos_delivery/waku/api/api_conf, + logos_delivery/api/api_conf, logos_delivery/waku/factory/waku_conf, logos_delivery/waku/factory/networks_config, logos_delivery/waku/factory/conf_builder/conf_builder, @@ -350,7 +350,7 @@ suite "WakuNodeConf JSON -> WakuConf integration": {.push warning[Deprecated]: off.} -import logos_delivery/waku/api/api_conf +import logos_delivery/api/api_conf suite "NodeConfig (deprecated) - toWakuConf": test "Minimal configuration": diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 881c22b3a..b3d9a8f00 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -58,7 +58,7 @@ suite "Reliable Channel - ingress": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager ## Noop encryption providers so the Encrypt/Decrypt brokers have @@ -124,7 +124,7 @@ suite "Reliable Channel - ingress": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -181,7 +181,7 @@ suite "Reliable Channel - send state machine": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -246,7 +246,7 @@ suite "Reliable Channel - send state machine": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -347,7 +347,7 @@ suite "Reliable Channel - send state machine": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -452,7 +452,7 @@ suite "Reliable Channel - SDS persistence": var waku: LogosDelivery var manager: ReliableChannelManager lockNewGlobalBrokerContext: - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -522,7 +522,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -593,7 +593,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -650,7 +650,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -710,7 +710,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -792,7 +792,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -860,7 +860,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -948,7 +948,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -1023,7 +1023,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -1096,7 +1096,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -1162,7 +1162,7 @@ suite "Reliable Channel - SDS protocol semantics": var waku: LogosDelivery var manager: ReliableChannelManager lockNewGlobalBrokerContext: - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr() diff --git a/tests/test_waku.nim b/tests/test_waku.nim index ff009c3ba..faf1575eb 100644 --- a/tests/test_waku.nim +++ b/tests/test_waku.nim @@ -9,7 +9,7 @@ import tools/confutils/cli_args import logos_delivery/waku/factory/networks_config import logos_delivery/waku/factory/conf_builder/conf_builder -suite "Waku API - Create node": +suite "LogosDelivery API - Create node": asyncTest "Create node with minimal configuration": ## Given var nodeConf = defaultWakuNodeConf().valueOr: @@ -21,14 +21,14 @@ suite "Waku API - Create node": # This is the actual minimal config but as the node auto-start, it is not suitable for tests ## When - let node = (await createNode(nodeConf)).valueOr: - raiseAssert "createNode (minimal config) failed: " & error + let ld = (await LogosDelivery.new(nodeConf)).valueOr: + raiseAssert "LogosDelivery.new (minimal config) failed: " & error ## Then check: - not node.isNil() - node.conf.clusterId == 3 - node.conf.relay == true + not ld.isNil() + ld.waku.conf.clusterId == 3 + ld.waku.conf.relay == true asyncTest "Create node with full configuration": ## Given @@ -47,20 +47,20 @@ suite "Waku API - Create node": ] ## When - let node = (await createNode(nodeConf)).valueOr: - raiseAssert "createNode (full config) failed: " & error + let ld = (await LogosDelivery.new(nodeConf)).valueOr: + raiseAssert "LogosDelivery.new (full config) failed: " & error ## Then check: - not node.isNil() - node.conf.clusterId == 99 - node.conf.shardingConf.numShardsInCluster == 16 - node.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64 - node.conf.staticNodes.len == 1 - node.conf.relay == true - node.conf.lightPush == true - node.conf.peerExchangeService == true - node.conf.rendezvous == true + not ld.isNil() + ld.waku.conf.clusterId == 99 + ld.waku.conf.shardingConf.numShardsInCluster == 16 + ld.waku.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64 + ld.waku.conf.staticNodes.len == 1 + ld.waku.conf.relay == true + ld.waku.conf.lightPush == true + ld.waku.conf.peerExchangeService == true + ld.waku.conf.rendezvous == true asyncTest "Create node with mixed entry nodes (enrtree, multiaddr)": ## Given @@ -75,18 +75,18 @@ suite "Waku API - Create node": ] ## When - let node = (await createNode(nodeConf)).valueOr: - raiseAssert "createNode (mixed entry nodes) failed: " & error + let ld = (await LogosDelivery.new(nodeConf)).valueOr: + raiseAssert "LogosDelivery.new (mixed entry nodes) failed: " & error ## Then check: - not node.isNil() - node.conf.clusterId == 42 + not ld.isNil() + ld.waku.conf.clusterId == 42 # ENRTree should go to DNS discovery - node.conf.dnsDiscoveryConf.isSome() - node.conf.dnsDiscoveryConf.get().enrTreeUrl == + ld.waku.conf.dnsDiscoveryConf.isSome() + ld.waku.conf.dnsDiscoveryConf.get().enrTreeUrl == "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" # Multiaddr should go to static nodes - node.conf.staticNodes.len == 1 - node.conf.staticNodes[0] == + ld.waku.conf.staticNodes.len == 1 + ld.waku.conf.staticNodes[0] == "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc"