fix: fixed callbacks issue

go wrapper test working now
This commit is contained in:
shash256 2025-04-20 00:04:24 +05:30
parent bd4c81c73d
commit f162630b63
16 changed files with 979 additions and 527 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -1,21 +1,19 @@
import std/[locks, typetraits, tables] # Added tables import std/typetraits
import system # for GC thread setup/teardown
import chronos import chronos
import results import results
import ../src/[reliability, reliability_utils, message] import ../src/[reliability, reliability_utils, message]
type type CReliabilityManagerHandle* = pointer
CReliabilityManagerHandle* = pointer
type type
# Callback Types (Imported from C Header) # Callback Types (Imported from C Header)
CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum
EVENT_MESSAGE_READY = 1, EVENT_MESSAGE_READY = 1
EVENT_MESSAGE_SENT = 2, EVENT_MESSAGE_SENT = 2
EVENT_MISSING_DEPENDENCIES = 3, EVENT_MISSING_DEPENDENCIES = 3
EVENT_PERIODIC_SYNC = 4 EVENT_PERIODIC_SYNC = 4
CEventCallback* = proc(handle: pointer, eventType: CEventType, data1: pointer, data2: pointer, data3: csize_t) {.cdecl.} # Use csize_t
CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object
is_ok*: bool is_ok*: bool
error_message*: cstring error_message*: cstring
@ -32,54 +30,68 @@ type
missing_deps*: ptr cstring missing_deps*: ptr cstring
missing_deps_count*: csize_t missing_deps_count*: csize_t
# --- Callback Registry ---
type
CallbackRegistry = Table[CReliabilityManagerHandle, CEventCallback]
var
callbackRegistry: CallbackRegistry
registryLock: Lock
initLock(registryLock)
# --- Memory Management Helpers --- # --- Memory Management Helpers ---
proc allocCString*(s: string): cstring {.inline, gcsafe.} = proc allocCString*(s: string): cstring {.inline, gcsafe.} =
if s.len == 0: return nil if s.len == 0:
echo "[Nim Binding][allocCString] Allocating empty string"
return nil
result = cast[cstring](allocShared(s.len + 1)) result = cast[cstring](allocShared(s.len + 1))
copyMem(result, s.cstring, s.len + 1) copyMem(result, s.cstring, s.len + 1)
echo "[Nim Binding][allocCString] Allocated cstring at ",
cast[int](result), " for: ", s
proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} = proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} =
if s.len == 0: return (nil, 0) if s.len == 0:
echo "[Nim Binding][allocSeqByte] Allocating empty seq[byte]"
return (nil, 0)
let len = s.len let len = s.len
let bufferPtr = allocShared(len) let bufferPtr = allocShared(len)
if len > 0: if len > 0:
copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural) copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural)
echo "[Nim Binding][allocSeqByte] Allocated buffer at ",
cast[int](bufferPtr), " of length ", len
return (bufferPtr, len.csize_t) return (bufferPtr, len.csize_t)
proc allocSeqCString*(s: seq[string]): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} = proc allocSeqCString*(
if s.len == 0: return (nil, 0) s: seq[string]
): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} =
if s.len == 0:
echo "[Nim Binding][allocSeqCString] Allocating empty seq[string]"
return (nil, 0)
let count = s.len let count = s.len
# Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray # Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray
let arrPtr = cast[ptr UncheckedArray[cstring]](allocShared(count * sizeof(cstring))) let arrPtr = cast[ptr UncheckedArray[cstring]](allocShared(count * sizeof(cstring)))
for i in 0..<count: for i in 0 ..< count:
# Allocate each string and store its pointer in the array using unchecked array indexing # Allocate each string and store its pointer in the array using unchecked array indexing
arrPtr[i] = allocCString(s[i]) arrPtr[i] = allocCString(s[i])
echo "[Nim Binding][allocSeqCString] Allocated cstring for missingDep[",
i, "]: ", s[i], " at ", cast[int](arrPtr[i])
# Return pointer to the first element, cast back to ptr cstring # Return pointer to the first element, cast back to ptr cstring
echo "[Nim Binding][allocSeqCString] Allocated array at ",
cast[int](arrPtr), " with count ", count
return (cast[ptr cstring](arrPtr), count.csize_t) return (cast[ptr cstring](arrPtr), count.csize_t)
proc freeCString*(cs: cstring) {.inline, gcsafe.} = proc freeCString*(cs: cstring) {.inline, gcsafe.} =
if cs != nil: deallocShared(cs) if cs != nil:
echo "[Nim Binding][freeCString] Freeing cstring at ", cast[int](cs)
deallocShared(cs)
proc freeSeqByte*(bufferPtr: pointer) {.inline, gcsafe, cdecl.} = proc freeSeqByte*(bufferPtr: pointer) {.inline, gcsafe, cdecl.} =
if bufferPtr != nil: deallocShared(bufferPtr) if bufferPtr != nil:
echo "[Nim Binding][freeSeqByte] Freeing buffer at ", cast[int](bufferPtr)
deallocShared(bufferPtr)
# Corrected to accept ptr cstring # Corrected to accept ptr cstring
proc freeSeqCString*(arrPtr: ptr cstring, count: csize_t) {.inline, gcsafe, cdecl.} = proc freeSeqCString*(arrPtr: ptr cstring, count: csize_t) {.inline, gcsafe, cdecl.} =
if arrPtr != nil: if arrPtr != nil:
echo "[Nim Binding][freeSeqCString] Freeing array at ",
cast[int](arrPtr), " with count ", count
# Cast to ptr UncheckedArray for proper iteration/indexing before freeing # Cast to ptr UncheckedArray for proper iteration/indexing before freeing
let arr = cast[ptr UncheckedArray[cstring]](arrPtr) let arr = cast[ptr UncheckedArray[cstring]](arrPtr)
for i in 0..<count: for i in 0 ..< count:
echo "[Nim Binding][freeSeqCString] Freeing cstring[",
i, "] at ", cast[int](arr[i])
freeCString(arr[i]) # Free each individual cstring freeCString(arr[i]) # Free each individual cstring
deallocShared(arrPtr) # Free the array pointer itself deallocShared(arrPtr) # Free the array pointer itself
@ -95,42 +107,54 @@ proc toCResultErrStr*(errMsg: string): CResult =
CResult(is_ok: false, error_message: allocCString(errMsg)) CResult(is_ok: false, error_message: allocCString(errMsg))
# --- Callback Wrappers (Nim -> C) --- # --- Callback Wrappers (Nim -> C) ---
# These wrappers call the single global Go callback relay. # These wrappers retrieve the C callback info from the ReliabilityManager object.
proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
setupForeignThreadGc() # Setup GC for this Go thread
defer:
tearDownForeignThreadGc() # Ensure teardown even if callback errors
proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) =
echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId
let handle = cast[CReliabilityManagerHandle](rm) let handle = cast[CReliabilityManagerHandle](rm) # Still use handle for C side
var cb: CEventCallback let cb = rm.cCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle): if cb == nil:
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
return return
cb = callbackRegistry[handle]
# Pass handle, event type, and messageId (as data1), plus user_data
# Pass handle, event type, and messageId (as data1)
cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0) cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0)
proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) = proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
setupForeignThreadGc()
defer:
tearDownForeignThreadGc()
echo "[Nim Binding] nimMessageSentCallback called for: ", messageId echo "[Nim Binding] nimMessageSentCallback called for: ", messageId
let handle = cast[CReliabilityManagerHandle](rm) let handle = cast[CReliabilityManagerHandle](rm)
var cb: CEventCallback let cb = rm.cCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle): if cb == nil:
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
return return
cb = callbackRegistry[handle]
cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0) cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0)
proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) = proc nimMissingDependenciesCallback(
echo "[Nim Binding] nimMissingDependenciesCallback called for: ", messageId, " with deps: ", $missingDeps rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
setupForeignThreadGc()
defer:
tearDownForeignThreadGc()
echo "[Nim Binding] nimMissingDependenciesCallback called for: ",
messageId, " with deps: ", $missingDeps
let handle = cast[CReliabilityManagerHandle](rm) let handle = cast[CReliabilityManagerHandle](rm)
var cb: CEventCallback let cb = rm.cCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle): if cb == nil:
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
return return
cb = callbackRegistry[handle]
# Prepare data for the callback # Prepare data for the callback
var cDepsPtr: ptr cstring = nil var cDepsPtr: ptr cstring = nil
@ -142,24 +166,38 @@ proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID
cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq
cDepsPtr = cast[ptr cstring](cDepsNim[0].addr) cDepsPtr = cast[ptr cstring](cDepsNim[0].addr)
cDepsCount = missingDeps.len.csize_t cDepsCount = missingDeps.len.csize_t
# Ensure cDepsNim stays alive during the call if cDepsPtr points into it
# Using allocSeqCString might be safer if Go needs to hold onto the data.
# For now, assuming Go copies the data immediately during the callback.
cb(handle, EVENT_MISSING_DEPENDENCIES, cast[pointer](messageId.cstring), cast[pointer](cDepsPtr), cDepsCount) cb(
handle,
EVENT_MISSING_DEPENDENCIES,
cast[pointer](messageId.cstring),
cast[pointer](cDepsPtr),
cDepsCount,
)
proc nimPeriodicSyncCallback(rm: ReliabilityManager) {.gcsafe.} =
setupForeignThreadGc()
defer:
tearDownForeignThreadGc()
proc nimPeriodicSyncCallback(rm: ReliabilityManager) =
echo "[Nim Binding] nimPeriodicSyncCallback called" echo "[Nim Binding] nimPeriodicSyncCallback called"
let handle = cast[CReliabilityManagerHandle](rm) let handle = cast[CReliabilityManagerHandle](rm)
var cb: CEventCallback let cb = rm.cCallback
withLock registryLock:
if not callbackRegistry.hasKey(handle): if cb == nil:
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) echo "[Nim Binding] No C callback stored in handle: ", cast[int](handle)
return return
cb = callbackRegistry[handle]
cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0) cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0)
# --- Exported C Functions - Using Opaque Pointer --- # --- Exported C Functions - Using Opaque Pointer ---
proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} = proc NewReliabilityManager*(
channelIdCStr: cstring
): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} =
let channelId = $channelIdCStr let channelId = $channelIdCStr
if channelId.len == 0: if channelId.len == 0:
echo "Error creating ReliabilityManager: Channel ID cannot be empty" echo "Error creating ReliabilityManager: Channel ID cannot be empty"
@ -167,12 +205,16 @@ proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {
let rmResult = newReliabilityManager(channelId) let rmResult = newReliabilityManager(channelId)
if rmResult.isOk: if rmResult.isOk:
let rm = rmResult.get() let rm = rmResult.get()
# Assign anonymous procs that capture 'rm' and call the wrappers rm.onMessageReady = proc(rmArg: ReliabilityManager, msgId: MessageID) {.gcsafe.} =
# Ensure signatures match the non-gcsafe fields in ReliabilityManager nimMessageReadyCallback(rmArg, msgId)
rm.onMessageReady = proc(msgId: MessageID) = nimMessageReadyCallback(rm, msgId) rm.onMessageSent = proc(rmArg: ReliabilityManager, msgId: MessageID) {.gcsafe.} =
rm.onMessageSent = proc(msgId: MessageID) = nimMessageSentCallback(rm, msgId) nimMessageSentCallback(rmArg, msgId)
rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) = nimMissingDependenciesCallback(rm, msgId, deps) rm.onMissingDependencies = proc(
rm.onPeriodicSync = proc() = nimPeriodicSyncCallback(rm) rmArg: ReliabilityManager, msgId: MessageID, deps: seq[MessageID]
) {.gcsafe.} =
nimMissingDependenciesCallback(rmArg, msgId, deps)
rm.onPeriodicSync = proc(rmArg: ReliabilityManager) {.gcsafe.} =
nimPeriodicSyncCallback(rmArg)
# Return the Nim ref object cast to the opaque pointer type # Return the Nim ref object cast to the opaque pointer type
let handle = cast[CReliabilityManagerHandle](rm) let handle = cast[CReliabilityManagerHandle](rm)
@ -182,21 +224,29 @@ proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {
echo "Error creating ReliabilityManager: ", rmResult.error echo "Error creating ReliabilityManager: ", rmResult.error
return nil # Return nil pointer return nil # Return nil pointer
proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} = proc CleanupReliabilityManager*(
handle: CReliabilityManagerHandle
) {.exportc, dynlib, cdecl.} =
let handlePtr = handle let handlePtr = handle
echo "[Nim Binding][Cleanup] Called with handle: ", cast[int](handlePtr)
if handlePtr != nil: if handlePtr != nil:
# Go side should handle removing the handle from its registry. # Go side should handle removing the handle from its registry.
# We just need to unref the Nim object. # We just need to unref the Nim object.
# No need to interact with gEventCallback here.
# Cast opaque pointer back to Nim ref type # Cast opaque pointer back to Nim ref type
let rm = cast[ReliabilityManager](handlePtr) let rm = cast[ReliabilityManager](handlePtr)
cleanup(rm) # Call Nim cleanup echo "[Nim Binding][Cleanup] Calling Nim core cleanup for handle: ",
cast[int](handlePtr)
cleanup(rm)
echo "[Nim Binding][Cleanup] Calling GC_unref for handle: ", cast[int](handlePtr)
GC_unref(rm) # Allow GC to collect the object now that Go is done GC_unref(rm) # Allow GC to collect the object now that Go is done
echo "[Nim Binding][Cleanup] GC_unref returned for handle: ", cast[int](handlePtr)
else: else:
echo "Warning: CleanupReliabilityManager called with NULL handle" echo "[Nim Binding][Cleanup] Warning: CleanupReliabilityManager called with NULL handle"
proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.exportc, dynlib, cdecl, gcsafe.} = proc ResetReliabilityManager*(
handle: CReliabilityManagerHandle
): CResult {.exportc, dynlib, cdecl, gcsafe.} =
if handle == nil: if handle == nil:
return toCResultErrStr("ReliabilityManager handle is NULL") return toCResultErrStr("ReliabilityManager handle is NULL")
let rm = cast[ReliabilityManager](handle) let rm = cast[ReliabilityManager](handle)
@ -206,15 +256,28 @@ proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.expo
else: else:
return toCResultErr(result.error) return toCResultErr(result.error)
proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe proc WrapOutgoingMessage*(
handle: CReliabilityManagerHandle,
messageC: pointer,
messageLen: csize_t,
messageIdCStr: cstring,
): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
echo "[Nim Binding][WrapOutgoingMessage] Called with handle=",
cast[int](handle), " messageLen=", messageLen, " messageId=", $messageIdCStr
if handle == nil: if handle == nil:
return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) echo "[Nim Binding][WrapOutgoingMessage] Error: handle is nil"
return
CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
let rm = cast[ReliabilityManager](handle) let rm = cast[ReliabilityManager](handle)
if messageC == nil and messageLen > 0: if messageC == nil and messageLen > 0:
return CWrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) echo "[Nim Binding][WrapOutgoingMessage] Error: message pointer is NULL but length > 0"
return CWrapResult(
base_result: toCResultErrStr("Message pointer is NULL but length > 0")
)
if messageIdCStr == nil: if messageIdCStr == nil:
return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL")) echo "[Nim Binding][WrapOutgoingMessage] Error: messageId pointer is NULL"
return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL"))
let messageId = $messageIdCStr let messageId = $messageIdCStr
var messageNim: seq[byte] var messageNim: seq[byte]
@ -227,21 +290,31 @@ proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer,
let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId) let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId)
if wrapResult.isOk: if wrapResult.isOk:
let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get()) let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get())
echo "[Nim Binding][WrapOutgoingMessage] Returning wrapped message at ",
cast[int](wrappedDataPtr), " len=", wrappedDataLen
return CWrapResult( return CWrapResult(
base_result: toCResultOk(), base_result: toCResultOk(), message: wrappedDataPtr, message_len: wrappedDataLen
message: wrappedDataPtr,
message_len: wrappedDataLen
) )
else: else:
echo "[Nim Binding][WrapOutgoingMessage] Error: ", $wrapResult.error
return CWrapResult(base_result: toCResultErr(wrapResult.error)) return CWrapResult(base_result: toCResultErr(wrapResult.error))
proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t): CUnwrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe proc UnwrapReceivedMessage*(
handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t
): CUnwrapResult {.exportc, dynlib, cdecl.} =
echo "[Nim Binding][UnwrapReceivedMessage] Called with handle=",
cast[int](handle), " messageLen=", messageLen
if handle == nil: if handle == nil:
return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) echo "[Nim Binding][UnwrapReceivedMessage] Error: handle is nil"
return
CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
let rm = cast[ReliabilityManager](handle) let rm = cast[ReliabilityManager](handle)
if messageC == nil and messageLen > 0: if messageC == nil and messageLen > 0:
return CUnwrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) echo "[Nim Binding][UnwrapReceivedMessage] Error: message pointer is NULL but length > 0"
return CUnwrapResult(
base_result: toCResultErrStr("Message pointer is NULL but length > 0")
)
var messageNim: seq[byte] var messageNim: seq[byte]
if messageLen > 0: if messageLen > 0:
@ -255,46 +328,73 @@ proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer
let (unwrappedContent, missingDepsNim) = unwrapResult.get() let (unwrappedContent, missingDepsNim) = unwrapResult.get()
let (contentPtr, contentLen) = allocSeqByte(unwrappedContent) let (contentPtr, contentLen) = allocSeqByte(unwrappedContent)
let (depsPtr, depsCount) = allocSeqCString(missingDepsNim) let (depsPtr, depsCount) = allocSeqCString(missingDepsNim)
echo "[Nim Binding][UnwrapReceivedMessage] Returning content at ",
cast[int](contentPtr),
" len=",
contentLen,
" missingDepsPtr=",
cast[int](depsPtr),
" count=",
depsCount
return CUnwrapResult( return CUnwrapResult(
base_result: toCResultOk(), base_result: toCResultOk(),
message: contentPtr, message: contentPtr,
message_len: contentLen, message_len: contentLen,
missing_deps: depsPtr, missing_deps: depsPtr,
missing_deps_count: depsCount missing_deps_count: depsCount,
) )
else: else:
echo "[Nim Binding][UnwrapReceivedMessage] Error: ", $unwrapResult.error
return CUnwrapResult(base_result: toCResultErr(unwrapResult.error)) return CUnwrapResult(base_result: toCResultErr(unwrapResult.error))
proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t): CResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe proc MarkDependenciesMet*(
handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t
): CResult {.exportc, dynlib, cdecl.} =
echo "[Nim Binding][MarkDependenciesMet] Called with handle=",
cast[int](handle), " count=", count
if handle == nil: if handle == nil:
echo "[Nim Binding][MarkDependenciesMet] Error: handle is nil"
return toCResultErrStr("ReliabilityManager handle is NULL") return toCResultErrStr("ReliabilityManager handle is NULL")
let rm = cast[ReliabilityManager](handle) let rm = cast[ReliabilityManager](handle)
if messageIDsC == nil and count > 0: if messageIDsC == nil and count > 0:
echo "[Nim Binding][MarkDependenciesMet] Error: messageIDs pointer is NULL but count > 0"
return toCResultErrStr("MessageIDs pointer is NULL but count > 0") return toCResultErrStr("MessageIDs pointer is NULL but count > 0")
var messageIDsNim = newSeq[string](count) var messageIDsNim = newSeq[string](count)
# Cast to ptr UncheckedArray for indexing # Cast to ptr UncheckedArray for indexing
let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC) let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC)
for i in 0..<count: for i in 0 ..< count:
let currentCStr = messageIDsCArray[i] # Use unchecked array indexing let currentCStr = messageIDsCArray[i] # Use unchecked array indexing
if currentCStr != nil: if currentCStr != nil:
messageIDsNim[i] = $currentCStr messageIDsNim[i] = $currentCStr
echo "[Nim Binding][MarkDependenciesMet] messageID[",
i, "] = ", messageIDsNim[i], " at ", cast[int](currentCStr)
else: else:
echo "[Nim Binding][MarkDependenciesMet] NULL message ID found in array at index ",
i
return toCResultErrStr("NULL message ID found in array") return toCResultErrStr("NULL message ID found in array")
let result = markDependenciesMet(rm, messageIDsNim) let result = markDependenciesMet(rm, messageIDsNim)
if result.isOk: if result.isOk:
echo "[Nim Binding][MarkDependenciesMet] Success"
return toCResultOk() return toCResultOk()
else: else:
echo "[Nim Binding][MarkDependenciesMet] Error: ", $result.error
return toCResultErr(result.error) return toCResultErr(result.error)
proc RegisterCallback*(handle: CReliabilityManagerHandle, proc RegisterCallback*(
cEventCallback: CEventCallback, handle: CReliabilityManagerHandle,
cUserDataPtr: pointer) {.exportc, dynlib, cdecl.} = cEventCallback: CEventCallback,
withLock registryLock: cUserDataPtr: pointer,
callbackRegistry[handle] = cEventCallback ) {.exportc, dynlib, cdecl, gcsafe.} =
echo "[Nim Binding] Registered callback for handle: ", cast[int](handle) if handle == nil:
echo "[Nim Binding][RegisterCallback] Error: handle is NULL"
return
let rm = cast[ReliabilityManager](handle)
rm.cCallback = cEventCallback
rm.cUserData = cUserDataPtr # Store user data pointer
echo "[Nim Binding] Stored C callback and user data for handle: ", cast[int](handle)
proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} = proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} =
if handle == nil: if handle == nil:

