From 4dfe72e12fbca9de6a1dbe46e45f1647a15ca998 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Thu, 25 Jun 2026 04:17:49 +0200 Subject: [PATCH] messaging & channels: per-layer api/ folders + concentrator Extract the messaging (send/subscription) and reliable-channel (lifecycle/send) operations into their own api/ folders with events moved to the owning layer, thin messaging_client and reliable_channel_manager, and add the LogosDelivery concentrator that aggregates the per-layer APIs. Co-Authored-By: Claude Opus 4.8 --- .../channels/api/channel_lifecycle.nim | 89 +++++++++++++++ logos_delivery/channels/{ => api}/events.nim | 10 +- logos_delivery/channels/api/send.nim | 21 ++++ logos_delivery/channels/reliable_channel.nim | 2 +- .../channels/reliable_channel_manager.nim | 105 +----------------- logos_delivery/logos_delivery.nim | 30 +++++ .../api/events.nim} | 11 +- logos_delivery/messaging/api/send.nim | 34 ++++++ logos_delivery/messaging/api/subscription.nim | 18 +++ .../recv_service/recv_service.nim | 5 +- .../send_service/send_service.nim | 2 +- logos_delivery/messaging/messaging_client.nim | 53 ++------- tests/api/test_api_health.nim | 2 +- tests/api/test_api_receive.nim | 3 +- tests/api/test_api_subscription.nim | 1 - .../test_reliable_channel_send_receive.nim | 2 +- 16 files changed, 220 insertions(+), 168 deletions(-) create mode 100644 logos_delivery/channels/api/channel_lifecycle.nim rename logos_delivery/channels/{ => api}/events.nim (80%) create mode 100644 logos_delivery/channels/api/send.nim rename logos_delivery/{waku/events/message_events.nim => messaging/api/events.nim} (73%) create mode 100644 logos_delivery/messaging/api/send.nim create mode 100644 logos_delivery/messaging/api/subscription.nim diff --git a/logos_delivery/channels/api/channel_lifecycle.nim b/logos_delivery/channels/api/channel_lifecycle.nim new file mode 100644 index 000000000..b50ba1b63 --- /dev/null +++ b/logos_delivery/channels/api/channel_lifecycle.nim @@ -0,0 +1,89 @@ +## Reliable Channel layer API — channel lifecycle +## (createReliableChannel / closeChannel). +import std/[options, tables] +import results, chronos, chronicles + +import logos_delivery/api/types +import logos_delivery/channels/reliable_channel_manager +import logos_delivery/channels/reliable_channel +import logos_delivery/waku/persistency/sds_persistency + +# ReliableChannel, SendHandler, config and wire-version markers. +export reliable_channel + +const SdsJobId = "sds" + ## One persistency job shared by every channel's SDS state; rows are + ## keyed by channelId. + +proc sdsPersistence(): Option[Persistence] = + ## SDS backend from the Persistency singleton; memory-only fallback when + ## it is unavailable (e.g. unit tests). + let p = Persistency.instance().valueOr: + info "SDS persistence disabled, running memory-only", reason = $error + return none(Persistence) + let job = p.openJob(SdsJobId).valueOr: + warn "SDS persistence disabled, could not open persistency job", + jobId = SdsJobId, reason = $error + return none(Persistence) + return some(newSdsPersistence(job)) + +proc createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + sendHandler: SendHandler = nil, +): Result[ChannelId, string] = + ## Spec entry point. The `sendHandler` and `rng` the channel needs are + ## sourced from the owning `ReliableChannelManager` rather than passed + ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` + ## request brokers — the application installs its own providers + ## (or `setNoopEncryption()`) before traffic flows. + ## + ## `sendHandler` defaults to the manager's default (constructed at mount + ## from `MessagingClient.send`); tests pass a fake to bypass the network. + if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) + + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: sdsPersistence(), + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) + + let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler + + let chn = ReliableChannel.new( + sendHandler = effectiveSendHandler, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + ) + + self.channels[channelId] = chn + return ok(channelId) + +proc closeChannel*( + self: ReliableChannelManager, channelId: ChannelId +): Future[Result[void, string]] {.async: (raises: []).} = + ## Stops the channel's SDS loops and releases the channel. Persisted SDS + ## state survives, so re-creating the channel restores it. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + await chn.stop() + return ok() diff --git a/logos_delivery/channels/events.nim b/logos_delivery/channels/api/events.nim similarity index 80% rename from logos_delivery/channels/events.nim rename to logos_delivery/channels/api/events.nim index 5f69095e4..685059053 100644 --- a/logos_delivery/channels/events.nim +++ b/logos_delivery/channels/api/events.nim @@ -1,20 +1,20 @@ -## Reliable Channel event types emitted to API consumers. +## Reliable Channel layer API — event surface. ## ## Lifecycle events for individual segments (sent / propagated / errored) ## are the same as the network-level ones the MessagingClient already ## emits — `requestId` is shared across layers — so we just re-export -## `waku/events/message_events` and avoid declaring duplicates. +## `messaging/api/events` and avoid declaring duplicates. ## ## Only the channel-level `MessageReceivedEvent` carries data that has ## no analogue in the lower layer (reassembled application payload, ## senderId, channelId), so it lives here. -import logos_delivery/waku/events/message_events as waku_message_events +import logos_delivery/messaging/api/events as messaging_events import brokers/event_broker -import ./types as channel_types +import logos_delivery/channels/types as channel_types -export waku_message_events, channel_types, event_broker +export messaging_events, channel_types, event_broker EventBroker: type ChannelMessageReceivedEvent* = object diff --git a/logos_delivery/channels/api/send.nim b/logos_delivery/channels/api/send.nim new file mode 100644 index 000000000..23c41ac13 --- /dev/null +++ b/logos_delivery/channels/api/send.nim @@ -0,0 +1,21 @@ +## Reliable Channel layer API — channel send operation. +import std/tables +import results, chronos + +import logos_delivery/api/types +import logos_delivery/channels/reliable_channel_manager +import logos_delivery/channels/reliable_channel + +proc send*( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Future[Result[RequestId, string]] {.async: (raises: []).} = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return await chn.send(appPayload, ephemeral) diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index 307dc17a4..39cea259e 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -25,7 +25,7 @@ import logos_delivery/api/types import logos_delivery/messaging/delivery_service/send_service import logos_delivery/waku/waku_core/topics -import ./events +import logos_delivery/channels/api/events import ./segmentation/segmentation import ./scalable_data_sync/scalable_data_sync import ./rate_limit_manager/rate_limit_manager diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index 29feab0b9..3eafe880e 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -13,21 +13,14 @@ import stew/byteutils import brokers/broker_context -import logos_delivery/waku/events/message_events as waku_message_events import logos_delivery/messaging/messaging_client +import logos_delivery/messaging/api/send import logos_delivery/api/types -import logos_delivery/waku/waku_core/topics -import logos_delivery/waku/persistency/sds_persistency import ./reliable_channel -import ./encryption/noop_encryption export reliable_channel -const SdsJobId = "sds" - ## One persistency job shared by every channel's SDS state; rows are - ## keyed by channelId. - type ReliableChannelManagerConf* = object ## Per-layer config object for the reliable @@ -35,13 +28,13 @@ type ## will move here in a follow-up PR); kept so each layer owns its own config. ReliableChannelManager* = ref object - channels: Table[ChannelId, ReliableChannel] + channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim` messagingClient: MessagingClient ## The channel layer chains onto messaging. - sendHandler: SendHandler + sendHandler*: SendHandler ## Default egress dispatch for channels created through this manager. ## Built in `new` as a closure over `MessagingClient.send` so the channel ## layer itself stays callable-only. - brokerCtx: BrokerContext + brokerCtx*: BrokerContext proc new*( T: type ReliableChannelManager, @@ -82,96 +75,6 @@ proc stop*(self: ReliableChannelManager) {.async.} = await chn.stop() self.channels.clear() -proc sdsPersistence(): Option[Persistence] = - ## SDS backend from the Persistency singleton; memory-only fallback when - ## it is unavailable (e.g. unit tests). - let p = Persistency.instance().valueOr: - info "SDS persistence disabled, running memory-only", reason = $error - return none(Persistence) - let job = p.openJob(SdsJobId).valueOr: - warn "SDS persistence disabled, could not open persistency job", - jobId = SdsJobId, reason = $error - return none(Persistence) - return some(newSdsPersistence(job)) - -proc createReliableChannel*( - self: ReliableChannelManager, - channelId: ChannelId, - contentTopic: ContentTopic, - senderId: SdsParticipantID, - sendHandler: SendHandler = nil, -): Result[ChannelId, string] = - ## Spec entry point. The `sendHandler` and `rng` the channel needs are - ## sourced from the owning `ReliableChannelManager` rather than passed - ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` - ## request brokers — the application installs its own providers - ## (or `setNoopEncryption()`) before traffic flows. - ## - ## Segmentation, SDS and rate-limit configs will eventually be read - ## from the node's `NodeConfig`. Defaults for now. - ## - ## `sendHandler` defaults to the manager's default (constructed at mount - ## from `MessagingClient.send`); tests pass a fake to bypass the network. - if self.channels.hasKey(channelId): - return err("channel already exists: " & channelId) - - let segConfig = SegmentationConfig( - segmentSizeBytes: DefaultSegmentSizeBytes, - enableReedSolomon: false, - persistence: nil, - ) - let sdsConfig = SdsConfig( - acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, - maxRetransmissions: DefaultMaxRetransmissions, - causalHistorySize: DefaultCausalHistorySize, - persistence: sdsPersistence(), - ) - let rateConfig = RateLimitConfig( - epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch - ) - - let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler - - let chn = ReliableChannel.new( - sendHandler = effectiveSendHandler, - channelId = channelId, - contentTopic = contentTopic, - senderId = senderId, - segConfig = segConfig, - sdsConfig = sdsConfig, - rateConfig = rateConfig, - brokerCtx = self.brokerCtx, - ) - - self.channels[channelId] = chn - return ok(channelId) - -proc closeChannel*( - self: ReliableChannelManager, channelId: ChannelId -): Future[Result[void, string]] {.async: (raises: []).} = - ## Stops the channel's SDS loops and releases the channel. Persisted SDS - ## state survives, so re-creating the channel restores it. - let chn = self.channels.getOrDefault(channelId) - if chn.isNil(): - return err("unknown channel: " & channelId) - self.channels.del(channelId) - await chn.stop() - return ok() - -proc send*( - self: ReliableChannelManager, - channelId: ChannelId, - appPayload: seq[byte], - ephemeral: bool = false, -): Future[Result[RequestId, string]] {.async: (raises: []).} = - ## Spec-level entry point. Looks the channel up by id and delegates - ## to `ReliableChannel.send`, which exposes the visible pipeline - ## segmentation -> sds -> rate_limit_manager -> encryption. - let chn = self.channels.getOrDefault(channelId) - if chn.isNil(): - return err("unknown channel: " & channelId) - return await chn.send(appPayload, ephemeral) - ## Inbound messages are not handed to the manager by direct call. Each ## `ReliableChannel` installs its own `MessageReceivedEvent` listener ## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index f61202b9a..8f22b397f 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -11,12 +11,42 @@ import results, chronos, chronicles +# Each layer has a core module (type + new/start/stop) and an api/ folder whose +# modules each implement a differentiated set of operations, plus an events +# surface. The concentrator re-exports them so library consumers get the full +# surface from `import logos_delivery`. (The per-layer `events` modules share a +# stem, so they are imported under aliases.) + +# Waku layer import logos_delivery/waku/waku export waku +import + logos_delivery/waku/api/[ + topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, + ping, + ] +export + topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, ping +import logos_delivery/waku/api/events/[message_events, health_events] +export message_events, health_events + +# Messaging layer import logos_delivery/messaging/messaging_client export messaging_client +import logos_delivery/messaging/api/[subscription, send] +export subscription, send +import logos_delivery/messaging/api/events as messaging_api_events +export messaging_api_events + +# Reliable Channel layer import logos_delivery/channels/reliable_channel_manager export reliable_channel_manager +import logos_delivery/channels/api/channel_lifecycle +export channel_lifecycle +import logos_delivery/channels/api/send as channel_send +export channel_send +import logos_delivery/channels/api/events as channels_api_events +export channels_api_events import logos_delivery/waku/factory/waku_conf import logos_delivery/waku/factory/app_callbacks diff --git a/logos_delivery/waku/events/message_events.nim b/logos_delivery/messaging/api/events.nim similarity index 73% rename from logos_delivery/waku/events/message_events.nim rename to logos_delivery/messaging/api/events.nim index 2e4bece80..89ec704b9 100644 --- a/logos_delivery/waku/events/message_events.nim +++ b/logos_delivery/messaging/api/events.nim @@ -1,7 +1,8 @@ +## Messaging layer API — event surface (messaging-level message events). import brokers/event_broker import logos_delivery/api/types -import logos_delivery/waku/[waku_core/message, waku_core/topics] -export types +import logos_delivery/waku/waku_core/message +export event_broker, types EventBroker: # Event emitted when a message is sent to the network @@ -27,9 +28,3 @@ EventBroker: type MessageReceivedEvent* = object messageHash*: string message*: WakuMessage - -EventBroker: - # Internal event emitted when a message arrives from the network via any protocol - type MessageSeenEvent* = object - topic*: PubsubTopic - message*: WakuMessage diff --git a/logos_delivery/messaging/api/send.nim b/logos_delivery/messaging/api/send.nim new file mode 100644 index 000000000..cc6312471 --- /dev/null +++ b/logos_delivery/messaging/api/send.nim @@ -0,0 +1,34 @@ +## Messaging layer API — send operation. +import results, chronos, chronicles + +import logos_delivery/api/types +import logos_delivery/messaging/messaging_client +import logos_delivery/waku/node/[waku_node, subscription_manager] +import logos_delivery/messaging/delivery_service/send_service +import logos_delivery/messaging/delivery_service/send_service/delivery_task + +proc send*( + self: MessagingClient, envelope: MessageEnvelope +): Future[Result[RequestId, string]] {.async.} = + ## High-level messaging API send. Auto-subscribes to the content topic + ## (so the local node sees its own gossipsub broadcast), builds a + ## `DeliveryTask`, and hands it to the send service. Returns the request + ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. + ?self.checkApiAvailability() + + let isSubbed = + self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) + + let requestId = RequestId.new(self.node.rng) + + let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: + return err("MessagingClient.send: Failed to create delivery task: " & error) + + asyncSpawn self.sendService.send(deliveryTask) + + return ok(requestId) diff --git a/logos_delivery/messaging/api/subscription.nim b/logos_delivery/messaging/api/subscription.nim new file mode 100644 index 000000000..35b8c7e53 --- /dev/null +++ b/logos_delivery/messaging/api/subscription.nim @@ -0,0 +1,18 @@ +## Messaging layer API — subscription operations. +import results, chronos + +import logos_delivery/api/types +import logos_delivery/messaging/messaging_client +import logos_delivery/waku/node/[waku_node, subscription_manager] + +proc subscribe*( + self: MessagingClient, contentTopic: ContentTopic +): Future[Result[void, string]] {.async.} = + ?self.checkApiAvailability() + return self.node.subscriptionManager.subscribe(contentTopic) + +proc unsubscribe*( + self: MessagingClient, contentTopic: ContentTopic +): Result[void, string] = + ?self.checkApiAvailability() + return self.node.subscriptionManager.unsubscribe(contentTopic) diff --git a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim index 90bdb0839..a6e9701d9 100644 --- a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim +++ b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim @@ -13,11 +13,12 @@ import waku_store/client, waku_store/common, waku_filter_v2/client, - events/message_events, - events/health_events, + api/events/message_events, + api/events/health_events, waku_node, node/subscription_manager, ] +import logos_delivery/messaging/api/events const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages diff --git a/logos_delivery/messaging/delivery_service/send_service/send_service.nim b/logos_delivery/messaging/delivery_service/send_service/send_service.nim index 00f2ff672..5e21ac4ba 100644 --- a/logos_delivery/messaging/delivery_service/send_service/send_service.nim +++ b/logos_delivery/messaging/delivery_service/send_service/send_service.nim @@ -18,8 +18,8 @@ import waku_rln_relay/rln_relay, waku_lightpush/client, waku_lightpush/callbacks, - events/message_events, ] +import logos_delivery/messaging/api/events logScope: topics = "send service" diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 96cd13eb1..a92d045f5 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -1,10 +1,10 @@ +## Messaging layer core: the `MessagingClient` type plus its construction and +## lifecycle. The public operations (subscribe / unsubscribe / send) live in +## `messaging/api.nim`. import results, chronos -import chronicles import - logos_delivery/api/types, - logos_delivery/waku/node/[waku_node, subscription_manager], - logos_delivery/messaging/delivery_service/[recv_service, send_service], - logos_delivery/messaging/delivery_service/send_service/delivery_task + logos_delivery/waku/node/waku_node, + logos_delivery/messaging/delivery_service/[recv_service, send_service] type MessagingClientConf* = object @@ -14,7 +14,7 @@ type useP2PReliability*: bool MessagingClient* = ref object - node: WakuNode + node*: WakuNode ## Waku core driven by this layer; read by `messaging/api.nim`. sendService*: SendService recvService*: RecvService started: bool @@ -43,46 +43,9 @@ proc stop*(self: MessagingClient) {.async.} = await self.recvService.stopRecvService() self.started = false -proc checkApiAvailability(self: MessagingClient): Result[void, string] = +proc checkApiAvailability*(self: MessagingClient): Result[void, string] = + ## Shared guard for the api operation module. if self.isNil(): return err("MessagingClient is not initialized") return ok() - -proc subscribe*( - self: MessagingClient, contentTopic: ContentTopic -): Future[Result[void, string]] {.async.} = - ?checkApiAvailability(self) - - return self.node.subscriptionManager.subscribe(contentTopic) - -proc unsubscribe*( - self: MessagingClient, contentTopic: ContentTopic -): Result[void, string] = - ?checkApiAvailability(self) - - return self.node.subscriptionManager.unsubscribe(contentTopic) - -proc send*( - self: MessagingClient, envelope: MessageEnvelope -): Future[Result[RequestId, string]] {.async.} = - ## High-level messaging API send. Auto-subscribes to the content topic - ## (so the local node sees its own gossipsub broadcast), builds a - ## `DeliveryTask`, and hands it to the send service. Returns the request - ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. - let isSubbed = - self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) - if not isSubbed: - info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic - self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: - warn "Failed to auto-subscribe", error = error - return err("Failed to auto-subscribe before sending: " & error) - - let requestId = RequestId.new(self.node.rng) - - let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: - return err("MessagingClient.send: Failed to create delivery task: " & error) - - asyncSpawn self.sendService.send(deliveryTask) - - return ok(requestId) diff --git a/tests/api/test_api_health.nim b/tests/api/test_api_health.nim index 83efedd18..27030d432 100644 --- a/tests/api/test_api_health.nim +++ b/tests/api/test_api_health.nim @@ -12,7 +12,7 @@ import [topic_health, health_status, protocol_health, health_report], logos_delivery/waku/requests/health_requests, logos_delivery/waku/requests/node_requests, - logos_delivery/waku/events/health_events, + logos_delivery/waku/api/events/health_events, logos_delivery/waku/common/waku_protocol, logos_delivery/waku/factory/waku_conf import tools/confutils/cli_args diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim index 41c0f0477..b1e7635d8 100644 --- a/tests/api/test_api_receive.nim +++ b/tests/api/test_api_receive.nim @@ -14,8 +14,7 @@ import logos_delivery/waku/[ waku_node, waku_core, - events/message_events, - events/health_events, + api/events/health_events, waku_relay/protocol, waku_archive, waku_archive/common as archive_common, diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index bf2851b02..ca9d2c9b3 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -12,7 +12,6 @@ import logos_delivery/waku/[ waku_node, waku_core, - events/message_events, waku_relay/protocol, node/waku_node/filter, node/subscription_manager, diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index b3d9a8f00..17ceb56ad 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -10,7 +10,7 @@ import ../testlib/[common, wakucore, wakunode, testasync] import logos_delivery import logos_delivery/waku/[waku_node, waku_core] import logos_delivery/waku/factory/waku_conf -import logos_delivery/waku/events/message_events as waku_message_events +import logos_delivery/messaging/api/events as waku_message_events import tools/confutils/cli_args import logos_delivery/channels/reliable_channel_manager