From 46666044f2716bf52aaa2a1a0623ebe80c83567c Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Thu, 25 Jun 2026 12:12:49 +0200 Subject: [PATCH] channels_api: reliable-channel FFI (create/send/close + events) Add library/channels_api over the reliable-channel logic: channel_create returns a {.ffiHandle.} ReliableChannelHandle, channel_send/channel_close operate on it, and the channel message received/sent/error events are fed by the channel-layer broker events. Wire them into the FFI root. Co-Authored-By: Claude Opus 4.8 --- .../channels_api/channel_lifecycle_api.nim | 24 ++++++++++++++ library/channels_api/events.nim | 33 +++++++++++++++++++ library/channels_api/send_api.nim | 9 +++++ library/liblogosdelivery.nim | 8 +++-- 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 library/channels_api/channel_lifecycle_api.nim create mode 100644 library/channels_api/events.nim create mode 100644 library/channels_api/send_api.nim diff --git a/library/channels_api/channel_lifecycle_api.nim b/library/channels_api/channel_lifecycle_api.nim new file mode 100644 index 000000000..222005df1 --- /dev/null +++ b/library/channels_api/channel_lifecycle_api.nim @@ -0,0 +1,24 @@ +## Opaque handle to a live reliable channel. Holds the owning manager + the +## channel id so the channel ops (send / close) need no other context. Only its +## uint64 id crosses the FFI boundary; the object stays in the ctx registry. +type ReliableChannelHandle {.ffiHandle.} = ref object + manager: ReliableChannelManager + channelId: ChannelId + +proc channel_create*( + self: LogosDelivery, channelId: string, contentTopic: string, senderId: string +): Future[Result[ReliableChannelHandle, string]] {.ffi.} = + ## Creates a reliable channel and returns a handle to it. The send handler and + ## rng come from the manager; encryption providers are installed separately. + let id = self.reliableChannelManager.createReliableChannel( + ChannelId(channelId), ContentTopic(contentTopic), SdsParticipantID(senderId) + ).valueOr: + return err(error) + return ok(ReliableChannelHandle(manager: self.reliableChannelManager, channelId: id)) + +proc channel_close*(ch: ReliableChannelHandle): Future[Result[string, string]] {.ffi.} = + ## Stops the channel's SDS loops and deregisters it from the manager. + ## Persisted SDS state survives, so re-creating the channel restores it. + (await ch.manager.closeChannel(ch.channelId)).isOkOr: + return err(error) + return ok("") diff --git a/library/channels_api/events.nim b/library/channels_api/events.nim new file mode 100644 index 000000000..7df6d986e --- /dev/null +++ b/library/channels_api/events.nim @@ -0,0 +1,33 @@ +## Reliable-channel events: per-channel message received / sent / errored, +## fed by the channel-layer broker events. + +proc onChannelMessageReceived*( + channelId: string, senderId: string, payload: seq[byte] +) {.ffiEvent: "on_channel_message_received".} + +proc onChannelMessageSent*( + channelId: string, requestId: string +) {.ffiEvent: "on_channel_message_sent".} + +proc onChannelMessageError*( + channelId: string, requestId: string, error: string +) {.ffiEvent: "on_channel_message_error".} + +proc listenChannelEvents(self: LogosDelivery) = + let brokerCtx = self.waku.brokerCtx + + discard ChannelMessageReceivedEvent.listen( + brokerCtx, + proc(e: ChannelMessageReceivedEvent) {.async: (raises: []).} = + onChannelMessageReceived(string(e.channelId), $e.senderId, e.payload), + ) + discard ChannelMessageSentEvent.listen( + brokerCtx, + proc(e: ChannelMessageSentEvent) {.async: (raises: []).} = + onChannelMessageSent(string(e.channelId), $e.requestId), + ) + discard ChannelMessageErrorEvent.listen( + brokerCtx, + proc(e: ChannelMessageErrorEvent) {.async: (raises: []).} = + onChannelMessageError(string(e.channelId), $e.requestId, e.error), + ) diff --git a/library/channels_api/send_api.nim b/library/channels_api/send_api.nim new file mode 100644 index 000000000..594817751 --- /dev/null +++ b/library/channels_api/send_api.nim @@ -0,0 +1,9 @@ +proc channel_send*( + ch: ReliableChannelHandle, payload: seq[byte], ephemeral: bool +): Future[Result[string, string]] {.ffi.} = + ## Sends `payload` on the reliable channel. Routes through the messaging + ## layer (ReliableChannelManager.send -> MessagingClient.send); returns the + ## channel-layer request id. + let requestId = (await ch.manager.send(ch.channelId, payload, ephemeral)).valueOr: + return err(error) + return ok($requestId) diff --git a/library/liblogosdelivery.nim b/library/liblogosdelivery.nim index 06321fc4b..d5530cd1b 100644 --- a/library/liblogosdelivery.nim +++ b/library/liblogosdelivery.nim @@ -28,7 +28,8 @@ include ./events/message_events, ./events/connection_status_events, ./events/topic_health_events, - ./events/connection_change_events + ./events/connection_change_events, + ./channels_api/events proc listenInternalEvents(self: LogosDelivery) = ## Feed every FFI event from an internal nim-broker event. @@ -37,6 +38,7 @@ proc listenInternalEvents(self: LogosDelivery) = self.listenConnectionStatusEvents() self.listenTopicHealthEvents() self.listenConnectionChangeEvents() + self.listenChannelEvents() # --- constructor / destructor ---------------------------------------------- proc logosdelivery_create*( @@ -68,10 +70,12 @@ proc stop*(self: LogosDelivery): Future[Result[string, string]] {.ffi.} = return err(error) return ok("") -# --- operations (typed {.ffi.} procs, grouped per protocol) ---------------- +# --- operations (typed {.ffi.} procs, grouped per layer/protocol) ---------- include ./messaging_api/subscriptions_api, ./messaging_api/send_api, + ./channels_api/channel_lifecycle_api, + ./channels_api/send_api, ./kernel_api/node_info_api, ./kernel_api/debug_node_api, ./kernel_api/ping_api,