Binary file not shown.

View File

@ -1,9 +1,9 @@
# Package # Package
version = "0.1.0" version = "0.1.0"
author = "Waku Team" author = "Waku Team"
description = "E2E Reliability Protocol API" description = "E2E Reliability Protocol API"
license = "MIT" license = "MIT"
srcDir = "src" srcDir = "src"
# Dependencies # Dependencies
requires "nim >= 2.0.8" requires "nim >= 2.0.8"
@ -17,7 +17,8 @@ task test, "Run the test suite":
task bindings, "Generate bindings": task bindings, "Generate bindings":
proc compile(libName: string, flags = "") = proc compile(libName: string, flags = "") =
exec "nim c -f " & flags & " -d:release --app:lib --mm:arc --tlsEmulation:off --out:" & libName & " --outdir:bindings/generated bindings/bindings.nim" exec "nim c -f " & flags & " -d:release --app:lib --mm:arc --tlsEmulation:off --out:" &
libName & " --outdir:bindings/generated bindings/bindings.nim"
# Create required directories # Create required directories
mkDir "bindings/generated" mkDir "bindings/generated"
@ -25,8 +26,10 @@ task bindings, "Generate bindings":
when defined(windows): when defined(windows):
compile "reliability.dll" compile "reliability.dll"
elif defined(macosx): elif defined(macosx):
compile "libsds.dylib.arm", "--cpu:arm64 -l:'-target arm64-apple-macos11' -t:'-target arm64-apple-macos11'" compile "libsds.dylib.arm",
compile "libsds.dylib.x64", "--cpu:amd64 -l:'-target x86_64-apple-macos10.12' -t:'-target x86_64-apple-macos10.12'" "--cpu:arm64 -l:'-target arm64-apple-macos11' -t:'-target arm64-apple-macos11'"
compile "libsds.dylib.x64",
"--cpu:amd64 -l:'-target x86_64-apple-macos10.12' -t:'-target x86_64-apple-macos10.12'"
exec "lipo bindings/generated/libsds.dylib.arm bindings/generated/libsds.dylib.x64 -output bindings/generated/libsds.dylib -create" exec "lipo bindings/generated/libsds.dylib.arm bindings/generated/libsds.dylib.x64 -output bindings/generated/libsds.dylib -create"
else: else:
compile "libsds.so" compile "libsds.so"

View File

@ -64,13 +64,14 @@ func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) {
// CleanupReliabilityManager frees the resources associated with the handle // CleanupReliabilityManager frees the resources associated with the handle
func CleanupReliabilityManager(handle ReliabilityManagerHandle) { func CleanupReliabilityManager(handle ReliabilityManagerHandle) {
fmt.Printf("Go: CleanupReliabilityManager called for handle %p\n", handle) // Log entry
if handle == nil { if handle == nil {
fmt.Println("Go: CleanupReliabilityManager: handle is nil, returning.")
return return
} }
registryMutex.Lock() fmt.Printf("Go: CleanupReliabilityManager: Calling C.CleanupReliabilityManager for handle %p\n", handle)
delete(callbackRegistry, handle)
registryMutex.Unlock()
C.CleanupReliabilityManager(unsafe.Pointer(handle)) C.CleanupReliabilityManager(unsafe.Pointer(handle))
fmt.Printf("Go: CleanupReliabilityManager: C.CleanupReliabilityManager returned for handle %p\n", handle) // Log exit
} }
// ResetReliabilityManager resets the state of the manager // ResetReliabilityManager resets the state of the manager
@ -212,7 +213,7 @@ func RegisterCallback(handle ReliabilityManagerHandle, callbacks Callbacks) erro
C.RegisterCallback( C.RegisterCallback(
unsafe.Pointer(handle), unsafe.Pointer(handle),
(C.CEventCallback)(C.globalCallbackRelay), // Pass the Go relay function pointer (C.CEventCallback)(C.globalCallbackRelay), // Pass the Go relay function pointer
nil, // user_data is not used here, handle is passed directly by Nim wrapper nil,
) )
return nil return nil
} }
@ -229,6 +230,7 @@ func StartPeriodicTasks(handle ReliabilityManagerHandle) error {
// globalCallbackRelay is called by Nim for all events. // globalCallbackRelay is called by Nim for all events.
// It uses the handle to find the correct Go Callbacks struct and dispatch the call. // It uses the handle to find the correct Go Callbacks struct and dispatch the call.
//export globalCallbackRelay //export globalCallbackRelay
func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 unsafe.Pointer, data2 unsafe.Pointer, data3 C.size_t) { func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 unsafe.Pointer, data2 unsafe.Pointer, data3 C.size_t) {
goHandle := ReliabilityManagerHandle(handle) goHandle := ReliabilityManagerHandle(handle)
@ -238,13 +240,11 @@ func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 un
registryMutex.RUnlock() registryMutex.RUnlock()
if !ok || callbacks == nil { if !ok || callbacks == nil {
fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle) // Uncommented fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle)
return return
} }
// Use a goroutine to avoid blocking the Nim thread switch eventType {
go func() {
switch eventType {
case C.EVENT_MESSAGE_READY: case C.EVENT_MESSAGE_READY:
if callbacks.OnMessageReady != nil { if callbacks.OnMessageReady != nil {
msgIdStr := C.GoString((*C.char)(data1)) msgIdStr := C.GoString((*C.char)(data1))
@ -275,6 +275,5 @@ func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 un
} }
default: default:
fmt.Printf("Go: globalCallbackRelay: Received unknown event type %d for handle %v\n", eventType, goHandle) fmt.Printf("Go: globalCallbackRelay: Received unknown event type %d for handle %v\n", eventType, goHandle)
} }
}()
} }

