diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index dd3097e49..124d09173 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -14,7 +14,7 @@ import logos_delivery/waku/[ common/enr, common/logging, - factory/waku as waku_factory, + waku as waku_factory, waku_node, node/waku_metrics, node/peer_manager, diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index e61b76e9c..a75b472ec 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -11,7 +11,7 @@ import ../../tools/[rln_keystore_generator/rln_keystore_generator, confutils/cli_args], logos_delivery/waku/[ common/logging, - factory/waku, + waku, node/health_monitor, rest_api/endpoint/builder as rest_server_builder, waku_core/message/default_values, diff --git a/examples/wakustealthcommitments/node_spec.nim b/examples/wakustealthcommitments/node_spec.nim index 3f4deeb08..e8e0a2062 100644 --- a/examples/wakustealthcommitments/node_spec.nim +++ b/examples/wakustealthcommitments/node_spec.nim @@ -1,7 +1,7 @@ {.push raises: [].} import tools/confutils/cli_args -import logos_delivery/waku/[common/logging, factory/[waku, networks_config]] +import logos_delivery/waku/[common/logging, waku, factory/networks_config] import std/[options, strutils, os, sequtils], chronicles, diff --git a/library/events/json_connection_status_change_event.nim b/library/events/json_connection_status_change_event.nim index 9b8a0f9e6..f5af78f24 100644 --- a/library/events/json_connection_status_change_event.nim +++ b/library/events/json_connection_status_change_event.nim @@ -2,7 +2,7 @@ import system, std/json import ./json_base_event -import ../../logos_delivery/waku/api/types +import ../../logos_delivery/api/types type JsonConnectionStatusChangeEvent* = ref object of JsonEvent status*: ConnectionStatus diff --git a/library/kernel_api/debug_node_api.nim b/library/kernel_api/debug_node_api.nim index b1229d660..44764a673 100644 --- a/library/kernel_api/debug_node_api.nim +++ b/library/kernel_api/debug_node_api.nim @@ -9,7 +9,7 @@ import metrics, ffi import - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/node/waku_node, logos_delivery/waku/node/health_monitor, library/declare_lib diff --git a/library/kernel_api/discovery_api.nim b/library/kernel_api/discovery_api.nim index 7ae7d1dea..158c3a925 100644 --- a/library/kernel_api/discovery_api.nim +++ b/library/kernel_api/discovery_api.nim @@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor import std/json import chronos, chronicles, results, strutils, libp2p/multiaddress, ffi import - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/discovery/waku_dnsdisc, logos_delivery/waku/discovery/waku_discv5, logos_delivery/waku/waku_core/peers, diff --git a/library/kernel_api/node_lifecycle_api.nim b/library/kernel_api/node_lifecycle_api.nim index 279480031..dbf7ad9b1 100644 --- a/library/kernel_api/node_lifecycle_api.nim +++ b/library/kernel_api/node_lifecycle_api.nim @@ -5,7 +5,7 @@ import chronos, chronicles, results, confutils, confutils/std/net, ffi import logos_delivery/waku/node/peer_manager/peer_manager, tools/confutils/cli_args, - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/factory/node_factory, logos_delivery/waku/factory/app_callbacks, logos_delivery/waku/rest_api/endpoint/builder, diff --git a/library/kernel_api/peer_manager_api.nim b/library/kernel_api/peer_manager_api.nim index 38c15fc3d..eeea2c63f 100644 --- a/library/kernel_api/peer_manager_api.nim +++ b/library/kernel_api/peer_manager_api.nim @@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor import std/[sequtils, strutils, tables] import chronicles, chronos, results, options, json, ffi import - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/node/waku_node, logos_delivery/waku/node/peer_manager, library/declare_lib diff --git a/library/kernel_api/ping_api.nim b/library/kernel_api/ping_api.nim index 8313004d3..e6ed69dd1 100644 --- a/library/kernel_api/ping_api.nim +++ b/library/kernel_api/ping_api.nim @@ -1,9 +1,7 @@ import std/[json, strutils] import chronos, results, ffi import libp2p/[protocols/ping, switch, multiaddress, multicodec] -import - logos_delivery/waku/[factory/waku, waku_core/peers, node/waku_node], - library/declare_lib +import logos_delivery/waku/[waku, waku_core/peers, node/waku_node], library/declare_lib proc waku_ping_peer( ctx: ptr FFIContext[LogosDelivery], diff --git a/library/kernel_api/protocols/filter_api.nim b/library/kernel_api/protocols/filter_api.nim index fb1c905ca..a070bd2c7 100644 --- a/library/kernel_api/protocols/filter_api.nim +++ b/library/kernel_api/protocols/filter_api.nim @@ -4,7 +4,7 @@ import chronicles, chronos, results, ffi import logos_delivery/waku/waku_filter_v2/client, logos_delivery/waku/waku_core/message/message, - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/waku_relay, logos_delivery/waku/waku_filter_v2/common, logos_delivery/waku/waku_core/subscription/push_handler, diff --git a/library/kernel_api/protocols/lightpush_api.nim b/library/kernel_api/protocols/lightpush_api.nim index 287e37aa5..eb0d1de09 100644 --- a/library/kernel_api/protocols/lightpush_api.nim +++ b/library/kernel_api/protocols/lightpush_api.nim @@ -4,7 +4,7 @@ import chronicles, chronos, results, ffi import logos_delivery/waku/waku_core/message/message, logos_delivery/waku/waku_core/codecs, - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/waku_core/message, logos_delivery/waku/waku_core/topics/pubsub_topic, logos_delivery/waku/waku_lightpush_legacy/client, diff --git a/library/kernel_api/protocols/relay_api.nim b/library/kernel_api/protocols/relay_api.nim index 2195cdffd..d580597ae 100644 --- a/library/kernel_api/protocols/relay_api.nim +++ b/library/kernel_api/protocols/relay_api.nim @@ -3,7 +3,8 @@ import std/[net, sequtils, strutils, json], strformat import chronicles, chronos, stew/byteutils, results, ffi import logos_delivery/waku/waku_core/message/message, - logos_delivery/waku/factory/[validator_signed, waku], + logos_delivery/waku/factory/validator_signed, + logos_delivery/waku/waku, tools/confutils/cli_args, logos_delivery/waku/waku_core/message, logos_delivery/waku/waku_core/topics/pubsub_topic, diff --git a/library/kernel_api/protocols/store_api.nim b/library/kernel_api/protocols/store_api.nim index 7083f93f2..c6356bf6a 100644 --- a/library/kernel_api/protocols/store_api.nim +++ b/library/kernel_api/protocols/store_api.nim @@ -2,7 +2,7 @@ import logos_delivery/waku/compat/option_valueor import std/[json, sugar, strutils, options] import chronos, chronicles, results, stew/byteutils, ffi import - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, library/utils, logos_delivery/waku/waku_core/peers, logos_delivery/waku/waku_core/message/digest, diff --git a/library/liblogosdelivery.nim b/library/liblogosdelivery.nim index 8dbcfac29..0ead080bc 100644 --- a/library/liblogosdelivery.nim +++ b/library/liblogosdelivery.nim @@ -6,7 +6,7 @@ import logos_delivery/waku/waku_core/topics/pubsub_topic, logos_delivery/waku/waku_relay, logos_delivery, - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/node/waku_node, logos_delivery/waku/node/health_monitor/health_status, ../logos_delivery/waku/factory/app_callbacks, diff --git a/library/logos_delivery_api/messaging_api.nim b/library/logos_delivery_api/messaging_api.nim index 3c86138b4..0e4dc7449 100644 --- a/library/logos_delivery_api/messaging_api.nim +++ b/library/logos_delivery_api/messaging_api.nim @@ -3,9 +3,9 @@ import chronos, results, ffi import stew/byteutils import logos_delivery/waku/common/base64, - logos_delivery/waku/factory/waku, + logos_delivery/waku/waku, logos_delivery/waku/waku_core/topics/content_topic, - logos_delivery/waku/api/[api, types], + logos_delivery/api/types, ../declare_lib proc logosdelivery_subscribe( @@ -20,7 +20,7 @@ proc logosdelivery_subscribe( # ContentTopic is just a string type alias let contentTopic = ContentTopic($contentTopicStr) - (await api.subscribe(ctx.myLib[].waku, contentTopic)).isOkOr: + (await ctx.myLib[].messagingClient.subscribe(contentTopic)).isOkOr: let errMsg = $error return err("Subscribe failed: " & errMsg) @@ -38,7 +38,7 @@ proc logosdelivery_unsubscribe( # ContentTopic is just a string type alias let contentTopic = ContentTopic($contentTopicStr) - api.unsubscribe(ctx.myLib[].waku, contentTopic).isOkOr: + ctx.myLib[].messagingClient.unsubscribe(contentTopic).isOkOr: let errMsg = $error return err("Unsubscribe failed: " & errMsg) diff --git a/library/logos_delivery_api/node_api.nim b/library/logos_delivery_api/node_api.nim index f6a0945ec..354ab3a98 100644 --- a/library/logos_delivery_api/node_api.nim +++ b/library/logos_delivery_api/node_api.nim @@ -3,7 +3,8 @@ import chronos, chronicles, results, ffi import logos_delivery, logos_delivery/waku/node/waku_node, - logos_delivery/waku/api/[api, types], + logos_delivery/waku/events/message_events, + logos_delivery/api/types, logos_delivery/waku/events/[message_events, health_events], tools/confutils/conf_from_json, ../declare_lib, diff --git a/logos_delivery/waku/api/api_conf.nim b/logos_delivery/api/api_conf.nim similarity index 100% rename from logos_delivery/waku/api/api_conf.nim rename to logos_delivery/api/api_conf.nim diff --git a/logos_delivery/waku/api/types.nim b/logos_delivery/api/types.nim similarity index 98% rename from logos_delivery/waku/api/types.nim rename to logos_delivery/api/types.nim index d51d46994..5757a8e82 100644 --- a/logos_delivery/waku/api/types.nim +++ b/logos_delivery/api/types.nim @@ -6,7 +6,8 @@ import bearssl/rand, std/times, chronos import stew/byteutils import logos_delivery/waku/utils/requests as request_utils import logos_delivery/waku/waku_core/[topics/content_topic, message/message, time] -import logos_delivery/waku/requests/requests + +export content_topic, message type MessageEnvelope* = object diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index 5a7ab24d4..307dc17a4 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -21,7 +21,7 @@ import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/waku_core/topics @@ -135,7 +135,7 @@ proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) = ## and the total number of confirmed + failed segments equals the total expected segments. ## Therefore, the channel-level request is removed from `self.channelReqs` ## and the appropriate final event is emitted. - ## + ## let state = self.channelReqs.getOrDefault(channelReqId) if state.totalExpectedSegments == 0: ## Either already finalized (and removed) or never inserted. diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 887444b19..29feab0b9 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -15,7 +15,7 @@ import brokers/broker_context import logos_delivery/waku/events/message_events as waku_message_events import logos_delivery/messaging/messaging_client -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/waku/waku_core/topics import logos_delivery/waku/persistency/sds_persistency diff --git a/logos_delivery/channels/types.nim b/logos_delivery/channels/types.nim index ec663cf7b..7730c5c58 100644 --- a/logos_delivery/channels/types.nim +++ b/logos_delivery/channels/types.nim @@ -1,7 +1,7 @@ ## Core identifier types for the Reliable Channel API. import std/hashes -import logos_delivery/waku/api/types as api_types +import logos_delivery/api/types as api_types import ./scalable_data_sync/scalable_data_sync diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index f01cf4207..f61202b9a 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -11,9 +11,7 @@ import results, chronos, chronicles -import logos_delivery/waku/api -export api -import logos_delivery/waku/factory/waku +import logos_delivery/waku/waku export waku import logos_delivery/messaging/messaging_client export messaging_client @@ -22,7 +20,8 @@ export reliable_channel_manager import logos_delivery/waku/factory/waku_conf import logos_delivery/waku/factory/app_callbacks -import logos_delivery/waku/api/[api_conf, types] +import tools/confutils/cli_args +import logos_delivery/waku/node/health_monitor/online_monitor logScope: topics = "logosdelivery" @@ -82,6 +81,13 @@ proc new*( proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = ## Starts each layer bottom-up: transport first, then messaging, then channels. + if self.waku.isNil(): + return err("Waku node is not initialized") + if self.messagingClient.isNil(): + return err("MessagingClient is not initialized") + if self.reliableChannelManager.isNil(): + return err("ReliableChannelManager is not initialized") + (await self.waku.start()).isOkOr: return err("failed to start Waku: " & error) @@ -102,3 +108,8 @@ proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = return err("failed to stop Waku: " & error) return ok() + +proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} = + if self.waku.isNil(): + return err("Waku node is not initialized") + return ok(self.waku.healthMonitor.onlineMonitor.amIOnline()) diff --git a/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim b/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim index 685f9afce..1f76078fe 100644 --- a/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim +++ b/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim @@ -3,7 +3,7 @@ import std/[options, times], chronos import brokers/broker_context import logos_delivery/waku/waku_core, - logos_delivery/waku/api/types, + logos_delivery/api/types, logos_delivery/waku/requests/node_requests type DeliveryState* {.pure.} = enum diff --git a/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim b/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim index 3acc44bb4..dc40b797d 100644 --- a/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim +++ b/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim @@ -4,7 +4,7 @@ import chronos, chronicles import brokers/broker_context import logos_delivery/waku/[waku_core], logos_delivery/waku/waku_lightpush/[common, rpc] import logos_delivery/waku/requests/health_requests -import logos_delivery/waku/api/types +import logos_delivery/api/types import ./[delivery_task, send_processor] logScope: diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 652671e46..96cd13eb1 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -1,7 +1,7 @@ import results, chronos import chronicles import - logos_delivery/waku/api/types, + logos_delivery/api/types, logos_delivery/waku/node/[waku_node, subscription_manager], logos_delivery/messaging/delivery_service/[recv_service, send_service], logos_delivery/messaging/delivery_service/send_service/delivery_task @@ -43,6 +43,26 @@ proc stop*(self: MessagingClient) {.async.} = await self.recvService.stopRecvService() self.started = false +proc checkApiAvailability(self: MessagingClient): Result[void, string] = + if self.isNil(): + return err("MessagingClient is not initialized") + + return ok() + +proc subscribe*( + self: MessagingClient, contentTopic: ContentTopic +): Future[Result[void, string]] {.async.} = + ?checkApiAvailability(self) + + return self.node.subscriptionManager.subscribe(contentTopic) + +proc unsubscribe*( + self: MessagingClient, contentTopic: ContentTopic +): Result[void, string] = + ?checkApiAvailability(self) + + return self.node.subscriptionManager.unsubscribe(contentTopic) + proc send*( self: MessagingClient, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = diff --git a/logos_delivery/waku/api.nim b/logos_delivery/waku/api.nim deleted file mode 100644 index a977a062a..000000000 --- a/logos_delivery/waku/api.nim +++ /dev/null @@ -1,5 +0,0 @@ -import ./api/[api, api_conf] -import ./events/message_events -import tools/confutils/entry_nodes - -export api, api_conf, entry_nodes, message_events diff --git a/logos_delivery/waku/api/api.nim b/logos_delivery/waku/api/api.nim deleted file mode 100644 index 0ffc38dc8..000000000 --- a/logos_delivery/waku/api/api.nim +++ /dev/null @@ -1,48 +0,0 @@ -import logos_delivery/waku/compat/option_valueor -import std/[net, options] - -import chronicles, chronos, libp2p/peerid, results - -import logos_delivery/waku/factory/waku -import logos_delivery/waku/[requests/health_requests, waku_core, waku_node] -import logos_delivery/waku/node/subscription_manager -import libp2p/peerid -import tools/confutils/cli_args -import ./[api_conf, types] - -export cli_args - -logScope: - topics = "api" - -proc createNode*(conf: WakuNodeConf): Future[Result[Waku, string]] {.async.} = - let wakuConf = conf.toWakuConf().valueOr: - return err("Failed to handle the configuration: " & error) - - ## We are not defining app callbacks at node creation - let wakuRes = (await Waku.new(wakuConf)).valueOr: - error "waku initialization failed", error = error - return err("Failed setting up Waku: " & $error) - - return ok(wakuRes) - -proc checkApiAvailability(w: Waku): Result[void, string] = - if w.isNil(): - return err("Waku node is not initialized") - - # TODO: Conciliate request-bouncing health checks here with unit testing. - # (For now, better to just allow all sends and rely on retries.) - - return ok() - -proc subscribe*( - w: Waku, contentTopic: ContentTopic -): Future[Result[void, string]] {.async.} = - ?checkApiAvailability(w) - - return w.node.subscriptionManager.subscribe(contentTopic) - -proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] = - ?checkApiAvailability(w) - - return w.node.subscriptionManager.unsubscribe(contentTopic) diff --git a/logos_delivery/waku/api/send_api.md b/logos_delivery/waku/api/send_api.md deleted file mode 100644 index 2a5a2f8a4..000000000 --- a/logos_delivery/waku/api/send_api.md +++ /dev/null @@ -1,46 +0,0 @@ -# SEND API - -**THIS IS TO BE REMOVED BEFORE PR MERGE** - -This document collects logic and todo's around the Send API. - -## Overview - -Send api hides the complex logic of using raw protocols for reliable message delivery. -The delivery method is chosen based on the node configuration and actual availabilities of peers. - -## Delivery task - -Each message send request is bundled into a task that not just holds the composed message but also the state of the delivery. - -## Delivery methods - -Depending on the configuration and the availability of store client protocol + actual configured and/or discovered store nodes: -- P2PReliability validation - checking network store node whether the message is reached at least a store node. -- Simple retry until message is propagated to the network - - Relay says >0 peers as publish result - - LightpushClient returns with success - -Depending on node config: -- Relay -- Lightpush - -These methods are used in combination to achieve the best reliability. -Fallback mechanism is used to switch between methods if the current one fails. - -Relay+StoreCheck -> Relay+simple retry -> Lightpush+StoreCheck -> Lightpush simple retry -> Error - -Combination is dynamically chosen on node configuration. Levels can be skipped depending on actual connectivity. -Actual connectivity is checked: -- Relay's topic health check - at least dLow peers in the mesh for the topic -- Store nodes availability - at least one store service node is available in peer manager -- Lightpush client availability - at least one lightpush service node is available in peer manager - -## Delivery processing - -At every send request, each task is tried to be delivered right away. -Any further retries and store check is done as a background task in a loop with predefined intervals. -Each task is set for a maximum number of retries and/or maximum time to live. - -In each round of store check and retry send tasks are selected based on their state. -The state is updated based on the result of the delivery method. diff --git a/logos_delivery/waku/events/health_events.nim b/logos_delivery/waku/events/health_events.nim index 4ff6f0c6c..d19de776b 100644 --- a/logos_delivery/waku/events/health_events.nim +++ b/logos_delivery/waku/events/health_events.nim @@ -1,6 +1,6 @@ import brokers/event_broker -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health] import logos_delivery/waku/waku_core/topics diff --git a/logos_delivery/waku/events/message_events.nim b/logos_delivery/waku/events/message_events.nim index 9338fda67..2e4bece80 100644 --- a/logos_delivery/waku/events/message_events.nim +++ b/logos_delivery/waku/events/message_events.nim @@ -1,5 +1,6 @@ import brokers/event_broker -import logos_delivery/waku/[api/types, waku_core/message, waku_core/topics] +import logos_delivery/api/types +import logos_delivery/waku/[waku_core/message, waku_core/topics] export types EventBroker: diff --git a/logos_delivery/waku/node/health_monitor/connection_status.nim b/logos_delivery/waku/node/health_monitor/connection_status.nim index 68ec9d4be..fd0328fb7 100644 --- a/logos_delivery/waku/node/health_monitor/connection_status.nim +++ b/logos_delivery/waku/node/health_monitor/connection_status.nim @@ -1,4 +1,5 @@ -import chronos, results, std/strutils, ../../api/types +import chronos, results, std/strutils +from logos_delivery/api/types import ConnectionStatus export ConnectionStatus diff --git a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim index 859bf0179..8d647ac1e 100644 --- a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim +++ b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim @@ -8,10 +8,10 @@ import libp2p/protocols/rendezvous, libp2p/protocols/pubsub, libp2p/protocols/pubsub/rpc/messages, + logos_delivery/api/types, logos_delivery/waku/[ waku_relay, waku_rln, - api/types, events/health_events, events/peer_events, node/waku_node, diff --git a/logos_delivery/waku/requests/health_requests.nim b/logos_delivery/waku/requests/health_requests.nim index 1c9ed4d70..366cdf875 100644 --- a/logos_delivery/waku/requests/health_requests.nim +++ b/logos_delivery/waku/requests/health_requests.nim @@ -1,6 +1,6 @@ import brokers/request_broker -import logos_delivery/waku/api/types +import logos_delivery/api/types import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health, health_report] import logos_delivery/waku/waku_core/topics diff --git a/logos_delivery/waku/rest_api/endpoint/health/types.nim b/logos_delivery/waku/rest_api/endpoint/health/types.nim index 331460e7a..c32a0c778 100644 --- a/logos_delivery/waku/rest_api/endpoint/health/types.nim +++ b/logos_delivery/waku/rest_api/endpoint/health/types.nim @@ -4,7 +4,8 @@ import logos_delivery/waku/compat/option_valueor import results import chronicles, json_serialization, json_serialization/std/options import ../serdes -import logos_delivery/waku/[waku_node, api/types, node/health_monitor] +import logos_delivery/api/types +import logos_delivery/waku/[waku_node, node/health_monitor] #### Serialization and deserialization diff --git a/logos_delivery/waku/factory/waku.nim b/logos_delivery/waku/waku.nim similarity index 59% rename from logos_delivery/waku/factory/waku.nim rename to logos_delivery/waku/waku.nim index d160ac2b2..27ba98f3a 100644 --- a/logos_delivery/waku/factory/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -11,6 +11,7 @@ import libp2p/wire, libp2p/crypto/crypto, libp2p/protocols/pubsub/gossipsub, + libp2p/protocols/ping, libp2p/services/autorelayservice, libp2p/services/hpservice, libp2p/peerid, @@ -20,6 +21,7 @@ import metrics, metrics/chronos_httpserver, brokers/broker_context, + logos_delivery/api/types, logos_delivery/waku/[ waku_core, waku_node, @@ -30,7 +32,6 @@ import waku_relay/protocol, waku_enr/sharding, waku_enr/multiaddr, - api/types, common/logging, node/peer_manager, node/health_monitor, @@ -48,9 +49,13 @@ import factory/internal_config, factory/app_callbacks, persistency/persistency, + factory/validator_signed, + waku_lightpush/client, + waku_lightpush_legacy/client, + waku_store/client, ], - ./waku_conf, - ./waku_state_info + ./factory/waku_conf, + ./factory/waku_state_info logScope: topics = "wakunode waku" @@ -58,6 +63,8 @@ logScope: # Git version in git describe format (defined at compile time) const git_version* {.strdefine.} = "n/a" +const FilterOpTimeout = 5.seconds + type Waku* = ref object stateInfo*: WakuStateInfo conf*: WakuConf @@ -567,12 +574,418 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = return ok() -proc isModeCoreAvailable*(waku: Waku): bool = - return not waku.node.wakuRelay.isNil() +## Kernel API realization +## +# --- topic construction --- +proc buildContentTopic*( + self: Waku, appName: string, appVersion: uint32, name: string, encoding: string +): Future[Result[ContentTopic, string]] {.async.} = + try: + return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}")) + except CatchableError as e: + return err(e.msg) -proc isModeEdgeAvailable*(waku: Waku): bool = - return - waku.node.wakuRelay.isNil() and not waku.node.wakuStoreClient.isNil() and - not waku.node.wakuFilterClient.isNil() and not waku.node.wakuLightPushClient.isNil() +proc buildPubsubTopic*( + self: Waku, topicName: string +): Future[Result[PubsubTopic, string]] {.async.} = + try: + return ok(PubsubTopic(fmt"/waku/2/{topicName}")) + except CatchableError as e: + return err(e.msg) + +proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} = + return ok(DefaultPubsubTopic) + +# --- relay --- +proc relayPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 +): Future[Result[int, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPublish: WakuRelay not mounted") + + let numPeers = (await self.node.wakuRelay.publish(pubsubTopic, message)).valueOr: + return err($error) + + return ok(numPeers) + except CatchableError as e: + return err(e.msg) + +proc relaySubscribe*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relaySubscribe: WakuRelay not mounted") + + self.node.subscribe( + (kind: SubscriptionKind.PubsubSub, topic: pubsubTopic), WakuRelayHandler(nil) + ).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayUnsubscribe*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayUnsubscribe: WakuRelay not mounted") + + self.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: pubsubTopic)).isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayAddProtectedShard*( + self: Waku, clusterId: uint16, shardId: uint16, publicKey: string +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayAddProtectedShard: WakuRelay not mounted") + + let pubKey = SkPublicKey.fromHex(publicKey).valueOr: + return err("relayAddProtectedShard: invalid public key: " & $error) + + let protectedShard = ProtectedShard(shard: shardId, key: pubKey) + self.node.wakuRelay.addSignedShardsValidator(@[protectedShard], clusterId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc relayConnectedPeers*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayConnectedPeers: WakuRelay not mounted") + + let connPeers = self.node.wakuRelay.getConnectedPeers(pubsubTopic).valueOr: + return err($error) + + return ok(connPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc relayPeersInMesh*( + self: Waku, pubsubTopic: PubsubTopic +): Future[Result[seq[string], string]] {.async.} = + try: + if self.node.wakuRelay.isNil(): + return err("relayPeersInMesh: WakuRelay not mounted") + + let meshPeers = self.node.wakuRelay.getPeersInMesh(pubsubTopic).valueOr: + return err($error) + + return ok(meshPeers.mapIt($it)) + except CatchableError as e: + return err(e.msg) + +# --- filter --- +proc filterSubscribe*( + self: Waku, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let subFut = self.node.filterSubscribe(pubsubTopic, contentTopics, peer) + if not await subFut.withTimeout(FilterOpTimeout): + return err("filter subscription timed out") + subFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribe*( + self: Waku, + pubsubTopic: Option[PubsubTopic], + contentTopics: seq[ContentTopic], + peer: string, +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let unsubFut = self.node.filterUnsubscribe(pubsubTopic, contentTopics, peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc filterUnsubscribeAll*( + self: Waku, peer: string +): Future[Result[bool, string]] {.async.} = + try: + if self.node.wakuFilterClient.isNil(): + return err("wakuFilterClient is not mounted") + + let unsubFut = self.node.filterUnsubscribeAll(peer) + if not await unsubFut.withTimeout(FilterOpTimeout): + return err("filter un-subscription all timed out") + unsubFut.read().isOkOr: + return err($error) + + return ok(true) + except CatchableError as e: + return err(e.msg) + +# --- lightpush --- +proc lightpushPublish*( + self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string +): Future[Result[string, string]] {.async.} = + try: + if self.node.wakuLegacyLightpushClient.isNil(): + return err("wakuLegacyLightpushClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("lightpushPublish failed to parse peer addr: " & $error) + + let msgHashHex = ( + await self.node.wakuLegacyLightpushClient.publish( + pubsubTopic, message, remotePeer + ) + ).valueOr: + return err($error) + + return ok(msgHashHex) + except CatchableError as e: + return err(e.msg) + +# --- store --- +proc storeQuery*( + self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int +): Future[Result[StoreQueryResponse, string]] {.async.} = + try: + if self.node.wakuStoreClient.isNil(): + return err("wakuStoreClient is not mounted") + + let remotePeer = parsePeerInfo(peer).valueOr: + return err("storeQuery failed to parse peer addr: " & $error) + + let queryFut = self.node.wakuStoreClient.query(request, remotePeer) + if not await queryFut.withTimeout(timeoutMs.milliseconds): + return err("storeQuery timed out") + + let queryResponse = queryFut.read().valueOr: + return err("storeQuery failed: " & $error) + + return ok(queryResponse) + except CatchableError as e: + return err(e.msg) + +# --- peer management --- +proc connect*( + self: Waku, peers: seq[string], timeoutMs: uint32 +): Future[Result[bool, string]] {.async.} = + try: + await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectPeerById*( + self: Waku, peerId: string +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + await self.node.peerManager.disconnectNode(pId) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + await self.node.peerManager.disconnectAllPeers() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeer*( + self: Waku, peerAddr: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let remotePeerInfo = parsePeerInfo(peerAddr).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(remotePeerInfo, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc dialPeerById*( + self: Waku, peerId: string, protocol: string, timeoutMs: int +): Future[Result[bool, string]] {.async.} = + try: + let pId = PeerId.init(peerId).valueOr: + return err($error) + let conn = await self.node.peerManager.dialPeer(pId, protocol) + if conn.isNone(): + return err("failed dialing peer") + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId)) + except CatchableError as e: + return err(e.msg) + +proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers() + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) + +proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers() + return ok(concat(inPeerIds, outPeerIds).mapIt($it)) + except CatchableError as e: + return err(e.msg) + +proc peerIdsByProtocol*( + self: Waku, protocol: string +): Future[Result[seq[string], string]] {.async.} = + try: + return ok( + self.node.peerManager.switch.peerStore + .peers(protocol) + .filterIt(it.connectedness == Connected) + .mapIt($it.peerId) + ) + except CatchableError as e: + return err(e.msg) + +# --- discovery --- +proc dnsDiscovery*( + self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int +): Future[Result[seq[string], string]] {.async.} = + try: + let dnsNameServers = @[parseIpAddress(nameServer)] + let discoveredPeers = ( + await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers) + ).valueOr: + return err("failed discovering peers from DNS: " & $error) + + var multiAddresses = newSeq[string]() + for discPeer in discoveredPeers: + for address in discPeer.addrs: + multiAddresses.add($address & "/p2p/" & $discPeer) + + return ok(multiAddresses) + except CatchableError as e: + return err(e.msg) + +proc discv5UpdateBootnodes*( + self: Waku, bootnodes: seq[string] +): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + let jsonArray = "[" & bootnodes.mapIt("\"" & it & "\"").join(",") & "]" + self.wakuDiscv5.updateBootstrapRecords(jsonArray).isOkOr: + return err("error in discv5UpdateBootnodes: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + (await self.wakuDiscv5.start()).isOkOr: + return err("error starting discv5: " & $error) + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} = + try: + if self.wakuDiscv5.isNil(): + return err("discv5 not started") + await self.wakuDiscv5.stop() + return ok(true) + except CatchableError as e: + return err(e.msg) + +proc peerExchangeRequest*( + self: Waku, numPeers: uint64 +): Future[Result[int, string]] {.async.} = + try: + let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr: + return err("failed peer exchange: " & $error) + return ok(numPeersRecv) + except CatchableError as e: + return err(e.msg) + +# --- debug / info --- +proc version*(self: Waku): Future[Result[string, string]] {.async.} = + return ok(WakuNodeVersionString) + +proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} = + try: + return ok(self.node.info().listenAddresses) + except CatchableError as e: + return err(e.msg) + +proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok(self.node.enr.toURI()) + except CatchableError as e: + return err(e.msg) + +proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} = + try: + return ok($self.node.peerId()) + except CatchableError as e: + return err(e.msg) + +proc metrics*(self: Waku): Future[Result[string, string]] {.async.} = + {.gcsafe.}: + try: + return ok(defaultRegistry.toText()) + except CatchableError as e: + return err(e.msg) + +proc pingPeer*( + self: Waku, peerAddr: string, timeoutMs: int +): Future[Result[int64, string]] {.async.} = + try: + let peerInfo = parsePeerInfo(peerAddr).valueOr: + return err("pingPeer failed to parse peer addr: " & $error) + + let conn = await self.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec) + defer: + await conn.close() + let pingRTT = await self.node.libp2pPing.ping(conn) + + if pingRTT == 0.nanos: + return err("could not ping peer: rtt-0") + + return ok(pingRTT.nanos) + except CatchableError as e: + return err(e.msg) {.pop.} diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim index d9eb02427..41c0f0477 100644 --- a/tests/api/test_api_receive.nim +++ b/tests/api/test_api_receive.nim @@ -187,7 +187,7 @@ proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} = raiseAssert "Message was not archived in time" # subscribe to the content topic; with no peers yet the subscriber stays offline - (await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe") + (await subscriber.messagingClient.subscribe(testTopic)).expect("Failed to subscribe") return TestNetwork( storeNode: storeNode, diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index 1169a7366..bf2851b02 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -225,7 +225,7 @@ suite "Messaging API, SubscriptionManager": await net.teardown() let testTopic = ContentTopic("/waku/2/test-content/proto") - (await net.subscriber.waku.subscribe(testTopic)).expect( + (await net.subscriber.messagingClient.subscribe(testTopic)).expect( "subscriberNode failed to subscribe" ) @@ -248,7 +248,9 @@ suite "Messaging API, SubscriptionManager": let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") - (await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe") + (await net.subscriber.messagingClient.subscribe(subbedTopic)).expect( + "failed to subscribe" + ) let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: @@ -268,8 +270,12 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/unsub-test/proto") - (await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe") - net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe") + (await net.subscriber.messagingClient.subscribe(testTopic)).expect( + "failed to subscribe" + ) + net.subscriber.messagingClient.unsubscribe(testTopic).expect( + "failed to unsubscribe" + ) let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: @@ -289,14 +295,14 @@ suite "Messaging API, SubscriptionManager": let topicA = ContentTopic("/waku/2/topic-a/proto") let topicB = ContentTopic("/waku/2/topic-b/proto") - (await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A") - (await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B") + (await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B") let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: await eventManager.teardown() - net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A") + net.subscriber.messagingClient.unsubscribe(topicA).expect("failed to unsub A") discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( "Publish A failed" @@ -315,9 +321,13 @@ suite "Messaging API, SubscriptionManager": let glitchTopic = ContentTopic("/waku/2/glitch/proto") - (await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to sub") - (await net.subscriber.waku.subscribe(glitchTopic)).expect("failed to double sub") - net.subscriber.waku.unsubscribe(glitchTopic).expect("failed to unsub") + (await net.subscriber.messagingClient.subscribe(glitchTopic)).expect( + "failed to sub" + ) + (await net.subscriber.messagingClient.subscribe(glitchTopic)).expect( + "failed to double sub" + ) + net.subscriber.messagingClient.unsubscribe(glitchTopic).expect("failed to unsub") let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: @@ -338,7 +348,9 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/resub-test/proto") # Subscribe - (await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed") + (await net.subscriber.messagingClient.subscribe(testTopic)).expect( + "Initial sub failed" + ) var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard @@ -348,7 +360,7 @@ suite "Messaging API, SubscriptionManager": await eventManager.teardown() # Unsubscribe and verify teardown - net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed") + net.subscriber.messagingClient.unsubscribe(testTopic).expect("Unsub failed") eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard @@ -358,7 +370,7 @@ suite "Messaging API, SubscriptionManager": await eventManager.teardown() # Resubscribe - (await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed") + (await net.subscriber.messagingClient.subscribe(testTopic)).expect("Resub failed") eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard @@ -382,8 +394,8 @@ suite "Messaging API, SubscriptionManager": topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto") inc i - (await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A") - (await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B") + (await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B") let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 2) defer: @@ -440,7 +452,7 @@ suite "Messaging API, SubscriptionManager": # subscribe to all content topics we generated for t in allTopics: - (await net.subscriber.waku.subscribe(t)).expect("sub failed") + (await net.subscriber.messagingClient.subscribe(t)).expect("sub failed") activeSubs.add(t) await verifyNetworkState(activeSubs) @@ -448,7 +460,7 @@ suite "Messaging API, SubscriptionManager": # unsubscribe from some content topics for i in 0 ..< 50: let t = allTopics[i] - net.subscriber.waku.unsubscribe(t).expect("unsub failed") + net.subscriber.messagingClient.unsubscribe(t).expect("unsub failed") let idx = activeSubs.find(t) if idx >= 0: @@ -459,7 +471,7 @@ suite "Messaging API, SubscriptionManager": # re-subscribe to some content topics for i in 0 ..< 25: let t = allTopics[i] - (await net.subscriber.waku.subscribe(t)).expect("resub failed") + (await net.subscriber.messagingClient.subscribe(t)).expect("resub failed") activeSubs.add(t) await verifyNetworkState(activeSubs) @@ -470,7 +482,9 @@ suite "Messaging API, SubscriptionManager": await net.teardown() let testTopic = ContentTopic("/waku/2/test-content/proto") - (await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe") + (await net.subscriber.messagingClient.subscribe(testTopic)).expect( + "failed to subscribe" + ) let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: @@ -491,7 +505,9 @@ suite "Messaging API, SubscriptionManager": let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") - (await net.subscriber.waku.subscribe(subbedTopic)).expect("failed to subscribe") + (await net.subscriber.messagingClient.subscribe(subbedTopic)).expect( + "failed to subscribe" + ) let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: @@ -511,8 +527,12 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/unsub-test/proto") - (await net.subscriber.waku.subscribe(testTopic)).expect("failed to subscribe") - net.subscriber.waku.unsubscribe(testTopic).expect("failed to unsubscribe") + (await net.subscriber.messagingClient.subscribe(testTopic)).expect( + "failed to subscribe" + ) + net.subscriber.messagingClient.unsubscribe(testTopic).expect( + "failed to unsubscribe" + ) let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) defer: @@ -532,8 +552,8 @@ suite "Messaging API, SubscriptionManager": let topicA = ContentTopic("/waku/2/topic-a/proto") let topicB = ContentTopic("/waku/2/topic-b/proto") - (await net.subscriber.waku.subscribe(topicA)).expect("failed to sub A") - (await net.subscriber.waku.subscribe(topicB)).expect("failed to sub B") + (await net.subscriber.messagingClient.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.messagingClient.subscribe(topicB)).expect("failed to sub B") let shard = net.subscriber.waku.node.getRelayShard(topicA) await waitForEdgeSubs(net.subscriber, shard) @@ -542,7 +562,7 @@ suite "Messaging API, SubscriptionManager": defer: await eventManager.teardown() - net.subscriber.waku.unsubscribe(topicA).expect("failed to unsub A") + net.subscriber.messagingClient.unsubscribe(topicA).expect("failed to unsub A") discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( "Publish A failed" @@ -561,7 +581,9 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/resub-test/proto") - (await net.subscriber.waku.subscribe(testTopic)).expect("Initial sub failed") + (await net.subscriber.messagingClient.subscribe(testTopic)).expect( + "Initial sub failed" + ) var eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 1".toBytes())).expect( @@ -571,7 +593,7 @@ suite "Messaging API, SubscriptionManager": require await eventManager.waitForEvents(TestTimeout) await eventManager.teardown() - net.subscriber.waku.unsubscribe(testTopic).expect("Unsub failed") + net.subscriber.messagingClient.unsubscribe(testTopic).expect("Unsub failed") eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard @@ -580,7 +602,7 @@ suite "Messaging API, SubscriptionManager": check not await eventManager.waitForEvents(NegativeTestTimeout) await eventManager.teardown() - (await net.subscriber.waku.subscribe(testTopic)).expect("Resub failed") + (await net.subscriber.messagingClient.subscribe(testTopic)).expect("Resub failed") eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1) discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 2".toBytes())).expect( @@ -653,7 +675,9 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/failover-test/proto") let shard = subscriber.waku.node.getRelayShard(testTopic) - (await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe") + (await subscriber.messagingClient.subscribe(testTopic)).expect( + "Failed to subscribe" + ) # Wait for dialing both filter servers (HealthyThreshold = 2) check await edgePeersReached(subscriber, shard, 2) @@ -783,7 +807,9 @@ suite "Messaging API, SubscriptionManager": let testTopic = ContentTopic("/waku/2/replacement-test/proto") let shard = subscriber.waku.node.getRelayShard(testTopic) - (await subscriber.waku.subscribe(testTopic)).expect("Failed to subscribe") + (await subscriber.messagingClient.subscribe(testTopic)).expect( + "Failed to subscribe" + ) # Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed. check await edgePeersReached(subscriber, shard, 2) diff --git a/tests/api/test_node_conf.nim b/tests/api/test_node_conf.nim index a5bff3906..df5b18887 100644 --- a/tests/api/test_node_conf.nim +++ b/tests/api/test_node_conf.nim @@ -5,7 +5,7 @@ import json_serialization, confutils, confutils/std/net import tools/confutils/cli_args, tools/confutils/conf_from_json, - logos_delivery/waku/api/api_conf, + logos_delivery/api/api_conf, logos_delivery/waku/factory/waku_conf, logos_delivery/waku/factory/networks_config, logos_delivery/waku/factory/conf_builder/conf_builder, @@ -350,7 +350,7 @@ suite "WakuNodeConf JSON -> WakuConf integration": {.push warning[Deprecated]: off.} -import logos_delivery/waku/api/api_conf +import logos_delivery/api/api_conf suite "NodeConfig (deprecated) - toWakuConf": test "Minimal configuration": diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 881c22b3a..b3d9a8f00 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -58,7 +58,7 @@ suite "Reliable Channel - ingress": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager ## Noop encryption providers so the Encrypt/Decrypt brokers have @@ -124,7 +124,7 @@ suite "Reliable Channel - ingress": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -181,7 +181,7 @@ suite "Reliable Channel - send state machine": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -246,7 +246,7 @@ suite "Reliable Channel - send state machine": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -347,7 +347,7 @@ suite "Reliable Channel - send state machine": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -452,7 +452,7 @@ suite "Reliable Channel - SDS persistence": var waku: LogosDelivery var manager: ReliableChannelManager lockNewGlobalBrokerContext: - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -522,7 +522,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -593,7 +593,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -650,7 +650,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -710,7 +710,7 @@ suite "Reliable Channel - SDS lifecycle": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -792,7 +792,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -860,7 +860,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -948,7 +948,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -1023,7 +1023,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -1096,7 +1096,7 @@ suite "Reliable Channel - SDS protocol semantics": var brokerCtx: BrokerContext lockNewGlobalBrokerContext: brokerCtx = globalBrokerContext() - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager setNoopEncryption() @@ -1162,7 +1162,7 @@ suite "Reliable Channel - SDS protocol semantics": var waku: LogosDelivery var manager: ReliableChannelManager lockNewGlobalBrokerContext: - waku = (await LogosDelivery.new(createApiNodeConf())).expect("createNode") + waku = (await LogosDelivery.new(createApiNodeConf())).expect("LogosDelivery.new") manager = waku.reliableChannelManager check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr() diff --git a/tests/test_waku.nim b/tests/test_waku.nim index ff009c3ba..faf1575eb 100644 --- a/tests/test_waku.nim +++ b/tests/test_waku.nim @@ -9,7 +9,7 @@ import tools/confutils/cli_args import logos_delivery/waku/factory/networks_config import logos_delivery/waku/factory/conf_builder/conf_builder -suite "Waku API - Create node": +suite "LogosDelivery API - Create node": asyncTest "Create node with minimal configuration": ## Given var nodeConf = defaultWakuNodeConf().valueOr: @@ -21,14 +21,14 @@ suite "Waku API - Create node": # This is the actual minimal config but as the node auto-start, it is not suitable for tests ## When - let node = (await createNode(nodeConf)).valueOr: - raiseAssert "createNode (minimal config) failed: " & error + let ld = (await LogosDelivery.new(nodeConf)).valueOr: + raiseAssert "LogosDelivery.new (minimal config) failed: " & error ## Then check: - not node.isNil() - node.conf.clusterId == 3 - node.conf.relay == true + not ld.isNil() + ld.waku.conf.clusterId == 3 + ld.waku.conf.relay == true asyncTest "Create node with full configuration": ## Given @@ -47,20 +47,20 @@ suite "Waku API - Create node": ] ## When - let node = (await createNode(nodeConf)).valueOr: - raiseAssert "createNode (full config) failed: " & error + let ld = (await LogosDelivery.new(nodeConf)).valueOr: + raiseAssert "LogosDelivery.new (full config) failed: " & error ## Then check: - not node.isNil() - node.conf.clusterId == 99 - node.conf.shardingConf.numShardsInCluster == 16 - node.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64 - node.conf.staticNodes.len == 1 - node.conf.relay == true - node.conf.lightPush == true - node.conf.peerExchangeService == true - node.conf.rendezvous == true + not ld.isNil() + ld.waku.conf.clusterId == 99 + ld.waku.conf.shardingConf.numShardsInCluster == 16 + ld.waku.conf.maxMessageSizeBytes == 1024'u64 * 1024'u64 + ld.waku.conf.staticNodes.len == 1 + ld.waku.conf.relay == true + ld.waku.conf.lightPush == true + ld.waku.conf.peerExchangeService == true + ld.waku.conf.rendezvous == true asyncTest "Create node with mixed entry nodes (enrtree, multiaddr)": ## Given @@ -75,18 +75,18 @@ suite "Waku API - Create node": ] ## When - let node = (await createNode(nodeConf)).valueOr: - raiseAssert "createNode (mixed entry nodes) failed: " & error + let ld = (await LogosDelivery.new(nodeConf)).valueOr: + raiseAssert "LogosDelivery.new (mixed entry nodes) failed: " & error ## Then check: - not node.isNil() - node.conf.clusterId == 42 + not ld.isNil() + ld.waku.conf.clusterId == 42 # ENRTree should go to DNS discovery - node.conf.dnsDiscoveryConf.isSome() - node.conf.dnsDiscoveryConf.get().enrTreeUrl == + ld.waku.conf.dnsDiscoveryConf.isSome() + ld.waku.conf.dnsDiscoveryConf.get().enrTreeUrl == "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" # Multiaddr should go to static nodes - node.conf.staticNodes.len == 1 - node.conf.staticNodes[0] == + ld.waku.conf.staticNodes.len == 1 + ld.waku.conf.staticNodes[0] == "/ip4/127.0.0.1/tcp/60000/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" diff --git a/tests/waku_discv5/test_waku_discv5.nim b/tests/waku_discv5/test_waku_discv5.nim index b5d34486f..b794ec918 100644 --- a/tests/waku_discv5/test_waku_discv5.nim +++ b/tests/waku_discv5/test_waku_discv5.nim @@ -21,7 +21,7 @@ import discovery/waku_discv5, waku_enr/capabilities, factory/conf_builder/conf_builder, - factory/waku, + waku, waku_node, node/peer_manager, ], diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index eefa866db..1a312a6c6 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -10,7 +10,7 @@ import tests/testlib/[wakucore, wakunode], logos_delivery/waku/factory/conf_builder/conf_builder -include logos_delivery/waku/factory/waku, logos_delivery/waku/common/enr/typed_record +include logos_delivery/waku/waku, logos_delivery/waku/common/enr/typed_record suite "Wakunode2 - Waku": test "compilation version should be reported":