mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-27 20:10:02 +00:00
Add reliable-channel FFI ops + events (nim-ffi v0.1.3)
Expose the reliable-channel layer through the v0.1.3 FFI: - channel_create / channel_send / channel_close call the ReliableChannelManager api (createReliableChannel / send / closeChannel), marshalling channel id + base64 payload + ephemeral by hand - channel message received / sent / errored are surfaced by listening to the channel-layer broker events in start_node and forwarding them through callEventCallback (received payload base64-encoded), dropped in stop_node Stays on nim-ffi v0.1.3 (no typed/CBOR rewrite). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
aca652008a
commit
369ad1b430
76
library/channels_api/channel_api.nim
Normal file
76
library/channels_api/channel_api.nim
Normal file
@ -0,0 +1,76 @@
|
||||
import std/json
|
||||
import chronos, results, ffi
|
||||
import
|
||||
logos_delivery/waku/common/base64,
|
||||
logos_delivery,
|
||||
logos_delivery/waku/waku_core/topics/content_topic,
|
||||
logos_delivery/api/types,
|
||||
../declare_lib
|
||||
|
||||
proc logosdelivery_channel_create(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
channelIdStr: cstring,
|
||||
contentTopicStr: cstring,
|
||||
senderIdStr: cstring,
|
||||
) {.ffi.} =
|
||||
requireInitializedNode(ctx, "ChannelCreate"):
|
||||
return err(errMsg)
|
||||
|
||||
let id = ctx.myLib[].reliableChannelManager.createReliableChannel(
|
||||
ChannelId($channelIdStr),
|
||||
ContentTopic($contentTopicStr),
|
||||
SdsParticipantID($senderIdStr),
|
||||
).valueOr:
|
||||
return err("ChannelCreate failed: " & $error)
|
||||
|
||||
return ok(string(id))
|
||||
|
||||
proc logosdelivery_channel_send(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
channelIdStr: cstring,
|
||||
messageJson: cstring,
|
||||
) {.ffi.} =
|
||||
## `messageJson` carries `{ "payload": <base64>, "ephemeral": <bool> }`.
|
||||
requireInitializedNode(ctx, "ChannelSend"):
|
||||
return err(errMsg)
|
||||
|
||||
var jsonNode: JsonNode
|
||||
try:
|
||||
jsonNode = parseJson($messageJson)
|
||||
except Exception as e:
|
||||
return err("Failed to parse channel message JSON: " & e.msg)
|
||||
|
||||
if not jsonNode.hasKey("payload"):
|
||||
return err("Missing payload field")
|
||||
|
||||
let payload = base64.decode(Base64String(jsonNode["payload"].getStr())).valueOr:
|
||||
return err("invalid payload format: " & error)
|
||||
|
||||
let ephemeral = jsonNode.getOrDefault("ephemeral").getBool(false)
|
||||
|
||||
let requestId = (
|
||||
await ctx.myLib[].reliableChannelManager.send(
|
||||
ChannelId($channelIdStr), payload, ephemeral
|
||||
)
|
||||
).valueOr:
|
||||
return err("ChannelSend failed: " & $error)
|
||||
|
||||
return ok($requestId)
|
||||
|
||||
proc logosdelivery_channel_close(
|
||||
ctx: ptr FFIContext[LogosDelivery],
|
||||
callback: FFICallBack,
|
||||
userData: pointer,
|
||||
channelIdStr: cstring,
|
||||
) {.ffi.} =
|
||||
requireInitializedNode(ctx, "ChannelClose"):
|
||||
return err(errMsg)
|
||||
|
||||
(await ctx.myLib[].reliableChannelManager.closeChannel(ChannelId($channelIdStr))).isOkOr:
|
||||
return err("ChannelClose failed: " & $error)
|
||||
|
||||
return ok("")
|
||||
@ -31,7 +31,8 @@ include
|
||||
./kernel_api/protocols/relay_api,
|
||||
./kernel_api/protocols/store_api,
|
||||
./kernel_api/protocols/lightpush_api,
|
||||
./kernel_api/protocols/filter_api
|
||||
./kernel_api/protocols/filter_api,
|
||||
./channels_api/channel_api
|
||||
|
||||
################################################################################
|
||||
### Exported procs (former libwaku API)
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import std/json
|
||||
import chronos, chronicles, results, ffi
|
||||
import logos_delivery/waku/common/base64
|
||||
import
|
||||
logos_delivery,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
@ -124,6 +125,40 @@ proc logosdelivery_start_node(
|
||||
chronicles.error "ConnectionStatusChange.listen failed", err = $error
|
||||
return err("ConnectionStatusChange.listen failed: " & $error)
|
||||
|
||||
let channelReceivedListener = ChannelMessageReceivedEvent.listen(
|
||||
ctx.myLib[].waku.brokerCtx,
|
||||
proc(event: ChannelMessageReceivedEvent) {.async: (raises: []).} =
|
||||
callEventCallback(ctx, "onChannelMessageReceived"):
|
||||
$(
|
||||
%*{
|
||||
"eventType": "channel_message_received",
|
||||
"channelId": string(event.channelId),
|
||||
"senderId": $event.senderId,
|
||||
"payload": string(base64.encode(event.payload)),
|
||||
}
|
||||
),
|
||||
).valueOr:
|
||||
chronicles.error "ChannelMessageReceivedEvent.listen failed", err = $error
|
||||
return err("ChannelMessageReceivedEvent.listen failed: " & $error)
|
||||
|
||||
let channelSentListener = ChannelMessageSentEvent.listen(
|
||||
ctx.myLib[].waku.brokerCtx,
|
||||
proc(event: ChannelMessageSentEvent) {.async: (raises: []).} =
|
||||
callEventCallback(ctx, "onChannelMessageSent"):
|
||||
$newJsonEvent("channel_message_sent", event),
|
||||
).valueOr:
|
||||
chronicles.error "ChannelMessageSentEvent.listen failed", err = $error
|
||||
return err("ChannelMessageSentEvent.listen failed: " & $error)
|
||||
|
||||
let channelErrorListener = ChannelMessageErrorEvent.listen(
|
||||
ctx.myLib[].waku.brokerCtx,
|
||||
proc(event: ChannelMessageErrorEvent) {.async: (raises: []).} =
|
||||
callEventCallback(ctx, "onChannelMessageError"):
|
||||
$newJsonEvent("channel_message_error", event),
|
||||
).valueOr:
|
||||
chronicles.error "ChannelMessageErrorEvent.listen failed", err = $error
|
||||
return err("ChannelMessageErrorEvent.listen failed: " & $error)
|
||||
|
||||
(await ctx.myLib[].start()).isOkOr:
|
||||
let errMsg = $error
|
||||
chronicles.error "START_NODE failed", err = errMsg
|
||||
@ -141,6 +176,9 @@ proc logosdelivery_stop_node(
|
||||
await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
|
||||
await MessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
|
||||
await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].waku.brokerCtx)
|
||||
await ChannelMessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
|
||||
await ChannelMessageSentEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
|
||||
await ChannelMessageErrorEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
|
||||
|
||||
(await ctx.myLib[].stop()).isOkOr:
|
||||
let errMsg = $error
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user