View File

@ -24,6 +24,28 @@ func TestLifecycle(t *testing.T) {
} }
} }
// Test that consecutive calls return unique handles
func TestHandleUniqueness(t *testing.T) {
channelID := "test-unique-handles"
handle1, err1 := NewReliabilityManager(channelID)
if err1 != nil || handle1 == nil {
t.Fatalf("NewReliabilityManager (1) failed: %v", err1)
}
defer CleanupReliabilityManager(handle1)
t.Logf("Handle 1: %p", handle1)
handle2, err2 := NewReliabilityManager(channelID)
if err2 != nil || handle2 == nil {
t.Fatalf("NewReliabilityManager (2) failed: %v", err2)
}
defer CleanupReliabilityManager(handle2)
t.Logf("Handle 2: %p", handle2)
if handle1 == handle2 {
t.Errorf("Expected unique handles, but both are %p", handle1)
}
}
// Test wrapping and unwrapping a simple message // Test wrapping and unwrapping a simple message
func TestWrapUnwrap(t *testing.T) { func TestWrapUnwrap(t *testing.T) {
channelID := "test-wrap-unwrap" channelID := "test-wrap-unwrap"
@ -122,50 +144,279 @@ func TestDependencies(t *testing.T) {
} }
} }
// Test callbacks // Test OnMessageReady callback
func TestCallbacks(t *testing.T) { func TestCallback_OnMessageReady(t *testing.T) {
channelID := "test-callbacks" channelID := "test-cb-ready"
// Create sender and receiver handles
handleSender, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager (sender) failed: %v", err)
}
defer CleanupReliabilityManager(handleSender)
handleReceiver, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager (receiver) failed: %v", err)
}
defer CleanupReliabilityManager(handleReceiver)
// Use a channel for signaling instead of WaitGroup
readyChan := make(chan MessageID, 1)
callbacks := Callbacks{
OnMessageReady: func(messageId MessageID) {
fmt.Printf("Test_OnMessageReady: Received: %s\n", messageId)
// Non-blocking send to channel
select {
case readyChan <- messageId:
fmt.Printf("Test_OnMessageReady: Sent '%s' to readyChan\n", messageId) // Log after send
default:
// Avoid blocking if channel is full or test already timed out
fmt.Printf("Test_OnMessageReady: Warning - readyChan buffer full or test finished for %s\n", messageId)
}
},
}
// Register callback only on the receiver handle
err = RegisterCallback(handleReceiver, callbacks)
if err != nil {
t.Fatalf("RegisterCallback failed: %v", err)
}
// Scenario: Wrap message on sender, unwrap on receiver
payload := []byte("ready test")
msgID := MessageID("cb-ready-1")
// Wrap on sender
t.Logf("Test_OnMessageReady: Wrapping message with handleSender: %p", handleSender) // Log sender handle
wrappedMsg, err := WrapOutgoingMessage(handleSender, payload, msgID)
if err != nil {
t.Fatalf("WrapOutgoingMessage failed: %v", err)
}
// Unwrap on receiver
t.Logf("Test_OnMessageReady: Unwrapping message with handleReceiver: %p", handleReceiver) // Log receiver handle
_, _, err = UnwrapReceivedMessage(handleReceiver, wrappedMsg)
if err != nil {
t.Fatalf("UnwrapReceivedMessage failed: %v", err)
}
// Verification - Wait on channel with timeout
select {
case receivedMsgID := <-readyChan:
// Mark as called implicitly since we received on channel
if receivedMsgID != msgID {
t.Errorf("OnMessageReady called with wrong ID: got %q, want %q", receivedMsgID, msgID)
}
case <-time.After(2 * time.Second):
// If timeout occurs, the channel receive failed.
t.Errorf("Timed out waiting for OnMessageReady callback on readyChan")
}
}
// Test OnMessageSent callback (via causal history ACK)
func TestCallback_OnMessageSent(t *testing.T) {
channelID := "test-cb-sent"
// Create two handles
handle1, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager (1) failed: %v", err)
}
defer CleanupReliabilityManager(handle1)
handle2, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager (2) failed: %v", err)
}
defer CleanupReliabilityManager(handle2)
var wg sync.WaitGroup
sentCalled := false
var sentMsgID MessageID
var cbMutex sync.Mutex
callbacks := Callbacks{
OnMessageSent: func(messageId MessageID) {
fmt.Printf("Test_OnMessageSent: Received: %s\n", messageId)
cbMutex.Lock()
sentCalled = true
sentMsgID = messageId
cbMutex.Unlock()
wg.Done()
},
}
// Register callback on handle1 (the original sender)
err = RegisterCallback(handle1, callbacks)
if err != nil {
t.Fatalf("RegisterCallback failed: %v", err)
}
// Scenario: handle1 sends msg1, handle2 receives msg1,
// handle2 sends msg2 (acking msg1), handle1 receives msg2.
// 1. handle1 sends msg1
payload1 := []byte("sent test 1")
msgID1 := MessageID("cb-sent-1")
wrappedMsg1, err := WrapOutgoingMessage(handle1, payload1, msgID1)
if err != nil {
t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
}
// Note: msg1 is now in handle1's outgoing buffer
// 2. handle2 receives msg1 (to update its state)
_, _, err = UnwrapReceivedMessage(handle2, wrappedMsg1)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (1) on handle2 failed: %v", err)
}
// 3. handle2 sends msg2 (will include msg1 in causal history)
payload2 := []byte("sent test 2")
msgID2 := MessageID("cb-sent-2")
wrappedMsg2, err := WrapOutgoingMessage(handle2, payload2, msgID2)
if err != nil {
t.Fatalf("WrapOutgoingMessage (2) on handle2 failed: %v", err)
}
// 4. handle1 receives msg2 (should trigger ACK for msg1)
wg.Add(1) // Expect OnMessageSent for msg1 on handle1
_, _, err = UnwrapReceivedMessage(handle1, wrappedMsg2)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (2) on handle1 failed: %v", err)
}
// Verification
waitTimeout(&wg, 2*time.Second, t)
cbMutex.Lock()
defer cbMutex.Unlock()
if !sentCalled {
t.Errorf("OnMessageSent was not called")
}
// We primarily care that msg1 was ACKed.
if sentMsgID != msgID1 {
t.Errorf("OnMessageSent called with wrong ID: got %q, want %q", sentMsgID, msgID1)
}
}
// Test OnMissingDependencies callback
func TestCallback_OnMissingDependencies(t *testing.T) {
channelID := "test-cb-missing"
// Use separate sender/receiver handles explicitly
handleSender, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager (sender) failed: %v", err)
}
defer CleanupReliabilityManager(handleSender)
handleReceiver, err := NewReliabilityManager(channelID)
if err != nil {
t.Fatalf("NewReliabilityManager (receiver) failed: %v", err)
}
defer CleanupReliabilityManager(handleReceiver)
var wg sync.WaitGroup
missingCalled := false
var missingMsgID MessageID
var missingDepsList []MessageID
var cbMutex sync.Mutex
callbacks := Callbacks{
OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) {
fmt.Printf("Test_OnMissingDependencies: Received for %s: %v\n", messageId, missingDeps)
cbMutex.Lock()
missingCalled = true
missingMsgID = messageId
missingDepsList = missingDeps // Copy slice
cbMutex.Unlock()
wg.Done()
},
}
// Register callback only on the receiver handle
err = RegisterCallback(handleReceiver, callbacks)
if err != nil {
t.Fatalf("RegisterCallback failed: %v", err)
}
// Scenario: Sender sends msg1, then sender sends msg2 (depends on msg1),
// then receiver receives msg2 (which hasn't seen msg1).
// 1. Sender sends msg1
payload1 := []byte("missing test 1")
msgID1 := MessageID("cb-miss-1")
_, err = WrapOutgoingMessage(handleSender, payload1, msgID1) // Assign to _
if err != nil {
t.Fatalf("WrapOutgoingMessage (1) on sender failed: %v", err)
}
// _, _, err = UnwrapReceivedMessage(handleSender, wrappedMsg1) // No need to unwrap on sender
// 2. Sender sends msg2 (depends on msg1)
payload2 := []byte("missing test 2")
msgID2 := MessageID("cb-miss-2")
wrappedMsg2, err := WrapOutgoingMessage(handleSender, payload2, msgID2)
if err != nil {
t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
}
// 3. Receiver receives msg2 (haven't seen msg1)
wg.Add(1) // Expect OnMissingDependencies
_, _, err = UnwrapReceivedMessage(handleReceiver, wrappedMsg2)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (2) on receiver failed: %v", err)
}
// Verification
waitTimeout(&wg, 2*time.Second, t)
cbMutex.Lock()
defer cbMutex.Unlock()
if !missingCalled {
t.Errorf("OnMissingDependencies was not called")
}
if missingMsgID != msgID2 {
t.Errorf("OnMissingDependencies called for wrong ID: got %q, want %q", missingMsgID, msgID2)
}
foundDep := false
for _, dep := range missingDepsList {
if dep == msgID1 {
foundDep = true
break
}
}
if !foundDep {
t.Errorf("OnMissingDependencies did not report %q as missing, got: %v", msgID1, missingDepsList)
}
}
// Test OnPeriodicSync callback
func TestCallback_OnPeriodicSync(t *testing.T) {
channelID := "test-cb-sync"
handle, err := NewReliabilityManager(channelID) handle, err := NewReliabilityManager(channelID)
if err != nil { if err != nil {
t.Fatalf("NewReliabilityManager failed: %v", err) t.Fatalf("NewReliabilityManager failed: %v", err)
} }
defer CleanupReliabilityManager(handle) defer CleanupReliabilityManager(handle)
var wg sync.WaitGroup syncCalled := false
receivedReady := make(map[MessageID]bool) var cbMutex sync.Mutex
receivedSent := make(map[MessageID]bool) // Use a channel to signal when the callback is hit, as WaitGroup isn't ideal for periodic calls
receivedMissing := make(map[MessageID][]MessageID) syncChan := make(chan bool, 1)
syncRequested := false
var cbMutex sync.Mutex // Protect access to callback tracking maps/vars
callbacks := Callbacks{ callbacks := Callbacks{
OnMessageReady: func(messageId MessageID) {
fmt.Printf("Test: OnMessageReady received: %s\n", messageId)
cbMutex.Lock()
receivedReady[messageId] = true
cbMutex.Unlock()
wg.Done()
},
OnMessageSent: func(messageId MessageID) {
fmt.Printf("Test: OnMessageSent received: %s\n", messageId)
cbMutex.Lock()
receivedSent[messageId] = true
cbMutex.Unlock()
wg.Done()
},
OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) {
fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps)
cbMutex.Lock()
receivedMissing[messageId] = missingDeps
cbMutex.Unlock()
wg.Done()
},
OnPeriodicSync: func() { OnPeriodicSync: func() {
fmt.Println("Test: OnPeriodicSync received") fmt.Println("Test_OnPeriodicSync: Received")
cbMutex.Lock() cbMutex.Lock()
syncRequested = true if !syncCalled { // Only signal the first time
syncCalled = true
syncChan <- true
}
cbMutex.Unlock() cbMutex.Unlock()
// Don't wg.Done() here, it might be called multiple times
}, },
} }
@ -174,72 +425,28 @@ func TestCallbacks(t *testing.T) {
t.Fatalf("RegisterCallback failed: %v", err) t.Fatalf("RegisterCallback failed: %v", err)
} }
// Start tasks AFTER registering callbacks // Start periodic tasks
err = StartPeriodicTasks(handle) err = StartPeriodicTasks(handle)
if err != nil { if err != nil {
t.Fatalf("StartPeriodicTasks failed: %v", err) t.Fatalf("StartPeriodicTasks failed: %v", err)
} }
// --- Test Scenario ---
// 1. Send msg1
wg.Add(1) // Expect OnMessageSent for msg1 eventually
payload1 := []byte("callback test 1")
msgID1 := MessageID("cb-msg-1")
wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
if err != nil {
t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
}
// 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history)
wg.Add(1) // Expect OnMessageReady for msg1
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
}
// 3. Send msg2 (depends on msg1)
wg.Add(1) // Expect OnMessageSent for msg2 eventually
payload2 := []byte("callback test 2")
msgID2 := MessageID("cb-msg-2")
wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
if err != nil {
t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
}
// 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2)
wg.Add(1) // Expect OnMessageReady for msg2
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg2)
if err != nil {
t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err)
}
// --- Verification --- // --- Verification ---
// Wait for expected callbacks with a timeout // Wait for the periodic sync callback with a timeout (needs to be longer than sync interval)
waitTimeout(&wg, 5*time.Second, t) // Default sync interval is 30s, which is too long for a unit test.
// We rely on the periodic tasks starting quickly and triggering the callback soon.
select {
case <-syncChan:
// Success
case <-time.After(10 * time.Second):
t.Errorf("Timed out waiting for OnPeriodicSync callback")
}
cbMutex.Lock() cbMutex.Lock()
defer cbMutex.Unlock() defer cbMutex.Unlock()
if !syncCalled {
if !receivedReady[msgID1] { // This might happen if the timeout was too short
t.Errorf("OnMessageReady not called for %s", msgID1) t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout")
}
if !receivedReady[msgID2] {
t.Errorf("OnMessageReady not called for %s", msgID2)
}
if !receivedSent[msgID1] {
t.Errorf("OnMessageSent not called for %s", msgID1)
}
if !receivedSent[msgID2] {
t.Errorf("OnMessageSent not called for %s", msgID2)
}
// We didn't explicitly test missing deps in this path
if len(receivedMissing) > 0 {
t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing)
}
// Periodic sync is harder to guarantee in a short test, just check if it was ever true
if !syncRequested {
t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout")
} }
} }

