messaging & channels: per-layer api/ folders + concentrator

Extract the messaging (send/subscription) and reliable-channel
(lifecycle/send) operations into their own api/ folders with events moved to
the owning layer, thin messaging_client and reliable_channel_manager, and add
the LogosDelivery concentrator that aggregates the per-layer APIs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-25 04:17:49 +02:00
parent 8c274be44d
commit 4dfe72e12f
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
16 changed files with 220 additions and 168 deletions

View File

@ -0,0 +1,89 @@
## Reliable Channel layer API — channel lifecycle
## (createReliableChannel / closeChannel).
import std/[options, tables]
import results, chronos, chronicles
import logos_delivery/api/types
import logos_delivery/channels/reliable_channel_manager
import logos_delivery/channels/reliable_channel
import logos_delivery/waku/persistency/sds_persistency
# ReliableChannel, SendHandler, config and wire-version markers.
export reliable_channel
const SdsJobId = "sds"
## One persistency job shared by every channel's SDS state; rows are
## keyed by channelId.
proc sdsPersistence(): Option[Persistence] =
## SDS backend from the Persistency singleton; memory-only fallback when
## it is unavailable (e.g. unit tests).
let p = Persistency.instance().valueOr:
info "SDS persistence disabled, running memory-only", reason = $error
return none(Persistence)
let job = p.openJob(SdsJobId).valueOr:
warn "SDS persistence disabled, could not open persistency job",
jobId = SdsJobId, reason = $error
return none(Persistence)
return some(newSdsPersistence(job))
proc createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
## Spec entry point. The `sendHandler` and `rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## request brokers — the application installs its own providers
## (or `setNoopEncryption()`) before traffic flows.
##
## `sendHandler` defaults to the manager's default (constructed at mount
## from `MessagingClient.send`); tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
let segConfig = SegmentationConfig(
segmentSizeBytes: DefaultSegmentSizeBytes,
enableReedSolomon: false,
persistence: nil,
)
let sdsConfig = SdsConfig(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: sdsPersistence(),
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
let chn = ReliableChannel.new(
sendHandler = effectiveSendHandler,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
segConfig = segConfig,
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
)
self.channels[channelId] = chn
return ok(channelId)
proc closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []).} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
## state survives, so re-creating the channel restores it.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
self.channels.del(channelId)
await chn.stop()
return ok()

View File

@ -1,20 +1,20 @@
## Reliable Channel event types emitted to API consumers.
## Reliable Channel layer API — event surface.
##
## Lifecycle events for individual segments (sent / propagated / errored)
## are the same as the network-level ones the MessagingClient already
## emits — `requestId` is shared across layers — so we just re-export
## `waku/events/message_events` and avoid declaring duplicates.
## `messaging/api/events` and avoid declaring duplicates.
##
## Only the channel-level `MessageReceivedEvent` carries data that has
## no analogue in the lower layer (reassembled application payload,
## senderId, channelId), so it lives here.
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/api/events as messaging_events
import brokers/event_broker
import ./types as channel_types
import logos_delivery/channels/types as channel_types
export waku_message_events, channel_types, event_broker
export messaging_events, channel_types, event_broker
EventBroker:
type ChannelMessageReceivedEvent* = object

View File

