From c64d156f2a65101b65471eb2db2e5f475352a45a Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Thu, 25 Jun 2026 04:18:35 +0200 Subject: [PATCH] FFI: migrate liblogosdelivery root to nim-ffi v0.2.0-rc.3 Rewrite the FFI root over the new per-layer APIs using nim-ffi v0.2.0 typed {.ffiCtor.}/{.ffiDtor.}/{.ffi.}/{.ffiEvent.} + CBOR, replacing the hand-written cstring/JSON bridge. Events are fed by internal nim-broker listeners (no AppCallbacks). Adds the messaging_api/channels_api groups and the broker-listener event modules, and drops the v0.1 scaffolding (declare_lib, node_api, node_lifecycle_api, logos_delivery_api/*, json_*). Co-Authored-By: Claude Opus 4.8 --- .../channels_api/channel_lifecycle_api.nim | 24 +++ library/channels_api/events.nim | 33 ++++ library/channels_api/send_api.nim | 9 + library/declare_lib.nim | 33 ---- library/events/connection_change_events.nim | 12 ++ library/events/connection_status_events.nim | 12 ++ library/events/json_base_event.nim | 6 - .../events/json_connection_change_event.nim | 17 -- .../json_connection_status_change_event.nim | 15 -- library/events/json_message_event.nim | 106 ---------- .../events/json_topic_health_change_event.nim | 20 -- library/events/message_events.nim | 49 +++++ library/events/topic_health_events.nim | 12 ++ library/kernel_api/node_lifecycle_api.nim | 82 -------- library/liblogosdelivery.nim | 186 ++++++++---------- library/logos_delivery_api/debug_api.nim | 56 ------ library/logos_delivery_api/messaging_api.nim | 91 --------- library/logos_delivery_api/node_api.nim | 150 -------------- library/messaging_api/send_api.nim | 9 + library/messaging_api/subscriptions_api.nim | 13 ++ logos_delivery.nimble | 2 +- nimble.lock | 9 +- 22 files changed, 261 insertions(+), 685 deletions(-) create mode 100644 library/channels_api/channel_lifecycle_api.nim create mode 100644 library/channels_api/events.nim create mode 100644 library/channels_api/send_api.nim delete mode 100644 library/declare_lib.nim create mode 100644 library/events/connection_change_events.nim create mode 100644 library/events/connection_status_events.nim delete mode 100644 library/events/json_base_event.nim delete mode 100644 library/events/json_connection_change_event.nim delete mode 100644 library/events/json_connection_status_change_event.nim delete mode 100644 library/events/json_message_event.nim delete mode 100644 library/events/json_topic_health_change_event.nim create mode 100644 library/events/message_events.nim create mode 100644 library/events/topic_health_events.nim delete mode 100644 library/kernel_api/node_lifecycle_api.nim delete mode 100644 library/logos_delivery_api/debug_api.nim delete mode 100644 library/logos_delivery_api/messaging_api.nim delete mode 100644 library/logos_delivery_api/node_api.nim create mode 100644 library/messaging_api/send_api.nim create mode 100644 library/messaging_api/subscriptions_api.nim diff --git a/library/channels_api/channel_lifecycle_api.nim b/library/channels_api/channel_lifecycle_api.nim new file mode 100644 index 000000000..222005df1 --- /dev/null +++ b/library/channels_api/channel_lifecycle_api.nim @@ -0,0 +1,24 @@ +## Opaque handle to a live reliable channel. Holds the owning manager + the +## channel id so the channel ops (send / close) need no other context. Only its +## uint64 id crosses the FFI boundary; the object stays in the ctx registry. +type ReliableChannelHandle {.ffiHandle.} = ref object + manager: ReliableChannelManager + channelId: ChannelId + +proc channel_create*( + self: LogosDelivery, channelId: string, contentTopic: string, senderId: string +): Future[Result[ReliableChannelHandle, string]] {.ffi.} = + ## Creates a reliable channel and returns a handle to it. The send handler and + ## rng come from the manager; encryption providers are installed separately. + let id = self.reliableChannelManager.createReliableChannel( + ChannelId(channelId), ContentTopic(contentTopic), SdsParticipantID(senderId) + ).valueOr: + return err(error) + return ok(ReliableChannelHandle(manager: self.reliableChannelManager, channelId: id)) + +proc channel_close*(ch: ReliableChannelHandle): Future[Result[string, string]] {.ffi.} = + ## Stops the channel's SDS loops and deregisters it from the manager. + ## Persisted SDS state survives, so re-creating the channel restores it. + (await ch.manager.closeChannel(ch.channelId)).isOkOr: + return err(error) + return ok("") diff --git a/library/channels_api/events.nim b/library/channels_api/events.nim new file mode 100644 index 000000000..7df6d986e --- /dev/null +++ b/library/channels_api/events.nim @@ -0,0 +1,33 @@ +## Reliable-channel events: per-channel message received / sent / errored, +## fed by the channel-layer broker events. + +proc onChannelMessageReceived*( + channelId: string, senderId: string, payload: seq[byte] +) {.ffiEvent: "on_channel_message_received".} + +proc onChannelMessageSent*( + channelId: string, requestId: string +) {.ffiEvent: "on_channel_message_sent".} + +proc onChannelMessageError*( + channelId: string, requestId: string, error: string +) {.ffiEvent: "on_channel_message_error".} + +proc listenChannelEvents(self: LogosDelivery) = + let brokerCtx = self.waku.brokerCtx + + discard ChannelMessageReceivedEvent.listen( + brokerCtx, + proc(e: ChannelMessageReceivedEvent) {.async: (raises: []).} = + onChannelMessageReceived(string(e.channelId), $e.senderId, e.payload), + ) + discard ChannelMessageSentEvent.listen( + brokerCtx, + proc(e: ChannelMessageSentEvent) {.async: (raises: []).} = + onChannelMessageSent(string(e.channelId), $e.requestId), + ) + discard ChannelMessageErrorEvent.listen( + brokerCtx, + proc(e: ChannelMessageErrorEvent) {.async: (raises: []).} = + onChannelMessageError(string(e.channelId), $e.requestId, e.error), + ) diff --git a/library/channels_api/send_api.nim b/library/channels_api/send_api.nim new file mode 100644 index 000000000..594817751 --- /dev/null +++ b/library/channels_api/send_api.nim @@ -0,0 +1,9 @@ +proc channel_send*( + ch: ReliableChannelHandle, payload: seq[byte], ephemeral: bool +): Future[Result[string, string]] {.ffi.} = + ## Sends `payload` on the reliable channel. Routes through the messaging + ## layer (ReliableChannelManager.send -> MessagingClient.send); returns the + ## channel-layer request id. + let requestId = (await ch.manager.send(ch.channelId, payload, ephemeral)).valueOr: + return err(error) + return ok($requestId) diff --git a/library/declare_lib.nim b/library/declare_lib.nim deleted file mode 100644 index eaf8f6315..000000000 --- a/library/declare_lib.nim +++ /dev/null @@ -1,33 +0,0 @@ -import ffi -import std/locks -import logos_delivery - -declareLibrary("logosdelivery") - -var eventCallbackLock: Lock -initLock(eventCallbackLock) - -template requireInitializedNode*( - ctx: ptr FFIContext[LogosDelivery], opName: string, onError: untyped -) = - if isNil(ctx): - let errMsg {.inject.} = opName & " failed: invalid context" - onError - elif isNil(ctx.myLib) or isNil(ctx.myLib[]): - let errMsg {.inject.} = opName & " failed: node is not initialized" - onError - -proc logosdelivery_set_event_callback( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -) {.dynlib, exportc, cdecl.} = - if isNil(ctx): - echo "error: invalid context in logosdelivery_set_event_callback" - return - - # prevent race conditions that might happen due incorrect usage. - eventCallbackLock.acquire() - defer: - eventCallbackLock.release() - - ctx[].eventCallback = cast[pointer](callback) - ctx[].eventUserData = userData diff --git a/library/events/connection_change_events.nim b/library/events/connection_change_events.nim new file mode 100644 index 000000000..ecd45e544 --- /dev/null +++ b/library/events/connection_change_events.nim @@ -0,0 +1,12 @@ +## Per-peer connection changes (connected/disconnected/…), fed by WakuPeerEvent. + +proc onConnectionChange*( + peerId: string, event: string +) {.ffiEvent: "on_connection_change".} + +proc listenConnectionChangeEvents(self: LogosDelivery) = + discard WakuPeerEvent.listen( + self.waku.brokerCtx, + proc(e: WakuPeerEvent) {.async: (raises: []).} = + onConnectionChange($e.peerId, $e.kind), + ) diff --git a/library/events/connection_status_events.nim b/library/events/connection_status_events.nim new file mode 100644 index 000000000..c3d937a14 --- /dev/null +++ b/library/events/connection_status_events.nim @@ -0,0 +1,12 @@ +## Node connectivity (online/offline) status, fed by EventConnectionStatusChange. + +proc onConnectionStatusChange*( + status: string +) {.ffiEvent: "on_connection_status_change".} + +proc listenConnectionStatusEvents(self: LogosDelivery) = + discard EventConnectionStatusChange.listen( + self.waku.brokerCtx, + proc(e: EventConnectionStatusChange) {.async: (raises: []).} = + onConnectionStatusChange($e.connectionStatus), + ) diff --git a/library/events/json_base_event.nim b/library/events/json_base_event.nim deleted file mode 100644 index 8c51d2c4a..000000000 --- a/library/events/json_base_event.nim +++ /dev/null @@ -1,6 +0,0 @@ -type JsonEvent* = ref object of RootObj # https://rfc.vac.dev/spec/36/#jsonsignal-type - eventType* {.requiresInit.}: string - -method `$`*(jsonEvent: JsonEvent): string {.base.} = - discard - # All events should implement this diff --git a/library/events/json_connection_change_event.nim b/library/events/json_connection_change_event.nim deleted file mode 100644 index f78dcbe59..000000000 --- a/library/events/json_connection_change_event.nim +++ /dev/null @@ -1,17 +0,0 @@ -import system, std/json, libp2p/[connmanager, peerid] - -import ../../logos_delivery/waku/common/base64, ./json_base_event - -type JsonConnectionChangeEvent* = ref object of JsonEvent - peerId*: string - peerEvent*: PeerEventKind - -proc new*( - T: type JsonConnectionChangeEvent, peerId: string, peerEvent: PeerEventKind -): T = - return JsonConnectionChangeEvent( - eventType: "connection_change", peerId: peerId, peerEvent: peerEvent - ) - -method `$`*(jsonConnectionChangeEvent: JsonConnectionChangeEvent): string = - $(%*jsonConnectionChangeEvent) diff --git a/library/events/json_connection_status_change_event.nim b/library/events/json_connection_status_change_event.nim deleted file mode 100644 index f5af78f24..000000000 --- a/library/events/json_connection_status_change_event.nim +++ /dev/null @@ -1,15 +0,0 @@ -{.push raises: [].} - -import system, std/json -import ./json_base_event -import ../../logos_delivery/api/types - -type JsonConnectionStatusChangeEvent* = ref object of JsonEvent - status*: ConnectionStatus - -proc new*(T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus): T = - return - JsonConnectionStatusChangeEvent(eventType: "node_health_change", status: status) - -method `$`*(event: JsonConnectionStatusChangeEvent): string = - $(%*event) diff --git a/library/events/json_message_event.nim b/library/events/json_message_event.nim deleted file mode 100644 index 61278b4fa..000000000 --- a/library/events/json_message_event.nim +++ /dev/null @@ -1,106 +0,0 @@ -import system, results, std/json, std/strutils -import stew/byteutils -import - ../../logos_delivery/waku/common/base64, - ../../logos_delivery/waku/waku_core/message, - ../../logos_delivery/waku/waku_core/message/message, - ../utils, - ./json_base_event - -type JsonMessage* = ref object # https://rfc.vac.dev/spec/36/#jsonmessage-type - payload*: Base64String - contentTopic*: string - version*: uint - timestamp*: int64 - ephemeral*: bool - meta*: Base64String - proof*: Base64String - -func fromJsonNode*( - T: type JsonMessage, jsonContent: JsonNode -): Result[JsonMessage, string] = - # Visit https://rfc.vac.dev/spec/14/ for further details - - # Check if required fields exist - if not jsonContent.hasKey("payload"): - return err("Missing required field in WakuMessage: payload") - if not jsonContent.hasKey("contentTopic"): - return err("Missing required field in WakuMessage: contentTopic") - - ok( - JsonMessage( - payload: Base64String(jsonContent["payload"].getStr()), - contentTopic: jsonContent["contentTopic"].getStr(), - version: uint32(jsonContent{"version"}.getInt()), - timestamp: (?jsonContent.getProtoInt64("timestamp")).get(0), - ephemeral: jsonContent{"ephemeral"}.getBool(), - meta: Base64String(jsonContent{"meta"}.getStr()), - proof: Base64String(jsonContent{"proof"}.getStr()), - ) - ) - -proc toWakuMessage*(self: JsonMessage): Result[WakuMessage, string] = - let payload = base64.decode(self.payload).valueOr: - return err("invalid payload format: " & error) - - let meta = base64.decode(self.meta).valueOr: - return err("invalid meta format: " & error) - - let proof = base64.decode(self.proof).valueOr: - return err("invalid proof format: " & error) - - ok( - WakuMessage( - payload: payload, - meta: meta, - contentTopic: self.contentTopic, - version: uint32(self.version), - timestamp: self.timestamp, - ephemeral: self.ephemeral, - proof: proof, - ) - ) - -proc `%`*(value: Base64String): JsonNode = - %(value.string) - -type JsonMessageEvent* = ref object of JsonEvent - pubsubTopic*: string - messageHash*: string - wakuMessage*: JsonMessage - -proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T = - # Returns a WakuMessage event as indicated in - # https://github.com/vacp2p/rfc/blob/master/content/docs/rfcs/36/README.md#jsonmessageevent-type - - var payload = newSeq[byte](len(msg.payload)) - if len(msg.payload) != 0: - copyMem(addr payload[0], unsafeAddr msg.payload[0], len(msg.payload)) - - var meta = newSeq[byte](len(msg.meta)) - if len(msg.meta) != 0: - copyMem(addr meta[0], unsafeAddr msg.meta[0], len(msg.meta)) - - var proof = newSeq[byte](len(msg.proof)) - if len(msg.proof) != 0: - copyMem(addr proof[0], unsafeAddr msg.proof[0], len(msg.proof)) - - let msgHash = computeMessageHash(pubSubTopic, msg) - - return JsonMessageEvent( - eventType: "message", - pubSubTopic: pubSubTopic, - messageHash: msgHash.to0xHex(), - wakuMessage: JsonMessage( - payload: base64.encode(payload), - contentTopic: msg.contentTopic, - version: msg.version, - timestamp: int64(msg.timestamp), - ephemeral: msg.ephemeral, - meta: base64.encode(meta), - proof: base64.encode(proof), - ), - ) - -method `$`*(jsonMessage: JsonMessageEvent): string = - $(%*jsonMessage) diff --git a/library/events/json_topic_health_change_event.nim b/library/events/json_topic_health_change_event.nim deleted file mode 100644 index 810e89b2e..000000000 --- a/library/events/json_topic_health_change_event.nim +++ /dev/null @@ -1,20 +0,0 @@ -import system, results, std/json -import stew/byteutils -import ../../logos_delivery/waku/common/base64, ./json_base_event -import ../../logos_delivery/waku/waku_relay - -type JsonTopicHealthChangeEvent* = ref object of JsonEvent - pubsubTopic*: string - topicHealth*: TopicHealth - -proc new*( - T: type JsonTopicHealthChangeEvent, pubsubTopic: string, topicHealth: TopicHealth -): T = - return JsonTopicHealthChangeEvent( - eventType: "relay_topic_health_change", - pubsubTopic: pubsubTopic, - topicHealth: topicHealth, - ) - -method `$`*(jsonTopicHealthChange: JsonTopicHealthChangeEvent): string = - $(%*jsonTopicHealthChange) diff --git a/library/events/message_events.nim b/library/events/message_events.nim new file mode 100644 index 000000000..ff5202bfa --- /dev/null +++ b/library/events/message_events.nim @@ -0,0 +1,49 @@ +## Message events: send lifecycle (sent/error/propagated/received) plus raw +## inbound network messages. Each FFI event is fed by an internal broker event. + +proc onMessageSent*( + requestId: string, messageHash: string +) {.ffiEvent: "on_message_sent".} + +proc onMessageError*( + requestId: string, messageHash: string, error: string +) {.ffiEvent: "on_message_error".} + +proc onMessagePropagated*( + requestId: string, messageHash: string +) {.ffiEvent: "on_message_propagated".} + +proc onMessageReceived*(messageHash: string) {.ffiEvent: "on_message_received".} + +proc onNetworkMessage*( + pubsubTopic: string, message: WakuMessage +) {.ffiEvent: "on_network_message".} + +proc listenMessageEvents(self: LogosDelivery) = + let brokerCtx = self.waku.brokerCtx + + discard MessageSentEvent.listen( + brokerCtx, + proc(e: MessageSentEvent) {.async: (raises: []).} = + onMessageSent($e.requestId, e.messageHash), + ) + discard MessageErrorEvent.listen( + brokerCtx, + proc(e: MessageErrorEvent) {.async: (raises: []).} = + onMessageError($e.requestId, e.messageHash, e.error), + ) + discard MessagePropagatedEvent.listen( + brokerCtx, + proc(e: MessagePropagatedEvent) {.async: (raises: []).} = + onMessagePropagated($e.requestId, e.messageHash), + ) + discard MessageReceivedEvent.listen( + brokerCtx, + proc(e: MessageReceivedEvent) {.async: (raises: []).} = + onMessageReceived(e.messageHash), + ) + discard MessageSeenEvent.listen( + brokerCtx, + proc(e: MessageSeenEvent) {.async: (raises: []).} = + onNetworkMessage(string(e.topic), e.message), + ) diff --git a/library/events/topic_health_events.nim b/library/events/topic_health_events.nim new file mode 100644 index 000000000..6e81fff86 --- /dev/null +++ b/library/events/topic_health_events.nim @@ -0,0 +1,12 @@ +## Per-shard (pubsub topic) health changes, fed by EventShardTopicHealthChange. + +proc onTopicHealthChange*( + pubsubTopic: string, health: string +) {.ffiEvent: "on_topic_health_change".} + +proc listenTopicHealthEvents(self: LogosDelivery) = + discard EventShardTopicHealthChange.listen( + self.waku.brokerCtx, + proc(e: EventShardTopicHealthChange) {.async: (raises: []).} = + onTopicHealthChange(string(e.topic), $e.health), + ) diff --git a/library/kernel_api/node_lifecycle_api.nim b/library/kernel_api/node_lifecycle_api.nim deleted file mode 100644 index dbf7ad9b1..000000000 --- a/library/kernel_api/node_lifecycle_api.nim +++ /dev/null @@ -1,82 +0,0 @@ -import logos_delivery/waku/compat/option_valueor -import std/[options, json, strutils, net] -import chronos, chronicles, results, confutils, confutils/std/net, ffi - -import - logos_delivery/waku/node/peer_manager/peer_manager, - tools/confutils/cli_args, - logos_delivery/waku/waku, - logos_delivery/waku/factory/node_factory, - logos_delivery/waku/factory/app_callbacks, - logos_delivery/waku/rest_api/endpoint/builder, - library/declare_lib - -proc createWaku( - configJson: cstring, appCallbacks: AppCallbacks = nil -): Future[Result[LogosDelivery, string]] {.async.} = - var conf = defaultWakuNodeConf().valueOr: - return err("Failed creating node: " & error) - - var errorResp: string - - var jsonNode: JsonNode - try: - jsonNode = parseJson($configJson) - except Exception: - return err( - "exception in createWaku when calling parseJson: " & getCurrentExceptionMsg() & - " configJson string: " & $configJson - ) - - for confField, confValue in fieldPairs(conf): - if jsonNode.contains(confField): - # Make sure string doesn't contain the leading or trailing " character - let formattedString = ($jsonNode[confField]).strip(chars = {'\"'}) - # Override conf field with the value set in the json-string - try: - confValue = parseCmdArg(typeof(confValue), formattedString) - except Exception: - return err( - "exception in createWaku when parsing configuration. exc: " & - getCurrentExceptionMsg() & ". string that could not be parsed: " & - formattedString & ". expected type: " & $typeof(confValue) - ) - - # Don't send relay app callbacks if relay is disabled - if not conf.relay and not appCallbacks.isNil(): - appCallbacks.relayHandler = nil - appCallbacks.topicHealthChangeHandler = nil - - conf.rest = false ## libwaku never runs the REST server - - let logosRes = (await LogosDelivery.new(conf, appCallbacks)).valueOr: - error "LogosDelivery initialization failed", error = error - return err("Failed setting up LogosDelivery: " & $error) - - return ok(logosRes) - -registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[LogosDelivery]): - proc( - configJson: cstring, appCallbacks: AppCallbacks - ): Future[Result[string, string]] {.async.} = - ctx.myLib[] = (await createWaku(configJson, cast[AppCallbacks](appCallbacks))).valueOr: - error "CreateNodeWithCallbacksRequest failed", error = error - return err($error) - - return ok("") - -proc waku_start( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -) {.ffi.} = - (await ctx.myLib[].start()).isOkOr: - error "START_NODE failed", error = error - return err("failed to start: " & $error) - return ok("") - -proc waku_stop( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -) {.ffi.} = - (await ctx.myLib[].stop()).isOkOr: - error "STOP_NODE failed", error = error - return err("failed to stop: " & $error) - return ok("") diff --git a/library/liblogosdelivery.nim b/library/liblogosdelivery.nim index 0ead080bc..d5530cd1b 100644 --- a/library/liblogosdelivery.nim +++ b/library/liblogosdelivery.nim @@ -1,113 +1,91 @@ -import logos_delivery/waku/compat/option_valueor -import std/[atomics, options, macros] -import chronicles, chronos, chronos/threadsync, ffi -import - logos_delivery/waku/waku_core/message/message, - logos_delivery/waku/waku_core/topics/pubsub_topic, - logos_delivery/waku/waku_relay, - logos_delivery, - logos_delivery/waku/waku, - logos_delivery/waku/node/waku_node, - logos_delivery/waku/node/health_monitor/health_status, - ../logos_delivery/waku/factory/app_callbacks, - ./events/json_message_event, - ./events/json_topic_health_change_event, - ./events/json_connection_change_event, - ./events/json_connection_status_change_event, - ./declare_lib +## C FFI library root (nim-ffi v0.2.0). +## +## The FFI context owns one `LogosDelivery` (the per-layer concentrator). The +## v0.2.0 framework generates the C ABI, CBOR (de)serialization and the request +## channel from the `{.ffiCtor.}` / `{.ffiDtor.}` / `{.ffi.}` / `{.ffiEvent.}` +## annotations below and in the included api modules; `genBindings()` (last +## call) emits the foreign-language bindings under `-d:ffiGenBindings`. +import ffi +import std/strutils +import chronos, results, chronicles -################################################################################ -## Include different APIs, i.e. all procs with {.ffi.} pragma +import logos_delivery +import logos_delivery/api/types +import tools/confutils/conf_from_json +import logos_delivery/waku/api/events/peer_events +import logos_delivery/waku/waku_core +declareLibrary("logosdelivery", LogosDelivery, defaultABIFormat = "cbor") + +# --- shared wire types ----------------------------------------------------- +type PeerConnInfoFFI* {.ffi.} = object + peerId: string + protocols: seq[string] + addresses: seq[string] + +# --- library-initiated events (one {.ffi.} type-set + listener per file) ----- include - ./logos_delivery_api/node_api, - ./logos_delivery_api/messaging_api, - ./logos_delivery_api/debug_api, - ./kernel_api/peer_manager_api, - ./kernel_api/discovery_api, - ./kernel_api/node_lifecycle_api, + ./events/message_events, + ./events/connection_status_events, + ./events/topic_health_events, + ./events/connection_change_events, + ./channels_api/events + +proc listenInternalEvents(self: LogosDelivery) = + ## Feed every FFI event from an internal nim-broker event. + ## Listener handles are discarded on purpose: the listeners live for the node's lifetime. + self.listenMessageEvents() + self.listenConnectionStatusEvents() + self.listenTopicHealthEvents() + self.listenConnectionChangeEvents() + self.listenChannelEvents() + +# --- constructor / destructor ---------------------------------------------- +proc logosdelivery_create*( + configJson: string +): Future[Result[LogosDelivery, string]] {.ffiCtor.} = + let conf = parseNodeConfFromJson(configJson).valueOr: + return err("failed to parse node config: " & error) + + let logos = (await LogosDelivery.new(conf)).valueOr: + return err("failed to create LogosDelivery: " & error) + + logos.listenInternalEvents() + + return ok(logos) + +proc logosdelivery_destroy*(self: LogosDelivery) {.ffiDtor.} = + ## The framework drains the FFI thread and frees the context; callers stop the + ## node via `logosdelivery_stop` first. + discard + +# --- lifecycle ------------------------------------------------------------- +proc start*(self: LogosDelivery): Future[Result[string, string]] {.ffi.} = + (await self.start()).isOkOr: + return err(error) + return ok("") + +proc stop*(self: LogosDelivery): Future[Result[string, string]] {.ffi.} = + (await self.stop()).isOkOr: + return err(error) + return ok("") + +# --- operations (typed {.ffi.} procs, grouped per layer/protocol) ---------- +include + ./messaging_api/subscriptions_api, + ./messaging_api/send_api, + ./channels_api/channel_lifecycle_api, + ./channels_api/send_api, + ./kernel_api/node_info_api, ./kernel_api/debug_node_api, ./kernel_api/ping_api, + ./kernel_api/peer_manager_api, + ./kernel_api/discovery_api, ./kernel_api/protocols/relay_api, - ./kernel_api/protocols/store_api, ./kernel_api/protocols/lightpush_api, + ./kernel_api/protocols/store_api, ./kernel_api/protocols/filter_api -################################################################################ -### Exported procs (former libwaku API) - -proc waku_new( - configJson: cstring, callback: FFICallback, userData: pointer -): pointer {.dynlib, exportc, cdecl.} = - initializeLibrary() - - ## Creates a new instance of the WakuNode. - if isNil(callback): - echo "error: missing callback in waku_new" - return nil - - ## Create the Waku thread that will keep waiting for req from the main thread. - var ctx = ffi.createFFIContext[LogosDelivery]().valueOr: - let msg = "Error in createFFIContext: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return nil - - ctx.userData = userData - - proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler = - return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = - callEventCallback(ctx, "onReceivedMessage"): - $JsonMessageEvent.new(pubsubTopic, msg) - - proc onTopicHealthChange(ctx: ptr FFIContext): TopicHealthChangeHandler = - return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} = - callEventCallback(ctx, "onTopicHealthChange"): - $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth) - - proc onConnectionChange(ctx: ptr FFIContext): ConnectionChangeHandler = - return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} = - callEventCallback(ctx, "onConnectionChange"): - $JsonConnectionChangeEvent.new($peerId, peerEvent) - - proc onConnectionStatusChange(ctx: ptr FFIContext): ConnectionStatusChangeHandler = - return proc(status: ConnectionStatus) {.async.} = - callEventCallback(ctx, "onConnectionStatusChange"): - $JsonConnectionStatusChangeEvent.new(status) - - let appCallbacks = AppCallbacks( - relayHandler: onReceivedMessage(ctx), - topicHealthChangeHandler: onTopicHealthChange(ctx), - connectionChangeHandler: onConnectionChange(ctx), - connectionStatusChangeHandler: onConnectionStatusChange(ctx), - ) - - ffi.sendRequestToFFIThread( - ctx, - CreateNodeWithCallbacksRequest.ffiNewReq( - callback, userData, configJson, appCallbacks - ), - ).isOkOr: - let msg = "error in sendRequestToFFIThread: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return nil - - return ctx - -proc waku_destroy( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -): cint {.dynlib, exportc, cdecl.} = - initializeLibrary() - checkParams(ctx, callback, userData) - - ffi.destroyFFIContext(ctx).isOkOr: - let msg = "libwaku error: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return RET_ERR - - ## always need to invoke the callback although we don't retrieve value to the caller - callback(RET_OK, nil, 0, userData) - - return RET_OK - -# ### End of exported procs -# ################################################################################ +# genBindings() MUST be the last top-level call — after every {.ffi.}, +# {.ffiCtor.}, {.ffiDtor.} and {.ffiEvent.} pragma (incl. the included files). +genBindings() diff --git a/library/logos_delivery_api/debug_api.nim b/library/logos_delivery_api/debug_api.nim deleted file mode 100644 index 98d48c97c..000000000 --- a/library/logos_delivery_api/debug_api.nim +++ /dev/null @@ -1,56 +0,0 @@ -import std/[json, strutils] -import logos_delivery/waku/factory/waku_state_info -import tools/confutils/[cli_args, config_option_meta] - -proc logosdelivery_get_available_node_info_ids( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -) {.ffi.} = - ## Returns the list of all available node info item ids that - ## can be queried with `get_node_info_item`. - requireInitializedNode(ctx, "GetNodeInfoIds"): - return err(errMsg) - - return ok($ctx.myLib[].waku.stateInfo.getAllPossibleInfoItemIds()) - -proc logosdelivery_get_node_info( - ctx: ptr FFIContext[LogosDelivery], - callback: FFICallBack, - userData: pointer, - nodeInfoId: cstring, -) {.ffi.} = - ## Returns the content of the node info item with the given id if it exists. - requireInitializedNode(ctx, "GetNodeInfoItem"): - return err(errMsg) - - let infoItemIdEnum = - try: - parseEnum[NodeInfoId]($nodeInfoId) - except ValueError: - return err("Invalid node info id: " & $nodeInfoId) - - return ok(ctx.myLib[].waku.stateInfo.getNodeInfoItem(infoItemIdEnum)) - -proc logosdelivery_get_available_configs( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -) {.ffi.} = - ## Returns information about the accepted config items. - requireInitializedNode(ctx, "GetAvailableConfigs"): - return err(errMsg) - - let optionMetas: seq[ConfigOptionMeta] = extractConfigOptionMeta(WakuNodeConf) - var configOptionDetails = newJArray() - - # for confField, confValue in fieldPairs(conf): - # defaultConfig[confField] = $confValue - - for meta in optionMetas: - configOptionDetails.add( - %*{ - meta.fieldName: meta.typeName & "(" & meta.defaultValue & ")", "desc": meta.desc - } - ) - - var jsonNode = newJObject() - jsonNode["configOptions"] = configOptionDetails - let asString = pretty(jsonNode) - return ok(pretty(jsonNode)) diff --git a/library/logos_delivery_api/messaging_api.nim b/library/logos_delivery_api/messaging_api.nim deleted file mode 100644 index 0e4dc7449..000000000 --- a/library/logos_delivery_api/messaging_api.nim +++ /dev/null @@ -1,91 +0,0 @@ -import std/[json] -import chronos, results, ffi -import stew/byteutils -import - logos_delivery/waku/common/base64, - logos_delivery/waku/waku, - logos_delivery/waku/waku_core/topics/content_topic, - logos_delivery/api/types, - ../declare_lib - -proc logosdelivery_subscribe( - ctx: ptr FFIContext[LogosDelivery], - callback: FFICallBack, - userData: pointer, - contentTopicStr: cstring, -) {.ffi.} = - requireInitializedNode(ctx, "Subscribe"): - return err(errMsg) - - # ContentTopic is just a string type alias - let contentTopic = ContentTopic($contentTopicStr) - - (await ctx.myLib[].messagingClient.subscribe(contentTopic)).isOkOr: - let errMsg = $error - return err("Subscribe failed: " & errMsg) - - return ok("") - -proc logosdelivery_unsubscribe( - ctx: ptr FFIContext[LogosDelivery], - callback: FFICallBack, - userData: pointer, - contentTopicStr: cstring, -) {.ffi.} = - requireInitializedNode(ctx, "Unsubscribe"): - return err(errMsg) - - # ContentTopic is just a string type alias - let contentTopic = ContentTopic($contentTopicStr) - - ctx.myLib[].messagingClient.unsubscribe(contentTopic).isOkOr: - let errMsg = $error - return err("Unsubscribe failed: " & errMsg) - - return ok("") - -proc logosdelivery_send( - ctx: ptr FFIContext[LogosDelivery], - callback: FFICallBack, - userData: pointer, - messageJson: cstring, -) {.ffi.} = - requireInitializedNode(ctx, "Send"): - return err(errMsg) - - ## Parse the message JSON and send the message - var jsonNode: JsonNode - try: - jsonNode = parseJson($messageJson) - except Exception as e: - return err("Failed to parse message JSON: " & e.msg) - - # Extract content topic - if not jsonNode.hasKey("contentTopic"): - return err("Missing contentTopic field") - - # ContentTopic is just a string type alias - let contentTopic = ContentTopic(jsonNode["contentTopic"].getStr()) - - # Extract payload (expect base64 encoded string) - if not jsonNode.hasKey("payload"): - return err("Missing payload field") - - let payloadStr = jsonNode["payload"].getStr() - let payload = base64.decode(Base64String(payloadStr)).valueOr: - return err("invalid payload format: " & error) - - # Extract ephemeral flag - let ephemeral = jsonNode.getOrDefault("ephemeral").getBool(false) - - # Create message envelope - let envelope = MessageEnvelope.init( - contentTopic = contentTopic, payload = payload, ephemeral = ephemeral - ) - - # Send the message via the messaging layer's own API. - let requestId = (await ctx.myLib[].messagingClient.send(envelope)).valueOr: - let errMsg = $error - return err("Send failed: " & errMsg) - - return ok($requestId) diff --git a/library/logos_delivery_api/node_api.nim b/library/logos_delivery_api/node_api.nim deleted file mode 100644 index 354ab3a98..000000000 --- a/library/logos_delivery_api/node_api.nim +++ /dev/null @@ -1,150 +0,0 @@ -import std/json -import chronos, chronicles, results, ffi -import - logos_delivery, - logos_delivery/waku/node/waku_node, - logos_delivery/waku/events/message_events, - logos_delivery/api/types, - logos_delivery/waku/events/[message_events, health_events], - tools/confutils/conf_from_json, - ../declare_lib, - ../json_event - -# Add JSON serialization for RequestId -proc `%`*(id: RequestId): JsonNode = - %($id) - -registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[LogosDelivery]): - proc(configJson: cstring): Future[Result[string, string]] {.async.} = - let conf = parseNodeConfFromJson($configJson).valueOr: - error "Failed to assemble WakuNodeConf from JSON", - error = error, configJson = $configJson - return err("failed parseNodeConfFromJson " & error) - - ctx.myLib[] = (await LogosDelivery.new(conf)).valueOr: - let errMsg = $error - chronicles.error "CreateNodeRequest failed", err = errMsg - return err(errMsg) - - return ok("") - -proc logosdelivery_destroy( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -): cint {.dynlib, exportc, cdecl.} = - initializeLibrary() - checkParams(ctx, callback, userData) - - ffi.destroyFFIContext(ctx).isOkOr: - let msg = "liblogosdelivery error: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return RET_ERR - - ## always need to invoke the callback although we don't retrieve value to the caller - callback(RET_OK, nil, 0, userData) - - return RET_OK - -proc logosdelivery_create_node( - configJson: cstring, callback: FFICallback, userData: pointer -): pointer {.dynlib, exportc, cdecl.} = - initializeLibrary() - - if callback.isNil(): - echo "error: missing callback in logosdelivery_create_node" - return nil - - var ctx = ffi.createFFIContext[LogosDelivery]().valueOr: - let msg = "Error in createFFIContext: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return nil - - ctx.userData = userData - - ffi.sendRequestToFFIThread( - ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson) - ).isOkOr: - let msg = "error in sendRequestToFFIThread: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - # free allocated resources as they won't be available - ffi.destroyFFIContext(ctx).isOkOr: - chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation", - err = $error - return nil - - return ctx - -proc logosdelivery_start_node( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -) {.ffi.} = - requireInitializedNode(ctx, "START_NODE"): - return err(errMsg) - - # setting up outgoing event listeners - let sentListener = MessageSentEvent.listen( - ctx.myLib[].waku.brokerCtx, - proc(event: MessageSentEvent) {.async: (raises: []).} = - callEventCallback(ctx, "onMessageSent"): - $newJsonEvent("message_sent", event), - ).valueOr: - chronicles.error "MessageSentEvent.listen failed", err = $error - return err("MessageSentEvent.listen failed: " & $error) - - let errorListener = MessageErrorEvent.listen( - ctx.myLib[].waku.brokerCtx, - proc(event: MessageErrorEvent) {.async: (raises: []).} = - callEventCallback(ctx, "onMessageError"): - $newJsonEvent("message_error", event), - ).valueOr: - chronicles.error "MessageErrorEvent.listen failed", err = $error - return err("MessageErrorEvent.listen failed: " & $error) - - let propagatedListener = MessagePropagatedEvent.listen( - ctx.myLib[].waku.brokerCtx, - proc(event: MessagePropagatedEvent) {.async: (raises: []).} = - callEventCallback(ctx, "onMessagePropagated"): - $newJsonEvent("message_propagated", event), - ).valueOr: - chronicles.error "MessagePropagatedEvent.listen failed", err = $error - return err("MessagePropagatedEvent.listen failed: " & $error) - - let receivedListener = MessageReceivedEvent.listen( - ctx.myLib[].waku.brokerCtx, - proc(event: MessageReceivedEvent) {.async: (raises: []).} = - callEventCallback(ctx, "onMessageReceived"): - $newJsonEvent("message_received", event), - ).valueOr: - chronicles.error "MessageReceivedEvent.listen failed", err = $error - return err("MessageReceivedEvent.listen failed: " & $error) - - let ConnectionStatusChangeListener = EventConnectionStatusChange.listen( - ctx.myLib[].waku.brokerCtx, - proc(event: EventConnectionStatusChange) {.async: (raises: []).} = - callEventCallback(ctx, "onConnectionStatusChange"): - $newJsonEvent("connection_status_change", event), - ).valueOr: - chronicles.error "ConnectionStatusChange.listen failed", err = $error - return err("ConnectionStatusChange.listen failed: " & $error) - - (await ctx.myLib[].start()).isOkOr: - let errMsg = $error - chronicles.error "START_NODE failed", err = errMsg - return err("failed to start: " & errMsg) - return ok("") - -proc logosdelivery_stop_node( - ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer -) {.ffi.} = - requireInitializedNode(ctx, "STOP_NODE"): - return err(errMsg) - - await MessageErrorEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) - await MessageSentEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) - await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) - await MessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx) - await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].waku.brokerCtx) - - (await ctx.myLib[].stop()).isOkOr: - let errMsg = $error - chronicles.error "STOP_NODE failed", err = errMsg - return err("failed to stop: " & errMsg) - return ok("") diff --git a/library/messaging_api/send_api.nim b/library/messaging_api/send_api.nim new file mode 100644 index 000000000..dbd133c50 --- /dev/null +++ b/library/messaging_api/send_api.nim @@ -0,0 +1,9 @@ +proc messaging_send*( + self: LogosDelivery, contentTopic: string, payload: seq[byte], ephemeral: bool +): Future[Result[string, string]] {.ffi.} = + let envelope = MessageEnvelope.init( + contentTopic = ContentTopic(contentTopic), payload = payload, ephemeral = ephemeral + ) + let requestId = (await self.messagingClient.send(envelope)).valueOr: + return err(error) + return ok($requestId) diff --git a/library/messaging_api/subscriptions_api.nim b/library/messaging_api/subscriptions_api.nim new file mode 100644 index 000000000..3593f8ee7 --- /dev/null +++ b/library/messaging_api/subscriptions_api.nim @@ -0,0 +1,13 @@ +proc subscribe*( + self: LogosDelivery, contentTopic: string +): Future[Result[string, string]] {.ffi.} = + (await self.messagingClient.subscribe(ContentTopic(contentTopic))).isOkOr: + return err(error) + return ok("") + +proc unsubscribe*( + self: LogosDelivery, contentTopic: string +): Future[Result[string, string]] {.ffi.} = + self.messagingClient.unsubscribe(ContentTopic(contentTopic)).isOkOr: + return err(error) + return ok("") diff --git a/logos_delivery.nimble b/logos_delivery.nimble index efe118595..a9172936b 100644 --- a/logos_delivery.nimble +++ b/logos_delivery.nimble @@ -61,7 +61,7 @@ requires "nim >= 2.2.4", # Packages not on nimble (use git URLs) -requires "https://github.com/logos-messaging/nim-ffi#v0.1.3" +requires "https://github.com/logos-messaging/nim-ffi#v0.2.0-rc.3" requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5" diff --git a/nimble.lock b/nimble.lock index 18ebde258..13e35eac4 100644 --- a/nimble.lock +++ b/nimble.lock @@ -643,18 +643,19 @@ } }, "ffi": { - "version": "0.1.3", - "vcsRevision": "06111de155253b34e47ed2aaed1d61d08d62cc1b", + "version": "#v0.2.0-rc.3", + "vcsRevision": "8f15afce5c377a0e5ee53c35b228025b903604ea", "url": "https://github.com/logos-messaging/nim-ffi", "downloadMethod": "git", "dependencies": [ "nim", "chronos", "chronicles", - "taskpools" + "taskpools", + "cbor_serialization" ], "checksums": { - "sha1": "6f9d49375ea1dc71add55c72ac80a808f238e5b0" + "sha1": "7f00eaaa01ce59a0c1603e6fb8757ba712f9a53e" } }, "boringssl": {