channels_api: reliable-channel FFI (create/send/close + events)

Add library/channels_api over the reliable-channel logic: channel_create
returns a {.ffiHandle.} ReliableChannelHandle, channel_send/channel_close
operate on it, and the channel message received/sent/error events are fed by
the channel-layer broker events. Wire them into the FFI root.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-25 12:12:49 +02:00
parent e59703fad7
commit 46666044f2
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
4 changed files with 72 additions and 2 deletions

View File

@ -0,0 +1,24 @@
## Opaque handle to a live reliable channel. Holds the owning manager + the
## channel id so the channel ops (send / close) need no other context. Only its
## uint64 id crosses the FFI boundary; the object stays in the ctx registry.
type ReliableChannelHandle {.ffiHandle.} = ref object
manager: ReliableChannelManager
channelId: ChannelId
proc channel_create*(
self: LogosDelivery, channelId: string, contentTopic: string, senderId: string
): Future[Result[ReliableChannelHandle, string]] {.ffi.} =
## Creates a reliable channel and returns a handle to it. The send handler and
## rng come from the manager; encryption providers are installed separately.
let id = self.reliableChannelManager.createReliableChannel(
ChannelId(channelId), ContentTopic(contentTopic), SdsParticipantID(senderId)
).valueOr:
return err(error)
return ok(ReliableChannelHandle(manager: self.reliableChannelManager, channelId: id))
proc channel_close*(ch: ReliableChannelHandle): Future[Result[string, string]] {.ffi.} =
## Stops the channel's SDS loops and deregisters it from the manager.
## Persisted SDS state survives, so re-creating the channel restores it.
(await ch.manager.closeChannel(ch.channelId)).isOkOr:
return err(error)
return ok("")

View File

@ -0,0 +1,33 @@
## Reliable-channel events: per-channel message received / sent / errored,
## fed by the channel-layer broker events.
proc onChannelMessageReceived*(
channelId: string, senderId: string, payload: seq[byte]
) {.ffiEvent: "on_channel_message_received".}
proc onChannelMessageSent*(
channelId: string, requestId: string
) {.ffiEvent: "on_channel_message_sent".}
proc onChannelMessageError*(
channelId: string, requestId: string, error: string
) {.ffiEvent: "on_channel_message_error".}
proc listenChannelEvents(self: LogosDelivery) =
let brokerCtx = self.waku.brokerCtx
discard ChannelMessageReceivedEvent.listen(
brokerCtx,
proc(e: ChannelMessageReceivedEvent) {.async: (raises: []).} =
onChannelMessageReceived(string(e.channelId), $e.senderId, e.payload),
)
discard ChannelMessageSentEvent.listen(
brokerCtx,
proc(e: ChannelMessageSentEvent) {.async: (raises: []).} =
onChannelMessageSent(string(e.channelId), $e.requestId),
)
discard ChannelMessageErrorEvent.listen(
brokerCtx,
proc(e: ChannelMessageErrorEvent) {.async: (raises: []).} =
onChannelMessageError(string(e.channelId), $e.requestId, e.error),
)

View File

@ -0,0 +1,9 @@
proc channel_send*(
ch: ReliableChannelHandle, payload: seq[byte], ephemeral: bool
): Future[Result[string, string]] {.ffi.} =
## Sends `payload` on the reliable channel. Routes through the messaging
## layer (ReliableChannelManager.send -> MessagingClient.send); returns the
## channel-layer request id.
let requestId = (await ch.manager.send(ch.channelId, payload, ephemeral)).valueOr:
return err(error)
return ok($requestId)

View File

@ -28,7 +28,8 @@ include
./events/message_events,
./events/connection_status_events,
./events/topic_health_events,
./events/connection_change_events
./events/connection_change_events,
./channels_api/events
proc listenInternalEvents(self: LogosDelivery) =
## Feed every FFI event from an internal nim-broker event.
@ -37,6 +38,7 @@ proc listenInternalEvents(self: LogosDelivery) =
self.listenConnectionStatusEvents()
self.listenTopicHealthEvents()
self.listenConnectionChangeEvents()
self.listenChannelEvents()
# --- constructor / destructor ----------------------------------------------
proc logosdelivery_create*(
@ -68,10 +70,12 @@ proc stop*(self: LogosDelivery): Future[Result[string, string]] {.ffi.} =
return err(error)
return ok("")
# --- operations (typed {.ffi.} procs, grouped per protocol) ----------------
# --- operations (typed {.ffi.} procs, grouped per layer/protocol) ----------
include
./messaging_api/subscriptions_api,
./messaging_api/send_api,
./channels_api/channel_lifecycle_api,
./channels_api/send_api,
./kernel_api/node_info_api,
./kernel_api/debug_node_api,
./kernel_api/ping_api,