mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-03 22:53:12 +00:00
chore: cleanup
This commit is contained in:
parent
f162630b63
commit
2cb504959c
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,6 +1,6 @@
|
||||
.DS_Store
|
||||
tests/test_reliability
|
||||
tests/bloom
|
||||
tests/test_bloom
|
||||
nph
|
||||
docs
|
||||
for_reference
|
||||
|
||||
@ -1,19 +1,10 @@
|
||||
import std/typetraits
|
||||
import system # for GC thread setup/teardown
|
||||
import chronos
|
||||
import results
|
||||
import system, chronos, results
|
||||
import ../src/[reliability, reliability_utils, message]
|
||||
|
||||
type CReliabilityManagerHandle* = pointer
|
||||
|
||||
type
|
||||
# Callback Types (Imported from C Header)
|
||||
CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum
|
||||
EVENT_MESSAGE_READY = 1
|
||||
EVENT_MESSAGE_SENT = 2
|
||||
EVENT_MISSING_DEPENDENCIES = 3
|
||||
EVENT_PERIODIC_SYNC = 4
|
||||
|
||||
CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object
|
||||
is_ok*: bool
|
||||
error_message*: cstring
|
||||
@ -34,30 +25,23 @@ type
|
||||
|
||||
proc allocCString*(s: string): cstring {.inline, gcsafe.} =
|
||||
if s.len == 0:
|
||||
echo "[Nim Binding][allocCString] Allocating empty string"
|
||||
return nil
|
||||
result = cast[cstring](allocShared(s.len + 1))
|
||||
copyMem(result, s.cstring, s.len + 1)
|
||||
echo "[Nim Binding][allocCString] Allocated cstring at ",
|
||||
cast[int](result), " for: ", s
|
||||
|
||||
proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} =
|
||||
if s.len == 0:
|
||||
echo "[Nim Binding][allocSeqByte] Allocating empty seq[byte]"
|
||||
return (nil, 0)
|
||||
let len = s.len
|
||||
let bufferPtr = allocShared(len)
|
||||
if len > 0:
|
||||
copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural)
|
||||
echo "[Nim Binding][allocSeqByte] Allocated buffer at ",
|
||||
cast[int](bufferPtr), " of length ", len
|
||||
return (bufferPtr, len.csize_t)
|
||||
|
||||
proc allocSeqCString*(
|
||||
s: seq[string]
|
||||
): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} =
|
||||
if s.len == 0:
|
||||
echo "[Nim Binding][allocSeqCString] Allocating empty seq[string]"
|
||||
return (nil, 0)
|
||||
let count = s.len
|
||||
# Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray
|
||||
@ -65,33 +49,22 @@ proc allocSeqCString*(
|
||||
for i in 0 ..< count:
|
||||
# Allocate each string and store its pointer in the array using unchecked array indexing
|
||||
arrPtr[i] = allocCString(s[i])
|
||||
echo "[Nim Binding][allocSeqCString] Allocated cstring for missingDep[",
|
||||
i, "]: ", s[i], " at ", cast[int](arrPtr[i])
|
||||
# Return pointer to the first element, cast back to ptr cstring
|
||||
echo "[Nim Binding][allocSeqCString] Allocated array at ",
|
||||
cast[int](arrPtr), " with count ", count
|
||||
return (cast[ptr cstring](arrPtr), count.csize_t)
|
||||
|
||||
proc freeCString*(cs: cstring) {.inline, gcsafe.} =
|
||||
if cs != nil:
|
||||
echo "[Nim Binding][freeCString] Freeing cstring at ", cast[int](cs)
|
||||
deallocShared(cs)
|
||||
|
||||
proc freeSeqByte*(bufferPtr: pointer) {.inline, gcsafe, cdecl.} =
|
||||
if bufferPtr != nil:
|
||||
echo "[Nim Binding][freeSeqByte] Freeing buffer at ", cast[int](bufferPtr)
|
||||
deallocShared(bufferPtr)
|
||||
|
||||
# Corrected to accept ptr cstring
|
||||
proc freeSeqCString*(arrPtr: ptr cstring, count: csize_t) {.inline, gcsafe, cdecl.} =
|
||||
if arrPtr != nil:
|
||||
echo "[Nim Binding][freeSeqCString] Freeing array at ",
|
||||
cast[int](arrPtr), " with count ", count
|
||||
# Cast to ptr UncheckedArray for proper iteration/indexing before freeing
|
||||
let arr = cast[ptr UncheckedArray[cstring]](arrPtr)
|
||||
for i in 0 ..< count:
|
||||
echo "[Nim Binding][freeSeqCString] Freeing cstring[",
|
||||
i, "] at ", cast[int](arr[i])
|
||||
freeCString(arr[i]) # Free each individual cstring
|
||||
deallocShared(arrPtr) # Free the array pointer itself
|
||||
|
||||
@ -114,15 +87,13 @@ proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) {.gcs
|
||||
defer:
|
||||
tearDownForeignThreadGc() # Ensure teardown even if callback errors
|
||||
|
||||
echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId
|
||||
let handle = cast[CReliabilityManagerHandle](rm) # Still use handle for C side
|
||||
let cb = rm.cCallback
|
||||
|
||||
if cb == nil:
|
||||
echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
|
||||
return
|
||||
|
||||
# Pass handle, event type, and messageId (as data1), plus user_data
|
||||
# Pass handle, event type, and messageId
|
||||
cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0)
|
||||
|
||||
proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
|
||||
@ -130,12 +101,10 @@ proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsa
|
||||
defer:
|
||||
tearDownForeignThreadGc()
|
||||
|
||||
echo "[Nim Binding] nimMessageSentCallback called for: ", messageId
|
||||
let handle = cast[CReliabilityManagerHandle](rm)
|
||||
let cb = rm.cCallback
|
||||
|
||||
if cb == nil:
|
||||
echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
|
||||
return
|
||||
|
||||
cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0)
|
||||
@ -147,13 +116,10 @@ proc nimMissingDependenciesCallback(
|
||||
defer:
|
||||
tearDownForeignThreadGc()
|
||||
|
||||
echo "[Nim Binding] nimMissingDependenciesCallback called for: ",
|
||||
messageId, " with deps: ", $missingDeps
|
||||
let handle = cast[CReliabilityManagerHandle](rm)
|
||||
let cb = rm.cCallback
|
||||
|
||||
if cb == nil:
|
||||
echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
|
||||
return
|
||||
|
||||
# Prepare data for the callback
|
||||
@ -166,9 +132,6 @@ proc nimMissingDependenciesCallback(
|
||||
cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq
|
||||
cDepsPtr = cast[ptr cstring](cDepsNim[0].addr)
|
||||
cDepsCount = missingDeps.len.csize_t
|
||||
# Ensure cDepsNim stays alive during the call if cDepsPtr points into it
|
||||
# Using allocSeqCString might be safer if Go needs to hold onto the data.
|
||||
# For now, assuming Go copies the data immediately during the callback.
|
||||
|
||||
cb(
|
||||
handle,
|
||||
@ -183,24 +146,21 @@ proc nimPeriodicSyncCallback(rm: ReliabilityManager) {.gcsafe.} =
|
||||
defer:
|
||||
tearDownForeignThreadGc()
|
||||
|
||||
echo "[Nim Binding] nimPeriodicSyncCallback called"
|
||||
let handle = cast[CReliabilityManagerHandle](rm)
|
||||
let cb = rm.cCallback
|
||||
|
||||
if cb == nil:
|
||||
echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
|
||||
return
|
||||
|
||||
cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0)
|
||||
|
||||
# --- Exported C Functions - Using Opaque Pointer ---
|
||||
# --- Exported C Functions ---
|
||||
|
||||
proc NewReliabilityManager*(
|
||||
channelIdCStr: cstring
|
||||
): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
let channelId = $channelIdCStr
|
||||
if channelId.len == 0:
|
||||
echo "Error creating ReliabilityManager: Channel ID cannot be empty"
|
||||
return nil # Return nil pointer
|
||||
let rmResult = newReliabilityManager(channelId)
|
||||
if rmResult.isOk:
|
||||
@ -221,28 +181,19 @@ proc NewReliabilityManager*(
|
||||
GC_ref(rm) # Prevent GC from moving the object while Go holds the handle
|
||||
return handle
|
||||
else:
|
||||
echo "Error creating ReliabilityManager: ", rmResult.error
|
||||
return nil # Return nil pointer
|
||||
return nil
|
||||
|
||||
proc CleanupReliabilityManager*(
|
||||
handle: CReliabilityManagerHandle
|
||||
) {.exportc, dynlib, cdecl.} =
|
||||
let handlePtr = handle
|
||||
echo "[Nim Binding][Cleanup] Called with handle: ", cast[int](handlePtr)
|
||||
if handlePtr != nil:
|
||||
# Go side should handle removing the handle from its registry.
|
||||
# We just need to unref the Nim object.
|
||||
|
||||
# Cast opaque pointer back to Nim ref type
|
||||
let rm = cast[ReliabilityManager](handlePtr)
|
||||
echo "[Nim Binding][Cleanup] Calling Nim core cleanup for handle: ",
|
||||
cast[int](handlePtr)
|
||||
cleanup(rm)
|
||||
echo "[Nim Binding][Cleanup] Calling GC_unref for handle: ", cast[int](handlePtr)
|
||||
GC_unref(rm) # Allow GC to collect the object now that Go is done
|
||||
echo "[Nim Binding][Cleanup] GC_unref returned for handle: ", cast[int](handlePtr)
|
||||
else:
|
||||
echo "[Nim Binding][Cleanup] Warning: CleanupReliabilityManager called with NULL handle"
|
||||
discard
|
||||
|
||||
proc ResetReliabilityManager*(
|
||||
handle: CReliabilityManagerHandle
|
||||
@ -250,33 +201,28 @@ proc ResetReliabilityManager*(
|
||||
if handle == nil:
|
||||
return toCResultErrStr("ReliabilityManager handle is NULL")
|
||||
let rm = cast[ReliabilityManager](handle)
|
||||
let result = resetReliabilityManager(rm)
|
||||
if result.isOk:
|
||||
let opResult = resetReliabilityManager(rm)
|
||||
if opResult.isOk:
|
||||
return toCResultOk()
|
||||
else:
|
||||
return toCResultErr(result.error)
|
||||
return toCResultErr(opResult.error)
|
||||
|
||||
proc WrapOutgoingMessage*(
|
||||
handle: CReliabilityManagerHandle,
|
||||
messageC: pointer,
|
||||
messageLen: csize_t,
|
||||
messageIdCStr: cstring,
|
||||
): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
|
||||
echo "[Nim Binding][WrapOutgoingMessage] Called with handle=",
|
||||
cast[int](handle), " messageLen=", messageLen, " messageId=", $messageIdCStr
|
||||
): CWrapResult {.exportc, dynlib, cdecl.} =
|
||||
if handle == nil:
|
||||
echo "[Nim Binding][WrapOutgoingMessage] Error: handle is nil"
|
||||
return
|
||||
CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
|
||||
let rm = cast[ReliabilityManager](handle)
|
||||
|
||||
if messageC == nil and messageLen > 0:
|
||||
echo "[Nim Binding][WrapOutgoingMessage] Error: message pointer is NULL but length > 0"
|
||||
return CWrapResult(
|
||||
base_result: toCResultErrStr("Message pointer is NULL but length > 0")
|
||||
)
|
||||
if messageIdCStr == nil:
|
||||
echo "[Nim Binding][WrapOutgoingMessage] Error: messageId pointer is NULL"
|
||||
return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL"))
|
||||
|
||||
let messageId = $messageIdCStr
|
||||
@ -290,28 +236,21 @@ proc WrapOutgoingMessage*(
|
||||
let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId)
|
||||
if wrapResult.isOk:
|
||||
let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get())
|
||||
echo "[Nim Binding][WrapOutgoingMessage] Returning wrapped message at ",
|
||||
cast[int](wrappedDataPtr), " len=", wrappedDataLen
|
||||
return CWrapResult(
|
||||
base_result: toCResultOk(), message: wrappedDataPtr, message_len: wrappedDataLen
|
||||
)
|
||||
else:
|
||||
echo "[Nim Binding][WrapOutgoingMessage] Error: ", $wrapResult.error
|
||||
return CWrapResult(base_result: toCResultErr(wrapResult.error))
|
||||
|
||||
proc UnwrapReceivedMessage*(
|
||||
handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t
|
||||
): CUnwrapResult {.exportc, dynlib, cdecl.} =
|
||||
echo "[Nim Binding][UnwrapReceivedMessage] Called with handle=",
|
||||
cast[int](handle), " messageLen=", messageLen
|
||||
if handle == nil:
|
||||
echo "[Nim Binding][UnwrapReceivedMessage] Error: handle is nil"
|
||||
return
|
||||
CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
|
||||
let rm = cast[ReliabilityManager](handle)
|
||||
|
||||
if messageC == nil and messageLen > 0:
|
||||
echo "[Nim Binding][UnwrapReceivedMessage] Error: message pointer is NULL but length > 0"
|
||||
return CUnwrapResult(
|
||||
base_result: toCResultErrStr("Message pointer is NULL but length > 0")
|
||||
)
|
||||
@ -328,14 +267,6 @@ proc UnwrapReceivedMessage*(
|
||||
let (unwrappedContent, missingDepsNim) = unwrapResult.get()
|
||||
let (contentPtr, contentLen) = allocSeqByte(unwrappedContent)
|
||||
let (depsPtr, depsCount) = allocSeqCString(missingDepsNim)
|
||||
echo "[Nim Binding][UnwrapReceivedMessage] Returning content at ",
|
||||
cast[int](contentPtr),
|
||||
" len=",
|
||||
contentLen,
|
||||
" missingDepsPtr=",
|
||||
cast[int](depsPtr),
|
||||
" count=",
|
||||
depsCount
|
||||
return CUnwrapResult(
|
||||
base_result: toCResultOk(),
|
||||
message: contentPtr,
|
||||
@ -344,44 +275,33 @@ proc UnwrapReceivedMessage*(
|
||||
missing_deps_count: depsCount,
|
||||
)
|
||||
else:
|
||||
echo "[Nim Binding][UnwrapReceivedMessage] Error: ", $unwrapResult.error
|
||||
return CUnwrapResult(base_result: toCResultErr(unwrapResult.error))
|
||||
|
||||
proc MarkDependenciesMet*(
|
||||
handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t
|
||||
): CResult {.exportc, dynlib, cdecl.} =
|
||||
echo "[Nim Binding][MarkDependenciesMet] Called with handle=",
|
||||
cast[int](handle), " count=", count
|
||||
if handle == nil:
|
||||
echo "[Nim Binding][MarkDependenciesMet] Error: handle is nil"
|
||||
return toCResultErrStr("ReliabilityManager handle is NULL")
|
||||
let rm = cast[ReliabilityManager](handle)
|
||||
|
||||
if messageIDsC == nil and count > 0:
|
||||
echo "[Nim Binding][MarkDependenciesMet] Error: messageIDs pointer is NULL but count > 0"
|
||||
return toCResultErrStr("MessageIDs pointer is NULL but count > 0")
|
||||
|
||||
var messageIDsNim = newSeq[string](count)
|
||||
# Cast to ptr UncheckedArray for indexing
|
||||
let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC)
|
||||
for i in 0 ..< count:
|
||||
let currentCStr = messageIDsCArray[i] # Use unchecked array indexing
|
||||
let currentCStr = messageIDsCArray[i]
|
||||
if currentCStr != nil:
|
||||
messageIDsNim[i] = $currentCStr
|
||||
echo "[Nim Binding][MarkDependenciesMet] messageID[",
|
||||
i, "] = ", messageIDsNim[i], " at ", cast[int](currentCStr)
|
||||
else:
|
||||
echo "[Nim Binding][MarkDependenciesMet] NULL message ID found in array at index ",
|
||||
i
|
||||
return toCResultErrStr("NULL message ID found in array")
|
||||
|
||||
let result = markDependenciesMet(rm, messageIDsNim)
|
||||
if result.isOk:
|
||||
echo "[Nim Binding][MarkDependenciesMet] Success"
|
||||
let opResult = markDependenciesMet(rm, messageIDsNim)
|
||||
if opResult.isOk:
|
||||
return toCResultOk()
|
||||
else:
|
||||
echo "[Nim Binding][MarkDependenciesMet] Error: ", $result.error
|
||||
return toCResultErr(result.error)
|
||||
return toCResultErr(opResult.error)
|
||||
|
||||
proc RegisterCallback*(
|
||||
handle: CReliabilityManagerHandle,
|
||||
@ -389,16 +309,13 @@ proc RegisterCallback*(
|
||||
cUserDataPtr: pointer,
|
||||
) {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle == nil:
|
||||
echo "[Nim Binding][RegisterCallback] Error: handle is NULL"
|
||||
return
|
||||
let rm = cast[ReliabilityManager](handle)
|
||||
rm.cCallback = cEventCallback
|
||||
rm.cUserData = cUserDataPtr # Store user data pointer
|
||||
echo "[Nim Binding] Stored C callback and user data for handle: ", cast[int](handle)
|
||||
rm.cUserData = cUserDataPtr
|
||||
|
||||
proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} =
|
||||
if handle == nil:
|
||||
echo "Error: Cannot start periodic tasks: NULL ReliabilityManager handle"
|
||||
return
|
||||
let rm = cast[ReliabilityManager](handle)
|
||||
startPeriodicTasks(rm)
|
||||
|
||||
Binary file not shown.
BIN
bindings/generated/libsds.dylib
Executable file
BIN
bindings/generated/libsds.dylib
Executable file
Binary file not shown.
BIN
bindings/generated/libsds.dylib.arm
Executable file
BIN
bindings/generated/libsds.dylib.arm
Executable file
Binary file not shown.
BIN
bindings/generated/libsds.dylib.x64
Executable file
BIN
bindings/generated/libsds.dylib.x64
Executable file
Binary file not shown.
@ -17,8 +17,8 @@ task test, "Run the test suite":
|
||||
|
||||
task bindings, "Generate bindings":
|
||||
proc compile(libName: string, flags = "") =
|
||||
exec "nim c -f " & flags & " -d:release --app:lib --mm:arc --tlsEmulation:off --out:" &
|
||||
libName & " --outdir:bindings/generated bindings/bindings.nim"
|
||||
exec "nim c -f " & flags & " -d:release --app:lib --mm:refc --out:" & libName &
|
||||
" --outdir:bindings/generated bindings/bindings.nim"
|
||||
|
||||
# Create required directories
|
||||
mkDir "bindings/generated"
|
||||
|
||||
@ -2,7 +2,7 @@ package main
|
||||
|
||||
/*
|
||||
#cgo CFLAGS: -I${SRCDIR}/bindings
|
||||
#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lbindings
|
||||
#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lsds
|
||||
#cgo LDFLAGS: -Wl,-rpath,${SRCDIR}/bindings/generated
|
||||
|
||||
#include <stdlib.h> // For C.free
|
||||
@ -41,7 +41,7 @@ type Callbacks struct {
|
||||
OnPeriodicSync func()
|
||||
}
|
||||
|
||||
// Global map to store callbacks associated with handles
|
||||
// Global map to store callbacks associated with handles (needed for Go relay)
|
||||
var (
|
||||
callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks)
|
||||
registryMutex sync.RWMutex
|
||||
@ -56,7 +56,6 @@ func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) {
|
||||
|
||||
handle := C.NewReliabilityManager(cChannelId)
|
||||
if handle == nil {
|
||||
// Note: Nim side currently just prints to stdout on creation failure
|
||||
return nil, errors.New("failed to create ReliabilityManager (check Nim logs/stdout)")
|
||||
}
|
||||
return ReliabilityManagerHandle(handle), nil
|
||||
@ -64,14 +63,14 @@ func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) {
|
||||
|
||||
// CleanupReliabilityManager frees the resources associated with the handle
|
||||
func CleanupReliabilityManager(handle ReliabilityManagerHandle) {
|
||||
fmt.Printf("Go: CleanupReliabilityManager called for handle %p\n", handle) // Log entry
|
||||
if handle == nil {
|
||||
fmt.Println("Go: CleanupReliabilityManager: handle is nil, returning.")
|
||||
return
|
||||
}
|
||||
fmt.Printf("Go: CleanupReliabilityManager: Calling C.CleanupReliabilityManager for handle %p\n", handle)
|
||||
// Remove from Go registry first
|
||||
registryMutex.Lock()
|
||||
delete(callbackRegistry, handle)
|
||||
registryMutex.Unlock()
|
||||
C.CleanupReliabilityManager(unsafe.Pointer(handle))
|
||||
fmt.Printf("Go: CleanupReliabilityManager: C.CleanupReliabilityManager returned for handle %p\n", handle) // Log exit
|
||||
}
|
||||
|
||||
// ResetReliabilityManager resets the state of the manager
|
||||
@ -240,7 +239,6 @@ func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 un
|
||||
registryMutex.RUnlock()
|
||||
|
||||
if !ok || callbacks == nil {
|
||||
fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
package main
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -32,14 +31,12 @@ func TestHandleUniqueness(t *testing.T) {
|
||||
t.Fatalf("NewReliabilityManager (1) failed: %v", err1)
|
||||
}
|
||||
defer CleanupReliabilityManager(handle1)
|
||||
t.Logf("Handle 1: %p", handle1)
|
||||
|
||||
handle2, err2 := NewReliabilityManager(channelID)
|
||||
if err2 != nil || handle2 == nil {
|
||||
t.Fatalf("NewReliabilityManager (2) failed: %v", err2)
|
||||
}
|
||||
defer CleanupReliabilityManager(handle2)
|
||||
t.Logf("Handle 2: %p", handle2)
|
||||
|
||||
if handle1 == handle2 {
|
||||
t.Errorf("Expected unique handles, but both are %p", handle1)
|
||||
@ -166,14 +163,11 @@ func TestCallback_OnMessageReady(t *testing.T) {
|
||||
|
||||
callbacks := Callbacks{
|
||||
OnMessageReady: func(messageId MessageID) {
|
||||
fmt.Printf("Test_OnMessageReady: Received: %s\n", messageId)
|
||||
// Non-blocking send to channel
|
||||
select {
|
||||
case readyChan <- messageId:
|
||||
fmt.Printf("Test_OnMessageReady: Sent '%s' to readyChan\n", messageId) // Log after send
|
||||
default:
|
||||
// Avoid blocking if channel is full or test already timed out
|
||||
fmt.Printf("Test_OnMessageReady: Warning - readyChan buffer full or test finished for %s\n", messageId)
|
||||
}
|
||||
},
|
||||
}
|
||||
@ -189,14 +183,12 @@ func TestCallback_OnMessageReady(t *testing.T) {
|
||||
msgID := MessageID("cb-ready-1")
|
||||
|
||||
// Wrap on sender
|
||||
t.Logf("Test_OnMessageReady: Wrapping message with handleSender: %p", handleSender) // Log sender handle
|
||||
wrappedMsg, err := WrapOutgoingMessage(handleSender, payload, msgID)
|
||||
if err != nil {
|
||||
t.Fatalf("WrapOutgoingMessage failed: %v", err)
|
||||
}
|
||||
|
||||
// Unwrap on receiver
|
||||
t.Logf("Test_OnMessageReady: Unwrapping message with handleReceiver: %p", handleReceiver) // Log receiver handle
|
||||
_, _, err = UnwrapReceivedMessage(handleReceiver, wrappedMsg)
|
||||
if err != nil {
|
||||
t.Fatalf("UnwrapReceivedMessage failed: %v", err)
|
||||
@ -240,7 +232,6 @@ func TestCallback_OnMessageSent(t *testing.T) {
|
||||
|
||||
callbacks := Callbacks{
|
||||
OnMessageSent: func(messageId MessageID) {
|
||||
fmt.Printf("Test_OnMessageSent: Received: %s\n", messageId)
|
||||
cbMutex.Lock()
|
||||
sentCalled = true
|
||||
sentMsgID = messageId
|
||||
@ -328,7 +319,6 @@ func TestCallback_OnMissingDependencies(t *testing.T) {
|
||||
|
||||
callbacks := Callbacks{
|
||||
OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) {
|
||||
fmt.Printf("Test_OnMissingDependencies: Received for %s: %v\n", messageId, missingDeps)
|
||||
cbMutex.Lock()
|
||||
missingCalled = true
|
||||
missingMsgID = messageId
|
||||
@ -354,7 +344,6 @@ func TestCallback_OnMissingDependencies(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("WrapOutgoingMessage (1) on sender failed: %v", err)
|
||||
}
|
||||
// _, _, err = UnwrapReceivedMessage(handleSender, wrappedMsg1) // No need to unwrap on sender
|
||||
|
||||
// 2. Sender sends msg2 (depends on msg1)
|
||||
payload2 := []byte("missing test 2")
|
||||
@ -410,7 +399,6 @@ func TestCallback_OnPeriodicSync(t *testing.T) {
|
||||
|
||||
callbacks := Callbacks{
|
||||
OnPeriodicSync: func() {
|
||||
fmt.Println("Test_OnPeriodicSync: Received")
|
||||
cbMutex.Lock()
|
||||
if !syncCalled { // Only signal the first time
|
||||
syncCalled = true
|
||||
@ -433,8 +421,6 @@ func TestCallback_OnPeriodicSync(t *testing.T) {
|
||||
|
||||
// --- Verification ---
|
||||
// Wait for the periodic sync callback with a timeout (needs to be longer than sync interval)
|
||||
// Default sync interval is 30s, which is too long for a unit test.
|
||||
// We rely on the periodic tasks starting quickly and triggering the callback soon.
|
||||
select {
|
||||
case <-syncChan:
|
||||
// Success
|
||||
|
||||
@ -60,15 +60,8 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) =
|
||||
logError("Failed to deserialize bloom filter")
|
||||
|
||||
if acknowledged:
|
||||
echo "[Nim Core] reviewAckStatus: Message acknowledged: ",
|
||||
outMsg.message.messageId
|
||||
if rm.onMessageSent != nil:
|
||||
echo "[Nim Core] reviewAckStatus: Calling onMessageSent for: ",
|
||||
outMsg.message.messageId
|
||||
rm.onMessageSent(rm, outMsg.message.messageId) # Pass rm
|
||||
else:
|
||||
echo "[Nim Core] reviewAckStatus: rm.onMessageSent is nil, cannot call callback for: ",
|
||||
outMsg.message.messageId
|
||||
rm.outgoingBuffer.delete(i)
|
||||
else:
|
||||
inc i
|
||||
@ -158,14 +151,8 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
|
||||
for msg in rm.incomingBuffer:
|
||||
if msg.messageId == msgId:
|
||||
rm.addToHistory(msg.messageId)
|
||||
echo "[Nim Core] processIncomingBuffer: Message ready: ", msg.messageId
|
||||
if rm.onMessageReady != nil:
|
||||
echo "[Nim Core] processIncomingBuffer: Calling onMessageReady for: ",
|
||||
msg.messageId
|
||||
rm.onMessageReady(rm, msg.messageId) # Pass rm
|
||||
else:
|
||||
echo "[Nim Core] processIncomingBuffer: rm.onMessageReady is nil, cannot call callback for: ",
|
||||
msg.messageId
|
||||
rm.onMessageReady(rm, msg.messageId)
|
||||
processed.incl(msgId)
|
||||
# Add any dependent messages that might now be ready
|
||||
if msgId in dependencies:
|
||||
@ -197,11 +184,10 @@ proc unwrapReceivedMessage*(
|
||||
|
||||
let msg = msgResult.get
|
||||
if rm.bloomFilter.contains(msg.messageId):
|
||||
echo "[Nim Core] unwrapReceivedMessage: Duplicate message detected (in bloom filter): ",
|
||||
msg.messageId # Add this log
|
||||
# Duplicate message detected
|
||||
return ok((msg.content, @[]))
|
||||
|
||||
rm.bloomFilter.add(msg.messageId) # Add to receiver's bloom filter
|
||||
rm.bloomFilter.add(msg.messageId)
|
||||
|
||||
# Update Lamport timestamp
|
||||
rm.updateLamportTimestamp(msg.lamportTimestamp)
|
||||
@ -228,36 +214,13 @@ proc unwrapReceivedMessage*(
|
||||
rm.addToHistory(msg.messageId)
|
||||
rm.processIncomingBuffer() # This might trigger onMessageReady internally
|
||||
# If processIncomingBuffer didn't handle it (e.g., buffer was empty), handle it now.
|
||||
# We know deps are met, so it should be ready.
|
||||
# NOTE: Need to ensure addToHistory isn't called twice if processIncomingBuffer also adds it.
|
||||
# Let's assume processIncomingBuffer handles adding to history if it processes the message.
|
||||
# We only call the callback here if it wasn't handled by processIncomingBuffer.
|
||||
# A more robust check would involve seeing if msgId was added to 'processed' set in processIncomingBuffer,
|
||||
# but let's try simply calling the callback if the condition is met.
|
||||
# We already added to history on line 222.
|
||||
echo "[Nim Core] unwrapReceivedMessage: Message ready (direct): ", msg.messageId
|
||||
# rm.addToHistory(msg.messageId) # Removed potential duplicate add
|
||||
if rm.onMessageReady != nil:
|
||||
echo "[Nim Core] unwrapReceivedMessage: Calling onMessageReady for: ",
|
||||
msg.messageId
|
||||
rm.onMessageReady(rm, msg.messageId) # Pass rm
|
||||
else:
|
||||
echo "[Nim Core] unwrapReceivedMessage: rm.onMessageReady is nil, cannot call callback for: ",
|
||||
msg.messageId
|
||||
rm.onMessageReady(rm, msg.messageId)
|
||||
else:
|
||||
# Buffer message and request missing dependencies
|
||||
echo "[Nim Core] unwrapReceivedMessage: Buffering message due to missing deps: ",
|
||||
msg.messageId
|
||||
rm.incomingBuffer.add(msg)
|
||||
echo "[Nim Core] unwrapReceivedMessage: Checking onMissingDependencies callback for: ",
|
||||
msg.messageId
|
||||
if rm.onMissingDependencies != nil:
|
||||
echo "[Nim Core] unwrapReceivedMessage: Calling onMissingDependencies for: ",
|
||||
msg.messageId
|
||||
rm.onMissingDependencies(rm, msg.messageId, missingDeps) # Pass rm
|
||||
else:
|
||||
echo "[Nim Core] unwrapReceivedMessage: rm.onMissingDependencies is nil, cannot call callback for: ",
|
||||
msg.messageId
|
||||
rm.onMissingDependencies(rm, msg.messageId, missingDeps)
|
||||
|
||||
return ok((msg.content, missingDeps))
|
||||
except:
|
||||
@ -279,7 +242,6 @@ proc markDependenciesMet*(
|
||||
if not rm.bloomFilter.contains(msgId):
|
||||
rm.bloomFilter.add(msgId)
|
||||
# rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application?
|
||||
echo "[Nim Core] markDependenciesMet: Calling processIncomingBuffer after marking deps"
|
||||
rm.processIncomingBuffer()
|
||||
|
||||
return ok()
|
||||
@ -289,14 +251,12 @@ proc markDependenciesMet*(
|
||||
proc setCallbacks*(
|
||||
rm: ReliabilityManager,
|
||||
onMessageReady: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.},
|
||||
# Add rm
|
||||
onMessageSent: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.},
|
||||
# Add rm
|
||||
onMissingDependencies: proc(
|
||||
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
|
||||
) {.gcsafe.}, # Add rm
|
||||
) {.gcsafe.},
|
||||
onPeriodicSync: proc(rm: ReliabilityManager) {.gcsafe.} = nil,
|
||||
) = # Add rm, make type explicit
|
||||
) =
|
||||
## Sets the callback functions for various events in the ReliabilityManager.
|
||||
##
|
||||
## Parameters:
|
||||
@ -329,14 +289,8 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} =
|
||||
else:
|
||||
if rm.onMessageSent != nil:
|
||||
# Assuming message timeout means it's considered "sent" or "failed"
|
||||
echo "[Nim Core] checkUnacknowledgedMessages: Calling onMessageSent for timed out message: ",
|
||||
unackMsg.message.messageId
|
||||
rm.onMessageSent(rm, unackMsg.message.messageId) # Pass rm
|
||||
else:
|
||||
echo "[Nim Core] checkUnacknowledgedMessages: rm.onMessageSent is nil for timed out message: ",
|
||||
unackMsg.message.messageId
|
||||
rm.onMessageSent(rm, unackMsg.message.messageId)
|
||||
else:
|
||||
# Dedent this else to match `if elapsed > rm.config.resendInterval:` (line 296)
|
||||
newOutgoingBuffer.add(unackMsg)
|
||||
|
||||
rm.outgoingBuffer = newOutgoingBuffer
|
||||
@ -360,12 +314,8 @@ proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledErr
|
||||
while true:
|
||||
{.gcsafe.}:
|
||||
try:
|
||||
echo "[Nim Core] periodicSyncMessage: Checking onPeriodicSync callback"
|
||||
if rm.onPeriodicSync != nil:
|
||||
echo "[Nim Core] periodicSyncMessage: Calling onPeriodicSync"
|
||||
rm.onPeriodicSync(rm) # Pass rm
|
||||
else:
|
||||
echo "[Nim Core] periodicSyncMessage: rm.onPeriodicSync is nil"
|
||||
except Exception as e:
|
||||
logError("Error in periodic sync: " & e.msg)
|
||||
await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds))
|
||||
|
||||
@ -19,7 +19,6 @@ type
|
||||
data2: pointer,
|
||||
data3: csize_t,
|
||||
) {.cdecl, gcsafe.}
|
||||
|
||||
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} # This is the Nim internal type
|
||||
|
||||
ReliabilityConfig* = object
|
||||
@ -42,15 +41,13 @@ type
|
||||
channelId*: string
|
||||
config*: ReliabilityConfig
|
||||
lock*: Lock
|
||||
# Nim internal callbacks (assigned in bindings/bindings.nim)
|
||||
# Nim internal callbacks (assigned in bindings)
|
||||
onMessageReady*: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.}
|
||||
# Pass rm
|
||||
onMessageSent*: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.}
|
||||
# Pass rm
|
||||
onMissingDependencies*: proc(
|
||||
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
|
||||
) {.gcsafe.} # Pass rm
|
||||
onPeriodicSync*: proc(rm: ReliabilityManager) {.gcsafe.} # Pass rm
|
||||
) {.gcsafe.}
|
||||
onPeriodicSync*: proc(rm: ReliabilityManager) {.gcsafe.}
|
||||
|
||||
# C callback info (set via RegisterCallback)
|
||||
cCallback*: CEventCallback
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user