nim-sds/library/libsds.nim
Ivan FB e3b49d18a2
feat: adapt libsds to nim-ffi 0.2 CBOR ABI
Rebuild the C wrapper on nim-ffi 0.2's high-level macros (declareLibrary +
{.ffiCtor.}/{.ffi.}/{.ffiEvent.}), which marshal parameters and results as
CBOR and expose snake_case sds_* entry points. Replaces the previous
hand-written positional/JSON ABI.

- request/response objects are {.ffi.} types (CBOR); the unwrap response is
  a proper nested object (message, channelId, missingDeps) instead of
  hand-built JSON, and retrievalHint travels as raw bytes (no base64).
- events become {.ffiEvent.} procs (message_ready, message_sent,
  missing_dependencies, periodic_sync, repair_ready), delivered to the host
  via sds_add_event_listener.
- the retrieval-hint provider stays hand-written (a C function pointer has
  no CBOR form); its pointers travel as uint64 through the request channel,
  and the provided buffer is freed with libc free to match the host's
  malloc (Go's C.CBytes).
- pin nim-ffi at the fix/foreign-host-concurrency-v0.2 branch (recycle pool
  + foreign-thread GC fixes) and add the cbor_serialization dependency.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 23:50:13 +02:00

283 lines
10 KiB
Nim

## C-compatible FFI wrapper around the SDS ReliabilityManager.
##
## Built on the `nim-ffi` 0.2 package's high-level macros: `declareLibrary`
## emits the bootstrap plus the per-event listener registry
## (`sds_add_event_listener` / `sds_remove_event_listener`);
## `{.ffiCtor.}`/`{.ffi.}`/`{.ffiDtor.}` generate the C entry points,
## marshalling parameters and return values as CBOR; `{.ffiEvent.}` declares
## the library-initiated events (also CBOR). Exported C names are snake_case
## (`sds_wrap_outgoing_message`, …); see `library/libsds.h`. The Go bindings
## (sds-go-bindings) must match this API.
##
## The one exception is `sds_set_retrieval_hint_provider`: it takes a C
## function pointer, which has no sensible CBOR representation, so it is
## hand-written and dispatched to the worker thread (the pointers travel as
## uint64 through the request channel) to store the provider in a thread-local.
import system/ansi_c
import ffi
import sds
# Bootstrap (pragmas, linker flags, libsdsNimMain, initializeLibrary) plus the
# `sds_add_event_listener` / `sds_remove_event_listener` C exports and the
# per-type `ReliabilityManagerFFIPool` used by the hand-written entry point
# below (the ffiCtor/ffiDtor macros declare it too, guarded by `when not
# declared`).
declareLibrary("sds", ReliabilityManager)
type SdsRetrievalHintProvider* = proc(
messageId: cstring, hint: ptr cstring, hintLen: ptr csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
# The active retrieval-hint provider, stored per worker thread (one thread per
# context). Set by sds_set_retrieval_hint_provider via a dispatched request so
# the write lands on the worker thread, where the manager's hint closure reads
# it during message processing.
var sdsRetrievalHintCb {.threadvar.}: pointer
var sdsRetrievalHintUserData {.threadvar.}: pointer
################################################################################
### CBOR-marshalled request/response types
type SdsConfig* {.ffi.} = object
participantId: string ## empty disables SDS-R (see newReliabilityManager)
type SdsWrapRequest* {.ffi.} = object
message: seq[byte]
messageId: string
channelId: string
type SdsWrapResponse* {.ffi.} = object
message: seq[byte]
type SdsUnwrapRequest* {.ffi.} = object
message: seq[byte]
# One missing dependency: the message id plus an optional retrieval hint. The
# hint is a raw byte string on the CBOR wire (no base64, unlike the old JSON).
type SdsMissingDep* {.ffi.} = object
messageId: string
retrievalHint: seq[byte]
type SdsUnwrapResponse* {.ffi.} = object
message: seq[byte]
channelId: string
missingDeps: seq[SdsMissingDep]
type SdsMarkDependenciesRequest* {.ffi.} = object
messageIds: seq[string]
channelId: string
################################################################################
### Library-initiated events (CBOR EventEnvelope via {.ffiEvent.})
type SdsMessageReadyEvent* {.ffi.} = object
messageId: string
channelId: string
type SdsMessageSentEvent* {.ffi.} = object
messageId: string
channelId: string
type SdsMissingDependenciesEvent* {.ffi.} = object
messageId: string
missingDeps: seq[SdsMissingDep]
channelId: string
type SdsPeriodicSyncEvent* {.ffi.} = object
ok: bool ## carries no data; a field keeps the CBOR map non-degenerate
type SdsRepairReadyEvent* {.ffi.} = object
message: seq[byte]
channelId: string
proc onMessageReady*(evt: SdsMessageReadyEvent) {.ffiEvent: "message_ready".}
proc onMessageSent*(evt: SdsMessageSentEvent) {.ffiEvent: "message_sent".}
proc onMissingDependencies*(
evt: SdsMissingDependenciesEvent
) {.ffiEvent: "missing_dependencies".}
proc onPeriodicSync*(evt: SdsPeriodicSyncEvent) {.ffiEvent: "periodic_sync".}
proc onRepairReady*(evt: SdsRepairReadyEvent) {.ffiEvent: "repair_ready".}
proc toMissingDeps(entries: seq[HistoryEntry]): seq[SdsMissingDep] =
var deps = newSeq[SdsMissingDep](entries.len)
for i, entry in entries:
deps[i] =
SdsMissingDep(messageId: entry.messageId, retrievalHint: entry.retrievalHint)
return deps
################################################################################
### Constructor — creates the FFI context and the ReliabilityManager.
###
### The event closures run on the worker thread and forward CBOR payloads to
### the listeners registered via sds_add_event_listener (the {.ffiEvent.} procs
### read the per-thread event queue, so no context handle is needed here).
proc sdsCreate*(
config: SdsConfig
): Future[Result[ReliabilityManager, string]] {.ffiCtor.} =
# The ctor body runs on the (possibly recycled) worker thread. Drop any
# retrieval-hint provider left over from a previous owner of this thread so a
# stale C function pointer is never invoked.
sdsRetrievalHintCb = nil
sdsRetrievalHintUserData = nil
let rm = newReliabilityManager(
participantId = config.participantId.SdsParticipantID
).valueOr:
error "Failed creating reliability manager", error = error
return err("Failed creating reliability manager: " & $error)
let messageReadyCb = proc(
messageId: SdsMessageID, channelId: SdsChannelID
) {.gcsafe.} =
onMessageReady(SdsMessageReadyEvent(messageId: messageId, channelId: channelId))
let messageSentCb = proc(
messageId: SdsMessageID, channelId: SdsChannelID
) {.gcsafe.} =
onMessageSent(SdsMessageSentEvent(messageId: messageId, channelId: channelId))
let missingDependenciesCb = proc(
messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.} =
onMissingDependencies(
SdsMissingDependenciesEvent(
messageId: messageId,
missingDeps: toMissingDeps(missingDeps),
channelId: channelId,
)
)
let periodicSyncCb = proc() {.gcsafe.} =
onPeriodicSync(SdsPeriodicSyncEvent(ok: true))
let repairReadyCb = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} =
onRepairReady(SdsRepairReadyEvent(message: message, channelId: channelId))
let retrievalHintProvider = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} =
if sdsRetrievalHintCb.isNil():
return @[]
var hint: cstring
var hintLen: csize_t
cast[SdsRetrievalHintProvider](sdsRetrievalHintCb)(
messageId.cstring, addr hint, addr hintLen, sdsRetrievalHintUserData
)
if not hint.isNil() and hintLen > 0:
var hintBytes = newSeq[byte](hintLen)
copyMem(addr hintBytes[0], hint, hintLen)
# The provider allocates *hint with libc malloc (Go's C.CBytes); free it
# with libc free, not Nim's deallocShared, to keep the allocator paired.
c_free(cast[pointer](hint))
return hintBytes
return @[]
await rm.setCallbacks(
messageReadyCb, messageSentCb, missingDependenciesCb, periodicSyncCb,
retrievalHintProvider, repairReadyCb,
)
return ok(rm)
################################################################################
### Async methods — each runs its body on the worker thread.
proc sdsWrapOutgoingMessage*(
rm: ReliabilityManager, req: SdsWrapRequest
): Future[Result[SdsWrapResponse, string]] {.ffi.} =
let wrapped = (
await wrapOutgoingMessage(rm, req.message, req.messageId, req.channelId)
).valueOr:
error "WRAP_MESSAGE failed", error = error
return err("error processing wrap request: " & $error)
return ok(SdsWrapResponse(message: wrapped))
proc sdsUnwrapReceivedMessage*(
rm: ReliabilityManager, req: SdsUnwrapRequest
): Future[Result[SdsUnwrapResponse, string]] {.ffi.} =
let (unwrapped, missingDeps, channelId) = (
await unwrapReceivedMessage(rm, req.message)
).valueOr:
return err("error processing unwrap request: " & $error)
return ok(
SdsUnwrapResponse(
message: unwrapped, channelId: channelId, missingDeps: toMissingDeps(missingDeps)
)
)
proc sdsMarkDependenciesMet*(
rm: ReliabilityManager, req: SdsMarkDependenciesRequest
): Future[Result[string, string]] {.ffi.} =
(await markDependenciesMet(rm, req.messageIds, req.channelId)).isOkOr:
error "MARK_DEPENDENCIES_MET failed", error = error
return err("error processing mark-dependencies request: " & $error)
return ok("")
proc sdsReset*(rm: ReliabilityManager): Future[Result[string, string]] {.ffi.} =
(await resetReliabilityManager(rm)).isOkOr:
error "RESET failed", error = error
return err("error processing reset request: " & $error)
return ok("")
proc sdsStartPeriodicTasks*(
rm: ReliabilityManager
): Future[Result[string, string]] {.ffi.} =
# The empty await forces the macro down its async path so the body runs on the
# worker thread — startPeriodicTasks schedules futures on that thread's loop.
await sleepAsync(chronos.milliseconds(0))
rm.startPeriodicTasks()
return ok("")
################################################################################
### Destructor — runs library cleanup then tears down the FFI context.
proc sdsDestroy*(rm: ReliabilityManager) {.ffiDtor.} =
discard
################################################################################
### Retrieval-hint provider (hand-written: a C function pointer cannot be passed
### as CBOR). The setter dispatches a request — the provider/userData pointers
### travel as uint64 — so the provider is stored in the worker thread's
### thread-local, where sdsCreate's hint closure reads it.
proc sdsNoopCallback(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard
registerReqFFI(SdsSetHintReq, ctx: ptr FFIContext[ReliabilityManager]):
proc(cbPtr: uint64, udPtr: uint64): Future[Result[string, string]] {.async.} =
sdsRetrievalHintCb = cast[pointer](cbPtr)
sdsRetrievalHintUserData = cast[pointer](udPtr)
return ok("")
proc sds_set_retrieval_hint_provider(
ctx: ptr FFIContext[ReliabilityManager],
callback: SdsRetrievalHintProvider,
userData: pointer,
): cint {.dynlib, exportc, cdecl, raises: [].} =
initializeLibrary()
if not ReliabilityManagerFFIPool.isValidCtx(cast[pointer](ctx)):
return RET_ERR
let sendRes =
try:
ffi_context.sendRequestToFFIThread(
ctx,
SdsSetHintReq.ffiNewReq(
sdsNoopCallback, nil, cast[uint64](cast[pointer](callback)),
cast[uint64](userData),
),
)
except Exception as exc:
Result[void, string].err("sendRequestToFFIThread exception: " & exc.msg)
if sendRes.isErr():
return RET_ERR
return RET_OK
# Emit binding metadata (no-op unless -d:ffiGenBindings). Must follow every
# {.ffi.}/{.ffiCtor.}/{.ffiDtor.} annotation.
genBindings()