mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-08 00:53:13 +00:00
feat: remove genny bindings, add manual bindings and go wrapper w basic test
This commit is contained in:
parent
bce0671c7b
commit
edfd257fca
3
.gitignore
vendored
3
.gitignore
vendored
@ -2,5 +2,6 @@
|
||||
tests/test_reliability
|
||||
tests/bloom
|
||||
nph
|
||||
bindings/generated*
|
||||
docs
|
||||
for_reference
|
||||
do_not_commit
|
||||
|
||||
131
bindings/bindings.h
Normal file
131
bindings/bindings.h
Normal file
@ -0,0 +1,131 @@
|
||||
#ifndef BINDINGS_H
|
||||
#define BINDINGS_H
|
||||
|
||||
#include <stddef.h> // For size_t
|
||||
#include <stdint.h> // For standard integer types
|
||||
#include <stdbool.h> // For bool type
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
// Opaque struct declaration (handle replaces direct pointer usage)
|
||||
typedef struct ReliabilityManager ReliabilityManager; // Keep forward declaration
|
||||
|
||||
// Define MessageID as a C string
|
||||
typedef const char* MessageID; // Keep const for the typedef itself
|
||||
|
||||
// --- Result Types ---
|
||||
|
||||
typedef struct {
|
||||
bool is_ok;
|
||||
char* error_message;
|
||||
} CResult;
|
||||
|
||||
typedef struct {
|
||||
CResult base_result;
|
||||
unsigned char* message;
|
||||
size_t message_len;
|
||||
MessageID* missing_deps;
|
||||
size_t missing_deps_count;
|
||||
} CUnwrapResult;
|
||||
|
||||
typedef struct {
|
||||
CResult base_result;
|
||||
unsigned char* message;
|
||||
size_t message_len;
|
||||
} CWrapResult;
|
||||
|
||||
|
||||
// --- Callback Function Pointer Types ---
|
||||
// Keep const char* here as these are inputs *to* the callback
|
||||
typedef void (*MessageReadyCallback)(const char* messageID);
|
||||
typedef void (*MessageSentCallback)(const char* messageID);
|
||||
typedef void (*MissingDependenciesCallback)(const char* messageID, const char** missingDeps, size_t missingDepsCount);
|
||||
typedef void (*PeriodicSyncCallback)(void* user_data);
|
||||
|
||||
|
||||
// --- Core API Functions ---
|
||||
|
||||
/**
|
||||
* @brief Creates a new ReliabilityManager instance.
|
||||
* @param channelId A unique identifier for the communication channel.
|
||||
* @return An opaque handle (void*) representing the instance, or NULL on failure.
|
||||
*/
|
||||
void* NewReliabilityManager(char* channelId);
|
||||
|
||||
/**
|
||||
* @brief Cleans up resources associated with a ReliabilityManager instance.
|
||||
* @param handle The opaque handle (void*) of the instance to clean up.
|
||||
*/
|
||||
void CleanupReliabilityManager(void* handle);
|
||||
|
||||
/**
|
||||
* @brief Resets the ReliabilityManager instance.
|
||||
* @param handle The opaque handle (void*) of the instance.
|
||||
* @return CResult indicating success or failure.
|
||||
*/
|
||||
CResult ResetReliabilityManager(void* handle);
|
||||
/**
|
||||
* @brief Wraps an outgoing message.
|
||||
* @param handle The opaque handle (void*) of the instance.
|
||||
* @param message Pointer to the raw message content.
|
||||
* @param messageLen Length of the raw message content.
|
||||
* @param messageId A unique identifier for this message.
|
||||
* @return CWrapResult containing the wrapped message or an error.
|
||||
*/
|
||||
CWrapResult WrapOutgoingMessage(void* handle, void* message, size_t messageLen, char* messageId);
|
||||
/**
|
||||
* @brief Unwraps a received message.
|
||||
* @param handle The opaque handle (void*) of the instance.
|
||||
* @param message Pointer to the received message data.
|
||||
* @param messageLen Length of the received message data.
|
||||
* @return CUnwrapResult containing the unwrapped content, missing dependencies, or an error.
|
||||
*/
|
||||
CUnwrapResult UnwrapReceivedMessage(void* handle, void* message, size_t messageLen);
|
||||
|
||||
/**
|
||||
* @brief Marks specified message dependencies as met.
|
||||
* @param handle The opaque handle (void*) of the instance.
|
||||
* @param messageIDs An array of message IDs to mark as met.
|
||||
* @param count The number of message IDs in the array.
|
||||
* @return CResult indicating success or failure.
|
||||
*/
|
||||
CResult MarkDependenciesMet(void* handle, char*** messageIDs, size_t count);
|
||||
|
||||
/**
|
||||
* @brief Registers callback functions.
|
||||
* @param handle The opaque handle (void*) of the instance.
|
||||
* @param messageReady Callback for when a message is ready.
|
||||
* @param messageSent Callback for when an outgoing message is acknowledged.
|
||||
* @param missingDependencies Callback for when missing dependencies are detected.
|
||||
* @param periodicSync Callback for periodic sync suggestions.
|
||||
* @param user_data A pointer to user-defined data passed to callbacks.
|
||||
*/
|
||||
void RegisterCallbacks(void* handle,
|
||||
void* messageReady,
|
||||
void* messageSent,
|
||||
void* missingDependencies,
|
||||
void* periodicSync,
|
||||
void* user_data); // Keep user_data, align with Nim proc
|
||||
|
||||
/**
|
||||
* @brief Starts the background periodic tasks.
|
||||
* @param handle The opaque handle (void*) of the instance.
|
||||
*/
|
||||
void StartPeriodicTasks(void* handle);
|
||||
|
||||
|
||||
// --- Memory Freeing Functions ---
|
||||
|
||||
void FreeCResultError(CResult result);
|
||||
void FreeCWrapResult(CWrapResult result);
|
||||
void FreeCUnwrapResult(CUnwrapResult result);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
} // extern "C"
|
||||
#endif
|
||||
|
||||
#endif // BINDINGS_H
|
||||
@ -1,242 +1,283 @@
|
||||
import genny
|
||||
import std/[times, strutils]
|
||||
import std/[locks, typetraits]
|
||||
import chronos
|
||||
import results
|
||||
import ../src/[reliability, message, reliability_utils, rolling_bloom_filter]
|
||||
import ../src/[reliability, reliability_utils, message]
|
||||
|
||||
# --- C Type Definitions ---
|
||||
|
||||
# Define required sequence wrapper types for C FFI
|
||||
type
|
||||
SeqByte* = ref object
|
||||
s*: seq[byte]
|
||||
CReliabilityManagerHandle* = pointer
|
||||
|
||||
SeqMessageID* = ref object
|
||||
s*: seq[MessageID]
|
||||
CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object
|
||||
is_ok*: bool
|
||||
error_message*: cstring
|
||||
|
||||
SeqMessage* = ref object
|
||||
s*: seq[Message]
|
||||
CWrapResult* {.importc: "CWrapResult", header: "bindings.h", bycopy.} = object
|
||||
base_result*: CResult
|
||||
message*: pointer
|
||||
message_len*: csize
|
||||
|
||||
SeqUnacknowledgedMessage* = ref object
|
||||
s*: seq[UnacknowledgedMessage]
|
||||
CUnwrapResult* {.importc: "CUnwrapResult", header: "bindings.h", bycopy.} = object
|
||||
base_result*: CResult
|
||||
message*: pointer
|
||||
message_len*: csize
|
||||
missing_deps*: ptr ptr cstring
|
||||
missing_deps_count*: csize
|
||||
|
||||
# Error handling
|
||||
var lastError: ReliabilityError
|
||||
# Callback Types
|
||||
CMessageReadyCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].}
|
||||
CMessageSentCallback* = proc (messageID: cstring) {.cdecl, gcsafe, raises: [].}
|
||||
CMissingDependenciesCallback* = proc (messageID: cstring, missingDeps: ptr ptr cstring, missingDepsCount: csize) {.cdecl, gcsafe, raises: [].}
|
||||
CPeriodicSyncCallback* = proc (user_data: pointer) {.cdecl, gcsafe, raises: [].}
|
||||
|
||||
proc takeError(): string =
|
||||
result = $lastError
|
||||
lastError = ReliabilityError.reInternalError # Reset to default
|
||||
# --- Memory Management Helpers ---
|
||||
|
||||
proc checkError(): bool =
|
||||
result = lastError != ReliabilityError.reInternalError
|
||||
proc allocCString*(s: string): cstring {.inline, gcsafe.} =
|
||||
if s.len == 0: return nil
|
||||
result = cast[cstring](allocShared(s.len + 1))
|
||||
copyMem(result, s.cstring, s.len + 1)
|
||||
|
||||
# Callback function types for C FFI
|
||||
type
|
||||
CMessageReadyCallback* = proc(messageId: cstring) {.cdecl, gcsafe.}
|
||||
CMessageSentCallback* = proc(messageId: cstring) {.cdecl, gcsafe.}
|
||||
CMissingDepsCallback* = proc(messageId: cstring, missingDeps: cstring, count: cint) {.cdecl, gcsafe.}
|
||||
CPeriodicSyncCallback* = proc() {.cdecl, gcsafe.}
|
||||
proc allocSeqByte*(s: seq[byte]): (pointer, csize) {.inline, gcsafe.} =
|
||||
if s.len == 0: return (nil, 0)
|
||||
let len = s.len
|
||||
let bufferPtr = allocShared(len)
|
||||
if len > 0:
|
||||
copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural)
|
||||
return (bufferPtr, len.csize)
|
||||
|
||||
# Global callback storage
|
||||
var
|
||||
onMessageReadyCallback: CMessageReadyCallback
|
||||
onMessageSentCallback: CMessageSentCallback
|
||||
onMissingDepsCallback: CMissingDepsCallback
|
||||
onPeriodicSyncCallback: CPeriodicSyncCallback
|
||||
proc allocSeqCString*(s: seq[string]): (ptr ptr cstring, csize) {.inline, gcsafe, cdecl.} =
|
||||
if s.len == 0: return (nil, 0)
|
||||
let count = s.len
|
||||
let arrPtr = cast[ptr ptr cstring](allocShared(count * sizeof(cstring)))
|
||||
for i in 0..<count:
|
||||
let tempCStr: cstring = allocCString(s[i])
|
||||
copyMem(addr arrPtr[i], addr tempCStr, sizeof(cstring))
|
||||
return (arrPtr, count.csize)
|
||||
|
||||
# Register callbacks
|
||||
proc registerMessageReadyCallback*(callback: CMessageReadyCallback) =
|
||||
onMessageReadyCallback = callback
|
||||
proc freeCString*(cs: cstring) {.inline, gcsafe.} =
|
||||
if cs != nil: deallocShared(cs)
|
||||
|
||||
proc registerMessageSentCallback*(callback: CMessageSentCallback) =
|
||||
onMessageSentCallback = callback
|
||||
proc freeSeqByte*(bufferPtr: pointer) {.inline, gcsafe, cdecl.} =
|
||||
if bufferPtr != nil: deallocShared(bufferPtr)
|
||||
|
||||
proc registerMissingDepsCallback*(callback: CMissingDepsCallback) =
|
||||
onMissingDepsCallback = callback
|
||||
proc freeSeqCString*(arrPtr: ptr ptr cstring, count: csize) {.inline, gcsafe, cdecl.} =
|
||||
if arrPtr != nil:
|
||||
for i in 0..<count:
|
||||
freeCString(cast[cstring](arrPtr[i]))
|
||||
deallocShared(arrPtr)
|
||||
|
||||
proc registerPeriodicSyncCallback*(callback: CPeriodicSyncCallback) =
|
||||
onPeriodicSyncCallback = callback
|
||||
# --- Result Conversion Helpers ---
|
||||
|
||||
# Individual adapter functions
|
||||
proc onMessageReadyAdapter(messageId: MessageID) {.gcsafe, raises: [].} =
|
||||
if onMessageReadyCallback != nil:
|
||||
try:
|
||||
onMessageReadyCallback(cstring(messageId))
|
||||
except:
|
||||
discard # Ignore exceptions
|
||||
proc toCResultOk*(): CResult =
|
||||
CResult(is_ok: true, error_message: nil)
|
||||
|
||||
proc onMessageSentAdapter(messageId: MessageID) {.gcsafe, raises: [].} =
|
||||
if onMessageSentCallback != nil:
|
||||
try:
|
||||
onMessageSentCallback(cstring(messageId))
|
||||
except:
|
||||
discard
|
||||
proc toCResultErr*(err: ReliabilityError): CResult =
|
||||
CResult(is_ok: false, error_message: allocCString($err))
|
||||
|
||||
proc onMissingDependenciesAdapter(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe, raises: [].} =
|
||||
if onMissingDepsCallback != nil and missingDeps.len > 0:
|
||||
try:
|
||||
let joinedDeps = missingDeps.join(",")
|
||||
onMissingDepsCallback(cstring(messageId), cstring(joinedDeps), cint(missingDeps.len))
|
||||
except:
|
||||
discard
|
||||
proc toCResultErrStr*(errMsg: string): CResult =
|
||||
CResult(is_ok: false, error_message: allocCString(errMsg))
|
||||
|
||||
proc onPeriodicSyncAdapter() {.gcsafe, raises: [].} =
|
||||
if onPeriodicSyncCallback != nil:
|
||||
try:
|
||||
onPeriodicSyncCallback()
|
||||
except:
|
||||
discard
|
||||
# --- Callback Wrappers (Nim -> C) ---
|
||||
# These still accept the ReliabilityManager instance directly
|
||||
|
||||
# Apply registered callbacks to a ReliabilityManager
|
||||
proc applyCallbacks*(rm: ReliabilityManager): bool =
|
||||
if rm == nil:
|
||||
lastError = ReliabilityError.reInvalidArgument
|
||||
return false
|
||||
# These wrappers now need to handle the user_data explicitly if needed,
|
||||
# but the C callbacks themselves don't take it directly anymore (except PeriodicSync).
|
||||
# The user_data is stored in rm.cUserData.
|
||||
|
||||
try:
|
||||
rm.setCallbacks(
|
||||
onMessageReadyAdapter,
|
||||
onMessageSentAdapter,
|
||||
onMissingDependenciesAdapter,
|
||||
onPeriodicSyncAdapter
|
||||
proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
|
||||
let cbPtr = rm.cMessageReadyCallback
|
||||
if cbPtr != nil:
|
||||
let cb = cast[CMessageReadyCallback](cbPtr)
|
||||
# Call the C callback without user_data, as per the updated typedef
|
||||
cb(messageId.cstring)
|
||||
|
||||
proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) {.gcsafe.} =
|
||||
let cbPtr = rm.cMessageSentCallback
|
||||
if cbPtr != nil:
|
||||
let cb = cast[CMessageSentCallback](cbPtr)
|
||||
# Call the C callback without user_data
|
||||
cb(messageId.cstring)
|
||||
|
||||
proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||
let cbPtr = rm.cMissingDependenciesCallback
|
||||
if cbPtr != nil:
|
||||
var cDeps = newSeq[cstring](missingDeps.len)
|
||||
for i, dep in missingDeps:
|
||||
cDeps[i] = dep.cstring
|
||||
let cDepsPtr = if cDeps.len > 0: cDeps[0].addr else: nil
|
||||
let cb = cast[CMissingDependenciesCallback](cbPtr)
|
||||
# Call the C callback without user_data
|
||||
cb(messageId.cstring, cast[ptr ptr cstring](cDepsPtr), missingDeps.len.csize)
|
||||
|
||||
proc nimPeriodicSyncCallback(rm: ReliabilityManager) {.gcsafe.} =
|
||||
let cbPtr = rm.cPeriodicSyncCallback
|
||||
if cbPtr != nil:
|
||||
let cb = cast[CPeriodicSyncCallback](cbPtr)
|
||||
cb(rm.cUserData)
|
||||
|
||||
# --- Exported C Functions - Using Opaque Pointer (pointer/void*) ---
|
||||
|
||||
proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
let channelId = $channelIdCStr
|
||||
if channelId.len == 0:
|
||||
echo "Error creating ReliabilityManager: Channel ID cannot be empty"
|
||||
return nil # Return nil pointer
|
||||
let rmResult = newReliabilityManager(channelId)
|
||||
if rmResult.isOk:
|
||||
let rm = rmResult.get()
|
||||
# Initialize C callback fields to nil
|
||||
rm.cMessageReadyCallback = nil
|
||||
rm.cMessageSentCallback = nil
|
||||
rm.cMissingDependenciesCallback = nil
|
||||
rm.cPeriodicSyncCallback = nil
|
||||
rm.cUserData = nil
|
||||
# Assign Nim wrappers that capture the 'rm' instance directly
|
||||
rm.onMessageReady = proc(msgId: MessageID) {.gcsafe.} = nimMessageReadyCallback(rm, msgId)
|
||||
rm.onMessageSent = proc(msgId: MessageID) {.gcsafe.} = nimMessageSentCallback(rm, msgId)
|
||||
rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) {.gcsafe.} = nimMissingDependenciesCallback(rm, msgId, deps)
|
||||
rm.onPeriodicSync = proc() {.gcsafe.} = nimPeriodicSyncCallback(rm)
|
||||
|
||||
# Return the Nim ref object cast to the opaque pointer type
|
||||
return cast[CReliabilityManagerHandle](rm)
|
||||
else:
|
||||
echo "Error creating ReliabilityManager: ", rmResult.error
|
||||
return nil # Return nil pointer
|
||||
|
||||
proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle != nil:
|
||||
# Cast opaque pointer back to Nim ref type
|
||||
let rm = cast[ReliabilityManager](handle)
|
||||
cleanup(rm) # Call Nim cleanup
|
||||
# Nim GC will collect 'rm' eventually as the handle is the only reference
|
||||
else:
|
||||
echo "Warning: CleanupReliabilityManager called with NULL handle"
|
||||
|
||||
proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle == nil:
|
||||
return toCResultErrStr("ReliabilityManager handle is NULL")
|
||||
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
|
||||
let result = resetReliabilityManager(rm)
|
||||
if result.isOk:
|
||||
return toCResultOk()
|
||||
else:
|
||||
return toCResultErr(result.error)
|
||||
|
||||
proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle == nil:
|
||||
return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
|
||||
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
|
||||
|
||||
if messageC == nil and messageLen > 0:
|
||||
return CWrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0"))
|
||||
if messageIdCStr == nil:
|
||||
return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL"))
|
||||
|
||||
let messageId = $messageIdCStr
|
||||
var messageNim: seq[byte]
|
||||
if messageLen > 0:
|
||||
messageNim = newSeq[byte](messageLen)
|
||||
copyMem(messageNim[0].addr, messageC, messageLen.Natural)
|
||||
else:
|
||||
messageNim = @[]
|
||||
|
||||
let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId)
|
||||
if wrapResult.isOk:
|
||||
let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get())
|
||||
return CWrapResult(
|
||||
base_result: toCResultOk(),
|
||||
message: wrappedDataPtr,
|
||||
message_len: wrappedDataLen
|
||||
)
|
||||
return true
|
||||
except:
|
||||
lastError = ReliabilityError.reInternalError
|
||||
return false
|
||||
|
||||
# Wrapper for creating a ReliabilityManager
|
||||
proc safeNewReliabilityManager(channelId: string, config: ReliabilityConfig = defaultConfig()): ReliabilityManager =
|
||||
let res = newReliabilityManager(channelId, config)
|
||||
if res.isOk:
|
||||
return res.get
|
||||
else:
|
||||
lastError = res.error
|
||||
return nil
|
||||
return CWrapResult(base_result: toCResultErr(wrapResult.error))
|
||||
|
||||
# Wrapper for wrapping outgoing messages
|
||||
proc safeWrapOutgoingMessage(rm: ReliabilityManager, message: seq[byte], messageId: MessageID): seq[byte] =
|
||||
if rm == nil:
|
||||
lastError = ReliabilityError.reInvalidArgument
|
||||
return @[]
|
||||
proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize): CUnwrapResult {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle == nil:
|
||||
return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
|
||||
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
|
||||
|
||||
let res = rm.wrapOutgoingMessage(message, messageId)
|
||||
if res.isOk:
|
||||
return res.get
|
||||
if messageC == nil and messageLen > 0:
|
||||
return CUnwrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0"))
|
||||
|
||||
var messageNim: seq[byte]
|
||||
if messageLen > 0:
|
||||
messageNim = newSeq[byte](messageLen)
|
||||
copyMem(messageNim[0].addr, messageC, messageLen.Natural)
|
||||
else:
|
||||
lastError = res.error
|
||||
return @[]
|
||||
messageNim = @[]
|
||||
|
||||
# Wrapper for unwrapping received messages
|
||||
proc safeUnwrapReceivedMessage(rm: ReliabilityManager, message: seq[byte]): tuple[message: seq[byte], missingDeps: seq[MessageID]] =
|
||||
if rm == nil:
|
||||
lastError = ReliabilityError.reInvalidArgument
|
||||
return (@[], @[])
|
||||
|
||||
let res = rm.unwrapReceivedMessage(message)
|
||||
if res.isOk:
|
||||
return res.get
|
||||
else:
|
||||
lastError = res.error
|
||||
return (@[], @[])
|
||||
|
||||
# Wrapper for marking dependencies as met
|
||||
proc safeMarkDependenciesMet(rm: ReliabilityManager, messageIds: seq[MessageID]): bool =
|
||||
if rm == nil:
|
||||
lastError = ReliabilityError.reInvalidArgument
|
||||
return false
|
||||
|
||||
let res = rm.markDependenciesMet(messageIds)
|
||||
if res.isOk:
|
||||
return true
|
||||
else:
|
||||
lastError = res.error
|
||||
return false
|
||||
|
||||
# Helper to create a Duration from milliseconds
|
||||
proc durationFromMs(ms: int64): Duration =
|
||||
initDuration(milliseconds = ms)
|
||||
|
||||
# Wrapper for creating a ReliabilityConfig with Duration values in milliseconds
|
||||
proc configFromMs(
|
||||
bloomFilterCapacity: int = DefaultBloomFilterCapacity,
|
||||
bloomFilterErrorRate: float = DefaultBloomFilterErrorRate,
|
||||
bloomFilterWindowMs: int64 = 3600000, # 1 hour default
|
||||
maxMessageHistory: int = DefaultMaxMessageHistory,
|
||||
maxCausalHistory: int = DefaultMaxCausalHistory,
|
||||
resendIntervalMs: int64 = 60000, # 1 minute default
|
||||
maxResendAttempts: int = DefaultMaxResendAttempts,
|
||||
syncMessageIntervalMs: int64 = 30000, # 30 seconds default
|
||||
bufferSweepIntervalMs: int64 = 60000 # 1 minute default
|
||||
): ReliabilityConfig =
|
||||
var config = ReliabilityConfig(
|
||||
bloomFilterCapacity: bloomFilterCapacity,
|
||||
bloomFilterErrorRate: bloomFilterErrorRate,
|
||||
bloomFilterWindow: durationFromMs(bloomFilterWindowMs),
|
||||
maxMessageHistory: maxMessageHistory,
|
||||
maxCausalHistory: maxCausalHistory,
|
||||
resendInterval: durationFromMs(resendIntervalMs),
|
||||
maxResendAttempts: maxResendAttempts,
|
||||
syncMessageInterval: durationFromMs(syncMessageIntervalMs),
|
||||
bufferSweepInterval: durationFromMs(bufferSweepIntervalMs)
|
||||
let unwrapResult = unwrapReceivedMessage(rm, messageNim)
|
||||
if unwrapResult.isOk:
|
||||
let (unwrappedContent, missingDepsNim) = unwrapResult.get()
|
||||
let (contentPtr, contentLen) = allocSeqByte(unwrappedContent)
|
||||
let (depsPtr, depsCount) = allocSeqCString(missingDepsNim)
|
||||
return CUnwrapResult(
|
||||
base_result: toCResultOk(),
|
||||
message: contentPtr,
|
||||
message_len: contentLen,
|
||||
missing_deps: depsPtr,
|
||||
missing_deps_count: depsCount
|
||||
)
|
||||
return config
|
||||
else:
|
||||
return CUnwrapResult(base_result: toCResultErr(unwrapResult.error))
|
||||
|
||||
# Helper to parse comma-separated string into seq[MessageID]
|
||||
proc parseMessageIDs*(commaSeparated: string): seq[MessageID] =
|
||||
if commaSeparated.len == 0:
|
||||
return @[]
|
||||
return commaSeparated.split(',')
|
||||
proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr ptr cstring, count: csize): CResult {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle == nil:
|
||||
return toCResultErrStr("ReliabilityManager handle is NULL")
|
||||
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
|
||||
|
||||
# Constants
|
||||
exportConsts:
|
||||
DefaultBloomFilterCapacity
|
||||
DefaultBloomFilterErrorRate
|
||||
DefaultMaxMessageHistory
|
||||
DefaultMaxCausalHistory
|
||||
DefaultMaxResendAttempts
|
||||
MaxMessageSize
|
||||
if messageIDsC == nil and count > 0:
|
||||
return toCResultErrStr("MessageIDs pointer is NULL but count > 0")
|
||||
|
||||
# Enums
|
||||
exportEnums:
|
||||
ReliabilityError
|
||||
var messageIDsNim = newSeq[string](count)
|
||||
for i in 0..<count:
|
||||
let currentCStr = cast[cstring](messageIDsC[i])
|
||||
if currentCStr != nil:
|
||||
messageIDsNim[i] = $currentCStr
|
||||
else:
|
||||
return toCResultErrStr("NULL message ID found in array")
|
||||
|
||||
# Helper procs
|
||||
exportProcs:
|
||||
checkError
|
||||
takeError
|
||||
configFromMs
|
||||
durationFromMs
|
||||
parseMessageIDs
|
||||
registerMessageReadyCallback
|
||||
registerMessageSentCallback
|
||||
registerMissingDepsCallback
|
||||
registerPeriodicSyncCallback
|
||||
applyCallbacks
|
||||
let result = markDependenciesMet(rm, messageIDsNim)
|
||||
if result.isOk:
|
||||
return toCResultOk()
|
||||
else:
|
||||
return toCResultErr(result.error)
|
||||
|
||||
# Core objects
|
||||
exportObject ReliabilityConfig:
|
||||
constructor:
|
||||
configFromMs(int, float, int64, int, int, int64, int, int64, int64)
|
||||
proc RegisterCallbacks*(handle: CReliabilityManagerHandle,
|
||||
cMessageReady: pointer,
|
||||
cMessageSent: pointer,
|
||||
cMissingDependencies: pointer,
|
||||
cPeriodicSync: pointer,
|
||||
cUserDataPtr: pointer) {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle == nil:
|
||||
echo "Error: Cannot register callbacks: NULL ReliabilityManager handle"
|
||||
return
|
||||
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
|
||||
# Lock the specific manager instance while modifying its fields
|
||||
withLock rm.lock:
|
||||
rm.cMessageReadyCallback = cMessageReady
|
||||
rm.cMessageSentCallback = cMessageSent
|
||||
rm.cMissingDependenciesCallback = cMissingDependencies
|
||||
rm.cPeriodicSyncCallback = cPeriodicSync
|
||||
rm.cUserData = cUserDataPtr
|
||||
|
||||
# Main ref object
|
||||
exportRefObject ReliabilityManager:
|
||||
constructor:
|
||||
safeNewReliabilityManager(string, ReliabilityConfig)
|
||||
procs:
|
||||
safeWrapOutgoingMessage(ReliabilityManager, seq[byte], MessageID)
|
||||
safeUnwrapReceivedMessage(ReliabilityManager, seq[byte])
|
||||
safeMarkDependenciesMet(ReliabilityManager, seq[MessageID])
|
||||
checkUnacknowledgedMessages(ReliabilityManager)
|
||||
startPeriodicTasks(ReliabilityManager)
|
||||
cleanup(ReliabilityManager)
|
||||
getMessageHistory(ReliabilityManager)
|
||||
getOutgoingBuffer(ReliabilityManager)
|
||||
getIncomingBuffer(ReliabilityManager)
|
||||
proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl, gcsafe.} =
|
||||
if handle == nil:
|
||||
echo "Error: Cannot start periodic tasks: NULL ReliabilityManager handle"
|
||||
return
|
||||
let rm = cast[ReliabilityManager](handle) # Cast opaque pointer
|
||||
startPeriodicTasks(rm)
|
||||
|
||||
# Sequences
|
||||
exportSeq seq[byte]:
|
||||
discard
|
||||
# --- Memory Freeing Functions - Added cdecl ---
|
||||
|
||||
exportSeq seq[MessageID]:
|
||||
discard
|
||||
proc FreeCResultError*(result: CResult) {.exportc, dynlib, gcsafe, cdecl.} =
|
||||
freeCString(result.error_message)
|
||||
|
||||
# Finally generate the files
|
||||
writeFiles("bindings/generated", "sds_bindings")
|
||||
proc FreeCWrapResult*(result: CWrapResult) {.exportc, dynlib, gcsafe, cdecl.} =
|
||||
freeCString(result.base_result.error_message)
|
||||
freeSeqByte(result.message)
|
||||
|
||||
proc FreeCUnwrapResult*(result: CUnwrapResult) {.exportc, dynlib, gcsafe, cdecl.} =
|
||||
freeCString(result.base_result.error_message)
|
||||
freeSeqByte(result.message)
|
||||
freeSeqCString(result.missing_deps, result.missing_deps_count)
|
||||
|
||||
BIN
bindings/generated/libbindings.dylib
Executable file
BIN
bindings/generated/libbindings.dylib
Executable file
Binary file not shown.
@ -9,7 +9,6 @@ srcDir = "src"
|
||||
requires "nim >= 2.0.8"
|
||||
requires "chronicles"
|
||||
requires "libp2p"
|
||||
requires "genny >= 0.1.0"
|
||||
|
||||
# Tasks
|
||||
task test, "Run the test suite":
|
||||
|
||||
307
sds_wrapper.go
Normal file
307
sds_wrapper.go
Normal file
@ -0,0 +1,307 @@
|
||||
package main
|
||||
|
||||
/*
|
||||
#cgo CFLAGS: -I${SRCDIR}/bindings
|
||||
#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lbindings
|
||||
#cgo LDFLAGS: -Wl,-rpath,${SRCDIR}/bindings/generated
|
||||
|
||||
#include <stdlib.h> // For C.free
|
||||
#include "bindings/bindings.h" // Update include path
|
||||
|
||||
// Forward declarations for Go callback functions exported to C
|
||||
// These are the functions Nim will eventually call via the pointers we give it.
|
||||
extern void goMessageReadyCallback(char* messageID);
|
||||
extern void goMessageSentCallback(char* messageID);
|
||||
extern void goMissingDependenciesCallback(char* messageID, char** missingDeps, size_t missingDepsCount);
|
||||
extern void goPeriodicSyncCallback();
|
||||
|
||||
// Helper function to call the C memory freeing functions
|
||||
static void callFreeCResultError(CResult res) { FreeCResultError(res); }
|
||||
static void callFreeCWrapResult(CWrapResult res) { FreeCWrapResult(res); }
|
||||
static void callFreeCUnwrapResult(CUnwrapResult res) { FreeCUnwrapResult(res); }
|
||||
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// --- Go Types ---
|
||||
|
||||
// ReliabilityManagerHandle represents the opaque handle to the Nim object
|
||||
type ReliabilityManagerHandle unsafe.Pointer
|
||||
|
||||
// MessageID is a type alias for string for clarity
|
||||
type MessageID string
|
||||
|
||||
// Callbacks holds the Go functions to be called by the Nim library
|
||||
type Callbacks struct {
|
||||
OnMessageReady func(messageId MessageID)
|
||||
OnMessageSent func(messageId MessageID)
|
||||
OnMissingDependencies func(messageId MessageID, missingDeps []MessageID)
|
||||
OnPeriodicSync func()
|
||||
}
|
||||
|
||||
// Global map to store callbacks associated with handles (necessary due to cgo limitations)
|
||||
var (
|
||||
callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks)
|
||||
registryMutex sync.RWMutex
|
||||
)
|
||||
|
||||
// --- Go Wrapper Functions ---
|
||||
|
||||
// NewReliabilityManager creates a new instance of the Nim ReliabilityManager
|
||||
func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) {
|
||||
cChannelId := C.CString(channelId)
|
||||
defer C.free(unsafe.Pointer(cChannelId))
|
||||
|
||||
handle := C.NewReliabilityManager(cChannelId)
|
||||
if handle == nil {
|
||||
// Note: Nim side currently just prints to stdout on creation failure
|
||||
return nil, errors.New("failed to create ReliabilityManager (check Nim logs/stdout)")
|
||||
}
|
||||
return ReliabilityManagerHandle(handle), nil
|
||||
}
|
||||
|
||||
// CleanupReliabilityManager frees the resources associated with the handle
|
||||
func CleanupReliabilityManager(handle ReliabilityManagerHandle) {
|
||||
if handle == nil {
|
||||
return
|
||||
}
|
||||
registryMutex.Lock()
|
||||
delete(callbackRegistry, handle)
|
||||
registryMutex.Unlock()
|
||||
C.CleanupReliabilityManager(unsafe.Pointer(handle))
|
||||
}
|
||||
|
||||
// ResetReliabilityManager resets the state of the manager
|
||||
func ResetReliabilityManager(handle ReliabilityManagerHandle) error {
|
||||
if handle == nil {
|
||||
return errors.New("handle is nil")
|
||||
}
|
||||
cResult := C.ResetReliabilityManager(unsafe.Pointer(handle))
|
||||
if !cResult.is_ok {
|
||||
errMsg := C.GoString(cResult.error_message)
|
||||
C.callFreeCResultError(cResult) // Free the error message
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WrapOutgoingMessage wraps a message with reliability metadata
|
||||
func WrapOutgoingMessage(handle ReliabilityManagerHandle, message []byte, messageId MessageID) ([]byte, error) {
|
||||
if handle == nil {
|
||||
return nil, errors.New("handle is nil")
|
||||
}
|
||||
cMessageId := C.CString(string(messageId))
|
||||
defer C.free(unsafe.Pointer(cMessageId))
|
||||
|
||||
var cMessagePtr unsafe.Pointer
|
||||
if len(message) > 0 {
|
||||
cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed
|
||||
defer C.free(cMessagePtr)
|
||||
} else {
|
||||
cMessagePtr = nil
|
||||
}
|
||||
cMessageLen := C.size_t(len(message))
|
||||
|
||||
cWrapResult := C.WrapOutgoingMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen, cMessageId)
|
||||
|
||||
if !cWrapResult.base_result.is_ok {
|
||||
errMsg := C.GoString(cWrapResult.base_result.error_message)
|
||||
C.callFreeCWrapResult(cWrapResult) // Free error and potentially allocated message
|
||||
return nil, errors.New(errMsg)
|
||||
}
|
||||
|
||||
// Copy the wrapped message from C memory to Go slice
|
||||
// Explicitly cast the message pointer to unsafe.Pointer
|
||||
wrappedMessage := C.GoBytes(unsafe.Pointer(cWrapResult.message), C.int(cWrapResult.message_len))
|
||||
C.callFreeCWrapResult(cWrapResult) // Free the C-allocated message buffer
|
||||
|
||||
return wrappedMessage, nil
|
||||
}
|
||||
|
||||
// UnwrapReceivedMessage unwraps a received message
|
||||
func UnwrapReceivedMessage(handle ReliabilityManagerHandle, message []byte) ([]byte, []MessageID, error) {
|
||||
if handle == nil {
|
||||
return nil, nil, errors.New("handle is nil")
|
||||
}
|
||||
|
||||
var cMessagePtr unsafe.Pointer
|
||||
if len(message) > 0 {
|
||||
cMessagePtr = C.CBytes(message)
|
||||
defer C.free(cMessagePtr)
|
||||
} else {
|
||||
cMessagePtr = nil
|
||||
}
|
||||
cMessageLen := C.size_t(len(message))
|
||||
|
||||
cUnwrapResult := C.UnwrapReceivedMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen)
|
||||
|
||||
if !cUnwrapResult.base_result.is_ok {
|
||||
errMsg := C.GoString(cUnwrapResult.base_result.error_message)
|
||||
C.callFreeCUnwrapResult(cUnwrapResult) // Free error and potentially allocated fields
|
||||
return nil, nil, errors.New(errMsg)
|
||||
}
|
||||
|
||||
// Copy unwrapped message content
|
||||
// Explicitly cast the message pointer to unsafe.Pointer
|
||||
unwrappedContent := C.GoBytes(unsafe.Pointer(cUnwrapResult.message), C.int(cUnwrapResult.message_len))
|
||||
|
||||
// Copy missing dependencies
|
||||
missingDeps := make([]MessageID, cUnwrapResult.missing_deps_count)
|
||||
if cUnwrapResult.missing_deps_count > 0 {
|
||||
// Convert C array of C strings to Go slice of strings
|
||||
cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(cUnwrapResult.missing_deps))[:cUnwrapResult.missing_deps_count:cUnwrapResult.missing_deps_count]
|
||||
for i, s := range cDepsArray {
|
||||
missingDeps[i] = MessageID(C.GoString(s))
|
||||
}
|
||||
}
|
||||
|
||||
C.callFreeCUnwrapResult(cUnwrapResult) // Free C-allocated message, deps array, and strings
|
||||
|
||||
return unwrappedContent, missingDeps, nil
|
||||
}
|
||||
|
||||
// MarkDependenciesMet informs the library that dependencies are met
|
||||
func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID) error {
|
||||
if handle == nil {
|
||||
return errors.New("handle is nil")
|
||||
}
|
||||
if len(messageIDs) == 0 {
|
||||
return nil // Nothing to do
|
||||
}
|
||||
|
||||
// Convert Go string slice to C array of C strings (char**)
|
||||
cMessageIDs := make([]*C.char, len(messageIDs))
|
||||
for i, id := range messageIDs {
|
||||
cMessageIDs[i] = C.CString(string(id))
|
||||
defer C.free(unsafe.Pointer(cMessageIDs[i])) // Ensure each CString is freed
|
||||
}
|
||||
|
||||
// Create a pointer (**C.char) to the first element of the slice
|
||||
var cMessageIDsPtr **C.char
|
||||
if len(cMessageIDs) > 0 {
|
||||
cMessageIDsPtr = &cMessageIDs[0]
|
||||
} else {
|
||||
cMessageIDsPtr = nil // Handle empty slice case
|
||||
}
|
||||
|
||||
// Pass the address of the pointer variable (&cMessageIDsPtr), which is of type ***C.char
|
||||
cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), &cMessageIDsPtr, C.size_t(len(messageIDs)))
|
||||
|
||||
if !cResult.is_ok {
|
||||
errMsg := C.GoString(cResult.error_message)
|
||||
C.callFreeCResultError(cResult)
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterCallbacks sets the Go callback functions
|
||||
func RegisterCallbacks(handle ReliabilityManagerHandle, callbacks Callbacks) error {
|
||||
if handle == nil {
|
||||
return errors.New("handle is nil")
|
||||
}
|
||||
|
||||
registryMutex.Lock()
|
||||
callbackRegistry[handle] = &callbacks
|
||||
registryMutex.Unlock()
|
||||
|
||||
// Pass the C relay functions to Nim
|
||||
// Nim will store these function pointers. When Nim calls them, they execute the C relay,
|
||||
// Pass pointers to the exported Go functions directly.
|
||||
// Nim expects function pointers matching the C callback typedefs.
|
||||
// Cgo makes the exported Go functions available as C function pointers.
|
||||
// Cast these function pointers to unsafe.Pointer to match the void* expected by the C function.
|
||||
C.RegisterCallbacks(
|
||||
unsafe.Pointer(handle),
|
||||
unsafe.Pointer(C.goMessageReadyCallback),
|
||||
unsafe.Pointer(C.goMessageSentCallback),
|
||||
unsafe.Pointer(C.goMissingDependenciesCallback),
|
||||
unsafe.Pointer(C.goPeriodicSyncCallback),
|
||||
unsafe.Pointer(handle), // Pass handle as user_data
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartPeriodicTasks starts the background tasks in the Nim library
|
||||
func StartPeriodicTasks(handle ReliabilityManagerHandle) error {
|
||||
if handle == nil {
|
||||
return errors.New("handle is nil")
|
||||
}
|
||||
C.StartPeriodicTasks(unsafe.Pointer(handle))
|
||||
// Assuming StartPeriodicTasks doesn't return an error status in C API
|
||||
return nil
|
||||
}
|
||||
|
||||
// --- Go Callback Implementations (Exported to C) ---
|
||||
|
||||
func goMessageReadyCallback(messageID *C.char) {
|
||||
msgIdStr := C.GoString(messageID)
|
||||
registryMutex.RLock()
|
||||
defer registryMutex.RUnlock()
|
||||
|
||||
// Find the correct Go callback based on handle (this is tricky without handle passed)
|
||||
// For now, iterate through all registered callbacks. This is NOT ideal for multiple managers.
|
||||
// A better approach would involve passing the handle back through user_data if possible,
|
||||
// or maintaining a single global callback handler if only one manager instance is expected.
|
||||
// Let's assume a single instance for simplicity for now.
|
||||
for _, callbacks := range callbackRegistry {
|
||||
if callbacks != nil && callbacks.OnMessageReady != nil {
|
||||
// Run in a goroutine to avoid blocking the C thread
|
||||
go callbacks.OnMessageReady(MessageID(msgIdStr))
|
||||
}
|
||||
}
|
||||
fmt.Printf("Go: Message Ready: %s\n", msgIdStr) // Debug print
|
||||
}
|
||||
|
||||
func goMessageSentCallback(messageID *C.char) {
|
||||
msgIdStr := C.GoString(messageID)
|
||||
registryMutex.RLock()
|
||||
defer registryMutex.RUnlock()
|
||||
|
||||
for _, callbacks := range callbackRegistry {
|
||||
if callbacks != nil && callbacks.OnMessageSent != nil {
|
||||
go callbacks.OnMessageSent(MessageID(msgIdStr))
|
||||
}
|
||||
}
|
||||
fmt.Printf("Go: Message Sent: %s\n", msgIdStr) // Debug print
|
||||
}
|
||||
|
||||
func goMissingDependenciesCallback(messageID *C.char, missingDeps **C.char, missingDepsCount C.size_t) {
|
||||
msgIdStr := C.GoString(messageID)
|
||||
deps := make([]MessageID, missingDepsCount)
|
||||
if missingDepsCount > 0 {
|
||||
// Convert C array of C strings to Go slice
|
||||
cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(missingDeps))[:missingDepsCount:missingDepsCount]
|
||||
for i, s := range cDepsArray {
|
||||
deps[i] = MessageID(C.GoString(s))
|
||||
}
|
||||
}
|
||||
|
||||
registryMutex.RLock()
|
||||
defer registryMutex.RUnlock()
|
||||
|
||||
for _, callbacks := range callbackRegistry {
|
||||
if callbacks != nil && callbacks.OnMissingDependencies != nil {
|
||||
go callbacks.OnMissingDependencies(MessageID(msgIdStr), deps)
|
||||
}
|
||||
}
|
||||
fmt.Printf("Go: Missing Deps for %s: %v\n", msgIdStr, deps) // Debug print
|
||||
}
|
||||
|
||||
func goPeriodicSyncCallback() {
|
||||
registryMutex.RLock()
|
||||
defer registryMutex.RUnlock()
|
||||
|
||||
for _, callbacks := range callbackRegistry {
|
||||
if callbacks != nil && callbacks.OnPeriodicSync != nil {
|
||||
go callbacks.OnPeriodicSync()
|
||||
}
|
||||
}
|
||||
fmt.Println("Go: Periodic Sync Requested") // Debug print
|
||||
}
|
||||
262
sds_wrapper_test.go
Normal file
262
sds_wrapper_test.go
Normal file
@ -0,0 +1,262 @@
|
||||
package main
|
||||
import (
|
||||
// "fmt"
|
||||
// "sync"
|
||||
"testing"
|
||||
// "time"
|
||||
)
|
||||
|
||||
// Test basic creation, cleanup, and reset
|
||||
func TestLifecycle(t *testing.T) {
|
||||
channelID := "test-lifecycle"
|
||||
handle, err := NewReliabilityManager(channelID)
|
||||
if err != nil {
|
||||
t.Fatalf("NewReliabilityManager failed: %v", err)
|
||||
}
|
||||
if handle == nil {
|
||||
t.Fatal("NewReliabilityManager returned a nil handle")
|
||||
}
|
||||
defer CleanupReliabilityManager(handle) // Ensure cleanup even on test failure
|
||||
|
||||
err = ResetReliabilityManager(handle)
|
||||
if err != nil {
|
||||
t.Errorf("ResetReliabilityManager failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test wrapping and unwrapping a simple message
|
||||
func TestWrapUnwrap(t *testing.T) {
|
||||
channelID := "test-wrap-unwrap"
|
||||
handle, err := NewReliabilityManager(channelID)
|
||||
if err != nil {
|
||||
t.Fatalf("NewReliabilityManager failed: %v", err)
|
||||
}
|
||||
defer CleanupReliabilityManager(handle)
|
||||
|
||||
originalPayload := []byte("hello reliability")
|
||||
messageID := MessageID("msg-wrap-1")
|
||||
|
||||
wrappedMsg, err := WrapOutgoingMessage(handle, originalPayload, messageID)
|
||||
if err != nil {
|
||||
t.Fatalf("WrapOutgoingMessage failed: %v", err)
|
||||
}
|
||||
if len(wrappedMsg) == 0 {
|
||||
t.Fatal("WrapOutgoingMessage returned empty bytes")
|
||||
}
|
||||
|
||||
// Simulate receiving the wrapped message
|
||||
unwrappedPayload, missingDeps, err := UnwrapReceivedMessage(handle, wrappedMsg)
|
||||
if err != nil {
|
||||
t.Fatalf("UnwrapReceivedMessage failed: %v", err)
|
||||
}
|
||||
|
||||
if string(unwrappedPayload) != string(originalPayload) {
|
||||
t.Errorf("Unwrapped payload mismatch: got %q, want %q", unwrappedPayload, originalPayload)
|
||||
}
|
||||
if len(missingDeps) != 0 {
|
||||
t.Errorf("Expected 0 missing dependencies, got %d: %v", len(missingDeps), missingDeps)
|
||||
}
|
||||
}
|
||||
|
||||
// // Test dependency handling
|
||||
// func TestDependencies(t *testing.T) {
|
||||
// channelID := "test-deps"
|
||||
// handle, err := NewReliabilityManager(channelID)
|
||||
// if err != nil {
|
||||
// t.Fatalf("NewReliabilityManager failed: %v", err)
|
||||
// }
|
||||
// defer CleanupReliabilityManager(handle)
|
||||
|
||||
// // 1. Send message 1 (will become a dependency)
|
||||
// payload1 := []byte("message one")
|
||||
// msgID1 := MessageID("msg-dep-1")
|
||||
// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
|
||||
// if err != nil {
|
||||
// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
|
||||
// }
|
||||
// // Simulate receiving msg1 to add it to history (implicitly acknowledges it)
|
||||
// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
|
||||
// if err != nil {
|
||||
// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
|
||||
// }
|
||||
|
||||
// // 2. Send message 2 (depends on message 1 implicitly via causal history)
|
||||
// payload2 := []byte("message two")
|
||||
// msgID2 := MessageID("msg-dep-2")
|
||||
// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
|
||||
// if err != nil {
|
||||
// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
|
||||
// }
|
||||
|
||||
// // 3. Create a new manager to simulate a different peer receiving msg2 without msg1
|
||||
// handle2, err := NewReliabilityManager(channelID) // Same channel ID
|
||||
// if err != nil {
|
||||
// t.Fatalf("NewReliabilityManager (2) failed: %v", err)
|
||||
// }
|
||||
// defer CleanupReliabilityManager(handle2)
|
||||
|
||||
// // 4. Unwrap message 2 on the second manager - should report msg1 as missing
|
||||
// _, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2)
|
||||
// if err != nil {
|
||||
// t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err)
|
||||
// }
|
||||
|
||||
// if len(missingDeps) == 0 {
|
||||
// t.Fatalf("Expected missing dependencies, got none")
|
||||
// }
|
||||
// foundDep1 := false
|
||||
// for _, dep := range missingDeps {
|
||||
// if dep == msgID1 {
|
||||
// foundDep1 = true
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// if !foundDep1 {
|
||||
// t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps)
|
||||
// }
|
||||
|
||||
// // 5. Mark the dependency as met
|
||||
// err = MarkDependenciesMet(handle2, []MessageID{msgID1})
|
||||
// if err != nil {
|
||||
// t.Fatalf("MarkDependenciesMet failed: %v", err)
|
||||
// }
|
||||
|
||||
// // Ideally, we'd check if the message is now moved from an internal buffer,
|
||||
// // but the current API doesn't expose buffer state. We rely on callbacks for this.
|
||||
// }
|
||||
|
||||
// // Test callbacks
|
||||
// func TestCallbacks(t *testing.T) {
|
||||
// channelID := "test-callbacks"
|
||||
// handle, err := NewReliabilityManager(channelID)
|
||||
// if err != nil {
|
||||
// t.Fatalf("NewReliabilityManager failed: %v", err)
|
||||
// }
|
||||
// defer CleanupReliabilityManager(handle)
|
||||
|
||||
// var wg sync.WaitGroup
|
||||
// receivedReady := make(map[MessageID]bool)
|
||||
// receivedSent := make(map[MessageID]bool)
|
||||
// receivedMissing := make(map[MessageID][]MessageID)
|
||||
// syncRequested := false
|
||||
// var cbMutex sync.Mutex // Protect access to callback tracking maps/vars
|
||||
|
||||
// callbacks := Callbacks{
|
||||
// OnMessageReady: func(messageId MessageID) {
|
||||
// fmt.Printf("Test: OnMessageReady received: %s\n", messageId)
|
||||
// cbMutex.Lock()
|
||||
// receivedReady[messageId] = true
|
||||
// cbMutex.Unlock()
|
||||
// wg.Done()
|
||||
// },
|
||||
// OnMessageSent: func(messageId MessageID) {
|
||||
// fmt.Printf("Test: OnMessageSent received: %s\n", messageId)
|
||||
// cbMutex.Lock()
|
||||
// receivedSent[messageId] = true
|
||||
// cbMutex.Unlock()
|
||||
// wg.Done()
|
||||
// },
|
||||
// OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) {
|
||||
// fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps)
|
||||
// cbMutex.Lock()
|
||||
// receivedMissing[messageId] = missingDeps
|
||||
// cbMutex.Unlock()
|
||||
// wg.Done()
|
||||
// },
|
||||
// OnPeriodicSync: func() {
|
||||
// fmt.Println("Test: OnPeriodicSync received")
|
||||
// cbMutex.Lock()
|
||||
// syncRequested = true
|
||||
// cbMutex.Unlock()
|
||||
// // Don't wg.Done() here, it might be called multiple times
|
||||
// },
|
||||
// }
|
||||
|
||||
// err = RegisterCallbacks(handle, callbacks)
|
||||
// if err != nil {
|
||||
// t.Fatalf("RegisterCallbacks failed: %v", err)
|
||||
// }
|
||||
|
||||
// // Start tasks AFTER registering callbacks
|
||||
// err = StartPeriodicTasks(handle)
|
||||
// if err != nil {
|
||||
// t.Fatalf("StartPeriodicTasks failed: %v", err)
|
||||
// }
|
||||
|
||||
// // --- Test Scenario ---
|
||||
|
||||
// // 1. Send msg1
|
||||
// wg.Add(1) // Expect OnMessageSent for msg1 eventually
|
||||
// payload1 := []byte("callback test 1")
|
||||
// msgID1 := MessageID("cb-msg-1")
|
||||
// wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
|
||||
// if err != nil {
|
||||
// t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
|
||||
// }
|
||||
|
||||
// // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history)
|
||||
// wg.Add(1) // Expect OnMessageReady for msg1
|
||||
// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
|
||||
// if err != nil {
|
||||
// t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
|
||||
// }
|
||||
|
||||
// // 3. Send msg2 (depends on msg1)
|
||||
// wg.Add(1) // Expect OnMessageSent for msg2 eventually
|
||||
// payload2 := []byte("callback test 2")
|
||||
// msgID2 := MessageID("cb-msg-2")
|
||||
// wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
|
||||
// if err != nil {
|
||||
// t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
|
||||
// }
|
||||
|
||||
// // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2)
|
||||
// wg.Add(1) // Expect OnMessageReady for msg2
|
||||
// _, _, err = UnwrapReceivedMessage(handle, wrappedMsg2)
|
||||
// if err != nil {
|
||||
// t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err)
|
||||
// }
|
||||
|
||||
// // --- Verification ---
|
||||
// // Wait for expected callbacks with a timeout
|
||||
// waitTimeout(&wg, 5*time.Second, t)
|
||||
|
||||
// cbMutex.Lock()
|
||||
// defer cbMutex.Unlock()
|
||||
|
||||
// if !receivedReady[msgID1] {
|
||||
// t.Errorf("OnMessageReady not called for %s", msgID1)
|
||||
// }
|
||||
// if !receivedReady[msgID2] {
|
||||
// t.Errorf("OnMessageReady not called for %s", msgID2)
|
||||
// }
|
||||
// if !receivedSent[msgID1] {
|
||||
// t.Errorf("OnMessageSent not called for %s", msgID1)
|
||||
// }
|
||||
// if !receivedSent[msgID2] {
|
||||
// t.Errorf("OnMessageSent not called for %s", msgID2)
|
||||
// }
|
||||
// // We didn't explicitly test missing deps in this path
|
||||
// if len(receivedMissing) > 0 {
|
||||
// t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing)
|
||||
// }
|
||||
// // Periodic sync is harder to guarantee in a short test, just check if it was ever true
|
||||
// if !syncRequested {
|
||||
// t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout")
|
||||
// }
|
||||
// }
|
||||
|
||||
// // Helper function to wait for WaitGroup with a timeout
|
||||
// func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) {
|
||||
// c := make(chan struct{})
|
||||
// go func() {
|
||||
// defer close(c)
|
||||
// wg.Wait()
|
||||
// }()
|
||||
// select {
|
||||
// case <-c:
|
||||
// // Completed normally
|
||||
// case <-time.After(timeout):
|
||||
// t.Fatalf("Timed out waiting for callbacks")
|
||||
// }
|
||||
// }
|
||||
@ -24,11 +24,19 @@ type
|
||||
channelId*: string
|
||||
config*: ReliabilityConfig
|
||||
lock*: Lock
|
||||
# Nim callbacks (used internally or if not using C bindings)
|
||||
onMessageReady*: proc(messageId: MessageID) {.gcsafe.}
|
||||
onMessageSent*: proc(messageId: MessageID) {.gcsafe.}
|
||||
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
|
||||
onPeriodicSync*: PeriodicSyncCallback
|
||||
|
||||
# C callback pointers and user data (for FFI)
|
||||
cMessageReadyCallback*: pointer
|
||||
cMessageSentCallback*: pointer
|
||||
cMissingDependenciesCallback*: pointer
|
||||
cPeriodicSyncCallback*: pointer
|
||||
cUserData*: pointer
|
||||
|
||||
ReliabilityError* = enum
|
||||
reInvalidArgument
|
||||
reOutOfMemory
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user