FFI: migrate liblogosdelivery root to nim-ffi v0.2.0-rc.3

Rewrite the FFI root over the new per-layer APIs using nim-ffi v0.2.0 typed
{.ffiCtor.}/{.ffiDtor.}/{.ffi.}/{.ffiEvent.} + CBOR, replacing the
hand-written cstring/JSON bridge. Events are fed by internal nim-broker
listeners (no AppCallbacks). Adds the messaging_api/channels_api groups and
the broker-listener event modules, and drops the v0.1 scaffolding
(declare_lib, node_api, node_lifecycle_api, logos_delivery_api/*, json_*).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-25 04:18:35 +02:00
parent 4dfe72e12f
commit c64d156f2a
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
22 changed files with 261 additions and 685 deletions

View File

@ -0,0 +1,24 @@
## Opaque handle to a live reliable channel. Holds the owning manager + the
## channel id so the channel ops (send / close) need no other context. Only its
## uint64 id crosses the FFI boundary; the object stays in the ctx registry.
type ReliableChannelHandle {.ffiHandle.} = ref object
manager: ReliableChannelManager
channelId: ChannelId
proc channel_create*(
self: LogosDelivery, channelId: string, contentTopic: string, senderId: string
): Future[Result[ReliableChannelHandle, string]] {.ffi.} =
## Creates a reliable channel and returns a handle to it. The send handler and
## rng come from the manager; encryption providers are installed separately.
let id = self.reliableChannelManager.createReliableChannel(
ChannelId(channelId), ContentTopic(contentTopic), SdsParticipantID(senderId)
).valueOr:
return err(error)
return ok(ReliableChannelHandle(manager: self.reliableChannelManager, channelId: id))
proc channel_close*(ch: ReliableChannelHandle): Future[Result[string, string]] {.ffi.} =
## Stops the channel's SDS loops and deregisters it from the manager.
## Persisted SDS state survives, so re-creating the channel restores it.
(await ch.manager.closeChannel(ch.channelId)).isOkOr:
return err(error)
return ok("")

View File

@ -0,0 +1,33 @@
## Reliable-channel events: per-channel message received / sent / errored,
## fed by the channel-layer broker events.
proc onChannelMessageReceived*(
channelId: string, senderId: string, payload: seq[byte]
) {.ffiEvent: "on_channel_message_received".}
proc onChannelMessageSent*(
channelId: string, requestId: string
) {.ffiEvent: "on_channel_message_sent".}
proc onChannelMessageError*(
channelId: string, requestId: string, error: string
) {.ffiEvent: "on_channel_message_error".}
proc listenChannelEvents(self: LogosDelivery) =
let brokerCtx = self.waku.brokerCtx
discard ChannelMessageReceivedEvent.listen(
brokerCtx,
proc(e: ChannelMessageReceivedEvent) {.async: (raises: []).} =
onChannelMessageReceived(string(e.channelId), $e.senderId, e.payload),
)
discard ChannelMessageSentEvent.listen(
brokerCtx,
proc(e: ChannelMessageSentEvent) {.async: (raises: []).} =
onChannelMessageSent(string(e.channelId), $e.requestId),
)
discard ChannelMessageErrorEvent.listen(
brokerCtx,
proc(e: ChannelMessageErrorEvent) {.async: (raises: []).} =
onChannelMessageError(string(e.channelId), $e.requestId, e.error),
)

View File

@ -0,0 +1,9 @@
proc channel_send*(
ch: ReliableChannelHandle, payload: seq[byte], ephemeral: bool
): Future[Result[string, string]] {.ffi.} =
## Sends `payload` on the reliable channel. Routes through the messaging
## layer (ReliableChannelManager.send -> MessagingClient.send); returns the
## channel-layer request id.
let requestId = (await ch.manager.send(ch.channelId, payload, ephemeral)).valueOr:
return err(error)
return ok($requestId)

View File

@ -1,33 +0,0 @@
import ffi
import std/locks
import logos_delivery
declareLibrary("logosdelivery")
var eventCallbackLock: Lock
initLock(eventCallbackLock)
template requireInitializedNode*(
ctx: ptr FFIContext[LogosDelivery], opName: string, onError: untyped
) =
if isNil(ctx):
let errMsg {.inject.} = opName & " failed: invalid context"
onError
elif isNil(ctx.myLib) or isNil(ctx.myLib[]):
let errMsg {.inject.} = opName & " failed: node is not initialized"
onError
proc logosdelivery_set_event_callback(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.dynlib, exportc, cdecl.} =
if isNil(ctx):
echo "error: invalid context in logosdelivery_set_event_callback"
return
# prevent race conditions that might happen due incorrect usage.
eventCallbackLock.acquire()
defer:
eventCallbackLock.release()
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData

View File

@ -0,0 +1,12 @@
## Per-peer connection changes (connected/disconnected/…), fed by WakuPeerEvent.
proc onConnectionChange*(
peerId: string, event: string
) {.ffiEvent: "on_connection_change".}
proc listenConnectionChangeEvents(self: LogosDelivery) =
discard WakuPeerEvent.listen(
self.waku.brokerCtx,
proc(e: WakuPeerEvent) {.async: (raises: []).} =
onConnectionChange($e.peerId, $e.kind),
)

View File

@ -0,0 +1,12 @@
## Node connectivity (online/offline) status, fed by EventConnectionStatusChange.
proc onConnectionStatusChange*(
status: string
) {.ffiEvent: "on_connection_status_change".}
proc listenConnectionStatusEvents(self: LogosDelivery) =
discard EventConnectionStatusChange.listen(
self.waku.brokerCtx,
proc(e: EventConnectionStatusChange) {.async: (raises: []).} =
onConnectionStatusChange($e.connectionStatus),
)

View File

@ -1,6 +0,0 @@
type JsonEvent* = ref object of RootObj # https://rfc.vac.dev/spec/36/#jsonsignal-type
eventType* {.requiresInit.}: string
method `$`*(jsonEvent: JsonEvent): string {.base.} =
discard
# All events should implement this

View File

@ -1,17 +0,0 @@
import system, std/json, libp2p/[connmanager, peerid]
import ../../logos_delivery/waku/common/base64, ./json_base_event
type JsonConnectionChangeEvent* = ref object of JsonEvent
peerId*: string
peerEvent*: PeerEventKind
proc new*(
T: type JsonConnectionChangeEvent, peerId: string, peerEvent: PeerEventKind
): T =
return JsonConnectionChangeEvent(
eventType: "connection_change", peerId: peerId, peerEvent: peerEvent
)
method `$`*(jsonConnectionChangeEvent: JsonConnectionChangeEvent): string =
$(%*jsonConnectionChangeEvent)

View File

@ -1,15 +0,0 @@
{.push raises: [].}
import system, std/json
import ./json_base_event
import ../../logos_delivery/api/types
type JsonConnectionStatusChangeEvent* = ref object of JsonEvent
status*: ConnectionStatus
proc new*(T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus): T =
return
JsonConnectionStatusChangeEvent(eventType: "node_health_change", status: status)
method `$`*(event: JsonConnectionStatusChangeEvent): string =
$(%*event)

View File

@ -1,106 +0,0 @@
import system, results, std/json, std/strutils
import stew/byteutils
import
../../logos_delivery/waku/common/base64,
../../logos_delivery/waku/waku_core/message,
../../logos_delivery/waku/waku_core/message/message,
../utils,
./json_base_event
type JsonMessage* = ref object # https://rfc.vac.dev/spec/36/#jsonmessage-type
payload*: Base64String
contentTopic*: string
version*: uint
timestamp*: int64
ephemeral*: bool
meta*: Base64String
proof*: Base64String
func fromJsonNode*(
T: type JsonMessage, jsonContent: JsonNode
): Result[JsonMessage, string] =
# Visit https://rfc.vac.dev/spec/14/ for further details
# Check if required fields exist
if not jsonContent.hasKey("payload"):
return err("Missing required field in WakuMessage: payload")
if not jsonContent.hasKey("contentTopic"):
return err("Missing required field in WakuMessage: contentTopic")
ok(
JsonMessage(
payload: Base64String(jsonContent["payload"].getStr()),
contentTopic: jsonContent["contentTopic"].getStr(),
version: uint32(jsonContent{"version"}.getInt()),
timestamp: (?jsonContent.getProtoInt64("timestamp")).get(0),
ephemeral: jsonContent{"ephemeral"}.getBool(),
meta: Base64String(jsonContent{"meta"}.getStr()),
proof: Base64String(jsonContent{"proof"}.getStr()),
)
)
proc toWakuMessage*(self: JsonMessage): Result[WakuMessage, string] =
let payload = base64.decode(self.payload).valueOr:
return err("invalid payload format: " & error)
let meta = base64.decode(self.meta).valueOr:
return err("invalid meta format: " & error)
let proof = base64.decode(self.proof).valueOr:
return err("invalid proof format: " & error)
ok(
WakuMessage(
payload: payload,
meta: meta,
contentTopic: self.contentTopic,
version: uint32(self.version),
timestamp: self.timestamp,
ephemeral: self.ephemeral,
proof: proof,
)
)
proc `%`*(value: Base64String): JsonNode =
%(value.string)
type JsonMessageEvent* = ref object of JsonEvent
pubsubTopic*: string
messageHash*: string
wakuMessage*: JsonMessage
proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T =
# Returns a WakuMessage event as indicated in
# https://github.com/vacp2p/rfc/blob/master/content/docs/rfcs/36/README.md#jsonmessageevent-type
var payload = newSeq[byte](len(msg.payload))
if len(msg.payload) != 0:
copyMem(addr payload[0], unsafeAddr msg.payload[0], len(msg.payload))
var meta = newSeq[byte](len(msg.meta))
if len(msg.meta) != 0:
copyMem(addr meta[0], unsafeAddr msg.meta[0], len(msg.meta))
var proof = newSeq[byte](len(msg.proof))
if len(msg.proof) != 0:
copyMem(addr proof[0], unsafeAddr msg.proof[0], len(msg.proof))
let msgHash = computeMessageHash(pubSubTopic, msg)
return JsonMessageEvent(
eventType: "message",
pubSubTopic: pubSubTopic,
messageHash: msgHash.to0xHex(),
wakuMessage: JsonMessage(
payload: base64.encode(payload),
contentTopic: msg.contentTopic,
version: msg.version,
timestamp: int64(msg.timestamp),
ephemeral: msg.ephemeral,
meta: base64.encode(meta),
proof: base64.encode(proof),
),
)
method `$`*(jsonMessage: JsonMessageEvent): string =
$(%*jsonMessage)

View File

@ -1,20 +0,0 @@
import system, results, std/json
import stew/byteutils
import ../../logos_delivery/waku/common/base64, ./json_base_event
import ../../logos_delivery/waku/waku_relay
type JsonTopicHealthChangeEvent* = ref object of JsonEvent
pubsubTopic*: string
topicHealth*: TopicHealth
proc new*(
T: type JsonTopicHealthChangeEvent, pubsubTopic: string, topicHealth: TopicHealth
): T =
return JsonTopicHealthChangeEvent(
eventType: "relay_topic_health_change",
pubsubTopic: pubsubTopic,
topicHealth: topicHealth,
)
method `$`*(jsonTopicHealthChange: JsonTopicHealthChangeEvent): string =
$(%*jsonTopicHealthChange)

View File

@ -0,0 +1,49 @@
## Message events: send lifecycle (sent/error/propagated/received) plus raw
## inbound network messages. Each FFI event is fed by an internal broker event.
proc onMessageSent*(
requestId: string, messageHash: string
) {.ffiEvent: "on_message_sent".}
proc onMessageError*(
requestId: string, messageHash: string, error: string
) {.ffiEvent: "on_message_error".}
proc onMessagePropagated*(
requestId: string, messageHash: string
) {.ffiEvent: "on_message_propagated".}
proc onMessageReceived*(messageHash: string) {.ffiEvent: "on_message_received".}
proc onNetworkMessage*(
pubsubTopic: string, message: WakuMessage
) {.ffiEvent: "on_network_message".}
proc listenMessageEvents(self: LogosDelivery) =
let brokerCtx = self.waku.brokerCtx
discard MessageSentEvent.listen(
brokerCtx,
proc(e: MessageSentEvent) {.async: (raises: []).} =
onMessageSent($e.requestId, e.messageHash),
)
discard MessageErrorEvent.listen(
brokerCtx,
proc(e: MessageErrorEvent) {.async: (raises: []).} =
onMessageError($e.requestId, e.messageHash, e.error),
)
discard MessagePropagatedEvent.listen(
brokerCtx,
proc(e: MessagePropagatedEvent) {.async: (raises: []).} =
onMessagePropagated($e.requestId, e.messageHash),
)
discard MessageReceivedEvent.listen(
brokerCtx,
proc(e: MessageReceivedEvent) {.async: (raises: []).} =
onMessageReceived(e.messageHash),
)
discard MessageSeenEvent.listen(
brokerCtx,
proc(e: MessageSeenEvent) {.async: (raises: []).} =
onNetworkMessage(string(e.topic), e.message),
)

View File

@ -0,0 +1,12 @@
## Per-shard (pubsub topic) health changes, fed by EventShardTopicHealthChange.
proc onTopicHealthChange*(
pubsubTopic: string, health: string
) {.ffiEvent: "on_topic_health_change".}
proc listenTopicHealthEvents(self: LogosDelivery) =
discard EventShardTopicHealthChange.listen(
self.waku.brokerCtx,
proc(e: EventShardTopicHealthChange) {.async: (raises: []).} =
onTopicHealthChange(string(e.topic), $e.health),
)

View File

@ -1,82 +0,0 @@
import logos_delivery/waku/compat/option_valueor
import std/[options, json, strutils, net]
import chronos, chronicles, results, confutils, confutils/std/net, ffi
import
logos_delivery/waku/node/peer_manager/peer_manager,
tools/confutils/cli_args,
logos_delivery/waku/waku,
logos_delivery/waku/factory/node_factory,
logos_delivery/waku/factory/app_callbacks,
logos_delivery/waku/rest_api/endpoint/builder,
library/declare_lib
proc createWaku(
configJson: cstring, appCallbacks: AppCallbacks = nil
): Future[Result[LogosDelivery, string]] {.async.} =
var conf = defaultWakuNodeConf().valueOr:
return err("Failed creating node: " & error)
var errorResp: string
var jsonNode: JsonNode
try:
jsonNode = parseJson($configJson)
except Exception:
return err(
"exception in createWaku when calling parseJson: " & getCurrentExceptionMsg() &
" configJson string: " & $configJson
)
for confField, confValue in fieldPairs(conf):
if jsonNode.contains(confField):
# Make sure string doesn't contain the leading or trailing " character
let formattedString = ($jsonNode[confField]).strip(chars = {'\"'})
# Override conf field with the value set in the json-string
try:
confValue = parseCmdArg(typeof(confValue), formattedString)
except Exception:
return err(
"exception in createWaku when parsing configuration. exc: " &
getCurrentExceptionMsg() & ". string that could not be parsed: " &
formattedString & ". expected type: " & $typeof(confValue)
)
# Don't send relay app callbacks if relay is disabled
if not conf.relay and not appCallbacks.isNil():
appCallbacks.relayHandler = nil
appCallbacks.topicHealthChangeHandler = nil
conf.rest = false ## libwaku never runs the REST server
let logosRes = (await LogosDelivery.new(conf, appCallbacks)).valueOr:
error "LogosDelivery initialization failed", error = error
return err("Failed setting up LogosDelivery: " & $error)
return ok(logosRes)
registerReqFFI(CreateNodeWithCallbacksRequest, ctx: ptr FFIContext[LogosDelivery]):
proc(
configJson: cstring, appCallbacks: AppCallbacks
): Future[Result[string, string]] {.async.} =
ctx.myLib[] = (await createWaku(configJson, cast[AppCallbacks](appCallbacks))).valueOr:
error "CreateNodeWithCallbacksRequest failed", error = error
return err($error)
return ok("")
proc waku_start(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].start()).isOkOr:
error "START_NODE failed", error = error
return err("failed to start: " & $error)
return ok("")
proc waku_stop(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].stop()).isOkOr:
error "STOP_NODE failed", error = error
return err("failed to stop: " & $error)
return ok("")

View File

@ -1,113 +1,91 @@
import logos_delivery/waku/compat/option_valueor
import std/[atomics, options, macros]
import chronicles, chronos, chronos/threadsync, ffi
import
logos_delivery/waku/waku_core/message/message,
logos_delivery/waku/waku_core/topics/pubsub_topic,
logos_delivery/waku/waku_relay,
logos_delivery,
logos_delivery/waku/waku,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/node/health_monitor/health_status,
../logos_delivery/waku/factory/app_callbacks,
./events/json_message_event,
./events/json_topic_health_change_event,
./events/json_connection_change_event,
./events/json_connection_status_change_event,
./declare_lib
## C FFI library root (nim-ffi v0.2.0).
##
## The FFI context owns one `LogosDelivery` (the per-layer concentrator). The
## v0.2.0 framework generates the C ABI, CBOR (de)serialization and the request
## channel from the `{.ffiCtor.}` / `{.ffiDtor.}` / `{.ffi.}` / `{.ffiEvent.}`
## annotations below and in the included api modules; `genBindings()` (last
## call) emits the foreign-language bindings under `-d:ffiGenBindings`.
import ffi
import std/strutils
import chronos, results, chronicles
################################################################################
## Include different APIs, i.e. all procs with {.ffi.} pragma
import logos_delivery
import logos_delivery/api/types
import tools/confutils/conf_from_json
import logos_delivery/waku/api/events/peer_events
import logos_delivery/waku/waku_core
declareLibrary("logosdelivery", LogosDelivery, defaultABIFormat = "cbor")
# --- shared wire types -----------------------------------------------------
type PeerConnInfoFFI* {.ffi.} = object
peerId: string
protocols: seq[string]
addresses: seq[string]
# --- library-initiated events (one {.ffi.} type-set + listener per file) -----
include
./logos_delivery_api/node_api,
./logos_delivery_api/messaging_api,
./logos_delivery_api/debug_api,
./kernel_api/peer_manager_api,
./kernel_api/discovery_api,
./kernel_api/node_lifecycle_api,
./events/message_events,
./events/connection_status_events,
./events/topic_health_events,
./events/connection_change_events,
./channels_api/events
proc listenInternalEvents(self: LogosDelivery) =
## Feed every FFI event from an internal nim-broker event.
## Listener handles are discarded on purpose: the listeners live for the node's lifetime.
self.listenMessageEvents()
self.listenConnectionStatusEvents()
self.listenTopicHealthEvents()
self.listenConnectionChangeEvents()
self.listenChannelEvents()
# --- constructor / destructor ----------------------------------------------
proc logosdelivery_create*(
configJson: string
): Future[Result[LogosDelivery, string]] {.ffiCtor.} =
let conf = parseNodeConfFromJson(configJson).valueOr:
return err("failed to parse node config: " & error)
let logos = (await LogosDelivery.new(conf)).valueOr:
return err("failed to create LogosDelivery: " & error)
logos.listenInternalEvents()
return ok(logos)
proc logosdelivery_destroy*(self: LogosDelivery) {.ffiDtor.} =
## The framework drains the FFI thread and frees the context; callers stop the
## node via `logosdelivery_stop` first.
discard
# --- lifecycle -------------------------------------------------------------
proc start*(self: LogosDelivery): Future[Result[string, string]] {.ffi.} =
(await self.start()).isOkOr:
return err(error)
return ok("")
proc stop*(self: LogosDelivery): Future[Result[string, string]] {.ffi.} =
(await self.stop()).isOkOr:
return err(error)
return ok("")
# --- operations (typed {.ffi.} procs, grouped per layer/protocol) ----------
include
./messaging_api/subscriptions_api,
./messaging_api/send_api,
./channels_api/channel_lifecycle_api,
./channels_api/send_api,
./kernel_api/node_info_api,
./kernel_api/debug_node_api,
./kernel_api/ping_api,
./kernel_api/peer_manager_api,
./kernel_api/discovery_api,
./kernel_api/protocols/relay_api,
./kernel_api/protocols/store_api,
./kernel_api/protocols/lightpush_api,
./kernel_api/protocols/store_api,
./kernel_api/protocols/filter_api
################################################################################
### Exported procs (former libwaku API)
proc waku_new(
configJson: cstring, callback: FFICallback, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
initializeLibrary()
## Creates a new instance of the WakuNode.
if isNil(callback):
echo "error: missing callback in waku_new"
return nil
## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = ffi.createFFIContext[LogosDelivery]().valueOr:
let msg = "Error in createFFIContext: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
ctx.userData = userData
proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
proc onTopicHealthChange(ctx: ptr FFIContext): TopicHealthChangeHandler =
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
callEventCallback(ctx, "onTopicHealthChange"):
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
proc onConnectionChange(ctx: ptr FFIContext): ConnectionChangeHandler =
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
proc onConnectionStatusChange(ctx: ptr FFIContext): ConnectionStatusChangeHandler =
return proc(status: ConnectionStatus) {.async.} =
callEventCallback(ctx, "onConnectionStatusChange"):
$JsonConnectionStatusChangeEvent.new(status)
let appCallbacks = AppCallbacks(
relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx),
connectionChangeHandler: onConnectionChange(ctx),
connectionStatusChangeHandler: onConnectionStatusChange(ctx),
)
ffi.sendRequestToFFIThread(
ctx,
CreateNodeWithCallbacksRequest.ffiNewReq(
callback, userData, configJson, appCallbacks
),
).isOkOr:
let msg = "error in sendRequestToFFIThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
return ctx
proc waku_destroy(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkParams(ctx, callback, userData)
ffi.destroyFFIContext(ctx).isOkOr:
let msg = "libwaku error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
## always need to invoke the callback although we don't retrieve value to the caller
callback(RET_OK, nil, 0, userData)
return RET_OK
# ### End of exported procs
# ################################################################################
# genBindings() MUST be the last top-level call — after every {.ffi.},
# {.ffiCtor.}, {.ffiDtor.} and {.ffiEvent.} pragma (incl. the included files).
genBindings()

View File

@ -1,56 +0,0 @@
import std/[json, strutils]
import logos_delivery/waku/factory/waku_state_info
import tools/confutils/[cli_args, config_option_meta]
proc logosdelivery_get_available_node_info_ids(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## Returns the list of all available node info item ids that
## can be queried with `get_node_info_item`.
requireInitializedNode(ctx, "GetNodeInfoIds"):
return err(errMsg)
return ok($ctx.myLib[].waku.stateInfo.getAllPossibleInfoItemIds())
proc logosdelivery_get_node_info(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
nodeInfoId: cstring,
) {.ffi.} =
## Returns the content of the node info item with the given id if it exists.
requireInitializedNode(ctx, "GetNodeInfoItem"):
return err(errMsg)
let infoItemIdEnum =
try:
parseEnum[NodeInfoId]($nodeInfoId)
except ValueError:
return err("Invalid node info id: " & $nodeInfoId)
return ok(ctx.myLib[].waku.stateInfo.getNodeInfoItem(infoItemIdEnum))
proc logosdelivery_get_available_configs(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
## Returns information about the accepted config items.
requireInitializedNode(ctx, "GetAvailableConfigs"):
return err(errMsg)
let optionMetas: seq[ConfigOptionMeta] = extractConfigOptionMeta(WakuNodeConf)
var configOptionDetails = newJArray()
# for confField, confValue in fieldPairs(conf):
# defaultConfig[confField] = $confValue
for meta in optionMetas:
configOptionDetails.add(
%*{
meta.fieldName: meta.typeName & "(" & meta.defaultValue & ")", "desc": meta.desc
}
)
var jsonNode = newJObject()
jsonNode["configOptions"] = configOptionDetails
let asString = pretty(jsonNode)
return ok(pretty(jsonNode))

View File

@ -1,91 +0,0 @@
import std/[json]
import chronos, results, ffi
import stew/byteutils
import
logos_delivery/waku/common/base64,
logos_delivery/waku/waku,
logos_delivery/waku/waku_core/topics/content_topic,
logos_delivery/api/types,
../declare_lib
proc logosdelivery_subscribe(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
contentTopicStr: cstring,
) {.ffi.} =
requireInitializedNode(ctx, "Subscribe"):
return err(errMsg)
# ContentTopic is just a string type alias
let contentTopic = ContentTopic($contentTopicStr)
(await ctx.myLib[].messagingClient.subscribe(contentTopic)).isOkOr:
let errMsg = $error
return err("Subscribe failed: " & errMsg)
return ok("")
proc logosdelivery_unsubscribe(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
contentTopicStr: cstring,
) {.ffi.} =
requireInitializedNode(ctx, "Unsubscribe"):
return err(errMsg)
# ContentTopic is just a string type alias
let contentTopic = ContentTopic($contentTopicStr)
ctx.myLib[].messagingClient.unsubscribe(contentTopic).isOkOr:
let errMsg = $error
return err("Unsubscribe failed: " & errMsg)
return ok("")
proc logosdelivery_send(
ctx: ptr FFIContext[LogosDelivery],
callback: FFICallBack,
userData: pointer,
messageJson: cstring,
) {.ffi.} =
requireInitializedNode(ctx, "Send"):
return err(errMsg)
## Parse the message JSON and send the message
var jsonNode: JsonNode
try:
jsonNode = parseJson($messageJson)
except Exception as e:
return err("Failed to parse message JSON: " & e.msg)
# Extract content topic
if not jsonNode.hasKey("contentTopic"):
return err("Missing contentTopic field")
# ContentTopic is just a string type alias
let contentTopic = ContentTopic(jsonNode["contentTopic"].getStr())
# Extract payload (expect base64 encoded string)
if not jsonNode.hasKey("payload"):
return err("Missing payload field")
let payloadStr = jsonNode["payload"].getStr()
let payload = base64.decode(Base64String(payloadStr)).valueOr:
return err("invalid payload format: " & error)
# Extract ephemeral flag
let ephemeral = jsonNode.getOrDefault("ephemeral").getBool(false)
# Create message envelope
let envelope = MessageEnvelope.init(
contentTopic = contentTopic, payload = payload, ephemeral = ephemeral
)
# Send the message via the messaging layer's own API.
let requestId = (await ctx.myLib[].messagingClient.send(envelope)).valueOr:
let errMsg = $error
return err("Send failed: " & errMsg)
return ok($requestId)

View File

@ -1,150 +0,0 @@
import std/json
import chronos, chronicles, results, ffi
import
logos_delivery,
logos_delivery/waku/node/waku_node,
logos_delivery/waku/events/message_events,
logos_delivery/api/types,
logos_delivery/waku/events/[message_events, health_events],
tools/confutils/conf_from_json,
../declare_lib,
../json_event
# Add JSON serialization for RequestId
proc `%`*(id: RequestId): JsonNode =
%($id)
registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[LogosDelivery]):
proc(configJson: cstring): Future[Result[string, string]] {.async.} =
let conf = parseNodeConfFromJson($configJson).valueOr:
error "Failed to assemble WakuNodeConf from JSON",
error = error, configJson = $configJson
return err("failed parseNodeConfFromJson " & error)
ctx.myLib[] = (await LogosDelivery.new(conf)).valueOr:
let errMsg = $error
chronicles.error "CreateNodeRequest failed", err = errMsg
return err(errMsg)
return ok("")
proc logosdelivery_destroy(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkParams(ctx, callback, userData)
ffi.destroyFFIContext(ctx).isOkOr:
let msg = "liblogosdelivery error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
## always need to invoke the callback although we don't retrieve value to the caller
callback(RET_OK, nil, 0, userData)
return RET_OK
proc logosdelivery_create_node(
configJson: cstring, callback: FFICallback, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
initializeLibrary()
if callback.isNil():
echo "error: missing callback in logosdelivery_create_node"
return nil
var ctx = ffi.createFFIContext[LogosDelivery]().valueOr:
let msg = "Error in createFFIContext: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
ctx.userData = userData
ffi.sendRequestToFFIThread(
ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson)
).isOkOr:
let msg = "error in sendRequestToFFIThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
# free allocated resources as they won't be available
ffi.destroyFFIContext(ctx).isOkOr:
chronicles.error "Error in destroyFFIContext after sendRequestToFFIThread during creation",
err = $error
return nil
return ctx
proc logosdelivery_start_node(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
requireInitializedNode(ctx, "START_NODE"):
return err(errMsg)
# setting up outgoing event listeners
let sentListener = MessageSentEvent.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: MessageSentEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageSent"):
$newJsonEvent("message_sent", event),
).valueOr:
chronicles.error "MessageSentEvent.listen failed", err = $error
return err("MessageSentEvent.listen failed: " & $error)
let errorListener = MessageErrorEvent.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageError"):
$newJsonEvent("message_error", event),
).valueOr:
chronicles.error "MessageErrorEvent.listen failed", err = $error
return err("MessageErrorEvent.listen failed: " & $error)
let propagatedListener = MessagePropagatedEvent.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessagePropagated"):
$newJsonEvent("message_propagated", event),
).valueOr:
chronicles.error "MessagePropagatedEvent.listen failed", err = $error
return err("MessagePropagatedEvent.listen failed: " & $error)
let receivedListener = MessageReceivedEvent.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
callEventCallback(ctx, "onMessageReceived"):
$newJsonEvent("message_received", event),
).valueOr:
chronicles.error "MessageReceivedEvent.listen failed", err = $error
return err("MessageReceivedEvent.listen failed: " & $error)
let ConnectionStatusChangeListener = EventConnectionStatusChange.listen(
ctx.myLib[].waku.brokerCtx,
proc(event: EventConnectionStatusChange) {.async: (raises: []).} =
callEventCallback(ctx, "onConnectionStatusChange"):
$newJsonEvent("connection_status_change", event),
).valueOr:
chronicles.error "ConnectionStatusChange.listen failed", err = $error
return err("ConnectionStatusChange.listen failed: " & $error)
(await ctx.myLib[].start()).isOkOr:
let errMsg = $error
chronicles.error "START_NODE failed", err = errMsg
return err("failed to start: " & errMsg)
return ok("")
proc logosdelivery_stop_node(
ctx: ptr FFIContext[LogosDelivery], callback: FFICallBack, userData: pointer
) {.ffi.} =
requireInitializedNode(ctx, "STOP_NODE"):
return err(errMsg)
await MessageErrorEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await MessageSentEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await MessagePropagatedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await MessageReceivedEvent.dropAllListeners(ctx.myLib[].waku.brokerCtx)
await EventConnectionStatusChange.dropAllListeners(ctx.myLib[].waku.brokerCtx)
(await ctx.myLib[].stop()).isOkOr:
let errMsg = $error
chronicles.error "STOP_NODE failed", err = errMsg
return err("failed to stop: " & errMsg)
return ok("")

View File

@ -0,0 +1,9 @@
proc messaging_send*(
self: LogosDelivery, contentTopic: string, payload: seq[byte], ephemeral: bool
): Future[Result[string, string]] {.ffi.} =
let envelope = MessageEnvelope.init(
contentTopic = ContentTopic(contentTopic), payload = payload, ephemeral = ephemeral
)
let requestId = (await self.messagingClient.send(envelope)).valueOr:
return err(error)
return ok($requestId)

View File

@ -0,0 +1,13 @@
proc subscribe*(
self: LogosDelivery, contentTopic: string
): Future[Result[string, string]] {.ffi.} =
(await self.messagingClient.subscribe(ContentTopic(contentTopic))).isOkOr:
return err(error)
return ok("")
proc unsubscribe*(
self: LogosDelivery, contentTopic: string
): Future[Result[string, string]] {.ffi.} =
self.messagingClient.unsubscribe(ContentTopic(contentTopic)).isOkOr:
return err(error)
return ok("")

View File

@ -61,7 +61,7 @@ requires "nim >= 2.2.4",
# Packages not on nimble (use git URLs)
requires "https://github.com/logos-messaging/nim-ffi#v0.1.3"
requires "https://github.com/logos-messaging/nim-ffi#v0.2.0-rc.3"
requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5"

View File

@ -643,18 +643,19 @@
}
},
"ffi": {
"version": "0.1.3",
"vcsRevision": "06111de155253b34e47ed2aaed1d61d08d62cc1b",
"version": "#v0.2.0-rc.3",
"vcsRevision": "8f15afce5c377a0e5ee53c35b228025b903604ea",
"url": "https://github.com/logos-messaging/nim-ffi",
"downloadMethod": "git",
"dependencies": [
"nim",
"chronos",
"chronicles",
"taskpools"
"taskpools",
"cbor_serialization"
],
"checksums": {
"sha1": "6f9d49375ea1dc71add55c72ac80a808f238e5b0"
"sha1": "7f00eaaa01ce59a0c1603e6fb8757ba712f9a53e"
}
},
"boringssl": {