From 369ad1b430b319767b38675e21a59d46b7dfd474 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Thu, 25 Jun 2026 15:27:55 +0200 Subject: [PATCH] Add reliable-channel FFI ops + events (nim-ffi v0.1.3) Expose the reliable-channel layer through the v0.1.3 FFI: - channel_create / channel_send / channel_close call the ReliableChannelManager api (createReliableChannel / send / closeChannel), marshalling channel id + base64 payload + ephemeral by hand - channel message received / sent / errored are surfaced by listening to the channel-layer broker events in start_node and forwarding them through callEventCallback (received payload base64-encoded), dropped in stop_node Stays on nim-ffi v0.1.3 (no typed/CBOR rewrite). Co-Authored-By: Claude Opus 4.8 --- library/channels_api/channel_api.nim | 76 +++++++++++++++++++++++++ library/liblogosdelivery.nim | 3 +- library/logos_delivery_api/node_api.nim | 38 +++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 library/channels_api/channel_api.nim diff --git a/library/channels_api/channel_api.nim b/library/channels_api/channel_api.nim new file mode 100644 index 000000000..b25ad78ed --- /dev/null +++ b/library/channels_api/channel_api.nim @@ -0,0 +1,76 @@ +import std/json +import chronos, results, ffi +import + logos_delivery/waku/common/base64, + logos_delivery, + logos_delivery/waku/waku_core/topics/content_topic, + logos_delivery/api/types, + ../declare_lib + +proc logosdelivery_channel_create( + ctx: ptr FFIContext[LogosDelivery], + callback: FFICallBack, + userData: pointer, + channelIdStr: cstring, + contentTopicStr: cstring, + senderIdStr: cstring, +) {.ffi.} = + requireInitializedNode(ctx, "ChannelCreate"): + return err(errMsg) + + let id = ctx.myLib[].reliableChannelManager.createReliableChannel( + ChannelId($channelIdStr), + ContentTopic($contentTopicStr), + SdsParticipantID($senderIdStr), + ).valueOr: + return err("ChannelCreate failed: " & $error) + + return ok(string(id)) + +proc logosdelivery_channel_send( + ctx: ptr FFIContext[LogosDelivery], + callback: FFICallBack, + userData: pointer, + channelIdStr: cstring, + messageJson: cstring, +) {.ffi.} = + ## `messageJson` carries `{ "payload": , "ephemeral": }`. + requireInitializedNode(ctx, "ChannelSend"): + return err(errMsg) + + var jsonNode: JsonNode + try: + jsonNode = parseJson($messageJson) + except Exception as e: + return err("Failed to parse channel message JSON: " & e.msg) + + if not jsonNode.hasKey("payload"): + return err("Missing payload field") + + let payload = base64.decode(Base64String(jsonNode["payload"].getStr())).valueOr: + return err("invalid payload format: " & error) + + let ephemeral = jsonNode.getOrDefault("ephemeral").getBool(false) + + let requestId = ( + await ctx.myLib[].reliableChannelManager.send( + ChannelId($channelIdStr), payload, ephemeral + ) + ).valueOr: + return err("ChannelSend failed: " & $error) + + return ok($requestId) + +proc logosdelivery_channel_close( + ctx: ptr FFIContext[LogosDelivery], + callback: FFICallBack, + userData: pointer, + channelIdStr: cstring, +) {.ffi.} = + requireInitializedNode(ctx, "ChannelClose"): + return err(errMsg) + + (await ctx.myLib[].reliableChannelManager.closeChannel(ChannelId($channelIdStr))).isOkOr: + return err("ChannelClose failed: " & $error) + + return ok("") diff --git a/library/liblogosdelivery.nim b/library/liblogosdelivery.nim index 0ead080bc..755503e7c 100644 --- a/library/liblogosdelivery.nim +++ b/library/liblogosdelivery.nim @@ -31,7 +31,8 @@ include ./kernel_api/protocols/relay_api, ./kernel_api/protocols/store_api, ./kernel_api/protocols/lightpush_api, - ./kernel_api/protocols/filter_api + ./kernel_api/protocols/filter_api, + ./channels_api/channel_api ################################################################################ ### Exported procs (former libwaku API) diff --git a/library/logos_delivery_api/node_api.nim b/library/logos_delivery_api/node_api.nim index 0a5436bc8..cfee2f5e2 100644 --- a/library/logos_delivery_api/node_api.nim +++ b/library/logos_delivery_api/node_api.nim @@ -1,5 +1,6 @@ import std/json import chronos, chronicles, results, ffi +import logos_delivery/waku/common/base64 import logos_delivery, logos_delivery/waku/node/waku_node, @@ -124,6 +125,40 @@ proc logosdelivery_start_node( chronicles.error "ConnectionStatusChange.listen failed", err = $error return err("ConnectionStatusChange.listen failed: " & $error) + let channelReceivedListener = ChannelMessageReceivedEvent.listen( + ctx.myLib[].waku.brokerCtx, + proc(event: ChannelMessageReceivedEvent) {.async: (raises: []).} = + callEventCallback(ctx, "onChannelMessageReceived"): + $( + %*{ + "eventType": "channel_message_received", + "channelId": string(event.channelId), + "senderId": $event.senderId, + "payload": string(base64.encode(event.payload)), + } + ), + ).valueOr: + chronicles.error "ChannelMessageReceivedEvent.listen failed", err = $error + return err("ChannelMessageReceivedEvent.listen failed: " & $error) + + let channelSentListener = ChannelMessageSentEvent.listen( + ctx.myLib[].waku.brokerCtx, + proc(event: ChannelMessageSentEvent) {.async: (raises: []).} = + callEventCallback(ctx, "onChannelMessageSent"): + $newJsonEvent("channel_message_sent", event), + ).valueOr: + chronicles.error "ChannelMessageSentEvent.listen failed", err = $error + return err("ChannelMessageSentEvent.listen failed: " & $error) + + let channelErrorListener = ChannelMessageErrorEvent.listen( + ctx.myLib[].waku.brokerCtx, + proc(event: ChannelMessageErrorEvent) {.async: (raises: []).} = + callEventCallback(ctx, "onChannelMessageError"): + $newJsonEvent("channel_message_error", event), + ).valueOr: + chronicles.error "ChannelMessageErrorEvent.listen failed", err = $error + return err("ChannelMessageErrorEvent.listen failed: " & $error) + (await ctx.myLib[].start()).isOkOr: let errMsg = $error chronicles.error "START_NODE failed", err = errMsg @@ -141,6 +176,9 @@ proc logosdelivery_stop_node( await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) await MessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].waku.brokerCtx) + await ChannelMessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) + await ChannelMessageSentEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) + await ChannelMessageErrorEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) (await ctx.myLib[].stop()).isOkOr: let errMsg = $error