diff --git a/.gitignore b/.gitignore index b8be30d..45bb7a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ .DS_Store tests/test_reliability -tests/bloom +tests/test_bloom nph docs for_reference diff --git a/bindings/bindings.nim b/bindings/bindings.nim index 98a19e9..23508bb 100644 --- a/bindings/bindings.nim +++ b/bindings/bindings.nim @@ -1,19 +1,10 @@ import std/typetraits -import system # for GC thread setup/teardown -import chronos -import results +import system, chronos, results import ../src/[reliability, reliability_utils, message] type CReliabilityManagerHandle* = pointer type - # Callback Types (Imported from C Header) - CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum - EVENT_MESSAGE_READY = 1 - EVENT_MESSAGE_SENT = 2 - EVENT_MISSING_DEPENDENCIES = 3 - EVENT_PERIODIC_SYNC = 4 - CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object is_ok*: bool error_message*: cstring @@ -34,30 +25,23 @@ type proc allocCString*(s: string): cstring {.inline, gcsafe.} = if s.len == 0: - echo "[Nim Binding][allocCString] Allocating empty string" return nil result = cast[cstring](allocShared(s.len + 1)) copyMem(result, s.cstring, s.len + 1) - echo "[Nim Binding][allocCString] Allocated cstring at ", - cast[int](result), " for: ", s proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} = if s.len == 0: - echo "[Nim Binding][allocSeqByte] Allocating empty seq[byte]" return (nil, 0) let len = s.len let bufferPtr = allocShared(len) if len > 0: copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural) - echo "[Nim Binding][allocSeqByte] Allocated buffer at ", - cast[int](bufferPtr), " of length ", len return (bufferPtr, len.csize_t) proc allocSeqCString*( s: seq[string] ): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} = if s.len == 0: - echo "[Nim Binding][allocSeqCString] Allocating empty seq[string]" return (nil, 0) let count = s.len # Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray @@ -65,33 +49,22 @@ proc allocSeqCString*( for i in 0 ..< count: # Allocate each string and store its pointer in the array using unchecked array indexing arrPtr[i] = allocCString(s[i]) - echo "[Nim Binding][allocSeqCString] Allocated cstring for missingDep[", - i, "]: ", s[i], " at ", cast[int](arrPtr[i]) # Return pointer to the first element, cast back to ptr cstring - echo "[Nim Binding][allocSeqCString] Allocated array at ", - cast[int](arrPtr), " with count ", count return (cast[ptr cstring](arrPtr), count.csize_t) proc freeCString*(cs: cstring) {.inline, gcsafe.} = if cs != nil: - echo "[Nim Binding][freeCString] Freeing cstring at ", cast[int](cs) deallocShared(cs) proc freeSeqByte*(bufferPtr: pointer) {.inline, gcsafe, cdecl.} = if bufferPtr != nil: - echo "[Nim Binding][freeSeqByte] Freeing buffer at ", cast[int](bufferPtr) deallocShared(bufferPtr) -# Corrected to accept ptr cstring proc freeSeqCString*(arrPtr: ptr cstring, count: csize_t) {.inline, gcsafe, cdecl.} = if arrPtr != nil: - echo "[Nim Binding][freeSeqCString] Freeing array at ", - cast[int](arrPtr), " with count ", count # Cast to ptr UncheckedArray for proper iteration/indexing before freeing let arr = cast[ptr UncheckedArray[cstring]](arrPtr) for i in 0 ..< count: - echo "[Nim Binding][freeSeqCString] Freeing cstring[", - i, "] at ", cast[int](arr[i]) freeCString(arr[i]) # Free each individual cstring deallocShared(arrPtr) # Free the array pointer itself @@ -114,15 +87,13 @@ proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) {.gcs defer: tearDownForeignThreadGc() # Ensure teardown even if callback errors - echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId let handle = cast[CReliabilityManagerHandle](rm) # Still use handle for C side let cb = rm.cCallback if cb == nil: - echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle) return - # Pass handle, event type, and messageId (as data1), plus user_data + # Pass handle, event type, and messageId cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0) proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} = @@ -130,12 +101,10 @@ proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsa defer: tearDownForeignThreadGc() - echo "[Nim Binding] nimMessageSentCallback called for: ", messageId let handle = cast[CReliabilityManagerHandle](rm) let cb = rm.cCallback if cb == nil: - echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle) return cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0) @@ -147,13 +116,10 @@ proc nimMissingDependenciesCallback( defer: tearDownForeignThreadGc() - echo "[Nim Binding] nimMissingDependenciesCallback called for: ", - messageId, " with deps: ", $missingDeps let handle = cast[CReliabilityManagerHandle](rm) let cb = rm.cCallback if cb == nil: - echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle) return # Prepare data for the callback @@ -166,9 +132,6 @@ proc nimMissingDependenciesCallback( cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq cDepsPtr = cast[ptr cstring](cDepsNim[0].addr) cDepsCount = missingDeps.len.csize_t - # Ensure cDepsNim stays alive during the call if cDepsPtr points into it - # Using allocSeqCString might be safer if Go needs to hold onto the data. - # For now, assuming Go copies the data immediately during the callback. cb( handle, @@ -183,24 +146,21 @@ proc nimPeriodicSyncCallback(rm: ReliabilityManager) {.gcsafe.} = defer: tearDownForeignThreadGc() - echo "[Nim Binding] nimPeriodicSyncCallback called" let handle = cast[CReliabilityManagerHandle](rm) let cb = rm.cCallback if cb == nil: - echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle) return cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0) -# --- Exported C Functions - Using Opaque Pointer --- +# --- Exported C Functions --- proc NewReliabilityManager*( channelIdCStr: cstring ): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} = let channelId = $channelIdCStr if channelId.len == 0: - echo "Error creating ReliabilityManager: Channel ID cannot be empty" return nil # Return nil pointer let rmResult = newReliabilityManager(channelId) if rmResult.isOk: @@ -221,28 +181,19 @@ proc NewReliabilityManager*( GC_ref(rm) # Prevent GC from moving the object while Go holds the handle return handle else: - echo "Error creating ReliabilityManager: ", rmResult.error - return nil # Return nil pointer + return nil proc CleanupReliabilityManager*( handle: CReliabilityManagerHandle ) {.exportc, dynlib, cdecl.} = let handlePtr = handle - echo "[Nim Binding][Cleanup] Called with handle: ", cast[int](handlePtr) if handlePtr != nil: - # Go side should handle removing the handle from its registry. - # We just need to unref the Nim object. - # Cast opaque pointer back to Nim ref type let rm = cast[ReliabilityManager](handlePtr) - echo "[Nim Binding][Cleanup] Calling Nim core cleanup for handle: ", - cast[int](handlePtr) cleanup(rm) - echo "[Nim Binding][Cleanup] Calling GC_unref for handle: ", cast[int](handlePtr) GC_unref(rm) # Allow GC to collect the object now that Go is done - echo "[Nim Binding][Cleanup] GC_unref returned for handle: ", cast[int](handlePtr) else: - echo "[Nim Binding][Cleanup] Warning: CleanupReliabilityManager called with NULL handle" + discard proc ResetReliabilityManager*( handle: CReliabilityManagerHandle @@ -250,33 +201,28 @@ proc ResetReliabilityManager*( if handle == nil: return toCResultErrStr("ReliabilityManager handle is NULL") let rm = cast[ReliabilityManager](handle) - let result = resetReliabilityManager(rm) - if result.isOk: + let opResult = resetReliabilityManager(rm) + if opResult.isOk: return toCResultOk() else: - return toCResultErr(result.error) + return toCResultErr(opResult.error) proc WrapOutgoingMessage*( handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t, messageIdCStr: cstring, -): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe - echo "[Nim Binding][WrapOutgoingMessage] Called with handle=", - cast[int](handle), " messageLen=", messageLen, " messageId=", $messageIdCStr +): CWrapResult {.exportc, dynlib, cdecl.} = if handle == nil: - echo "[Nim Binding][WrapOutgoingMessage] Error: handle is nil" return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) let rm = cast[ReliabilityManager](handle) if messageC == nil and messageLen > 0: - echo "[Nim Binding][WrapOutgoingMessage] Error: message pointer is NULL but length > 0" return CWrapResult( base_result: toCResultErrStr("Message pointer is NULL but length > 0") ) if messageIdCStr == nil: - echo "[Nim Binding][WrapOutgoingMessage] Error: messageId pointer is NULL" return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL")) let messageId = $messageIdCStr @@ -290,28 +236,21 @@ proc WrapOutgoingMessage*( let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId) if wrapResult.isOk: let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get()) - echo "[Nim Binding][WrapOutgoingMessage] Returning wrapped message at ", - cast[int](wrappedDataPtr), " len=", wrappedDataLen return CWrapResult( base_result: toCResultOk(), message: wrappedDataPtr, message_len: wrappedDataLen ) else: - echo "[Nim Binding][WrapOutgoingMessage] Error: ", $wrapResult.error return CWrapResult(base_result: toCResultErr(wrapResult.error)) proc UnwrapReceivedMessage*( handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t ): CUnwrapResult {.exportc, dynlib, cdecl.} = - echo "[Nim Binding][UnwrapReceivedMessage] Called with handle=", - cast[int](handle), " messageLen=", messageLen if handle == nil: - echo "[Nim Binding][UnwrapReceivedMessage] Error: handle is nil" return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) let rm = cast[ReliabilityManager](handle) if messageC == nil and messageLen > 0: - echo "[Nim Binding][UnwrapReceivedMessage] Error: message pointer is NULL but length > 0" return CUnwrapResult( base_result: toCResultErrStr("Message pointer is NULL but length > 0") ) @@ -328,14 +267,6 @@ proc UnwrapReceivedMessage*( let (unwrappedContent, missingDepsNim) = unwrapResult.get() let (contentPtr, contentLen) = allocSeqByte(unwrappedContent) let (depsPtr, depsCount) = allocSeqCString(missingDepsNim) - echo "[Nim Binding][UnwrapReceivedMessage] Returning content at ", - cast[int](contentPtr), - " len=", - contentLen, - " missingDepsPtr=", - cast[int](depsPtr), - " count=", - depsCount return CUnwrapResult( base_result: toCResultOk(), message: contentPtr, @@ -344,44 +275,33 @@ proc UnwrapReceivedMessage*( missing_deps_count: depsCount, ) else: - echo "[Nim Binding][UnwrapReceivedMessage] Error: ", $unwrapResult.error return CUnwrapResult(base_result: toCResultErr(unwrapResult.error)) proc MarkDependenciesMet*( handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t ): CResult {.exportc, dynlib, cdecl.} = - echo "[Nim Binding][MarkDependenciesMet] Called with handle=", - cast[int](handle), " count=", count if handle == nil: - echo "[Nim Binding][MarkDependenciesMet] Error: handle is nil" return toCResultErrStr("ReliabilityManager handle is NULL") let rm = cast[ReliabilityManager](handle) if messageIDsC == nil and count > 0: - echo "[Nim Binding][MarkDependenciesMet] Error: messageIDs pointer is NULL but count > 0" return toCResultErrStr("MessageIDs pointer is NULL but count > 0") var messageIDsNim = newSeq[string](count) # Cast to ptr UncheckedArray for indexing let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC) for i in 0 ..< count: - let currentCStr = messageIDsCArray[i] # Use unchecked array indexing + let currentCStr = messageIDsCArray[i] if currentCStr != nil: messageIDsNim[i] = $currentCStr - echo "[Nim Binding][MarkDependenciesMet] messageID[", - i, "] = ", messageIDsNim[i], " at ", cast[int](currentCStr) else: - echo "[Nim Binding][MarkDependenciesMet] NULL message ID found in array at index ", - i return toCResultErrStr("NULL message ID found in array") - let result = markDependenciesMet(rm, messageIDsNim) - if result.isOk: - echo "[Nim Binding][MarkDependenciesMet] Success" + let opResult = markDependenciesMet(rm, messageIDsNim) + if opResult.isOk: return toCResultOk() else: - echo "[Nim Binding][MarkDependenciesMet] Error: ", $result.error - return toCResultErr(result.error) + return toCResultErr(opResult.error) proc RegisterCallback*( handle: CReliabilityManagerHandle, @@ -389,16 +309,13 @@ proc RegisterCallback*( cUserDataPtr: pointer, ) {.exportc, dynlib, cdecl, gcsafe.} = if handle == nil: - echo "[Nim Binding][RegisterCallback] Error: handle is NULL" return let rm = cast[ReliabilityManager](handle) rm.cCallback = cEventCallback - rm.cUserData = cUserDataPtr # Store user data pointer - echo "[Nim Binding] Stored C callback and user data for handle: ", cast[int](handle) + rm.cUserData = cUserDataPtr proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} = if handle == nil: - echo "Error: Cannot start periodic tasks: NULL ReliabilityManager handle" return let rm = cast[ReliabilityManager](handle) startPeriodicTasks(rm) diff --git a/bindings/generated/libbindings.dylib b/bindings/generated/libbindings.dylib deleted file mode 100755 index 2c076e1..0000000 Binary files a/bindings/generated/libbindings.dylib and /dev/null differ diff --git a/bindings/generated/libsds.dylib b/bindings/generated/libsds.dylib new file mode 100755 index 0000000..564ccae Binary files /dev/null and b/bindings/generated/libsds.dylib differ diff --git a/bindings/generated/libsds.dylib.arm b/bindings/generated/libsds.dylib.arm new file mode 100755 index 0000000..c700e75 Binary files /dev/null and b/bindings/generated/libsds.dylib.arm differ diff --git a/bindings/generated/libsds.dylib.x64 b/bindings/generated/libsds.dylib.x64 new file mode 100755 index 0000000..5d89bdc Binary files /dev/null and b/bindings/generated/libsds.dylib.x64 differ diff --git a/reliability.nimble b/reliability.nimble index dcb705e..ca4398e 100644 --- a/reliability.nimble +++ b/reliability.nimble @@ -17,8 +17,8 @@ task test, "Run the test suite": task bindings, "Generate bindings": proc compile(libName: string, flags = "") = - exec "nim c -f " & flags & " -d:release --app:lib --mm:arc --tlsEmulation:off --out:" & - libName & " --outdir:bindings/generated bindings/bindings.nim" + exec "nim c -f " & flags & " -d:release --app:lib --mm:refc --out:" & libName & + " --outdir:bindings/generated bindings/bindings.nim" # Create required directories mkDir "bindings/generated" diff --git a/sds_wrapper.go b/sds_wrapper.go index b1095a4..0fd002a 100644 --- a/sds_wrapper.go +++ b/sds_wrapper.go @@ -2,7 +2,7 @@ package main /* #cgo CFLAGS: -I${SRCDIR}/bindings -#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lbindings +#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lsds #cgo LDFLAGS: -Wl,-rpath,${SRCDIR}/bindings/generated #include // For C.free @@ -41,7 +41,7 @@ type Callbacks struct { OnPeriodicSync func() } -// Global map to store callbacks associated with handles +// Global map to store callbacks associated with handles (needed for Go relay) var ( callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks) registryMutex sync.RWMutex @@ -56,7 +56,6 @@ func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) { handle := C.NewReliabilityManager(cChannelId) if handle == nil { - // Note: Nim side currently just prints to stdout on creation failure return nil, errors.New("failed to create ReliabilityManager (check Nim logs/stdout)") } return ReliabilityManagerHandle(handle), nil @@ -64,14 +63,14 @@ func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) { // CleanupReliabilityManager frees the resources associated with the handle func CleanupReliabilityManager(handle ReliabilityManagerHandle) { - fmt.Printf("Go: CleanupReliabilityManager called for handle %p\n", handle) // Log entry if handle == nil { - fmt.Println("Go: CleanupReliabilityManager: handle is nil, returning.") return } - fmt.Printf("Go: CleanupReliabilityManager: Calling C.CleanupReliabilityManager for handle %p\n", handle) + // Remove from Go registry first + registryMutex.Lock() + delete(callbackRegistry, handle) + registryMutex.Unlock() C.CleanupReliabilityManager(unsafe.Pointer(handle)) - fmt.Printf("Go: CleanupReliabilityManager: C.CleanupReliabilityManager returned for handle %p\n", handle) // Log exit } // ResetReliabilityManager resets the state of the manager @@ -240,7 +239,6 @@ func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 un registryMutex.RUnlock() if !ok || callbacks == nil { - fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle) return } diff --git a/sds_wrapper_test.go b/sds_wrapper_test.go index 8899a12..388df5b 100644 --- a/sds_wrapper_test.go +++ b/sds_wrapper_test.go @@ -1,6 +1,5 @@ package main import ( - "fmt" "sync" "testing" "time" @@ -32,14 +31,12 @@ func TestHandleUniqueness(t *testing.T) { t.Fatalf("NewReliabilityManager (1) failed: %v", err1) } defer CleanupReliabilityManager(handle1) - t.Logf("Handle 1: %p", handle1) handle2, err2 := NewReliabilityManager(channelID) if err2 != nil || handle2 == nil { t.Fatalf("NewReliabilityManager (2) failed: %v", err2) } defer CleanupReliabilityManager(handle2) - t.Logf("Handle 2: %p", handle2) if handle1 == handle2 { t.Errorf("Expected unique handles, but both are %p", handle1) @@ -166,14 +163,11 @@ func TestCallback_OnMessageReady(t *testing.T) { callbacks := Callbacks{ OnMessageReady: func(messageId MessageID) { - fmt.Printf("Test_OnMessageReady: Received: %s\n", messageId) // Non-blocking send to channel select { case readyChan <- messageId: - fmt.Printf("Test_OnMessageReady: Sent '%s' to readyChan\n", messageId) // Log after send default: // Avoid blocking if channel is full or test already timed out - fmt.Printf("Test_OnMessageReady: Warning - readyChan buffer full or test finished for %s\n", messageId) } }, } @@ -189,14 +183,12 @@ func TestCallback_OnMessageReady(t *testing.T) { msgID := MessageID("cb-ready-1") // Wrap on sender - t.Logf("Test_OnMessageReady: Wrapping message with handleSender: %p", handleSender) // Log sender handle wrappedMsg, err := WrapOutgoingMessage(handleSender, payload, msgID) if err != nil { t.Fatalf("WrapOutgoingMessage failed: %v", err) } // Unwrap on receiver - t.Logf("Test_OnMessageReady: Unwrapping message with handleReceiver: %p", handleReceiver) // Log receiver handle _, _, err = UnwrapReceivedMessage(handleReceiver, wrappedMsg) if err != nil { t.Fatalf("UnwrapReceivedMessage failed: %v", err) @@ -240,7 +232,6 @@ func TestCallback_OnMessageSent(t *testing.T) { callbacks := Callbacks{ OnMessageSent: func(messageId MessageID) { - fmt.Printf("Test_OnMessageSent: Received: %s\n", messageId) cbMutex.Lock() sentCalled = true sentMsgID = messageId @@ -328,7 +319,6 @@ func TestCallback_OnMissingDependencies(t *testing.T) { callbacks := Callbacks{ OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { - fmt.Printf("Test_OnMissingDependencies: Received for %s: %v\n", messageId, missingDeps) cbMutex.Lock() missingCalled = true missingMsgID = messageId @@ -354,7 +344,6 @@ func TestCallback_OnMissingDependencies(t *testing.T) { if err != nil { t.Fatalf("WrapOutgoingMessage (1) on sender failed: %v", err) } - // _, _, err = UnwrapReceivedMessage(handleSender, wrappedMsg1) // No need to unwrap on sender // 2. Sender sends msg2 (depends on msg1) payload2 := []byte("missing test 2") @@ -410,7 +399,6 @@ func TestCallback_OnPeriodicSync(t *testing.T) { callbacks := Callbacks{ OnPeriodicSync: func() { - fmt.Println("Test_OnPeriodicSync: Received") cbMutex.Lock() if !syncCalled { // Only signal the first time syncCalled = true @@ -433,8 +421,6 @@ func TestCallback_OnPeriodicSync(t *testing.T) { // --- Verification --- // Wait for the periodic sync callback with a timeout (needs to be longer than sync interval) - // Default sync interval is 30s, which is too long for a unit test. - // We rely on the periodic tasks starting quickly and triggering the callback soon. select { case <-syncChan: // Success diff --git a/src/reliability.nim b/src/reliability.nim index 31d2016..93acd2e 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -60,15 +60,8 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = logError("Failed to deserialize bloom filter") if acknowledged: - echo "[Nim Core] reviewAckStatus: Message acknowledged: ", - outMsg.message.messageId if rm.onMessageSent != nil: - echo "[Nim Core] reviewAckStatus: Calling onMessageSent for: ", - outMsg.message.messageId rm.onMessageSent(rm, outMsg.message.messageId) # Pass rm - else: - echo "[Nim Core] reviewAckStatus: rm.onMessageSent is nil, cannot call callback for: ", - outMsg.message.messageId rm.outgoingBuffer.delete(i) else: inc i @@ -158,14 +151,8 @@ proc processIncomingBuffer(rm: ReliabilityManager) = for msg in rm.incomingBuffer: if msg.messageId == msgId: rm.addToHistory(msg.messageId) - echo "[Nim Core] processIncomingBuffer: Message ready: ", msg.messageId if rm.onMessageReady != nil: - echo "[Nim Core] processIncomingBuffer: Calling onMessageReady for: ", - msg.messageId - rm.onMessageReady(rm, msg.messageId) # Pass rm - else: - echo "[Nim Core] processIncomingBuffer: rm.onMessageReady is nil, cannot call callback for: ", - msg.messageId + rm.onMessageReady(rm, msg.messageId) processed.incl(msgId) # Add any dependent messages that might now be ready if msgId in dependencies: @@ -197,11 +184,10 @@ proc unwrapReceivedMessage*( let msg = msgResult.get if rm.bloomFilter.contains(msg.messageId): - echo "[Nim Core] unwrapReceivedMessage: Duplicate message detected (in bloom filter): ", - msg.messageId # Add this log + # Duplicate message detected return ok((msg.content, @[])) - rm.bloomFilter.add(msg.messageId) # Add to receiver's bloom filter + rm.bloomFilter.add(msg.messageId) # Update Lamport timestamp rm.updateLamportTimestamp(msg.lamportTimestamp) @@ -228,36 +214,13 @@ proc unwrapReceivedMessage*( rm.addToHistory(msg.messageId) rm.processIncomingBuffer() # This might trigger onMessageReady internally # If processIncomingBuffer didn't handle it (e.g., buffer was empty), handle it now. - # We know deps are met, so it should be ready. - # NOTE: Need to ensure addToHistory isn't called twice if processIncomingBuffer also adds it. - # Let's assume processIncomingBuffer handles adding to history if it processes the message. - # We only call the callback here if it wasn't handled by processIncomingBuffer. - # A more robust check would involve seeing if msgId was added to 'processed' set in processIncomingBuffer, - # but let's try simply calling the callback if the condition is met. - # We already added to history on line 222. - echo "[Nim Core] unwrapReceivedMessage: Message ready (direct): ", msg.messageId - # rm.addToHistory(msg.messageId) # Removed potential duplicate add if rm.onMessageReady != nil: - echo "[Nim Core] unwrapReceivedMessage: Calling onMessageReady for: ", - msg.messageId - rm.onMessageReady(rm, msg.messageId) # Pass rm - else: - echo "[Nim Core] unwrapReceivedMessage: rm.onMessageReady is nil, cannot call callback for: ", - msg.messageId + rm.onMessageReady(rm, msg.messageId) else: # Buffer message and request missing dependencies - echo "[Nim Core] unwrapReceivedMessage: Buffering message due to missing deps: ", - msg.messageId rm.incomingBuffer.add(msg) - echo "[Nim Core] unwrapReceivedMessage: Checking onMissingDependencies callback for: ", - msg.messageId if rm.onMissingDependencies != nil: - echo "[Nim Core] unwrapReceivedMessage: Calling onMissingDependencies for: ", - msg.messageId - rm.onMissingDependencies(rm, msg.messageId, missingDeps) # Pass rm - else: - echo "[Nim Core] unwrapReceivedMessage: rm.onMissingDependencies is nil, cannot call callback for: ", - msg.messageId + rm.onMissingDependencies(rm, msg.messageId, missingDeps) return ok((msg.content, missingDeps)) except: @@ -279,7 +242,6 @@ proc markDependenciesMet*( if not rm.bloomFilter.contains(msgId): rm.bloomFilter.add(msgId) # rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application? - echo "[Nim Core] markDependenciesMet: Calling processIncomingBuffer after marking deps" rm.processIncomingBuffer() return ok() @@ -289,14 +251,12 @@ proc markDependenciesMet*( proc setCallbacks*( rm: ReliabilityManager, onMessageReady: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.}, - # Add rm onMessageSent: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.}, - # Add rm onMissingDependencies: proc( rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID] - ) {.gcsafe.}, # Add rm + ) {.gcsafe.}, onPeriodicSync: proc(rm: ReliabilityManager) {.gcsafe.} = nil, -) = # Add rm, make type explicit +) = ## Sets the callback functions for various events in the ReliabilityManager. ## ## Parameters: @@ -329,14 +289,8 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} = else: if rm.onMessageSent != nil: # Assuming message timeout means it's considered "sent" or "failed" - echo "[Nim Core] checkUnacknowledgedMessages: Calling onMessageSent for timed out message: ", - unackMsg.message.messageId - rm.onMessageSent(rm, unackMsg.message.messageId) # Pass rm - else: - echo "[Nim Core] checkUnacknowledgedMessages: rm.onMessageSent is nil for timed out message: ", - unackMsg.message.messageId + rm.onMessageSent(rm, unackMsg.message.messageId) else: - # Dedent this else to match `if elapsed > rm.config.resendInterval:` (line 296) newOutgoingBuffer.add(unackMsg) rm.outgoingBuffer = newOutgoingBuffer @@ -360,12 +314,8 @@ proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledErr while true: {.gcsafe.}: try: - echo "[Nim Core] periodicSyncMessage: Checking onPeriodicSync callback" if rm.onPeriodicSync != nil: - echo "[Nim Core] periodicSyncMessage: Calling onPeriodicSync" rm.onPeriodicSync(rm) # Pass rm - else: - echo "[Nim Core] periodicSyncMessage: rm.onPeriodicSync is nil" except Exception as e: logError("Error in periodic sync: " & e.msg) await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds)) diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index eab5056..9d1cfb6 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -19,7 +19,6 @@ type data2: pointer, data3: csize_t, ) {.cdecl, gcsafe.} - PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} # This is the Nim internal type ReliabilityConfig* = object @@ -42,15 +41,13 @@ type channelId*: string config*: ReliabilityConfig lock*: Lock - # Nim internal callbacks (assigned in bindings/bindings.nim) + # Nim internal callbacks (assigned in bindings) onMessageReady*: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} - # Pass rm onMessageSent*: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} - # Pass rm onMissingDependencies*: proc( rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID] - ) {.gcsafe.} # Pass rm - onPeriodicSync*: proc(rm: ReliabilityManager) {.gcsafe.} # Pass rm + ) {.gcsafe.} + onPeriodicSync*: proc(rm: ReliabilityManager) {.gcsafe.} # C callback info (set via RegisterCallback) cCallback*: CEventCallback