From b7d1cb88727dc16353bebf497b9aae735c266292 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 26 Jun 2026 12:30:07 +0200 Subject: [PATCH 1/3] api: define layer contracts with `concept`, not RootObj inheritance The IMessagingClient / IKernel / IReliableChannelManager base types were inheritance-as-documentation: nothing dispatched over them and no impl ever overrode their base `method`s (the ops live in `*/api/*` as plain procs). The `method`s only returned "not implemented" at runtime and, crucially, were never checked against the real surface -- so IKernel had silently drifted from `Waku` (relayPublish returned int not string; filter/lightpush still carried a `peer` param; connectedPeersInfo returned seq[string]). Replace each base type with a structural Nim `concept` matched against the real implementation, and assert conformance once in the concentrator (`doAssert Waku is KernelApi`, etc.) where every impl and its op modules are in scope. This is zero-cost, drops the dead vtables, and makes each layer's true surface a single compiler-checked source of truth. Move `PeerConnInfo` from `waku/api/peer_manager` into `api/types` (alongside the other api-boundary data types) so `KernelApi` can name it without an import cycle -- otherwise `connectedPeersInfo` could not be part of the contract. Co-Authored-By: Claude Opus 4.8 --- logos_delivery/api/kernel_api.nim | 231 ++++-------------- logos_delivery/api/messaging_client_api.nim | 21 +- .../api/reliable_channel_manager_api.nim | 29 +-- logos_delivery/api/types.nim | 5 + .../channels/reliable_channel_manager.nim | 2 +- logos_delivery/logos_delivery.nim | 6 + logos_delivery/messaging/messaging_client.nim | 2 +- logos_delivery/waku/api/peer_manager.nim | 5 +- logos_delivery/waku/waku.nim | 2 +- 9 files changed, 78 insertions(+), 225 deletions(-) diff --git a/logos_delivery/api/kernel_api.nim b/logos_delivery/api/kernel_api.nim index 35cf7251f..75d73144d 100644 --- a/logos_delivery/api/kernel_api.nim +++ b/logos_delivery/api/kernel_api.nim @@ -1,203 +1,76 @@ -import std/options import chronos, results import brokers/event_broker import logos_delivery/api/types as api_types import logos_delivery/waku/waku_core/topics/pubsub_topic import logos_delivery/waku/waku_core/message +import logos_delivery/waku/waku_core/subscription/push_handler import logos_delivery/waku/waku_store/common as store_types export event_broker export api_types, pubsub_topic, store_types -type IKernel* = ref object of RootObj - EventBroker: # Internal event emitted when a message arrives from the network via any protocol type MessageSeenEvent* = object topic*: PubsubTopic message*: WakuMessage -# --- topic construction --- -method buildContentTopic*( - self: IKernel, appName: string, appVersion: uint32, name: string, encoding: string -): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.buildContentTopic not implemented") +# Structural API contract for the Kernel surface, implemented by `Waku` +# (ops in `waku/api/*`). +type KernelApi* = concept w + # --- topic construction --- + buildContentTopic(w, string, uint32, string, string) is + Future[Result[ContentTopic, string]] + buildPubsubTopic(w, string) is Future[Result[PubsubTopic, string]] + defaultPubsubTopic(w) is Future[Result[PubsubTopic, string]] -method buildPubsubTopic*( - self: IKernel, topicName: string -): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.buildPubsubTopic not implemented") + # --- relay --- + relayPublish(w, PubsubTopic, WakuMessage, uint32) is Future[Result[string, string]] + relaySubscribe(w, PubsubTopic) is Future[Result[bool, string]] + relayUnsubscribe(w, PubsubTopic) is Future[Result[bool, string]] + relayAddProtectedShard(w, uint16, uint16, string) is Future[Result[bool, string]] + relayConnectedPeers(w, PubsubTopic) is Future[Result[seq[string], string]] + relayPeersInMesh(w, PubsubTopic) is Future[Result[seq[string], string]] + relayNumPeersInMesh(w, PubsubTopic) is Future[Result[int, string]] + relayNumConnectedPeers(w, PubsubTopic) is Future[Result[int, string]] -method defaultPubsubTopic*( - self: IKernel -): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.defaultPubsubTopic not implemented") + # --- filter --- + filterSubscribe(w, PubsubTopic, seq[ContentTopic], FilterPushHandler) is + Future[Result[bool, string]] + filterUnsubscribe(w, PubsubTopic, seq[ContentTopic]) is Future[Result[bool, string]] + filterUnsubscribeAll(w) is Future[Result[bool, string]] -# --- relay --- -method relayPublish*( - self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32 -): Future[Result[int, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.relayPublish not implemented") + # --- lightpush --- + lightpushPublish(w, PubsubTopic, WakuMessage) is Future[Result[string, string]] -method relaySubscribe*( - self: IKernel, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.relaySubscribe not implemented") + # --- store --- + storeQuery(w, StoreQueryRequest, string, int) is + Future[Result[StoreQueryResponse, string]] -method relayUnsubscribe*( - self: IKernel, pubsubTopic: PubsubTopic -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.relayUnsubscribe not implemented") + # --- peer management --- + connect(w, seq[string], uint32) is Future[Result[bool, string]] + disconnectPeerById(w, string) is Future[Result[bool, string]] + disconnectAllPeers(w) is Future[Result[bool, string]] + dialPeer(w, string, string, int) is Future[Result[bool, string]] + dialPeerById(w, string, string, int) is Future[Result[bool, string]] + peerIdsFromPeerstore(w) is Future[Result[seq[string], string]] + connectedPeersInfo(w) is Future[Result[seq[PeerConnInfo], string]] + connectedPeers(w) is Future[Result[seq[string], string]] + peerIdsByProtocol(w, string) is Future[Result[seq[string], string]] -method relayAddProtectedShard*( - self: IKernel, clusterId: uint16, shardId: uint16, publicKey: string -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.relayAddProtectedShard not implemented") + # --- discovery --- + dnsDiscovery(w, string, string, int) is Future[Result[seq[string], string]] + discv5UpdateBootnodes(w, string) is Future[Result[bool, string]] + startDiscv5(w) is Future[Result[bool, string]] + stopDiscv5(w) is Future[Result[bool, string]] + peerExchangeRequest(w, uint64) is Future[Result[int, string]] -method relayConnectedPeers*( - self: IKernel, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.relayConnectedPeers not implemented") - -method relayPeersInMesh*( - self: IKernel, pubsubTopic: PubsubTopic -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.relayPeersInMesh not implemented") - -# --- filter --- -method filterSubscribe*( - self: IKernel, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.filterSubscribe not implemented") - -method filterUnsubscribe*( - self: IKernel, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], - peer: string, -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.filterUnsubscribe not implemented") - -method filterUnsubscribeAll*( - self: IKernel, peer: string -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.filterUnsubscribeAll not implemented") - -# --- lightpush --- -method lightpushPublish*( - self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string -): Future[Result[string, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.lightpushPublish not implemented") - -# --- store --- -method storeQuery*( - self: IKernel, request: StoreQueryRequest, peer: string, timeoutMs: int -): Future[Result[StoreQueryResponse, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.storeQuery not implemented") - -# --- peer management --- -method connect*( - self: IKernel, peers: seq[string], timeoutMs: uint32 -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.connect not implemented") - -method disconnectPeerById*( - self: IKernel, peerId: string -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.disconnectPeerById not implemented") - -method disconnectAllPeers*( - self: IKernel -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.disconnectAllPeers not implemented") - -method dialPeer*( - self: IKernel, peerAddr: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.dialPeer not implemented") - -method dialPeerById*( - self: IKernel, peerId: string, protocol: string, timeoutMs: int -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.dialPeerById not implemented") - -method peerIdsFromPeerstore*( - self: IKernel -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.peerIdsFromPeerstore not implemented") - -method connectedPeersInfo*( - self: IKernel -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.connectedPeersInfo not implemented") - -method connectedPeers*( - self: IKernel -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.connectedPeers not implemented") - -method peerIdsByProtocol*( - self: IKernel, protocol: string -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.peerIdsByProtocol not implemented") - -# --- discovery --- -method dnsDiscovery*( - self: IKernel, enrTreeUrl: string, nameServer: string, timeoutMs: int -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.dnsDiscovery not implemented") - -method discv5UpdateBootnodes*( - self: IKernel, bootnodes: seq[string] -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.discv5UpdateBootnodes not implemented") - -method startDiscv5*( - self: IKernel -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.startDiscv5 not implemented") - -method stopDiscv5*( - self: IKernel -): Future[Result[bool, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.stopDiscv5 not implemented") - -method peerExchangeRequest*( - self: IKernel, numPeers: uint64 -): Future[Result[int, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.peerExchangeRequest not implemented") - -# --- debug / info --- -method version*( - self: IKernel -): Future[Result[string, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.version not implemented") - -method listenAddresses*( - self: IKernel -): Future[Result[seq[string], string]] {.async: (raises: []), base.} = - return err("Interface IKernel.listenAddresses not implemented") - -method myEnr*( - self: IKernel -): Future[Result[string, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.myEnr not implemented") - -method myPeerId*( - self: IKernel -): Future[Result[string, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.myPeerId not implemented") - -method metrics*( - self: IKernel -): Future[Result[string, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.metrics not implemented") - -method pingPeer*( - self: IKernel, peerAddr: string, timeoutMs: int -): Future[Result[int64, string]] {.async: (raises: []), base.} = - return err("Interface IKernel.pingPeer not implemented") + # --- debug / info --- + version(w) is Future[Result[string, string]] + listenAddresses(w) is Future[Result[seq[string], string]] + myEnr(w) is Future[Result[string, string]] + myPeerId(w) is Future[Result[string, string]] + metrics(w) is Future[Result[string, string]] + isOnline(w) is Future[Result[bool, string]] + pingPeer(w, string, int) is Future[Result[int64, string]] diff --git a/logos_delivery/api/messaging_client_api.nim b/logos_delivery/api/messaging_client_api.nim index f29a5cb41..4e71a9e98 100644 --- a/logos_delivery/api/messaging_client_api.nim +++ b/logos_delivery/api/messaging_client_api.nim @@ -6,8 +6,6 @@ import logos_delivery/waku/waku_core/message export event_broker, api_types -type IMessagingClient* = ref object of RootObj - EventBroker: # Event emitted when a message is sent to the network type MessageSentEvent* = object @@ -33,17 +31,8 @@ EventBroker: messageHash*: string message*: WakuMessage -method subscribe*( - self: IMessagingClient, contentTopic: ContentTopic -): Future[Result[void, string]] {.async: (raises: []), base.} = - return err("Interface IMessagingClient.subscribe not implemented") - -method unsubscribe*( - self: IMessagingClient, contentTopic: ContentTopic -): Result[void, string] {.base, raises: [].} = - return err("Interface IMessagingClient.unsubscribe not implemented") - -method send*( - self: IMessagingClient, envelope: MessageEnvelope -): Future[Result[RequestId, string]] {.async: (raises: []), base.} = - return err("Interface IMessagingClient.send not implemented") +# Structural API contract for a messaging client (ops in `messaging/api/*`). +type MessagingApi* = concept c + subscribe(c, ContentTopic) is Future[Result[void, string]] + unsubscribe(c, ContentTopic) is Result[void, string] + send(c, MessageEnvelope) is Future[Result[RequestId, string]] diff --git a/logos_delivery/api/reliable_channel_manager_api.nim b/logos_delivery/api/reliable_channel_manager_api.nim index c5266972a..9c1b19019 100644 --- a/logos_delivery/api/reliable_channel_manager_api.nim +++ b/logos_delivery/api/reliable_channel_manager_api.nim @@ -15,8 +15,6 @@ export event_broker, api_types export channel_types, messaging_client_api type - IReliableChannelManager* = ref object of RootObj - SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. async: (raises: [CatchableError]), gcsafe .} @@ -47,24 +45,9 @@ EventBroker: requestId*: RequestId error*: string -method createReliableChannel*( - self: IReliableChannelManager, - channelId: ChannelId, - contentTopic: ContentTopic, - senderId: SdsParticipantID, - sendHandler: SendHandler = nil, -): Result[ChannelId, string] {.base.} = - return err("Interface IReliableChannelManager.createReliableChannel not implemented") - -method closeChannel*( - self: IReliableChannelManager, channelId: ChannelId -): Future[Result[void, string]] {.async: (raises: []), base.} = - return err("Interface IReliableChannelManager.closeChannel not implemented") - -method send*( - self: IReliableChannelManager, - channelId: ChannelId, - appPayload: seq[byte], - ephemeral: bool = false, -): Future[Result[RequestId, string]] {.async: (raises: []), base.} = - return err("Interface IReliableChannelManager.send not implemented") +# Structural API contract for the reliable-channel surface (ops in `channels/api/*`). +type ReliableChannelApi* = concept c + createReliableChannel(c, ChannelId, ContentTopic, SdsParticipantID) is + Result[ChannelId, string] + closeChannel(c, ChannelId) is Future[Result[void, string]] + send(c, ChannelId, seq[byte]) is Future[Result[RequestId, string]] diff --git a/logos_delivery/api/types.nim b/logos_delivery/api/types.nim index 5757a8e82..97e7a11c9 100644 --- a/logos_delivery/api/types.nim +++ b/logos_delivery/api/types.nim @@ -26,6 +26,11 @@ type PartiallyConnected Connected + PeerConnInfo* = object ## structured connected-peer info for the api boundary + peerId*: string + protocols*: seq[string] + addresses*: seq[string] + proc new*(T: typedesc[RequestId], rng: crypto.Rng): T = ## Generate a new RequestId using the provided RNG. RequestId(request_utils.generateRequestId(rng)) diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index d3f671c2b..e8d579326 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -28,7 +28,7 @@ type ## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults ## will move here in a follow-up PR); kept so each layer owns its own config. - ReliableChannelManager* = ref object of IReliableChannelManager + ReliableChannelManager* = ref object ## Implements `ReliableChannelApi`. channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim` messagingClient: MessagingClient ## The channel layer chains onto messaging. sendHandler*: SendHandler diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index d0fb33b90..3d72a142c 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -52,6 +52,12 @@ import logos_delivery/channels/api/send as channel_send export channel_send # ChannelMessage* events are surfaced via `export reliable_channel_manager`. +# Compile-time check that each layer's concrete type satisfies its API concept. +static: + doAssert Waku is KernelApi + doAssert MessagingClient is MessagingApi + doAssert ReliableChannelManager is ReliableChannelApi + import logos_delivery/waku/factory/waku_conf import logos_delivery/waku/factory/app_callbacks import tools/confutils/cli_args diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 4e02c7a82..e82c6d6fd 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -17,7 +17,7 @@ type ## follow-up PR. Today it only carries the p2p reliability toggle. useP2PReliability*: bool - MessagingClient* = ref object of IMessagingClient + MessagingClient* = ref object ## Implements `MessagingApi`. waku*: Waku ## The Waku kernel this layer drives; read by `messaging/api/*`. sendService*: SendService recvService*: RecvService diff --git a/logos_delivery/waku/api/peer_manager.nim b/logos_delivery/waku/api/peer_manager.nim index 26de46594..3a73051b8 100644 --- a/logos_delivery/waku/api/peer_manager.nim +++ b/logos_delivery/waku/api/peer_manager.nim @@ -8,10 +8,7 @@ import libp2p/[peerid, peerstore] import logos_delivery/waku/waku import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager] -type PeerConnInfo* = object ## structured connected-peer info for the api boundary - peerId*: string - protocols*: seq[string] - addresses*: seq[string] +# `PeerConnInfo` is defined in `api/types` (surfaced here via `import waku`). proc connect*( self: Waku, peers: seq[string], timeoutMs: uint32 diff --git a/logos_delivery/waku/waku.nim b/logos_delivery/waku/waku.nim index d3b3b9ead..12d5d5238 100644 --- a/logos_delivery/waku/waku.nim +++ b/logos_delivery/waku/waku.nim @@ -68,7 +68,7 @@ logScope: # Git version in git describe format (defined at compile time) const git_version* {.strdefine.} = "n/a" -type Waku* = ref object of IKernel +type Waku* = ref object ## Implements `KernelApi` (ops in `waku/api/*`). stateInfo*: WakuStateInfo conf*: WakuConf rng*: crypto.Rng From beaebf84d0a530418500f9314a4a0d461c409155 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 26 Jun 2026 12:30:07 +0200 Subject: [PATCH 2/3] api: drop the ILogosDelivery interface and its module LogosDelivery is the top of the layering and nothing abstracts over it (the FFI holds the concrete FFIContext[LogosDelivery]), so the ILogosDelivery RootObj base bought nothing: its `method`s only returned "not implemented" and were never dispatched. Remove the interface, keep LogosDelivery as a plain object, and demote its start/stop/isOnline from `method` to `proc` (static dispatch on the concrete type). That left `api/logos_delivery_api.nim` holding only EventConnectionStatusChange. Move it to `waku/api/events/health_events.nim` -- alongside the sibling topic health events and next to its emitter (the health monitor, which already imported that module) -- and delete the now-empty `logos_delivery_api.nim`. Co-Authored-By: Claude Opus 4.8 --- logos_delivery/api/logos_delivery_api.nim | 33 ------------------- logos_delivery/logos_delivery.nim | 11 +++---- .../recv_service/recv_service.nim | 2 +- .../waku/api/events/health_events.nim | 8 +++-- .../health_monitor/node_health_monitor.nim | 1 - 5 files changed, 10 insertions(+), 45 deletions(-) delete mode 100644 logos_delivery/api/logos_delivery_api.nim diff --git a/logos_delivery/api/logos_delivery_api.nim b/logos_delivery/api/logos_delivery_api.nim deleted file mode 100644 index 162159d8e..000000000 --- a/logos_delivery/api/logos_delivery_api.nim +++ /dev/null @@ -1,33 +0,0 @@ -## `LogosDelivery` is the project entry point. It is a pure concentrator: it -## owns exactly one instance of each API layer -## -## Waku <- MessagingClient <- ReliableChannelManager -## -## and chains them together (each layer drives the one below it). Every layer -## keeps its own, separate public API — `LogosDelivery` only wires them up and -## drives the shared `new` / `start` / `stop` lifecycle. - -{.push raises: [].} - -import results, chronos -import brokers/event_broker -import types as api_types - -export api_types, event_broker - -type - ## Entry point. Holds one instance of each API layer. - ILogosDelivery* = ref object of RootObj - -EventBroker: - type EventConnectionStatusChange* = object - connectionStatus*: ConnectionStatus - -method start*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} = - return err("ILogosDelivery.start not implemented") - -method stop*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} = - return err("ILogosDelivery.stop not implemented") - -method isOnline*(self: ILogosDelivery): Future[Result[bool, string]] {.async, base.} = - return err("ILogosDelivery.isOnline not implemented") diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index 3d72a142c..d9bc54ea0 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -11,9 +11,6 @@ import results, chronos, chronicles -import logos_delivery/api/logos_delivery_api -export logos_delivery_api - # Each layer has a core module (type + new/start/stop) and an api/ folder whose # modules each implement a differentiated set of operations, plus an events # surface. The concentrator re-exports them so library consumers get the full @@ -75,7 +72,7 @@ type messaging*: MessagingClientConf reliableChannel*: ReliableChannelManagerConf - LogosDelivery* = ref object of ILogosDelivery + LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer. waku*: Waku messagingClient*: MessagingClient @@ -120,7 +117,7 @@ proc new*( ) ) -method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = +proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = ## Starts each layer bottom-up: transport first, then messaging, then channels. if self.waku.isNil(): return err("Waku node is not initialized") @@ -140,7 +137,7 @@ method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} = return ok() -method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = +proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = ## Stops in reverse order so higher layers drain before their dependencies. await self.reliableChannelManager.stop() await self.messagingClient.stop() @@ -150,7 +147,7 @@ method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} = return ok() -method isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} = +proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} = if self.waku.isNil(): return err("Waku node is not initialized") return ok(self.waku.healthMonitor.onlineMonitor.amIOnline()) diff --git a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim index 1090be223..268e4c547 100644 --- a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim +++ b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim @@ -13,7 +13,7 @@ import import logos_delivery/api/kernel_api, # MessageSeenEvent logos_delivery/api/messaging_client_api, # MessageReceivedEvent - logos_delivery/api/logos_delivery_api # EventConnectionStatusChange + logos_delivery/waku/api/events/health_events # EventConnectionStatusChange const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages diff --git a/logos_delivery/waku/api/events/health_events.nim b/logos_delivery/waku/api/events/health_events.nim index fbf00debb..060b97eae 100644 --- a/logos_delivery/waku/api/events/health_events.nim +++ b/logos_delivery/waku/api/events/health_events.nim @@ -4,10 +4,12 @@ import logos_delivery/api/types import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health] import logos_delivery/waku/waku_core/topics -export protocol_health, topic_health +export event_broker, protocol_health, topic_health -# Note: `EventConnectionStatusChange` lives in `logos_delivery/api/logos_delivery_api` -# (the top-level orchestrator interface owns the node-connectivity event). +# Emitted by the health monitor when overall node connectivity changes. +EventBroker: + type EventConnectionStatusChange* = object + connectionStatus*: ConnectionStatus # Notify health changes to a subscribed topic # TODO: emit content topic health change events when subscribe/unsubscribe diff --git a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim index 1b2ea16e9..d55499d1a 100644 --- a/logos_delivery/waku/node/health_monitor/node_health_monitor.nim +++ b/logos_delivery/waku/node/health_monitor/node_health_monitor.nim @@ -9,7 +9,6 @@ import libp2p/protocols/pubsub, libp2p/protocols/pubsub/rpc/messages, logos_delivery/api/types, - logos_delivery/api/logos_delivery_api, # EventConnectionStatusChange logos_delivery/waku/[ waku_relay, waku_rln_relay, From 2636551a11547a2714067f6ab6d1226bf3e04f80 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 26 Jun 2026 15:07:31 +0200 Subject: [PATCH 3/3] nph update --- .../api/reliable_channel_manager_api.nim | 15 +++++++-------- logos_delivery/logos_delivery.nim | 3 +-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/logos_delivery/api/reliable_channel_manager_api.nim b/logos_delivery/api/reliable_channel_manager_api.nim index 9c1b19019..8595f2f86 100644 --- a/logos_delivery/api/reliable_channel_manager_api.nim +++ b/logos_delivery/api/reliable_channel_manager_api.nim @@ -14,14 +14,13 @@ import logos_delivery/api/messaging_client_api export event_broker, api_types export channel_types, messaging_client_api -type - SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. - async: (raises: [CatchableError]), gcsafe - .} - ## Egress dispatch boundary. Typically wraps `MessagingClient.send`; - ## tests inject a fake that records calls and returns canned - ## `RequestId`s so the send state machine can be exercised end-to-end - ## without a network. +type SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe +.} + ## Egress dispatch boundary. Typically wraps `MessagingClient.send`; + ## tests inject a fake that records calls and returns canned + ## `RequestId`s so the send state machine can be exercised end-to-end + ## without a network. EventBroker: type ChannelMessageReceivedEvent* = object diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index d9bc54ea0..03ef5a2d0 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -72,8 +72,7 @@ type messaging*: MessagingClientConf reliableChannel*: ReliableChannelManagerConf - LogosDelivery* = ref object - ## Entry point. Holds one instance of each API layer. + LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer. waku*: Waku messagingClient*: MessagingClient reliableChannelManager*: ReliableChannelManager