View File

@ -4,22 +4,21 @@ import strutils
import results import results
import private/probabilities import private/probabilities
type type BloomFilter* = object
BloomFilter* = object capacity*: int
capacity*: int errorRate*: float
errorRate*: float kHashes*: int
kHashes*: int mBits*: int
mBits*: int intArray*: seq[int]
intArray*: seq[int]
{.push overflowChecks: off.} # Turn off overflow checks for hashing operations {.push overflowChecks: off.} # Turn off overflow checks for hashing operations
proc hashN(item: string, n: int, maxValue: int): int = proc hashN(item: string, n: int, maxValue: int): int =
## Get the nth hash using Nim's built-in hash function using ## Get the nth hash using Nim's built-in hash function using
## the double hashing technique from Kirsch and Mitzenmacher, 2008: ## the double hashing technique from Kirsch and Mitzenmacher, 2008:
## http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/rsa.pdf ## http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/rsa.pdf
let let
hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes
hashB = abs(hash(item & " b")) mod maxValue # string concatenation hashB = abs(hash(item & " b")) mod maxValue # string concatenation
abs((hashA + n * hashB)) mod maxValue abs((hashA + n * hashB)) mod maxValue
# # Use bit rotation for second hash instead of string concatenation if speed if preferred over FP-rate # # Use bit rotation for second hash instead of string concatenation if speed if preferred over FP-rate
@ -31,20 +30,24 @@ proc hashN(item: string, n: int, maxValue: int): int =
{.pop.} {.pop.}
proc getMOverNBitsForK*(k: int, targetError: float, proc getMOverNBitsForK*(
probabilityTable = kErrors): Result[int, string] = k: int, targetError: float, probabilityTable = kErrors
): Result[int, string] =
## Returns the optimal number of m/n bits for a given k. ## Returns the optimal number of m/n bits for a given k.
if k notin 0..12: if k notin 0 .. 12:
return err("K must be <= 12 if forceNBitsPerElem is not also specified.") return err("K must be <= 12 if forceNBitsPerElem is not also specified.")
for mOverN in 2..probabilityTable[k].high: for mOverN in 2 .. probabilityTable[k].high:
if probabilityTable[k][mOverN] < targetError: if probabilityTable[k][mOverN] < targetError:
return ok(mOverN) return ok(mOverN)
err("Specified value of k and error rate not achievable using less than 4 bytes / element.") err(
"Specified value of k and error rate not achievable using less than 4 bytes / element."
)
proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0, proc initializeBloomFilter*(
forceNBitsPerElem = 0): Result[BloomFilter, string] = capacity: int, errorRate: float, k = 0, forceNBitsPerElem = 0
): Result[BloomFilter, string] =
## Initializes a Bloom filter with specified parameters. ## Initializes a Bloom filter with specified parameters.
## ##
## Parameters: ## Parameters:
@ -76,25 +79,29 @@ proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0,
mBits = capacity * nBitsPerElem mBits = capacity * nBitsPerElem
mInts = 1 + mBits div (sizeof(int) * 8) mInts = 1 + mBits div (sizeof(int) * 8)
ok(BloomFilter( ok(
capacity: capacity, BloomFilter(
errorRate: errorRate, capacity: capacity,
kHashes: kHashes, errorRate: errorRate,
mBits: mBits, kHashes: kHashes,
intArray: newSeq[int](mInts) mBits: mBits,
)) intArray: newSeq[int](mInts),
)
)
proc `$`*(bf: BloomFilter): string = proc `$`*(bf: BloomFilter): string =
## Prints the configuration of the Bloom filter. ## Prints the configuration of the Bloom filter.
"Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits of memory." % "Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits of memory." %
[$bf.capacity, [
formatFloat(bf.errorRate, format = ffScientific, precision = 1), $bf.capacity,
$bf.kHashes, formatFloat(bf.errorRate, format = ffScientific, precision = 1),
$(bf.mBits div bf.capacity)] $bf.kHashes,
$(bf.mBits div bf.capacity),
]
proc computeHashes(bf: BloomFilter, item: string): seq[int] = proc computeHashes(bf: BloomFilter, item: string): seq[int] =
var hashes = newSeq[int](bf.kHashes) var hashes = newSeq[int](bf.kHashes)
for i in 0..<bf.kHashes: for i in 0 ..< bf.kHashes:
hashes[i] = hashN(item, i, bf.mBits) hashes[i] = hashN(item, i, bf.mBits)
hashes hashes
@ -120,4 +127,4 @@ proc lookup*(bf: BloomFilter, item: string): bool =
currentInt = bf.intArray[intAddress] currentInt = bf.intArray[intAddress]
if currentInt != (currentInt or (1 shl bitOffset)): if currentInt != (currentInt or (1 shl bitOffset)):
return false return false
true true

View File

@ -27,4 +27,4 @@ const
DefaultMaxResendAttempts* = 5 DefaultMaxResendAttempts* = 5
DefaultSyncMessageInterval* = initDuration(seconds = 30) DefaultSyncMessageInterval* = initDuration(seconds = 30)
DefaultBufferSweepInterval* = initDuration(seconds = 60) DefaultBufferSweepInterval* = initDuration(seconds = 60)
MaxMessageSize* = 1024 * 1024 # 1 MB MaxMessageSize* = 1024 * 1024 # 1 MB

View File

@ -7,94 +7,97 @@
type type
TErrorForK = seq[float] TErrorForK = seq[float]
TAllErrorRates* = array[0..12, TErrorForK] TAllErrorRates* = array[0 .. 12, TErrorForK]
var kErrors* {.threadvar.}: TAllErrorRates var kErrors* {.threadvar.}: TAllErrorRates
kErrors = [ kErrors = [
@[1.0], @[1.0],
@[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, @[
0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, 1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, 0.1540000000,
0.0869000000, 0.0800000000, 0.0740000000, 0.0689000000, 0.0645000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, 0.0869000000, 0.0800000000,
0.0606000000, 0.0571000000, 0.0540000000, 0.0513000000, 0.0488000000, 0.0740000000, 0.0689000000, 0.0645000000, 0.0606000000, 0.0571000000, 0.0540000000,
0.0465000000, 0.0444000000, 0.0425000000, 0.0408000000, 0.0392000000, 0.0513000000, 0.0488000000, 0.0465000000, 0.0444000000, 0.0425000000, 0.0408000000,
0.0377000000, 0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000, 0.0392000000, 0.0377000000, 0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000,
0.0317000000, 0.0308000000], 0.0317000000, 0.0308000000,
],
@[1.0, 1.0, 0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000, @[
0.0804000000, 0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000, 1.0, 1.0, 0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000, 0.0804000000,
0.0276000000, 0.0236000000, 0.0203000000, 0.0177000000, 0.0156000000, 0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000, 0.0276000000, 0.0236000000,
0.0138000000, 0.0123000000, 0.0111000000, 0.0099800000, 0.0090600000, 0.0203000000, 0.0177000000, 0.0156000000, 0.0138000000, 0.0123000000, 0.0111000000,
0.0082500000, 0.0075500000, 0.0069400000, 0.0063900000, 0.0059100000, 0.0099800000, 0.0090600000, 0.0082500000, 0.0075500000, 0.0069400000, 0.0063900000,
0.0054800000, 0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000, 0.0059100000, 0.0054800000, 0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000,
0.0039000000, 0.0036700000], 0.0039000000, 0.0036700000,
],
@[1.0, 1.0, 1.0, 0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000, @[
0.0423000000, 0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000, 1.0, 1.0, 1.0, 0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000, 0.0423000000,
0.0108000000, 0.0087500000, 0.0071800000, 0.0059600000, 0.0050000000, 0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000, 0.0108000000, 0.0087500000,
0.0042300000, 0.0036200000, 0.0031200000, 0.0027000000, 0.0023600000, 0.0071800000, 0.0059600000, 0.0050000000, 0.0042300000, 0.0036200000, 0.0031200000,
0.0020700000, 0.0018300000, 0.0016200000, 0.0014500000, 0.0012900000, 0.0027000000, 0.0023600000, 0.0020700000, 0.0018300000, 0.0016200000, 0.0014500000,
0.0011600000, 0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000, 0.0012900000, 0.0011600000, 0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000,
0.0007170000], 0.0007170000,
],
@[1.0, 1.0, 1.0, 1.0, 0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000, @[
0.0240000000, 0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000, 1.0, 1.0, 1.0, 1.0, 0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000,
0.0049200000, 0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000, 0.0240000000, 0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000, 0.0049200000,
0.0015800000, 0.0013000000, 0.0010800000, 0.0009050000, 0.0007640000, 0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000, 0.0015800000, 0.0013000000,
0.0006490000, 0.0005550000, 0.0004780000, 0.0004130000, 0.0003590000, 0.0010800000, 0.0009050000, 0.0007640000, 0.0006490000, 0.0005550000, 0.0004780000,
0.0003140000, 0.0002760000, 0.0002430000, 0.0002150000, 0.0001910000], 0.0004130000, 0.0003590000, 0.0003140000, 0.0002760000, 0.0002430000, 0.0002150000,
0.0001910000,
@[1.0, 1.0, 1.0, 1.0, 1.0, 0.1010000000, 0.0578000000, 0.0347000000, ],
0.0217000000, 0.0141000000, 0.0094300000, 0.0065000000, 0.0045900000, @[
0.0033200000, 0.0024400000, 0.0018300000, 0.0013900000, 0.0010700000, 1.0, 1.0, 1.0, 1.0, 1.0, 0.1010000000, 0.0578000000, 0.0347000000, 0.0217000000,
0.0008390000, 0.0006630000, 0.0005300000, 0.0004270000, 0.0003470000, 0.0141000000, 0.0094300000, 0.0065000000, 0.0045900000, 0.0033200000, 0.0024400000,
0.0002850000, 0.0002350000, 0.0001960000, 0.0001640000, 0.0001380000, 0.0018300000, 0.0013900000, 0.0010700000, 0.0008390000, 0.0006630000, 0.0005300000,
0.0001170000, 0.0000996000, 0.0000853000, 0.0000733000, 0.0000633000], 0.0004270000, 0.0003470000, 0.0002850000, 0.0002350000, 0.0001960000, 0.0001640000,
0.0001380000, 0.0001170000, 0.0000996000, 0.0000853000, 0.0000733000, 0.0000633000,
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0638000000, 0.0364000000, 0.0216000000, ],
0.0133000000, 0.0084400000, 0.0055200000, 0.0037100000, 0.0025500000, @[
0.0017900000, 0.0012800000, 0.0009350000, 0.0006920000, 0.0005190000, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0638000000, 0.0364000000, 0.0216000000,
0.0003940000, 0.0003030000, 0.0002360000, 0.0001850000, 0.0001470000, 0.0133000000, 0.0084400000, 0.0055200000, 0.0037100000, 0.0025500000, 0.0017900000,
0.0001170000, 0.0000944000, 0.0000766000, 0.0000626000, 0.0000515000, 0.0012800000, 0.0009350000, 0.0006920000, 0.0005190000, 0.0003940000, 0.0003030000,
0.0000426000, 0.0000355000, 0.0000297000, 0.0000250000], 0.0002360000, 0.0001850000, 0.0001470000, 0.0001170000, 0.0000944000, 0.0000766000,
0.0000626000, 0.0000515000, 0.0000426000, 0.0000355000, 0.0000297000, 0.0000250000,
@[1.0, 1.0, 1.0, ],
1.0, 1.0, 1.0, 1.0, 1.0, 0.0229000000, 0.0135000000, 0.0081900000, @[
0.0051300000, 0.0032900000, 0.0021700000, 0.0014600000, 0.0010000000, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0229000000, 0.0135000000, 0.0081900000,
0.0007020000, 0.0004990000, 0.0003600000, 0.0002640000, 0.0001960000, 0.0051300000, 0.0032900000, 0.0021700000, 0.0014600000, 0.0010000000, 0.0007020000,
0.0001470000, 0.0001120000, 0.0000856000, 0.0000663000, 0.0000518000, 0.0004990000, 0.0003600000, 0.0002640000, 0.0001960000, 0.0001470000, 0.0001120000,
0.0000408000, 0.0000324000, 0.0000259000, 0.0000209000, 0.0000169000, 0.0000856000, 0.0000663000, 0.0000518000, 0.0000408000, 0.0000324000, 0.0000259000,
0.0000138000, 0.0000113000], 0.0000209000, 0.0000169000, 0.0000138000, 0.0000113000,
],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, @[
1.0, 0.0145000000, 0.0084600000, 0.0050900000, 0.0031400000, 0.0019900000, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0145000000, 0.0084600000,
0.0012900000, 0.0008520000, 0.0005740000, 0.0003940000, 0.0002750000, 0.0050900000, 0.0031400000, 0.0019900000, 0.0012900000, 0.0008520000, 0.0005740000,
0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000, 0.0000555000, 0.0003940000, 0.0002750000, 0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000,
0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000, 0.0000555000, 0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000,
0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300], 0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300,
],
@[1.0, 1.0, 1.0, @[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0053100000, 0.0031700000, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0053100000, 0.0031700000,
0.0019400000, 0.0012100000, 0.0007750000, 0.0005050000, 0.0003350000, 0.0019400000, 0.0012100000, 0.0007750000, 0.0005050000, 0.0003350000, 0.0002260000,
0.0002260000, 0.0001550000, 0.0001080000, 0.0000759000, 0.0000542000, 0.0001550000, 0.0001080000, 0.0000759000, 0.0000542000, 0.0000392000, 0.0000286000,
0.0000392000, 0.0000286000, 0.0000211000, 0.0000157000, 0.0000118000, 0.0000211000, 0.0000157000, 0.0000118000, 0.0000089600, 0.0000068500, 0.0000052800,
0.0000089600, 0.0000068500, 0.0000052800, 0.0000041000, 0.0000032000], 0.0000041000, 0.0000032000,
],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0033400000, @[
0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000, 0.0003020000, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0033400000,
0.0001980000, 0.0001320000, 0.0000889000, 0.0000609000, 0.0000423000, 0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000, 0.0003020000, 0.0001980000,
0.0000297000, 0.0000211000, 0.0000152000, 0.0000110000, 0.0000080700, 0.0001320000, 0.0000889000, 0.0000609000, 0.0000423000, 0.0000297000, 0.0000211000,
0.0000059700, 0.0000044500, 0.0000033500, 0.0000025400, 0.0000019400], 0.0000152000, 0.0000110000, 0.0000080700, 0.0000059700, 0.0000044500, 0.0000033500,
0.0000025400, 0.0000019400,
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, ],
0.0021000000, 0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000, @[
0.0001830000, 0.0001180000, 0.0000777000, 0.0000518000, 0.0000350000, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0021000000,
0.0000240000, 0.0000166000, 0.0000116000, 0.0000082300, 0.0000058900, 0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000, 0.0001830000, 0.0001180000,
0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900, 0.0000012600], 0.0000777000, 0.0000518000, 0.0000350000, 0.0000240000, 0.0000166000, 0.0000116000,
0.0000082300, 0.0000058900, 0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900,
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0000012600,
0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000, ],
0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000, @[
0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
0.0000016500, 0.0000012000, 0.0000008740] 0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000, 0.0000712000,
0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000, 0.0000094200, 0.0000065200,
0.0000045600, 0.0000032200, 0.0000022900, 0.0000016500, 0.0000012000, 0.0000008740,
],
] ]