@ -0,0 +1,21 @@
## Reliable Channel layer API — channel send operation.
import std/tables
import results, chronos
import logos_delivery/api/types
import logos_delivery/channels/reliable_channel_manager
import logos_delivery/channels/reliable_channel
proc send*(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return await chn.send(appPayload, ephemeral)

View File

@ -25,7 +25,7 @@ import logos_delivery/api/types
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/waku/waku_core/topics
import ./events
import logos_delivery/channels/api/events
import ./segmentation/segmentation
import ./scalable_data_sync/scalable_data_sync
import ./rate_limit_manager/rate_limit_manager

View File

@ -13,21 +13,14 @@ import stew/byteutils
import brokers/broker_context
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/messaging/api/send
import logos_delivery/api/types
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency
import ./reliable_channel
import ./encryption/noop_encryption
export reliable_channel
const SdsJobId = "sds"
## One persistency job shared by every channel's SDS state; rows are
## keyed by channelId.
type
ReliableChannelManagerConf* = object
## Per-layer config object for the reliable
@ -35,13 +28,13 @@ type
## will move here in a follow-up PR); kept so each layer owns its own config.
ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel]
channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim`
messagingClient: MessagingClient ## The channel layer chains onto messaging.
sendHandler: SendHandler
sendHandler*: SendHandler
## Default egress dispatch for channels created through this manager.
## Built in `new` as a closure over `MessagingClient.send` so the channel
## layer itself stays callable-only.
brokerCtx: BrokerContext
brokerCtx*: BrokerContext
proc new*(
T: type ReliableChannelManager,
@ -82,96 +75,6 @@ proc stop*(self: ReliableChannelManager) {.async.} =
await chn.stop()
self.channels.clear()
proc sdsPersistence(): Option[Persistence] =
## SDS backend from the Persistency singleton; memory-only fallback when
## it is unavailable (e.g. unit tests).
let p = Persistency.instance().valueOr:
info "SDS persistence disabled, running memory-only", reason = $error
return none(Persistence)
let job = p.openJob(SdsJobId).valueOr:
warn "SDS persistence disabled, could not open persistency job",
jobId = SdsJobId, reason = $error
return none(Persistence)
return some(newSdsPersistence(job))
proc createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
## Spec entry point. The `sendHandler` and `rng` the channel needs are
## sourced from the owning `ReliableChannelManager` rather than passed
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## request brokers — the application installs its own providers
## (or `setNoopEncryption()`) before traffic flows.
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
##
## `sendHandler` defaults to the manager's default (constructed at mount
## from `MessagingClient.send`); tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
let segConfig = SegmentationConfig(
segmentSizeBytes: DefaultSegmentSizeBytes,
enableReedSolomon: false,
persistence: nil,
)
let sdsConfig = SdsConfig(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: sdsPersistence(),
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
let effectiveSendHandler = if sendHandler.isNil(): self.sendHandler else: sendHandler
let chn = ReliableChannel.new(
sendHandler = effectiveSendHandler,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
segConfig = segConfig,
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
)
self.channels[channelId] = chn
return ok(channelId)
proc closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []).} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
## state survives, so re-creating the channel restores it.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
self.channels.del(channelId)
await chn.stop()
return ok()
proc send*(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return await chn.send(appPayload, ephemeral)
## Inbound messages are not handed to the manager by direct call. Each
## `ReliableChannel` installs its own `MessageReceivedEvent` listener
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,

View File

@ -11,12 +11,42 @@
import results, chronos, chronicles
# Each layer has a core module (type + new/start/stop) and an api/ folder whose
# modules each implement a differentiated set of operations, plus an events
# surface. The concentrator re-exports them so library consumers get the full
# surface from `import logos_delivery`. (The per-layer `events` modules share a
# stem, so they are imported under aliases.)
# Waku layer
import logos_delivery/waku/waku
export waku
import
logos_delivery/waku/api/[
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health,
ping,
]
export
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, ping
import logos_delivery/waku/api/events/[message_events, health_events]
export message_events, health_events
# Messaging layer
import logos_delivery/messaging/messaging_client
export messaging_client
import logos_delivery/messaging/api/[subscription, send]
export subscription, send
import logos_delivery/messaging/api/events as messaging_api_events
export messaging_api_events
# Reliable Channel layer
import logos_delivery/channels/reliable_channel_manager
export reliable_channel_manager
import logos_delivery/channels/api/channel_lifecycle
export channel_lifecycle
import logos_delivery/channels/api/send as channel_send
export channel_send
import logos_delivery/channels/api/events as channels_api_events
export channels_api_events
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/factory/app_callbacks

View File

@ -1,7 +1,8 @@
## Messaging layer API — event surface (messaging-level message events).
import brokers/event_broker
import logos_delivery/api/types
import logos_delivery/waku/[waku_core/message, waku_core/topics]
export types
import logos_delivery/waku/waku_core/message
export event_broker, types
EventBroker:
# Event emitted when a message is sent to the network
@ -27,9 +28,3 @@ EventBroker:
type MessageReceivedEvent* = object
messageHash*: string
message*: WakuMessage
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage

View File

@ -0,0 +1,34 @@
## Messaging layer API — send operation.
import results, chronos, chronicles
import logos_delivery/api/types
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/node/[waku_node, subscription_manager]
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/messaging/delivery_service/send_service/delivery_task
proc send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
?self.checkApiAvailability()
let isSubbed =
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(self.node.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
asyncSpawn self.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -0,0 +1,18 @@
## Messaging layer API — subscription operations.
import results, chronos
import logos_delivery/api/types
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/node/[waku_node, subscription_manager]
proc subscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?self.checkApiAvailability()
return self.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Result[void, string] =
?self.checkApiAvailability()
return self.node.subscriptionManager.unsubscribe(contentTopic)

View File

@ -13,11 +13,12 @@ import
waku_store/client,
waku_store/common,
waku_filter_v2/client,
events/message_events,
events/health_events,
api/events/message_events,
api/events/health_events,
waku_node,
node/subscription_manager,
]
import logos_delivery/messaging/api/events
const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages

View File

@ -18,8 +18,8 @@ import
waku_rln_relay/rln_relay,
waku_lightpush/client,
waku_lightpush/callbacks,
events/message_events,
]
import logos_delivery/messaging/api/events
logScope:
topics = "send service"

View File

@ -1,10 +1,10 @@
## Messaging layer core: the `MessagingClient` type plus its construction and
## lifecycle. The public operations (subscribe / unsubscribe / send) live in
## `messaging/api.nim`.
import results, chronos
import chronicles
import
logos_delivery/api/types,
logos_delivery/waku/node/[waku_node, subscription_manager],
logos_delivery/messaging/delivery_service/[recv_service, send_service],
logos_delivery/messaging/delivery_service/send_service/delivery_task
logos_delivery/waku/node/waku_node,
logos_delivery/messaging/delivery_service/[recv_service, send_service]
type
MessagingClientConf* = object
@ -14,7 +14,7 @@ type
useP2PReliability*: bool
MessagingClient* = ref object
node: WakuNode
node*: WakuNode ## Waku core driven by this layer; read by `messaging/api.nim`.
sendService*: SendService
recvService*: RecvService
started: bool
@ -43,46 +43,9 @@ proc stop*(self: MessagingClient) {.async.} =
await self.recvService.stopRecvService()
self.started = false
proc checkApiAvailability(self: MessagingClient): Result[void, string] =
proc checkApiAvailability*(self: MessagingClient): Result[void, string] =
## Shared guard for the api operation module.
if self.isNil():
return err("MessagingClient is not initialized")
return ok()
proc subscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?checkApiAvailability(self)
return self.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Result[void, string] =
?checkApiAvailability(self)
return self.node.subscriptionManager.unsubscribe(contentTopic)
proc send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
let isSubbed =
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(self.node.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
asyncSpawn self.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -12,7 +12,7 @@ import
[topic_health, health_status, protocol_health, health_report],
logos_delivery/waku/requests/health_requests,
logos_delivery/waku/requests/node_requests,
logos_delivery/waku/events/health_events,
logos_delivery/waku/api/events/health_events,
logos_delivery/waku/common/waku_protocol,
logos_delivery/waku/factory/waku_conf
import tools/confutils/cli_args

View File

@ -14,8 +14,7 @@ import
logos_delivery/waku/[
waku_node,
waku_core,
events/message_events,
events/health_events,
api/events/health_events,
waku_relay/protocol,
waku_archive,
waku_archive/common as archive_common,

View File

@ -12,7 +12,6 @@ import
logos_delivery/waku/[
waku_node,
waku_core,
events/message_events,
waku_relay/protocol,
node/waku_node/filter,
node/subscription_manager,

View File

@ -10,7 +10,7 @@ import ../testlib/[common, wakucore, wakunode, testasync]
import logos_delivery
import logos_delivery/waku/[waku_node, waku_core]
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/api/events as waku_message_events
import tools/confutils/cli_args
import logos_delivery/channels/reliable_channel_manager