Merge 05595e2d92c6618d5d75ed8dc0fa5ef3ebda9376 into 07491b28e6f385215be5551ed860c14256ace95c

This commit is contained in:
Ivan FB 2026-06-12 10:16:04 -03:00 committed by GitHub
commit ef676158d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 276 additions and 448 deletions

View File

@ -1,8 +1,11 @@
// Generated manually and inspired by the one generated by the Nim Compiler.
// In order to see the header file generated by Nim just run `make libsds`
// from the root repo folder and the header should be created in
// nimcache/release/libsds/libsds.h
// C API for libsds, built on the nim-ffi framework (v0.2.0+).
//
// Requests, responses and events are marshalled as CBOR. Request payloads are
// passed as a (reqCbor, reqCborLen) byte buffer; results and events are
// delivered to the callback as a CBOR buffer (msg, len). Each request/response
// struct and event payload is defined in library/libsds.nim. Events are
// wrapped in a CBOR envelope { eventType: <wire name>, payload: <struct> }.
#ifndef __libsds__
#define __libsds__
@ -18,51 +21,71 @@
extern "C" {
#endif
// Result/event callback. `msg` is the CBOR payload of length `len`.
// callerRet is one of the RET_* codes above.
typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData);
// Synchronous provider invoked by SDS-R to fetch a retrieval hint for a
// message id. The implementation allocates `*hint` (and sets `*hintLen`); the
// library takes ownership and frees it with deallocShared. Registered via
// sds_set_retrieval_hint_provider (see below).
typedef void (*SdsRetrievalHintProvider) (const char* messageId, char** hint, size_t* hintLen, void* userData);
// --- Core API Functions ---
// --- Lifecycle -------------------------------------------------------------
// Create a context + ReliabilityManager. reqCbor encodes SdsConfig
// { participantId: tstr } (empty participantId disables SDS-R). Returns the
// context handle, or NULL on failure; the callback also fires on completion.
void* sds_create(const uint8_t* reqCbor, size_t reqCborLen, SdsCallBack callback, void* userData);
// Tear down the context created by sds_create. Blocks until the worker and
// watchdog threads have joined.
int sds_destroy(void* ctx);
void* SdsNewReliabilityManager(SdsCallBack callback, void* userData);
// --- Events ----------------------------------------------------------------
// Subscribe `callback` to an event by wire name and receive a stable listener
// id (non-zero). Event wire names: "message_ready", "message_sent",
// "missing_dependencies", "periodic_sync", "repair_ready". Subscribe to each
// event separately. Payloads arrive as CBOR { eventType, payload }.
uint64_t sds_add_event_listener(void* ctx, const char* eventName, SdsCallBack callback, void* userData);
void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData);
// Remove a listener by id. Returns 0 on success, non-zero if not found.
int sds_remove_event_listener(void* ctx, uint64_t listenerId);
void SdsSetRetrievalHintProvider(void* ctx, SdsRetrievalHintProvider callback, void* userData);
// Register the SDS-R retrieval-hint provider. reqCbor encodes
// SdsHintProviderRequest { callbackAddr: uint, userDataAddr: uint } — the
// SdsRetrievalHintProvider function pointer and its user-data as integer
// addresses.
int sds_set_retrieval_hint_provider(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen);
int SdsCleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData);
int SdsResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData);
// --- Core API Functions ----------------------------------------------------
// Each takes a CBOR-encoded request buffer; the result is delivered to
// `callback` as CBOR.
int SdsWrapOutgoingMessage(void* ctx,
void* message,
size_t messageLen,
const char* messageId,
const char* channelId,
SdsCallBack callback,
void* userData);
// reqCbor: SdsWrapRequest { message: bytes, messageId: tstr, channelId: tstr }
// result: SdsWrapResponse { message: bytes }
int sds_wrap_outgoing_message(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen);
int SdsUnwrapReceivedMessage(void* ctx,
void* message,
size_t messageLen,
SdsCallBack callback,
void* userData);
// reqCbor: SdsUnwrapRequest { message: bytes }
// result: SdsUnwrapResponse { message: bytes, channelId: tstr,
// missingDeps: [{ messageId: tstr, retrievalHint: bytes }] }
int sds_unwrap_received_message(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen);
int SdsMarkDependenciesMet(void* ctx,
char** messageIDs,
size_t count,
const char* channelId,
SdsCallBack callback,
void* userData);
// reqCbor: SdsMarkDependenciesRequest { messageIds: [tstr], channelId: tstr }
int sds_mark_dependencies_met(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen);
int SdsStartPeriodicTasks(void* ctx, SdsCallBack callback, void* userData);
// reqCbor: empty/unit payload (no fields).
int sds_reset(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen);
// reqCbor: empty/unit payload (no fields).
int sds_start_periodic_tasks(void* ctx, SdsCallBack callback, void* userData, const uint8_t* reqCbor, size_t reqCborLen);
#ifdef __cplusplus
}
#endif
#endif /* __libsds__ */
#endif /* __libsds__ */