View File

@ -8,18 +8,18 @@ proc toBytes(s: string): seq[byte] =
proc encode*(msg: Message): ProtoBuffer = proc encode*(msg: Message): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write(1, msg.messageId) pb.write(1, msg.messageId)
pb.write(2, uint64(msg.lamportTimestamp)) pb.write(2, uint64(msg.lamportTimestamp))
for hist in msg.causalHistory: for hist in msg.causalHistory:
pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling
pb.write(4, msg.channelId) pb.write(4, msg.channelId)
pb.write(5, msg.content) pb.write(5, msg.content)
pb.write(6, msg.bloomFilter) pb.write(6, msg.bloomFilter)
pb.finish() pb.finish()
pb pb
proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] = proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] =
@ -47,11 +47,11 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] =
return err(ProtobufError.missingRequiredField("content")) return err(ProtobufError.missingRequiredField("content"))
if not ?pb.getField(6, msg.bloomFilter): if not ?pb.getField(6, msg.bloomFilter):
msg.bloomFilter = @[] # Empty if not present msg.bloomFilter = @[] # Empty if not present
ok(msg) ok(msg)
proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] = proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] =
try: try:
let pb = encode(msg) let pb = encode(msg)
ok(pb.buffer) ok(pb.buffer)
@ -71,19 +71,19 @@ proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] =
proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] = proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] =
try: try:
var pb = initProtoBuffer() var pb = initProtoBuffer()
# Convert intArray to bytes # Convert intArray to bytes
var bytes = newSeq[byte](filter.intArray.len * sizeof(int)) var bytes = newSeq[byte](filter.intArray.len * sizeof(int))
for i, val in filter.intArray: for i, val in filter.intArray:
let start = i * sizeof(int) let start = i * sizeof(int)
copyMem(addr bytes[start], unsafeAddr val, sizeof(int)) copyMem(addr bytes[start], unsafeAddr val, sizeof(int))
pb.write(1, bytes) pb.write(1, bytes)
pb.write(2, uint64(filter.capacity)) pb.write(2, uint64(filter.capacity))
pb.write(3, uint64(filter.errorRate * 1_000_000)) pb.write(3, uint64(filter.errorRate * 1_000_000))
pb.write(4, uint64(filter.kHashes)) pb.write(4, uint64(filter.kHashes))
pb.write(5, uint64(filter.mBits)) pb.write(5, uint64(filter.mBits))
pb.finish() pb.finish()
ok(pb.buffer) ok(pb.buffer)
except: except:
@ -92,31 +92,31 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr
proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] = proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] =
if data.len == 0: if data.len == 0:
return err(reDeserializationError) return err(reDeserializationError)
try: try:
let pb = initProtoBuffer(data) let pb = initProtoBuffer(data)
var bytes: seq[byte] var bytes: seq[byte]
var cap, errRate, kHashes, mBits: uint64 var cap, errRate, kHashes, mBits: uint64
if not pb.getField(1, bytes).get() or if not pb.getField(1, bytes).get() or not pb.getField(2, cap).get() or
not pb.getField(2, cap).get() or not pb.getField(3, errRate).get() or not pb.getField(4, kHashes).get() or
not pb.getField(3, errRate).get() or not pb.getField(5, mBits).get():
not pb.getField(4, kHashes).get() or
not pb.getField(5, mBits).get():
return err(reDeserializationError) return err(reDeserializationError)
# Convert bytes back to intArray # Convert bytes back to intArray
var intArray = newSeq[int](bytes.len div sizeof(int)) var intArray = newSeq[int](bytes.len div sizeof(int))
for i in 0 ..< intArray.len: for i in 0 ..< intArray.len:
let start = i * sizeof(int) let start = i * sizeof(int)
copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int)) copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int))
ok(BloomFilter( ok(
intArray: intArray, BloomFilter(
capacity: int(cap), intArray: intArray,
errorRate: float(errRate) / 1_000_000, capacity: int(cap),
kHashes: int(kHashes), errorRate: float(errRate) / 1_000_000,
mBits: int(mBits) kHashes: int(kHashes),
)) mBits: int(mBits),
)
)
except: except:
err(reDeserializationError) err(reDeserializationError)

View File

@ -29,4 +29,4 @@ converter toProtobufError*(err: minprotobuf.ProtoError): ProtobufError =
ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err) ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err)
proc missingRequiredField*(T: type ProtobufError, field: string): T = proc missingRequiredField*(T: type ProtobufError, field: string): T =
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field) ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field)

View File

