debug: callbacks issue

This commit is contained in:
shash256 2025-04-07 17:19:44 +05:30
parent edfd257fca
commit 6ad01d1aa9
7 changed files with 473 additions and 393 deletions

View File

@ -39,11 +39,18 @@ typedef struct {
// --- Callback Function Pointer Types ---
// Keep const char* here as these are inputs *to* the callback
typedef void (*MessageReadyCallback)(const char* messageID);
typedef void (*MessageSentCallback)(const char* messageID);
typedef void (*MissingDependenciesCallback)(const char* messageID, const char** missingDeps, size_t missingDepsCount);
typedef void (*PeriodicSyncCallback)(void* user_data);
// Define event types (enum or constants)
typedef enum {
EVENT_MESSAGE_READY = 1,
EVENT_MESSAGE_SENT = 2,
EVENT_MISSING_DEPENDENCIES = 3,
EVENT_PERIODIC_SYNC = 4
} CEventType;
// Single callback type for all events
// Nim will call this, passing the handle and event-specific data
typedef void (*CEventCallback)(void* handle, CEventType eventType, void* data1, void* data2, size_t data3);
// --- Core API Functions ---
@ -92,23 +99,17 @@ CUnwrapResult UnwrapReceivedMessage(void* handle, void* message, size_t messageL
* @param count The number of message IDs in the array.
* @return CResult indicating success or failure.
*/
CResult MarkDependenciesMet(void* handle, char*** messageIDs, size_t count);
CResult MarkDependenciesMet(void* handle, char** messageIDs, size_t count); // Reverted to char**
/**
* @brief Registers callback functions.
* @param handle The opaque handle (void*) of the instance.
* @param messageReady Callback for when a message is ready.
* @param messageSent Callback for when an outgoing message is acknowledged.
* @param missingDependencies Callback for when missing dependencies are detected.
* @param periodicSync Callback for periodic sync suggestions.
* @param user_data A pointer to user-defined data passed to callbacks.
* @param eventCallback The single callback function to handle all events.
* @param user_data A pointer to user-defined data (optional, could be managed in Go).
*/
void RegisterCallbacks(void* handle,
void* messageReady,
void* messageSent,
void* missingDependencies,
void* periodicSync,
void* user_data); // Keep user_data, align with Nim proc
void RegisterCallback(void* handle, CEventCallback eventCallback, void* user_data); // Renamed and simplified
/**
* @brief Starts the background periodic tasks.

View File

@ -1,13 +1,21 @@
import std/[locks, typetraits]
import std/[locks, typetraits, tables] # Added tables
import chronos
import results
import ../src/[reliability, reliability_utils, message]
# --- C Type Definitions ---
type
CReliabilityManagerHandle* = pointer
type
# Callback Types (Imported from C Header)
CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum
EVENT_MESSAGE_READY = 1,
EVENT_MESSAGE_SENT = 2,
EVENT_MISSING_DEPENDENCIES = 3,
EVENT_PERIODIC_SYNC = 4
CEventCallback* = proc(handle: pointer, eventType: CEventType, data1: pointer, data2: pointer, data3: csize_t) {.cdecl.} # Use csize_t
CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object
is_ok*: bool
error_message*: cstring
@ -15,20 +23,24 @@ type
CWrapResult* {.importc: "CWrapResult", header: "bindings.h", bycopy.} = object
base_result*: CResult
message*: pointer
message_len*: csize
message_len*: csize_t
CUnwrapResult* {.importc: "CUnwrapResult", header: "bindings.h", bycopy.} = object
base_result*: CResult
message*: pointer
message_len*: csize
missing_deps*: ptr ptr cstring
missing_deps_count*: csize
message_len*: csize_t
missing_deps*: ptr cstring
missing_deps_count*: csize_t
# Callback Types
CMessageReadyCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].}
CMessageSentCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].}
CMissingDependenciesCallback* = proc (messageID: cstring, missingDeps: ptr ptr cstring, missingDepsCount: csize) {.cdecl, gcsafe, raises: [].}
CPeriodicSyncCallback* = proc (user_data: pointer) {.cdecl, gcsafe, raises: [].}
# --- Callback Registry ---
type
CallbackRegistry = Table[CReliabilityManagerHandle, CEventCallback]
var
callbackRegistry: CallbackRegistry
registryLock: Lock
initLock(registryLock)
# --- Memory Management Helpers ---
@ -37,22 +49,24 @@ proc allocCString*(s: string): cstring {.inline, gcsafe.} =
result = cast[cstring](allocShared(s.len + 1))
copyMem(result, s.cstring, s.len + 1)
proc allocSeqByte*(s: seq[byte]): (pointer, csize) {.inline, gcsafe.} =
proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} =
if s.len == 0: return (nil, 0)
let len = s.len
let bufferPtr = allocShared(len)
if len > 0:
copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural)
return (bufferPtr, len.csize)
return (bufferPtr, len.csize_t)
proc allocSeqCString*(s: seq[string]): (ptr ptr cstring, csize) {.inline, gcsafe, cdecl.} =
proc allocSeqCString*(s: seq[string]): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} =
if s.len == 0: return (nil, 0)
let count = s.len
let arrPtr = cast[ptr ptr cstring](allocShared(count * sizeof(cstring)))
# Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray
let arrPtr = cast[ptr UncheckedArray[cstring]](allocShared(count * sizeof(cstring)))
for i in 0..<count:
let tempCStr: cstring = allocCString(s[i])
copyMem(addr arrPtr[i], addr tempCStr, sizeof(cstring))
return (arrPtr, count.csize)
# Allocate each string and store its pointer in the array using unchecked array indexing
arrPtr[i] = allocCString(s[i])
# Return pointer to the first element, cast back to ptr cstring
return (cast[ptr cstring](arrPtr), count.csize_t)
proc freeCString*(cs: cstring) {.inline, gcsafe.} =
if cs != nil: deallocShared(cs)
@ -60,11 +74,14 @@ proc freeCString*(cs: cstring) {.inline, gcsafe.} =
proc freeSeqByte*(bufferPtr: pointer) {.inline, gcsafe, cdecl.} =
if bufferPtr != nil: deallocShared(bufferPtr)
proc freeSeqCString*(arrPtr: ptr ptr cstring, count: csize) {.inline, gcsafe, cdecl.} =
# Corrected to accept ptr cstring
proc freeSeqCString*(arrPtr: ptr cstring, count: csize_t) {.inline, gcsafe, cdecl.} =
if arrPtr != nil:
# Cast to ptr UncheckedArray for proper iteration/indexing before freeing
let arr = cast[ptr UncheckedArray[cstring]](arrPtr)
for i in 0..<count:
freeCString(cast[cstring](arrPtr[i]))
deallocShared(arrPtr)
freeCString(arr[i]) # Free each individual cstring
deallocShared(arrPtr) # Free the array pointer itself
# --- Result Conversion Helpers ---
@ -78,44 +95,69 @@ proc toCResultErrStr*(errMsg: string): CResult =
CResult(is_ok: false, error_message: allocCString(errMsg))
# --- Callback Wrappers (Nim -> C) ---
# These still accept the ReliabilityManager instance directly
# These wrappers call the single global Go callback relay.
# These wrappers now need to handle the user_data explicitly if needed,
# but the C callbacks themselves don't take it directly anymore (except PeriodicSync).
# The user_data is stored in rm.cUserData.
proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) =
echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId
let handle = cast[CReliabilityManagerHandle](rm)
var cb: CEventCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle):
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
return
cb = callbackRegistry[handle]
# Pass handle, event type, and messageId (as data1)
cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0)
proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
let cbPtr = rm.cMessageReadyCallback
if cbPtr != nil:
let cb = cast[CMessageReadyCallback](cbPtr)
# Call the C callback without user_data, as per the updated typedef
cb(messageId.cstring)
proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) =
echo "[Nim Binding] nimMessageSentCallback called for: ", messageId
let handle = cast[CReliabilityManagerHandle](rm)
var cb: CEventCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle):
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
return
cb = callbackRegistry[handle]
cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0)
proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
let cbPtr = rm.cMessageSentCallback
if cbPtr != nil:
let cb = cast[CMessageSentCallback](cbPtr)
# Call the C callback without user_data
cb(messageId.cstring)
proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) =
echo "[Nim Binding] nimMissingDependenciesCallback called for: ", messageId, " with deps: ", $missingDeps
let handle = cast[CReliabilityManagerHandle](rm)
var cb: CEventCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle):
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
return
cb = callbackRegistry[handle]
proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
let cbPtr = rm.cMissingDependenciesCallback
if cbPtr != nil:
var cDeps = newSeq[cstring](missingDeps.len)
# Prepare data for the callback
var cDepsPtr: ptr cstring = nil
var cDepsCount: csize_t = 0
var cDepsNim: seq[cstring] = @[] # Keep Nim seq alive during call
if missingDeps.len > 0:
cDepsNim = newSeq[cstring](missingDeps.len)
for i, dep in missingDeps:
cDeps[i] = dep.cstring
let cDepsPtr = if cDeps.len > 0: cDeps[0].addr else: nil
let cb = cast[CMissingDependenciesCallback](cbPtr)
# Call the C callback without user_data
cb(messageId.cstring, cast[ptr ptr cstring](cDepsPtr), missingDeps.len.csize)
cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq
cDepsPtr = cast[ptr cstring](cDepsNim[0].addr)
cDepsCount = missingDeps.len.csize_t
proc nimPeriodicSyncCallback(rm: ReliabilityManager) {.gcsafe.} =
let cbPtr = rm.cPeriodicSyncCallback
if cbPtr != nil:
let cb = cast[CPeriodicSyncCallback](cbPtr)
cb(rm.cUserData)
cb(handle, EVENT_MISSING_DEPENDENCIES, cast[pointer](messageId.cstring), cast[pointer](cDepsPtr), cDepsCount)
# --- Exported C Functions - Using Opaque Pointer (pointer/void*) ---
proc nimPeriodicSyncCallback(rm: ReliabilityManager) =
echo "[Nim Binding] nimPeriodicSyncCallback called"
let handle = cast[CReliabilityManagerHandle](rm)
var cb: CEventCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle):
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
return
cb = callbackRegistry[handle]
cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0)
# --- Exported C Functions - Using Opaque Pointer ---
proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} =
let channelId = $channelIdCStr
@ -125,47 +167,49 @@ proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {
let rmResult = newReliabilityManager(channelId)
if rmResult.isOk:
let rm = rmResult.get()
# Initialize C callback fields to nil
rm.cMessageReadyCallback = nil
rm.cMessageSentCallback = nil
rm.cMissingDependenciesCallback = nil
rm.cPeriodicSyncCallback = nil
rm.cUserData = nil
# Assign Nim wrappers that capture the 'rm' instance directly
rm.onMessageReady = proc(msgId: MessageID) {.gcsafe.} = nimMessageReadyCallback(rm, msgId)
rm.onMessageSent = proc(msgId: MessageID) {.gcsafe.} = nimMessageSentCallback(rm, msgId)
rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) {.gcsafe.} = nimMissingDependenciesCallback(rm, msgId, deps)
rm.onPeriodicSync = proc() {.gcsafe.} = nimPeriodicSyncCallback(rm)
# Assign anonymous procs that capture 'rm' and call the wrappers
# Ensure signatures match the non-gcsafe fields in ReliabilityManager
rm.onMessageReady = proc(msgId: MessageID) = nimMessageReadyCallback(rm, msgId)
rm.onMessageSent = proc(msgId: MessageID) = nimMessageSentCallback(rm, msgId)
rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) = nimMissingDependenciesCallback(rm, msgId, deps)
rm.onPeriodicSync = proc() = nimPeriodicSyncCallback(rm)
# Return the Nim ref object cast to the opaque pointer type
return cast[CReliabilityManagerHandle](rm)
let handle = cast[CReliabilityManagerHandle](rm)
GC_ref(rm) # Prevent GC from moving the object while Go holds the handle
return handle
else:
echo "Error creating ReliabilityManager: ", rmResult.error
return nil # Return nil pointer
proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl, gcsafe.} =
if handle != nil:
proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} =
let handlePtr = handle
if handlePtr != nil:
# Go side should handle removing the handle from its registry.
# We just need to unref the Nim object.
# No need to interact with gEventCallback here.
# Cast opaque pointer back to Nim ref type
let rm = cast[ReliabilityManager](handle)
let rm = cast[ReliabilityManager](handlePtr)
cleanup(rm) # Call Nim cleanup
# Nim GC will collect 'rm' eventually as the handle is the only reference
GC_unref(rm) # Allow GC to collect the object now that Go is done
else:
echo "Warning: CleanupReliabilityManager called with NULL handle"
proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.exportc, dynlib, cdecl, gcsafe.} =
if handle == nil:
return toCResultErrStr("ReliabilityManager handle is NULL")
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
let rm = cast[ReliabilityManager](handle)
let result = resetReliabilityManager(rm)
if result.isOk:
return toCResultOk()
else:
return toCResultErr(result.error)
proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl, gcsafe.} =
proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
if handle == nil:
return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
let rm = cast[ReliabilityManager](handle)
if messageC == nil and messageLen > 0:
return CWrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0"))
@ -191,10 +235,10 @@ proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer,
else:
return CWrapResult(base_result: toCResultErr(wrapResult.error))
proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize): CUnwrapResult {.exportc, dynlib, cdecl, gcsafe.} =
proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t): CUnwrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
if handle == nil:
return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
let rm = cast[ReliabilityManager](handle)
if messageC == nil and messageLen > 0:
return CUnwrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0"))
@ -221,17 +265,19 @@ proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer
else:
return CUnwrapResult(base_result: toCResultErr(unwrapResult.error))
proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr ptr cstring, count: csize): CResult {.exportc, dynlib, cdecl, gcsafe.} =
proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t): CResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
if handle == nil:
return toCResultErrStr("ReliabilityManager handle is NULL")
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
let rm = cast[ReliabilityManager](handle)
if messageIDsC == nil and count > 0:
return toCResultErrStr("MessageIDs pointer is NULL but count > 0")
var messageIDsNim = newSeq[string](count)
# Cast to ptr UncheckedArray for indexing
let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC)
for i in 0..<count:
let currentCStr = cast[cstring](messageIDsC[i])
let currentCStr = messageIDsCArray[i] # Use unchecked array indexing
if currentCStr != nil:
messageIDsNim[i] = $currentCStr
else:
@ -243,32 +289,21 @@ proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr pt
else:
return toCResultErr(result.error)
proc RegisterCallbacks*(handle: CReliabilityManagerHandle,
cMessageReady: pointer,
cMessageSent: pointer,
cMissingDependencies: pointer,
cPeriodicSync: pointer,
cUserDataPtr: pointer) {.exportc, dynlib, cdecl, gcsafe.} =
if handle == nil:
echo "Error: Cannot register callbacks: NULL ReliabilityManager handle"
return
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
# Lock the specific manager instance while modifying its fields
withLock rm.lock:
rm.cMessageReadyCallback = cMessageReady
rm.cMessageSentCallback = cMessageSent
rm.cMissingDependenciesCallback = cMissingDependencies
rm.cPeriodicSyncCallback = cPeriodicSync
rm.cUserData = cUserDataPtr
proc RegisterCallback*(handle: CReliabilityManagerHandle,
cEventCallback: CEventCallback,
cUserDataPtr: pointer) {.exportc, dynlib, cdecl.} =
withLock registryLock:
callbackRegistry[handle] = cEventCallback
echo "[Nim Binding] Registered callback for handle: ", cast[int](handle)
proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl, gcsafe.} =
proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} =
if handle == nil:
echo "Error: Cannot start periodic tasks: NULL ReliabilityManager handle"
return
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
let rm = cast[ReliabilityManager](handle)
startPeriodicTasks(rm)
# --- Memory Freeing Functions - Added cdecl ---
# --- Memory Freeing Functions ---
proc FreeCResultError*(result: CResult) {.exportc, dynlib, gcsafe, cdecl.} =
freeCString(result.error_message)

82
bindings/callbacks.go Normal file
View File

@ -0,0 +1,82 @@
package bindings
/*
#include <stddef.h> // For size_t
*/
import "C"
import (
"fmt"
"unsafe"
)
// --- Go Callback Implementations (Exported to C) ---
//export goMessageReadyCallback
func goMessageReadyCallback(messageID *C.char) {
msgIdStr := C.GoString(messageID)
registryMutex.RLock()
defer registryMutex.RUnlock()
// Find the correct Go callback based on handle (this is tricky without handle passed)
// For now, iterate through all registered callbacks. This is NOT ideal for multiple managers.
// A better approach would involve passing the handle back through user_data if possible,
// or maintaining a single global callback handler if only one manager instance is expected.
// Let's assume a single instance for simplicity for now.
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnMessageReady != nil {
// Run in a goroutine to avoid blocking the C thread
go callbacks.OnMessageReady(MessageID(msgIdStr))
}
}
fmt.Printf("Go: Message Ready: %s\n", msgIdStr) // Debug print
}
//export goMessageSentCallback
func goMessageSentCallback(messageID *C.char) {
msgIdStr := C.GoString(messageID)
registryMutex.RLock()
defer registryMutex.RUnlock()
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnMessageSent != nil {
go callbacks.OnMessageSent(MessageID(msgIdStr))
}
}
fmt.Printf("Go: Message Sent: %s\n", msgIdStr) // Debug print
}
//export goMissingDependenciesCallback
func goMissingDependenciesCallback(messageID *C.char, missingDeps **C.char, missingDepsCount C.size_t) {
msgIdStr := C.GoString(messageID)
deps := make([]MessageID, missingDepsCount)
if missingDepsCount > 0 {
// Convert C array of C strings to Go slice
cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(missingDeps))[:missingDepsCount:missingDepsCount]
for i, s := range cDepsArray {
deps[i] = MessageID(C.GoString(s))
}
}
registryMutex.RLock()
defer registryMutex.RUnlock()
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnMissingDependencies != nil {
go callbacks.OnMissingDependencies(MessageID(msgIdStr), deps)
}
}
fmt.Printf("Go: Missing Deps for %s: %v\n", msgIdStr, deps) // Debug print
}
//export goPeriodicSyncCallback
func goPeriodicSyncCallback() {
registryMutex.RLock()
defer registryMutex.RUnlock()
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnPeriodicSync != nil {
go callbacks.OnPeriodicSync()
}
}
fmt.Println("Go: Periodic Sync Requested") // Debug print
}

