diff --git a/.gitignore b/.gitignore index 08a21fb..b8be30d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,6 @@ tests/test_reliability tests/bloom nph -bindings/generated* +docs +for_reference do_not_commit diff --git a/bindings/bindings.h b/bindings/bindings.h new file mode 100644 index 0000000..a450453 --- /dev/null +++ b/bindings/bindings.h @@ -0,0 +1,131 @@ +#ifndef BINDINGS_H +#define BINDINGS_H + +#include // For size_t +#include // For standard integer types +#include // For bool type + +#ifdef __cplusplus +extern "C" { +#endif + + +// Opaque struct declaration (handle replaces direct pointer usage) +typedef struct ReliabilityManager ReliabilityManager; // Keep forward declaration + +// Define MessageID as a C string +typedef const char* MessageID; // Keep const for the typedef itself + +// --- Result Types --- + +typedef struct { + bool is_ok; + char* error_message; +} CResult; + +typedef struct { + CResult base_result; + unsigned char* message; + size_t message_len; + MessageID* missing_deps; + size_t missing_deps_count; +} CUnwrapResult; + +typedef struct { + CResult base_result; + unsigned char* message; + size_t message_len; +} CWrapResult; + + +// --- Callback Function Pointer Types --- +// Keep const char* here as these are inputs *to* the callback +typedef void (*MessageReadyCallback)(const char* messageID); +typedef void (*MessageSentCallback)(const char* messageID); +typedef void (*MissingDependenciesCallback)(const char* messageID, const char** missingDeps, size_t missingDepsCount); +typedef void (*PeriodicSyncCallback)(void* user_data); + + +// --- Core API Functions --- + +/** + * @brief Creates a new ReliabilityManager instance. + * @param channelId A unique identifier for the communication channel. + * @return An opaque handle (void*) representing the instance, or NULL on failure. + */ +void* NewReliabilityManager(char* channelId); + +/** + * @brief Cleans up resources associated with a ReliabilityManager instance. + * @param handle The opaque handle (void*) of the instance to clean up. + */ +void CleanupReliabilityManager(void* handle); + +/** + * @brief Resets the ReliabilityManager instance. + * @param handle The opaque handle (void*) of the instance. + * @return CResult indicating success or failure. + */ +CResult ResetReliabilityManager(void* handle); +/** + * @brief Wraps an outgoing message. + * @param handle The opaque handle (void*) of the instance. + * @param message Pointer to the raw message content. + * @param messageLen Length of the raw message content. + * @param messageId A unique identifier for this message. + * @return CWrapResult containing the wrapped message or an error. + */ +CWrapResult WrapOutgoingMessage(void* handle, void* message, size_t messageLen, char* messageId); +/** + * @brief Unwraps a received message. + * @param handle The opaque handle (void*) of the instance. + * @param message Pointer to the received message data. + * @param messageLen Length of the received message data. + * @return CUnwrapResult containing the unwrapped content, missing dependencies, or an error. + */ +CUnwrapResult UnwrapReceivedMessage(void* handle, void* message, size_t messageLen); + +/** + * @brief Marks specified message dependencies as met. + * @param handle The opaque handle (void*) of the instance. + * @param messageIDs An array of message IDs to mark as met. + * @param count The number of message IDs in the array. + * @return CResult indicating success or failure. + */ +CResult MarkDependenciesMet(void* handle, char*** messageIDs, size_t count); + +/** + * @brief Registers callback functions. + * @param handle The opaque handle (void*) of the instance. + * @param messageReady Callback for when a message is ready. + * @param messageSent Callback for when an outgoing message is acknowledged. + * @param missingDependencies Callback for when missing dependencies are detected. + * @param periodicSync Callback for periodic sync suggestions. + * @param user_data A pointer to user-defined data passed to callbacks. + */ +void RegisterCallbacks(void* handle, + void* messageReady, + void* messageSent, + void* missingDependencies, + void* periodicSync, + void* user_data); // Keep user_data, align with Nim proc + +/** + * @brief Starts the background periodic tasks. + * @param handle The opaque handle (void*) of the instance. + */ +void StartPeriodicTasks(void* handle); + + +// --- Memory Freeing Functions --- + +void FreeCResultError(CResult result); +void FreeCWrapResult(CWrapResult result); +void FreeCUnwrapResult(CUnwrapResult result); + + +#ifdef __cplusplus +} // extern "C" +#endif + +#endif // BINDINGS_H diff --git a/bindings/bindings.nim b/bindings/bindings.nim index 3e1eba1..27ddb91 100644 --- a/bindings/bindings.nim +++ b/bindings/bindings.nim @@ -1,242 +1,283 @@ -import genny -import std/[times, strutils] +import std/[locks, typetraits] +import chronos import results -import ../src/[reliability, message, reliability_utils, rolling_bloom_filter] +import ../src/[reliability, reliability_utils, message] + +# --- C Type Definitions --- -# Define required sequence wrapper types for C FFI type - SeqByte* = ref object - s*: seq[byte] - - SeqMessageID* = ref object - s*: seq[MessageID] - - SeqMessage* = ref object - s*: seq[Message] - - SeqUnacknowledgedMessage* = ref object - s*: seq[UnacknowledgedMessage] + CReliabilityManagerHandle* = pointer -# Error handling -var lastError: ReliabilityError + CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object + is_ok*: bool + error_message*: cstring -proc takeError(): string = - result = $lastError - lastError = ReliabilityError.reInternalError # Reset to default + CWrapResult* {.importc: "CWrapResult", header: "bindings.h", bycopy.} = object + base_result*: CResult + message*: pointer + message_len*: csize -proc checkError(): bool = - result = lastError != ReliabilityError.reInternalError + CUnwrapResult* {.importc: "CUnwrapResult", header: "bindings.h", bycopy.} = object + base_result*: CResult + message*: pointer + message_len*: csize + missing_deps*: ptr ptr cstring + missing_deps_count*: csize -# Callback function types for C FFI -type - CMessageReadyCallback* = proc(messageId: cstring) {.cdecl, gcsafe.} - CMessageSentCallback* = proc(messageId: cstring) {.cdecl, gcsafe.} - CMissingDepsCallback* = proc(messageId: cstring, missingDeps: cstring, count: cint) {.cdecl, gcsafe.} - CPeriodicSyncCallback* = proc() {.cdecl, gcsafe.} + # Callback Types + CMessageReadyCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].} + CMessageSentCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].} + CMissingDependenciesCallback* = proc (messageID: cstring, missingDeps: ptr ptr cstring, missingDepsCount: csize) {.cdecl, gcsafe, raises: [].} + CPeriodicSyncCallback* = proc (user_data: pointer) {.cdecl, gcsafe, raises: [].} -# Global callback storage -var - onMessageReadyCallback: CMessageReadyCallback - onMessageSentCallback: CMessageSentCallback - onMissingDepsCallback: CMissingDepsCallback - onPeriodicSyncCallback: CPeriodicSyncCallback +# --- Memory Management Helpers --- -# Register callbacks -proc registerMessageReadyCallback*(callback: CMessageReadyCallback) = - onMessageReadyCallback = callback +proc allocCString*(s: string): cstring {.inline, gcsafe.} = + if s.len == 0: return nil + result = cast[cstring](allocShared(s.len + 1)) + copyMem(result, s.cstring, s.len + 1) -proc registerMessageSentCallback*(callback: CMessageSentCallback) = - onMessageSentCallback = callback +proc allocSeqByte*(s: seq[byte]): (pointer, csize) {.inline, gcsafe.} = + if s.len == 0: return (nil, 0) + let len = s.len + let bufferPtr = allocShared(len) + if len > 0: + copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural) + return (bufferPtr, len.csize) -proc registerMissingDepsCallback*(callback: CMissingDepsCallback) = - onMissingDepsCallback = callback +proc allocSeqCString*(s: seq[string]): (ptr ptr cstring, csize) {.inline, gcsafe, cdecl.} = + if s.len == 0: return (nil, 0) + let count = s.len + let arrPtr = cast[ptr ptr cstring](allocShared(count * sizeof(cstring))) + for i in 0.. 0: - try: - let joinedDeps = missingDeps.join(",") - onMissingDepsCallback(cstring(messageId), cstring(joinedDeps), cint(missingDeps.len)) - except: - discard +# --- Result Conversion Helpers --- -proc onPeriodicSyncAdapter() {.gcsafe, raises: [].} = - if onPeriodicSyncCallback != nil: - try: - onPeriodicSyncCallback() - except: - discard +proc toCResultOk*(): CResult = + CResult(is_ok: true, error_message: nil) -# Apply registered callbacks to a ReliabilityManager -proc applyCallbacks*(rm: ReliabilityManager): bool = - if rm == nil: - lastError = ReliabilityError.reInvalidArgument - return false - - try: - rm.setCallbacks( - onMessageReadyAdapter, - onMessageSentAdapter, - onMissingDependenciesAdapter, - onPeriodicSyncAdapter +proc toCResultErr*(err: ReliabilityError): CResult = + CResult(is_ok: false, error_message: allocCString($err)) + +proc toCResultErrStr*(errMsg: string): CResult = + CResult(is_ok: false, error_message: allocCString(errMsg)) + +# --- Callback Wrappers (Nim -> C) --- +# These still accept the ReliabilityManager instance directly + +# These wrappers now need to handle the user_data explicitly if needed, +# but the C callbacks themselves don't take it directly anymore (except PeriodicSync). +# The user_data is stored in rm.cUserData. + +proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} = + let cbPtr = rm.cMessageReadyCallback + if cbPtr != nil: + let cb = cast[CMessageReadyCallback](cbPtr) + # Call the C callback without user_data, as per the updated typedef + cb(messageId.cstring) + +proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} = + let cbPtr = rm.cMessageSentCallback + if cbPtr != nil: + let cb = cast[CMessageSentCallback](cbPtr) + # Call the C callback without user_data + cb(messageId.cstring) + +proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + let cbPtr = rm.cMissingDependenciesCallback + if cbPtr != nil: + var cDeps = newSeq[cstring](missingDeps.len) + for i, dep in missingDeps: + cDeps[i] = dep.cstring + let cDepsPtr = if cDeps.len > 0: cDeps[0].addr else: nil + let cb = cast[CMissingDependenciesCallback](cbPtr) + # Call the C callback without user_data + cb(messageId.cstring, cast[ptr ptr cstring](cDepsPtr), missingDeps.len.csize) + +proc nimPeriodicSyncCallback(rm: ReliabilityManager) {.gcsafe.} = + let cbPtr = rm.cPeriodicSyncCallback + if cbPtr != nil: + let cb = cast[CPeriodicSyncCallback](cbPtr) + cb(rm.cUserData) + +# --- Exported C Functions - Using Opaque Pointer (pointer/void*) --- + +proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} = + let channelId = $channelIdCStr + if channelId.len == 0: + echo "Error creating ReliabilityManager: Channel ID cannot be empty" + return nil # Return nil pointer + let rmResult = newReliabilityManager(channelId) + if rmResult.isOk: + let rm = rmResult.get() + # Initialize C callback fields to nil + rm.cMessageReadyCallback = nil + rm.cMessageSentCallback = nil + rm.cMissingDependenciesCallback = nil + rm.cPeriodicSyncCallback = nil + rm.cUserData = nil + # Assign Nim wrappers that capture the 'rm' instance directly + rm.onMessageReady = proc(msgId: MessageID) {.gcsafe.} = nimMessageReadyCallback(rm, msgId) + rm.onMessageSent = proc(msgId: MessageID) {.gcsafe.} = nimMessageSentCallback(rm, msgId) + rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) {.gcsafe.} = nimMissingDependenciesCallback(rm, msgId, deps) + rm.onPeriodicSync = proc() {.gcsafe.} = nimPeriodicSyncCallback(rm) + + # Return the Nim ref object cast to the opaque pointer type + return cast[CReliabilityManagerHandle](rm) + else: + echo "Error creating ReliabilityManager: ", rmResult.error + return nil # Return nil pointer + +proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl, gcsafe.} = + if handle != nil: + # Cast opaque pointer back to Nim ref type + let rm = cast[ReliabilityManager](handle) + cleanup(rm) # Call Nim cleanup + # Nim GC will collect 'rm' eventually as the handle is the only reference + else: + echo "Warning: CleanupReliabilityManager called with NULL handle" + +proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.exportc, dynlib, cdecl, gcsafe.} = + if handle == nil: + return toCResultErrStr("ReliabilityManager handle is NULL") + let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + let result = resetReliabilityManager(rm) + if result.isOk: + return toCResultOk() + else: + return toCResultErr(result.error) + +proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl, gcsafe.} = + if handle == nil: + return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) + let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + + if messageC == nil and messageLen > 0: + return CWrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) + if messageIdCStr == nil: + return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL")) + + let messageId = $messageIdCStr + var messageNim: seq[byte] + if messageLen > 0: + messageNim = newSeq[byte](messageLen) + copyMem(messageNim[0].addr, messageC, messageLen.Natural) + else: + messageNim = @[] + + let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId) + if wrapResult.isOk: + let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get()) + return CWrapResult( + base_result: toCResultOk(), + message: wrappedDataPtr, + message_len: wrappedDataLen ) - return true - except: - lastError = ReliabilityError.reInternalError - return false - -# Wrapper for creating a ReliabilityManager -proc safeNewReliabilityManager(channelId: string, config: ReliabilityConfig = defaultConfig()): ReliabilityManager = - let res = newReliabilityManager(channelId, config) - if res.isOk: - return res.get else: - lastError = res.error - return nil + return CWrapResult(base_result: toCResultErr(wrapResult.error)) -# Wrapper for wrapping outgoing messages -proc safeWrapOutgoingMessage(rm: ReliabilityManager, message: seq[byte], messageId: MessageID): seq[byte] = - if rm == nil: - lastError = ReliabilityError.reInvalidArgument - return @[] - - let res = rm.wrapOutgoingMessage(message, messageId) - if res.isOk: - return res.get +proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize): CUnwrapResult {.exportc, dynlib, cdecl, gcsafe.} = + if handle == nil: + return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) + let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + + if messageC == nil and messageLen > 0: + return CUnwrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) + + var messageNim: seq[byte] + if messageLen > 0: + messageNim = newSeq[byte](messageLen) + copyMem(messageNim[0].addr, messageC, messageLen.Natural) else: - lastError = res.error - return @[] + messageNim = @[] -# Wrapper for unwrapping received messages -proc safeUnwrapReceivedMessage(rm: ReliabilityManager, message: seq[byte]): tuple[message: seq[byte], missingDeps: seq[MessageID]] = - if rm == nil: - lastError = ReliabilityError.reInvalidArgument - return (@[], @[]) - - let res = rm.unwrapReceivedMessage(message) - if res.isOk: - return res.get + let unwrapResult = unwrapReceivedMessage(rm, messageNim) + if unwrapResult.isOk: + let (unwrappedContent, missingDepsNim) = unwrapResult.get() + let (contentPtr, contentLen) = allocSeqByte(unwrappedContent) + let (depsPtr, depsCount) = allocSeqCString(missingDepsNim) + return CUnwrapResult( + base_result: toCResultOk(), + message: contentPtr, + message_len: contentLen, + missing_deps: depsPtr, + missing_deps_count: depsCount + ) else: - lastError = res.error - return (@[], @[]) + return CUnwrapResult(base_result: toCResultErr(unwrapResult.error)) -# Wrapper for marking dependencies as met -proc safeMarkDependenciesMet(rm: ReliabilityManager, messageIds: seq[MessageID]): bool = - if rm == nil: - lastError = ReliabilityError.reInvalidArgument - return false - - let res = rm.markDependenciesMet(messageIds) - if res.isOk: - return true +proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr ptr cstring, count: csize): CResult {.exportc, dynlib, cdecl, gcsafe.} = + if handle == nil: + return toCResultErrStr("ReliabilityManager handle is NULL") + let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + + if messageIDsC == nil and count > 0: + return toCResultErrStr("MessageIDs pointer is NULL but count > 0") + + var messageIDsNim = newSeq[string](count) + for i in 0..= 2.0.8" requires "chronicles" requires "libp2p" -requires "genny >= 0.1.0" # Tasks task test, "Run the test suite": diff --git a/sds_wrapper.go b/sds_wrapper.go new file mode 100644 index 0000000..69204c4 --- /dev/null +++ b/sds_wrapper.go @@ -0,0 +1,307 @@ +package main + +/* +#cgo CFLAGS: -I${SRCDIR}/bindings +#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lbindings +#cgo LDFLAGS: -Wl,-rpath,${SRCDIR}/bindings/generated + +#include // For C.free +#include "bindings/bindings.h" // Update include path + +// Forward declarations for Go callback functions exported to C +// These are the functions Nim will eventually call via the pointers we give it. +extern void goMessageReadyCallback(char* messageID); +extern void goMessageSentCallback(char* messageID); +extern void goMissingDependenciesCallback(char* messageID, char** missingDeps, size_t missingDepsCount); +extern void goPeriodicSyncCallback(); + +// Helper function to call the C memory freeing functions +static void callFreeCResultError(CResult res) { FreeCResultError(res); } +static void callFreeCWrapResult(CWrapResult res) { FreeCWrapResult(res); } +static void callFreeCUnwrapResult(CUnwrapResult res) { FreeCUnwrapResult(res); } + +*/ +import "C" +import ( + "errors" + "fmt" + "sync" + "unsafe" +) + +// --- Go Types --- + +// ReliabilityManagerHandle represents the opaque handle to the Nim object +type ReliabilityManagerHandle unsafe.Pointer + +// MessageID is a type alias for string for clarity +type MessageID string + +// Callbacks holds the Go functions to be called by the Nim library +type Callbacks struct { + OnMessageReady func(messageId MessageID) + OnMessageSent func(messageId MessageID) + OnMissingDependencies func(messageId MessageID, missingDeps []MessageID) + OnPeriodicSync func() +} + +// Global map to store callbacks associated with handles (necessary due to cgo limitations) +var ( + callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks) + registryMutex sync.RWMutex +) + +// --- Go Wrapper Functions --- + +// NewReliabilityManager creates a new instance of the Nim ReliabilityManager +func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) { + cChannelId := C.CString(channelId) + defer C.free(unsafe.Pointer(cChannelId)) + + handle := C.NewReliabilityManager(cChannelId) + if handle == nil { + // Note: Nim side currently just prints to stdout on creation failure + return nil, errors.New("failed to create ReliabilityManager (check Nim logs/stdout)") + } + return ReliabilityManagerHandle(handle), nil +} + +// CleanupReliabilityManager frees the resources associated with the handle +func CleanupReliabilityManager(handle ReliabilityManagerHandle) { + if handle == nil { + return + } + registryMutex.Lock() + delete(callbackRegistry, handle) + registryMutex.Unlock() + C.CleanupReliabilityManager(unsafe.Pointer(handle)) +} + +// ResetReliabilityManager resets the state of the manager +func ResetReliabilityManager(handle ReliabilityManagerHandle) error { + if handle == nil { + return errors.New("handle is nil") + } + cResult := C.ResetReliabilityManager(unsafe.Pointer(handle)) + if !cResult.is_ok { + errMsg := C.GoString(cResult.error_message) + C.callFreeCResultError(cResult) // Free the error message + return errors.New(errMsg) + } + return nil +} + +// WrapOutgoingMessage wraps a message with reliability metadata +func WrapOutgoingMessage(handle ReliabilityManagerHandle, message []byte, messageId MessageID) ([]byte, error) { + if handle == nil { + return nil, errors.New("handle is nil") + } + cMessageId := C.CString(string(messageId)) + defer C.free(unsafe.Pointer(cMessageId)) + + var cMessagePtr unsafe.Pointer + if len(message) > 0 { + cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed + defer C.free(cMessagePtr) + } else { + cMessagePtr = nil + } + cMessageLen := C.size_t(len(message)) + + cWrapResult := C.WrapOutgoingMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen, cMessageId) + + if !cWrapResult.base_result.is_ok { + errMsg := C.GoString(cWrapResult.base_result.error_message) + C.callFreeCWrapResult(cWrapResult) // Free error and potentially allocated message + return nil, errors.New(errMsg) + } + + // Copy the wrapped message from C memory to Go slice + // Explicitly cast the message pointer to unsafe.Pointer + wrappedMessage := C.GoBytes(unsafe.Pointer(cWrapResult.message), C.int(cWrapResult.message_len)) + C.callFreeCWrapResult(cWrapResult) // Free the C-allocated message buffer + + return wrappedMessage, nil +} + +// UnwrapReceivedMessage unwraps a received message +func UnwrapReceivedMessage(handle ReliabilityManagerHandle, message []byte) ([]byte, []MessageID, error) { + if handle == nil { + return nil, nil, errors.New("handle is nil") + } + + var cMessagePtr unsafe.Pointer + if len(message) > 0 { + cMessagePtr = C.CBytes(message) + defer C.free(cMessagePtr) + } else { + cMessagePtr = nil + } + cMessageLen := C.size_t(len(message)) + + cUnwrapResult := C.UnwrapReceivedMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen) + + if !cUnwrapResult.base_result.is_ok { + errMsg := C.GoString(cUnwrapResult.base_result.error_message) + C.callFreeCUnwrapResult(cUnwrapResult) // Free error and potentially allocated fields + return nil, nil, errors.New(errMsg) + } + + // Copy unwrapped message content + // Explicitly cast the message pointer to unsafe.Pointer + unwrappedContent := C.GoBytes(unsafe.Pointer(cUnwrapResult.message), C.int(cUnwrapResult.message_len)) + + // Copy missing dependencies + missingDeps := make([]MessageID, cUnwrapResult.missing_deps_count) + if cUnwrapResult.missing_deps_count > 0 { + // Convert C array of C strings to Go slice of strings + cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(cUnwrapResult.missing_deps))[:cUnwrapResult.missing_deps_count:cUnwrapResult.missing_deps_count] + for i, s := range cDepsArray { + missingDeps[i] = MessageID(C.GoString(s)) + } + } + + C.callFreeCUnwrapResult(cUnwrapResult) // Free C-allocated message, deps array, and strings + + return unwrappedContent, missingDeps, nil +} + +// MarkDependenciesMet informs the library that dependencies are met +func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID) error { + if handle == nil { + return errors.New("handle is nil") + } + if len(messageIDs) == 0 { + return nil // Nothing to do + } + + // Convert Go string slice to C array of C strings (char**) + cMessageIDs := make([]*C.char, len(messageIDs)) + for i, id := range messageIDs { + cMessageIDs[i] = C.CString(string(id)) + defer C.free(unsafe.Pointer(cMessageIDs[i])) // Ensure each CString is freed + } + + // Create a pointer (**C.char) to the first element of the slice + var cMessageIDsPtr **C.char + if len(cMessageIDs) > 0 { + cMessageIDsPtr = &cMessageIDs[0] + } else { + cMessageIDsPtr = nil // Handle empty slice case + } + + // Pass the address of the pointer variable (&cMessageIDsPtr), which is of type ***C.char + cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), &cMessageIDsPtr, C.size_t(len(messageIDs))) + + if !cResult.is_ok { + errMsg := C.GoString(cResult.error_message) + C.callFreeCResultError(cResult) + return errors.New(errMsg) + } + return nil +} + +// RegisterCallbacks sets the Go callback functions +func RegisterCallbacks(handle ReliabilityManagerHandle, callbacks Callbacks) error { + if handle == nil { + return errors.New("handle is nil") + } + + registryMutex.Lock() + callbackRegistry[handle] = &callbacks + registryMutex.Unlock() + + // Pass the C relay functions to Nim + // Nim will store these function pointers. When Nim calls them, they execute the C relay, + // Pass pointers to the exported Go functions directly. + // Nim expects function pointers matching the C callback typedefs. + // Cgo makes the exported Go functions available as C function pointers. + // Cast these function pointers to unsafe.Pointer to match the void* expected by the C function. + C.RegisterCallbacks( + unsafe.Pointer(handle), + unsafe.Pointer(C.goMessageReadyCallback), + unsafe.Pointer(C.goMessageSentCallback), + unsafe.Pointer(C.goMissingDependenciesCallback), + unsafe.Pointer(C.goPeriodicSyncCallback), + unsafe.Pointer(handle), // Pass handle as user_data + ) + return nil +} + +// StartPeriodicTasks starts the background tasks in the Nim library +func StartPeriodicTasks(handle ReliabilityManagerHandle) error { + if handle == nil { + return errors.New("handle is nil") + } + C.StartPeriodicTasks(unsafe.Pointer(handle)) + // Assuming StartPeriodicTasks doesn't return an error status in C API + return nil +} + +// --- Go Callback Implementations (Exported to C) --- + +func goMessageReadyCallback(messageID *C.char) { + msgIdStr := C.GoString(messageID) + registryMutex.RLock() + defer registryMutex.RUnlock() + + // Find the correct Go callback based on handle (this is tricky without handle passed) + // For now, iterate through all registered callbacks. This is NOT ideal for multiple managers. + // A better approach would involve passing the handle back through user_data if possible, + // or maintaining a single global callback handler if only one manager instance is expected. + // Let's assume a single instance for simplicity for now. + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnMessageReady != nil { + // Run in a goroutine to avoid blocking the C thread + go callbacks.OnMessageReady(MessageID(msgIdStr)) + } + } + fmt.Printf("Go: Message Ready: %s\n", msgIdStr) // Debug print +} + +func goMessageSentCallback(messageID *C.char) { + msgIdStr := C.GoString(messageID) + registryMutex.RLock() + defer registryMutex.RUnlock() + + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnMessageSent != nil { + go callbacks.OnMessageSent(MessageID(msgIdStr)) + } + } + fmt.Printf("Go: Message Sent: %s\n", msgIdStr) // Debug print +} + +func goMissingDependenciesCallback(messageID *C.char, missingDeps **C.char, missingDepsCount C.size_t) { + msgIdStr := C.GoString(messageID) + deps := make([]MessageID, missingDepsCount) + if missingDepsCount > 0 { + // Convert C array of C strings to Go slice + cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(missingDeps))[:missingDepsCount:missingDepsCount] + for i, s := range cDepsArray { + deps[i] = MessageID(C.GoString(s)) + } + } + + registryMutex.RLock() + defer registryMutex.RUnlock() + + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnMissingDependencies != nil { + go callbacks.OnMissingDependencies(MessageID(msgIdStr), deps) + } + } + fmt.Printf("Go: Missing Deps for %s: %v\n", msgIdStr, deps) // Debug print +} + +func goPeriodicSyncCallback() { + registryMutex.RLock() + defer registryMutex.RUnlock() + + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnPeriodicSync != nil { + go callbacks.OnPeriodicSync() + } + } + fmt.Println("Go: Periodic Sync Requested") // Debug print +} diff --git a/sds_wrapper_test.go b/sds_wrapper_test.go new file mode 100644 index 0000000..4ef5edf --- /dev/null +++ b/sds_wrapper_test.go @@ -0,0 +1,262 @@ +package main +import ( + // "fmt" + // "sync" + "testing" + // "time" +) + +// Test basic creation, cleanup, and reset +func TestLifecycle(t *testing.T) { + channelID := "test-lifecycle" + handle, err := NewReliabilityManager(channelID) + if err != nil { + t.Fatalf("NewReliabilityManager failed: %v", err) + } + if handle == nil { + t.Fatal("NewReliabilityManager returned a nil handle") + } + defer CleanupReliabilityManager(handle) // Ensure cleanup even on test failure + + err = ResetReliabilityManager(handle) + if err != nil { + t.Errorf("ResetReliabilityManager failed: %v", err) + } +} + +// Test wrapping and unwrapping a simple message +func TestWrapUnwrap(t *testing.T) { + channelID := "test-wrap-unwrap" + handle, err := NewReliabilityManager(channelID) + if err != nil { + t.Fatalf("NewReliabilityManager failed: %v", err) + } + defer CleanupReliabilityManager(handle) + + originalPayload := []byte("hello reliability") + messageID := MessageID("msg-wrap-1") + + wrappedMsg, err := WrapOutgoingMessage(handle, originalPayload, messageID) + if err != nil { + t.Fatalf("WrapOutgoingMessage failed: %v", err) + } + if len(wrappedMsg) == 0 { + t.Fatal("WrapOutgoingMessage returned empty bytes") + } + + // Simulate receiving the wrapped message + unwrappedPayload, missingDeps, err := UnwrapReceivedMessage(handle, wrappedMsg) + if err != nil { + t.Fatalf("UnwrapReceivedMessage failed: %v", err) + } + + if string(unwrappedPayload) != string(originalPayload) { + t.Errorf("Unwrapped payload mismatch: got %q, want %q", unwrappedPayload, originalPayload) + } + if len(missingDeps) != 0 { + t.Errorf("Expected 0 missing dependencies, got %d: %v", len(missingDeps), missingDeps) + } +} + +// // Test dependency handling +// func TestDependencies(t *testing.T) { +// channelID := "test-deps" +// handle, err := NewReliabilityManager(channelID) +// if err != nil { +// t.Fatalf("NewReliabilityManager failed: %v", err) +// } +// defer CleanupReliabilityManager(handle) + +// // 1. Send message 1 (will become a dependency) +// payload1 := []byte("message one") +// msgID1 := MessageID("msg-dep-1") +// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) +// if err != nil { +// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) +// } +// // Simulate receiving msg1 to add it to history (implicitly acknowledges it) +// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) +// if err != nil { +// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) +// } + +// // 2. Send message 2 (depends on message 1 implicitly via causal history) +// payload2 := []byte("message two") +// msgID2 := MessageID("msg-dep-2") +// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) +// if err != nil { +// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) +// } + +// // 3. Create a new manager to simulate a different peer receiving msg2 without msg1 +// handle2, err := NewReliabilityManager(channelID) // Same channel ID +// if err != nil { +// t.Fatalf("NewReliabilityManager (2) failed: %v", err) +// } +// defer CleanupReliabilityManager(handle2) + +// // 4. Unwrap message 2 on the second manager - should report msg1 as missing +// _, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2) +// if err != nil { +// t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err) +// } + +// if len(missingDeps) == 0 { +// t.Fatalf("Expected missing dependencies, got none") +// } +// foundDep1 := false +// for _, dep := range missingDeps { +// if dep == msgID1 { +// foundDep1 = true +// break +// } +// } +// if !foundDep1 { +// t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps) +// } + +// // 5. Mark the dependency as met +// err = MarkDependenciesMet(handle2, []MessageID{msgID1}) +// if err != nil { +// t.Fatalf("MarkDependenciesMet failed: %v", err) +// } + +// // Ideally, we'd check if the message is now moved from an internal buffer, +// // but the current API doesn't expose buffer state. We rely on callbacks for this. +// } + +// // Test callbacks +// func TestCallbacks(t *testing.T) { +// channelID := "test-callbacks" +// handle, err := NewReliabilityManager(channelID) +// if err != nil { +// t.Fatalf("NewReliabilityManager failed: %v", err) +// } +// defer CleanupReliabilityManager(handle) + +// var wg sync.WaitGroup +// receivedReady := make(map[MessageID]bool) +// receivedSent := make(map[MessageID]bool) +// receivedMissing := make(map[MessageID][]MessageID) +// syncRequested := false +// var cbMutex sync.Mutex // Protect access to callback tracking maps/vars + +// callbacks := Callbacks{ +// OnMessageReady: func(messageId MessageID) { +// fmt.Printf("Test: OnMessageReady received: %s\n", messageId) +// cbMutex.Lock() +// receivedReady[messageId] = true +// cbMutex.Unlock() +// wg.Done() +// }, +// OnMessageSent: func(messageId MessageID) { +// fmt.Printf("Test: OnMessageSent received: %s\n", messageId) +// cbMutex.Lock() +// receivedSent[messageId] = true +// cbMutex.Unlock() +// wg.Done() +// }, +// OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { +// fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps) +// cbMutex.Lock() +// receivedMissing[messageId] = missingDeps +// cbMutex.Unlock() +// wg.Done() +// }, +// OnPeriodicSync: func() { +// fmt.Println("Test: OnPeriodicSync received") +// cbMutex.Lock() +// syncRequested = true +// cbMutex.Unlock() +// // Don't wg.Done() here, it might be called multiple times +// }, +// } + +// err = RegisterCallbacks(handle, callbacks) +// if err != nil { +// t.Fatalf("RegisterCallbacks failed: %v", err) +// } + +// // Start tasks AFTER registering callbacks +// err = StartPeriodicTasks(handle) +// if err != nil { +// t.Fatalf("StartPeriodicTasks failed: %v", err) +// } + +// // --- Test Scenario --- + +// // 1. Send msg1 +// wg.Add(1) // Expect OnMessageSent for msg1 eventually +// payload1 := []byte("callback test 1") +// msgID1 := MessageID("cb-msg-1") +// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) +// if err != nil { +// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) +// } + +// // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history) +// wg.Add(1) // Expect OnMessageReady for msg1 +// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) +// if err != nil { +// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) +// } + +// // 3. Send msg2 (depends on msg1) +// wg.Add(1) // Expect OnMessageSent for msg2 eventually +// payload2 := []byte("callback test 2") +// msgID2 := MessageID("cb-msg-2") +// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) +// if err != nil { +// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) +// } + +// // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2) +// wg.Add(1) // Expect OnMessageReady for msg2 +// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg2) +// if err != nil { +// t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err) +// } + +// // --- Verification --- +// // Wait for expected callbacks with a timeout +// waitTimeout(&wg, 5*time.Second, t) + +// cbMutex.Lock() +// defer cbMutex.Unlock() + +// if !receivedReady[msgID1] { +// t.Errorf("OnMessageReady not called for %s", msgID1) +// } +// if !receivedReady[msgID2] { +// t.Errorf("OnMessageReady not called for %s", msgID2) +// } +// if !receivedSent[msgID1] { +// t.Errorf("OnMessageSent not called for %s", msgID1) +// } +// if !receivedSent[msgID2] { +// t.Errorf("OnMessageSent not called for %s", msgID2) +// } +// // We didn't explicitly test missing deps in this path +// if len(receivedMissing) > 0 { +// t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing) +// } +// // Periodic sync is harder to guarantee in a short test, just check if it was ever true +// if !syncRequested { +// t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout") +// } +// } + +// // Helper function to wait for WaitGroup with a timeout +// func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) { +// c := make(chan struct{}) +// go func() { +// defer close(c) +// wg.Wait() +// }() +// select { +// case <-c: +// // Completed normally +// case <-time.After(timeout): +// t.Fatalf("Timed out waiting for callbacks") +// } +// } diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index 28b63f2..02daf3b 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -24,11 +24,19 @@ type channelId*: string config*: ReliabilityConfig lock*: Lock + # Nim callbacks (used internally or if not using C bindings) onMessageReady*: proc(messageId: MessageID) {.gcsafe.} onMessageSent*: proc(messageId: MessageID) {.gcsafe.} onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} onPeriodicSync*: PeriodicSyncCallback + # C callback pointers and user data (for FFI) + cMessageReadyCallback*: pointer + cMessageSentCallback*: pointer + cMissingDependenciesCallback*: pointer + cPeriodicSyncCallback*: pointer + cUserData*: pointer + ReliabilityError* = enum reInvalidArgument reOutOfMemory @@ -92,4 +100,4 @@ proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] = proc getIncomingBuffer*(rm: ReliabilityManager): seq[Message] = withLock rm.lock: - result = rm.incomingBuffer \ No newline at end of file + result = rm.incomingBuffer