@ -2,7 +2,9 @@ import std/[times, locks, tables, sets]
import chronos, results import chronos, results
import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter] import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter]
proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager, ReliabilityError] = proc newReliabilityManager*(
channelId: string, config: ReliabilityConfig = defaultConfig()
): Result[ReliabilityManager, ReliabilityError] =
## Creates a new ReliabilityManager with the specified channel ID and configuration. ## Creates a new ReliabilityManager with the specified channel ID and configuration.
## ##
## Parameters: ## Parameters:
@ -13,14 +15,12 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau
## A Result containing either a new ReliabilityManager instance or an error. ## A Result containing either a new ReliabilityManager instance or an error.
if channelId.len == 0: if channelId.len == 0:
return err(reInvalidArgument) return err(reInvalidArgument)
try: try:
let bloomFilter = newRollingBloomFilter( let bloomFilter = newRollingBloomFilter(
config.bloomFilterCapacity, config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow
config.bloomFilterErrorRate,
config.bloomFilterWindow
) )
let rm = ReliabilityManager( let rm = ReliabilityManager(
lamportTimestamp: 0, lamportTimestamp: 0,
messageHistory: @[], messageHistory: @[],
@ -28,7 +28,7 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau
outgoingBuffer: @[], outgoingBuffer: @[],
incomingBuffer: @[], incomingBuffer: @[],
channelId: channelId, channelId: channelId,
config: config config: config,
) )
initLock(rm.lock) initLock(rm.lock)
return ok(rm) return ok(rm)
@ -40,35 +40,42 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) =
while i < rm.outgoingBuffer.len: while i < rm.outgoingBuffer.len:
var acknowledged = false var acknowledged = false
let outMsg = rm.outgoingBuffer[i] let outMsg = rm.outgoingBuffer[i]
# Check if message is in causal history # Check if message is in causal history
for msgID in msg.causalHistory: for msgID in msg.causalHistory:
if outMsg.message.messageId == msgID: if outMsg.message.messageId == msgID:
acknowledged = true acknowledged = true
break break
# Check bloom filter if not already acknowledged # Check bloom filter if not already acknowledged
if not acknowledged and msg.bloomFilter.len > 0: if not acknowledged and msg.bloomFilter.len > 0:
let bfResult = deserializeBloomFilter(msg.bloomFilter) let bfResult = deserializeBloomFilter(msg.bloomFilter)
if bfResult.isOk: if bfResult.isOk:
var rbf = RollingBloomFilter( var rbf = RollingBloomFilter(
filter: bfResult.get(), filter: bfResult.get(), window: rm.bloomFilter.window, messages: @[]
window: rm.bloomFilter.window,
messages: @[]
) )
if rbf.contains(outMsg.message.messageId): if rbf.contains(outMsg.message.messageId):
acknowledged = true acknowledged = true
else: else:
logError("Failed to deserialize bloom filter") logError("Failed to deserialize bloom filter")
if acknowledged: if acknowledged:
echo "[Nim Core] reviewAckStatus: Message acknowledged: ",
outMsg.message.messageId
if rm.onMessageSent != nil: if rm.onMessageSent != nil:
rm.onMessageSent(outMsg.message.messageId) echo "[Nim Core] reviewAckStatus: Calling onMessageSent for: ",
outMsg.message.messageId
rm.onMessageSent(rm, outMsg.message.messageId) # Pass rm
else:
echo "[Nim Core] reviewAckStatus: rm.onMessageSent is nil, cannot call callback for: ",
outMsg.message.messageId
rm.outgoingBuffer.delete(i) rm.outgoingBuffer.delete(i)
else: else:
inc i inc i
proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: MessageID): Result[seq[byte], ReliabilityError] = proc wrapOutgoingMessage*(
rm: ReliabilityManager, message: seq[byte], messageId: MessageID
): Result[seq[byte], ReliabilityError] =
## Wraps an outgoing message with reliability metadata. ## Wraps an outgoing message with reliability metadata.
## ##
## Parameters: ## Parameters:
@ -84,7 +91,7 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
withLock rm.lock: withLock rm.lock:
try: try:
rm.updateLamportTimestamp(getTime().toUnix) rm.updateLamportTimestamp(getTime().toUnix)
# Serialize current bloom filter # Serialize current bloom filter
var bloomBytes: seq[byte] var bloomBytes: seq[byte]
let bfResult = serializeBloomFilter(rm.bloomFilter.filter) let bfResult = serializeBloomFilter(rm.bloomFilter.filter)
@ -100,15 +107,13 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory), causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory),
channelId: rm.channelId, channelId: rm.channelId,
content: message, content: message,
bloomFilter: bloomBytes bloomFilter: bloomBytes,
) )
# Add to outgoing buffer # Add to outgoing buffer
rm.outgoingBuffer.add(UnacknowledgedMessage( rm.outgoingBuffer.add(
message: msg, UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)
sendTime: getTime(), )
resendAttempts: 0
))
# Add to causal history and bloom filter # Add to causal history and bloom filter
rm.bloomFilter.add(msg.messageId) rm.bloomFilter.add(msg.messageId)
@ -153,10 +158,15 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
for msg in rm.incomingBuffer: for msg in rm.incomingBuffer:
if msg.messageId == msgId: if msg.messageId == msgId:
rm.addToHistory(msg.messageId) rm.addToHistory(msg.messageId)
echo "[Nim Core] processIncomingBuffer: Message ready: ", msg.messageId
if rm.onMessageReady != nil: if rm.onMessageReady != nil:
rm.onMessageReady(msg.messageId) echo "[Nim Core] processIncomingBuffer: Calling onMessageReady for: ",
msg.messageId
rm.onMessageReady(rm, msg.messageId) # Pass rm
else:
echo "[Nim Core] processIncomingBuffer: rm.onMessageReady is nil, cannot call callback for: ",
msg.messageId
processed.incl(msgId) processed.incl(msgId)
# Add any dependent messages that might now be ready # Add any dependent messages that might now be ready
if msgId in dependencies: if msgId in dependencies:
for dependentId in dependencies[msgId]: for dependentId in dependencies[msgId]:
@ -170,7 +180,9 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
rm.incomingBuffer = newIncomingBuffer rm.incomingBuffer = newIncomingBuffer
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] = proc unwrapReceivedMessage*(
rm: ReliabilityManager, message: seq[byte]
): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] =
## Unwraps a received message and processes its reliability metadata. ## Unwraps a received message and processes its reliability metadata.
## ##
## Parameters: ## Parameters:
@ -182,12 +194,14 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[
let msgResult = deserializeMessage(message) let msgResult = deserializeMessage(message)
if not msgResult.isOk: if not msgResult.isOk:
return err(msgResult.error) return err(msgResult.error)
let msg = msgResult.get let msg = msgResult.get
if rm.bloomFilter.contains(msg.messageId): if rm.bloomFilter.contains(msg.messageId):
echo "[Nim Core] unwrapReceivedMessage: Duplicate message detected (in bloom filter): ",
msg.messageId # Add this log
return ok((msg.content, @[])) return ok((msg.content, @[]))
rm.bloomFilter.add(msg.messageId) rm.bloomFilter.add(msg.messageId) # Add to receiver's bloom filter
# Update Lamport timestamp # Update Lamport timestamp
rm.updateLamportTimestamp(msg.lamportTimestamp) rm.updateLamportTimestamp(msg.lamportTimestamp)
@ -212,20 +226,46 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[
else: else:
# All dependencies met, add to history # All dependencies met, add to history
rm.addToHistory(msg.messageId) rm.addToHistory(msg.messageId)
rm.processIncomingBuffer() rm.processIncomingBuffer() # This might trigger onMessageReady internally
# If processIncomingBuffer didn't handle it (e.g., buffer was empty), handle it now.
# We know deps are met, so it should be ready.
# NOTE: Need to ensure addToHistory isn't called twice if processIncomingBuffer also adds it.
# Let's assume processIncomingBuffer handles adding to history if it processes the message.
# We only call the callback here if it wasn't handled by processIncomingBuffer.
# A more robust check would involve seeing if msgId was added to 'processed' set in processIncomingBuffer,
# but let's try simply calling the callback if the condition is met.
# We already added to history on line 222.
echo "[Nim Core] unwrapReceivedMessage: Message ready (direct): ", msg.messageId
# rm.addToHistory(msg.messageId) # Removed potential duplicate add
if rm.onMessageReady != nil: if rm.onMessageReady != nil:
rm.onMessageReady(msg.messageId) echo "[Nim Core] unwrapReceivedMessage: Calling onMessageReady for: ",
msg.messageId
rm.onMessageReady(rm, msg.messageId) # Pass rm
else:
echo "[Nim Core] unwrapReceivedMessage: rm.onMessageReady is nil, cannot call callback for: ",
msg.messageId
else: else:
# Buffer message and request missing dependencies # Buffer message and request missing dependencies
echo "[Nim Core] unwrapReceivedMessage: Buffering message due to missing deps: ",
msg.messageId
rm.incomingBuffer.add(msg) rm.incomingBuffer.add(msg)
echo "[Nim Core] unwrapReceivedMessage: Checking onMissingDependencies callback for: ",
msg.messageId
if rm.onMissingDependencies != nil: if rm.onMissingDependencies != nil:
rm.onMissingDependencies(msg.messageId, missingDeps) echo "[Nim Core] unwrapReceivedMessage: Calling onMissingDependencies for: ",
msg.messageId
rm.onMissingDependencies(rm, msg.messageId, missingDeps) # Pass rm
else:
echo "[Nim Core] unwrapReceivedMessage: rm.onMissingDependencies is nil, cannot call callback for: ",
msg.messageId
return ok((msg.content, missingDeps)) return ok((msg.content, missingDeps))
except: except:
return err(reInternalError) return err(reInternalError)
proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void, ReliabilityError] = proc markDependenciesMet*(
rm: ReliabilityManager, messageIds: seq[MessageID]
): Result[void, ReliabilityError] =
## Marks the specified message dependencies as met. ## Marks the specified message dependencies as met.
## ##
## Parameters: ## Parameters:
@ -239,17 +279,24 @@ proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): R
if not rm.bloomFilter.contains(msgId): if not rm.bloomFilter.contains(msgId):
rm.bloomFilter.add(msgId) rm.bloomFilter.add(msgId)
# rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application? # rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application?
echo "[Nim Core] markDependenciesMet: Calling processIncomingBuffer after marking deps"
rm.processIncomingBuffer() rm.processIncomingBuffer()
return ok() return ok()
except: except:
return err(reInternalError) return err(reInternalError)
proc setCallbacks*(rm: ReliabilityManager, proc setCallbacks*(
onMessageReady: proc(messageId: MessageID) {.gcsafe.}, rm: ReliabilityManager,
onMessageSent: proc(messageId: MessageID) {.gcsafe.}, onMessageReady: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.},
onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}, # Add rm
onPeriodicSync: PeriodicSyncCallback = nil) = onMessageSent: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.},
# Add rm
onMissingDependencies: proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.}, # Add rm
onPeriodicSync: proc(rm: ReliabilityManager) {.gcsafe.} = nil,
) = # Add rm, make type explicit
## Sets the callback functions for various events in the ReliabilityManager. ## Sets the callback functions for various events in the ReliabilityManager.
## ##
## Parameters: ## Parameters:
@ -268,7 +315,7 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} =
withLock rm.lock: withLock rm.lock:
let now = getTime() let now = getTime()
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]
try: try:
for unackMsg in rm.outgoingBuffer: for unackMsg in rm.outgoingBuffer:
let elapsed = now - unackMsg.sendTime let elapsed = now - unackMsg.sendTime
@ -281,8 +328,15 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} =
newOutgoingBuffer.add(updatedMsg) newOutgoingBuffer.add(updatedMsg)
else: else:
if rm.onMessageSent != nil: if rm.onMessageSent != nil:
rm.onMessageSent(unackMsg.message.messageId) # Assuming message timeout means it's considered "sent" or "failed"
echo "[Nim Core] checkUnacknowledgedMessages: Calling onMessageSent for timed out message: ",
unackMsg.message.messageId
rm.onMessageSent(rm, unackMsg.message.messageId) # Pass rm
else:
echo "[Nim Core] checkUnacknowledgedMessages: rm.onMessageSent is nil for timed out message: ",
unackMsg.message.messageId
else: else:
# Dedent this else to match `if elapsed > rm.config.resendInterval:` (line 296)
newOutgoingBuffer.add(unackMsg) newOutgoingBuffer.add(unackMsg)
rm.outgoingBuffer = newOutgoingBuffer rm.outgoingBuffer = newOutgoingBuffer
@ -298,7 +352,7 @@ proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledErr
rm.cleanBloomFilter() rm.cleanBloomFilter()
except Exception as e: except Exception as e:
logError("Error in periodic buffer sweep: " & e.msg) logError("Error in periodic buffer sweep: " & e.msg)
await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds)) await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))
proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} =
@ -306,8 +360,12 @@ proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledErr
while true: while true:
{.gcsafe.}: {.gcsafe.}:
try: try:
echo "[Nim Core] periodicSyncMessage: Checking onPeriodicSync callback"
if rm.onPeriodicSync != nil: if rm.onPeriodicSync != nil:
rm.onPeriodicSync() echo "[Nim Core] periodicSyncMessage: Calling onPeriodicSync"
rm.onPeriodicSync(rm) # Pass rm
else:
echo "[Nim Core] periodicSyncMessage: rm.onPeriodicSync is nil"
except Exception as e: except Exception as e:
logError("Error in periodic sync: " & e.msg) logError("Error in periodic sync: " & e.msg)
await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds)) await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds))
@ -333,10 +391,9 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE
rm.outgoingBuffer.setLen(0) rm.outgoingBuffer.setLen(0)
rm.incomingBuffer.setLen(0) rm.incomingBuffer.setLen(0)
rm.bloomFilter = newRollingBloomFilter( rm.bloomFilter = newRollingBloomFilter(
rm.config.bloomFilterCapacity, rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate,
rm.config.bloomFilterErrorRate, rm.config.bloomFilterWindow,
rm.config.bloomFilterWindow
) )
return ok() return ok()
except: except:
return err(reInternalError) return err(reInternalError)

View File

@ -2,7 +2,25 @@ import std/[times, locks]
import ./[rolling_bloom_filter, message] import ./[rolling_bloom_filter, message]
type type
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} # Forward declare C types needed within ReliabilityManager definition
# Ideally, these would be imported from a shared header/module if possible,
# but defining them here avoids circular dependencies for now.
CEventType* {.importc: "CEventType", header: "../bindings/bindings.h", pure.} = enum
# Use relative path
EVENT_MESSAGE_READY = 1
EVENT_MESSAGE_SENT = 2
EVENT_MISSING_DEPENDENCIES = 3
EVENT_PERIODIC_SYNC = 4
CEventCallback* = proc(
handle: pointer,
eventType: CEventType,
data1: pointer,
data2: pointer,
data3: csize_t,
) {.cdecl, gcsafe.}
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} # This is the Nim internal type
ReliabilityConfig* = object ReliabilityConfig* = object
bloomFilterCapacity*: int bloomFilterCapacity*: int
@ -24,10 +42,19 @@ type
channelId*: string channelId*: string
config*: ReliabilityConfig config*: ReliabilityConfig
lock*: Lock lock*: Lock
onMessageReady*: proc(messageId: MessageID) # Nim internal callbacks (assigned in bindings/bindings.nim)
onMessageSent*: proc(messageId: MessageID) onMessageReady*: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.}
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) # Pass rm
onPeriodicSync*: proc() onMessageSent*: proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.}
# Pass rm
onMissingDependencies*: proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} # Pass rm
onPeriodicSync*: proc(rm: ReliabilityManager) {.gcsafe.} # Pass rm
# C callback info (set via RegisterCallback)
cCallback*: CEventCallback
cUserData*: pointer
ReliabilityError* = enum ReliabilityError* = enum
reInvalidArgument reInvalidArgument
@ -51,7 +78,7 @@ proc defaultConfig*(): ReliabilityConfig =
resendInterval: DefaultResendInterval, resendInterval: DefaultResendInterval,
maxResendAttempts: DefaultMaxResendAttempts, maxResendAttempts: DefaultMaxResendAttempts,
syncMessageInterval: DefaultSyncMessageInterval, syncMessageInterval: DefaultSyncMessageInterval,
bufferSweepInterval: DefaultBufferSweepInterval bufferSweepInterval: DefaultBufferSweepInterval,
) )
proc cleanup*(rm: ReliabilityManager) {.raises: [].} = proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
@ -76,7 +103,9 @@ proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [
if rm.messageHistory.len > rm.config.maxMessageHistory: if rm.messageHistory.len > rm.config.maxMessageHistory:
rm.messageHistory.delete(0) rm.messageHistory.delete(0)
proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) {.gcsafe, raises: [].} = proc updateLamportTimestamp*(
rm: ReliabilityManager, msgTs: int64
) {.gcsafe, raises: [].} =
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] = proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] =

View File

