mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-04 07:03:09 +00:00
Merge pull request #7 from waku-org/gabrielmer-feat-init-implementation
feat: libsds refactor
This commit is contained in:
commit
69a1872a2e
9
.gitignore
vendored
9
.gitignore
vendored
@ -5,3 +5,12 @@ nph
|
|||||||
docs
|
docs
|
||||||
for_reference
|
for_reference
|
||||||
do_not_commit
|
do_not_commit
|
||||||
|
build/*
|
||||||
|
sds.nims
|
||||||
|
/.update.timestamp
|
||||||
|
|
||||||
|
# Nimbus Build System
|
||||||
|
nimbus-build-system.paths
|
||||||
|
|
||||||
|
# Nimble packages
|
||||||
|
/vendor/.nimble
|
||||||
|
|||||||
55
.gitmodules
vendored
Normal file
55
.gitmodules
vendored
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
[submodule "vendor/nimbus-build-system"]
|
||||||
|
path = vendor/nimbus-build-system
|
||||||
|
url = https://github.com/status-im/nimbus-build-system.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-chronos"]
|
||||||
|
path = vendor/nim-chronos
|
||||||
|
url = https://github.com/status-im/nim-chronos.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-results"]
|
||||||
|
path = vendor/nim-results
|
||||||
|
url = https://github.com/arnetheduck/nim-results.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-stew"]
|
||||||
|
path = vendor/nim-stew
|
||||||
|
url = https://github.com/status-im/nim-stew.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-chronicles"]
|
||||||
|
path = vendor/nim-chronicles
|
||||||
|
url = https://github.com/status-im/nim-chronicles.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-faststreams"]
|
||||||
|
path = vendor/nim-faststreams
|
||||||
|
url = https://github.com/status-im/nim-faststreams.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-json-serialization"]
|
||||||
|
path = vendor/nim-json-serialization
|
||||||
|
url = https://github.com/status-im/nim-json-serialization.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-serialization"]
|
||||||
|
path = vendor/nim-serialization
|
||||||
|
url = https://github.com/status-im/nim-serialization.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-taskpools"]
|
||||||
|
path = vendor/nim-taskpools
|
||||||
|
url = https://github.com/status-im/nim-taskpools.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-confutils"]
|
||||||
|
path = vendor/nim-confutils
|
||||||
|
url = https://github.com/status-im/nim-confutils.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
|
[submodule "vendor/nim-libp2p"]
|
||||||
|
path = vendor/nim-libp2p
|
||||||
|
url = https://github.com/vacp2p/nim-libp2p.git
|
||||||
|
ignore = untracked
|
||||||
|
branch = master
|
||||||
51
Makefile
Normal file
51
Makefile
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
.PHONY: libsds
|
||||||
|
|
||||||
|
export BUILD_SYSTEM_DIR := vendor/nimbus-build-system
|
||||||
|
# we don't want an error here, so we can handle things later, in the ".DEFAULT" target
|
||||||
|
-include $(BUILD_SYSTEM_DIR)/makefiles/variables.mk
|
||||||
|
|
||||||
|
ifeq ($(NIM_PARAMS),)
|
||||||
|
# "variables.mk" was not included, so we update the submodules.
|
||||||
|
GIT_SUBMODULE_UPDATE := git submodule update --init --recursive
|
||||||
|
.DEFAULT:
|
||||||
|
+@ echo -e "Git submodules not found. Running '$(GIT_SUBMODULE_UPDATE)'.\n"; \
|
||||||
|
$(GIT_SUBMODULE_UPDATE); \
|
||||||
|
echo
|
||||||
|
# Now that the included *.mk files appeared, and are newer than this file, Make will restart itself:
|
||||||
|
# https://www.gnu.org/software/make/manual/make.html#Remaking-Makefiles
|
||||||
|
#
|
||||||
|
# After restarting, it will execute its original goal, so we don't have to start a child Make here
|
||||||
|
# with "$(MAKE) $(MAKECMDGOALS)". Isn't hidden control flow great?
|
||||||
|
|
||||||
|
else # "variables.mk" was included. Business as usual until the end of this file.
|
||||||
|
|
||||||
|
# default target, because it's the first one that doesn't start with '.'
|
||||||
|
all: | libsds
|
||||||
|
|
||||||
|
sds.nims:
|
||||||
|
ln -s sds.nimble $@
|
||||||
|
|
||||||
|
update: | update-common
|
||||||
|
rm -rf sds.nims && \
|
||||||
|
$(MAKE) sds.nims $(HANDLE_OUTPUT)
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm -rf build
|
||||||
|
|
||||||
|
deps: | sds.nims
|
||||||
|
|
||||||
|
# must be included after the default target
|
||||||
|
-include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk
|
||||||
|
|
||||||
|
STATIC ?= 0
|
||||||
|
|
||||||
|
libsds: deps
|
||||||
|
rm -f build/libsds*
|
||||||
|
ifeq ($(STATIC), 1)
|
||||||
|
echo -e $(BUILD_MSG) "build/$@.a" && \
|
||||||
|
$(ENV_SCRIPT) nim libsdsStatic $(NIM_PARAMS) sds.nims
|
||||||
|
else
|
||||||
|
echo -e $(BUILD_MSG) "build/$@.so" && \
|
||||||
|
$(ENV_SCRIPT) nim libsdsDynamic $(NIM_PARAMS) sds.nims
|
||||||
|
endif
|
||||||
|
endif
|
||||||
@ -1,132 +0,0 @@
|
|||||||
#ifndef BINDINGS_H
|
|
||||||
#define BINDINGS_H
|
|
||||||
|
|
||||||
#include <stddef.h> // For size_t
|
|
||||||
#include <stdint.h> // For standard integer types
|
|
||||||
#include <stdbool.h> // For bool type
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
// Opaque struct declaration (handle replaces direct pointer usage)
|
|
||||||
typedef struct ReliabilityManager ReliabilityManager; // Keep forward declaration
|
|
||||||
|
|
||||||
// Define MessageID as a C string
|
|
||||||
typedef const char* MessageID; // Keep const for the typedef itself
|
|
||||||
|
|
||||||
// --- Result Types ---
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
bool is_ok;
|
|
||||||
char* error_message;
|
|
||||||
} CResult;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
CResult base_result;
|
|
||||||
unsigned char* message;
|
|
||||||
size_t message_len;
|
|
||||||
MessageID* missing_deps;
|
|
||||||
size_t missing_deps_count;
|
|
||||||
} CUnwrapResult;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
CResult base_result;
|
|
||||||
unsigned char* message;
|
|
||||||
size_t message_len;
|
|
||||||
} CWrapResult;
|
|
||||||
|
|
||||||
|
|
||||||
// --- Callback Function Pointer Types ---
|
|
||||||
|
|
||||||
// Define event types (enum or constants)
|
|
||||||
typedef enum {
|
|
||||||
EVENT_MESSAGE_READY = 1,
|
|
||||||
EVENT_MESSAGE_SENT = 2,
|
|
||||||
EVENT_MISSING_DEPENDENCIES = 3,
|
|
||||||
EVENT_PERIODIC_SYNC = 4
|
|
||||||
} CEventType;
|
|
||||||
|
|
||||||
// Single callback type for all events
|
|
||||||
// Nim will call this, passing the handle and event-specific data
|
|
||||||
typedef void (*CEventCallback)(void* handle, CEventType eventType, void* data1, void* data2, size_t data3);
|
|
||||||
|
|
||||||
|
|
||||||
// --- Core API Functions ---
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Creates a new ReliabilityManager instance.
|
|
||||||
* @param channelId A unique identifier for the communication channel.
|
|
||||||
* @return An opaque handle (void*) representing the instance, or NULL on failure.
|
|
||||||
*/
|
|
||||||
void* NewReliabilityManager(char* channelId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Cleans up resources associated with a ReliabilityManager instance.
|
|
||||||
* @param handle The opaque handle (void*) of the instance to clean up.
|
|
||||||
*/
|
|
||||||
void CleanupReliabilityManager(void* handle);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Resets the ReliabilityManager instance.
|
|
||||||
* @param handle The opaque handle (void*) of the instance.
|
|
||||||
* @return CResult indicating success or failure.
|
|
||||||
*/
|
|
||||||
CResult ResetReliabilityManager(void* handle);
|
|
||||||
/**
|
|
||||||
* @brief Wraps an outgoing message.
|
|
||||||
* @param handle The opaque handle (void*) of the instance.
|
|
||||||
* @param message Pointer to the raw message content.
|
|
||||||
* @param messageLen Length of the raw message content.
|
|
||||||
* @param messageId A unique identifier for this message.
|
|
||||||
* @return CWrapResult containing the wrapped message or an error.
|
|
||||||
*/
|
|
||||||
CWrapResult WrapOutgoingMessage(void* handle, void* message, size_t messageLen, char* messageId);
|
|
||||||
/**
|
|
||||||
* @brief Unwraps a received message.
|
|
||||||
* @param handle The opaque handle (void*) of the instance.
|
|
||||||
* @param message Pointer to the received message data.
|
|
||||||
* @param messageLen Length of the received message data.
|
|
||||||
* @return CUnwrapResult containing the unwrapped content, missing dependencies, or an error.
|
|
||||||
*/
|
|
||||||
CUnwrapResult UnwrapReceivedMessage(void* handle, void* message, size_t messageLen);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Marks specified message dependencies as met.
|
|
||||||
* @param handle The opaque handle (void*) of the instance.
|
|
||||||
* @param messageIDs An array of message IDs to mark as met.
|
|
||||||
* @param count The number of message IDs in the array.
|
|
||||||
* @return CResult indicating success or failure.
|
|
||||||
*/
|
|
||||||
CResult MarkDependenciesMet(void* handle, char** messageIDs, size_t count); // Reverted to char**
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Registers callback functions.
|
|
||||||
* @param handle The opaque handle (void*) of the instance.
|
|
||||||
* @param messageReady Callback for when a message is ready.
|
|
||||||
* @param messageSent Callback for when an outgoing message is acknowledged.
|
|
||||||
* @param eventCallback The single callback function to handle all events.
|
|
||||||
* @param user_data A pointer to user-defined data (optional, could be managed in Go).
|
|
||||||
*/
|
|
||||||
void RegisterCallback(void* handle, CEventCallback eventCallback, void* user_data); // Renamed and simplified
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Starts the background periodic tasks.
|
|
||||||
* @param handle The opaque handle (void*) of the instance.
|
|
||||||
*/
|
|
||||||
void StartPeriodicTasks(void* handle);
|
|
||||||
|
|
||||||
|
|
||||||
// --- Memory Freeing Functions ---
|
|
||||||
|
|
||||||
void FreeCResultError(CResult result);
|
|
||||||
void FreeCWrapResult(CWrapResult result);
|
|
||||||
void FreeCUnwrapResult(CUnwrapResult result);
|
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
} // extern "C"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif // BINDINGS_H
|
|
||||||
@ -1,318 +0,0 @@
|
|||||||
import std/[locks, typetraits, tables] # Added tables
|
|
||||||
import chronos
|
|
||||||
import results
|
|
||||||
import ../src/[reliability, reliability_utils, message]
|
|
||||||
|
|
||||||
type
|
|
||||||
CReliabilityManagerHandle* = pointer
|
|
||||||
|
|
||||||
type
|
|
||||||
# Callback Types (Imported from C Header)
|
|
||||||
CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum
|
|
||||||
EVENT_MESSAGE_READY = 1,
|
|
||||||
EVENT_MESSAGE_SENT = 2,
|
|
||||||
EVENT_MISSING_DEPENDENCIES = 3,
|
|
||||||
EVENT_PERIODIC_SYNC = 4
|
|
||||||
|
|
||||||
CEventCallback* = proc(handle: pointer, eventType: CEventType, data1: pointer, data2: pointer, data3: csize_t) {.cdecl.} # Use csize_t
|
|
||||||
|
|
||||||
CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object
|
|
||||||
is_ok*: bool
|
|
||||||
error_message*: cstring
|
|
||||||
|
|
||||||
CWrapResult* {.importc: "CWrapResult", header: "bindings.h", bycopy.} = object
|
|
||||||
base_result*: CResult
|
|
||||||
message*: pointer
|
|
||||||
message_len*: csize_t
|
|
||||||
|
|
||||||
CUnwrapResult* {.importc: "CUnwrapResult", header: "bindings.h", bycopy.} = object
|
|
||||||
base_result*: CResult
|
|
||||||
message*: pointer
|
|
||||||
message_len*: csize_t
|
|
||||||
missing_deps*: ptr cstring
|
|
||||||
missing_deps_count*: csize_t
|
|
||||||
|
|
||||||
# --- Callback Registry ---
|
|
||||||
type
|
|
||||||
CallbackRegistry = Table[CReliabilityManagerHandle, CEventCallback]
|
|
||||||
|
|
||||||
var
|
|
||||||
callbackRegistry: CallbackRegistry
|
|
||||||
registryLock: Lock
|
|
||||||
|
|
||||||
initLock(registryLock)
|
|
||||||
|
|
||||||
# --- Memory Management Helpers ---
|
|
||||||
|
|
||||||
proc allocCString*(s: string): cstring {.inline, gcsafe.} =
|
|
||||||
if s.len == 0: return nil
|
|
||||||
result = cast[cstring](allocShared(s.len + 1))
|
|
||||||
copyMem(result, s.cstring, s.len + 1)
|
|
||||||
|
|
||||||
proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} =
|
|
||||||
if s.len == 0: return (nil, 0)
|
|
||||||
let len = s.len
|
|
||||||
let bufferPtr = allocShared(len)
|
|
||||||
if len > 0:
|
|
||||||
copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural)
|
|
||||||
return (bufferPtr, len.csize_t)
|
|
||||||
|
|
||||||
proc allocSeqCString*(s: seq[string]): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} =
|
|
||||||
if s.len == 0: return (nil, 0)
|
|
||||||
let count = s.len
|
|
||||||
# Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray
|
|
||||||
let arrPtr = cast[ptr UncheckedArray[cstring]](allocShared(count * sizeof(cstring)))
|
|
||||||
for i in 0..<count:
|
|
||||||
# Allocate each string and store its pointer in the array using unchecked array indexing
|
|
||||||
arrPtr[i] = allocCString(s[i])
|
|
||||||
# Return pointer to the first element, cast back to ptr cstring
|
|
||||||
return (cast[ptr cstring](arrPtr), count.csize_t)
|
|
||||||
|
|
||||||
proc freeCString*(cs: cstring) {.inline, gcsafe.} =
|
|
||||||
if cs != nil: deallocShared(cs)
|
|
||||||
|
|
||||||
proc freeSeqByte*(bufferPtr: pointer) {.inline, gcsafe, cdecl.} =
|
|
||||||
if bufferPtr != nil: deallocShared(bufferPtr)
|
|
||||||
|
|
||||||
# Corrected to accept ptr cstring
|
|
||||||
proc freeSeqCString*(arrPtr: ptr cstring, count: csize_t) {.inline, gcsafe, cdecl.} =
|
|
||||||
if arrPtr != nil:
|
|
||||||
# Cast to ptr UncheckedArray for proper iteration/indexing before freeing
|
|
||||||
let arr = cast[ptr UncheckedArray[cstring]](arrPtr)
|
|
||||||
for i in 0..<count:
|
|
||||||
freeCString(arr[i]) # Free each individual cstring
|
|
||||||
deallocShared(arrPtr) # Free the array pointer itself
|
|
||||||
|
|
||||||
# --- Result Conversion Helpers ---
|
|
||||||
|
|
||||||
proc toCResultOk*(): CResult =
|
|
||||||
CResult(is_ok: true, error_message: nil)
|
|
||||||
|
|
||||||
proc toCResultErr*(err: ReliabilityError): CResult =
|
|
||||||
CResult(is_ok: false, error_message: allocCString($err))
|
|
||||||
|
|
||||||
proc toCResultErrStr*(errMsg: string): CResult =
|
|
||||||
CResult(is_ok: false, error_message: allocCString(errMsg))
|
|
||||||
|
|
||||||
# --- Callback Wrappers (Nim -> C) ---
|
|
||||||
# These wrappers call the single global Go callback relay.
|
|
||||||
|
|
||||||
proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) =
|
|
||||||
echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId
|
|
||||||
let handle = cast[CReliabilityManagerHandle](rm)
|
|
||||||
var cb: CEventCallback
|
|
||||||
withLock registryLock:
|
|
||||||
if not callbackRegistry.hasKey(handle):
|
|
||||||
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
|
|
||||||
return
|
|
||||||
cb = callbackRegistry[handle]
|
|
||||||
|
|
||||||
# Pass handle, event type, and messageId (as data1)
|
|
||||||
cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0)
|
|
||||||
|
|
||||||
proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) =
|
|
||||||
echo "[Nim Binding] nimMessageSentCallback called for: ", messageId
|
|
||||||
let handle = cast[CReliabilityManagerHandle](rm)
|
|
||||||
var cb: CEventCallback
|
|
||||||
withLock registryLock:
|
|
||||||
if not callbackRegistry.hasKey(handle):
|
|
||||||
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
|
|
||||||
return
|
|
||||||
cb = callbackRegistry[handle]
|
|
||||||
|
|
||||||
cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0)
|
|
||||||
|
|
||||||
proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) =
|
|
||||||
echo "[Nim Binding] nimMissingDependenciesCallback called for: ", messageId, " with deps: ", $missingDeps
|
|
||||||
let handle = cast[CReliabilityManagerHandle](rm)
|
|
||||||
var cb: CEventCallback
|
|
||||||
withLock registryLock:
|
|
||||||
if not callbackRegistry.hasKey(handle):
|
|
||||||
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
|
|
||||||
return
|
|
||||||
cb = callbackRegistry[handle]
|
|
||||||
|
|
||||||
# Prepare data for the callback
|
|
||||||
var cDepsPtr: ptr cstring = nil
|
|
||||||
var cDepsCount: csize_t = 0
|
|
||||||
var cDepsNim: seq[cstring] = @[] # Keep Nim seq alive during call
|
|
||||||
if missingDeps.len > 0:
|
|
||||||
cDepsNim = newSeq[cstring](missingDeps.len)
|
|
||||||
for i, dep in missingDeps:
|
|
||||||
cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq
|
|
||||||
cDepsPtr = cast[ptr cstring](cDepsNim[0].addr)
|
|
||||||
cDepsCount = missingDeps.len.csize_t
|
|
||||||
|
|
||||||
cb(handle, EVENT_MISSING_DEPENDENCIES, cast[pointer](messageId.cstring), cast[pointer](cDepsPtr), cDepsCount)
|
|
||||||
|
|
||||||
proc nimPeriodicSyncCallback(rm: ReliabilityManager) =
|
|
||||||
echo "[Nim Binding] nimPeriodicSyncCallback called"
|
|
||||||
let handle = cast[CReliabilityManagerHandle](rm)
|
|
||||||
var cb: CEventCallback
|
|
||||||
withLock registryLock:
|
|
||||||
if not callbackRegistry.hasKey(handle):
|
|
||||||
echo "[Nim Binding] No callback registered for handle: ", cast[int](handle)
|
|
||||||
return
|
|
||||||
cb = callbackRegistry[handle]
|
|
||||||
|
|
||||||
cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0)
|
|
||||||
|
|
||||||
# --- Exported C Functions - Using Opaque Pointer ---
|
|
||||||
|
|
||||||
proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} =
|
|
||||||
let channelId = $channelIdCStr
|
|
||||||
if channelId.len == 0:
|
|
||||||
echo "Error creating ReliabilityManager: Channel ID cannot be empty"
|
|
||||||
return nil # Return nil pointer
|
|
||||||
let rmResult = newReliabilityManager(channelId)
|
|
||||||
if rmResult.isOk:
|
|
||||||
let rm = rmResult.get()
|
|
||||||
# Assign anonymous procs that capture 'rm' and call the wrappers
|
|
||||||
# Ensure signatures match the non-gcsafe fields in ReliabilityManager
|
|
||||||
rm.onMessageReady = proc(msgId: MessageID) = nimMessageReadyCallback(rm, msgId)
|
|
||||||
rm.onMessageSent = proc(msgId: MessageID) = nimMessageSentCallback(rm, msgId)
|
|
||||||
rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) = nimMissingDependenciesCallback(rm, msgId, deps)
|
|
||||||
rm.onPeriodicSync = proc() = nimPeriodicSyncCallback(rm)
|
|
||||||
|
|
||||||
# Return the Nim ref object cast to the opaque pointer type
|
|
||||||
let handle = cast[CReliabilityManagerHandle](rm)
|
|
||||||
GC_ref(rm) # Prevent GC from moving the object while Go holds the handle
|
|
||||||
return handle
|
|
||||||
else:
|
|
||||||
echo "Error creating ReliabilityManager: ", rmResult.error
|
|
||||||
return nil # Return nil pointer
|
|
||||||
|
|
||||||
proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} =
|
|
||||||
let handlePtr = handle
|
|
||||||
if handlePtr != nil:
|
|
||||||
# Go side should handle removing the handle from its registry.
|
|
||||||
# We just need to unref the Nim object.
|
|
||||||
# No need to interact with gEventCallback here.
|
|
||||||
|
|
||||||
# Cast opaque pointer back to Nim ref type
|
|
||||||
let rm = cast[ReliabilityManager](handlePtr)
|
|
||||||
cleanup(rm) # Call Nim cleanup
|
|
||||||
GC_unref(rm) # Allow GC to collect the object now that Go is done
|
|
||||||
else:
|
|
||||||
echo "Warning: CleanupReliabilityManager called with NULL handle"
|
|
||||||
|
|
||||||
proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.exportc, dynlib, cdecl, gcsafe.} =
|
|
||||||
if handle == nil:
|
|
||||||
return toCResultErrStr("ReliabilityManager handle is NULL")
|
|
||||||
let rm = cast[ReliabilityManager](handle)
|
|
||||||
let result = resetReliabilityManager(rm)
|
|
||||||
if result.isOk:
|
|
||||||
return toCResultOk()
|
|
||||||
else:
|
|
||||||
return toCResultErr(result.error)
|
|
||||||
|
|
||||||
proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
|
|
||||||
if handle == nil:
|
|
||||||
return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
|
|
||||||
let rm = cast[ReliabilityManager](handle)
|
|
||||||
|
|
||||||
if messageC == nil and messageLen > 0:
|
|
||||||
return CWrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0"))
|
|
||||||
if messageIdCStr == nil:
|
|
||||||
return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL"))
|
|
||||||
|
|
||||||
let messageId = $messageIdCStr
|
|
||||||
var messageNim: seq[byte]
|
|
||||||
if messageLen > 0:
|
|
||||||
messageNim = newSeq[byte](messageLen)
|
|
||||||
copyMem(messageNim[0].addr, messageC, messageLen.Natural)
|
|
||||||
else:
|
|
||||||
messageNim = @[]
|
|
||||||
|
|
||||||
let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId)
|
|
||||||
if wrapResult.isOk:
|
|
||||||
let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get())
|
|
||||||
return CWrapResult(
|
|
||||||
base_result: toCResultOk(),
|
|
||||||
message: wrappedDataPtr,
|
|
||||||
message_len: wrappedDataLen
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return CWrapResult(base_result: toCResultErr(wrapResult.error))
|
|
||||||
|
|
||||||
proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t): CUnwrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
|
|
||||||
if handle == nil:
|
|
||||||
return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL"))
|
|
||||||
let rm = cast[ReliabilityManager](handle)
|
|
||||||
|
|
||||||
if messageC == nil and messageLen > 0:
|
|
||||||
return CUnwrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0"))
|
|
||||||
|
|
||||||
var messageNim: seq[byte]
|
|
||||||
if messageLen > 0:
|
|
||||||
messageNim = newSeq[byte](messageLen)
|
|
||||||
copyMem(messageNim[0].addr, messageC, messageLen.Natural)
|
|
||||||
else:
|
|
||||||
messageNim = @[]
|
|
||||||
|
|
||||||
let unwrapResult = unwrapReceivedMessage(rm, messageNim)
|
|
||||||
if unwrapResult.isOk:
|
|
||||||
let (unwrappedContent, missingDepsNim) = unwrapResult.get()
|
|
||||||
let (contentPtr, contentLen) = allocSeqByte(unwrappedContent)
|
|
||||||
let (depsPtr, depsCount) = allocSeqCString(missingDepsNim)
|
|
||||||
return CUnwrapResult(
|
|
||||||
base_result: toCResultOk(),
|
|
||||||
message: contentPtr,
|
|
||||||
message_len: contentLen,
|
|
||||||
missing_deps: depsPtr,
|
|
||||||
missing_deps_count: depsCount
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return CUnwrapResult(base_result: toCResultErr(unwrapResult.error))
|
|
||||||
|
|
||||||
proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t): CResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe
|
|
||||||
if handle == nil:
|
|
||||||
return toCResultErrStr("ReliabilityManager handle is NULL")
|
|
||||||
let rm = cast[ReliabilityManager](handle)
|
|
||||||
|
|
||||||
if messageIDsC == nil and count > 0:
|
|
||||||
return toCResultErrStr("MessageIDs pointer is NULL but count > 0")
|
|
||||||
|
|
||||||
var messageIDsNim = newSeq[string](count)
|
|
||||||
# Cast to ptr UncheckedArray for indexing
|
|
||||||
let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC)
|
|
||||||
for i in 0..<count:
|
|
||||||
let currentCStr = messageIDsCArray[i] # Use unchecked array indexing
|
|
||||||
if currentCStr != nil:
|
|
||||||
messageIDsNim[i] = $currentCStr
|
|
||||||
else:
|
|
||||||
return toCResultErrStr("NULL message ID found in array")
|
|
||||||
|
|
||||||
let result = markDependenciesMet(rm, messageIDsNim)
|
|
||||||
if result.isOk:
|
|
||||||
return toCResultOk()
|
|
||||||
else:
|
|
||||||
return toCResultErr(result.error)
|
|
||||||
|
|
||||||
proc RegisterCallback*(handle: CReliabilityManagerHandle,
|
|
||||||
cEventCallback: CEventCallback,
|
|
||||||
cUserDataPtr: pointer) {.exportc, dynlib, cdecl.} =
|
|
||||||
withLock registryLock:
|
|
||||||
callbackRegistry[handle] = cEventCallback
|
|
||||||
echo "[Nim Binding] Registered callback for handle: ", cast[int](handle)
|
|
||||||
|
|
||||||
proc StartPeriodicTasks*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} =
|
|
||||||
if handle == nil:
|
|
||||||
echo "Error: Cannot start periodic tasks: NULL ReliabilityManager handle"
|
|
||||||
return
|
|
||||||
let rm = cast[ReliabilityManager](handle)
|
|
||||||
startPeriodicTasks(rm)
|
|
||||||
|
|
||||||
# --- Memory Freeing Functions ---
|
|
||||||
|
|
||||||
proc FreeCResultError*(result: CResult) {.exportc, dynlib, gcsafe, cdecl.} =
|
|
||||||
freeCString(result.error_message)
|
|
||||||
|
|
||||||
proc FreeCWrapResult*(result: CWrapResult) {.exportc, dynlib, gcsafe, cdecl.} =
|
|
||||||
freeCString(result.base_result.error_message)
|
|
||||||
freeSeqByte(result.message)
|
|
||||||
|
|
||||||
proc FreeCUnwrapResult*(result: CUnwrapResult) {.exportc, dynlib, gcsafe, cdecl.} =
|
|
||||||
freeCString(result.base_result.error_message)
|
|
||||||
freeSeqByte(result.message)
|
|
||||||
freeSeqCString(result.missing_deps, result.missing_deps_count)
|
|
||||||
Binary file not shown.
73
library/alloc.nim
Normal file
73
library/alloc.nim
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
## Can be shared safely between threads
|
||||||
|
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
|
||||||
|
|
||||||
|
proc alloc*(str: cstring): cstring =
|
||||||
|
# Byte allocation from the given address.
|
||||||
|
# There should be the corresponding manual deallocation with deallocShared !
|
||||||
|
if str.isNil():
|
||||||
|
var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
|
||||||
|
ret[0] = '\0' # Set the null terminator
|
||||||
|
return ret
|
||||||
|
|
||||||
|
let ret = cast[cstring](allocShared(len(str) + 1))
|
||||||
|
copyMem(ret, str, len(str) + 1)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc alloc*(str: string): cstring =
|
||||||
|
## Byte allocation from the given address.
|
||||||
|
## There should be the corresponding manual deallocation with deallocShared !
|
||||||
|
var ret = cast[cstring](allocShared(str.len + 1))
|
||||||
|
let s = cast[seq[char]](str)
|
||||||
|
for i in 0 ..< str.len:
|
||||||
|
ret[i] = s[i]
|
||||||
|
ret[str.len] = '\0'
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
|
||||||
|
let data = allocShared(sizeof(T) * s.len)
|
||||||
|
if s.len != 0:
|
||||||
|
copyMem(data, unsafeAddr s[0], s.len)
|
||||||
|
return (cast[ptr UncheckedArray[T]](data), s.len)
|
||||||
|
|
||||||
|
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
|
||||||
|
if not s.data.isNil:
|
||||||
|
when T is cstring:
|
||||||
|
# For array of cstrings, deallocate each string first
|
||||||
|
for i in 0 ..< s.len:
|
||||||
|
if not s.data[i].isNil:
|
||||||
|
# Deallocate each cstring
|
||||||
|
deallocShared(s.data[i])
|
||||||
|
|
||||||
|
deallocShared(s.data)
|
||||||
|
s.len = 0
|
||||||
|
|
||||||
|
proc toSeq*[T](s: SharedSeq[T]): seq[T] =
|
||||||
|
## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
|
||||||
|
## as req[T] is a GC managed type.
|
||||||
|
var ret = newSeq[T]()
|
||||||
|
for i in 0 ..< s.len:
|
||||||
|
ret.add(s.data[i])
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc allocSharedSeqFromCArray*[T](arr: ptr T, len: int): SharedSeq[T] =
|
||||||
|
## Creates a SharedSeq[T] from a C array pointer and length.
|
||||||
|
## The data is copied to shared memory.
|
||||||
|
## There should be a corresponding manual deallocation with deallocSharedSeq!
|
||||||
|
if arr.isNil or len <= 0:
|
||||||
|
return (nil, 0)
|
||||||
|
|
||||||
|
when T is cstring:
|
||||||
|
# Special handling for arrays of cstrings
|
||||||
|
let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * len))
|
||||||
|
let cstrArr = cast[ptr UncheckedArray[cstring]](arr)
|
||||||
|
|
||||||
|
for i in 0 ..< len:
|
||||||
|
# Use the existing alloc proc to properly allocate each cstring
|
||||||
|
data[i] = cstrArr[i].alloc()
|
||||||
|
|
||||||
|
return (data, len)
|
||||||
|
else:
|
||||||
|
# Original handling for non-cstring types
|
||||||
|
let data = allocShared(sizeof(T) * len)
|
||||||
|
copyMem(data, arr, sizeof(T) * len)
|
||||||
|
return (cast[ptr UncheckedArray[T]](data), len)
|
||||||
6
library/events/json_base_event.nim
Normal file
6
library/events/json_base_event.nim
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
type JsonEvent* = ref object of RootObj # https://rfc.vac.dev/spec/36/#jsonsignal-type
|
||||||
|
eventType* {.requiresInit.}: string
|
||||||
|
|
||||||
|
method `$`*(jsonEvent: JsonEvent): string {.base.} =
|
||||||
|
discard
|
||||||
|
# All events should implement this
|
||||||
11
library/events/json_message_ready_event.nim
Normal file
11
library/events/json_message_ready_event.nim
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
import std/json
|
||||||
|
import ./json_base_event, ../../src/[message]
|
||||||
|
|
||||||
|
type JsonMessageReadyEvent* = ref object of JsonEvent
|
||||||
|
messageId*: MessageID
|
||||||
|
|
||||||
|
proc new*(T: type JsonMessageReadyEvent, messageId: MessageID): T =
|
||||||
|
return JsonMessageReadyEvent(eventType: "message_ready", messageId: messageId)
|
||||||
|
|
||||||
|
method `$`*(jsonMessageReady: JsonMessageReadyEvent): string =
|
||||||
|
$(%*jsonMessageReady)
|
||||||
11
library/events/json_message_sent_event.nim
Normal file
11
library/events/json_message_sent_event.nim
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
import std/json
|
||||||
|
import ./json_base_event, ../../src/[message]
|
||||||
|
|
||||||
|
type JsonMessageSentEvent* = ref object of JsonEvent
|
||||||
|
messageId*: MessageID
|
||||||
|
|
||||||
|
proc new*(T: type JsonMessageSentEvent, messageId: MessageID): T =
|
||||||
|
return JsonMessageSentEvent(eventType: "message_sent", messageId: messageId)
|
||||||
|
|
||||||
|
method `$`*(jsonMessageSent: JsonMessageSentEvent): string =
|
||||||
|
$(%*jsonMessageSent)
|
||||||
18
library/events/json_missing_dependencies_event.nim
Normal file
18
library/events/json_missing_dependencies_event.nim
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
import std/json
|
||||||
|
import ./json_base_event, ../../src/[message]
|
||||||
|
|
||||||
|
type JsonMissingDependenciesEvent* = ref object of JsonEvent
|
||||||
|
messageId*: MessageID
|
||||||
|
missingDeps: seq[MessageID]
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type JsonMissingDependenciesEvent,
|
||||||
|
messageId: MessageID,
|
||||||
|
missingDeps: seq[MessageID],
|
||||||
|
): T =
|
||||||
|
return JsonMissingDependenciesEvent(
|
||||||
|
eventType: "missing_dependencies", messageId: messageId, missingDeps: missingDeps
|
||||||
|
)
|
||||||
|
|
||||||
|
method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string =
|
||||||
|
$(%*jsonMissingDependencies)
|
||||||
10
library/events/json_periodic_sync_event.nim
Normal file
10
library/events/json_periodic_sync_event.nim
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
import std/json
|
||||||
|
import ./json_base_event
|
||||||
|
|
||||||
|
type JsonPeriodicSyncEvent* = ref object of JsonEvent
|
||||||
|
|
||||||
|
proc new*(T: type JsonPeriodicSyncEvent): T =
|
||||||
|
return JsonPeriodicSyncEvent(eventType: "periodic_sync")
|
||||||
|
|
||||||
|
method `$`*(jsonPeriodicSync: JsonPeriodicSyncEvent): string =
|
||||||
|
$(%*jsonPeriodicSync)
|
||||||
30
library/ffi_types.nim
Normal file
30
library/ffi_types.nim
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
################################################################################
|
||||||
|
### Exported types
|
||||||
|
|
||||||
|
type SdsCallBack* = proc(
|
||||||
|
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||||
|
) {.cdecl, gcsafe, raises: [].}
|
||||||
|
|
||||||
|
const RET_OK*: cint = 0
|
||||||
|
const RET_ERR*: cint = 1
|
||||||
|
const RET_MISSING_CALLBACK*: cint = 2
|
||||||
|
|
||||||
|
### End of exported types
|
||||||
|
################################################################################
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
### FFI utils
|
||||||
|
|
||||||
|
template foreignThreadGc*(body: untyped) =
|
||||||
|
when declared(setupForeignThreadGc):
|
||||||
|
setupForeignThreadGc()
|
||||||
|
|
||||||
|
body
|
||||||
|
|
||||||
|
when declared(tearDownForeignThreadGc):
|
||||||
|
tearDownForeignThreadGc()
|
||||||
|
|
||||||
|
type onDone* = proc()
|
||||||
|
|
||||||
|
### End of FFI utils
|
||||||
|
################################################################################
|
||||||
62
library/libsds.h
Normal file
62
library/libsds.h
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
|
||||||
|
// Generated manually and inspired by the one generated by the Nim Compiler.
|
||||||
|
// In order to see the header file generated by Nim just run `make libsds`
|
||||||
|
// from the root repo folder and the header should be created in
|
||||||
|
// nimcache/release/libsds/libsds.h
|
||||||
|
#ifndef __libsds__
|
||||||
|
#define __libsds__
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
// The possible returned values for the functions that return int
|
||||||
|
#define RET_OK 0
|
||||||
|
#define RET_ERR 1
|
||||||
|
#define RET_MISSING_CALLBACK 2
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData);
|
||||||
|
|
||||||
|
|
||||||
|
// --- Core API Functions ---
|
||||||
|
|
||||||
|
|
||||||
|
void* NewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData);
|
||||||
|
|
||||||
|
void SetEventCallback(void* ctx, SdsCallBack callback, void* userData);
|
||||||
|
|
||||||
|
int CleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData);
|
||||||
|
|
||||||
|
int ResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData);
|
||||||
|
|
||||||
|
int WrapOutgoingMessage(void* ctx,
|
||||||
|
void* message,
|
||||||
|
size_t messageLen,
|
||||||
|
const char* messageId,
|
||||||
|
SdsCallBack callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
|
int UnwrapReceivedMessage(void* ctx,
|
||||||
|
void* message,
|
||||||
|
size_t messageLen,
|
||||||
|
SdsCallBack callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
|
int MarkDependenciesMet(void* ctx,
|
||||||
|
char** messageIDs,
|
||||||
|
size_t count,
|
||||||
|
SdsCallBack callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
|
int StartPeriodicTasks(void* ctx, SdsCallBack callback, void* userData);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* __libsds__ */
|
||||||
304
library/libsds.nim
Normal file
304
library/libsds.nim
Normal file
@ -0,0 +1,304 @@
|
|||||||
|
{.pragma: exported, exportc, cdecl, raises: [].}
|
||||||
|
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||||
|
{.passc: "-fPIC".}
|
||||||
|
|
||||||
|
when defined(linux):
|
||||||
|
{.passl: "-Wl,-soname,libsds.so".}
|
||||||
|
|
||||||
|
import std/[locks, typetraits, tables, atomics], chronos, chronicles
|
||||||
|
import
|
||||||
|
./sds_thread/sds_thread,
|
||||||
|
./alloc,
|
||||||
|
./ffi_types,
|
||||||
|
./sds_thread/inter_thread_communication/sds_thread_request,
|
||||||
|
./sds_thread/inter_thread_communication/requests/
|
||||||
|
[sds_lifecycle_request, sds_message_request, sds_dependencies_request],
|
||||||
|
../src/[reliability, reliability_utils, message],
|
||||||
|
./events/[
|
||||||
|
json_message_ready_event, json_message_sent_event, json_missing_dependencies_event,
|
||||||
|
json_periodic_sync_event,
|
||||||
|
]
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
### Wrapper around the reliability manager
|
||||||
|
################################################################################
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
### Not-exported components
|
||||||
|
|
||||||
|
template checkLibsdsParams*(
|
||||||
|
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||||
|
) =
|
||||||
|
ctx[].userData = userData
|
||||||
|
|
||||||
|
if isNil(callback):
|
||||||
|
return RET_MISSING_CALLBACK
|
||||||
|
|
||||||
|
template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped) =
|
||||||
|
if isNil(ctx[].eventCallback):
|
||||||
|
error eventName & " - eventCallback is nil"
|
||||||
|
return
|
||||||
|
|
||||||
|
if isNil(ctx[].eventUserData):
|
||||||
|
error eventName & " - eventUserData is nil"
|
||||||
|
return
|
||||||
|
|
||||||
|
foreignThreadGc:
|
||||||
|
try:
|
||||||
|
let event = body
|
||||||
|
cast[SdsCallBack](ctx[].eventCallback)(
|
||||||
|
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
|
||||||
|
)
|
||||||
|
except Exception, CatchableError:
|
||||||
|
let msg =
|
||||||
|
"Exception " & eventName & " when calling 'eventCallBack': " &
|
||||||
|
getCurrentExceptionMsg()
|
||||||
|
cast[SdsCallBack](ctx[].eventCallback)(
|
||||||
|
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||||
|
)
|
||||||
|
|
||||||
|
proc handleRequest(
|
||||||
|
ctx: ptr SdsContext,
|
||||||
|
requestType: RequestType,
|
||||||
|
content: pointer,
|
||||||
|
callback: SdsCallBack,
|
||||||
|
userData: pointer,
|
||||||
|
): cint =
|
||||||
|
sds_thread.sendRequestToSdsThread(ctx, requestType, content, callback, userData).isOkOr:
|
||||||
|
let msg = "libsds error: " & $error
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
return RET_OK
|
||||||
|
|
||||||
|
proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback =
|
||||||
|
return proc(messageId: MessageID) {.gcsafe.} =
|
||||||
|
callEventCallback(ctx, "onMessageReady"):
|
||||||
|
$JsonMessageReadyEvent.new(messageId)
|
||||||
|
|
||||||
|
proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback =
|
||||||
|
return proc(messageId: MessageID) {.gcsafe.} =
|
||||||
|
callEventCallback(ctx, "onMessageSent"):
|
||||||
|
$JsonMessageSentEvent.new(messageId)
|
||||||
|
|
||||||
|
proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback =
|
||||||
|
return proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
|
||||||
|
callEventCallback(ctx, "onMissingDependencies"):
|
||||||
|
$JsonMissingDependenciesEvent.new(messageId, missingDeps)
|
||||||
|
|
||||||
|
proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback =
|
||||||
|
return proc() {.gcsafe.} =
|
||||||
|
callEventCallback(ctx, "onPeriodicSync"):
|
||||||
|
$JsonPeriodicSyncEvent.new()
|
||||||
|
|
||||||
|
### End of not-exported components
|
||||||
|
################################################################################
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
### Library setup
|
||||||
|
|
||||||
|
# Every Nim library must have this function called - the name is derived from
|
||||||
|
# the `--nimMainPrefix` command line option
|
||||||
|
proc libsdsNimMain() {.importc.}
|
||||||
|
|
||||||
|
# To control when the library has been initialized
|
||||||
|
var initialized: Atomic[bool]
|
||||||
|
|
||||||
|
if defined(android):
|
||||||
|
# Redirect chronicles to Android System logs
|
||||||
|
when compiles(defaultChroniclesStream.outputs[0].writer):
|
||||||
|
defaultChroniclesStream.outputs[0].writer = proc(
|
||||||
|
logLevel: LogLevel, msg: LogOutputStr
|
||||||
|
) {.raises: [].} =
|
||||||
|
echo logLevel, msg
|
||||||
|
|
||||||
|
proc initializeLibrary() {.exported.} =
|
||||||
|
if not initialized.exchange(true):
|
||||||
|
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
|
||||||
|
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
|
||||||
|
libsdsNimMain()
|
||||||
|
when declared(setupForeignThreadGc):
|
||||||
|
setupForeignThreadGc()
|
||||||
|
when declared(nimGC_setStackBottom):
|
||||||
|
var locals {.volatile, noinit.}: pointer
|
||||||
|
locals = addr(locals)
|
||||||
|
nimGC_setStackBottom(locals)
|
||||||
|
|
||||||
|
### End of library setup
|
||||||
|
################################################################################
|
||||||
|
|
||||||
|
################################################################################
|
||||||
|
### Exported procs
|
||||||
|
|
||||||
|
proc NewReliabilityManager(
|
||||||
|
channelId: cstring, callback: SdsCallBack, userData: pointer
|
||||||
|
): pointer {.dynlib, exportc, cdecl.} =
|
||||||
|
initializeLibrary()
|
||||||
|
|
||||||
|
## Creates a new instance of the Reliability Manager.
|
||||||
|
if isNil(callback):
|
||||||
|
echo "error: missing callback in NewReliabilityManager"
|
||||||
|
return nil
|
||||||
|
|
||||||
|
## Create the SDS thread that will keep waiting for req from the main thread.
|
||||||
|
var ctx = sds_thread.createSdsThread().valueOr:
|
||||||
|
let msg = "Error in createSdsThread: " & $error
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return nil
|
||||||
|
|
||||||
|
ctx.userData = userData
|
||||||
|
|
||||||
|
let appCallbacks = AppCallbacks(
|
||||||
|
messageReadyCb: onMessageReady(ctx),
|
||||||
|
messageSentCb: onMessageSent(ctx),
|
||||||
|
missingDependenciesCb: onMissingDependencies(ctx),
|
||||||
|
periodicSyncCb: onPeriodicSync(ctx),
|
||||||
|
)
|
||||||
|
|
||||||
|
let retCode = handleRequest(
|
||||||
|
ctx,
|
||||||
|
RequestType.LIFECYCLE,
|
||||||
|
SdsLifecycleRequest.createShared(
|
||||||
|
SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, channelId, appCallbacks
|
||||||
|
),
|
||||||
|
callback,
|
||||||
|
userData,
|
||||||
|
)
|
||||||
|
|
||||||
|
if retCode == RET_ERR:
|
||||||
|
return nil
|
||||||
|
|
||||||
|
return ctx
|
||||||
|
|
||||||
|
proc SetEventCallback(
|
||||||
|
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||||
|
) {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
ctx[].eventCallback = cast[pointer](callback)
|
||||||
|
ctx[].eventUserData = userData
|
||||||
|
|
||||||
|
proc CleanupReliabilityManager(
|
||||||
|
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibsdsParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
sds_thread.destroySdsThread(ctx).isOkOr:
|
||||||
|
let msg = "libsds error: " & $error
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
## always need to invoke the callback although we don't retrieve value to the caller
|
||||||
|
callback(RET_OK, nil, 0, userData)
|
||||||
|
|
||||||
|
return RET_OK
|
||||||
|
|
||||||
|
proc ResetReliabilityManager(
|
||||||
|
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibsdsParams(ctx, callback, userData)
|
||||||
|
handleRequest(
|
||||||
|
ctx,
|
||||||
|
RequestType.LIFECYCLE,
|
||||||
|
SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER),
|
||||||
|
callback,
|
||||||
|
userData,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc WrapOutgoingMessage(
|
||||||
|
ctx: ptr SdsContext,
|
||||||
|
message: pointer,
|
||||||
|
messageLen: csize_t,
|
||||||
|
messageId: cstring,
|
||||||
|
callback: SdsCallBack,
|
||||||
|
userData: pointer,
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibsdsParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
if message == nil and messageLen > 0:
|
||||||
|
let msg = "libsds error: " & "message pointer is NULL but length > 0"
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
if messageId == nil:
|
||||||
|
let msg = "libsds error: " & "message ID pointer is NULL"
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
handleRequest(
|
||||||
|
ctx,
|
||||||
|
RequestType.MESSAGE,
|
||||||
|
SdsMessageRequest.createShared(
|
||||||
|
SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId
|
||||||
|
),
|
||||||
|
callback,
|
||||||
|
userData,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc UnwrapReceivedMessage(
|
||||||
|
ctx: ptr SdsContext,
|
||||||
|
message: pointer,
|
||||||
|
messageLen: csize_t,
|
||||||
|
callback: SdsCallBack,
|
||||||
|
userData: pointer,
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibsdsParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
if message == nil and messageLen > 0:
|
||||||
|
let msg = "libsds error: " & "message pointer is NULL but length > 0"
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
handleRequest(
|
||||||
|
ctx,
|
||||||
|
RequestType.MESSAGE,
|
||||||
|
SdsMessageRequest.createShared(
|
||||||
|
SdsMessageMsgType.UNWRAP_MESSAGE, message, messageLen
|
||||||
|
),
|
||||||
|
callback,
|
||||||
|
userData,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc MarkDependenciesMet(
|
||||||
|
ctx: ptr SdsContext,
|
||||||
|
messageIds: pointer,
|
||||||
|
count: csize_t,
|
||||||
|
callback: SdsCallBack,
|
||||||
|
userData: pointer,
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibsdsParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
if messageIds == nil and count > 0:
|
||||||
|
let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0"
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
handleRequest(
|
||||||
|
ctx,
|
||||||
|
RequestType.DEPENDENCIES,
|
||||||
|
SdsDependenciesRequest.createShared(
|
||||||
|
SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count
|
||||||
|
),
|
||||||
|
callback,
|
||||||
|
userData,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc StartPeriodicTasks(
|
||||||
|
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
initializeLibrary()
|
||||||
|
checkLibsdsParams(ctx, callback, userData)
|
||||||
|
handleRequest(
|
||||||
|
ctx,
|
||||||
|
RequestType.LIFECYCLE,
|
||||||
|
SdsLifecycleRequest.createShared(SdsLifecycleMsgType.START_PERIODIC_TASKS),
|
||||||
|
callback,
|
||||||
|
userData,
|
||||||
|
)
|
||||||
|
|
||||||
|
### End of exported procs
|
||||||
|
################################################################################
|
||||||
@ -0,0 +1,48 @@
|
|||||||
|
import std/[options, json, strutils, net, sequtils]
|
||||||
|
import chronos, chronicles, results, confutils, confutils/std/net
|
||||||
|
|
||||||
|
import ../../../alloc
|
||||||
|
import ../../../../src/[reliability_utils, reliability, message]
|
||||||
|
|
||||||
|
type SdsDependenciesMsgType* = enum
|
||||||
|
MARK_DEPENDENCIES_MET
|
||||||
|
|
||||||
|
type SdsDependenciesRequest* = object
|
||||||
|
operation: SdsDependenciesMsgType
|
||||||
|
messageIds: SharedSeq[cstring]
|
||||||
|
count: csize_t
|
||||||
|
|
||||||
|
proc createShared*(
|
||||||
|
T: type SdsDependenciesRequest,
|
||||||
|
op: SdsDependenciesMsgType,
|
||||||
|
messageIds: pointer,
|
||||||
|
count: csize_t = 0,
|
||||||
|
): ptr type T =
|
||||||
|
var ret = createShared(T)
|
||||||
|
ret[].operation = op
|
||||||
|
ret[].count = count
|
||||||
|
ret[].messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc destroyShared(self: ptr SdsDependenciesRequest) =
|
||||||
|
deallocSharedSeq(self[].messageIds)
|
||||||
|
deallocShared(self)
|
||||||
|
|
||||||
|
proc process*(
|
||||||
|
self: ptr SdsDependenciesRequest, rm: ptr ReliabilityManager
|
||||||
|
): Future[Result[string, string]] {.async.} =
|
||||||
|
defer:
|
||||||
|
destroyShared(self)
|
||||||
|
|
||||||
|
case self.operation
|
||||||
|
of MARK_DEPENDENCIES_MET:
|
||||||
|
let messageIdsC = self.messageIds.toSeq()
|
||||||
|
let messageIds = messageIdsC.mapIt($it)
|
||||||
|
|
||||||
|
markDependenciesMet(rm[], messageIds).isOkOr:
|
||||||
|
error "MARK_DEPENDENCIES_MET failed", error = error
|
||||||
|
return err("error processing MARK_DEPENDENCIES_MET request: " & $error)
|
||||||
|
|
||||||
|
return ok("")
|
||||||
|
|
||||||
|
return ok("")
|
||||||
@ -0,0 +1,70 @@
|
|||||||
|
import std/[options, json, strutils, net]
|
||||||
|
import chronos, chronicles, results, confutils, confutils/std/net
|
||||||
|
|
||||||
|
import ../../../alloc
|
||||||
|
import ../../../../src/[reliability_utils, reliability, message]
|
||||||
|
|
||||||
|
type SdsLifecycleMsgType* = enum
|
||||||
|
CREATE_RELIABILITY_MANAGER
|
||||||
|
RESET_RELIABILITY_MANAGER
|
||||||
|
START_PERIODIC_TASKS
|
||||||
|
|
||||||
|
type SdsLifecycleRequest* = object
|
||||||
|
operation: SdsLifecycleMsgType
|
||||||
|
channelId: cstring
|
||||||
|
appCallbacks: AppCallbacks
|
||||||
|
|
||||||
|
proc createShared*(
|
||||||
|
T: type SdsLifecycleRequest,
|
||||||
|
op: SdsLifecycleMsgType,
|
||||||
|
channelId: cstring = "",
|
||||||
|
appCallbacks: AppCallbacks = nil,
|
||||||
|
): ptr type T =
|
||||||
|
var ret = createShared(T)
|
||||||
|
ret[].operation = op
|
||||||
|
ret[].appCallbacks = appCallbacks
|
||||||
|
ret[].channelId = channelId.alloc()
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc destroyShared(self: ptr SdsLifecycleRequest) =
|
||||||
|
deallocShared(self[].channelId)
|
||||||
|
deallocShared(self)
|
||||||
|
|
||||||
|
proc createReliabilityManager(
|
||||||
|
channelIdCStr: cstring, appCallbacks: AppCallbacks = nil
|
||||||
|
): Future[Result[ReliabilityManager, string]] {.async.} =
|
||||||
|
let channelId = $channelIdCStr
|
||||||
|
if channelId.len == 0:
|
||||||
|
error "Failed creating ReliabilityManager: Channel ID cannot be empty"
|
||||||
|
return err("Failed creating ReliabilityManager: Channel ID cannot be empty")
|
||||||
|
|
||||||
|
let rm = newReliabilityManager(channelId).valueOr:
|
||||||
|
error "Failed creating reliability manager", error = error
|
||||||
|
return err("Failed creating reliability manager: " & $error)
|
||||||
|
|
||||||
|
rm.setCallbacks(
|
||||||
|
appCallbacks.messageReadyCb, appCallbacks.messageSentCb,
|
||||||
|
appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb,
|
||||||
|
)
|
||||||
|
|
||||||
|
return ok(rm)
|
||||||
|
|
||||||
|
proc process*(
|
||||||
|
self: ptr SdsLifecycleRequest, rm: ptr ReliabilityManager
|
||||||
|
): Future[Result[string, string]] {.async.} =
|
||||||
|
defer:
|
||||||
|
destroyShared(self)
|
||||||
|
|
||||||
|
case self.operation
|
||||||
|
of CREATE_RELIABILITY_MANAGER:
|
||||||
|
rm[] = (await createReliabilityManager(self.channelId, self.appCallbacks)).valueOr:
|
||||||
|
error "CREATE_RELIABILITY_MANAGER failed", error = error
|
||||||
|
return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error)
|
||||||
|
of RESET_RELIABILITY_MANAGER:
|
||||||
|
resetReliabilityManager(rm[]).isOkOr:
|
||||||
|
error "RESET_RELIABILITY_MANAGER failed", error = error
|
||||||
|
return err("error processing RESET_RELIABILITY_MANAGER request: " & $error)
|
||||||
|
of START_PERIODIC_TASKS:
|
||||||
|
rm[].startPeriodicTasks()
|
||||||
|
|
||||||
|
return ok("")
|
||||||
@ -0,0 +1,69 @@
|
|||||||
|
import std/[options, json, strutils, net, sequtils]
|
||||||
|
import chronos, chronicles, results, confutils, confutils/std/net
|
||||||
|
|
||||||
|
import ../../../alloc
|
||||||
|
import ../../../../src/[reliability_utils, reliability, message]
|
||||||
|
|
||||||
|
type SdsMessageMsgType* = enum
|
||||||
|
WRAP_MESSAGE
|
||||||
|
UNWRAP_MESSAGE
|
||||||
|
|
||||||
|
type SdsMessageRequest* = object
|
||||||
|
operation: SdsMessageMsgType
|
||||||
|
message: SharedSeq[byte]
|
||||||
|
messageLen: csize_t
|
||||||
|
messageId: cstring
|
||||||
|
|
||||||
|
type SdsUnwrapResponse* = object
|
||||||
|
message*: seq[byte]
|
||||||
|
missingDeps*: seq[MessageID]
|
||||||
|
|
||||||
|
proc createShared*(
|
||||||
|
T: type SdsMessageRequest,
|
||||||
|
op: SdsMessageMsgType,
|
||||||
|
message: pointer,
|
||||||
|
messageLen: csize_t = 0,
|
||||||
|
messageId: cstring = "",
|
||||||
|
): ptr type T =
|
||||||
|
var ret = createShared(T)
|
||||||
|
ret[].operation = op
|
||||||
|
ret[].messageLen = messageLen
|
||||||
|
ret[].messageId = messageId.alloc()
|
||||||
|
ret[].message = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc destroyShared(self: ptr SdsMessageRequest) =
|
||||||
|
deallocSharedSeq(self[].message)
|
||||||
|
deallocShared(self[].messageId)
|
||||||
|
deallocShared(self)
|
||||||
|
|
||||||
|
proc process*(
|
||||||
|
self: ptr SdsMessageRequest, rm: ptr ReliabilityManager
|
||||||
|
): Future[Result[string, string]] {.async.} =
|
||||||
|
defer:
|
||||||
|
destroyShared(self)
|
||||||
|
|
||||||
|
case self.operation
|
||||||
|
of WRAP_MESSAGE:
|
||||||
|
let messageBytes = self.message.toSeq()
|
||||||
|
|
||||||
|
let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr:
|
||||||
|
error "WRAP_MESSAGE failed", error = error
|
||||||
|
return err("error processing WRAP_MESSAGE request: " & $error)
|
||||||
|
|
||||||
|
# returns a comma-separates string of bytes
|
||||||
|
return ok(wrappedMessage.mapIt($it).join(","))
|
||||||
|
of UNWRAP_MESSAGE:
|
||||||
|
let messageBytes = self.message.toSeq()
|
||||||
|
|
||||||
|
let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
|
||||||
|
error "UNWRAP_MESSAGE failed", error = error
|
||||||
|
return err("error processing UNWRAP_MESSAGE request: " & $error)
|
||||||
|
|
||||||
|
let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps)
|
||||||
|
|
||||||
|
# return the result as a json string
|
||||||
|
return ok($(%*(res)))
|
||||||
|
|
||||||
|
return ok("")
|
||||||
@ -0,0 +1,78 @@
|
|||||||
|
## This file contains the base message request type that will be handled.
|
||||||
|
## The requests are created by the main thread and processed by
|
||||||
|
## the SDS Thread.
|
||||||
|
|
||||||
|
import std/json, results
|
||||||
|
import chronos, chronos/threadsync
|
||||||
|
import
|
||||||
|
../../ffi_types,
|
||||||
|
./requests/[sds_lifecycle_request, sds_message_request, sds_dependencies_request],
|
||||||
|
../../../src/[reliability_utils]
|
||||||
|
|
||||||
|
type RequestType* {.pure.} = enum
|
||||||
|
LIFECYCLE
|
||||||
|
MESSAGE
|
||||||
|
DEPENDENCIES
|
||||||
|
|
||||||
|
type SdsThreadRequest* = object
|
||||||
|
reqType: RequestType
|
||||||
|
reqContent: pointer
|
||||||
|
callback: SdsCallBack
|
||||||
|
userData: pointer
|
||||||
|
|
||||||
|
proc createShared*(
|
||||||
|
T: type SdsThreadRequest,
|
||||||
|
reqType: RequestType,
|
||||||
|
reqContent: pointer,
|
||||||
|
callback: SdsCallBack,
|
||||||
|
userData: pointer,
|
||||||
|
): ptr type T =
|
||||||
|
var ret = createShared(T)
|
||||||
|
ret[].reqType = reqType
|
||||||
|
ret[].reqContent = reqContent
|
||||||
|
ret[].callback = callback
|
||||||
|
ret[].userData = userData
|
||||||
|
return ret
|
||||||
|
|
||||||
|
proc handleRes[T: string | void](
|
||||||
|
res: Result[T, string], request: ptr SdsThreadRequest
|
||||||
|
) =
|
||||||
|
## Handles the Result responses, which can either be Result[string, string] or
|
||||||
|
## Result[void, string].
|
||||||
|
|
||||||
|
defer:
|
||||||
|
deallocShared(request)
|
||||||
|
|
||||||
|
if res.isErr():
|
||||||
|
foreignThreadGc:
|
||||||
|
let msg = "libsds error: handleRes fireSyncRes error: " & $res.error
|
||||||
|
request[].callback(
|
||||||
|
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
foreignThreadGc:
|
||||||
|
var msg: cstring = ""
|
||||||
|
when T is string:
|
||||||
|
msg = res.get().cstring()
|
||||||
|
request[].callback(
|
||||||
|
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
proc process*(
|
||||||
|
T: type SdsThreadRequest, request: ptr SdsThreadRequest, rm: ptr ReliabilityManager
|
||||||
|
) {.async.} =
|
||||||
|
let retFut =
|
||||||
|
case request[].reqType
|
||||||
|
of LIFECYCLE:
|
||||||
|
cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm)
|
||||||
|
of MESSAGE:
|
||||||
|
cast[ptr SdsMessageRequest](request[].reqContent).process(rm)
|
||||||
|
of DEPENDENCIES:
|
||||||
|
cast[ptr SdsDependenciesRequest](request[].reqContent).process(rm)
|
||||||
|
|
||||||
|
handleRes(await retFut, request)
|
||||||
|
|
||||||
|
proc `$`*(self: SdsThreadRequest): string =
|
||||||
|
return $self.reqType
|
||||||
132
library/sds_thread/sds_thread.nim
Normal file
132
library/sds_thread/sds_thread.nim
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
{.pragma: exported, exportc, cdecl, raises: [].}
|
||||||
|
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||||
|
{.passc: "-fPIC".}
|
||||||
|
|
||||||
|
import std/[options, atomics, os, net, locks]
|
||||||
|
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||||
|
import
|
||||||
|
../ffi_types,
|
||||||
|
./inter_thread_communication/sds_thread_request,
|
||||||
|
../../src/[reliability_utils]
|
||||||
|
|
||||||
|
type SdsContext* = object
|
||||||
|
thread: Thread[(ptr SdsContext)]
|
||||||
|
lock: Lock
|
||||||
|
reqChannel: ChannelSPSCSingle[ptr SdsThreadRequest]
|
||||||
|
reqSignal: ThreadSignalPtr
|
||||||
|
# to inform The SDS Thread (a.k.a TST) that a new request is sent
|
||||||
|
reqReceivedSignal: ThreadSignalPtr
|
||||||
|
# to inform the main thread that the request is rx by TST
|
||||||
|
userData*: pointer
|
||||||
|
eventCallback*: pointer
|
||||||
|
eventUserdata*: pointer
|
||||||
|
running: Atomic[bool] # To control when the thread is running
|
||||||
|
|
||||||
|
proc runSds(ctx: ptr SdsContext) {.async.} =
|
||||||
|
## This is the worker body. This runs the SDS instance
|
||||||
|
## and attends library user requests (stop, connect_to, etc.)
|
||||||
|
|
||||||
|
var rm: ReliabilityManager
|
||||||
|
|
||||||
|
while true:
|
||||||
|
await ctx.reqSignal.wait()
|
||||||
|
|
||||||
|
if ctx.running.load == false:
|
||||||
|
break
|
||||||
|
|
||||||
|
## Trying to get a request from the libsds requestor thread
|
||||||
|
var request: ptr SdsThreadRequest
|
||||||
|
let recvOk = ctx.reqChannel.tryRecv(request)
|
||||||
|
if not recvOk:
|
||||||
|
error "sds thread could not receive a request"
|
||||||
|
continue
|
||||||
|
|
||||||
|
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||||
|
if fireRes.isErr():
|
||||||
|
error "could not fireSync back to requester thread", error = fireRes.error
|
||||||
|
|
||||||
|
## Handle the request
|
||||||
|
asyncSpawn SdsThreadRequest.process(request, addr rm)
|
||||||
|
|
||||||
|
proc run(ctx: ptr SdsContext) {.thread.} =
|
||||||
|
## Launch sds worker
|
||||||
|
waitFor runSds(ctx)
|
||||||
|
|
||||||
|
proc createSdsThread*(): Result[ptr SdsContext, string] =
|
||||||
|
## This proc is called from the main thread and it creates
|
||||||
|
## the SDS working thread.
|
||||||
|
var ctx = createShared(SdsContext, 1)
|
||||||
|
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||||
|
return err("couldn't create reqSignal ThreadSignalPtr")
|
||||||
|
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
||||||
|
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
||||||
|
ctx.lock.initLock()
|
||||||
|
|
||||||
|
ctx.running.store(true)
|
||||||
|
|
||||||
|
try:
|
||||||
|
createThread(ctx.thread, run, ctx)
|
||||||
|
except ValueError, ResourceExhaustedError:
|
||||||
|
# and freeShared for typed allocations!
|
||||||
|
freeShared(ctx)
|
||||||
|
|
||||||
|
return err("failed to create the SDS thread: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
return ok(ctx)
|
||||||
|
|
||||||
|
proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] =
|
||||||
|
ctx.running.store(false)
|
||||||
|
|
||||||
|
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
||||||
|
return err("error in destroySdsThread: " & $error)
|
||||||
|
if not signaledOnTime:
|
||||||
|
return err("failed to signal reqSignal on time in destroySdsThread")
|
||||||
|
|
||||||
|
joinThread(ctx.thread)
|
||||||
|
ctx.lock.deinitLock()
|
||||||
|
?ctx.reqSignal.close()
|
||||||
|
?ctx.reqReceivedSignal.close()
|
||||||
|
freeShared(ctx)
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc sendRequestToSdsThread*(
|
||||||
|
ctx: ptr SdsContext,
|
||||||
|
reqType: RequestType,
|
||||||
|
reqContent: pointer,
|
||||||
|
callback: SdsCallBack,
|
||||||
|
userData: pointer,
|
||||||
|
): Result[void, string] =
|
||||||
|
let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData)
|
||||||
|
|
||||||
|
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||||
|
# between threads assumes that there aren't concurrent requests.
|
||||||
|
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
|
||||||
|
# requests concurrently and spare us the need of locks
|
||||||
|
ctx.lock.acquire()
|
||||||
|
defer:
|
||||||
|
ctx.lock.release()
|
||||||
|
## Sending the request
|
||||||
|
let sentOk = ctx.reqChannel.trySend(req)
|
||||||
|
if not sentOk:
|
||||||
|
deallocShared(req)
|
||||||
|
return err("Couldn't send a request to the sds thread: " & $req[])
|
||||||
|
|
||||||
|
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||||
|
if fireSyncRes.isErr():
|
||||||
|
deallocShared(req)
|
||||||
|
return err("failed fireSync: " & $fireSyncRes.error)
|
||||||
|
|
||||||
|
if fireSyncRes.get() == false:
|
||||||
|
deallocShared(req)
|
||||||
|
return err("Couldn't fireSync in time")
|
||||||
|
|
||||||
|
## wait until the SDS Thread properly received the request
|
||||||
|
let res = ctx.reqReceivedSignal.waitSync()
|
||||||
|
if res.isErr():
|
||||||
|
deallocShared(req)
|
||||||
|
return err("Couldn't receive reqReceivedSignal signal")
|
||||||
|
|
||||||
|
## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the
|
||||||
|
## process proc.
|
||||||
|
ok()
|
||||||
@ -1,32 +0,0 @@
|
|||||||
# Package
|
|
||||||
version = "0.1.0"
|
|
||||||
author = "Waku Team"
|
|
||||||
description = "E2E Reliability Protocol API"
|
|
||||||
license = "MIT"
|
|
||||||
srcDir = "src"
|
|
||||||
|
|
||||||
# Dependencies
|
|
||||||
requires "nim >= 2.0.8"
|
|
||||||
requires "chronicles"
|
|
||||||
requires "libp2p"
|
|
||||||
|
|
||||||
# Tasks
|
|
||||||
task test, "Run the test suite":
|
|
||||||
exec "nim c -r tests/test_bloom.nim"
|
|
||||||
exec "nim c -r tests/test_reliability.nim"
|
|
||||||
|
|
||||||
task bindings, "Generate bindings":
|
|
||||||
proc compile(libName: string, flags = "") =
|
|
||||||
exec "nim c -f " & flags & " -d:release --app:lib --mm:arc --tlsEmulation:off --out:" & libName & " --outdir:bindings/generated bindings/bindings.nim"
|
|
||||||
|
|
||||||
# Create required directories
|
|
||||||
mkDir "bindings/generated"
|
|
||||||
|
|
||||||
when defined(windows):
|
|
||||||
compile "reliability.dll"
|
|
||||||
elif defined(macosx):
|
|
||||||
compile "libsds.dylib.arm", "--cpu:arm64 -l:'-target arm64-apple-macos11' -t:'-target arm64-apple-macos11'"
|
|
||||||
compile "libsds.dylib.x64", "--cpu:amd64 -l:'-target x86_64-apple-macos10.12' -t:'-target x86_64-apple-macos10.12'"
|
|
||||||
exec "lipo bindings/generated/libsds.dylib.arm bindings/generated/libsds.dylib.x64 -output bindings/generated/libsds.dylib -create"
|
|
||||||
else:
|
|
||||||
compile "libsds.so"
|
|
||||||
39
sds.nimble
Normal file
39
sds.nimble
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
# Package
|
||||||
|
version = "0.1.0"
|
||||||
|
author = "Waku Team"
|
||||||
|
description = "E2E Reliability Protocol API"
|
||||||
|
license = "MIT"
|
||||||
|
srcDir = "src"
|
||||||
|
|
||||||
|
# Dependencies
|
||||||
|
requires "nim >= 2.0.8"
|
||||||
|
requires "chronicles"
|
||||||
|
requires "libp2p"
|
||||||
|
|
||||||
|
proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") =
|
||||||
|
if not dirExists "build":
|
||||||
|
mkDir "build"
|
||||||
|
# allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims"
|
||||||
|
var extra_params = params
|
||||||
|
for i in 2 ..< paramCount():
|
||||||
|
extra_params &= " " & paramStr(i)
|
||||||
|
if `type` == "static":
|
||||||
|
exec "nim c" & " --out:build/" & name &
|
||||||
|
".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " &
|
||||||
|
extra_params & " " & srcDir & name & ".nim"
|
||||||
|
else:
|
||||||
|
exec "nim c" & " --out:build/" & name &
|
||||||
|
".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " &
|
||||||
|
extra_params & " " & srcDir & name & ".nim"
|
||||||
|
|
||||||
|
# Tasks
|
||||||
|
task test, "Run the test suite":
|
||||||
|
exec "nim c -r tests/test_bloom.nim"
|
||||||
|
exec "nim c -r tests/test_reliability.nim"
|
||||||
|
|
||||||
|
task libsdsDynamic, "Generate bindings":
|
||||||
|
let name = "libsds"
|
||||||
|
buildLibrary name,
|
||||||
|
"library/",
|
||||||
|
"",
|
||||||
|
"dynamic"
|
||||||
280
sds_wrapper.go
280
sds_wrapper.go
@ -1,280 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
/*
|
|
||||||
#cgo CFLAGS: -I${SRCDIR}/bindings
|
|
||||||
#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lbindings
|
|
||||||
#cgo LDFLAGS: -Wl,-rpath,${SRCDIR}/bindings/generated
|
|
||||||
|
|
||||||
#include <stdlib.h> // For C.free
|
|
||||||
#include "bindings/bindings.h" // Update include path
|
|
||||||
|
|
||||||
// Forward declaration for the single Go callback relay function
|
|
||||||
extern void globalCallbackRelay(void* handle, CEventType eventType, void* data1, void* data2, size_t data3);
|
|
||||||
|
|
||||||
// Helper function to call the C memory freeing functions
|
|
||||||
static void callFreeCResultError(CResult res) { FreeCResultError(res); }
|
|
||||||
static void callFreeCWrapResult(CWrapResult res) { FreeCWrapResult(res); }
|
|
||||||
static void callFreeCUnwrapResult(CUnwrapResult res) { FreeCUnwrapResult(res); }
|
|
||||||
|
|
||||||
*/
|
|
||||||
import "C"
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"unsafe"
|
|
||||||
)
|
|
||||||
|
|
||||||
// --- Go Types ---
|
|
||||||
|
|
||||||
// ReliabilityManagerHandle represents the opaque handle to the Nim object
|
|
||||||
type ReliabilityManagerHandle unsafe.Pointer
|
|
||||||
|
|
||||||
// MessageID is a type alias for string for clarity
|
|
||||||
type MessageID string
|
|
||||||
|
|
||||||
// Callbacks holds the Go functions to be called by the Nim library
|
|
||||||
type Callbacks struct {
|
|
||||||
OnMessageReady func(messageId MessageID)
|
|
||||||
OnMessageSent func(messageId MessageID)
|
|
||||||
OnMissingDependencies func(messageId MessageID, missingDeps []MessageID)
|
|
||||||
OnPeriodicSync func()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Global map to store callbacks associated with handles
|
|
||||||
var (
|
|
||||||
callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks)
|
|
||||||
registryMutex sync.RWMutex
|
|
||||||
)
|
|
||||||
|
|
||||||
// --- Go Wrapper Functions ---
|
|
||||||
|
|
||||||
// NewReliabilityManager creates a new instance of the Nim ReliabilityManager
|
|
||||||
func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) {
|
|
||||||
cChannelId := C.CString(channelId)
|
|
||||||
defer C.free(unsafe.Pointer(cChannelId))
|
|
||||||
|
|
||||||
handle := C.NewReliabilityManager(cChannelId)
|
|
||||||
if handle == nil {
|
|
||||||
// Note: Nim side currently just prints to stdout on creation failure
|
|
||||||
return nil, errors.New("failed to create ReliabilityManager (check Nim logs/stdout)")
|
|
||||||
}
|
|
||||||
return ReliabilityManagerHandle(handle), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CleanupReliabilityManager frees the resources associated with the handle
|
|
||||||
func CleanupReliabilityManager(handle ReliabilityManagerHandle) {
|
|
||||||
if handle == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
registryMutex.Lock()
|
|
||||||
delete(callbackRegistry, handle)
|
|
||||||
registryMutex.Unlock()
|
|
||||||
C.CleanupReliabilityManager(unsafe.Pointer(handle))
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResetReliabilityManager resets the state of the manager
|
|
||||||
func ResetReliabilityManager(handle ReliabilityManagerHandle) error {
|
|
||||||
if handle == nil {
|
|
||||||
return errors.New("handle is nil")
|
|
||||||
}
|
|
||||||
cResult := C.ResetReliabilityManager(unsafe.Pointer(handle))
|
|
||||||
if !cResult.is_ok {
|
|
||||||
errMsg := C.GoString(cResult.error_message)
|
|
||||||
C.callFreeCResultError(cResult) // Free the error message
|
|
||||||
return errors.New(errMsg)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WrapOutgoingMessage wraps a message with reliability metadata
|
|
||||||
func WrapOutgoingMessage(handle ReliabilityManagerHandle, message []byte, messageId MessageID) ([]byte, error) {
|
|
||||||
if handle == nil {
|
|
||||||
return nil, errors.New("handle is nil")
|
|
||||||
}
|
|
||||||
cMessageId := C.CString(string(messageId))
|
|
||||||
defer C.free(unsafe.Pointer(cMessageId))
|
|
||||||
|
|
||||||
var cMessagePtr unsafe.Pointer
|
|
||||||
if len(message) > 0 {
|
|
||||||
cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed
|
|
||||||
defer C.free(cMessagePtr)
|
|
||||||
} else {
|
|
||||||
cMessagePtr = nil
|
|
||||||
}
|
|
||||||
cMessageLen := C.size_t(len(message))
|
|
||||||
|
|
||||||
cWrapResult := C.WrapOutgoingMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen, cMessageId)
|
|
||||||
|
|
||||||
if !cWrapResult.base_result.is_ok {
|
|
||||||
errMsg := C.GoString(cWrapResult.base_result.error_message)
|
|
||||||
C.callFreeCWrapResult(cWrapResult) // Free error and potentially allocated message
|
|
||||||
return nil, errors.New(errMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy the wrapped message from C memory to Go slice
|
|
||||||
// Explicitly cast the message pointer to unsafe.Pointer
|
|
||||||
wrappedMessage := C.GoBytes(unsafe.Pointer(cWrapResult.message), C.int(cWrapResult.message_len))
|
|
||||||
C.callFreeCWrapResult(cWrapResult) // Free the C-allocated message buffer
|
|
||||||
|
|
||||||
return wrappedMessage, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnwrapReceivedMessage unwraps a received message
|
|
||||||
func UnwrapReceivedMessage(handle ReliabilityManagerHandle, message []byte) ([]byte, []MessageID, error) {
|
|
||||||
if handle == nil {
|
|
||||||
return nil, nil, errors.New("handle is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
var cMessagePtr unsafe.Pointer
|
|
||||||
if len(message) > 0 {
|
|
||||||
cMessagePtr = C.CBytes(message)
|
|
||||||
defer C.free(cMessagePtr)
|
|
||||||
} else {
|
|
||||||
cMessagePtr = nil
|
|
||||||
}
|
|
||||||
cMessageLen := C.size_t(len(message))
|
|
||||||
|
|
||||||
cUnwrapResult := C.UnwrapReceivedMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen)
|
|
||||||
|
|
||||||
if !cUnwrapResult.base_result.is_ok {
|
|
||||||
errMsg := C.GoString(cUnwrapResult.base_result.error_message)
|
|
||||||
C.callFreeCUnwrapResult(cUnwrapResult) // Free error and potentially allocated fields
|
|
||||||
return nil, nil, errors.New(errMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy unwrapped message content
|
|
||||||
unwrappedContent := C.GoBytes(unsafe.Pointer(cUnwrapResult.message), C.int(cUnwrapResult.message_len))
|
|
||||||
|
|
||||||
// Copy missing dependencies
|
|
||||||
missingDeps := make([]MessageID, cUnwrapResult.missing_deps_count)
|
|
||||||
if cUnwrapResult.missing_deps_count > 0 {
|
|
||||||
// Convert C array of C strings to Go slice of strings
|
|
||||||
cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(cUnwrapResult.missing_deps))[:cUnwrapResult.missing_deps_count:cUnwrapResult.missing_deps_count]
|
|
||||||
for i, s := range cDepsArray {
|
|
||||||
missingDeps[i] = MessageID(C.GoString(s))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
C.callFreeCUnwrapResult(cUnwrapResult) // Free C-allocated message, deps array, and strings
|
|
||||||
|
|
||||||
return unwrappedContent, missingDeps, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// MarkDependenciesMet informs the library that dependencies are met
|
|
||||||
func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID) error {
|
|
||||||
if handle == nil {
|
|
||||||
return errors.New("handle is nil")
|
|
||||||
}
|
|
||||||
if len(messageIDs) == 0 {
|
|
||||||
return nil // Nothing to do
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert Go string slice to C array of C strings (char**)
|
|
||||||
cMessageIDs := make([]*C.char, len(messageIDs))
|
|
||||||
for i, id := range messageIDs {
|
|
||||||
cMessageIDs[i] = C.CString(string(id))
|
|
||||||
defer C.free(unsafe.Pointer(cMessageIDs[i])) // Ensure each CString is freed
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a pointer (**C.char) to the first element of the slice
|
|
||||||
var cMessageIDsPtr **C.char
|
|
||||||
if len(cMessageIDs) > 0 {
|
|
||||||
cMessageIDsPtr = &cMessageIDs[0]
|
|
||||||
} else {
|
|
||||||
cMessageIDsPtr = nil // Handle empty slice case
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pass the pointer variable (cMessageIDsPtr) directly, which is of type **C.char
|
|
||||||
cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), cMessageIDsPtr, C.size_t(len(messageIDs)))
|
|
||||||
|
|
||||||
if !cResult.is_ok {
|
|
||||||
errMsg := C.GoString(cResult.error_message)
|
|
||||||
C.callFreeCResultError(cResult)
|
|
||||||
return errors.New(errMsg)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterCallback sets the single Go callback relay function
|
|
||||||
func RegisterCallback(handle ReliabilityManagerHandle, callbacks Callbacks) error {
|
|
||||||
if handle == nil {
|
|
||||||
return errors.New("handle is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store the Go callbacks associated with this handle
|
|
||||||
registryMutex.Lock()
|
|
||||||
callbackRegistry[handle] = &callbacks
|
|
||||||
registryMutex.Unlock()
|
|
||||||
|
|
||||||
// Register the single global Go relay function with the Nim library
|
|
||||||
// Nim will call globalCallbackRelay, passing the handle as the first argument.
|
|
||||||
C.RegisterCallback(
|
|
||||||
unsafe.Pointer(handle),
|
|
||||||
(C.CEventCallback)(C.globalCallbackRelay), // Pass the Go relay function pointer
|
|
||||||
nil, // user_data is not used here, handle is passed directly by Nim wrapper
|
|
||||||
)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartPeriodicTasks starts the background tasks in the Nim library
|
|
||||||
func StartPeriodicTasks(handle ReliabilityManagerHandle) error {
|
|
||||||
if handle == nil {
|
|
||||||
return errors.New("handle is nil")
|
|
||||||
}
|
|
||||||
C.StartPeriodicTasks(unsafe.Pointer(handle))
|
|
||||||
// Assuming StartPeriodicTasks doesn't return an error status in C API
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// globalCallbackRelay is called by Nim for all events.
|
|
||||||
// It uses the handle to find the correct Go Callbacks struct and dispatch the call.
|
|
||||||
//export globalCallbackRelay
|
|
||||||
func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 unsafe.Pointer, data2 unsafe.Pointer, data3 C.size_t) {
|
|
||||||
goHandle := ReliabilityManagerHandle(handle)
|
|
||||||
|
|
||||||
registryMutex.RLock()
|
|
||||||
callbacks, ok := callbackRegistry[goHandle]
|
|
||||||
registryMutex.RUnlock()
|
|
||||||
|
|
||||||
if !ok || callbacks == nil {
|
|
||||||
fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle) // Uncommented
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use a goroutine to avoid blocking the Nim thread
|
|
||||||
go func() {
|
|
||||||
switch eventType {
|
|
||||||
case C.EVENT_MESSAGE_READY:
|
|
||||||
if callbacks.OnMessageReady != nil {
|
|
||||||
msgIdStr := C.GoString((*C.char)(data1))
|
|
||||||
callbacks.OnMessageReady(MessageID(msgIdStr))
|
|
||||||
}
|
|
||||||
case C.EVENT_MESSAGE_SENT:
|
|
||||||
if callbacks.OnMessageSent != nil {
|
|
||||||
msgIdStr := C.GoString((*C.char)(data1))
|
|
||||||
callbacks.OnMessageSent(MessageID(msgIdStr))
|
|
||||||
}
|
|
||||||
case C.EVENT_MISSING_DEPENDENCIES:
|
|
||||||
if callbacks.OnMissingDependencies != nil {
|
|
||||||
msgIdStr := C.GoString((*C.char)(data1))
|
|
||||||
depsCount := int(data3)
|
|
||||||
deps := make([]MessageID, depsCount)
|
|
||||||
if depsCount > 0 {
|
|
||||||
// Convert C array of C strings (**char) to Go slice
|
|
||||||
cDepsArray := (*[1 << 30]*C.char)(data2)[:depsCount:depsCount]
|
|
||||||
for i, s := range cDepsArray {
|
|
||||||
deps[i] = MessageID(C.GoString(s))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
callbacks.OnMissingDependencies(MessageID(msgIdStr), deps)
|
|
||||||
}
|
|
||||||
case C.EVENT_PERIODIC_SYNC:
|
|
||||||
if callbacks.OnPeriodicSync != nil {
|
|
||||||
callbacks.OnPeriodicSync()
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
fmt.Printf("Go: globalCallbackRelay: Received unknown event type %d for handle %v\n", eventType, goHandle)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
@ -1,259 +0,0 @@
|
|||||||
package main
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Test basic creation, cleanup, and reset
|
|
||||||
func TestLifecycle(t *testing.T) {
|
|
||||||
channelID := "test-lifecycle"
|
|
||||||
handle, err := NewReliabilityManager(channelID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("NewReliabilityManager failed: %v", err)
|
|
||||||
}
|
|
||||||
if handle == nil {
|
|
||||||
t.Fatal("NewReliabilityManager returned a nil handle")
|
|
||||||
}
|
|
||||||
defer CleanupReliabilityManager(handle) // Ensure cleanup even on test failure
|
|
||||||
|
|
||||||
err = ResetReliabilityManager(handle)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("ResetReliabilityManager failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test wrapping and unwrapping a simple message
|
|
||||||
func TestWrapUnwrap(t *testing.T) {
|
|
||||||
channelID := "test-wrap-unwrap"
|
|
||||||
handle, err := NewReliabilityManager(channelID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("NewReliabilityManager failed: %v", err)
|
|
||||||
}
|
|
||||||
defer CleanupReliabilityManager(handle)
|
|
||||||
|
|
||||||
originalPayload := []byte("hello reliability")
|
|
||||||
messageID := MessageID("msg-wrap-1")
|
|
||||||
|
|
||||||
wrappedMsg, err := WrapOutgoingMessage(handle, originalPayload, messageID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("WrapOutgoingMessage failed: %v", err)
|
|
||||||
}
|
|
||||||
if len(wrappedMsg) == 0 {
|
|
||||||
t.Fatal("WrapOutgoingMessage returned empty bytes")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate receiving the wrapped message
|
|
||||||
unwrappedPayload, missingDeps, err := UnwrapReceivedMessage(handle, wrappedMsg)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("UnwrapReceivedMessage failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(unwrappedPayload) != string(originalPayload) {
|
|
||||||
t.Errorf("Unwrapped payload mismatch: got %q, want %q", unwrappedPayload, originalPayload)
|
|
||||||
}
|
|
||||||
if len(missingDeps) != 0 {
|
|
||||||
t.Errorf("Expected 0 missing dependencies, got %d: %v", len(missingDeps), missingDeps)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test dependency handling
|
|
||||||
func TestDependencies(t *testing.T) {
|
|
||||||
channelID := "test-deps"
|
|
||||||
handle, err := NewReliabilityManager(channelID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("NewReliabilityManager failed: %v", err)
|
|
||||||
}
|
|
||||||
defer CleanupReliabilityManager(handle)
|
|
||||||
|
|
||||||
// 1. Send message 1 (will become a dependency)
|
|
||||||
payload1 := []byte("message one")
|
|
||||||
msgID1 := MessageID("msg-dep-1")
|
|
||||||
wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
|
|
||||||
}
|
|
||||||
// Simulate receiving msg1 to add it to history (implicitly acknowledges it)
|
|
||||||
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Send message 2 (depends on message 1 implicitly via causal history)
|
|
||||||
payload2 := []byte("message two")
|
|
||||||
msgID2 := MessageID("msg-dep-2")
|
|
||||||
wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Create a new manager to simulate a different peer receiving msg2 without msg1
|
|
||||||
handle2, err := NewReliabilityManager(channelID) // Same channel ID
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("NewReliabilityManager (2) failed: %v", err)
|
|
||||||
}
|
|
||||||
defer CleanupReliabilityManager(handle2)
|
|
||||||
|
|
||||||
// 4. Unwrap message 2 on the second manager - should report msg1 as missing
|
|
||||||
_, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(missingDeps) == 0 {
|
|
||||||
t.Fatalf("Expected missing dependencies, got none")
|
|
||||||
}
|
|
||||||
foundDep1 := false
|
|
||||||
for _, dep := range missingDeps {
|
|
||||||
if dep == msgID1 {
|
|
||||||
foundDep1 = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !foundDep1 {
|
|
||||||
t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 5. Mark the dependency as met
|
|
||||||
err = MarkDependenciesMet(handle2, []MessageID{msgID1})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("MarkDependenciesMet failed: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test callbacks
|
|
||||||
func TestCallbacks(t *testing.T) {
|
|
||||||
channelID := "test-callbacks"
|
|
||||||
handle, err := NewReliabilityManager(channelID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("NewReliabilityManager failed: %v", err)
|
|
||||||
}
|
|
||||||
defer CleanupReliabilityManager(handle)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
receivedReady := make(map[MessageID]bool)
|
|
||||||
receivedSent := make(map[MessageID]bool)
|
|
||||||
receivedMissing := make(map[MessageID][]MessageID)
|
|
||||||
syncRequested := false
|
|
||||||
var cbMutex sync.Mutex // Protect access to callback tracking maps/vars
|
|
||||||
|
|
||||||
callbacks := Callbacks{
|
|
||||||
OnMessageReady: func(messageId MessageID) {
|
|
||||||
fmt.Printf("Test: OnMessageReady received: %s\n", messageId)
|
|
||||||
cbMutex.Lock()
|
|
||||||
receivedReady[messageId] = true
|
|
||||||
cbMutex.Unlock()
|
|
||||||
wg.Done()
|
|
||||||
},
|
|
||||||
OnMessageSent: func(messageId MessageID) {
|
|
||||||
fmt.Printf("Test: OnMessageSent received: %s\n", messageId)
|
|
||||||
cbMutex.Lock()
|
|
||||||
receivedSent[messageId] = true
|
|
||||||
cbMutex.Unlock()
|
|
||||||
wg.Done()
|
|
||||||
},
|
|
||||||
OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) {
|
|
||||||
fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps)
|
|
||||||
cbMutex.Lock()
|
|
||||||
receivedMissing[messageId] = missingDeps
|
|
||||||
cbMutex.Unlock()
|
|
||||||
wg.Done()
|
|
||||||
},
|
|
||||||
OnPeriodicSync: func() {
|
|
||||||
fmt.Println("Test: OnPeriodicSync received")
|
|
||||||
cbMutex.Lock()
|
|
||||||
syncRequested = true
|
|
||||||
cbMutex.Unlock()
|
|
||||||
// Don't wg.Done() here, it might be called multiple times
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
err = RegisterCallback(handle, callbacks)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("RegisterCallback failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start tasks AFTER registering callbacks
|
|
||||||
err = StartPeriodicTasks(handle)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("StartPeriodicTasks failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Test Scenario ---
|
|
||||||
|
|
||||||
// 1. Send msg1
|
|
||||||
wg.Add(1) // Expect OnMessageSent for msg1 eventually
|
|
||||||
payload1 := []byte("callback test 1")
|
|
||||||
msgID1 := MessageID("cb-msg-1")
|
|
||||||
wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("WrapOutgoingMessage (1) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history)
|
|
||||||
wg.Add(1) // Expect OnMessageReady for msg1
|
|
||||||
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Send msg2 (depends on msg1)
|
|
||||||
wg.Add(1) // Expect OnMessageSent for msg2 eventually
|
|
||||||
payload2 := []byte("callback test 2")
|
|
||||||
msgID2 := MessageID("cb-msg-2")
|
|
||||||
wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("WrapOutgoingMessage (2) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2)
|
|
||||||
wg.Add(1) // Expect OnMessageReady for msg2
|
|
||||||
_, _, err = UnwrapReceivedMessage(handle, wrappedMsg2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Verification ---
|
|
||||||
// Wait for expected callbacks with a timeout
|
|
||||||
waitTimeout(&wg, 5*time.Second, t)
|
|
||||||
|
|
||||||
cbMutex.Lock()
|
|
||||||
defer cbMutex.Unlock()
|
|
||||||
|
|
||||||
if !receivedReady[msgID1] {
|
|
||||||
t.Errorf("OnMessageReady not called for %s", msgID1)
|
|
||||||
}
|
|
||||||
if !receivedReady[msgID2] {
|
|
||||||
t.Errorf("OnMessageReady not called for %s", msgID2)
|
|
||||||
}
|
|
||||||
if !receivedSent[msgID1] {
|
|
||||||
t.Errorf("OnMessageSent not called for %s", msgID1)
|
|
||||||
}
|
|
||||||
if !receivedSent[msgID2] {
|
|
||||||
t.Errorf("OnMessageSent not called for %s", msgID2)
|
|
||||||
}
|
|
||||||
// We didn't explicitly test missing deps in this path
|
|
||||||
if len(receivedMissing) > 0 {
|
|
||||||
t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing)
|
|
||||||
}
|
|
||||||
// Periodic sync is harder to guarantee in a short test, just check if it was ever true
|
|
||||||
if !syncRequested {
|
|
||||||
t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper function to wait for WaitGroup with a timeout
|
|
||||||
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) {
|
|
||||||
c := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(c)
|
|
||||||
wg.Wait()
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-c:
|
|
||||||
// Completed normally
|
|
||||||
case <-time.After(timeout):
|
|
||||||
t.Fatalf("Timed out waiting for callbacks")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -2,7 +2,9 @@ import std/[times, locks, tables, sets]
|
|||||||
import chronos, results
|
import chronos, results
|
||||||
import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter]
|
import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter]
|
||||||
|
|
||||||
proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager, ReliabilityError] =
|
proc newReliabilityManager*(
|
||||||
|
channelId: string, config: ReliabilityConfig = defaultConfig()
|
||||||
|
): Result[ReliabilityManager, ReliabilityError] =
|
||||||
## Creates a new ReliabilityManager with the specified channel ID and configuration.
|
## Creates a new ReliabilityManager with the specified channel ID and configuration.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -13,14 +15,12 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau
|
|||||||
## A Result containing either a new ReliabilityManager instance or an error.
|
## A Result containing either a new ReliabilityManager instance or an error.
|
||||||
if channelId.len == 0:
|
if channelId.len == 0:
|
||||||
return err(reInvalidArgument)
|
return err(reInvalidArgument)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
let bloomFilter = newRollingBloomFilter(
|
let bloomFilter = newRollingBloomFilter(
|
||||||
config.bloomFilterCapacity,
|
config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow
|
||||||
config.bloomFilterErrorRate,
|
|
||||||
config.bloomFilterWindow
|
|
||||||
)
|
)
|
||||||
|
|
||||||
let rm = ReliabilityManager(
|
let rm = ReliabilityManager(
|
||||||
lamportTimestamp: 0,
|
lamportTimestamp: 0,
|
||||||
messageHistory: @[],
|
messageHistory: @[],
|
||||||
@ -28,7 +28,7 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau
|
|||||||
outgoingBuffer: @[],
|
outgoingBuffer: @[],
|
||||||
incomingBuffer: @[],
|
incomingBuffer: @[],
|
||||||
channelId: channelId,
|
channelId: channelId,
|
||||||
config: config
|
config: config,
|
||||||
)
|
)
|
||||||
initLock(rm.lock)
|
initLock(rm.lock)
|
||||||
return ok(rm)
|
return ok(rm)
|
||||||
@ -40,27 +40,25 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) =
|
|||||||
while i < rm.outgoingBuffer.len:
|
while i < rm.outgoingBuffer.len:
|
||||||
var acknowledged = false
|
var acknowledged = false
|
||||||
let outMsg = rm.outgoingBuffer[i]
|
let outMsg = rm.outgoingBuffer[i]
|
||||||
|
|
||||||
# Check if message is in causal history
|
# Check if message is in causal history
|
||||||
for msgID in msg.causalHistory:
|
for msgID in msg.causalHistory:
|
||||||
if outMsg.message.messageId == msgID:
|
if outMsg.message.messageId == msgID:
|
||||||
acknowledged = true
|
acknowledged = true
|
||||||
break
|
break
|
||||||
|
|
||||||
# Check bloom filter if not already acknowledged
|
# Check bloom filter if not already acknowledged
|
||||||
if not acknowledged and msg.bloomFilter.len > 0:
|
if not acknowledged and msg.bloomFilter.len > 0:
|
||||||
let bfResult = deserializeBloomFilter(msg.bloomFilter)
|
let bfResult = deserializeBloomFilter(msg.bloomFilter)
|
||||||
if bfResult.isOk:
|
if bfResult.isOk:
|
||||||
var rbf = RollingBloomFilter(
|
var rbf = RollingBloomFilter(
|
||||||
filter: bfResult.get(),
|
filter: bfResult.get(), window: rm.bloomFilter.window, messages: @[]
|
||||||
window: rm.bloomFilter.window,
|
|
||||||
messages: @[]
|
|
||||||
)
|
)
|
||||||
if rbf.contains(outMsg.message.messageId):
|
if rbf.contains(outMsg.message.messageId):
|
||||||
acknowledged = true
|
acknowledged = true
|
||||||
else:
|
else:
|
||||||
logError("Failed to deserialize bloom filter")
|
logError("Failed to deserialize bloom filter")
|
||||||
|
|
||||||
if acknowledged:
|
if acknowledged:
|
||||||
if rm.onMessageSent != nil:
|
if rm.onMessageSent != nil:
|
||||||
rm.onMessageSent(outMsg.message.messageId)
|
rm.onMessageSent(outMsg.message.messageId)
|
||||||
@ -68,7 +66,9 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) =
|
|||||||
else:
|
else:
|
||||||
inc i
|
inc i
|
||||||
|
|
||||||
proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: MessageID): Result[seq[byte], ReliabilityError] =
|
proc wrapOutgoingMessage*(
|
||||||
|
rm: ReliabilityManager, message: seq[byte], messageId: MessageID
|
||||||
|
): Result[seq[byte], ReliabilityError] =
|
||||||
## Wraps an outgoing message with reliability metadata.
|
## Wraps an outgoing message with reliability metadata.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -84,7 +84,7 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
|
|||||||
withLock rm.lock:
|
withLock rm.lock:
|
||||||
try:
|
try:
|
||||||
rm.updateLamportTimestamp(getTime().toUnix)
|
rm.updateLamportTimestamp(getTime().toUnix)
|
||||||
|
|
||||||
# Serialize current bloom filter
|
# Serialize current bloom filter
|
||||||
var bloomBytes: seq[byte]
|
var bloomBytes: seq[byte]
|
||||||
let bfResult = serializeBloomFilter(rm.bloomFilter.filter)
|
let bfResult = serializeBloomFilter(rm.bloomFilter.filter)
|
||||||
@ -100,15 +100,13 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
|
|||||||
causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory),
|
causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory),
|
||||||
channelId: rm.channelId,
|
channelId: rm.channelId,
|
||||||
content: message,
|
content: message,
|
||||||
bloomFilter: bloomBytes
|
bloomFilter: bloomBytes,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Add to outgoing buffer
|
# Add to outgoing buffer
|
||||||
rm.outgoingBuffer.add(UnacknowledgedMessage(
|
rm.outgoingBuffer.add(
|
||||||
message: msg,
|
UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)
|
||||||
sendTime: getTime(),
|
)
|
||||||
resendAttempts: 0
|
|
||||||
))
|
|
||||||
|
|
||||||
# Add to causal history and bloom filter
|
# Add to causal history and bloom filter
|
||||||
rm.bloomFilter.add(msg.messageId)
|
rm.bloomFilter.add(msg.messageId)
|
||||||
@ -156,7 +154,7 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
|
|||||||
if rm.onMessageReady != nil:
|
if rm.onMessageReady != nil:
|
||||||
rm.onMessageReady(msg.messageId)
|
rm.onMessageReady(msg.messageId)
|
||||||
processed.incl(msgId)
|
processed.incl(msgId)
|
||||||
|
|
||||||
# Add any dependent messages that might now be ready
|
# Add any dependent messages that might now be ready
|
||||||
if msgId in dependencies:
|
if msgId in dependencies:
|
||||||
for dependentId in dependencies[msgId]:
|
for dependentId in dependencies[msgId]:
|
||||||
@ -170,7 +168,11 @@ proc processIncomingBuffer(rm: ReliabilityManager) =
|
|||||||
|
|
||||||
rm.incomingBuffer = newIncomingBuffer
|
rm.incomingBuffer = newIncomingBuffer
|
||||||
|
|
||||||
proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] =
|
proc unwrapReceivedMessage*(
|
||||||
|
rm: ReliabilityManager, message: seq[byte]
|
||||||
|
): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] {.
|
||||||
|
gcsafe
|
||||||
|
.} =
|
||||||
## Unwraps a received message and processes its reliability metadata.
|
## Unwraps a received message and processes its reliability metadata.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -182,7 +184,7 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[
|
|||||||
let msgResult = deserializeMessage(message)
|
let msgResult = deserializeMessage(message)
|
||||||
if not msgResult.isOk:
|
if not msgResult.isOk:
|
||||||
return err(msgResult.error)
|
return err(msgResult.error)
|
||||||
|
|
||||||
let msg = msgResult.get
|
let msg = msgResult.get
|
||||||
if rm.bloomFilter.contains(msg.messageId):
|
if rm.bloomFilter.contains(msg.messageId):
|
||||||
return ok((msg.content, @[]))
|
return ok((msg.content, @[]))
|
||||||
@ -225,7 +227,9 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[
|
|||||||
except:
|
except:
|
||||||
return err(reInternalError)
|
return err(reInternalError)
|
||||||
|
|
||||||
proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void, ReliabilityError] =
|
proc markDependenciesMet*(
|
||||||
|
rm: ReliabilityManager, messageIds: seq[MessageID]
|
||||||
|
): Result[void, ReliabilityError] =
|
||||||
## Marks the specified message dependencies as met.
|
## Marks the specified message dependencies as met.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -240,16 +244,18 @@ proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): R
|
|||||||
rm.bloomFilter.add(msgId)
|
rm.bloomFilter.add(msgId)
|
||||||
# rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application?
|
# rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application?
|
||||||
rm.processIncomingBuffer()
|
rm.processIncomingBuffer()
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
except:
|
except:
|
||||||
return err(reInternalError)
|
return err(reInternalError)
|
||||||
|
|
||||||
proc setCallbacks*(rm: ReliabilityManager,
|
proc setCallbacks*(
|
||||||
onMessageReady: proc(messageId: MessageID) {.gcsafe.},
|
rm: ReliabilityManager,
|
||||||
onMessageSent: proc(messageId: MessageID) {.gcsafe.},
|
onMessageReady: MessageReadyCallback,
|
||||||
onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.},
|
onMessageSent: MessageSentCallback,
|
||||||
onPeriodicSync: PeriodicSyncCallback = nil) =
|
onMissingDependencies: MissingDependenciesCallback,
|
||||||
|
onPeriodicSync: PeriodicSyncCallback = nil,
|
||||||
|
) =
|
||||||
## Sets the callback functions for various events in the ReliabilityManager.
|
## Sets the callback functions for various events in the ReliabilityManager.
|
||||||
##
|
##
|
||||||
## Parameters:
|
## Parameters:
|
||||||
@ -268,7 +274,7 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} =
|
|||||||
withLock rm.lock:
|
withLock rm.lock:
|
||||||
let now = getTime()
|
let now = getTime()
|
||||||
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]
|
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
for unackMsg in rm.outgoingBuffer:
|
for unackMsg in rm.outgoingBuffer:
|
||||||
let elapsed = now - unackMsg.sendTime
|
let elapsed = now - unackMsg.sendTime
|
||||||
@ -298,7 +304,7 @@ proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledErr
|
|||||||
rm.cleanBloomFilter()
|
rm.cleanBloomFilter()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logError("Error in periodic buffer sweep: " & e.msg)
|
logError("Error in periodic buffer sweep: " & e.msg)
|
||||||
|
|
||||||
await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))
|
await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))
|
||||||
|
|
||||||
proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} =
|
proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} =
|
||||||
@ -333,10 +339,9 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE
|
|||||||
rm.outgoingBuffer.setLen(0)
|
rm.outgoingBuffer.setLen(0)
|
||||||
rm.incomingBuffer.setLen(0)
|
rm.incomingBuffer.setLen(0)
|
||||||
rm.bloomFilter = newRollingBloomFilter(
|
rm.bloomFilter = newRollingBloomFilter(
|
||||||
rm.config.bloomFilterCapacity,
|
rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate,
|
||||||
rm.config.bloomFilterErrorRate,
|
rm.config.bloomFilterWindow,
|
||||||
rm.config.bloomFilterWindow
|
|
||||||
)
|
)
|
||||||
return ok()
|
return ok()
|
||||||
except:
|
except:
|
||||||
return err(reInternalError)
|
return err(reInternalError)
|
||||||
|
|||||||
@ -2,8 +2,21 @@ import std/[times, locks]
|
|||||||
import ./[rolling_bloom_filter, message]
|
import ./[rolling_bloom_filter, message]
|
||||||
|
|
||||||
type
|
type
|
||||||
|
MessageReadyCallback* = proc(messageId: MessageID) {.gcsafe.}
|
||||||
|
|
||||||
|
MessageSentCallback* = proc(messageId: MessageID) {.gcsafe.}
|
||||||
|
|
||||||
|
MissingDependenciesCallback* =
|
||||||
|
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
|
||||||
|
|
||||||
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
|
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
|
||||||
|
|
||||||
|
AppCallbacks* = ref object
|
||||||
|
messageReadyCb*: MessageReadyCallback
|
||||||
|
messageSentCb*: MessageSentCallback
|
||||||
|
missingDependenciesCb*: MissingDependenciesCallback
|
||||||
|
periodicSyncCb*: PeriodicSyncCallback
|
||||||
|
|
||||||
ReliabilityConfig* = object
|
ReliabilityConfig* = object
|
||||||
bloomFilterCapacity*: int
|
bloomFilterCapacity*: int
|
||||||
bloomFilterErrorRate*: float
|
bloomFilterErrorRate*: float
|
||||||
@ -24,9 +37,10 @@ type
|
|||||||
channelId*: string
|
channelId*: string
|
||||||
config*: ReliabilityConfig
|
config*: ReliabilityConfig
|
||||||
lock*: Lock
|
lock*: Lock
|
||||||
onMessageReady*: proc(messageId: MessageID)
|
onMessageReady*: proc(messageId: MessageID) {.gcsafe.}
|
||||||
onMessageSent*: proc(messageId: MessageID)
|
onMessageSent*: proc(messageId: MessageID) {.gcsafe.}
|
||||||
onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID])
|
onMissingDependencies*:
|
||||||
|
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}
|
||||||
onPeriodicSync*: proc()
|
onPeriodicSync*: proc()
|
||||||
|
|
||||||
ReliabilityError* = enum
|
ReliabilityError* = enum
|
||||||
@ -51,7 +65,7 @@ proc defaultConfig*(): ReliabilityConfig =
|
|||||||
resendInterval: DefaultResendInterval,
|
resendInterval: DefaultResendInterval,
|
||||||
maxResendAttempts: DefaultMaxResendAttempts,
|
maxResendAttempts: DefaultMaxResendAttempts,
|
||||||
syncMessageInterval: DefaultSyncMessageInterval,
|
syncMessageInterval: DefaultSyncMessageInterval,
|
||||||
bufferSweepInterval: DefaultBufferSweepInterval
|
bufferSweepInterval: DefaultBufferSweepInterval,
|
||||||
)
|
)
|
||||||
|
|
||||||
proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
|
proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
|
||||||
@ -76,7 +90,9 @@ proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [
|
|||||||
if rm.messageHistory.len > rm.config.maxMessageHistory:
|
if rm.messageHistory.len > rm.config.maxMessageHistory:
|
||||||
rm.messageHistory.delete(0)
|
rm.messageHistory.delete(0)
|
||||||
|
|
||||||
proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) {.gcsafe, raises: [].} =
|
proc updateLamportTimestamp*(
|
||||||
|
rm: ReliabilityManager, msgTs: int64
|
||||||
|
) {.gcsafe, raises: [].} =
|
||||||
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
|
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
|
||||||
|
|
||||||
proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] =
|
proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] =
|
||||||
|
|||||||
1
vendor/nim-chronicles
vendored
Submodule
1
vendor/nim-chronicles
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit a8fb38a10bcb548df78e9a70bd77b26bb50abd12
|
||||||
1
vendor/nim-chronos
vendored
Submodule
1
vendor/nim-chronos
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit b55e2816eb45f698ddaca8d8473e401502562db2
|
||||||
1
vendor/nim-confutils
vendored
Submodule
1
vendor/nim-confutils
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit e214b3992a31acece6a9aada7d0a1ad37c928f3b
|
||||||
1
vendor/nim-faststreams
vendored
Submodule
1
vendor/nim-faststreams
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit 2b08c774afaafd600cf4c6f994cf78b8aa090c0c
|
||||||
1
vendor/nim-json-serialization
vendored
Submodule
1
vendor/nim-json-serialization
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit 2b1c5eb11df3647a2cee107cd4cce3593cbb8bcf
|
||||||
1
vendor/nim-libp2p
vendored
Submodule
1
vendor/nim-libp2p
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit ac25da6cea158768bbc060b7be2fbe004206f3bb
|
||||||
1
vendor/nim-results
vendored
Submodule
1
vendor/nim-results
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit df8113dda4c2d74d460a8fa98252b0b771bf1f27
|
||||||
1
vendor/nim-serialization
vendored
Submodule
1
vendor/nim-serialization
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit 548d0adc9797a10b2db7f788b804330306293088
|
||||||
1
vendor/nim-stew
vendored
Submodule
1
vendor/nim-stew
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit d7a6868ba84165e7fdde427af9a1fc3f5f5cc151
|
||||||
1
vendor/nim-taskpools
vendored
Submodule
1
vendor/nim-taskpools
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit 7b74a716a40249720fd7da428113147942b9642d
|
||||||
1
vendor/nimbus-build-system
vendored
Submodule
1
vendor/nimbus-build-system
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit 5f10509cf880dc035e517ca7bac3163cd5206ba8
|
||||||
Loading…
x
Reference in New Issue
Block a user