Binary file not shown.

View File

@ -8,12 +8,8 @@ package main
#include <stdlib.h> // For C.free
#include "bindings/bindings.h" // Update include path
// Forward declarations for Go callback functions exported to C
// These are the functions Nim will eventually call via the pointers we give it.
extern void goMessageReadyCallback(char* messageID);
extern void goMessageSentCallback(char* messageID);
extern void goMissingDependenciesCallback(char* messageID, char** missingDeps, size_t missingDepsCount);
extern void goPeriodicSyncCallback();
// Forward declaration for the single Go callback relay function
extern void globalCallbackRelay(void* handle, CEventType eventType, void* data1, void* data2, size_t data3);
// Helper function to call the C memory freeing functions
static void callFreeCResultError(CResult res) { FreeCResultError(res); }
@ -45,7 +41,7 @@ type Callbacks struct {
OnPeriodicSync func()
}
// Global map to store callbacks associated with handles (necessary due to cgo limitations)
// Global map to store callbacks associated with handles
var (
callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks)
registryMutex sync.RWMutex
@ -148,7 +144,6 @@ func UnwrapReceivedMessage(handle ReliabilityManagerHandle, message []byte) ([]b
}
// Copy unwrapped message content
// Explicitly cast the message pointer to unsafe.Pointer
unwrappedContent := C.GoBytes(unsafe.Pointer(cUnwrapResult.message), C.int(cUnwrapResult.message_len))
// Copy missing dependencies
@ -190,8 +185,8 @@ func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID
cMessageIDsPtr = nil // Handle empty slice case
}
// Pass the address of the pointer variable (&cMessageIDsPtr), which is of type ***C.char
cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), &cMessageIDsPtr, C.size_t(len(messageIDs)))
// Pass the pointer variable (cMessageIDsPtr) directly, which is of type **C.char
cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), cMessageIDsPtr, C.size_t(len(messageIDs)))
if !cResult.is_ok {
errMsg := C.GoString(cResult.error_message)
@ -201,29 +196,23 @@ func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID
return nil
}
// RegisterCallbacks sets the Go callback functions
func RegisterCallbacks(handle ReliabilityManagerHandle, callbacks Callbacks) error {
// RegisterCallback sets the single Go callback relay function
func RegisterCallback(handle ReliabilityManagerHandle, callbacks Callbacks) error {
if handle == nil {
return errors.New("handle is nil")
}
// Store the Go callbacks associated with this handle
registryMutex.Lock()
callbackRegistry[handle] = &callbacks
registryMutex.Unlock()
// Pass the C relay functions to Nim
// Nim will store these function pointers. When Nim calls them, they execute the C relay,
// Pass pointers to the exported Go functions directly.
// Nim expects function pointers matching the C callback typedefs.
// Cgo makes the exported Go functions available as C function pointers.
// Cast these function pointers to unsafe.Pointer to match the void* expected by the C function.
C.RegisterCallbacks(
// Register the single global Go relay function with the Nim library
// Nim will call globalCallbackRelay, passing the handle as the first argument.
C.RegisterCallback(
unsafe.Pointer(handle),
unsafe.Pointer(C.goMessageReadyCallback),
unsafe.Pointer(C.goMessageSentCallback),
unsafe.Pointer(C.goMissingDependenciesCallback),
unsafe.Pointer(C.goPeriodicSyncCallback),
unsafe.Pointer(handle), // Pass handle as user_data
(C.CEventCallback)(C.globalCallbackRelay), // Pass the Go relay function pointer
nil, // user_data is not used here, handle is passed directly by Nim wrapper
)
return nil
}
@ -238,70 +227,54 @@ func StartPeriodicTasks(handle ReliabilityManagerHandle) error {
return nil
}
// --- Go Callback Implementations (Exported to C) ---
func goMessageReadyCallback(messageID *C.char) {
msgIdStr := C.GoString(messageID)
registryMutex.RLock()
defer registryMutex.RUnlock()
// Find the correct Go callback based on handle (this is tricky without handle passed)
// For now, iterate through all registered callbacks. This is NOT ideal for multiple managers.
// A better approach would involve passing the handle back through user_data if possible,
// or maintaining a single global callback handler if only one manager instance is expected.
// Let's assume a single instance for simplicity for now.
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnMessageReady != nil {
// Run in a goroutine to avoid blocking the C thread
go callbacks.OnMessageReady(MessageID(msgIdStr))
}
}
fmt.Printf("Go: Message Ready: %s\n", msgIdStr) // Debug print
}
func goMessageSentCallback(messageID *C.char) {
msgIdStr := C.GoString(messageID)
registryMutex.RLock()
defer registryMutex.RUnlock()
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnMessageSent != nil {
go callbacks.OnMessageSent(MessageID(msgIdStr))
}
}
fmt.Printf("Go: Message Sent: %s\n", msgIdStr) // Debug print
}
func goMissingDependenciesCallback(messageID *C.char, missingDeps **C.char, missingDepsCount C.size_t) {
msgIdStr := C.GoString(messageID)
deps := make([]MessageID, missingDepsCount)
if missingDepsCount > 0 {
// Convert C array of C strings to Go slice
cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(missingDeps))[:missingDepsCount:missingDepsCount]
for i, s := range cDepsArray {
deps[i] = MessageID(C.GoString(s))
}
}
// globalCallbackRelay is called by Nim for all events.
// It uses the handle to find the correct Go Callbacks struct and dispatch the call.
//export globalCallbackRelay
func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 unsafe.Pointer, data2 unsafe.Pointer, data3 C.size_t) {
goHandle := ReliabilityManagerHandle(handle)
registryMutex.RLock()
defer registryMutex.RUnlock()
callbacks, ok := callbackRegistry[goHandle]
registryMutex.RUnlock()
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnMissingDependencies != nil {
go callbacks.OnMissingDependencies(MessageID(msgIdStr), deps)
}
if !ok || callbacks == nil {
fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle) // Uncommented
return
}
fmt.Printf("Go: Missing Deps for %s: %v\n", msgIdStr, deps) // Debug print
}
func goPeriodicSyncCallback() {
registryMutex.RLock()
defer registryMutex.RUnlock()
for _, callbacks := range callbackRegistry {
if callbacks != nil && callbacks.OnPeriodicSync != nil {
go callbacks.OnPeriodicSync()
// Use a goroutine to avoid blocking the Nim thread
go func() {
switch eventType {
case C.EVENT_MESSAGE_READY:
if callbacks.OnMessageReady != nil {
msgIdStr := C.GoString((*C.char)(data1))
callbacks.OnMessageReady(MessageID(msgIdStr))
}
case C.EVENT_MESSAGE_SENT:
if callbacks.OnMessageSent != nil {
msgIdStr := C.GoString((*C.char)(data1))
callbacks.OnMessageSent(MessageID(msgIdStr))
}
case C.EVENT_MISSING_DEPENDENCIES:
if callbacks.OnMissingDependencies != nil {
msgIdStr := C.GoString((*C.char)(data1))
depsCount := int(data3)
deps := make([]MessageID, depsCount)
if depsCount > 0 {
// Convert C array of C strings (**char) to Go slice
cDepsArray := (*[1 << 30]*C.char)(data2)[:depsCount:depsCount]
for i, s := range cDepsArray {
deps[i] = MessageID(C.GoString(s))
}
}
callbacks.OnMissingDependencies(MessageID(msgIdStr), deps)
}
case C.EVENT_PERIODIC_SYNC:
if callbacks.OnPeriodicSync != nil {
callbacks.OnPeriodicSync()
}
default:
fmt.Printf("Go: globalCallbackRelay: Received unknown event type %d for handle %v\n", eventType, goHandle)
}
}
fmt.Println("Go: Periodic Sync Requested") // Debug print
}()
}

View File

@ -1,9 +1,9 @@
package main
import (
// "fmt"
// "sync"
"fmt"
"sync"
"testing"
// "time"
"time"
)
// Test basic creation, cleanup, and reset
@ -58,205 +58,202 @@ func TestWrapUnwrap(t *testing.T) {
}
}
// // Test dependency handling
// func TestDependencies(t *testing.T) {
// channelID := "test-deps"
// handle, err := NewReliabilityManager(channelID)
// if err != nil {
// t.Fatalf("NewReliabilityManager failed: %v", err)
// }
// defer CleanupReliabilityManager(handle)
// Test dependency handling
func TestDependencies(t *testing.T) {
channelID := "test-deps"
handle, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager failed: %v", err)
}
defer CleanupReliabilityManager(handle)
// // 1. Send message 1 (will become a dependency)
// payload1 := []byte("message one")
// msgID1 := MessageID("msg-dep-1")
// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
// if err != nil {
// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
// }
// // Simulate receiving msg1 to add it to history (implicitly acknowledges it)
// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
// if err != nil {
// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
// }
// 1. Send message 1 (will become a dependency)
payload1 := []byte("message one")
msgID1 := MessageID("msg-dep-1")
wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
if err != nil {
t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
}
// Simulate receiving msg1 to add it to history (implicitly acknowledges it)
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
}
// // 2. Send message 2 (depends on message 1 implicitly via causal history)
// payload2 := []byte("message two")
// msgID2 := MessageID("msg-dep-2")
// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
// if err != nil {
// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
// }
// 2. Send message 2 (depends on message 1 implicitly via causal history)
payload2 := []byte("message two")
msgID2 := MessageID("msg-dep-2")
wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
if err != nil {
t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
}
// // 3. Create a new manager to simulate a different peer receiving msg2 without msg1
// handle2, err := NewReliabilityManager(channelID) // Same channel ID
// if err != nil {
// t.Fatalf("NewReliabilityManager (2) failed: %v", err)
// }
// defer CleanupReliabilityManager(handle2)
// 3. Create a new manager to simulate a different peer receiving msg2 without msg1
handle2, err := NewReliabilityManager(channelID) // Same channel ID
if err != nil {
t.Fatalf("NewReliabilityManager (2) failed: %v", err)
}
defer CleanupReliabilityManager(handle2)
// // 4. Unwrap message 2 on the second manager - should report msg1 as missing
// _, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2)
// if err != nil {
// t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err)
// }
// 4. Unwrap message 2 on the second manager - should report msg1 as missing
_, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err)
}
// if len(missingDeps) == 0 {
// t.Fatalf("Expected missing dependencies, got none")
// }
// foundDep1 := false
// for _, dep := range missingDeps {
// if dep == msgID1 {
// foundDep1 = true
// break
// }
// }
// if !foundDep1 {
// t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps)
// }
if len(missingDeps) == 0 {
t.Fatalf("Expected missing dependencies, got none")
}
foundDep1 := false
for _, dep := range missingDeps {
if dep == msgID1 {
foundDep1 = true
break
}
}
if !foundDep1 {
t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps)
}
// // 5. Mark the dependency as met
// err = MarkDependenciesMet(handle2, []MessageID{msgID1})
// if err != nil {
// t.Fatalf("MarkDependenciesMet failed: %v", err)
// }
// 5. Mark the dependency as met
err = MarkDependenciesMet(handle2, []MessageID{msgID1})
if err != nil {
t.Fatalf("MarkDependenciesMet failed: %v", err)
}
}
// // Ideally, we'd check if the message is now moved from an internal buffer,
// // but the current API doesn't expose buffer state. We rely on callbacks for this.
// }
// Test callbacks
func TestCallbacks(t *testing.T) {
channelID := "test-callbacks"
handle, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager failed: %v", err)
}
defer CleanupReliabilityManager(handle)
// // Test callbacks
// func TestCallbacks(t *testing.T) {
// channelID := "test-callbacks"
// handle, err := NewReliabilityManager(channelID)
// if err != nil {
// t.Fatalf("NewReliabilityManager failed: %v", err)
// }
// defer CleanupReliabilityManager(handle)
var wg sync.WaitGroup
receivedReady := make(map[MessageID]bool)
receivedSent := make(map[MessageID]bool)
receivedMissing := make(map[MessageID][]MessageID)
syncRequested := false
var cbMutex sync.Mutex // Protect access to callback tracking maps/vars
// var wg sync.WaitGroup
// receivedReady := make(map[MessageID]bool)
// receivedSent := make(map[MessageID]bool)
// receivedMissing := make(map[MessageID][]MessageID)
// syncRequested := false
// var cbMutex sync.Mutex // Protect access to callback tracking maps/vars
callbacks := Callbacks{
OnMessageReady: func(messageId MessageID) {
fmt.Printf("Test: OnMessageReady received: %s\n", messageId)
cbMutex.Lock()
receivedReady[messageId] = true
cbMutex.Unlock()
wg.Done()
},
OnMessageSent: func(messageId MessageID) {
fmt.Printf("Test: OnMessageSent received: %s\n", messageId)
cbMutex.Lock()
receivedSent[messageId] = true
cbMutex.Unlock()
wg.Done()
},
OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) {
fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps)
cbMutex.Lock()
receivedMissing[messageId] = missingDeps
cbMutex.Unlock()
wg.Done()
},
OnPeriodicSync: func() {
fmt.Println("Test: OnPeriodicSync received")
cbMutex.Lock()
syncRequested = true
cbMutex.Unlock()
// Don't wg.Done() here, it might be called multiple times
},
}
// callbacks := Callbacks{
// OnMessageReady: func(messageId MessageID) {
// fmt.Printf("Test: OnMessageReady received: %s\n", messageId)
// cbMutex.Lock()
// receivedReady[messageId] = true
// cbMutex.Unlock()
// wg.Done()
// },
// OnMessageSent: func(messageId MessageID) {
// fmt.Printf("Test: OnMessageSent received: %s\n", messageId)
// cbMutex.Lock()
// receivedSent[messageId] = true
// cbMutex.Unlock()
// wg.Done()
// },
// OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) {
// fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps)
// cbMutex.Lock()
// receivedMissing[messageId] = missingDeps
// cbMutex.Unlock()
// wg.Done()
// },
// OnPeriodicSync: func() {
// fmt.Println("Test: OnPeriodicSync received")
// cbMutex.Lock()
// syncRequested = true
// cbMutex.Unlock()
// // Don't wg.Done() here, it might be called multiple times
// },
// }
err = RegisterCallback(handle, callbacks)
if err != nil {
t.Fatalf("RegisterCallback failed: %v", err)
}
// err = RegisterCallbacks(handle, callbacks)
// if err != nil {
// t.Fatalf("RegisterCallbacks failed: %v", err)
// }
// Start tasks AFTER registering callbacks
err = StartPeriodicTasks(handle)
if err != nil {
t.Fatalf("StartPeriodicTasks failed: %v", err)
}
// // Start tasks AFTER registering callbacks
// err = StartPeriodicTasks(handle)
// if err != nil {
// t.Fatalf("StartPeriodicTasks failed: %v", err)
// }
// --- Test Scenario ---
// // --- Test Scenario ---
// 1. Send msg1
wg.Add(1) // Expect OnMessageSent for msg1 eventually
payload1 := []byte("callback test 1")
msgID1 := MessageID("cb-msg-1")
wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
if err != nil {
t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
}
// // 1. Send msg1
// wg.Add(1) // Expect OnMessageSent for msg1 eventually
// payload1 := []byte("callback test 1")
// msgID1 := MessageID("cb-msg-1")
// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
// if err != nil {
// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
// }
// 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history)
wg.Add(1) // Expect OnMessageReady for msg1
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
}
// // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history)
// wg.Add(1) // Expect OnMessageReady for msg1
// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
// if err != nil {
// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
// }
// 3. Send msg2 (depends on msg1)
wg.Add(1) // Expect OnMessageSent for msg2 eventually
payload2 := []byte("callback test 2")
msgID2 := MessageID("cb-msg-2")
wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
if err != nil {
t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
}
// // 3. Send msg2 (depends on msg1)
// wg.Add(1) // Expect OnMessageSent for msg2 eventually
// payload2 := []byte("callback test 2")
// msgID2 := MessageID("cb-msg-2")
// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
// if err != nil {
// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
// }
// 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2)
wg.Add(1) // Expect OnMessageReady for msg2
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg2)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err)
}
// // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2)
// wg.Add(1) // Expect OnMessageReady for msg2
// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg2)
// if err != nil {
// t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err)
// }
// --- Verification ---
// Wait for expected callbacks with a timeout
waitTimeout(&wg, 5*time.Second, t)
// // --- Verification ---
// // Wait for expected callbacks with a timeout
// waitTimeout(&wg, 5*time.Second, t)
cbMutex.Lock()
defer cbMutex.Unlock()
// cbMutex.Lock()
// defer cbMutex.Unlock()
if !receivedReady[msgID1] {
t.Errorf("OnMessageReady not called for %s", msgID1)
}
if !receivedReady[msgID2] {
t.Errorf("OnMessageReady not called for %s", msgID2)
}
if !receivedSent[msgID1] {
t.Errorf("OnMessageSent not called for %s", msgID1)
}
if !receivedSent[msgID2] {
t.Errorf("OnMessageSent not called for %s", msgID2)
}
// We didn't explicitly test missing deps in this path
if len(receivedMissing) > 0 {
t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing)
}
// Periodic sync is harder to guarantee in a short test, just check if it was ever true
if !syncRequested {
t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout")
}
}
// if !receivedReady[msgID1] {
// t.Errorf("OnMessageReady not called for %s", msgID1)
// }
// if !receivedReady[msgID2] {
// t.Errorf("OnMessageReady not called for %s", msgID2)
// }
// if !receivedSent[msgID1] {
// t.Errorf("OnMessageSent not called for %s", msgID1)
// }
// if !receivedSent[msgID2] {
// t.Errorf("OnMessageSent not called for %s", msgID2)
// }
// // We didn't explicitly test missing deps in this path
// if len(receivedMissing) > 0 {
// t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing)
// }
// // Periodic sync is harder to guarantee in a short test, just check if it was ever true
// if !syncRequested {
// t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout")
// }
// }
// // Helper function to wait for WaitGroup with a timeout
// func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) {
// c := make(chan struct{})
// go func() {
// defer close(c)
// wg.Wait()
// }()
// select {
// case <-c:
// // Completed normally
// case <-time.After(timeout):
// t.Fatalf("Timed out waiting for callbacks")
// }
// }
// Helper function to wait for WaitGroup with a timeout
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
// Completed normally
case <-time.After(timeout):
t.Fatalf("Timed out waiting for callbacks")
}
}

View File

@ -24,18 +24,10 @@ type
channelId*: string
config*: ReliabilityConfig
lock*: Lock
# Nim callbacks (used internally or if not using C bindings)
onMessageReady*: proc(messageId: MessageID) {.gcsafe.}
onMessageSent*: proc(messageId: MessageID) {.gcsafe.}
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback
# C callback pointers and user data (for FFI)
cMessageReadyCallback*: pointer
cMessageSentCallback*: pointer
cMissingDependenciesCallback*: pointer
cPeriodicSyncCallback*: pointer
cUserData*: pointer
onMessageReady*: proc(messageId: MessageID)
onMessageSent*: proc(messageId: MessageID)
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID])
onPeriodicSync*: proc()
ReliabilityError* = enum
reInvalidArgument