@ -3,11 +3,10 @@ import chronos
import chronicles import chronicles
import ./[bloom, message] import ./[bloom, message]
type type RollingBloomFilter* = object
RollingBloomFilter* = object filter*: BloomFilter
filter*: BloomFilter window*: times.Duration
window*: times.Duration messages*: seq[TimestampedMessageID]
messages*: seq[TimestampedMessageID]
const const
DefaultBloomFilterCapacity* = 10000 DefaultBloomFilterCapacity* = 10000
@ -20,34 +19,33 @@ proc logError*(msg: string) =
proc logInfo*(msg: string) = proc logInfo*(msg: string) =
info "ReliabilityInfo", message = msg info "ReliabilityInfo", message = msg
proc newRollingBloomFilter*(capacity: int, errorRate: float, window: times.Duration): RollingBloomFilter {.gcsafe.} = proc newRollingBloomFilter*(
capacity: int, errorRate: float, window: times.Duration
): RollingBloomFilter {.gcsafe.} =
try: try:
var filterResult: Result[BloomFilter, string] var filterResult: Result[BloomFilter, string]
{.gcsafe.}: {.gcsafe.}:
filterResult = initializeBloomFilter(capacity, errorRate) filterResult = initializeBloomFilter(capacity, errorRate)
if filterResult.isOk: if filterResult.isOk:
logInfo("Successfully initialized bloom filter") logInfo("Successfully initialized bloom filter")
return RollingBloomFilter( return RollingBloomFilter(
filter: filterResult.get(), # Extract the BloomFilter from Result filter: filterResult.get(), # Extract the BloomFilter from Result
window: window, window: window,
messages: @[] messages: @[],
) )
else: else:
logError("Failed to initialize bloom filter: " & filterResult.error) logError("Failed to initialize bloom filter: " & filterResult.error)
# Fall through to default case below # Fall through to default case below
except: except:
logError("Failed to initialize bloom filter") logError("Failed to initialize bloom filter")
# Default fallback case # Default fallback case
let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) let defaultResult =
initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
if defaultResult.isOk: if defaultResult.isOk:
return RollingBloomFilter( return
filter: defaultResult.get(), RollingBloomFilter(filter: defaultResult.get(), window: window, messages: @[])
window: window,
messages: @[]
)
else: else:
# If even default initialization fails, raise an exception # If even default initialization fails, raise an exception
logError("Failed to initialize bloom filter with default parameters") logError("Failed to initialize bloom filter with default parameters")
@ -75,9 +73,10 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
let now = getTime() let now = getTime()
let cutoff = now - rbf.window let cutoff = now - rbf.window
var newMessages: seq[TimestampedMessageID] = @[] var newMessages: seq[TimestampedMessageID] = @[]
# Initialize new filter # Initialize new filter
let newFilterResult = initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate) let newFilterResult =
initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate)
if newFilterResult.isErr: if newFilterResult.isErr:
logError("Failed to create new bloom filter: " & newFilterResult.error) logError("Failed to create new bloom filter: " & newFilterResult.error)
return return
@ -92,4 +91,4 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
rbf.messages = newMessages rbf.messages = newMessages
rbf.filter = newFilter rbf.filter = newFilter
except Exception as e: except Exception as e:
logError("Failed to clean bloom filter: " & e.msg) logError("Failed to clean bloom filter: " & e.msg)

View File

@ -13,9 +13,9 @@ suite "bloom filter":
sampleChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" sampleChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
testElements = newSeq[string](nElementsToTest) testElements = newSeq[string](nElementsToTest)
for i in 0..<nElementsToTest: for i in 0 ..< nElementsToTest:
var newString = "" var newString = ""
for j in 0..7: for j in 0 .. 7:
newString.add(sampleChars[rand(51)]) newString.add(sampleChars[rand(51)])
testElements[i] = newString testElements[i] = newString
@ -26,11 +26,11 @@ suite "bloom filter":
check bf.capacity == nElementsToTest check bf.capacity == nElementsToTest
check bf.errorRate == 0.001 check bf.errorRate == 0.001
check bf.kHashes == 10 check bf.kHashes == 10
check bf.mBits div bf.capacity == 15 # bits per element check bf.mBits div bf.capacity == 15 # bits per element
test "basic operations": test "basic operations":
check bf.lookup("nonexistent") == false # Test empty lookup check bf.lookup("nonexistent") == false # Test empty lookup
let bf2Result = initializeBloomFilter(100, 0.01) let bf2Result = initializeBloomFilter(100, 0.01)
check bf2Result.isOk check bf2Result.isOk
var bf2 = bf2Result.get var bf2 = bf2Result.get
@ -41,16 +41,16 @@ suite "bloom filter":
test "error rate": test "error rate":
var falsePositives = 0 var falsePositives = 0
let testSize = nElementsToTest div 2 let testSize = nElementsToTest div 2
for i in 0..<testSize: for i in 0 ..< testSize:
var testString = "" var testString = ""
for j in 0..8: # Different length than setup for j in 0 .. 8: # Different length than setup
testString.add(sampleChars[rand(51)]) testString.add(sampleChars[rand(51)])
if bf.lookup(testString): if bf.lookup(testString):
falsePositives.inc() falsePositives.inc()
let actualErrorRate = falsePositives.float / testSize.float let actualErrorRate = falsePositives.float / testSize.float
check actualErrorRate < bf.errorRate * 1.5 # Allow some margin check actualErrorRate < bf.errorRate * 1.5 # Allow some margin
test "perfect recall": test "perfect recall":
var lookupErrors = 0 var lookupErrors = 0
for item in testElements: for item in testElements:
@ -62,12 +62,14 @@ suite "bloom filter":
# Test error case for k > 12 # Test error case for k > 12
let errorCase = getMOverNBitsForK(k = 13, targetError = 0.01) let errorCase = getMOverNBitsForK(k = 13, targetError = 0.01)
check errorCase.isErr check errorCase.isErr
check errorCase.error == "K must be <= 12 if forceNBitsPerElem is not also specified." check errorCase.error ==
"K must be <= 12 if forceNBitsPerElem is not also specified."
# Test error case for unachievable error rate # Test error case for unachievable error rate
let errorCase2 = getMOverNBitsForK(k = 2, targetError = 0.00001) let errorCase2 = getMOverNBitsForK(k = 2, targetError = 0.00001)
check errorCase2.isErr check errorCase2.isErr
check errorCase2.error == "Specified value of k and error rate not achievable using less than 4 bytes / element." check errorCase2.error ==
"Specified value of k and error rate not achievable using less than 4 bytes / element."
# Test success cases # Test success cases
let case1 = getMOverNBitsForK(k = 2, targetError = 0.1) let case1 = getMOverNBitsForK(k = 2, targetError = 0.1)
@ -93,50 +95,51 @@ suite "bloom filter":
check bf3Result.isOk check bf3Result.isOk
let bf3 = bf3Result.get let bf3 = bf3Result.get
let str = $bf3 let str = $bf3
check str.contains("1000") # Capacity check str.contains("1000") # Capacity
check str.contains("4 hash") # Hash functions check str.contains("4 hash") # Hash functions
check str.contains("1.0e-02") # Error rate in scientific notation check str.contains("1.0e-02") # Error rate in scientific notation
suite "bloom filter special cases": suite "bloom filter special cases":
test "different patterns of strings": test "different patterns of strings":
const testSize = 10_000 const testSize = 10_000
let patterns = @[ let patterns =
"shortstr", @[
repeat("a", 1000), # Very long string "shortstr",
"special@#$%^&*()", # Special characters repeat("a", 1000), # Very long string
"unicode→★∑≈", # Unicode characters "special@#$%^&*()", # Special characters
repeat("pattern", 10) # Repeating pattern "unicode→★∑≈", # Unicode characters
] repeat("pattern", 10), # Repeating pattern
]
let bfResult = initializeBloomFilter(testSize, 0.01) let bfResult = initializeBloomFilter(testSize, 0.01)
check bfResult.isOk check bfResult.isOk
var bf = bfResult.get var bf = bfResult.get
var inserted = newSeq[string](testSize) var inserted = newSeq[string](testSize)
# Test pattern handling # Test pattern handling
for pattern in patterns: for pattern in patterns:
bf.insert(pattern) bf.insert(pattern)
assert bf.lookup(pattern), "failed lookup pattern: " & pattern assert bf.lookup(pattern), "failed lookup pattern: " & pattern
# Test general insertion and lookup # Test general insertion and lookup
for i in 0..<testSize: for i in 0 ..< testSize:
inserted[i] = $i & "test" & $rand(1000) inserted[i] = $i & "test" & $rand(1000)
bf.insert(inserted[i]) bf.insert(inserted[i])
# Verify all insertions # Verify all insertions
var lookupErrors = 0 var lookupErrors = 0
for item in inserted: for item in inserted:
if not bf.lookup(item): if not bf.lookup(item):
lookupErrors.inc() lookupErrors.inc()
check lookupErrors == 0 check lookupErrors == 0
# Check false positive rate # Check false positive rate
var falsePositives = 0 var falsePositives = 0
let fpTestSize = testSize div 2 let fpTestSize = testSize div 2
for i in 0..<fpTestSize: for i in 0 ..< fpTestSize:
let testItem = "notpresent" & $i & $rand(1000) let testItem = "notpresent" & $i & $rand(1000)
if bf.lookup(testItem): if bf.lookup(testItem):
falsePositives.inc() falsePositives.inc()
let fpRate = falsePositives.float / fpTestSize.float let fpRate = falsePositives.float / fpTestSize.float
check fpRate < bf.errorRate * 1.5 # Allow some margin but should be close to target check fpRate < bf.errorRate * 1.5 # Allow some margin but should be close to target

View File

