diff --git a/bindings/bindings.h b/bindings/bindings.h index a450453..67fd179 100644 --- a/bindings/bindings.h +++ b/bindings/bindings.h @@ -39,11 +39,18 @@ typedef struct { // --- Callback Function Pointer Types --- -// Keep const char* here as these are inputs *to* the callback -typedef void (*MessageReadyCallback)(const char* messageID); -typedef void (*MessageSentCallback)(const char* messageID); -typedef void (*MissingDependenciesCallback)(const char* messageID, const char** missingDeps, size_t missingDepsCount); -typedef void (*PeriodicSyncCallback)(void* user_data); + +// Define event types (enum or constants) +typedef enum { + EVENT_MESSAGE_READY = 1, + EVENT_MESSAGE_SENT = 2, + EVENT_MISSING_DEPENDENCIES = 3, + EVENT_PERIODIC_SYNC = 4 +} CEventType; + +// Single callback type for all events +// Nim will call this, passing the handle and event-specific data +typedef void (*CEventCallback)(void* handle, CEventType eventType, void* data1, void* data2, size_t data3); // --- Core API Functions --- @@ -92,23 +99,17 @@ CUnwrapResult UnwrapReceivedMessage(void* handle, void* message, size_t messageL * @param count The number of message IDs in the array. * @return CResult indicating success or failure. */ -CResult MarkDependenciesMet(void* handle, char*** messageIDs, size_t count); +CResult MarkDependenciesMet(void* handle, char** messageIDs, size_t count); // Reverted to char** /** * @brief Registers callback functions. * @param handle The opaque handle (void*) of the instance. * @param messageReady Callback for when a message is ready. * @param messageSent Callback for when an outgoing message is acknowledged. - * @param missingDependencies Callback for when missing dependencies are detected. - * @param periodicSync Callback for periodic sync suggestions. - * @param user_data A pointer to user-defined data passed to callbacks. + * @param eventCallback The single callback function to handle all events. + * @param user_data A pointer to user-defined data (optional, could be managed in Go). */ -void RegisterCallbacks(void* handle, - void* messageReady, - void* messageSent, - void* missingDependencies, - void* periodicSync, - void* user_data); // Keep user_data, align with Nim proc +void RegisterCallback(void* handle, CEventCallback eventCallback, void* user_data); // Renamed and simplified /** * @brief Starts the background periodic tasks. diff --git a/bindings/bindings.nim b/bindings/bindings.nim index 27ddb91..04a0a44 100644 --- a/bindings/bindings.nim +++ b/bindings/bindings.nim @@ -1,13 +1,21 @@ -import std/[locks, typetraits] +import std/[locks, typetraits, tables] # Added tables import chronos import results import ../src/[reliability, reliability_utils, message] -# --- C Type Definitions --- - type CReliabilityManagerHandle* = pointer +type + # Callback Types (Imported from C Header) + CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum + EVENT_MESSAGE_READY = 1, + EVENT_MESSAGE_SENT = 2, + EVENT_MISSING_DEPENDENCIES = 3, + EVENT_PERIODIC_SYNC = 4 + + CEventCallback* = proc(handle: pointer, eventType: CEventType, data1: pointer, data2: pointer, data3: csize_t) {.cdecl.} # Use csize_t + CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object is_ok*: bool error_message*: cstring @@ -15,20 +23,24 @@ type CWrapResult* {.importc: "CWrapResult", header: "bindings.h", bycopy.} = object base_result*: CResult message*: pointer - message_len*: csize + message_len*: csize_t CUnwrapResult* {.importc: "CUnwrapResult", header: "bindings.h", bycopy.} = object base_result*: CResult message*: pointer - message_len*: csize - missing_deps*: ptr ptr cstring - missing_deps_count*: csize + message_len*: csize_t + missing_deps*: ptr cstring + missing_deps_count*: csize_t - # Callback Types - CMessageReadyCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].} - CMessageSentCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].} - CMissingDependenciesCallback* = proc (messageID: cstring, missingDeps: ptr ptr cstring, missingDepsCount: csize) {.cdecl, gcsafe, raises: [].} - CPeriodicSyncCallback* = proc (user_data: pointer) {.cdecl, gcsafe, raises: [].} +# --- Callback Registry --- +type + CallbackRegistry = Table[CReliabilityManagerHandle, CEventCallback] + +var + callbackRegistry: CallbackRegistry + registryLock: Lock + +initLock(registryLock) # --- Memory Management Helpers --- @@ -37,22 +49,24 @@ proc allocCString*(s: string): cstring {.inline, gcsafe.} = result = cast[cstring](allocShared(s.len + 1)) copyMem(result, s.cstring, s.len + 1) -proc allocSeqByte*(s: seq[byte]): (pointer, csize) {.inline, gcsafe.} = +proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} = if s.len == 0: return (nil, 0) let len = s.len let bufferPtr = allocShared(len) if len > 0: copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural) - return (bufferPtr, len.csize) + return (bufferPtr, len.csize_t) -proc allocSeqCString*(s: seq[string]): (ptr ptr cstring, csize) {.inline, gcsafe, cdecl.} = +proc allocSeqCString*(s: seq[string]): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} = if s.len == 0: return (nil, 0) let count = s.len - let arrPtr = cast[ptr ptr cstring](allocShared(count * sizeof(cstring))) + # Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray + let arrPtr = cast[ptr UncheckedArray[cstring]](allocShared(count * sizeof(cstring))) for i in 0.. C) --- -# These still accept the ReliabilityManager instance directly +# These wrappers call the single global Go callback relay. -# These wrappers now need to handle the user_data explicitly if needed, -# but the C callbacks themselves don't take it directly anymore (except PeriodicSync). -# The user_data is stored in rm.cUserData. +proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) = + echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId + let handle = cast[CReliabilityManagerHandle](rm) + var cb: CEventCallback + withLock registryLock: + if not callbackRegistry.hasKey(handle): + echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) + return + cb = callbackRegistry[handle] + + # Pass handle, event type, and messageId (as data1) + cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0) -proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} = - let cbPtr = rm.cMessageReadyCallback - if cbPtr != nil: - let cb = cast[CMessageReadyCallback](cbPtr) - # Call the C callback without user_data, as per the updated typedef - cb(messageId.cstring) +proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) = + echo "[Nim Binding] nimMessageSentCallback called for: ", messageId + let handle = cast[CReliabilityManagerHandle](rm) + var cb: CEventCallback + withLock registryLock: + if not callbackRegistry.hasKey(handle): + echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) + return + cb = callbackRegistry[handle] + + cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0) -proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} = - let cbPtr = rm.cMessageSentCallback - if cbPtr != nil: - let cb = cast[CMessageSentCallback](cbPtr) - # Call the C callback without user_data - cb(messageId.cstring) +proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) = + echo "[Nim Binding] nimMissingDependenciesCallback called for: ", messageId, " with deps: ", $missingDeps + let handle = cast[CReliabilityManagerHandle](rm) + var cb: CEventCallback + withLock registryLock: + if not callbackRegistry.hasKey(handle): + echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) + return + cb = callbackRegistry[handle] -proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = - let cbPtr = rm.cMissingDependenciesCallback - if cbPtr != nil: - var cDeps = newSeq[cstring](missingDeps.len) + # Prepare data for the callback + var cDepsPtr: ptr cstring = nil + var cDepsCount: csize_t = 0 + var cDepsNim: seq[cstring] = @[] # Keep Nim seq alive during call + if missingDeps.len > 0: + cDepsNim = newSeq[cstring](missingDeps.len) for i, dep in missingDeps: - cDeps[i] = dep.cstring - let cDepsPtr = if cDeps.len > 0: cDeps[0].addr else: nil - let cb = cast[CMissingDependenciesCallback](cbPtr) - # Call the C callback without user_data - cb(messageId.cstring, cast[ptr ptr cstring](cDepsPtr), missingDeps.len.csize) + cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq + cDepsPtr = cast[ptr cstring](cDepsNim[0].addr) + cDepsCount = missingDeps.len.csize_t -proc nimPeriodicSyncCallback(rm: ReliabilityManager) {.gcsafe.} = - let cbPtr = rm.cPeriodicSyncCallback - if cbPtr != nil: - let cb = cast[CPeriodicSyncCallback](cbPtr) - cb(rm.cUserData) + cb(handle, EVENT_MISSING_DEPENDENCIES, cast[pointer](messageId.cstring), cast[pointer](cDepsPtr), cDepsCount) -# --- Exported C Functions - Using Opaque Pointer (pointer/void*) --- +proc nimPeriodicSyncCallback(rm: ReliabilityManager) = + echo "[Nim Binding] nimPeriodicSyncCallback called" + let handle = cast[CReliabilityManagerHandle](rm) + var cb: CEventCallback + withLock registryLock: + if not callbackRegistry.hasKey(handle): + echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) + return + cb = callbackRegistry[handle] + + cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0) + +# --- Exported C Functions - Using Opaque Pointer --- proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} = let channelId = $channelIdCStr @@ -125,47 +167,49 @@ proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle { let rmResult = newReliabilityManager(channelId) if rmResult.isOk: let rm = rmResult.get() - # Initialize C callback fields to nil - rm.cMessageReadyCallback = nil - rm.cMessageSentCallback = nil - rm.cMissingDependenciesCallback = nil - rm.cPeriodicSyncCallback = nil - rm.cUserData = nil - # Assign Nim wrappers that capture the 'rm' instance directly - rm.onMessageReady = proc(msgId: MessageID) {.gcsafe.} = nimMessageReadyCallback(rm, msgId) - rm.onMessageSent = proc(msgId: MessageID) {.gcsafe.} = nimMessageSentCallback(rm, msgId) - rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) {.gcsafe.} = nimMissingDependenciesCallback(rm, msgId, deps) - rm.onPeriodicSync = proc() {.gcsafe.} = nimPeriodicSyncCallback(rm) + # Assign anonymous procs that capture 'rm' and call the wrappers + # Ensure signatures match the non-gcsafe fields in ReliabilityManager + rm.onMessageReady = proc(msgId: MessageID) = nimMessageReadyCallback(rm, msgId) + rm.onMessageSent = proc(msgId: MessageID) = nimMessageSentCallback(rm, msgId) + rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) = nimMissingDependenciesCallback(rm, msgId, deps) + rm.onPeriodicSync = proc() = nimPeriodicSyncCallback(rm) # Return the Nim ref object cast to the opaque pointer type - return cast[CReliabilityManagerHandle](rm) + let handle = cast[CReliabilityManagerHandle](rm) + GC_ref(rm) # Prevent GC from moving the object while Go holds the handle + return handle else: echo "Error creating ReliabilityManager: ", rmResult.error return nil # Return nil pointer -proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl, gcsafe.} = - if handle != nil: +proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} = + let handlePtr = handle + if handlePtr != nil: + # Go side should handle removing the handle from its registry. + # We just need to unref the Nim object. + # No need to interact with gEventCallback here. + # Cast opaque pointer back to Nim ref type - let rm = cast[ReliabilityManager](handle) + let rm = cast[ReliabilityManager](handlePtr) cleanup(rm) # Call Nim cleanup - # Nim GC will collect 'rm' eventually as the handle is the only reference + GC_unref(rm) # Allow GC to collect the object now that Go is done else: echo "Warning: CleanupReliabilityManager called with NULL handle" proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.exportc, dynlib, cdecl, gcsafe.} = if handle == nil: return toCResultErrStr("ReliabilityManager handle is NULL") - let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + let rm = cast[ReliabilityManager](handle) let result = resetReliabilityManager(rm) if result.isOk: return toCResultOk() else: return toCResultErr(result.error) -proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl, gcsafe.} = +proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe if handle == nil: return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) - let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + let rm = cast[ReliabilityManager](handle) if messageC == nil and messageLen > 0: return CWrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) @@ -191,10 +235,10 @@ proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, else: return CWrapResult(base_result: toCResultErr(wrapResult.error)) -proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize): CUnwrapResult {.exportc, dynlib, cdecl, gcsafe.} = +proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t): CUnwrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe if handle == nil: return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) - let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + let rm = cast[ReliabilityManager](handle) if messageC == nil and messageLen > 0: return CUnwrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) @@ -221,17 +265,19 @@ proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer else: return CUnwrapResult(base_result: toCResultErr(unwrapResult.error)) -proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr ptr cstring, count: csize): CResult {.exportc, dynlib, cdecl, gcsafe.} = +proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t): CResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe if handle == nil: return toCResultErrStr("ReliabilityManager handle is NULL") - let rm = cast[ReliabilityManager](handle) # Cast opaque pointer + let rm = cast[ReliabilityManager](handle) if messageIDsC == nil and count > 0: return toCResultErrStr("MessageIDs pointer is NULL but count > 0") var messageIDsNim = newSeq[string](count) + # Cast to ptr UncheckedArray for indexing + let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC) for i in 0.. // For size_t +*/ +import "C" +import ( + "fmt" + "unsafe" +) + +// --- Go Callback Implementations (Exported to C) --- + +//export goMessageReadyCallback +func goMessageReadyCallback(messageID *C.char) { + msgIdStr := C.GoString(messageID) + registryMutex.RLock() + defer registryMutex.RUnlock() + + // Find the correct Go callback based on handle (this is tricky without handle passed) + // For now, iterate through all registered callbacks. This is NOT ideal for multiple managers. + // A better approach would involve passing the handle back through user_data if possible, + // or maintaining a single global callback handler if only one manager instance is expected. + // Let's assume a single instance for simplicity for now. + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnMessageReady != nil { + // Run in a goroutine to avoid blocking the C thread + go callbacks.OnMessageReady(MessageID(msgIdStr)) + } + } + fmt.Printf("Go: Message Ready: %s\n", msgIdStr) // Debug print +} + +//export goMessageSentCallback +func goMessageSentCallback(messageID *C.char) { + msgIdStr := C.GoString(messageID) + registryMutex.RLock() + defer registryMutex.RUnlock() + + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnMessageSent != nil { + go callbacks.OnMessageSent(MessageID(msgIdStr)) + } + } + fmt.Printf("Go: Message Sent: %s\n", msgIdStr) // Debug print +} + +//export goMissingDependenciesCallback +func goMissingDependenciesCallback(messageID *C.char, missingDeps **C.char, missingDepsCount C.size_t) { + msgIdStr := C.GoString(messageID) + deps := make([]MessageID, missingDepsCount) + if missingDepsCount > 0 { + // Convert C array of C strings to Go slice + cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(missingDeps))[:missingDepsCount:missingDepsCount] + for i, s := range cDepsArray { + deps[i] = MessageID(C.GoString(s)) + } + } + + registryMutex.RLock() + defer registryMutex.RUnlock() + + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnMissingDependencies != nil { + go callbacks.OnMissingDependencies(MessageID(msgIdStr), deps) + } + } + fmt.Printf("Go: Missing Deps for %s: %v\n", msgIdStr, deps) // Debug print +} + +//export goPeriodicSyncCallback +func goPeriodicSyncCallback() { + registryMutex.RLock() + defer registryMutex.RUnlock() + + for _, callbacks := range callbackRegistry { + if callbacks != nil && callbacks.OnPeriodicSync != nil { + go callbacks.OnPeriodicSync() + } + } + fmt.Println("Go: Periodic Sync Requested") // Debug print +} diff --git a/bindings/generated/libbindings.dylib b/bindings/generated/libbindings.dylib index 6c5302e..8fb1999 100755 Binary files a/bindings/generated/libbindings.dylib and b/bindings/generated/libbindings.dylib differ diff --git a/sds_wrapper.go b/sds_wrapper.go index 69204c4..7c1e523 100644 --- a/sds_wrapper.go +++ b/sds_wrapper.go @@ -8,12 +8,8 @@ package main #include // For C.free #include "bindings/bindings.h" // Update include path -// Forward declarations for Go callback functions exported to C -// These are the functions Nim will eventually call via the pointers we give it. -extern void goMessageReadyCallback(char* messageID); -extern void goMessageSentCallback(char* messageID); -extern void goMissingDependenciesCallback(char* messageID, char** missingDeps, size_t missingDepsCount); -extern void goPeriodicSyncCallback(); +// Forward declaration for the single Go callback relay function +extern void globalCallbackRelay(void* handle, CEventType eventType, void* data1, void* data2, size_t data3); // Helper function to call the C memory freeing functions static void callFreeCResultError(CResult res) { FreeCResultError(res); } @@ -45,7 +41,7 @@ type Callbacks struct { OnPeriodicSync func() } -// Global map to store callbacks associated with handles (necessary due to cgo limitations) +// Global map to store callbacks associated with handles var ( callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks) registryMutex sync.RWMutex @@ -148,7 +144,6 @@ func UnwrapReceivedMessage(handle ReliabilityManagerHandle, message []byte) ([]b } // Copy unwrapped message content - // Explicitly cast the message pointer to unsafe.Pointer unwrappedContent := C.GoBytes(unsafe.Pointer(cUnwrapResult.message), C.int(cUnwrapResult.message_len)) // Copy missing dependencies @@ -190,8 +185,8 @@ func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID cMessageIDsPtr = nil // Handle empty slice case } - // Pass the address of the pointer variable (&cMessageIDsPtr), which is of type ***C.char - cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), &cMessageIDsPtr, C.size_t(len(messageIDs))) + // Pass the pointer variable (cMessageIDsPtr) directly, which is of type **C.char + cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), cMessageIDsPtr, C.size_t(len(messageIDs))) if !cResult.is_ok { errMsg := C.GoString(cResult.error_message) @@ -201,29 +196,23 @@ func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID return nil } -// RegisterCallbacks sets the Go callback functions -func RegisterCallbacks(handle ReliabilityManagerHandle, callbacks Callbacks) error { +// RegisterCallback sets the single Go callback relay function +func RegisterCallback(handle ReliabilityManagerHandle, callbacks Callbacks) error { if handle == nil { return errors.New("handle is nil") } + // Store the Go callbacks associated with this handle registryMutex.Lock() callbackRegistry[handle] = &callbacks registryMutex.Unlock() - // Pass the C relay functions to Nim - // Nim will store these function pointers. When Nim calls them, they execute the C relay, - // Pass pointers to the exported Go functions directly. - // Nim expects function pointers matching the C callback typedefs. - // Cgo makes the exported Go functions available as C function pointers. - // Cast these function pointers to unsafe.Pointer to match the void* expected by the C function. - C.RegisterCallbacks( + // Register the single global Go relay function with the Nim library + // Nim will call globalCallbackRelay, passing the handle as the first argument. + C.RegisterCallback( unsafe.Pointer(handle), - unsafe.Pointer(C.goMessageReadyCallback), - unsafe.Pointer(C.goMessageSentCallback), - unsafe.Pointer(C.goMissingDependenciesCallback), - unsafe.Pointer(C.goPeriodicSyncCallback), - unsafe.Pointer(handle), // Pass handle as user_data + (C.CEventCallback)(C.globalCallbackRelay), // Pass the Go relay function pointer + nil, // user_data is not used here, handle is passed directly by Nim wrapper ) return nil } @@ -238,70 +227,54 @@ func StartPeriodicTasks(handle ReliabilityManagerHandle) error { return nil } -// --- Go Callback Implementations (Exported to C) --- - -func goMessageReadyCallback(messageID *C.char) { - msgIdStr := C.GoString(messageID) - registryMutex.RLock() - defer registryMutex.RUnlock() - - // Find the correct Go callback based on handle (this is tricky without handle passed) - // For now, iterate through all registered callbacks. This is NOT ideal for multiple managers. - // A better approach would involve passing the handle back through user_data if possible, - // or maintaining a single global callback handler if only one manager instance is expected. - // Let's assume a single instance for simplicity for now. - for _, callbacks := range callbackRegistry { - if callbacks != nil && callbacks.OnMessageReady != nil { - // Run in a goroutine to avoid blocking the C thread - go callbacks.OnMessageReady(MessageID(msgIdStr)) - } - } - fmt.Printf("Go: Message Ready: %s\n", msgIdStr) // Debug print -} - -func goMessageSentCallback(messageID *C.char) { - msgIdStr := C.GoString(messageID) - registryMutex.RLock() - defer registryMutex.RUnlock() - - for _, callbacks := range callbackRegistry { - if callbacks != nil && callbacks.OnMessageSent != nil { - go callbacks.OnMessageSent(MessageID(msgIdStr)) - } - } - fmt.Printf("Go: Message Sent: %s\n", msgIdStr) // Debug print -} - -func goMissingDependenciesCallback(messageID *C.char, missingDeps **C.char, missingDepsCount C.size_t) { - msgIdStr := C.GoString(messageID) - deps := make([]MessageID, missingDepsCount) - if missingDepsCount > 0 { - // Convert C array of C strings to Go slice - cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(missingDeps))[:missingDepsCount:missingDepsCount] - for i, s := range cDepsArray { - deps[i] = MessageID(C.GoString(s)) - } - } +// globalCallbackRelay is called by Nim for all events. +// It uses the handle to find the correct Go Callbacks struct and dispatch the call. +//export globalCallbackRelay +func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 unsafe.Pointer, data2 unsafe.Pointer, data3 C.size_t) { + goHandle := ReliabilityManagerHandle(handle) registryMutex.RLock() - defer registryMutex.RUnlock() + callbacks, ok := callbackRegistry[goHandle] + registryMutex.RUnlock() - for _, callbacks := range callbackRegistry { - if callbacks != nil && callbacks.OnMissingDependencies != nil { - go callbacks.OnMissingDependencies(MessageID(msgIdStr), deps) - } + if !ok || callbacks == nil { + fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle) // Uncommented + return } - fmt.Printf("Go: Missing Deps for %s: %v\n", msgIdStr, deps) // Debug print -} -func goPeriodicSyncCallback() { - registryMutex.RLock() - defer registryMutex.RUnlock() - - for _, callbacks := range callbackRegistry { - if callbacks != nil && callbacks.OnPeriodicSync != nil { - go callbacks.OnPeriodicSync() + // Use a goroutine to avoid blocking the Nim thread + go func() { + switch eventType { + case C.EVENT_MESSAGE_READY: + if callbacks.OnMessageReady != nil { + msgIdStr := C.GoString((*C.char)(data1)) + callbacks.OnMessageReady(MessageID(msgIdStr)) + } + case C.EVENT_MESSAGE_SENT: + if callbacks.OnMessageSent != nil { + msgIdStr := C.GoString((*C.char)(data1)) + callbacks.OnMessageSent(MessageID(msgIdStr)) + } + case C.EVENT_MISSING_DEPENDENCIES: + if callbacks.OnMissingDependencies != nil { + msgIdStr := C.GoString((*C.char)(data1)) + depsCount := int(data3) + deps := make([]MessageID, depsCount) + if depsCount > 0 { + // Convert C array of C strings (**char) to Go slice + cDepsArray := (*[1 << 30]*C.char)(data2)[:depsCount:depsCount] + for i, s := range cDepsArray { + deps[i] = MessageID(C.GoString(s)) + } + } + callbacks.OnMissingDependencies(MessageID(msgIdStr), deps) + } + case C.EVENT_PERIODIC_SYNC: + if callbacks.OnPeriodicSync != nil { + callbacks.OnPeriodicSync() + } + default: + fmt.Printf("Go: globalCallbackRelay: Received unknown event type %d for handle %v\n", eventType, goHandle) } - } - fmt.Println("Go: Periodic Sync Requested") // Debug print + }() } diff --git a/sds_wrapper_test.go b/sds_wrapper_test.go index 4ef5edf..ee1185a 100644 --- a/sds_wrapper_test.go +++ b/sds_wrapper_test.go @@ -1,9 +1,9 @@ package main import ( - // "fmt" - // "sync" + "fmt" + "sync" "testing" - // "time" + "time" ) // Test basic creation, cleanup, and reset @@ -58,205 +58,202 @@ func TestWrapUnwrap(t *testing.T) { } } -// // Test dependency handling -// func TestDependencies(t *testing.T) { -// channelID := "test-deps" -// handle, err := NewReliabilityManager(channelID) -// if err != nil { -// t.Fatalf("NewReliabilityManager failed: %v", err) -// } -// defer CleanupReliabilityManager(handle) +// Test dependency handling +func TestDependencies(t *testing.T) { + channelID := "test-deps" + handle, err := NewReliabilityManager(channelID) + if err != nil { + t.Fatalf("NewReliabilityManager failed: %v", err) + } + defer CleanupReliabilityManager(handle) -// // 1. Send message 1 (will become a dependency) -// payload1 := []byte("message one") -// msgID1 := MessageID("msg-dep-1") -// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) -// if err != nil { -// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) -// } -// // Simulate receiving msg1 to add it to history (implicitly acknowledges it) -// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) -// if err != nil { -// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) -// } + // 1. Send message 1 (will become a dependency) + payload1 := []byte("message one") + msgID1 := MessageID("msg-dep-1") + wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) + if err != nil { + t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) + } + // Simulate receiving msg1 to add it to history (implicitly acknowledges it) + _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) + if err != nil { + t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) + } -// // 2. Send message 2 (depends on message 1 implicitly via causal history) -// payload2 := []byte("message two") -// msgID2 := MessageID("msg-dep-2") -// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) -// if err != nil { -// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) -// } + // 2. Send message 2 (depends on message 1 implicitly via causal history) + payload2 := []byte("message two") + msgID2 := MessageID("msg-dep-2") + wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) + if err != nil { + t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) + } -// // 3. Create a new manager to simulate a different peer receiving msg2 without msg1 -// handle2, err := NewReliabilityManager(channelID) // Same channel ID -// if err != nil { -// t.Fatalf("NewReliabilityManager (2) failed: %v", err) -// } -// defer CleanupReliabilityManager(handle2) + // 3. Create a new manager to simulate a different peer receiving msg2 without msg1 + handle2, err := NewReliabilityManager(channelID) // Same channel ID + if err != nil { + t.Fatalf("NewReliabilityManager (2) failed: %v", err) + } + defer CleanupReliabilityManager(handle2) -// // 4. Unwrap message 2 on the second manager - should report msg1 as missing -// _, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2) -// if err != nil { -// t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err) -// } + // 4. Unwrap message 2 on the second manager - should report msg1 as missing + _, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2) + if err != nil { + t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err) + } -// if len(missingDeps) == 0 { -// t.Fatalf("Expected missing dependencies, got none") -// } -// foundDep1 := false -// for _, dep := range missingDeps { -// if dep == msgID1 { -// foundDep1 = true -// break -// } -// } -// if !foundDep1 { -// t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps) -// } + if len(missingDeps) == 0 { + t.Fatalf("Expected missing dependencies, got none") + } + foundDep1 := false + for _, dep := range missingDeps { + if dep == msgID1 { + foundDep1 = true + break + } + } + if !foundDep1 { + t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps) + } -// // 5. Mark the dependency as met -// err = MarkDependenciesMet(handle2, []MessageID{msgID1}) -// if err != nil { -// t.Fatalf("MarkDependenciesMet failed: %v", err) -// } + // 5. Mark the dependency as met + err = MarkDependenciesMet(handle2, []MessageID{msgID1}) + if err != nil { + t.Fatalf("MarkDependenciesMet failed: %v", err) + } +} -// // Ideally, we'd check if the message is now moved from an internal buffer, -// // but the current API doesn't expose buffer state. We rely on callbacks for this. -// } +// Test callbacks +func TestCallbacks(t *testing.T) { + channelID := "test-callbacks" + handle, err := NewReliabilityManager(channelID) + if err != nil { + t.Fatalf("NewReliabilityManager failed: %v", err) + } + defer CleanupReliabilityManager(handle) -// // Test callbacks -// func TestCallbacks(t *testing.T) { -// channelID := "test-callbacks" -// handle, err := NewReliabilityManager(channelID) -// if err != nil { -// t.Fatalf("NewReliabilityManager failed: %v", err) -// } -// defer CleanupReliabilityManager(handle) + var wg sync.WaitGroup + receivedReady := make(map[MessageID]bool) + receivedSent := make(map[MessageID]bool) + receivedMissing := make(map[MessageID][]MessageID) + syncRequested := false + var cbMutex sync.Mutex // Protect access to callback tracking maps/vars -// var wg sync.WaitGroup -// receivedReady := make(map[MessageID]bool) -// receivedSent := make(map[MessageID]bool) -// receivedMissing := make(map[MessageID][]MessageID) -// syncRequested := false -// var cbMutex sync.Mutex // Protect access to callback tracking maps/vars + callbacks := Callbacks{ + OnMessageReady: func(messageId MessageID) { + fmt.Printf("Test: OnMessageReady received: %s\n", messageId) + cbMutex.Lock() + receivedReady[messageId] = true + cbMutex.Unlock() + wg.Done() + }, + OnMessageSent: func(messageId MessageID) { + fmt.Printf("Test: OnMessageSent received: %s\n", messageId) + cbMutex.Lock() + receivedSent[messageId] = true + cbMutex.Unlock() + wg.Done() + }, + OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { + fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps) + cbMutex.Lock() + receivedMissing[messageId] = missingDeps + cbMutex.Unlock() + wg.Done() + }, + OnPeriodicSync: func() { + fmt.Println("Test: OnPeriodicSync received") + cbMutex.Lock() + syncRequested = true + cbMutex.Unlock() + // Don't wg.Done() here, it might be called multiple times + }, + } -// callbacks := Callbacks{ -// OnMessageReady: func(messageId MessageID) { -// fmt.Printf("Test: OnMessageReady received: %s\n", messageId) -// cbMutex.Lock() -// receivedReady[messageId] = true -// cbMutex.Unlock() -// wg.Done() -// }, -// OnMessageSent: func(messageId MessageID) { -// fmt.Printf("Test: OnMessageSent received: %s\n", messageId) -// cbMutex.Lock() -// receivedSent[messageId] = true -// cbMutex.Unlock() -// wg.Done() -// }, -// OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { -// fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps) -// cbMutex.Lock() -// receivedMissing[messageId] = missingDeps -// cbMutex.Unlock() -// wg.Done() -// }, -// OnPeriodicSync: func() { -// fmt.Println("Test: OnPeriodicSync received") -// cbMutex.Lock() -// syncRequested = true -// cbMutex.Unlock() -// // Don't wg.Done() here, it might be called multiple times -// }, -// } + err = RegisterCallback(handle, callbacks) + if err != nil { + t.Fatalf("RegisterCallback failed: %v", err) + } -// err = RegisterCallbacks(handle, callbacks) -// if err != nil { -// t.Fatalf("RegisterCallbacks failed: %v", err) -// } + // Start tasks AFTER registering callbacks + err = StartPeriodicTasks(handle) + if err != nil { + t.Fatalf("StartPeriodicTasks failed: %v", err) + } -// // Start tasks AFTER registering callbacks -// err = StartPeriodicTasks(handle) -// if err != nil { -// t.Fatalf("StartPeriodicTasks failed: %v", err) -// } + // --- Test Scenario --- -// // --- Test Scenario --- + // 1. Send msg1 + wg.Add(1) // Expect OnMessageSent for msg1 eventually + payload1 := []byte("callback test 1") + msgID1 := MessageID("cb-msg-1") + wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) + if err != nil { + t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) + } -// // 1. Send msg1 -// wg.Add(1) // Expect OnMessageSent for msg1 eventually -// payload1 := []byte("callback test 1") -// msgID1 := MessageID("cb-msg-1") -// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) -// if err != nil { -// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) -// } + // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history) + wg.Add(1) // Expect OnMessageReady for msg1 + _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) + if err != nil { + t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) + } -// // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history) -// wg.Add(1) // Expect OnMessageReady for msg1 -// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) -// if err != nil { -// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) -// } + // 3. Send msg2 (depends on msg1) + wg.Add(1) // Expect OnMessageSent for msg2 eventually + payload2 := []byte("callback test 2") + msgID2 := MessageID("cb-msg-2") + wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) + if err != nil { + t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) + } -// // 3. Send msg2 (depends on msg1) -// wg.Add(1) // Expect OnMessageSent for msg2 eventually -// payload2 := []byte("callback test 2") -// msgID2 := MessageID("cb-msg-2") -// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) -// if err != nil { -// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) -// } + // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2) + wg.Add(1) // Expect OnMessageReady for msg2 + _, _, err = UnwrapReceivedMessage(handle, wrappedMsg2) + if err != nil { + t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err) + } -// // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2) -// wg.Add(1) // Expect OnMessageReady for msg2 -// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg2) -// if err != nil { -// t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err) -// } + // --- Verification --- + // Wait for expected callbacks with a timeout + waitTimeout(&wg, 5*time.Second, t) -// // --- Verification --- -// // Wait for expected callbacks with a timeout -// waitTimeout(&wg, 5*time.Second, t) + cbMutex.Lock() + defer cbMutex.Unlock() -// cbMutex.Lock() -// defer cbMutex.Unlock() + if !receivedReady[msgID1] { + t.Errorf("OnMessageReady not called for %s", msgID1) + } + if !receivedReady[msgID2] { + t.Errorf("OnMessageReady not called for %s", msgID2) + } + if !receivedSent[msgID1] { + t.Errorf("OnMessageSent not called for %s", msgID1) + } + if !receivedSent[msgID2] { + t.Errorf("OnMessageSent not called for %s", msgID2) + } + // We didn't explicitly test missing deps in this path + if len(receivedMissing) > 0 { + t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing) + } + // Periodic sync is harder to guarantee in a short test, just check if it was ever true + if !syncRequested { + t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout") + } +} -// if !receivedReady[msgID1] { -// t.Errorf("OnMessageReady not called for %s", msgID1) -// } -// if !receivedReady[msgID2] { -// t.Errorf("OnMessageReady not called for %s", msgID2) -// } -// if !receivedSent[msgID1] { -// t.Errorf("OnMessageSent not called for %s", msgID1) -// } -// if !receivedSent[msgID2] { -// t.Errorf("OnMessageSent not called for %s", msgID2) -// } -// // We didn't explicitly test missing deps in this path -// if len(receivedMissing) > 0 { -// t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing) -// } -// // Periodic sync is harder to guarantee in a short test, just check if it was ever true -// if !syncRequested { -// t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout") -// } -// } - -// // Helper function to wait for WaitGroup with a timeout -// func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) { -// c := make(chan struct{}) -// go func() { -// defer close(c) -// wg.Wait() -// }() -// select { -// case <-c: -// // Completed normally -// case <-time.After(timeout): -// t.Fatalf("Timed out waiting for callbacks") -// } -// } +// Helper function to wait for WaitGroup with a timeout +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + // Completed normally + case <-time.After(timeout): + t.Fatalf("Timed out waiting for callbacks") + } +} diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index 02daf3b..33b47e6 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -24,18 +24,10 @@ type channelId*: string config*: ReliabilityConfig lock*: Lock - # Nim callbacks (used internally or if not using C bindings) - onMessageReady*: proc(messageId: MessageID) {.gcsafe.} - onMessageSent*: proc(messageId: MessageID) {.gcsafe.} - onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} - onPeriodicSync*: PeriodicSyncCallback - - # C callback pointers and user data (for FFI) - cMessageReadyCallback*: pointer - cMessageSentCallback*: pointer - cMissingDependenciesCallback*: pointer - cPeriodicSyncCallback*: pointer - cUserData*: pointer + onMessageReady*: proc(messageId: MessageID) + onMessageSent*: proc(messageId: MessageID) + onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) + onPeriodicSync*: proc() ReliabilityError* = enum reInvalidArgument