View File

@ -1,469 +1,252 @@
import std/[strutils, sequtils, json, base64, locks]
## C-compatible FFI wrapper around the SDS ReliabilityManager.
##
## Built on nim-ffi (v0.2.0+): `declareLibrary` emits the bootstrap plus the
## event-listener ABI (`sds_add_event_listener` / `sds_remove_event_listener`);
## `{.ffiCtor.}`/`{.ffi.}`/`{.ffiDtor.}` generate the C entry points; and
## `{.ffiEvent.}` declares library-initiated events. Requests, responses and
## events are marshalled as CBOR (see library/libsds.h). Exported C names are
## snake_case. The Go bindings (sds-go-bindings) must match this API.
##
## The one hand-written export is `sds_set_retrieval_hint_provider`: it takes a
## C function pointer (no CBOR representation), so it dispatches a request that
## stores the provider in a worker-thread thread-local.
import std/[sequtils]
import ffi
import sds
import ./events/[
json_message_ready_event, json_message_sent_event, json_missing_dependencies_event,
json_periodic_sync_event, json_repair_ready_event,
]
# Emit the library bootstrap: the {.exported.}/{.callback.} pragmas, the
# `-fPIC`/soname linker flags, the `libsdsNimMain` import and the
# `initializeLibrary()` proc the exported entry points call on every hop.
declareLibraryBase("sds")
# C callback typedefs (mirrors libsds.h). `SdsCallBack` is structurally the
# nim-ffi `FFICallBack`; the alias keeps the exported signatures readable.
type SdsCallBack* = FFICallBack
# Bootstrap + sds_add_event_listener / sds_remove_event_listener.
declareLibrary("sds", ReliabilityManager)
type SdsRetrievalHintProvider* = proc(
messageId: cstring, hint: ptr cstring, hintLen: ptr csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
# One pool per library type; the macros that would normally declare it
# (ffiCtor/ffiDtor) are not used here because we hand-write the entry points
# to preserve the exact C ABI, so we declare it explicitly.
var ReliabilityManagerFFIPool: FFIContextPool[ReliabilityManager]
# registerReqFFI inspects each request field's type via `$node`, which only
# handles plain identifiers — a bracketed `SharedSeq[byte]` makes it choke. The
# aliases give the generated request structs non-bracketed field types.
type
SdsSharedBytes = SharedSeq[byte]
SdsSharedCstrs = SharedSeq[cstring]
# Active retrieval-hint provider, per worker thread (one thread per context).
# Set by sds_set_retrieval_hint_provider through a dispatched request so the
# write lands on the worker thread, where the manager's hint closure reads it.
var sdsRetrievalHintCb {.threadvar.}: pointer
var sdsRetrievalHintUserData {.threadvar.}: pointer
################################################################################
### Retrieval-hint provider registry
###
### The retrieval-hint provider is a synchronous request/response callback
### (the C side returns bytes inline), so it does not fit the fire-and-forget
### event model. nim-ffi's FFIContext has no slot for it, so we keep a small
### per-context registry here. A fixed array of plain (non-GC) records keeps
### the lookup callable from the {.gcsafe.} hint closure running on the FFI
### thread.
### CBOR-marshalled request/response types
type RetrievalHintSlot = object
ctx: pointer
cb: pointer
userData: pointer
type SdsConfig* {.ffi.} = object
participantId: string ## empty disables SDS-R (see newReliabilityManager)
var retrievalHintSlots: array[MaxFFIContexts, RetrievalHintSlot]
var retrievalHintsLock: Lock
retrievalHintsLock.initLock()
type SdsWrapRequest* {.ffi.} = object
message: seq[byte]
messageId: string
channelId: string
proc setRetrievalHint(ctx: pointer, cb: pointer, userData: pointer) =
withLock retrievalHintsLock:
var free = -1
for i in 0 ..< MaxFFIContexts:
if retrievalHintSlots[i].ctx == ctx:
retrievalHintSlots[i] = RetrievalHintSlot(ctx: ctx, cb: cb, userData: userData)
return
if free < 0 and retrievalHintSlots[i].ctx.isNil:
free = i
if free >= 0:
retrievalHintSlots[free] = RetrievalHintSlot(ctx: ctx, cb: cb, userData: userData)
type SdsWrapResponse* {.ffi.} = object
message: seq[byte]
proc getRetrievalHint(ctx: pointer): tuple[cb: pointer, userData: pointer] {.gcsafe.} =
withLock retrievalHintsLock:
for i in 0 ..< MaxFFIContexts:
if retrievalHintSlots[i].ctx == ctx:
return (retrievalHintSlots[i].cb, retrievalHintSlots[i].userData)
return (nil, nil)
type SdsUnwrapRequest* {.ffi.} = object
message: seq[byte]
proc clearRetrievalHint(ctx: pointer) =
withLock retrievalHintsLock:
for i in 0 ..< MaxFFIContexts:
if retrievalHintSlots[i].ctx == ctx:
retrievalHintSlots[i] = RetrievalHintSlot()
return
type SdsMissingDep* {.ffi.} = object
messageId: string
retrievalHint: seq[byte]
type SdsUnwrapResponse* {.ffi.} = object
message: seq[byte]
channelId: string
missingDeps: seq[SdsMissingDep]
type SdsMarkDependenciesRequest* {.ffi.} = object
messageIds: seq[string]
channelId: string
################################################################################
### Shared-memory copy helpers
### Library-initiated events
###
### Request payloads carrying binary/pointer data must be deep-copied into
### shared memory on the caller thread, because the FFI thread acks receipt
### before it reads the payload — the caller may free its buffer in between.
### cstring fields are deep-copied by the generated `ffiNewReq`; raw byte and
### `char**` arrays are not, so we copy them here.
### Each {.ffiEvent.} proc is an emitter: calling it from a worker-thread
### handler dispatches a CBOR EventEnvelope to every listener subscribed (via
### sds_add_event_listener) to the matching wire name.
proc copyToSharedSeqByte(p: pointer, len: int): SharedSeq[byte] =
if p.isNil or len <= 0:
return (cast[ptr UncheckedArray[byte]](nil), 0)
let data = allocShared(len)
copyMem(data, p, len)
return (cast[ptr UncheckedArray[byte]](data), len)
type SdsMessageReadyPayload* {.ffi.} = object
messageId: string
channelId: string
proc copyToSharedSeqCstr(p: pointer, count: int): SharedSeq[cstring] =
if p.isNil or count <= 0:
return (cast[ptr UncheckedArray[cstring]](nil), 0)
let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * count))
let src = cast[ptr UncheckedArray[cstring]](p)
for i in 0 ..< count:
data[i] = src[i].alloc()
return (data, count)
type SdsMessageSentPayload* {.ffi.} = object
messageId: string
channelId: string
proc freeSharedSeqCstr(s: var SharedSeq[cstring]) =
if not s.data.isNil():
for i in 0 ..< s.len:
if not s.data[i].isNil:
deallocShared(s.data[i])
deallocShared(s.data)
s.len = 0
type SdsMissingDependenciesPayload* {.ffi.} = object
messageId: string
channelId: string
missingDeps: seq[SdsMissingDep]
type SdsPeriodicSyncPayload* {.ffi.} = object
placeholder: bool ## events need a payload type; periodic sync carries no data
type SdsRepairReadyPayload* {.ffi.} = object
message: seq[byte]
channelId: string
proc emitMessageReady*(p: SdsMessageReadyPayload) {.ffiEvent: "message_ready".}
proc emitMessageSent*(p: SdsMessageSentPayload) {.ffiEvent: "message_sent".}
proc emitMissingDependencies*(
p: SdsMissingDependenciesPayload
) {.ffiEvent: "missing_dependencies".}
proc emitPeriodicSync*(p: SdsPeriodicSyncPayload) {.ffiEvent: "periodic_sync".}
proc emitRepairReady*(p: SdsRepairReadyPayload) {.ffiEvent: "repair_ready".}
################################################################################
### Event callbacks
### Constructor — creates the FFI context and the ReliabilityManager.
###
### These build the AppCallbacks closures handed to the ReliabilityManager.
### They run on the FFI worker thread and forward JSON event payloads to the
### C callback registered via SdsSetEventCallback (stored on the context).
### The AppCallbacks closures run on the worker thread; they build typed
### payloads and fire the {.ffiEvent.} emitters, which reach the C listeners.
proc onMessageReady(ctx: ptr FFIContext[ReliabilityManager]): MessageReadyCallback =
return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onMessageReady"):
$JsonMessageReadyEvent.new(messageId, channelId)
proc sdsCreate*(
config: SdsConfig
): Future[Result[ReliabilityManager, string]] {.ffiCtor.} =
let rm = newReliabilityManager(participantId = config.participantId.SdsParticipantID).valueOr:
error "Failed creating reliability manager", error = error
return err("Failed creating reliability manager: " & $error)
proc onMessageSent(ctx: ptr FFIContext[ReliabilityManager]): MessageSentCallback =
return proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onMessageSent"):
$JsonMessageSentEvent.new(messageId, channelId)
let messageReadyCb = proc(
messageId: SdsMessageID, channelId: SdsChannelID
) {.gcsafe.} =
{.cast(gcsafe).}:
emitMessageReady(
SdsMessageReadyPayload(messageId: $messageId, channelId: $channelId)
)
proc onMissingDependencies(
ctx: ptr FFIContext[ReliabilityManager]
): MissingDependenciesCallback =
return proc(
let messageSentCb = proc(
messageId: SdsMessageID, channelId: SdsChannelID
) {.gcsafe.} =
{.cast(gcsafe).}:
emitMessageSent(
SdsMessageSentPayload(messageId: $messageId, channelId: $channelId)
)
let missingDependenciesCb = proc(
messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.} =
callEventCallback(ctx, "onMissingDependencies"):
$JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId)
{.cast(gcsafe).}:
let deps = missingDeps.mapIt(
SdsMissingDep(messageId: $it.messageId, retrievalHint: it.retrievalHint)
)
emitMissingDependencies(
SdsMissingDependenciesPayload(
messageId: $messageId, channelId: $channelId, missingDeps: deps
)
)
proc onPeriodicSync(ctx: ptr FFIContext[ReliabilityManager]): PeriodicSyncCallback =
return proc() {.gcsafe.} =
callEventCallback(ctx, "onPeriodicSync"):
$JsonPeriodicSyncEvent.new()
let periodicSyncCb = proc() {.gcsafe.} =
{.cast(gcsafe).}:
emitPeriodicSync(SdsPeriodicSyncPayload(placeholder: false))
proc onRepairReady(ctx: ptr FFIContext[ReliabilityManager]): RepairReadyCallback =
return proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onRepairReady"):
$JsonRepairReadyEvent.new(message, channelId)
let repairReadyCb = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} =
{.cast(gcsafe).}:
emitRepairReady(SdsRepairReadyPayload(message: message, channelId: $channelId))
proc onRetrievalHint(ctx: ptr FFIContext[ReliabilityManager]): RetrievalHintProvider =
return proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} =
let (cb, userData) = getRetrievalHint(cast[pointer](ctx))
if cb.isNil():
let retrievalHintProvider = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} =
if sdsRetrievalHintCb.isNil():
return @[]
var hint: cstring
var hintLen: csize_t
cast[SdsRetrievalHintProvider](cb)(
messageId.cstring, addr hint, addr hintLen, userData
cast[SdsRetrievalHintProvider](sdsRetrievalHintCb)(
messageId.cstring, addr hint, addr hintLen, sdsRetrievalHintUserData
)
if not hint.isNil() and hintLen > 0:
var hintBytes = newSeq[byte](hintLen)
copyMem(addr hintBytes[0], hint, hintLen)
deallocShared(hint)
return hintBytes
return @[]
await rm.setCallbacks(
messageReadyCb, messageSentCb, missingDependenciesCb, periodicSyncCb,
retrievalHintProvider, repairReadyCb,
)
return ok(rm)
################################################################################
### Request handlers (executed on the FFI worker thread)
### Async methods — each runs its body on the worker thread.
registerReqFFI(SdsCreateRmReq, ctx: ptr FFIContext[ReliabilityManager]):
proc(): Future[Result[string, string]] {.async.} =
# TODO: thread `participantId` through SdsNewReliabilityManager FFI input
# and remove this hardcoded "". Empty id silently disables SDS-R; this is
# acceptable as a temporary FFI-only fallback until sds-go-bindings and
# logos-delivery's C-side caller are updated to supply the identity.
let rm = newReliabilityManager(participantId = "".SdsParticipantID).valueOr:
error "Failed creating reliability manager", error = error
return err("Failed creating reliability manager: " & $error)
await rm.setCallbacks(
onMessageReady(ctx), onMessageSent(ctx), onMissingDependencies(ctx),
onPeriodicSync(ctx), onRetrievalHint(ctx), onRepairReady(ctx),
proc sdsWrapOutgoingMessage*(
rm: ReliabilityManager, req: SdsWrapRequest
): Future[Result[SdsWrapResponse, string]] {.ffi.} =
let wrapped = (
await wrapOutgoingMessage(
rm, req.message, req.messageId.SdsMessageID, req.channelId.SdsChannelID
)
).valueOr:
error "WRAP_MESSAGE failed", error = error
return err("error processing wrap request: " & $error)
return ok(SdsWrapResponse(message: wrapped))
ctx.myLib[] = rm
return ok("")
proc sdsUnwrapReceivedMessage*(
rm: ReliabilityManager, req: SdsUnwrapRequest
): Future[Result[SdsUnwrapResponse, string]] {.ffi.} =
let (unwrapped, missingDeps, channelId) = (
await unwrapReceivedMessage(rm, req.message)
).valueOr:
return err("error processing unwrap request: " & $error)
registerReqFFI(SdsResetRmReq, ctx: ptr FFIContext[ReliabilityManager]):
proc(): Future[Result[string, string]] {.async.} =
(await resetReliabilityManager(ctx.myLib[])).isOkOr:
error "RESET_RELIABILITY_MANAGER failed", error = error
return err("error processing RESET_RELIABILITY_MANAGER request: " & $error)
return ok("")
registerReqFFI(SdsStartPeriodicTasksReq, ctx: ptr FFIContext[ReliabilityManager]):
proc(): Future[Result[string, string]] {.async.} =
ctx.myLib[].startPeriodicTasks()
return ok("")
registerReqFFI(SdsWrapMessageReq, ctx: ptr FFIContext[ReliabilityManager]):
proc(
message: SdsSharedBytes, messageId: cstring, channelId: cstring
): Future[Result[string, string]] {.async.} =
var msg = message
defer:
deallocSharedSeq(msg)
let wrappedMessage = (
await wrapOutgoingMessage(ctx.myLib[], message.toSeq(), $messageId, $channelId)
).valueOr:
error "WRAP_MESSAGE failed", error = error
return err("error processing WRAP_MESSAGE request: " & $error)
# returns a comma-separated string of bytes
return ok(wrappedMessage.mapIt($it).join(","))
registerReqFFI(SdsUnwrapMessageReq, ctx: ptr FFIContext[ReliabilityManager]):
proc(message: SdsSharedBytes): Future[Result[string, string]] {.async.} =
var msg = message
defer:
deallocSharedSeq(msg)
let (unwrappedMessage, missingDeps, extractedChannelId) = (
await unwrapReceivedMessage(ctx.myLib[], message.toSeq())
).valueOr:
return err("error processing UNWRAP_MESSAGE request: " & $error)
# return the result as a json string
var node = newJObject()
node["message"] = %*unwrappedMessage
node["channelId"] = %*extractedChannelId
var missingDepsNode = newJArray()
for dep in missingDeps:
var depNode = newJObject()
depNode["messageId"] = %*dep.messageId
depNode["retrievalHint"] = %*encode(dep.retrievalHint)
missingDepsNode.add(depNode)
node["missingDeps"] = missingDepsNode
return ok($node)
registerReqFFI(SdsMarkDepsReq, ctx: ptr FFIContext[ReliabilityManager]):
proc(
messageIds: SdsSharedCstrs, channelId: cstring
): Future[Result[string, string]] {.async.} =
var ids = messageIds
defer:
freeSharedSeqCstr(ids)
let messageIdSeq = ids.toSeq().mapIt($it)
(await markDependenciesMet(ctx.myLib[], messageIdSeq, $channelId)).isOkOr:
error "MARK_DEPENDENCIES_MET failed", error = error
return err("error processing MARK_DEPENDENCIES_MET request: " & $error)
return ok("")
################################################################################
### Dispatch helper
###
### Sends a request to the FFI worker thread and returns RET_OK/RET_ERR,
### reporting any failure through the callback. The try/except keeps the
### exported entry points `raises: []` (sendRequestToFFIThread can raise),
### which `processReq` alone would not guarantee.
template dispatchReq(
ctx: untyped, callback: FFICallBack, userData: pointer, reqExpr: untyped
) =
let sendRes =
try:
ffi_context.sendRequestToFFIThread(ctx, reqExpr)
except Exception as exc:
Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg)
if sendRes.isErr():
let m = "libsds error: " & sendRes.error
callback(RET_ERR, unsafeAddr m[0], cast[csize_t](m.len), userData)
return RET_ERR
return RET_OK
################################################################################
### Exported C entry points (called from the application thread)
###
### Signatures must match library/libsds.h exactly. Each one validates the
### context against the pool (rejecting nil/dangling pointers at the boundary),
### checks the callback, deep-copies any pointer payloads into shared memory,
### then dispatches a request to the FFI worker thread.
proc SdsNewReliabilityManager(
callback: FFICallBack, userData: pointer
): pointer {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if isNil(callback):
echo "error: missing callback in SdsNewReliabilityManager"
return nil
let ctx = ReliabilityManagerFFIPool.createFFIContext().valueOr:
let msg = "Error creating SDS FFI context: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return nil
let sendRes =
try:
ffi_context.sendRequestToFFIThread(ctx, SdsCreateRmReq.ffiNewReq(callback, userData))
except Exception as exc:
Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg)
if sendRes.isErr():
let msg = "error creating reliability manager: " & sendRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
discard ReliabilityManagerFFIPool.destroyFFIContext(ctx)
return nil
return cast[pointer](ctx)
proc SdsSetEventCallback(
ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer
) {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
echo "error: invalid context in SdsSetEventCallback"
return
ctx[].callbackState.callback = cast[pointer](callback)
ctx[].callbackState.userData = userData
proc SdsSetRetrievalHintProvider(
ctx: ptr FFIContext[ReliabilityManager],
callback: SdsRetrievalHintProvider,
userData: pointer,
) {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
echo "error: invalid context in SdsSetRetrievalHintProvider"
return
setRetrievalHint(cast[pointer](ctx), cast[pointer](callback), userData)
proc SdsCleanupReliabilityManager(
ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
return RET_ERR
if isNil(callback):
return RET_MISSING_CALLBACK
clearRetrievalHint(cast[pointer](ctx))
let res = ReliabilityManagerFFIPool.destroyFFIContext(ctx)
if res.isErr():
let msg = "error cleaning up reliability manager: " & res.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
callback(RET_OK, nil, 0, userData)
return RET_OK
proc SdsResetReliabilityManager(
ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
return RET_ERR
if isNil(callback):
return RET_MISSING_CALLBACK
dispatchReq(ctx, callback, userData, SdsResetRmReq.ffiNewReq(callback, userData))
proc SdsWrapOutgoingMessage(
ctx: ptr FFIContext[ReliabilityManager],
message: pointer,
messageLen: csize_t,
messageId: cstring,
channelId: cstring,
callback: FFICallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
return RET_ERR
if isNil(callback):
return RET_MISSING_CALLBACK
if message == nil and messageLen > 0:
let msg = "libsds error: message pointer is NULL but length > 0"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
if messageId == nil:
let msg = "libsds error: message ID pointer is NULL"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
if channelId == nil:
let msg = "libsds error: channel ID pointer is NULL"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
if $channelId == "":
let msg = "libsds error: channel ID is empty string"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
let sharedMsg = copyToSharedSeqByte(message, messageLen.int)
dispatchReq(
ctx, callback, userData,
SdsWrapMessageReq.ffiNewReq(callback, userData, sharedMsg, messageId, channelId),
let deps = missingDeps.mapIt(
SdsMissingDep(messageId: $it.messageId, retrievalHint: it.retrievalHint)
)
return ok(
SdsUnwrapResponse(message: unwrapped, channelId: $channelId, missingDeps: deps)
)
proc SdsUnwrapReceivedMessage(
ctx: ptr FFIContext[ReliabilityManager],
message: pointer,
messageLen: csize_t,
callback: FFICallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
return RET_ERR
if isNil(callback):
return RET_MISSING_CALLBACK
proc sdsMarkDependenciesMet*(
rm: ReliabilityManager, req: SdsMarkDependenciesRequest
): Future[Result[string, string]] {.ffi.} =
let messageIds = req.messageIds.mapIt(it.SdsMessageID)
(await markDependenciesMet(rm, messageIds, req.channelId.SdsChannelID)).isOkOr:
error "MARK_DEPENDENCIES_MET failed", error = error
return err("error processing mark-dependencies request: " & $error)
return ok("")
if message == nil and messageLen > 0:
let msg = "libsds error: message pointer is NULL but length > 0"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
proc sdsReset*(rm: ReliabilityManager): Future[Result[string, string]] {.ffi.} =
(await resetReliabilityManager(rm)).isOkOr:
error "RESET failed", error = error
return err("error processing reset request: " & $error)
return ok("")
let sharedMsg = copyToSharedSeqByte(message, messageLen.int)
dispatchReq(ctx, callback, userData, SdsUnwrapMessageReq.ffiNewReq(callback, userData, sharedMsg))
proc sdsStartPeriodicTasks*(
rm: ReliabilityManager
): Future[Result[string, string]] {.ffi.} =
# The empty await forces the macro down its async path so the body runs on the
# worker thread — startPeriodicTasks schedules futures on that thread's loop.
await sleepAsync(chronos.milliseconds(0))
rm.startPeriodicTasks()
return ok("")
proc SdsMarkDependenciesMet(
ctx: ptr FFIContext[ReliabilityManager],
messageIds: pointer,
count: csize_t,
channelId: cstring,
callback: FFICallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
return RET_ERR
if isNil(callback):
return RET_MISSING_CALLBACK
################################################################################
### Destructor — runs library cleanup then tears down the FFI context.
if messageIds == nil and count > 0:
let msg = "libsds error: MessageIDs pointer is NULL but count > 0"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
proc sdsDestroy*(rm: ReliabilityManager) {.ffiDtor.} =
discard
if channelId == nil:
let msg = "libsds error: channel ID pointer is NULL"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
################################################################################
### Retrieval-hint provider.
###
### The provider is a C function pointer, which has no CBOR representation, so
### it is passed as integer addresses. The body runs on the worker thread (the
### empty await forces the async path) and stores the pointers in the
### thread-local that sdsCreate's hint closure reads. The caller passes the
### function pointer and user-data as uint64 addresses.
if $channelId == "":
let msg = "libsds error: channel ID is empty string"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData)
return RET_ERR
type SdsHintProviderRequest* {.ffi.} = object
callbackAddr: uint64
userDataAddr: uint64
let sharedIds = copyToSharedSeqCstr(messageIds, count.int)
dispatchReq(
ctx, callback, userData,
SdsMarkDepsReq.ffiNewReq(callback, userData, sharedIds, channelId),
)
proc sdsSetRetrievalHintProvider*(
rm: ReliabilityManager, req: SdsHintProviderRequest
): Future[Result[string, string]] {.ffi.} =
discard rm
await sleepAsync(chronos.milliseconds(0))
sdsRetrievalHintCb = cast[pointer](req.callbackAddr)
sdsRetrievalHintUserData = cast[pointer](req.userDataAddr)
return ok("")
proc SdsStartPeriodicTasks(
ctx: ptr FFIContext[ReliabilityManager], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
return RET_ERR
if isNil(callback):
return RET_MISSING_CALLBACK
dispatchReq(ctx, callback, userData, SdsStartPeriodicTasksReq.ffiNewReq(callback, userData))
# Emit binding metadata (no-op unless -d:ffiGenBindings). Must follow every
# {.ffi.}/{.ffiCtor.}/{.ffiDtor.}/{.ffiEvent.} annotation.
genBindings()

View File

@ -315,17 +315,32 @@
}
},
"ffi": {
"version": "0.1.4",
"vcsRevision": "fb25f069d2dfae2b543d79d2c1a81f197de22a2b",
"version": "0.2.0",
"vcsRevision": "f96a5b158add9c321e33ff804f2275a5314a501b",
"url": "https://github.com/logos-messaging/nim-ffi",
"downloadMethod": "git",
"dependencies": [
"chronos",
"chronicles",
"taskpools"
"taskpools",
"cbor_serialization"
],
"checksums": {
"sha1": "4a5d4020a40106fa2a698d5fe975b9a8ba961f91"
"sha1": "46fad75cc79b7ae6eabfadb8fc3e68764dde89a5"
}
},
"cbor_serialization": {
"version": "0.3.0",
"vcsRevision": "1664160e04d153573373afddc552b9cbf6fbe4dc",
"url": "https://github.com/vacp2p/nim-cbor-serialization",
"downloadMethod": "git",
"dependencies": [
"serialization",
"stew",
"results"
],
"checksums": {
"sha1": "ab126eae09a6e39c72972a6a0b83cb06a2ffe8f0"
}
}
},

View File

@ -166,8 +166,15 @@
ffi = pkgs.fetchgit {
url = "https://github.com/logos-messaging/nim-ffi";
rev = "fb25f069d2dfae2b543d79d2c1a81f197de22a2b";
sha256 = "0zkjnrm2yjlw27q99kv2x8ll61mbz4nr0cvmyq0csydh43c08k0p";
rev = "f96a5b158add9c321e33ff804f2275a5314a501b";
sha256 = "1hcv1k3c18rhg2nrndld2b6xx23nfnlcfkm0bidqha4by4hzn9lr";
fetchSubmodules = true;
};
cbor_serialization = pkgs.fetchgit {
url = "https://github.com/vacp2p/nim-cbor-serialization";
rev = "1664160e04d153573373afddc552b9cbf6fbe4dc";
sha256 = "0c1rj4fk0fcqvsf0yqhxvm8h10aww75gi4yfsjhlczh88ypywii2";
fetchSubmodules = true;
};

View File

@ -16,7 +16,7 @@ requires "stew"
requires "stint"
requires "metrics"
requires "results"
requires "https://github.com/logos-messaging/nim-ffi >= 0.1.4"
requires "https://github.com/logos-messaging/nim-ffi#f96a5b158add9c321e33ff804f2275a5314a501b"
proc buildLibrary(
outLibNameAndExt: string,