@ -25,7 +25,7 @@ suite "Core Operations":
test "basic message wrapping and unwrapping": test "basic message wrapping and unwrapping":
let msg = @[byte(1), 2, 3] let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1" let msgId = "test-msg-1"
let wrappedResult = rm.wrapOutgoingMessage(msg, msgId) let wrappedResult = rm.wrapOutgoingMessage(msg, msgId)
check wrappedResult.isOk() check wrappedResult.isOk()
let wrapped = wrappedResult.get() let wrapped = wrappedResult.get()
@ -46,7 +46,7 @@ suite "Core Operations":
causalHistory: @[], causalHistory: @[],
channelId: "testChannel", channelId: "testChannel",
content: @[byte(1)], content: @[byte(1)],
bloomFilter: @[] bloomFilter: @[],
) )
let msg2 = Message( let msg2 = Message(
@ -55,7 +55,7 @@ suite "Core Operations":
causalHistory: @[], causalHistory: @[],
channelId: "testChannel", channelId: "testChannel",
content: @[byte(2)], content: @[byte(2)],
bloomFilter: @[] bloomFilter: @[],
) )
let serialized1 = serializeMessage(msg1) let serialized1 = serializeMessage(msg1)
@ -90,10 +90,16 @@ suite "Reliability Mechanisms":
var messageSentCount = 0 var messageSentCount = 0
var missingDepsCount = 0 var missingDepsCount = 0
# Update anonymous procs to match new signature (add rm parameter)
rm.setCallbacks( rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1, onMessageReady = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, messageReadyCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = missingDepsCount += 1 onMessageSent = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
onMissingDependencies = proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
missingDepsCount += 1, # onPeriodicSync is left as default nil
) )
# Create dependency chain: msg3 -> msg2 -> msg1 # Create dependency chain: msg3 -> msg2 -> msg1
@ -105,19 +111,19 @@ suite "Reliability Mechanisms":
let msg2 = Message( let msg2 = Message(
messageId: id2, messageId: id2,
lamportTimestamp: 2, lamportTimestamp: 2,
causalHistory: @[id1], # msg2 depends on msg1 causalHistory: @[id1], # msg2 depends on msg1
channelId: "testChannel", channelId: "testChannel",
content: @[byte(2)], content: @[byte(2)],
bloomFilter: @[] bloomFilter: @[],
) )
let msg3 = Message( let msg3 = Message(
messageId: id3, messageId: id3,
lamportTimestamp: 3, lamportTimestamp: 3,
causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2 causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2
channelId: "testChannel", channelId: "testChannel",
content: @[byte(3)], content: @[byte(3)],
bloomFilter: @[] bloomFilter: @[],
) )
let serialized2 = serializeMessage(msg2) let serialized2 = serializeMessage(msg2)
@ -130,10 +136,10 @@ suite "Reliability Mechanisms":
let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get()) let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get())
check unwrapResult3.isOk() check unwrapResult3.isOk()
let (_, missingDeps3) = unwrapResult3.get() let (_, missingDeps3) = unwrapResult3.get()
check: check:
missingDepsCount == 1 # Should trigger missing deps callback missingDepsCount == 1 # Should trigger missing deps callback
missingDeps3.len == 2 # Should be missing both msg1 and msg2 missingDeps3.len == 2 # Should be missing both msg1 and msg2
id1 in missingDeps3 id1 in missingDeps3
id2 in missingDeps3 id2 in missingDeps3
@ -141,12 +147,12 @@ suite "Reliability Mechanisms":
let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get()) let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get())
check unwrapResult2.isOk() check unwrapResult2.isOk()
let (_, missingDeps2) = unwrapResult2.get() let (_, missingDeps2) = unwrapResult2.get()
check: check:
missingDepsCount == 2 # Should have triggered another missing deps callback missingDepsCount == 2 # Should have triggered another missing deps callback
missingDeps2.len == 1 # Should only be missing msg1 missingDeps2.len == 1 # Should only be missing msg1
id1 in missingDeps2 id1 in missingDeps2
messageReadyCount == 0 # No messages should be ready yet messageReadyCount == 0 # No messages should be ready yet
# Mark first dependency (msg1) as met # Mark first dependency (msg1) as met
let markResult1 = rm.markDependenciesMet(@[id1]) let markResult1 = rm.markDependenciesMet(@[id1])
@ -156,18 +162,24 @@ suite "Reliability Mechanisms":
check: check:
incomingBuffer.len == 0 incomingBuffer.len == 0
messageReadyCount == 2 # Both msg2 and msg3 should be ready messageReadyCount == 2 # Both msg2 and msg3 should be ready
missingDepsCount == 2 # Should still be 2 from the initial missing deps missingDepsCount == 2 # Should still be 2 from the initial missing deps
test "acknowledgment via causal history": test "acknowledgment via causal history":
var messageReadyCount = 0 var messageReadyCount = 0
var messageSentCount = 0 var messageSentCount = 0
var missingDepsCount = 0 var missingDepsCount = 0
# Update anonymous procs to match new signature (add rm parameter)
rm.setCallbacks( rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1, onMessageReady = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, messageReadyCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = missingDepsCount += 1 onMessageSent = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
onMissingDependencies = proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
missingDepsCount += 1,
) )
# Send our message # Send our message
@ -180,30 +192,37 @@ suite "Reliability Mechanisms":
let msg2 = Message( let msg2 = Message(
messageId: "msg2", messageId: "msg2",
lamportTimestamp: rm.lamportTimestamp + 1, lamportTimestamp: rm.lamportTimestamp + 1,
causalHistory: @[id1], # Include our message in causal history causalHistory: @[id1], # Include our message in causal history
channelId: "testChannel", channelId: "testChannel",
content: @[byte(2)], content: @[byte(2)],
bloomFilter: @[] # Test with an empty bloom filter bloomFilter: @[] # Test with an empty bloom filter
,
) )
let serializedMsg2 = serializeMessage(msg2) let serializedMsg2 = serializeMessage(msg2)
check serializedMsg2.isOk() check serializedMsg2.isOk()
# Process the "received" message - should trigger callbacks # Process the "received" message - should trigger callbacks
let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk() check unwrapResult.isOk()
check: check:
messageReadyCount == 1 # For msg2 which we "received" messageReadyCount == 1 # For msg2 which we "received"
messageSentCount == 1 # For msg1 which was acknowledged via causal history messageSentCount == 1 # For msg1 which was acknowledged via causal history
test "acknowledgment via bloom filter": test "acknowledgment via bloom filter":
var messageSentCount = 0 var messageSentCount = 0
# Update anonymous procs to match new signature (add rm parameter)
rm.setCallbacks( rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard, onMessageReady = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard onMessageSent = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
onMissingDependencies = proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
discard,
) )
# Send our message # Send our message
@ -214,22 +233,20 @@ suite "Reliability Mechanisms":
# Create a message with bloom filter containing our message # Create a message with bloom filter containing our message
var otherPartyBloomFilter = newRollingBloomFilter( var otherPartyBloomFilter = newRollingBloomFilter(
DefaultBloomFilterCapacity, DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate, DefaultBloomFilterWindow
DefaultBloomFilterErrorRate,
DefaultBloomFilterWindow
) )
otherPartyBloomFilter.add(id1) otherPartyBloomFilter.add(id1)
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter) let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
check bfResult.isOk() check bfResult.isOk()
let msg2 = Message( let msg2 = Message(
messageId: "msg2", messageId: "msg2",
lamportTimestamp: rm.lamportTimestamp + 1, lamportTimestamp: rm.lamportTimestamp + 1,
causalHistory: @[], # Empty causal history as we're using bloom filter causalHistory: @[], # Empty causal history as we're using bloom filter
channelId: "testChannel", channelId: "testChannel",
content: @[byte(2)], content: @[byte(2)],
bloomFilter: bfResult.get() bloomFilter: bfResult.get(),
) )
let serializedMsg2 = serializeMessage(msg2) let serializedMsg2 = serializeMessage(msg2)
@ -237,8 +254,8 @@ suite "Reliability Mechanisms":
let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk() check unwrapResult.isOk()
check messageSentCount == 1 # Our message should be acknowledged via bloom filter check messageSentCount == 1 # Our message should be acknowledged via bloom filter
# Periodic task & Buffer management tests # Periodic task & Buffer management tests
suite "Periodic Tasks & Buffer Management": suite "Periodic Tasks & Buffer Management":
@ -255,15 +272,21 @@ suite "Periodic Tasks & Buffer Management":
test "outgoing buffer management": test "outgoing buffer management":
var messageSentCount = 0 var messageSentCount = 0
# Update anonymous procs to match new signature (add rm parameter)
rm.setCallbacks( rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard, onMessageReady = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard onMessageSent = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
onMissingDependencies = proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
discard,
) )
# Add multiple messages # Add multiple messages
for i in 0..5: for i in 0 .. 5:
let msg = @[byte(i)] let msg = @[byte(i)]
let id = "msg" & $i let id = "msg" & $i
let wrap = rm.wrapOutgoingMessage(msg, id) let wrap = rm.wrapOutgoingMessage(msg, id)
@ -279,37 +302,44 @@ suite "Periodic Tasks & Buffer Management":
causalHistory: @["msg0", "msg2", "msg4"], causalHistory: @["msg0", "msg2", "msg4"],
channelId: "testChannel", channelId: "testChannel",
content: @[byte(100)], content: @[byte(100)],
bloomFilter: @[] bloomFilter: @[],
) )
let serializedAck = serializeMessage(ackMsg) let serializedAck = serializeMessage(ackMsg)
check serializedAck.isOk() check serializedAck.isOk()
# Process the acknowledgment # Process the acknowledgment
discard rm.unwrapReceivedMessage(serializedAck.get()) discard rm.unwrapReceivedMessage(serializedAck.get())
let finalBuffer = rm.getOutgoingBuffer() let finalBuffer = rm.getOutgoingBuffer()
check: check:
finalBuffer.len == 3 # Should have removed acknowledged messages finalBuffer.len == 3 # Should have removed acknowledged messages
messageSentCount == 3 # Should have triggered sent callback for acknowledged messages messageSentCount == 3
# Should have triggered sent callback for acknowledged messages
test "periodic buffer sweep and bloom clean": test "periodic buffer sweep and bloom clean":
var messageSentCount = 0 var messageSentCount = 0
var config = defaultConfig() var config = defaultConfig()
config.resendInterval = initDuration(milliseconds = 100) # Short for testing config.resendInterval = initDuration(milliseconds = 100) # Short for testing
config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window
config.maxResendAttempts = 3 # Set a low number of max attempts config.maxResendAttempts = 3 # Set a low number of max attempts
let rmResultP = newReliabilityManager("testChannel", config) let rmResultP = newReliabilityManager("testChannel", config)
check rmResultP.isOk() check rmResultP.isOk()
let rm = rmResultP.get() let rm = rmResultP.get()
# Update anonymous procs to match new signature (add rm parameter)
rm.setCallbacks( rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard, onMessageReady = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard onMessageSent = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
onMissingDependencies = proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
discard,
) )
# First message - should be cleaned from bloom filter later # First message - should be cleaned from bloom filter later
@ -324,10 +354,10 @@ suite "Periodic Tasks & Buffer Management":
rm.bloomFilter.contains(id1) rm.bloomFilter.contains(id1)
rm.startPeriodicTasks() rm.startPeriodicTasks()
# Wait long enough for bloom filter window to pass and first message to exceed max retries # Wait long enough for bloom filter window to pass and first message to exceed max retries
waitFor sleepAsync(chronos.milliseconds(500)) waitFor sleepAsync(chronos.milliseconds(500))
# Add new message # Add new message
let msg2 = @[byte(2)] let msg2 = @[byte(2)]
let id2 = "msg2" let id2 = "msg2"
@ -336,27 +366,35 @@ suite "Periodic Tasks & Buffer Management":
let finalBuffer = rm.getOutgoingBuffer() let finalBuffer = rm.getOutgoingBuffer()
check: check:
finalBuffer.len == 1 # Only msg2 should be in buffer, msg1 should be removed after max retries finalBuffer.len == 1
# Only msg2 should be in buffer, msg1 should be removed after max retries
finalBuffer[0].message.messageId == id2 # Verify it's the second message finalBuffer[0].message.messageId == id2 # Verify it's the second message
finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
not rm.bloomFilter.contains(id1) # Bloom filter cleaning check not rm.bloomFilter.contains(id1) # Bloom filter cleaning check
rm.bloomFilter.contains(id2) # New message still in filter rm.bloomFilter.contains(id2) # New message still in filter
rm.cleanup() rm.cleanup()
test "periodic sync callback": test "periodic sync callback":
var syncCallCount = 0 var syncCallCount = 0
# Update anonymous procs to match new signature (add rm parameter)
rm.setCallbacks( rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard, onMessageReady = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc(messageId: MessageID) {.gcsafe.} = discard, discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard, onMessageSent = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc() {.gcsafe.} = syncCallCount += 1 discard,
onMissingDependencies = proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
discard,
onPeriodicSync = proc(rm: ReliabilityManager) {.gcsafe.} =
syncCallCount += 1, # Already correct from previous partial apply
) )
rm.startPeriodicTasks() rm.startPeriodicTasks()
waitFor sleepAsync(chronos.seconds(1)) waitFor sleepAsync(chronos.seconds(1))
rm.cleanup() rm.cleanup()
check syncCallCount > 0 check syncCallCount > 0
# Special cases handling # Special cases handling
@ -374,12 +412,12 @@ suite "Special Cases Handling":
test "message history limits": test "message history limits":
# Add messages up to max history size # Add messages up to max history size
for i in 0..rm.config.maxMessageHistory + 5: for i in 0 .. rm.config.maxMessageHistory + 5:
let msg = @[byte(i)] let msg = @[byte(i)]
let id = "msg" & $i let id = "msg" & $i
let wrap = rm.wrapOutgoingMessage(msg, id) let wrap = rm.wrapOutgoingMessage(msg, id)
check wrap.isOk() check wrap.isOk()
let history = rm.getMessageHistory() let history = rm.getMessageHistory()
check: check:
history.len <= rm.config.maxMessageHistory history.len <= rm.config.maxMessageHistory
@ -392,7 +430,8 @@ suite "Special Cases Handling":
causalHistory: @[], causalHistory: @[],
channelId: "testChannel", channelId: "testChannel",
content: @[byte(1)], content: @[byte(1)],
bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
,
) )
let serializedInvalid = serializeMessage(msgInvalid) let serializedInvalid = serializeMessage(msgInvalid)
@ -402,14 +441,20 @@ suite "Special Cases Handling":
let result = rm.unwrapReceivedMessage(serializedInvalid.get()) let result = rm.unwrapReceivedMessage(serializedInvalid.get())
check: check:
result.isOk() result.isOk()
result.get()[1].len == 0 # No missing dependencies result.get()[1].len == 0 # No missing dependencies
test "duplicate message handling": test "duplicate message handling":
var messageReadyCount = 0 var messageReadyCount = 0
# Update anonymous procs to match new signature (add rm parameter)
rm.setCallbacks( rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1, onMessageReady = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
proc(messageId: MessageID) {.gcsafe.} = discard, messageReadyCount += 1, # Already correct from previous partial apply
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard onMessageSent = proc(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
discard, # Already correct from previous partial apply
onMissingDependencies = proc(
rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]
) {.gcsafe.} =
discard, # Already correct from previous partial apply
) )
# Create and process a message # Create and process a message
@ -419,7 +464,7 @@ suite "Special Cases Handling":
causalHistory: @[], causalHistory: @[],
channelId: "testChannel", channelId: "testChannel",
content: @[byte(1)], content: @[byte(1)],
bloomFilter: @[] bloomFilter: @[],
) )
let serialized = serializeMessage(msg) let serialized = serializeMessage(msg)
@ -431,8 +476,8 @@ suite "Special Cases Handling":
let result2 = rm.unwrapReceivedMessage(serialized.get()) let result2 = rm.unwrapReceivedMessage(serialized.get())
check: check:
result2.isOk() result2.isOk()
result2.get()[1].len == 0 # No missing deps on second process result2.get()[1].len == 0 # No missing deps on second process
messageReadyCount == 1 # Message should only be processed once messageReadyCount == 1 # Message should only be processed once
test "error handling": test "error handling":
# Empty message # Empty message
@ -466,4 +511,4 @@ suite "cleanup":
let history = rm.getMessageHistory() let history = rm.getMessageHistory()
check: check:
outBuffer.len == 0 outBuffer.len == 0
history.len == 0 history.len == 0