diff --git a/.gitignore b/.gitignore index b8be30d..898171b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,12 @@ nph docs for_reference do_not_commit +build/* +sds.nims +/.update.timestamp + +# Nimbus Build System +nimbus-build-system.paths + +# Nimble packages +/vendor/.nimble diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..fe38e76 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,55 @@ +[submodule "vendor/nimbus-build-system"] + path = vendor/nimbus-build-system + url = https://github.com/status-im/nimbus-build-system.git + ignore = untracked + branch = master +[submodule "vendor/nim-chronos"] + path = vendor/nim-chronos + url = https://github.com/status-im/nim-chronos.git + ignore = untracked + branch = master +[submodule "vendor/nim-results"] + path = vendor/nim-results + url = https://github.com/arnetheduck/nim-results.git + ignore = untracked + branch = master +[submodule "vendor/nim-stew"] + path = vendor/nim-stew + url = https://github.com/status-im/nim-stew.git + ignore = untracked + branch = master +[submodule "vendor/nim-chronicles"] + path = vendor/nim-chronicles + url = https://github.com/status-im/nim-chronicles.git + ignore = untracked + branch = master +[submodule "vendor/nim-faststreams"] + path = vendor/nim-faststreams + url = https://github.com/status-im/nim-faststreams.git + ignore = untracked + branch = master +[submodule "vendor/nim-json-serialization"] + path = vendor/nim-json-serialization + url = https://github.com/status-im/nim-json-serialization.git + ignore = untracked + branch = master +[submodule "vendor/nim-serialization"] + path = vendor/nim-serialization + url = https://github.com/status-im/nim-serialization.git + ignore = untracked + branch = master +[submodule "vendor/nim-taskpools"] + path = vendor/nim-taskpools + url = https://github.com/status-im/nim-taskpools.git + ignore = untracked + branch = master +[submodule "vendor/nim-confutils"] + path = vendor/nim-confutils + url = https://github.com/status-im/nim-confutils.git + ignore = untracked + branch = master +[submodule "vendor/nim-libp2p"] + path = vendor/nim-libp2p + url = https://github.com/vacp2p/nim-libp2p.git + ignore = untracked + branch = master diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..31788b2 --- /dev/null +++ b/Makefile @@ -0,0 +1,51 @@ +.PHONY: libsds + +export BUILD_SYSTEM_DIR := vendor/nimbus-build-system +# we don't want an error here, so we can handle things later, in the ".DEFAULT" target +-include $(BUILD_SYSTEM_DIR)/makefiles/variables.mk + +ifeq ($(NIM_PARAMS),) +# "variables.mk" was not included, so we update the submodules. +GIT_SUBMODULE_UPDATE := git submodule update --init --recursive +.DEFAULT: + +@ echo -e "Git submodules not found. Running '$(GIT_SUBMODULE_UPDATE)'.\n"; \ + $(GIT_SUBMODULE_UPDATE); \ + echo +# Now that the included *.mk files appeared, and are newer than this file, Make will restart itself: +# https://www.gnu.org/software/make/manual/make.html#Remaking-Makefiles +# +# After restarting, it will execute its original goal, so we don't have to start a child Make here +# with "$(MAKE) $(MAKECMDGOALS)". Isn't hidden control flow great? + +else # "variables.mk" was included. Business as usual until the end of this file. + +# default target, because it's the first one that doesn't start with '.' +all: | libsds + +sds.nims: + ln -s sds.nimble $@ + +update: | update-common + rm -rf sds.nims && \ + $(MAKE) sds.nims $(HANDLE_OUTPUT) + +clean: + rm -rf build + +deps: | sds.nims + +# must be included after the default target +-include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk + +STATIC ?= 0 + +libsds: deps + rm -f build/libsds* +ifeq ($(STATIC), 1) + echo -e $(BUILD_MSG) "build/$@.a" && \ + $(ENV_SCRIPT) nim libsdsStatic $(NIM_PARAMS) sds.nims +else + echo -e $(BUILD_MSG) "build/$@.so" && \ + $(ENV_SCRIPT) nim libsdsDynamic $(NIM_PARAMS) sds.nims +endif +endif \ No newline at end of file diff --git a/bindings/bindings.h b/bindings/bindings.h deleted file mode 100644 index 67fd179..0000000 --- a/bindings/bindings.h +++ /dev/null @@ -1,132 +0,0 @@ -#ifndef BINDINGS_H -#define BINDINGS_H - -#include // For size_t -#include // For standard integer types -#include // For bool type - -#ifdef __cplusplus -extern "C" { -#endif - - -// Opaque struct declaration (handle replaces direct pointer usage) -typedef struct ReliabilityManager ReliabilityManager; // Keep forward declaration - -// Define MessageID as a C string -typedef const char* MessageID; // Keep const for the typedef itself - -// --- Result Types --- - -typedef struct { - bool is_ok; - char* error_message; -} CResult; - -typedef struct { - CResult base_result; - unsigned char* message; - size_t message_len; - MessageID* missing_deps; - size_t missing_deps_count; -} CUnwrapResult; - -typedef struct { - CResult base_result; - unsigned char* message; - size_t message_len; -} CWrapResult; - - -// --- Callback Function Pointer Types --- - -// Define event types (enum or constants) -typedef enum { - EVENT_MESSAGE_READY = 1, - EVENT_MESSAGE_SENT = 2, - EVENT_MISSING_DEPENDENCIES = 3, - EVENT_PERIODIC_SYNC = 4 -} CEventType; - -// Single callback type for all events -// Nim will call this, passing the handle and event-specific data -typedef void (*CEventCallback)(void* handle, CEventType eventType, void* data1, void* data2, size_t data3); - - -// --- Core API Functions --- - -/** - * @brief Creates a new ReliabilityManager instance. - * @param channelId A unique identifier for the communication channel. - * @return An opaque handle (void*) representing the instance, or NULL on failure. - */ -void* NewReliabilityManager(char* channelId); - -/** - * @brief Cleans up resources associated with a ReliabilityManager instance. - * @param handle The opaque handle (void*) of the instance to clean up. - */ -void CleanupReliabilityManager(void* handle); - -/** - * @brief Resets the ReliabilityManager instance. - * @param handle The opaque handle (void*) of the instance. - * @return CResult indicating success or failure. - */ -CResult ResetReliabilityManager(void* handle); -/** - * @brief Wraps an outgoing message. - * @param handle The opaque handle (void*) of the instance. - * @param message Pointer to the raw message content. - * @param messageLen Length of the raw message content. - * @param messageId A unique identifier for this message. - * @return CWrapResult containing the wrapped message or an error. - */ -CWrapResult WrapOutgoingMessage(void* handle, void* message, size_t messageLen, char* messageId); -/** - * @brief Unwraps a received message. - * @param handle The opaque handle (void*) of the instance. - * @param message Pointer to the received message data. - * @param messageLen Length of the received message data. - * @return CUnwrapResult containing the unwrapped content, missing dependencies, or an error. - */ -CUnwrapResult UnwrapReceivedMessage(void* handle, void* message, size_t messageLen); - -/** - * @brief Marks specified message dependencies as met. - * @param handle The opaque handle (void*) of the instance. - * @param messageIDs An array of message IDs to mark as met. - * @param count The number of message IDs in the array. - * @return CResult indicating success or failure. - */ -CResult MarkDependenciesMet(void* handle, char** messageIDs, size_t count); // Reverted to char** - -/** - * @brief Registers callback functions. - * @param handle The opaque handle (void*) of the instance. - * @param messageReady Callback for when a message is ready. - * @param messageSent Callback for when an outgoing message is acknowledged. - * @param eventCallback The single callback function to handle all events. - * @param user_data A pointer to user-defined data (optional, could be managed in Go). - */ -void RegisterCallback(void* handle, CEventCallback eventCallback, void* user_data); // Renamed and simplified - -/** - * @brief Starts the background periodic tasks. - * @param handle The opaque handle (void*) of the instance. - */ -void StartPeriodicTasks(void* handle); - - -// --- Memory Freeing Functions --- - -void FreeCResultError(CResult result); -void FreeCWrapResult(CWrapResult result); -void FreeCUnwrapResult(CUnwrapResult result); - - -#ifdef __cplusplus -} // extern "C" -#endif - -#endif // BINDINGS_H diff --git a/bindings/bindings.nim b/bindings/bindings.nim deleted file mode 100644 index 04a0a44..0000000 --- a/bindings/bindings.nim +++ /dev/null @@ -1,318 +0,0 @@ -import std/[locks, typetraits, tables] # Added tables -import chronos -import results -import ../src/[reliability, reliability_utils, message] - -type - CReliabilityManagerHandle* = pointer - -type - # Callback Types (Imported from C Header) - CEventType* {.importc: "CEventType", header: "bindings.h", pure.} = enum - EVENT_MESSAGE_READY = 1, - EVENT_MESSAGE_SENT = 2, - EVENT_MISSING_DEPENDENCIES = 3, - EVENT_PERIODIC_SYNC = 4 - - CEventCallback* = proc(handle: pointer, eventType: CEventType, data1: pointer, data2: pointer, data3: csize_t) {.cdecl.} # Use csize_t - - CResult* {.importc: "CResult", header: "bindings.h", bycopy.} = object - is_ok*: bool - error_message*: cstring - - CWrapResult* {.importc: "CWrapResult", header: "bindings.h", bycopy.} = object - base_result*: CResult - message*: pointer - message_len*: csize_t - - CUnwrapResult* {.importc: "CUnwrapResult", header: "bindings.h", bycopy.} = object - base_result*: CResult - message*: pointer - message_len*: csize_t - missing_deps*: ptr cstring - missing_deps_count*: csize_t - -# --- Callback Registry --- -type - CallbackRegistry = Table[CReliabilityManagerHandle, CEventCallback] - -var - callbackRegistry: CallbackRegistry - registryLock: Lock - -initLock(registryLock) - -# --- Memory Management Helpers --- - -proc allocCString*(s: string): cstring {.inline, gcsafe.} = - if s.len == 0: return nil - result = cast[cstring](allocShared(s.len + 1)) - copyMem(result, s.cstring, s.len + 1) - -proc allocSeqByte*(s: seq[byte]): (pointer, csize_t) {.inline, gcsafe.} = - if s.len == 0: return (nil, 0) - let len = s.len - let bufferPtr = allocShared(len) - if len > 0: - copyMem(bufferPtr, cast[pointer](s[0].unsafeAddr), len.Natural) - return (bufferPtr, len.csize_t) - -proc allocSeqCString*(s: seq[string]): (ptr cstring, csize_t) {.inline, gcsafe, cdecl.} = - if s.len == 0: return (nil, 0) - let count = s.len - # Allocate memory for 'count' cstring pointers, cast to ptr UncheckedArray - let arrPtr = cast[ptr UncheckedArray[cstring]](allocShared(count * sizeof(cstring))) - for i in 0.. C) --- -# These wrappers call the single global Go callback relay. - -proc nimMessageReadyCallback(rm: ReliabilityManager, messageId: MessageID) = - echo "[Nim Binding] nimMessageReadyCallback called for: ", messageId - let handle = cast[CReliabilityManagerHandle](rm) - var cb: CEventCallback - withLock registryLock: - if not callbackRegistry.hasKey(handle): - echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) - return - cb = callbackRegistry[handle] - - # Pass handle, event type, and messageId (as data1) - cb(handle, EVENT_MESSAGE_READY, cast[pointer](messageId.cstring), nil, 0) - -proc nimMessageSentCallback(rm: ReliabilityManager, messageId: MessageID) = - echo "[Nim Binding] nimMessageSentCallback called for: ", messageId - let handle = cast[CReliabilityManagerHandle](rm) - var cb: CEventCallback - withLock registryLock: - if not callbackRegistry.hasKey(handle): - echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) - return - cb = callbackRegistry[handle] - - cb(handle, EVENT_MESSAGE_SENT, cast[pointer](messageId.cstring), nil, 0) - -proc nimMissingDependenciesCallback(rm: ReliabilityManager, messageId: MessageID, missingDeps: seq[MessageID]) = - echo "[Nim Binding] nimMissingDependenciesCallback called for: ", messageId, " with deps: ", $missingDeps - let handle = cast[CReliabilityManagerHandle](rm) - var cb: CEventCallback - withLock registryLock: - if not callbackRegistry.hasKey(handle): - echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) - return - cb = callbackRegistry[handle] - - # Prepare data for the callback - var cDepsPtr: ptr cstring = nil - var cDepsCount: csize_t = 0 - var cDepsNim: seq[cstring] = @[] # Keep Nim seq alive during call - if missingDeps.len > 0: - cDepsNim = newSeq[cstring](missingDeps.len) - for i, dep in missingDeps: - cDepsNim[i] = dep.cstring # Nim GC manages these cstrings via the seq - cDepsPtr = cast[ptr cstring](cDepsNim[0].addr) - cDepsCount = missingDeps.len.csize_t - - cb(handle, EVENT_MISSING_DEPENDENCIES, cast[pointer](messageId.cstring), cast[pointer](cDepsPtr), cDepsCount) - -proc nimPeriodicSyncCallback(rm: ReliabilityManager) = - echo "[Nim Binding] nimPeriodicSyncCallback called" - let handle = cast[CReliabilityManagerHandle](rm) - var cb: CEventCallback - withLock registryLock: - if not callbackRegistry.hasKey(handle): - echo "[Nim Binding] No callback registered for handle: ", cast[int](handle) - return - cb = callbackRegistry[handle] - - cb(handle, EVENT_PERIODIC_SYNC, nil, nil, 0) - -# --- Exported C Functions - Using Opaque Pointer --- - -proc NewReliabilityManager*(channelIdCStr: cstring): CReliabilityManagerHandle {.exportc, dynlib, cdecl, gcsafe.} = - let channelId = $channelIdCStr - if channelId.len == 0: - echo "Error creating ReliabilityManager: Channel ID cannot be empty" - return nil # Return nil pointer - let rmResult = newReliabilityManager(channelId) - if rmResult.isOk: - let rm = rmResult.get() - # Assign anonymous procs that capture 'rm' and call the wrappers - # Ensure signatures match the non-gcsafe fields in ReliabilityManager - rm.onMessageReady = proc(msgId: MessageID) = nimMessageReadyCallback(rm, msgId) - rm.onMessageSent = proc(msgId: MessageID) = nimMessageSentCallback(rm, msgId) - rm.onMissingDependencies = proc(msgId: MessageID, deps: seq[MessageID]) = nimMissingDependenciesCallback(rm, msgId, deps) - rm.onPeriodicSync = proc() = nimPeriodicSyncCallback(rm) - - # Return the Nim ref object cast to the opaque pointer type - let handle = cast[CReliabilityManagerHandle](rm) - GC_ref(rm) # Prevent GC from moving the object while Go holds the handle - return handle - else: - echo "Error creating ReliabilityManager: ", rmResult.error - return nil # Return nil pointer - -proc CleanupReliabilityManager*(handle: CReliabilityManagerHandle) {.exportc, dynlib, cdecl.} = - let handlePtr = handle - if handlePtr != nil: - # Go side should handle removing the handle from its registry. - # We just need to unref the Nim object. - # No need to interact with gEventCallback here. - - # Cast opaque pointer back to Nim ref type - let rm = cast[ReliabilityManager](handlePtr) - cleanup(rm) # Call Nim cleanup - GC_unref(rm) # Allow GC to collect the object now that Go is done - else: - echo "Warning: CleanupReliabilityManager called with NULL handle" - -proc ResetReliabilityManager*(handle: CReliabilityManagerHandle): CResult {.exportc, dynlib, cdecl, gcsafe.} = - if handle == nil: - return toCResultErrStr("ReliabilityManager handle is NULL") - let rm = cast[ReliabilityManager](handle) - let result = resetReliabilityManager(rm) - if result.isOk: - return toCResultOk() - else: - return toCResultErr(result.error) - -proc WrapOutgoingMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t, messageIdCStr: cstring): CWrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe - if handle == nil: - return CWrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) - let rm = cast[ReliabilityManager](handle) - - if messageC == nil and messageLen > 0: - return CWrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) - if messageIdCStr == nil: - return CWrapResult(base_result: toCResultErrStr("Message ID pointer is NULL")) - - let messageId = $messageIdCStr - var messageNim: seq[byte] - if messageLen > 0: - messageNim = newSeq[byte](messageLen) - copyMem(messageNim[0].addr, messageC, messageLen.Natural) - else: - messageNim = @[] - - let wrapResult = wrapOutgoingMessage(rm, messageNim, messageId) - if wrapResult.isOk: - let (wrappedDataPtr, wrappedDataLen) = allocSeqByte(wrapResult.get()) - return CWrapResult( - base_result: toCResultOk(), - message: wrappedDataPtr, - message_len: wrappedDataLen - ) - else: - return CWrapResult(base_result: toCResultErr(wrapResult.error)) - -proc UnwrapReceivedMessage*(handle: CReliabilityManagerHandle, messageC: pointer, messageLen: csize_t): CUnwrapResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe - if handle == nil: - return CUnwrapResult(base_result: toCResultErrStr("ReliabilityManager handle is NULL")) - let rm = cast[ReliabilityManager](handle) - - if messageC == nil and messageLen > 0: - return CUnwrapResult(base_result: toCResultErrStr("Message pointer is NULL but length > 0")) - - var messageNim: seq[byte] - if messageLen > 0: - messageNim = newSeq[byte](messageLen) - copyMem(messageNim[0].addr, messageC, messageLen.Natural) - else: - messageNim = @[] - - let unwrapResult = unwrapReceivedMessage(rm, messageNim) - if unwrapResult.isOk: - let (unwrappedContent, missingDepsNim) = unwrapResult.get() - let (contentPtr, contentLen) = allocSeqByte(unwrappedContent) - let (depsPtr, depsCount) = allocSeqCString(missingDepsNim) - return CUnwrapResult( - base_result: toCResultOk(), - message: contentPtr, - message_len: contentLen, - missing_deps: depsPtr, - missing_deps_count: depsCount - ) - else: - return CUnwrapResult(base_result: toCResultErr(unwrapResult.error)) - -proc MarkDependenciesMet*(handle: CReliabilityManagerHandle, messageIDsC: ptr cstring, count: csize_t): CResult {.exportc, dynlib, cdecl.} = # Keep non-gcsafe - if handle == nil: - return toCResultErrStr("ReliabilityManager handle is NULL") - let rm = cast[ReliabilityManager](handle) - - if messageIDsC == nil and count > 0: - return toCResultErrStr("MessageIDs pointer is NULL but count > 0") - - var messageIDsNim = newSeq[string](count) - # Cast to ptr UncheckedArray for indexing - let messageIDsCArray = cast[ptr UncheckedArray[cstring]](messageIDsC) - for i in 0.. +#include + +// The possible returned values for the functions that return int +#define RET_OK 0 +#define RET_ERR 1 +#define RET_MISSING_CALLBACK 2 + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData); + + +// --- Core API Functions --- + + +void* NewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData); + +void SetEventCallback(void* ctx, SdsCallBack callback, void* userData); + +int CleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData); + +int ResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData); + +int WrapOutgoingMessage(void* ctx, + void* message, + size_t messageLen, + const char* messageId, + SdsCallBack callback, + void* userData); + +int UnwrapReceivedMessage(void* ctx, + void* message, + size_t messageLen, + SdsCallBack callback, + void* userData); + +int MarkDependenciesMet(void* ctx, + char** messageIDs, + size_t count, + SdsCallBack callback, + void* userData); + +int StartPeriodicTasks(void* ctx, SdsCallBack callback, void* userData); + + + +#ifdef __cplusplus +} +#endif + +#endif /* __libsds__ */ \ No newline at end of file diff --git a/library/libsds.nim b/library/libsds.nim new file mode 100644 index 0000000..5d9f669 --- /dev/null +++ b/library/libsds.nim @@ -0,0 +1,304 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +when defined(linux): + {.passl: "-Wl,-soname,libsds.so".} + +import std/[locks, typetraits, tables, atomics], chronos, chronicles +import + ./sds_thread/sds_thread, + ./alloc, + ./ffi_types, + ./sds_thread/inter_thread_communication/sds_thread_request, + ./sds_thread/inter_thread_communication/requests/ + [sds_lifecycle_request, sds_message_request, sds_dependencies_request], + ../src/[reliability, reliability_utils, message], + ./events/[ + json_message_ready_event, json_message_sent_event, json_missing_dependencies_event, + json_periodic_sync_event, + ] + +################################################################################ +### Wrapper around the reliability manager +################################################################################ + +################################################################################ +### Not-exported components + +template checkLibsdsParams*( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +) = + ctx[].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK + +template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped) = + if isNil(ctx[].eventCallback): + error eventName & " - eventCallback is nil" + return + + if isNil(ctx[].eventUserData): + error eventName & " - eventUserData is nil" + return + + foreignThreadGc: + try: + let event = body + cast[SdsCallBack](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except Exception, CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[SdsCallBack](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) + +proc handleRequest( + ctx: ptr SdsContext, + requestType: RequestType, + content: pointer, + callback: SdsCallBack, + userData: pointer, +): cint = + sds_thread.sendRequestToSdsThread(ctx, requestType, content, callback, userData).isOkOr: + let msg = "libsds error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK + +proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback = + return proc(messageId: MessageID) {.gcsafe.} = + callEventCallback(ctx, "onMessageReady"): + $JsonMessageReadyEvent.new(messageId) + +proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback = + return proc(messageId: MessageID) {.gcsafe.} = + callEventCallback(ctx, "onMessageSent"): + $JsonMessageSentEvent.new(messageId) + +proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback = + return proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + callEventCallback(ctx, "onMissingDependencies"): + $JsonMissingDependenciesEvent.new(messageId, missingDeps) + +proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback = + return proc() {.gcsafe.} = + callEventCallback(ctx, "onPeriodicSync"): + $JsonPeriodicSyncEvent.new() + +### End of not-exported components +################################################################################ + +################################################################################ +### Library setup + +# Every Nim library must have this function called - the name is derived from +# the `--nimMainPrefix` command line option +proc libsdsNimMain() {.importc.} + +# To control when the library has been initialized +var initialized: Atomic[bool] + +if defined(android): + # Redirect chronicles to Android System logs + when compiles(defaultChroniclesStream.outputs[0].writer): + defaultChroniclesStream.outputs[0].writer = proc( + logLevel: LogLevel, msg: LogOutputStr + ) {.raises: [].} = + echo logLevel, msg + +proc initializeLibrary() {.exported.} = + if not initialized.exchange(true): + ## Every Nim library needs to call `NimMain` once exactly, to initialize the Nim runtime. + ## Being `` the value given in the optional compilation flag --nimMainPrefix:yourprefix + libsdsNimMain() + when declared(setupForeignThreadGc): + setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + +### End of library setup +################################################################################ + +################################################################################ +### Exported procs + +proc NewReliabilityManager( + channelId: cstring, callback: SdsCallBack, userData: pointer +): pointer {.dynlib, exportc, cdecl.} = + initializeLibrary() + + ## Creates a new instance of the Reliability Manager. + if isNil(callback): + echo "error: missing callback in NewReliabilityManager" + return nil + + ## Create the SDS thread that will keep waiting for req from the main thread. + var ctx = sds_thread.createSdsThread().valueOr: + let msg = "Error in createSdsThread: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return nil + + ctx.userData = userData + + let appCallbacks = AppCallbacks( + messageReadyCb: onMessageReady(ctx), + messageSentCb: onMessageSent(ctx), + missingDependenciesCb: onMissingDependencies(ctx), + periodicSyncCb: onPeriodicSync(ctx), + ) + + let retCode = handleRequest( + ctx, + RequestType.LIFECYCLE, + SdsLifecycleRequest.createShared( + SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, channelId, appCallbacks + ), + callback, + userData, + ) + + if retCode == RET_ERR: + return nil + + return ctx + +proc SetEventCallback( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +) {.dynlib, exportc.} = + initializeLibrary() + ctx[].eventCallback = cast[pointer](callback) + ctx[].eventUserData = userData + +proc CleanupReliabilityManager( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + sds_thread.destroySdsThread(ctx).isOkOr: + let msg = "libsds error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + ## always need to invoke the callback although we don't retrieve value to the caller + callback(RET_OK, nil, 0, userData) + + return RET_OK + +proc ResetReliabilityManager( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + handleRequest( + ctx, + RequestType.LIFECYCLE, + SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER), + callback, + userData, + ) + +proc WrapOutgoingMessage( + ctx: ptr SdsContext, + message: pointer, + messageLen: csize_t, + messageId: cstring, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if message == nil and messageLen > 0: + let msg = "libsds error: " & "message pointer is NULL but length > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + if messageId == nil: + let msg = "libsds error: " & "message ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + handleRequest( + ctx, + RequestType.MESSAGE, + SdsMessageRequest.createShared( + SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId + ), + callback, + userData, + ) + +proc UnwrapReceivedMessage( + ctx: ptr SdsContext, + message: pointer, + messageLen: csize_t, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if message == nil and messageLen > 0: + let msg = "libsds error: " & "message pointer is NULL but length > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + handleRequest( + ctx, + RequestType.MESSAGE, + SdsMessageRequest.createShared( + SdsMessageMsgType.UNWRAP_MESSAGE, message, messageLen + ), + callback, + userData, + ) + +proc MarkDependenciesMet( + ctx: ptr SdsContext, + messageIds: pointer, + count: csize_t, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if messageIds == nil and count > 0: + let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + handleRequest( + ctx, + RequestType.DEPENDENCIES, + SdsDependenciesRequest.createShared( + SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count + ), + callback, + userData, + ) + +proc StartPeriodicTasks( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + handleRequest( + ctx, + RequestType.LIFECYCLE, + SdsLifecycleRequest.createShared(SdsLifecycleMsgType.START_PERIODIC_TASKS), + callback, + userData, + ) + +### End of exported procs +################################################################################ diff --git a/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim new file mode 100644 index 0000000..ebb45c2 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim @@ -0,0 +1,48 @@ +import std/[options, json, strutils, net, sequtils] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc +import ../../../../src/[reliability_utils, reliability, message] + +type SdsDependenciesMsgType* = enum + MARK_DEPENDENCIES_MET + +type SdsDependenciesRequest* = object + operation: SdsDependenciesMsgType + messageIds: SharedSeq[cstring] + count: csize_t + +proc createShared*( + T: type SdsDependenciesRequest, + op: SdsDependenciesMsgType, + messageIds: pointer, + count: csize_t = 0, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].count = count + ret[].messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int) + return ret + +proc destroyShared(self: ptr SdsDependenciesRequest) = + deallocSharedSeq(self[].messageIds) + deallocShared(self) + +proc process*( + self: ptr SdsDependenciesRequest, rm: ptr ReliabilityManager +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of MARK_DEPENDENCIES_MET: + let messageIdsC = self.messageIds.toSeq() + let messageIds = messageIdsC.mapIt($it) + + markDependenciesMet(rm[], messageIds).isOkOr: + error "MARK_DEPENDENCIES_MET failed", error = error + return err("error processing MARK_DEPENDENCIES_MET request: " & $error) + + return ok("") + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim new file mode 100644 index 0000000..717e310 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -0,0 +1,70 @@ +import std/[options, json, strutils, net] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc +import ../../../../src/[reliability_utils, reliability, message] + +type SdsLifecycleMsgType* = enum + CREATE_RELIABILITY_MANAGER + RESET_RELIABILITY_MANAGER + START_PERIODIC_TASKS + +type SdsLifecycleRequest* = object + operation: SdsLifecycleMsgType + channelId: cstring + appCallbacks: AppCallbacks + +proc createShared*( + T: type SdsLifecycleRequest, + op: SdsLifecycleMsgType, + channelId: cstring = "", + appCallbacks: AppCallbacks = nil, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].appCallbacks = appCallbacks + ret[].channelId = channelId.alloc() + return ret + +proc destroyShared(self: ptr SdsLifecycleRequest) = + deallocShared(self[].channelId) + deallocShared(self) + +proc createReliabilityManager( + channelIdCStr: cstring, appCallbacks: AppCallbacks = nil +): Future[Result[ReliabilityManager, string]] {.async.} = + let channelId = $channelIdCStr + if channelId.len == 0: + error "Failed creating ReliabilityManager: Channel ID cannot be empty" + return err("Failed creating ReliabilityManager: Channel ID cannot be empty") + + let rm = newReliabilityManager(channelId).valueOr: + error "Failed creating reliability manager", error = error + return err("Failed creating reliability manager: " & $error) + + rm.setCallbacks( + appCallbacks.messageReadyCb, appCallbacks.messageSentCb, + appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb, + ) + + return ok(rm) + +proc process*( + self: ptr SdsLifecycleRequest, rm: ptr ReliabilityManager +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of CREATE_RELIABILITY_MANAGER: + rm[] = (await createReliabilityManager(self.channelId, self.appCallbacks)).valueOr: + error "CREATE_RELIABILITY_MANAGER failed", error = error + return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error) + of RESET_RELIABILITY_MANAGER: + resetReliabilityManager(rm[]).isOkOr: + error "RESET_RELIABILITY_MANAGER failed", error = error + return err("error processing RESET_RELIABILITY_MANAGER request: " & $error) + of START_PERIODIC_TASKS: + rm[].startPeriodicTasks() + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim new file mode 100644 index 0000000..6917ccf --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim @@ -0,0 +1,69 @@ +import std/[options, json, strutils, net, sequtils] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc +import ../../../../src/[reliability_utils, reliability, message] + +type SdsMessageMsgType* = enum + WRAP_MESSAGE + UNWRAP_MESSAGE + +type SdsMessageRequest* = object + operation: SdsMessageMsgType + message: SharedSeq[byte] + messageLen: csize_t + messageId: cstring + +type SdsUnwrapResponse* = object + message*: seq[byte] + missingDeps*: seq[MessageID] + +proc createShared*( + T: type SdsMessageRequest, + op: SdsMessageMsgType, + message: pointer, + messageLen: csize_t = 0, + messageId: cstring = "", +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].messageLen = messageLen + ret[].messageId = messageId.alloc() + ret[].message = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int) + + return ret + +proc destroyShared(self: ptr SdsMessageRequest) = + deallocSharedSeq(self[].message) + deallocShared(self[].messageId) + deallocShared(self) + +proc process*( + self: ptr SdsMessageRequest, rm: ptr ReliabilityManager +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of WRAP_MESSAGE: + let messageBytes = self.message.toSeq() + + let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr: + error "WRAP_MESSAGE failed", error = error + return err("error processing WRAP_MESSAGE request: " & $error) + + # returns a comma-separates string of bytes + return ok(wrappedMessage.mapIt($it).join(",")) + of UNWRAP_MESSAGE: + let messageBytes = self.message.toSeq() + + let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr: + error "UNWRAP_MESSAGE failed", error = error + return err("error processing UNWRAP_MESSAGE request: " & $error) + + let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps) + + # return the result as a json string + return ok($(%*(res))) + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/sds_thread_request.nim b/library/sds_thread/inter_thread_communication/sds_thread_request.nim new file mode 100644 index 0000000..f40bef4 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/sds_thread_request.nim @@ -0,0 +1,78 @@ +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the SDS Thread. + +import std/json, results +import chronos, chronos/threadsync +import + ../../ffi_types, + ./requests/[sds_lifecycle_request, sds_message_request, sds_dependencies_request], + ../../../src/[reliability_utils] + +type RequestType* {.pure.} = enum + LIFECYCLE + MESSAGE + DEPENDENCIES + +type SdsThreadRequest* = object + reqType: RequestType + reqContent: pointer + callback: SdsCallBack + userData: pointer + +proc createShared*( + T: type SdsThreadRequest, + reqType: RequestType, + reqContent: pointer, + callback: SdsCallBack, + userData: pointer, +): ptr type T = + var ret = createShared(T) + ret[].reqType = reqType + ret[].reqContent = reqContent + ret[].callback = callback + ret[].userData = userData + return ret + +proc handleRes[T: string | void]( + res: Result[T, string], request: ptr SdsThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + + defer: + deallocShared(request) + + if res.isErr(): + foreignThreadGc: + let msg = "libsds error: handleRes fireSyncRes error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +proc process*( + T: type SdsThreadRequest, request: ptr SdsThreadRequest, rm: ptr ReliabilityManager +) {.async.} = + let retFut = + case request[].reqType + of LIFECYCLE: + cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm) + of MESSAGE: + cast[ptr SdsMessageRequest](request[].reqContent).process(rm) + of DEPENDENCIES: + cast[ptr SdsDependenciesRequest](request[].reqContent).process(rm) + + handleRes(await retFut, request) + +proc `$`*(self: SdsThreadRequest): string = + return $self.reqType diff --git a/library/sds_thread/sds_thread.nim b/library/sds_thread/sds_thread.nim new file mode 100644 index 0000000..4a2cce5 --- /dev/null +++ b/library/sds_thread/sds_thread.nim @@ -0,0 +1,132 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +import std/[options, atomics, os, net, locks] +import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results +import + ../ffi_types, + ./inter_thread_communication/sds_thread_request, + ../../src/[reliability_utils] + +type SdsContext* = object + thread: Thread[(ptr SdsContext)] + lock: Lock + reqChannel: ChannelSPSCSingle[ptr SdsThreadRequest] + reqSignal: ThreadSignalPtr + # to inform The SDS Thread (a.k.a TST) that a new request is sent + reqReceivedSignal: ThreadSignalPtr + # to inform the main thread that the request is rx by TST + userData*: pointer + eventCallback*: pointer + eventUserdata*: pointer + running: Atomic[bool] # To control when the thread is running + +proc runSds(ctx: ptr SdsContext) {.async.} = + ## This is the worker body. This runs the SDS instance + ## and attends library user requests (stop, connect_to, etc.) + + var rm: ReliabilityManager + + while true: + await ctx.reqSignal.wait() + + if ctx.running.load == false: + break + + ## Trying to get a request from the libsds requestor thread + var request: ptr SdsThreadRequest + let recvOk = ctx.reqChannel.tryRecv(request) + if not recvOk: + error "sds thread could not receive a request" + continue + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + ## Handle the request + asyncSpawn SdsThreadRequest.process(request, addr rm) + +proc run(ctx: ptr SdsContext) {.thread.} = + ## Launch sds worker + waitFor runSds(ctx) + +proc createSdsThread*(): Result[ptr SdsContext, string] = + ## This proc is called from the main thread and it creates + ## the SDS working thread. + var ctx = createShared(SdsContext, 1) + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqSignal ThreadSignalPtr") + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.lock.initLock() + + ctx.running.store(true) + + try: + createThread(ctx.thread, run, ctx) + except ValueError, ResourceExhaustedError: + # and freeShared for typed allocations! + freeShared(ctx) + + return err("failed to create the SDS thread: " & getCurrentExceptionMsg()) + + return ok(ctx) + +proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] = + ctx.running.store(false) + + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("error in destroySdsThread: " & $error) + if not signaledOnTime: + return err("failed to signal reqSignal on time in destroySdsThread") + + joinThread(ctx.thread) + ctx.lock.deinitLock() + ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() + freeShared(ctx) + + return ok() + +proc sendRequestToSdsThread*( + ctx: ptr SdsContext, + reqType: RequestType, + reqContent: pointer, + callback: SdsCallBack, + userData: pointer, +): Result[void, string] = + let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData) + + # This lock is only necessary while we use a SP Channel and while the signalling + # between threads assumes that there aren't concurrent requests. + # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive + # requests concurrently and spare us the need of locks + ctx.lock.acquire() + defer: + ctx.lock.release() + ## Sending the request + let sentOk = ctx.reqChannel.trySend(req) + if not sentOk: + deallocShared(req) + return err("Couldn't send a request to the sds thread: " & $req[]) + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deallocShared(req) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deallocShared(req) + return err("Couldn't fireSync in time") + + ## wait until the SDS Thread properly received the request + let res = ctx.reqReceivedSignal.waitSync() + if res.isErr(): + deallocShared(req) + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the + ## process proc. + ok() diff --git a/reliability.nimble b/reliability.nimble deleted file mode 100644 index 170f644..0000000 --- a/reliability.nimble +++ /dev/null @@ -1,32 +0,0 @@ -# Package -version = "0.1.0" -author = "Waku Team" -description = "E2E Reliability Protocol API" -license = "MIT" -srcDir = "src" - -# Dependencies -requires "nim >= 2.0.8" -requires "chronicles" -requires "libp2p" - -# Tasks -task test, "Run the test suite": - exec "nim c -r tests/test_bloom.nim" - exec "nim c -r tests/test_reliability.nim" - -task bindings, "Generate bindings": - proc compile(libName: string, flags = "") = - exec "nim c -f " & flags & " -d:release --app:lib --mm:arc --tlsEmulation:off --out:" & libName & " --outdir:bindings/generated bindings/bindings.nim" - - # Create required directories - mkDir "bindings/generated" - - when defined(windows): - compile "reliability.dll" - elif defined(macosx): - compile "libsds.dylib.arm", "--cpu:arm64 -l:'-target arm64-apple-macos11' -t:'-target arm64-apple-macos11'" - compile "libsds.dylib.x64", "--cpu:amd64 -l:'-target x86_64-apple-macos10.12' -t:'-target x86_64-apple-macos10.12'" - exec "lipo bindings/generated/libsds.dylib.arm bindings/generated/libsds.dylib.x64 -output bindings/generated/libsds.dylib -create" - else: - compile "libsds.so" \ No newline at end of file diff --git a/sds.nimble b/sds.nimble new file mode 100644 index 0000000..2362c28 --- /dev/null +++ b/sds.nimble @@ -0,0 +1,39 @@ +# Package +version = "0.1.0" +author = "Waku Team" +description = "E2E Reliability Protocol API" +license = "MIT" +srcDir = "src" + +# Dependencies +requires "nim >= 2.0.8" +requires "chronicles" +requires "libp2p" + +proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") = + if not dirExists "build": + mkDir "build" + # allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims" + var extra_params = params + for i in 2 ..< paramCount(): + extra_params &= " " & paramStr(i) + if `type` == "static": + exec "nim c" & " --out:build/" & name & + ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " & + extra_params & " " & srcDir & name & ".nim" + else: + exec "nim c" & " --out:build/" & name & + ".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " & + extra_params & " " & srcDir & name & ".nim" + +# Tasks +task test, "Run the test suite": + exec "nim c -r tests/test_bloom.nim" + exec "nim c -r tests/test_reliability.nim" + +task libsdsDynamic, "Generate bindings": + let name = "libsds" + buildLibrary name, + "library/", + "", + "dynamic" \ No newline at end of file diff --git a/sds_wrapper.go b/sds_wrapper.go deleted file mode 100644 index 7c1e523..0000000 --- a/sds_wrapper.go +++ /dev/null @@ -1,280 +0,0 @@ -package main - -/* -#cgo CFLAGS: -I${SRCDIR}/bindings -#cgo LDFLAGS: -L${SRCDIR}/bindings/generated -lbindings -#cgo LDFLAGS: -Wl,-rpath,${SRCDIR}/bindings/generated - -#include // For C.free -#include "bindings/bindings.h" // Update include path - -// Forward declaration for the single Go callback relay function -extern void globalCallbackRelay(void* handle, CEventType eventType, void* data1, void* data2, size_t data3); - -// Helper function to call the C memory freeing functions -static void callFreeCResultError(CResult res) { FreeCResultError(res); } -static void callFreeCWrapResult(CWrapResult res) { FreeCWrapResult(res); } -static void callFreeCUnwrapResult(CUnwrapResult res) { FreeCUnwrapResult(res); } - -*/ -import "C" -import ( - "errors" - "fmt" - "sync" - "unsafe" -) - -// --- Go Types --- - -// ReliabilityManagerHandle represents the opaque handle to the Nim object -type ReliabilityManagerHandle unsafe.Pointer - -// MessageID is a type alias for string for clarity -type MessageID string - -// Callbacks holds the Go functions to be called by the Nim library -type Callbacks struct { - OnMessageReady func(messageId MessageID) - OnMessageSent func(messageId MessageID) - OnMissingDependencies func(messageId MessageID, missingDeps []MessageID) - OnPeriodicSync func() -} - -// Global map to store callbacks associated with handles -var ( - callbackRegistry = make(map[ReliabilityManagerHandle]*Callbacks) - registryMutex sync.RWMutex -) - -// --- Go Wrapper Functions --- - -// NewReliabilityManager creates a new instance of the Nim ReliabilityManager -func NewReliabilityManager(channelId string) (ReliabilityManagerHandle, error) { - cChannelId := C.CString(channelId) - defer C.free(unsafe.Pointer(cChannelId)) - - handle := C.NewReliabilityManager(cChannelId) - if handle == nil { - // Note: Nim side currently just prints to stdout on creation failure - return nil, errors.New("failed to create ReliabilityManager (check Nim logs/stdout)") - } - return ReliabilityManagerHandle(handle), nil -} - -// CleanupReliabilityManager frees the resources associated with the handle -func CleanupReliabilityManager(handle ReliabilityManagerHandle) { - if handle == nil { - return - } - registryMutex.Lock() - delete(callbackRegistry, handle) - registryMutex.Unlock() - C.CleanupReliabilityManager(unsafe.Pointer(handle)) -} - -// ResetReliabilityManager resets the state of the manager -func ResetReliabilityManager(handle ReliabilityManagerHandle) error { - if handle == nil { - return errors.New("handle is nil") - } - cResult := C.ResetReliabilityManager(unsafe.Pointer(handle)) - if !cResult.is_ok { - errMsg := C.GoString(cResult.error_message) - C.callFreeCResultError(cResult) // Free the error message - return errors.New(errMsg) - } - return nil -} - -// WrapOutgoingMessage wraps a message with reliability metadata -func WrapOutgoingMessage(handle ReliabilityManagerHandle, message []byte, messageId MessageID) ([]byte, error) { - if handle == nil { - return nil, errors.New("handle is nil") - } - cMessageId := C.CString(string(messageId)) - defer C.free(unsafe.Pointer(cMessageId)) - - var cMessagePtr unsafe.Pointer - if len(message) > 0 { - cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed - defer C.free(cMessagePtr) - } else { - cMessagePtr = nil - } - cMessageLen := C.size_t(len(message)) - - cWrapResult := C.WrapOutgoingMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen, cMessageId) - - if !cWrapResult.base_result.is_ok { - errMsg := C.GoString(cWrapResult.base_result.error_message) - C.callFreeCWrapResult(cWrapResult) // Free error and potentially allocated message - return nil, errors.New(errMsg) - } - - // Copy the wrapped message from C memory to Go slice - // Explicitly cast the message pointer to unsafe.Pointer - wrappedMessage := C.GoBytes(unsafe.Pointer(cWrapResult.message), C.int(cWrapResult.message_len)) - C.callFreeCWrapResult(cWrapResult) // Free the C-allocated message buffer - - return wrappedMessage, nil -} - -// UnwrapReceivedMessage unwraps a received message -func UnwrapReceivedMessage(handle ReliabilityManagerHandle, message []byte) ([]byte, []MessageID, error) { - if handle == nil { - return nil, nil, errors.New("handle is nil") - } - - var cMessagePtr unsafe.Pointer - if len(message) > 0 { - cMessagePtr = C.CBytes(message) - defer C.free(cMessagePtr) - } else { - cMessagePtr = nil - } - cMessageLen := C.size_t(len(message)) - - cUnwrapResult := C.UnwrapReceivedMessage(unsafe.Pointer(handle), cMessagePtr, cMessageLen) - - if !cUnwrapResult.base_result.is_ok { - errMsg := C.GoString(cUnwrapResult.base_result.error_message) - C.callFreeCUnwrapResult(cUnwrapResult) // Free error and potentially allocated fields - return nil, nil, errors.New(errMsg) - } - - // Copy unwrapped message content - unwrappedContent := C.GoBytes(unsafe.Pointer(cUnwrapResult.message), C.int(cUnwrapResult.message_len)) - - // Copy missing dependencies - missingDeps := make([]MessageID, cUnwrapResult.missing_deps_count) - if cUnwrapResult.missing_deps_count > 0 { - // Convert C array of C strings to Go slice of strings - cDepsArray := (*[1 << 30]*C.char)(unsafe.Pointer(cUnwrapResult.missing_deps))[:cUnwrapResult.missing_deps_count:cUnwrapResult.missing_deps_count] - for i, s := range cDepsArray { - missingDeps[i] = MessageID(C.GoString(s)) - } - } - - C.callFreeCUnwrapResult(cUnwrapResult) // Free C-allocated message, deps array, and strings - - return unwrappedContent, missingDeps, nil -} - -// MarkDependenciesMet informs the library that dependencies are met -func MarkDependenciesMet(handle ReliabilityManagerHandle, messageIDs []MessageID) error { - if handle == nil { - return errors.New("handle is nil") - } - if len(messageIDs) == 0 { - return nil // Nothing to do - } - - // Convert Go string slice to C array of C strings (char**) - cMessageIDs := make([]*C.char, len(messageIDs)) - for i, id := range messageIDs { - cMessageIDs[i] = C.CString(string(id)) - defer C.free(unsafe.Pointer(cMessageIDs[i])) // Ensure each CString is freed - } - - // Create a pointer (**C.char) to the first element of the slice - var cMessageIDsPtr **C.char - if len(cMessageIDs) > 0 { - cMessageIDsPtr = &cMessageIDs[0] - } else { - cMessageIDsPtr = nil // Handle empty slice case - } - - // Pass the pointer variable (cMessageIDsPtr) directly, which is of type **C.char - cResult := C.MarkDependenciesMet(unsafe.Pointer(handle), cMessageIDsPtr, C.size_t(len(messageIDs))) - - if !cResult.is_ok { - errMsg := C.GoString(cResult.error_message) - C.callFreeCResultError(cResult) - return errors.New(errMsg) - } - return nil -} - -// RegisterCallback sets the single Go callback relay function -func RegisterCallback(handle ReliabilityManagerHandle, callbacks Callbacks) error { - if handle == nil { - return errors.New("handle is nil") - } - - // Store the Go callbacks associated with this handle - registryMutex.Lock() - callbackRegistry[handle] = &callbacks - registryMutex.Unlock() - - // Register the single global Go relay function with the Nim library - // Nim will call globalCallbackRelay, passing the handle as the first argument. - C.RegisterCallback( - unsafe.Pointer(handle), - (C.CEventCallback)(C.globalCallbackRelay), // Pass the Go relay function pointer - nil, // user_data is not used here, handle is passed directly by Nim wrapper - ) - return nil -} - -// StartPeriodicTasks starts the background tasks in the Nim library -func StartPeriodicTasks(handle ReliabilityManagerHandle) error { - if handle == nil { - return errors.New("handle is nil") - } - C.StartPeriodicTasks(unsafe.Pointer(handle)) - // Assuming StartPeriodicTasks doesn't return an error status in C API - return nil -} - -// globalCallbackRelay is called by Nim for all events. -// It uses the handle to find the correct Go Callbacks struct and dispatch the call. -//export globalCallbackRelay -func globalCallbackRelay(handle unsafe.Pointer, eventType C.CEventType, data1 unsafe.Pointer, data2 unsafe.Pointer, data3 C.size_t) { - goHandle := ReliabilityManagerHandle(handle) - - registryMutex.RLock() - callbacks, ok := callbackRegistry[goHandle] - registryMutex.RUnlock() - - if !ok || callbacks == nil { - fmt.Printf("Go: globalCallbackRelay: No callbacks registered for handle %v\n", goHandle) // Uncommented - return - } - - // Use a goroutine to avoid blocking the Nim thread - go func() { - switch eventType { - case C.EVENT_MESSAGE_READY: - if callbacks.OnMessageReady != nil { - msgIdStr := C.GoString((*C.char)(data1)) - callbacks.OnMessageReady(MessageID(msgIdStr)) - } - case C.EVENT_MESSAGE_SENT: - if callbacks.OnMessageSent != nil { - msgIdStr := C.GoString((*C.char)(data1)) - callbacks.OnMessageSent(MessageID(msgIdStr)) - } - case C.EVENT_MISSING_DEPENDENCIES: - if callbacks.OnMissingDependencies != nil { - msgIdStr := C.GoString((*C.char)(data1)) - depsCount := int(data3) - deps := make([]MessageID, depsCount) - if depsCount > 0 { - // Convert C array of C strings (**char) to Go slice - cDepsArray := (*[1 << 30]*C.char)(data2)[:depsCount:depsCount] - for i, s := range cDepsArray { - deps[i] = MessageID(C.GoString(s)) - } - } - callbacks.OnMissingDependencies(MessageID(msgIdStr), deps) - } - case C.EVENT_PERIODIC_SYNC: - if callbacks.OnPeriodicSync != nil { - callbacks.OnPeriodicSync() - } - default: - fmt.Printf("Go: globalCallbackRelay: Received unknown event type %d for handle %v\n", eventType, goHandle) - } - }() -} diff --git a/sds_wrapper_test.go b/sds_wrapper_test.go deleted file mode 100644 index ee1185a..0000000 --- a/sds_wrapper_test.go +++ /dev/null @@ -1,259 +0,0 @@ -package main -import ( - "fmt" - "sync" - "testing" - "time" -) - -// Test basic creation, cleanup, and reset -func TestLifecycle(t *testing.T) { - channelID := "test-lifecycle" - handle, err := NewReliabilityManager(channelID) - if err != nil { - t.Fatalf("NewReliabilityManager failed: %v", err) - } - if handle == nil { - t.Fatal("NewReliabilityManager returned a nil handle") - } - defer CleanupReliabilityManager(handle) // Ensure cleanup even on test failure - - err = ResetReliabilityManager(handle) - if err != nil { - t.Errorf("ResetReliabilityManager failed: %v", err) - } -} - -// Test wrapping and unwrapping a simple message -func TestWrapUnwrap(t *testing.T) { - channelID := "test-wrap-unwrap" - handle, err := NewReliabilityManager(channelID) - if err != nil { - t.Fatalf("NewReliabilityManager failed: %v", err) - } - defer CleanupReliabilityManager(handle) - - originalPayload := []byte("hello reliability") - messageID := MessageID("msg-wrap-1") - - wrappedMsg, err := WrapOutgoingMessage(handle, originalPayload, messageID) - if err != nil { - t.Fatalf("WrapOutgoingMessage failed: %v", err) - } - if len(wrappedMsg) == 0 { - t.Fatal("WrapOutgoingMessage returned empty bytes") - } - - // Simulate receiving the wrapped message - unwrappedPayload, missingDeps, err := UnwrapReceivedMessage(handle, wrappedMsg) - if err != nil { - t.Fatalf("UnwrapReceivedMessage failed: %v", err) - } - - if string(unwrappedPayload) != string(originalPayload) { - t.Errorf("Unwrapped payload mismatch: got %q, want %q", unwrappedPayload, originalPayload) - } - if len(missingDeps) != 0 { - t.Errorf("Expected 0 missing dependencies, got %d: %v", len(missingDeps), missingDeps) - } -} - -// Test dependency handling -func TestDependencies(t *testing.T) { - channelID := "test-deps" - handle, err := NewReliabilityManager(channelID) - if err != nil { - t.Fatalf("NewReliabilityManager failed: %v", err) - } - defer CleanupReliabilityManager(handle) - - // 1. Send message 1 (will become a dependency) - payload1 := []byte("message one") - msgID1 := MessageID("msg-dep-1") - wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) - if err != nil { - t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) - } - // Simulate receiving msg1 to add it to history (implicitly acknowledges it) - _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) - if err != nil { - t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) - } - - // 2. Send message 2 (depends on message 1 implicitly via causal history) - payload2 := []byte("message two") - msgID2 := MessageID("msg-dep-2") - wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) - if err != nil { - t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) - } - - // 3. Create a new manager to simulate a different peer receiving msg2 without msg1 - handle2, err := NewReliabilityManager(channelID) // Same channel ID - if err != nil { - t.Fatalf("NewReliabilityManager (2) failed: %v", err) - } - defer CleanupReliabilityManager(handle2) - - // 4. Unwrap message 2 on the second manager - should report msg1 as missing - _, missingDeps, err := UnwrapReceivedMessage(handle2, wrappedMsg2) - if err != nil { - t.Fatalf("UnwrapReceivedMessage (2) on handle2 failed: %v", err) - } - - if len(missingDeps) == 0 { - t.Fatalf("Expected missing dependencies, got none") - } - foundDep1 := false - for _, dep := range missingDeps { - if dep == msgID1 { - foundDep1 = true - break - } - } - if !foundDep1 { - t.Errorf("Expected missing dependency %q, got %v", msgID1, missingDeps) - } - - // 5. Mark the dependency as met - err = MarkDependenciesMet(handle2, []MessageID{msgID1}) - if err != nil { - t.Fatalf("MarkDependenciesMet failed: %v", err) - } -} - -// Test callbacks -func TestCallbacks(t *testing.T) { - channelID := "test-callbacks" - handle, err := NewReliabilityManager(channelID) - if err != nil { - t.Fatalf("NewReliabilityManager failed: %v", err) - } - defer CleanupReliabilityManager(handle) - - var wg sync.WaitGroup - receivedReady := make(map[MessageID]bool) - receivedSent := make(map[MessageID]bool) - receivedMissing := make(map[MessageID][]MessageID) - syncRequested := false - var cbMutex sync.Mutex // Protect access to callback tracking maps/vars - - callbacks := Callbacks{ - OnMessageReady: func(messageId MessageID) { - fmt.Printf("Test: OnMessageReady received: %s\n", messageId) - cbMutex.Lock() - receivedReady[messageId] = true - cbMutex.Unlock() - wg.Done() - }, - OnMessageSent: func(messageId MessageID) { - fmt.Printf("Test: OnMessageSent received: %s\n", messageId) - cbMutex.Lock() - receivedSent[messageId] = true - cbMutex.Unlock() - wg.Done() - }, - OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { - fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps) - cbMutex.Lock() - receivedMissing[messageId] = missingDeps - cbMutex.Unlock() - wg.Done() - }, - OnPeriodicSync: func() { - fmt.Println("Test: OnPeriodicSync received") - cbMutex.Lock() - syncRequested = true - cbMutex.Unlock() - // Don't wg.Done() here, it might be called multiple times - }, - } - - err = RegisterCallback(handle, callbacks) - if err != nil { - t.Fatalf("RegisterCallback failed: %v", err) - } - - // Start tasks AFTER registering callbacks - err = StartPeriodicTasks(handle) - if err != nil { - t.Fatalf("StartPeriodicTasks failed: %v", err) - } - - // --- Test Scenario --- - - // 1. Send msg1 - wg.Add(1) // Expect OnMessageSent for msg1 eventually - payload1 := []byte("callback test 1") - msgID1 := MessageID("cb-msg-1") - wrappedMsg1, err := WrapOutgoingMessage(handle, payload1, msgID1) - if err != nil { - t.Fatalf("WrapOutgoingMessage (1) failed: %v", err) - } - - // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history) - wg.Add(1) // Expect OnMessageReady for msg1 - _, _, err = UnwrapReceivedMessage(handle, wrappedMsg1) - if err != nil { - t.Fatalf("UnwrapReceivedMessage (1) failed: %v", err) - } - - // 3. Send msg2 (depends on msg1) - wg.Add(1) // Expect OnMessageSent for msg2 eventually - payload2 := []byte("callback test 2") - msgID2 := MessageID("cb-msg-2") - wrappedMsg2, err := WrapOutgoingMessage(handle, payload2, msgID2) - if err != nil { - t.Fatalf("WrapOutgoingMessage (2) failed: %v", err) - } - - // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2) - wg.Add(1) // Expect OnMessageReady for msg2 - _, _, err = UnwrapReceivedMessage(handle, wrappedMsg2) - if err != nil { - t.Fatalf("UnwrapReceivedMessage (2) failed: %v", err) - } - - // --- Verification --- - // Wait for expected callbacks with a timeout - waitTimeout(&wg, 5*time.Second, t) - - cbMutex.Lock() - defer cbMutex.Unlock() - - if !receivedReady[msgID1] { - t.Errorf("OnMessageReady not called for %s", msgID1) - } - if !receivedReady[msgID2] { - t.Errorf("OnMessageReady not called for %s", msgID2) - } - if !receivedSent[msgID1] { - t.Errorf("OnMessageSent not called for %s", msgID1) - } - if !receivedSent[msgID2] { - t.Errorf("OnMessageSent not called for %s", msgID2) - } - // We didn't explicitly test missing deps in this path - if len(receivedMissing) > 0 { - t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing) - } - // Periodic sync is harder to guarantee in a short test, just check if it was ever true - if !syncRequested { - t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout") - } -} - -// Helper function to wait for WaitGroup with a timeout -func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) { - c := make(chan struct{}) - go func() { - defer close(c) - wg.Wait() - }() - select { - case <-c: - // Completed normally - case <-time.After(timeout): - t.Fatalf("Timed out waiting for callbacks") - } -} diff --git a/src/reliability.nim b/src/reliability.nim index 29e3c36..1262c7d 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -2,7 +2,9 @@ import std/[times, locks, tables, sets] import chronos, results import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter] -proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager, ReliabilityError] = +proc newReliabilityManager*( + channelId: string, config: ReliabilityConfig = defaultConfig() +): Result[ReliabilityManager, ReliabilityError] = ## Creates a new ReliabilityManager with the specified channel ID and configuration. ## ## Parameters: @@ -13,14 +15,12 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau ## A Result containing either a new ReliabilityManager instance or an error. if channelId.len == 0: return err(reInvalidArgument) - + try: let bloomFilter = newRollingBloomFilter( - config.bloomFilterCapacity, - config.bloomFilterErrorRate, - config.bloomFilterWindow + config.bloomFilterCapacity, config.bloomFilterErrorRate, config.bloomFilterWindow ) - + let rm = ReliabilityManager( lamportTimestamp: 0, messageHistory: @[], @@ -28,7 +28,7 @@ proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defau outgoingBuffer: @[], incomingBuffer: @[], channelId: channelId, - config: config + config: config, ) initLock(rm.lock) return ok(rm) @@ -40,27 +40,25 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = while i < rm.outgoingBuffer.len: var acknowledged = false let outMsg = rm.outgoingBuffer[i] - + # Check if message is in causal history for msgID in msg.causalHistory: if outMsg.message.messageId == msgID: acknowledged = true break - + # Check bloom filter if not already acknowledged if not acknowledged and msg.bloomFilter.len > 0: let bfResult = deserializeBloomFilter(msg.bloomFilter) if bfResult.isOk: var rbf = RollingBloomFilter( - filter: bfResult.get(), - window: rm.bloomFilter.window, - messages: @[] + filter: bfResult.get(), window: rm.bloomFilter.window, messages: @[] ) if rbf.contains(outMsg.message.messageId): acknowledged = true else: logError("Failed to deserialize bloom filter") - + if acknowledged: if rm.onMessageSent != nil: rm.onMessageSent(outMsg.message.messageId) @@ -68,7 +66,9 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: Message) = else: inc i -proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: MessageID): Result[seq[byte], ReliabilityError] = +proc wrapOutgoingMessage*( + rm: ReliabilityManager, message: seq[byte], messageId: MessageID +): Result[seq[byte], ReliabilityError] = ## Wraps an outgoing message with reliability metadata. ## ## Parameters: @@ -84,7 +84,7 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: withLock rm.lock: try: rm.updateLamportTimestamp(getTime().toUnix) - + # Serialize current bloom filter var bloomBytes: seq[byte] let bfResult = serializeBloomFilter(rm.bloomFilter.filter) @@ -100,15 +100,13 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: causalHistory: rm.getRecentMessageIDs(rm.config.maxCausalHistory), channelId: rm.channelId, content: message, - bloomFilter: bloomBytes + bloomFilter: bloomBytes, ) # Add to outgoing buffer - rm.outgoingBuffer.add(UnacknowledgedMessage( - message: msg, - sendTime: getTime(), - resendAttempts: 0 - )) + rm.outgoingBuffer.add( + UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0) + ) # Add to causal history and bloom filter rm.bloomFilter.add(msg.messageId) @@ -156,7 +154,7 @@ proc processIncomingBuffer(rm: ReliabilityManager) = if rm.onMessageReady != nil: rm.onMessageReady(msg.messageId) processed.incl(msgId) - + # Add any dependent messages that might now be ready if msgId in dependencies: for dependentId in dependencies[msgId]: @@ -170,7 +168,11 @@ proc processIncomingBuffer(rm: ReliabilityManager) = rm.incomingBuffer = newIncomingBuffer -proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] = +proc unwrapReceivedMessage*( + rm: ReliabilityManager, message: seq[byte] +): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] {. + gcsafe +.} = ## Unwraps a received message and processes its reliability metadata. ## ## Parameters: @@ -182,7 +184,7 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[ let msgResult = deserializeMessage(message) if not msgResult.isOk: return err(msgResult.error) - + let msg = msgResult.get if rm.bloomFilter.contains(msg.messageId): return ok((msg.content, @[])) @@ -225,7 +227,9 @@ proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[ except: return err(reInternalError) -proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): Result[void, ReliabilityError] = +proc markDependenciesMet*( + rm: ReliabilityManager, messageIds: seq[MessageID] +): Result[void, ReliabilityError] = ## Marks the specified message dependencies as met. ## ## Parameters: @@ -240,16 +244,18 @@ proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]): R rm.bloomFilter.add(msgId) # rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application? rm.processIncomingBuffer() - + return ok() except: return err(reInternalError) -proc setCallbacks*(rm: ReliabilityManager, - onMessageReady: proc(messageId: MessageID) {.gcsafe.}, - onMessageSent: proc(messageId: MessageID) {.gcsafe.}, - onMissingDependencies: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.}, - onPeriodicSync: PeriodicSyncCallback = nil) = +proc setCallbacks*( + rm: ReliabilityManager, + onMessageReady: MessageReadyCallback, + onMessageSent: MessageSentCallback, + onMissingDependencies: MissingDependenciesCallback, + onPeriodicSync: PeriodicSyncCallback = nil, +) = ## Sets the callback functions for various events in the ReliabilityManager. ## ## Parameters: @@ -268,7 +274,7 @@ proc checkUnacknowledgedMessages*(rm: ReliabilityManager) {.raises: [].} = withLock rm.lock: let now = getTime() var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] - + try: for unackMsg in rm.outgoingBuffer: let elapsed = now - unackMsg.sendTime @@ -298,7 +304,7 @@ proc periodicBufferSweep(rm: ReliabilityManager) {.async: (raises: [CancelledErr rm.cleanBloomFilter() except Exception as e: logError("Error in periodic buffer sweep: " & e.msg) - + await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds)) proc periodicSyncMessage(rm: ReliabilityManager) {.async: (raises: [CancelledError]).} = @@ -333,10 +339,9 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE rm.outgoingBuffer.setLen(0) rm.incomingBuffer.setLen(0) rm.bloomFilter = newRollingBloomFilter( - rm.config.bloomFilterCapacity, - rm.config.bloomFilterErrorRate, - rm.config.bloomFilterWindow + rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate, + rm.config.bloomFilterWindow, ) return ok() except: - return err(reInternalError) \ No newline at end of file + return err(reInternalError) diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index 33b47e6..367e965 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -2,8 +2,21 @@ import std/[times, locks] import ./[rolling_bloom_filter, message] type + MessageReadyCallback* = proc(messageId: MessageID) {.gcsafe.} + + MessageSentCallback* = proc(messageId: MessageID) {.gcsafe.} + + MissingDependenciesCallback* = + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} + PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} + AppCallbacks* = ref object + messageReadyCb*: MessageReadyCallback + messageSentCb*: MessageSentCallback + missingDependenciesCb*: MissingDependenciesCallback + periodicSyncCb*: PeriodicSyncCallback + ReliabilityConfig* = object bloomFilterCapacity*: int bloomFilterErrorRate*: float @@ -24,9 +37,10 @@ type channelId*: string config*: ReliabilityConfig lock*: Lock - onMessageReady*: proc(messageId: MessageID) - onMessageSent*: proc(messageId: MessageID) - onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) + onMessageReady*: proc(messageId: MessageID) {.gcsafe.} + onMessageSent*: proc(messageId: MessageID) {.gcsafe.} + onMissingDependencies*: + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} onPeriodicSync*: proc() ReliabilityError* = enum @@ -51,7 +65,7 @@ proc defaultConfig*(): ReliabilityConfig = resendInterval: DefaultResendInterval, maxResendAttempts: DefaultMaxResendAttempts, syncMessageInterval: DefaultSyncMessageInterval, - bufferSweepInterval: DefaultBufferSweepInterval + bufferSweepInterval: DefaultBufferSweepInterval, ) proc cleanup*(rm: ReliabilityManager) {.raises: [].} = @@ -76,7 +90,9 @@ proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [ if rm.messageHistory.len > rm.config.maxMessageHistory: rm.messageHistory.delete(0) -proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) {.gcsafe, raises: [].} = +proc updateLamportTimestamp*( + rm: ReliabilityManager, msgTs: int64 +) {.gcsafe, raises: [].} = rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] = diff --git a/vendor/nim-chronicles b/vendor/nim-chronicles new file mode 160000 index 0000000..a8fb38a --- /dev/null +++ b/vendor/nim-chronicles @@ -0,0 +1 @@ +Subproject commit a8fb38a10bcb548df78e9a70bd77b26bb50abd12 diff --git a/vendor/nim-chronos b/vendor/nim-chronos new file mode 160000 index 0000000..b55e281 --- /dev/null +++ b/vendor/nim-chronos @@ -0,0 +1 @@ +Subproject commit b55e2816eb45f698ddaca8d8473e401502562db2 diff --git a/vendor/nim-confutils b/vendor/nim-confutils new file mode 160000 index 0000000..e214b39 --- /dev/null +++ b/vendor/nim-confutils @@ -0,0 +1 @@ +Subproject commit e214b3992a31acece6a9aada7d0a1ad37c928f3b diff --git a/vendor/nim-faststreams b/vendor/nim-faststreams new file mode 160000 index 0000000..2b08c77 --- /dev/null +++ b/vendor/nim-faststreams @@ -0,0 +1 @@ +Subproject commit 2b08c774afaafd600cf4c6f994cf78b8aa090c0c diff --git a/vendor/nim-json-serialization b/vendor/nim-json-serialization new file mode 160000 index 0000000..2b1c5eb --- /dev/null +++ b/vendor/nim-json-serialization @@ -0,0 +1 @@ +Subproject commit 2b1c5eb11df3647a2cee107cd4cce3593cbb8bcf diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p new file mode 160000 index 0000000..ac25da6 --- /dev/null +++ b/vendor/nim-libp2p @@ -0,0 +1 @@ +Subproject commit ac25da6cea158768bbc060b7be2fbe004206f3bb diff --git a/vendor/nim-results b/vendor/nim-results new file mode 160000 index 0000000..df8113d --- /dev/null +++ b/vendor/nim-results @@ -0,0 +1 @@ +Subproject commit df8113dda4c2d74d460a8fa98252b0b771bf1f27 diff --git a/vendor/nim-serialization b/vendor/nim-serialization new file mode 160000 index 0000000..548d0ad --- /dev/null +++ b/vendor/nim-serialization @@ -0,0 +1 @@ +Subproject commit 548d0adc9797a10b2db7f788b804330306293088 diff --git a/vendor/nim-stew b/vendor/nim-stew new file mode 160000 index 0000000..d7a6868 --- /dev/null +++ b/vendor/nim-stew @@ -0,0 +1 @@ +Subproject commit d7a6868ba84165e7fdde427af9a1fc3f5f5cc151 diff --git a/vendor/nim-taskpools b/vendor/nim-taskpools new file mode 160000 index 0000000..7b74a71 --- /dev/null +++ b/vendor/nim-taskpools @@ -0,0 +1 @@ +Subproject commit 7b74a716a40249720fd7da428113147942b9642d diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system new file mode 160000 index 0000000..5f10509 --- /dev/null +++ b/vendor/nimbus-build-system @@ -0,0 +1 @@ +Subproject commit 5f10509cf880dc035e517ca7bac3163cd5206ba8