diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..31fc1d5 Binary files /dev/null and b/.DS_Store differ diff --git a/.gitignore b/.gitignore index 1431936..898171b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,16 @@ -nimcache -nimcache/* -tests/test_bloom -nim-bloom/bloom .DS_Store -src/.DS_Store -nph \ No newline at end of file +tests/test_reliability +tests/bloom +nph +docs +for_reference +do_not_commit +build/* +sds.nims +/.update.timestamp + +# Nimbus Build System +nimbus-build-system.paths + +# Nimble packages +/vendor/.nimble diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..fe38e76 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,55 @@ +[submodule "vendor/nimbus-build-system"] + path = vendor/nimbus-build-system + url = https://github.com/status-im/nimbus-build-system.git + ignore = untracked + branch = master +[submodule "vendor/nim-chronos"] + path = vendor/nim-chronos + url = https://github.com/status-im/nim-chronos.git + ignore = untracked + branch = master +[submodule "vendor/nim-results"] + path = vendor/nim-results + url = https://github.com/arnetheduck/nim-results.git + ignore = untracked + branch = master +[submodule "vendor/nim-stew"] + path = vendor/nim-stew + url = https://github.com/status-im/nim-stew.git + ignore = untracked + branch = master +[submodule "vendor/nim-chronicles"] + path = vendor/nim-chronicles + url = https://github.com/status-im/nim-chronicles.git + ignore = untracked + branch = master +[submodule "vendor/nim-faststreams"] + path = vendor/nim-faststreams + url = https://github.com/status-im/nim-faststreams.git + ignore = untracked + branch = master +[submodule "vendor/nim-json-serialization"] + path = vendor/nim-json-serialization + url = https://github.com/status-im/nim-json-serialization.git + ignore = untracked + branch = master +[submodule "vendor/nim-serialization"] + path = vendor/nim-serialization + url = https://github.com/status-im/nim-serialization.git + ignore = untracked + branch = master +[submodule "vendor/nim-taskpools"] + path = vendor/nim-taskpools + url = https://github.com/status-im/nim-taskpools.git + ignore = untracked + branch = master +[submodule "vendor/nim-confutils"] + path = vendor/nim-confutils + url = https://github.com/status-im/nim-confutils.git + ignore = untracked + branch = master +[submodule "vendor/nim-libp2p"] + path = vendor/nim-libp2p + url = https://github.com/vacp2p/nim-libp2p.git + ignore = untracked + branch = master diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..31788b2 --- /dev/null +++ b/Makefile @@ -0,0 +1,51 @@ +.PHONY: libsds + +export BUILD_SYSTEM_DIR := vendor/nimbus-build-system +# we don't want an error here, so we can handle things later, in the ".DEFAULT" target +-include $(BUILD_SYSTEM_DIR)/makefiles/variables.mk + +ifeq ($(NIM_PARAMS),) +# "variables.mk" was not included, so we update the submodules. +GIT_SUBMODULE_UPDATE := git submodule update --init --recursive +.DEFAULT: + +@ echo -e "Git submodules not found. Running '$(GIT_SUBMODULE_UPDATE)'.\n"; \ + $(GIT_SUBMODULE_UPDATE); \ + echo +# Now that the included *.mk files appeared, and are newer than this file, Make will restart itself: +# https://www.gnu.org/software/make/manual/make.html#Remaking-Makefiles +# +# After restarting, it will execute its original goal, so we don't have to start a child Make here +# with "$(MAKE) $(MAKECMDGOALS)". Isn't hidden control flow great? + +else # "variables.mk" was included. Business as usual until the end of this file. + +# default target, because it's the first one that doesn't start with '.' +all: | libsds + +sds.nims: + ln -s sds.nimble $@ + +update: | update-common + rm -rf sds.nims && \ + $(MAKE) sds.nims $(HANDLE_OUTPUT) + +clean: + rm -rf build + +deps: | sds.nims + +# must be included after the default target +-include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk + +STATIC ?= 0 + +libsds: deps + rm -f build/libsds* +ifeq ($(STATIC), 1) + echo -e $(BUILD_MSG) "build/$@.a" && \ + $(ENV_SCRIPT) nim libsdsStatic $(NIM_PARAMS) sds.nims +else + echo -e $(BUILD_MSG) "build/$@.so" && \ + $(ENV_SCRIPT) nim libsdsDynamic $(NIM_PARAMS) sds.nims +endif +endif \ No newline at end of file diff --git a/env.sh b/env.sh new file mode 100644 index 0000000..f90ba9a --- /dev/null +++ b/env.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +# We use ${BASH_SOURCE[0]} instead of $0 to allow sourcing this file +# and we fall back to a Zsh-specific special var to also support Zsh. +REL_PATH="$(dirname ${BASH_SOURCE[0]:-${(%):-%x}})" +ABS_PATH="$(cd ${REL_PATH}; pwd)" +source ${ABS_PATH}/vendor/nimbus-build-system/scripts/env.sh + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..25c9fb4 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module sds-bindings + +go 1.22.5 diff --git a/library/alloc.nim b/library/alloc.nim new file mode 100644 index 0000000..b185dd7 --- /dev/null +++ b/library/alloc.nim @@ -0,0 +1,73 @@ +## Can be shared safely between threads +type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] + +proc alloc*(str: cstring): cstring = + # Byte allocation from the given address. + # There should be the corresponding manual deallocation with deallocShared ! + if str.isNil(): + var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator + ret[0] = '\0' # Set the null terminator + return ret + + let ret = cast[cstring](allocShared(len(str) + 1)) + copyMem(ret, str, len(str) + 1) + return ret + +proc alloc*(str: string): cstring = + ## Byte allocation from the given address. + ## There should be the corresponding manual deallocation with deallocShared ! + var ret = cast[cstring](allocShared(str.len + 1)) + let s = cast[seq[char]](str) + for i in 0 ..< str.len: + ret[i] = s[i] + ret[str.len] = '\0' + return ret + +proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] = + let data = allocShared(sizeof(T) * s.len) + if s.len != 0: + copyMem(data, unsafeAddr s[0], s.len) + return (cast[ptr UncheckedArray[T]](data), s.len) + +proc deallocSharedSeq*[T](s: var SharedSeq[T]) = + if not s.data.isNil: + when T is cstring: + # For array of cstrings, deallocate each string first + for i in 0 ..< s.len: + if not s.data[i].isNil: + # Deallocate each cstring + deallocShared(s.data[i]) + + deallocShared(s.data) + s.len = 0 + +proc toSeq*[T](s: SharedSeq[T]): seq[T] = + ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required + ## as req[T] is a GC managed type. + var ret = newSeq[T]() + for i in 0 ..< s.len: + ret.add(s.data[i]) + return ret + +proc allocSharedSeqFromCArray*[T](arr: ptr T, len: int): SharedSeq[T] = + ## Creates a SharedSeq[T] from a C array pointer and length. + ## The data is copied to shared memory. + ## There should be a corresponding manual deallocation with deallocSharedSeq! + if arr.isNil or len <= 0: + return (nil, 0) + + when T is cstring: + # Special handling for arrays of cstrings + let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * len)) + let cstrArr = cast[ptr UncheckedArray[cstring]](arr) + + for i in 0 ..< len: + # Use the existing alloc proc to properly allocate each cstring + data[i] = cstrArr[i].alloc() + + return (data, len) + else: + # Original handling for non-cstring types + let data = allocShared(sizeof(T) * len) + copyMem(data, arr, sizeof(T) * len) + return (cast[ptr UncheckedArray[T]](data), len) diff --git a/library/events/json_base_event.nim b/library/events/json_base_event.nim new file mode 100644 index 0000000..8c51d2c --- /dev/null +++ b/library/events/json_base_event.nim @@ -0,0 +1,6 @@ +type JsonEvent* = ref object of RootObj # https://rfc.vac.dev/spec/36/#jsonsignal-type + eventType* {.requiresInit.}: string + +method `$`*(jsonEvent: JsonEvent): string {.base.} = + discard + # All events should implement this diff --git a/library/events/json_message_ready_event.nim b/library/events/json_message_ready_event.nim new file mode 100644 index 0000000..d64f251 --- /dev/null +++ b/library/events/json_message_ready_event.nim @@ -0,0 +1,11 @@ +import std/json +import ./json_base_event, ../../src/[message] + +type JsonMessageReadyEvent* = ref object of JsonEvent + messageId*: SdsMessageID + +proc new*(T: type JsonMessageReadyEvent, messageId: SdsMessageID): T = + return JsonMessageReadyEvent(eventType: "message_ready", messageId: messageId) + +method `$`*(jsonMessageReady: JsonMessageReadyEvent): string = + $(%*jsonMessageReady) diff --git a/library/events/json_message_sent_event.nim b/library/events/json_message_sent_event.nim new file mode 100644 index 0000000..a9c7439 --- /dev/null +++ b/library/events/json_message_sent_event.nim @@ -0,0 +1,11 @@ +import std/json +import ./json_base_event, ../../src/[message] + +type JsonMessageSentEvent* = ref object of JsonEvent + messageId*: SdsMessageID + +proc new*(T: type JsonMessageSentEvent, messageId: SdsMessageID): T = + return JsonMessageSentEvent(eventType: "message_sent", messageId: messageId) + +method `$`*(jsonMessageSent: JsonMessageSentEvent): string = + $(%*jsonMessageSent) diff --git a/library/events/json_missing_dependencies_event.nim b/library/events/json_missing_dependencies_event.nim new file mode 100644 index 0000000..b7af04c --- /dev/null +++ b/library/events/json_missing_dependencies_event.nim @@ -0,0 +1,18 @@ +import std/json +import ./json_base_event, ../../src/[message] + +type JsonMissingDependenciesEvent* = ref object of JsonEvent + messageId*: SdsMessageID + missingDeps: seq[SdsMessageID] + +proc new*( + T: type JsonMissingDependenciesEvent, + messageId: SdsMessageID, + missingDeps: seq[SdsMessageID], +): T = + return JsonMissingDependenciesEvent( + eventType: "missing_dependencies", messageId: messageId, missingDeps: missingDeps + ) + +method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string = + $(%*jsonMissingDependencies) diff --git a/library/events/json_periodic_sync_event.nim b/library/events/json_periodic_sync_event.nim new file mode 100644 index 0000000..03e875a --- /dev/null +++ b/library/events/json_periodic_sync_event.nim @@ -0,0 +1,10 @@ +import std/json +import ./json_base_event + +type JsonPeriodicSyncEvent* = ref object of JsonEvent + +proc new*(T: type JsonPeriodicSyncEvent): T = + return JsonPeriodicSyncEvent(eventType: "periodic_sync") + +method `$`*(jsonPeriodicSync: JsonPeriodicSyncEvent): string = + $(%*jsonPeriodicSync) diff --git a/library/ffi_types.nim b/library/ffi_types.nim new file mode 100644 index 0000000..e2445a2 --- /dev/null +++ b/library/ffi_types.nim @@ -0,0 +1,30 @@ +################################################################################ +### Exported types + +type SdsCallBack* = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +const RET_OK*: cint = 0 +const RET_ERR*: cint = 1 +const RET_MISSING_CALLBACK*: cint = 2 + +### End of exported types +################################################################################ + +################################################################################ +### FFI utils + +template foreignThreadGc*(body: untyped) = + when declared(setupForeignThreadGc): + setupForeignThreadGc() + + body + + when declared(tearDownForeignThreadGc): + tearDownForeignThreadGc() + +type onDone* = proc() + +### End of FFI utils +################################################################################ diff --git a/library/libsds.h b/library/libsds.h new file mode 100644 index 0000000..044151a --- /dev/null +++ b/library/libsds.h @@ -0,0 +1,62 @@ + +// Generated manually and inspired by the one generated by the Nim Compiler. +// In order to see the header file generated by Nim just run `make libsds` +// from the root repo folder and the header should be created in +// nimcache/release/libsds/libsds.h +#ifndef __libsds__ +#define __libsds__ + +#include +#include + +// The possible returned values for the functions that return int +#define RET_OK 0 +#define RET_ERR 1 +#define RET_MISSING_CALLBACK 2 + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData); + + +// --- Core API Functions --- + + +void* NewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData); + +void SetEventCallback(void* ctx, SdsCallBack callback, void* userData); + +int CleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData); + +int ResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData); + +int WrapOutgoingMessage(void* ctx, + void* message, + size_t messageLen, + const char* messageId, + SdsCallBack callback, + void* userData); + +int UnwrapReceivedMessage(void* ctx, + void* message, + size_t messageLen, + SdsCallBack callback, + void* userData); + +int MarkDependenciesMet(void* ctx, + char** messageIDs, + size_t count, + SdsCallBack callback, + void* userData); + +int StartPeriodicTasks(void* ctx, SdsCallBack callback, void* userData); + + + +#ifdef __cplusplus +} +#endif + +#endif /* __libsds__ */ \ No newline at end of file diff --git a/library/libsds.nim b/library/libsds.nim new file mode 100644 index 0000000..90d3b83 --- /dev/null +++ b/library/libsds.nim @@ -0,0 +1,304 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +when defined(linux): + {.passl: "-Wl,-soname,libsds.so".} + +import std/[locks, typetraits, tables, atomics], chronos, chronicles +import + ./sds_thread/sds_thread, + ./alloc, + ./ffi_types, + ./sds_thread/inter_thread_communication/sds_thread_request, + ./sds_thread/inter_thread_communication/requests/ + [sds_lifecycle_request, sds_message_request, sds_dependencies_request], + ../src/[reliability, reliability_utils, message], + ./events/[ + json_message_ready_event, json_message_sent_event, json_missing_dependencies_event, + json_periodic_sync_event, + ] + +################################################################################ +### Wrapper around the reliability manager +################################################################################ + +################################################################################ +### Not-exported components + +template checkLibsdsParams*( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +) = + ctx[].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK + +template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped) = + if isNil(ctx[].eventCallback): + error eventName & " - eventCallback is nil" + return + + if isNil(ctx[].eventUserData): + error eventName & " - eventUserData is nil" + return + + foreignThreadGc: + try: + let event = body + cast[SdsCallBack](ctx[].eventCallback)( + RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData + ) + except Exception, CatchableError: + let msg = + "Exception " & eventName & " when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[SdsCallBack](ctx[].eventCallback)( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData + ) + +proc handleRequest( + ctx: ptr SdsContext, + requestType: RequestType, + content: pointer, + callback: SdsCallBack, + userData: pointer, +): cint = + sds_thread.sendRequestToSdsThread(ctx, requestType, content, callback, userData).isOkOr: + let msg = "libsds error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK + +proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback = + return proc(messageId: SdsMessageID) {.gcsafe.} = + callEventCallback(ctx, "onMessageReady"): + $JsonMessageReadyEvent.new(messageId) + +proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback = + return proc(messageId: SdsMessageID) {.gcsafe.} = + callEventCallback(ctx, "onMessageSent"): + $JsonMessageSentEvent.new(messageId) + +proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback = + return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + callEventCallback(ctx, "onMissingDependencies"): + $JsonMissingDependenciesEvent.new(messageId, missingDeps) + +proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback = + return proc() {.gcsafe.} = + callEventCallback(ctx, "onPeriodicSync"): + $JsonPeriodicSyncEvent.new() + +### End of not-exported components +################################################################################ + +################################################################################ +### Library setup + +# Every Nim library must have this function called - the name is derived from +# the `--nimMainPrefix` command line option +proc libsdsNimMain() {.importc.} + +# To control when the library has been initialized +var initialized: Atomic[bool] + +if defined(android): + # Redirect chronicles to Android System logs + when compiles(defaultChroniclesStream.outputs[0].writer): + defaultChroniclesStream.outputs[0].writer = proc( + logLevel: LogLevel, msg: LogOutputStr + ) {.raises: [].} = + echo logLevel, msg + +proc initializeLibrary() {.exported.} = + if not initialized.exchange(true): + ## Every Nim library needs to call `NimMain` once exactly, to initialize the Nim runtime. + ## Being `` the value given in the optional compilation flag --nimMainPrefix:yourprefix + libsdsNimMain() + when declared(setupForeignThreadGc): + setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + +### End of library setup +################################################################################ + +################################################################################ +### Exported procs + +proc NewReliabilityManager( + channelId: cstring, callback: SdsCallBack, userData: pointer +): pointer {.dynlib, exportc, cdecl.} = + initializeLibrary() + + ## Creates a new instance of the Reliability Manager. + if isNil(callback): + echo "error: missing callback in NewReliabilityManager" + return nil + + ## Create the SDS thread that will keep waiting for req from the main thread. + var ctx = sds_thread.createSdsThread().valueOr: + let msg = "Error in createSdsThread: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return nil + + ctx.userData = userData + + let appCallbacks = AppCallbacks( + messageReadyCb: onMessageReady(ctx), + messageSentCb: onMessageSent(ctx), + missingDependenciesCb: onMissingDependencies(ctx), + periodicSyncCb: onPeriodicSync(ctx), + ) + + let retCode = handleRequest( + ctx, + RequestType.LIFECYCLE, + SdsLifecycleRequest.createShared( + SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, channelId, appCallbacks + ), + callback, + userData, + ) + + if retCode == RET_ERR: + return nil + + return ctx + +proc SetEventCallback( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +) {.dynlib, exportc.} = + initializeLibrary() + ctx[].eventCallback = cast[pointer](callback) + ctx[].eventUserData = userData + +proc CleanupReliabilityManager( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + sds_thread.destroySdsThread(ctx).isOkOr: + let msg = "libsds error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + ## always need to invoke the callback although we don't retrieve value to the caller + callback(RET_OK, nil, 0, userData) + + return RET_OK + +proc ResetReliabilityManager( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + handleRequest( + ctx, + RequestType.LIFECYCLE, + SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER), + callback, + userData, + ) + +proc WrapOutgoingMessage( + ctx: ptr SdsContext, + message: pointer, + messageLen: csize_t, + messageId: cstring, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if message == nil and messageLen > 0: + let msg = "libsds error: " & "message pointer is NULL but length > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + if messageId == nil: + let msg = "libsds error: " & "message ID pointer is NULL" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + handleRequest( + ctx, + RequestType.MESSAGE, + SdsMessageRequest.createShared( + SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId + ), + callback, + userData, + ) + +proc UnwrapReceivedMessage( + ctx: ptr SdsContext, + message: pointer, + messageLen: csize_t, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if message == nil and messageLen > 0: + let msg = "libsds error: " & "message pointer is NULL but length > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + handleRequest( + ctx, + RequestType.MESSAGE, + SdsMessageRequest.createShared( + SdsMessageMsgType.UNWRAP_MESSAGE, message, messageLen + ), + callback, + userData, + ) + +proc MarkDependenciesMet( + ctx: ptr SdsContext, + messageIds: pointer, + count: csize_t, + callback: SdsCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + + if messageIds == nil and count > 0: + let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + handleRequest( + ctx, + RequestType.DEPENDENCIES, + SdsDependenciesRequest.createShared( + SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count + ), + callback, + userData, + ) + +proc StartPeriodicTasks( + ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibsdsParams(ctx, callback, userData) + handleRequest( + ctx, + RequestType.LIFECYCLE, + SdsLifecycleRequest.createShared(SdsLifecycleMsgType.START_PERIODIC_TASKS), + callback, + userData, + ) + +### End of exported procs +################################################################################ diff --git a/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim new file mode 100644 index 0000000..ebb45c2 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim @@ -0,0 +1,48 @@ +import std/[options, json, strutils, net, sequtils] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc +import ../../../../src/[reliability_utils, reliability, message] + +type SdsDependenciesMsgType* = enum + MARK_DEPENDENCIES_MET + +type SdsDependenciesRequest* = object + operation: SdsDependenciesMsgType + messageIds: SharedSeq[cstring] + count: csize_t + +proc createShared*( + T: type SdsDependenciesRequest, + op: SdsDependenciesMsgType, + messageIds: pointer, + count: csize_t = 0, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].count = count + ret[].messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int) + return ret + +proc destroyShared(self: ptr SdsDependenciesRequest) = + deallocSharedSeq(self[].messageIds) + deallocShared(self) + +proc process*( + self: ptr SdsDependenciesRequest, rm: ptr ReliabilityManager +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of MARK_DEPENDENCIES_MET: + let messageIdsC = self.messageIds.toSeq() + let messageIds = messageIdsC.mapIt($it) + + markDependenciesMet(rm[], messageIds).isOkOr: + error "MARK_DEPENDENCIES_MET failed", error = error + return err("error processing MARK_DEPENDENCIES_MET request: " & $error) + + return ok("") + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim new file mode 100644 index 0000000..2307fce --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -0,0 +1,70 @@ +import std/[options, json, strutils, net] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc +import ../../../../src/[reliability_utils, reliability, message] + +type SdsLifecycleMsgType* = enum + CREATE_RELIABILITY_MANAGER + RESET_RELIABILITY_MANAGER + START_PERIODIC_TASKS + +type SdsLifecycleRequest* = object + operation: SdsLifecycleMsgType + channelId: cstring + appCallbacks: AppCallbacks + +proc createShared*( + T: type SdsLifecycleRequest, + op: SdsLifecycleMsgType, + channelId: cstring = "", + appCallbacks: AppCallbacks = nil, +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].appCallbacks = appCallbacks + ret[].channelId = channelId.alloc() + return ret + +proc destroyShared(self: ptr SdsLifecycleRequest) = + deallocShared(self[].channelId) + deallocShared(self) + +proc createReliabilityManager( + channelIdCStr: cstring, appCallbacks: AppCallbacks = nil +): Future[Result[ReliabilityManager, string]] {.async.} = + let channelId = $channelIdCStr + if channelId.len == 0: + error "Failed creating ReliabilityManager: Channel ID cannot be empty" + return err("Failed creating ReliabilityManager: Channel ID cannot be empty") + + let rm = newReliabilityManager(some(channelId)).valueOr: + error "Failed creating reliability manager", error = error + return err("Failed creating reliability manager: " & $error) + + rm.setCallbacks( + appCallbacks.messageReadyCb, appCallbacks.messageSentCb, + appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb, + ) + + return ok(rm) + +proc process*( + self: ptr SdsLifecycleRequest, rm: ptr ReliabilityManager +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of CREATE_RELIABILITY_MANAGER: + rm[] = (await createReliabilityManager(self.channelId, self.appCallbacks)).valueOr: + error "CREATE_RELIABILITY_MANAGER failed", error = error + return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error) + of RESET_RELIABILITY_MANAGER: + resetReliabilityManager(rm[]).isOkOr: + error "RESET_RELIABILITY_MANAGER failed", error = error + return err("error processing RESET_RELIABILITY_MANAGER request: " & $error) + of START_PERIODIC_TASKS: + rm[].startPeriodicTasks() + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim new file mode 100644 index 0000000..476a65e --- /dev/null +++ b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim @@ -0,0 +1,69 @@ +import std/[options, json, strutils, net, sequtils] +import chronos, chronicles, results, confutils, confutils/std/net + +import ../../../alloc +import ../../../../src/[reliability_utils, reliability, message] + +type SdsMessageMsgType* = enum + WRAP_MESSAGE + UNWRAP_MESSAGE + +type SdsMessageRequest* = object + operation: SdsMessageMsgType + message: SharedSeq[byte] + messageLen: csize_t + messageId: cstring + +type SdsUnwrapResponse* = object + message*: seq[byte] + missingDeps*: seq[SdsMessageID] + +proc createShared*( + T: type SdsMessageRequest, + op: SdsMessageMsgType, + message: pointer, + messageLen: csize_t = 0, + messageId: cstring = "", +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].messageLen = messageLen + ret[].messageId = messageId.alloc() + ret[].message = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int) + + return ret + +proc destroyShared(self: ptr SdsMessageRequest) = + deallocSharedSeq(self[].message) + deallocShared(self[].messageId) + deallocShared(self) + +proc process*( + self: ptr SdsMessageRequest, rm: ptr ReliabilityManager +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of WRAP_MESSAGE: + let messageBytes = self.message.toSeq() + + let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr: + error "WRAP_MESSAGE failed", error = error + return err("error processing WRAP_MESSAGE request: " & $error) + + # returns a comma-separates string of bytes + return ok(wrappedMessage.mapIt($it).join(",")) + of UNWRAP_MESSAGE: + let messageBytes = self.message.toSeq() + + let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr: + error "UNWRAP_MESSAGE failed", error = error + return err("error processing UNWRAP_MESSAGE request: " & $error) + + let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps) + + # return the result as a json string + return ok($(%*(res))) + + return ok("") diff --git a/library/sds_thread/inter_thread_communication/sds_thread_request.nim b/library/sds_thread/inter_thread_communication/sds_thread_request.nim new file mode 100644 index 0000000..f40bef4 --- /dev/null +++ b/library/sds_thread/inter_thread_communication/sds_thread_request.nim @@ -0,0 +1,78 @@ +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the SDS Thread. + +import std/json, results +import chronos, chronos/threadsync +import + ../../ffi_types, + ./requests/[sds_lifecycle_request, sds_message_request, sds_dependencies_request], + ../../../src/[reliability_utils] + +type RequestType* {.pure.} = enum + LIFECYCLE + MESSAGE + DEPENDENCIES + +type SdsThreadRequest* = object + reqType: RequestType + reqContent: pointer + callback: SdsCallBack + userData: pointer + +proc createShared*( + T: type SdsThreadRequest, + reqType: RequestType, + reqContent: pointer, + callback: SdsCallBack, + userData: pointer, +): ptr type T = + var ret = createShared(T) + ret[].reqType = reqType + ret[].reqContent = reqContent + ret[].callback = callback + ret[].userData = userData + return ret + +proc handleRes[T: string | void]( + res: Result[T, string], request: ptr SdsThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + + defer: + deallocShared(request) + + if res.isErr(): + foreignThreadGc: + let msg = "libsds error: handleRes fireSyncRes error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +proc process*( + T: type SdsThreadRequest, request: ptr SdsThreadRequest, rm: ptr ReliabilityManager +) {.async.} = + let retFut = + case request[].reqType + of LIFECYCLE: + cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm) + of MESSAGE: + cast[ptr SdsMessageRequest](request[].reqContent).process(rm) + of DEPENDENCIES: + cast[ptr SdsDependenciesRequest](request[].reqContent).process(rm) + + handleRes(await retFut, request) + +proc `$`*(self: SdsThreadRequest): string = + return $self.reqType diff --git a/library/sds_thread/sds_thread.nim b/library/sds_thread/sds_thread.nim new file mode 100644 index 0000000..4a2cce5 --- /dev/null +++ b/library/sds_thread/sds_thread.nim @@ -0,0 +1,132 @@ +{.pragma: exported, exportc, cdecl, raises: [].} +{.pragma: callback, cdecl, raises: [], gcsafe.} +{.passc: "-fPIC".} + +import std/[options, atomics, os, net, locks] +import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results +import + ../ffi_types, + ./inter_thread_communication/sds_thread_request, + ../../src/[reliability_utils] + +type SdsContext* = object + thread: Thread[(ptr SdsContext)] + lock: Lock + reqChannel: ChannelSPSCSingle[ptr SdsThreadRequest] + reqSignal: ThreadSignalPtr + # to inform The SDS Thread (a.k.a TST) that a new request is sent + reqReceivedSignal: ThreadSignalPtr + # to inform the main thread that the request is rx by TST + userData*: pointer + eventCallback*: pointer + eventUserdata*: pointer + running: Atomic[bool] # To control when the thread is running + +proc runSds(ctx: ptr SdsContext) {.async.} = + ## This is the worker body. This runs the SDS instance + ## and attends library user requests (stop, connect_to, etc.) + + var rm: ReliabilityManager + + while true: + await ctx.reqSignal.wait() + + if ctx.running.load == false: + break + + ## Trying to get a request from the libsds requestor thread + var request: ptr SdsThreadRequest + let recvOk = ctx.reqChannel.tryRecv(request) + if not recvOk: + error "sds thread could not receive a request" + continue + + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + + ## Handle the request + asyncSpawn SdsThreadRequest.process(request, addr rm) + +proc run(ctx: ptr SdsContext) {.thread.} = + ## Launch sds worker + waitFor runSds(ctx) + +proc createSdsThread*(): Result[ptr SdsContext, string] = + ## This proc is called from the main thread and it creates + ## the SDS working thread. + var ctx = createShared(SdsContext, 1) + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqSignal ThreadSignalPtr") + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.lock.initLock() + + ctx.running.store(true) + + try: + createThread(ctx.thread, run, ctx) + except ValueError, ResourceExhaustedError: + # and freeShared for typed allocations! + freeShared(ctx) + + return err("failed to create the SDS thread: " & getCurrentExceptionMsg()) + + return ok(ctx) + +proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] = + ctx.running.store(false) + + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("error in destroySdsThread: " & $error) + if not signaledOnTime: + return err("failed to signal reqSignal on time in destroySdsThread") + + joinThread(ctx.thread) + ctx.lock.deinitLock() + ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() + freeShared(ctx) + + return ok() + +proc sendRequestToSdsThread*( + ctx: ptr SdsContext, + reqType: RequestType, + reqContent: pointer, + callback: SdsCallBack, + userData: pointer, +): Result[void, string] = + let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData) + + # This lock is only necessary while we use a SP Channel and while the signalling + # between threads assumes that there aren't concurrent requests. + # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive + # requests concurrently and spare us the need of locks + ctx.lock.acquire() + defer: + ctx.lock.release() + ## Sending the request + let sentOk = ctx.reqChannel.trySend(req) + if not sentOk: + deallocShared(req) + return err("Couldn't send a request to the sds thread: " & $req[]) + + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + deallocShared(req) + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + deallocShared(req) + return err("Couldn't fireSync in time") + + ## wait until the SDS Thread properly received the request + let res = ctx.reqReceivedSignal.waitSync() + if res.isErr(): + deallocShared(req) + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the + ## process proc. + ok() diff --git a/sds.nimble b/sds.nimble new file mode 100644 index 0000000..2362c28 --- /dev/null +++ b/sds.nimble @@ -0,0 +1,39 @@ +# Package +version = "0.1.0" +author = "Waku Team" +description = "E2E Reliability Protocol API" +license = "MIT" +srcDir = "src" + +# Dependencies +requires "nim >= 2.0.8" +requires "chronicles" +requires "libp2p" + +proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") = + if not dirExists "build": + mkDir "build" + # allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims" + var extra_params = params + for i in 2 ..< paramCount(): + extra_params &= " " & paramStr(i) + if `type` == "static": + exec "nim c" & " --out:build/" & name & + ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " & + extra_params & " " & srcDir & name & ".nim" + else: + exec "nim c" & " --out:build/" & name & + ".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " & + extra_params & " " & srcDir & name & ".nim" + +# Tasks +task test, "Run the test suite": + exec "nim c -r tests/test_bloom.nim" + exec "nim c -r tests/test_reliability.nim" + +task libsdsDynamic, "Generate bindings": + let name = "libsds" + buildLibrary name, + "library/", + "", + "dynamic" \ No newline at end of file diff --git a/src/message.nim b/src/message.nim index 83d1f3a..4f6640c 100644 --- a/src/message.nim +++ b/src/message.nim @@ -1,14 +1,14 @@ -import std/times +import std/[times, options, sets] type - SdsMessageID* = seq[byte] - SdsChannelID* = seq[byte] + SdsMessageID* = string + SdsChannelID* = string SdsMessage* = object messageId*: SdsMessageID lamportTimestamp*: int64 causalHistory*: seq[SdsMessageID] - channelId*: SdsChannelID + channelId*: Option[SdsChannelID] content*: seq[byte] bloomFilter*: seq[byte] @@ -17,6 +17,10 @@ type sendTime*: Time resendAttempts*: int + IncomingMessage* = object + message*: SdsMessage + missingDeps*: HashSet[SdsMessageID] + const DefaultMaxMessageHistory* = 1000 DefaultMaxCausalHistory* = 10 diff --git a/src/protobuf.nim b/src/protobuf.nim index 5229182..6a147c8 100644 --- a/src/protobuf.nim +++ b/src/protobuf.nim @@ -12,7 +12,8 @@ proc encode*(msg: SdsMessage): ProtoBuffer = for hist in msg.causalHistory: pb.write(3, hist) - pb.write(4, msg.channelId) + if msg.channelId.isSome(): + pb.write(4, msg.channelId.get()) pb.write(5, msg.content) pb.write(6, msg.bloomFilter) pb.finish() @@ -31,13 +32,16 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] = return err(ProtobufError.missingRequiredField("lamportTimestamp")) msg.lamportTimestamp = int64(timestamp) - var causalHistory: seq[seq[byte]] + var causalHistory: seq[SdsMessageID] let histResult = pb.getRepeatedField(3, causalHistory) if histResult.isOk: msg.causalHistory = causalHistory - if not ?pb.getField(4, msg.channelId): - return err(ProtobufError.missingRequiredField("channelId")) + var channelId: SdsChannelID + if ?pb.getField(4, channelId): + msg.channelId = some(channelId) + else: + msg.channelId = none[SdsChannelID]() if not ?pb.getField(5, msg.content): return err(ProtobufError.missingRequiredField("content")) diff --git a/src/reliability.nim b/src/reliability.nim new file mode 100644 index 0000000..97a39a9 --- /dev/null +++ b/src/reliability.nim @@ -0,0 +1,349 @@ +import std/[times, locks, tables, sets, options] +import chronos, results, chronicles +import ./[message, protobuf, reliability_utils, rolling_bloom_filter] + +proc newReliabilityManager*( + channelId: Option[SdsChannelID], config: ReliabilityConfig = defaultConfig() +): Result[ReliabilityManager, ReliabilityError] = + ## Creates a new ReliabilityManager with the specified channel ID and configuration. + ## + ## Parameters: + ## - channelId: A unique identifier for the communication channel. + ## - config: Configuration options for the ReliabilityManager. If not provided, default configuration is used. + ## + ## Returns: + ## A Result containing either a new ReliabilityManager instance or an error. + if not channelId.isSome(): + return err(ReliabilityError.reInvalidArgument) + + try: + let bloomFilter = + newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate) + + let rm = ReliabilityManager( + lamportTimestamp: 0, + messageHistory: @[], + bloomFilter: bloomFilter, + outgoingBuffer: @[], + incomingBuffer: initTable[SdsMessageID, IncomingMessage](), + channelId: channelId, + config: config, + ) + initLock(rm.lock) + return ok(rm) + except Exception: + error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reOutOfMemory) + +proc isAcknowledged*( + msg: UnacknowledgedMessage, + causalHistory: seq[SdsMessageID], + rbf: Option[RollingBloomFilter], +): bool = + if msg.message.messageId in causalHistory: + return true + + if rbf.isSome(): + return rbf.get().contains(msg.message.messageId) + + false + +proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} = + # Parse bloom filter + var rbf: Option[RollingBloomFilter] + if msg.bloomFilter.len > 0: + let bfResult = deserializeBloomFilter(msg.bloomFilter) + if bfResult.isOk(): + rbf = some( + RollingBloomFilter( + filter: bfResult.get(), + capacity: bfResult.get().capacity, + minCapacity: ( + bfResult.get().capacity.float * (100 - CapacityFlexPercent).float / 100.0 + ).int, + maxCapacity: ( + bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0 + ).int, + messages: @[], + ) + ) + else: + error "Failed to deserialize bloom filter", error = bfResult.error + rbf = none[RollingBloomFilter]() + else: + rbf = none[RollingBloomFilter]() + + # Keep track of indices to delete + var toDelete: seq[int] = @[] + var i = 0 + + while i < rm.outgoingBuffer.len: + let outMsg = rm.outgoingBuffer[i] + if outMsg.isAcknowledged(msg.causalHistory, rbf): + if not rm.onMessageSent.isNil(): + rm.onMessageSent(outMsg.message.messageId) + toDelete.add(i) + inc i + + for i in countdown(toDelete.high, 0): # Delete in reverse order to maintain indices + rm.outgoingBuffer.delete(toDelete[i]) + +proc wrapOutgoingMessage*( + rm: ReliabilityManager, message: seq[byte], messageId: SdsMessageID +): Result[seq[byte], ReliabilityError] = + ## Wraps an outgoing message with reliability metadata. + ## + ## Parameters: + ## - message: The content of the message to be sent. + ## - messageId: Unique identifier for the message + ## + ## Returns: + ## A Result containing either wrapped message bytes or an error. + if message.len == 0: + return err(ReliabilityError.reInvalidArgument) + if message.len > MaxMessageSize: + return err(ReliabilityError.reMessageTooLarge) + + withLock rm.lock: + try: + rm.updateLamportTimestamp(getTime().toUnix) + + let bfResult = serializeBloomFilter(rm.bloomFilter.filter) + if bfResult.isErr: + error "Failed to serialize bloom filter" + return err(ReliabilityError.reSerializationError) + + let msg = SdsMessage( + messageId: messageId, + lamportTimestamp: rm.lamportTimestamp, + causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory), + channelId: rm.channelId, + content: message, + bloomFilter: bfResult.get(), + ) + + # Add to outgoing buffer + rm.outgoingBuffer.add( + UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0) + ) + + # Add to causal history and bloom filter + rm.bloomFilter.add(msg.messageId) + rm.addToHistory(msg.messageId) + + return serializeMessage(msg) + except Exception: + error "Failed to wrap message", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reSerializationError) + +proc processIncomingBuffer(rm: ReliabilityManager) {.gcsafe.} = + withLock rm.lock: + if rm.incomingBuffer.len == 0: + return + + var processed = initHashSet[SdsMessageID]() + var readyToProcess = newSeq[SdsMessageID]() + + # Find initially ready messages + for msgId, entry in rm.incomingBuffer: + if entry.missingDeps.len == 0: + readyToProcess.add(msgId) + + while readyToProcess.len > 0: + let msgId = readyToProcess.pop() + if msgId in processed: + continue + + if msgId in rm.incomingBuffer: + rm.addToHistory(msgId) + if not rm.onMessageReady.isNil(): + rm.onMessageReady(msgId) + processed.incl(msgId) + + # Update dependencies for remaining messages + for remainingId, entry in rm.incomingBuffer: + if remainingId notin processed: + if msgId in entry.missingDeps: + rm.incomingBuffer[remainingId].missingDeps.excl(msgId) + if rm.incomingBuffer[remainingId].missingDeps.len == 0: + readyToProcess.add(remainingId) + + # Remove processed messages + for msgId in processed: + rm.incomingBuffer.del(msgId) + +proc unwrapReceivedMessage*( + rm: ReliabilityManager, message: seq[byte] +): Result[tuple[message: seq[byte], missingDeps: seq[SdsMessageID]], ReliabilityError] = + ## Unwraps a received message and processes its reliability metadata. + ## + ## Parameters: + ## - message: The received message bytes + ## + ## Returns: + ## A Result containing either tuple of (processed message, missing dependencies) or an error. + try: + let msg = deserializeMessage(message).valueOr: + return err(ReliabilityError.reDeserializationError) + + if msg.messageId in rm.messageHistory: + return ok((msg.content, @[])) + + rm.bloomFilter.add(msg.messageId) + + # Update Lamport timestamp + rm.updateLamportTimestamp(msg.lamportTimestamp) + + # Review ACK status for outgoing messages + rm.reviewAckStatus(msg) + + var missingDeps = rm.checkDependencies(msg.causalHistory) + + if missingDeps.len == 0: + # Check if any dependencies are still in incoming buffer + var depsInBuffer = false + for msgId, entry in rm.incomingBuffer.pairs(): + if msgId in msg.causalHistory: + depsInBuffer = true + break + + if depsInBuffer: + rm.incomingBuffer[msg.messageId] = + IncomingMessage(message: msg, missingDeps: initHashSet[SdsMessageID]()) + else: + # All dependencies met, add to history + rm.addToHistory(msg.messageId) + rm.processIncomingBuffer() + if not rm.onMessageReady.isNil(): + rm.onMessageReady(msg.messageId) + else: + rm.incomingBuffer[msg.messageId] = + IncomingMessage(message: msg, missingDeps: missingDeps.toHashSet()) + if not rm.onMissingDependencies.isNil(): + rm.onMissingDependencies(msg.messageId, missingDeps) + + return ok((msg.content, missingDeps)) + except Exception: + error "Failed to unwrap message", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reDeserializationError) + +proc markDependenciesMet*( + rm: ReliabilityManager, messageIds: seq[SdsMessageID] +): Result[void, ReliabilityError] = + ## Marks the specified message dependencies as met. + ## + ## Parameters: + ## - messageIds: A sequence of message IDs to mark as met. + ## + ## Returns: + ## A Result indicating success or an error. + try: + # Add all messageIds to bloom filter + for msgId in messageIds: + if not rm.bloomFilter.contains(msgId): + rm.bloomFilter.add(msgId) + # rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application? + + # Update any pending messages that depend on this one + for pendingId, entry in rm.incomingBuffer: + if msgId in entry.missingDeps: + rm.incomingBuffer[pendingId].missingDeps.excl(msgId) + + rm.processIncomingBuffer() + return ok() + except Exception: + error "Failed to mark dependencies as met", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reInternalError) + +proc setCallbacks*( + rm: ReliabilityManager, + onMessageReady: MessageReadyCallback, + onMessageSent: MessageSentCallback, + onMissingDependencies: MissingDependenciesCallback, + onPeriodicSync: PeriodicSyncCallback = nil, +) = + ## Sets the callback functions for various events in the ReliabilityManager. + ## + ## Parameters: + ## - onMessageReady: Callback function called when a message is ready to be processed. + ## - onMessageSent: Callback function called when a message is confirmed as sent. + ## - onMissingDependencies: Callback function called when a message has missing dependencies. + ## - onPeriodicSync: Callback function called to notify about periodic sync + withLock rm.lock: + rm.onMessageReady = onMessageReady + rm.onMessageSent = onMessageSent + rm.onMissingDependencies = onMissingDependencies + rm.onPeriodicSync = onPeriodicSync + +proc checkUnacknowledgedMessages(rm: ReliabilityManager) {.gcsafe.} = + ## Checks and processes unacknowledged messages in the outgoing buffer. + withLock rm.lock: + let now = getTime() + var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[] + + for unackMsg in rm.outgoingBuffer: + let elapsed = now - unackMsg.sendTime + if elapsed > rm.config.resendInterval: + # Time to attempt resend + if unackMsg.resendAttempts < rm.config.maxResendAttempts: + var updatedMsg = unackMsg + updatedMsg.resendAttempts += 1 + updatedMsg.sendTime = now + newOutgoingBuffer.add(updatedMsg) + else: + if not rm.onMessageSent.isNil(): + rm.onMessageSent(unackMsg.message.messageId) + else: + newOutgoingBuffer.add(unackMsg) + + rm.outgoingBuffer = newOutgoingBuffer + +proc periodicBufferSweep( + rm: ReliabilityManager +) {.async: (raises: [CancelledError]), gcsafe.} = + ## Periodically sweeps the buffer to clean up and check unacknowledged messages. + while true: + try: + rm.checkUnacknowledgedMessages() + rm.cleanBloomFilter() + except Exception: + error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg() + + await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds)) + +proc periodicSyncMessage( + rm: ReliabilityManager +) {.async: (raises: [CancelledError]), gcsafe.} = + ## Periodically notifies to send a sync message to maintain connectivity. + while true: + try: + if not rm.onPeriodicSync.isNil(): + rm.onPeriodicSync() + except Exception: + error "Error in periodic sync", msg = getCurrentExceptionMsg() + await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds)) + +proc startPeriodicTasks*(rm: ReliabilityManager) = + ## Starts the periodic tasks for buffer sweeping and sync message sending. + ## + ## This procedure should be called after creating a ReliabilityManager to enable automatic maintenance. + asyncSpawn rm.periodicBufferSweep() + asyncSpawn rm.periodicSyncMessage() + +proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] = + ## Resets the ReliabilityManager to its initial state. + ## + ## This procedure clears all buffers and resets the Lamport timestamp. + withLock rm.lock: + try: + rm.lamportTimestamp = 0 + rm.messageHistory.setLen(0) + rm.outgoingBuffer.setLen(0) + rm.incomingBuffer.clear() + rm.bloomFilter = newRollingBloomFilter( + rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate + ) + return ok() + except Exception: + error "Failed to reset ReliabilityManager", msg = getCurrentExceptionMsg() + return err(ReliabilityError.reInternalError) diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index a8d376f..3cc23fa 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -1,10 +1,23 @@ -import std/[times, locks] +import std/[times, locks, options] import chronicles import ./[rolling_bloom_filter, message] type + MessageReadyCallback* = proc(messageId: SdsMessageID) {.gcsafe.} + + MessageSentCallback* = proc(messageId: SdsMessageID) {.gcsafe.} + + MissingDependenciesCallback* = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} + PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} + AppCallbacks* = ref object + messageReadyCb*: MessageReadyCallback + messageSentCb*: MessageSentCallback + missingDependenciesCb*: MissingDependenciesCallback + periodicSyncCb*: PeriodicSyncCallback + ReliabilityConfig* = object bloomFilterCapacity*: int bloomFilterErrorRate*: float @@ -20,8 +33,8 @@ type messageHistory*: seq[SdsMessageID] bloomFilter*: RollingBloomFilter outgoingBuffer*: seq[UnacknowledgedMessage] - incomingBuffer*: seq[SdsMessage] - channelId*: SdsChannelID + incomingBuffer*: Table[SdsMessageID, IncomingMessage] + channelId*: Option[SdsChannelID] config*: ReliabilityConfig lock*: Lock onMessageReady*: proc(messageId: SdsMessageID) {.gcsafe.} @@ -59,7 +72,7 @@ proc cleanup*(rm: ReliabilityManager) {.raises: [].} = try: withLock rm.lock: rm.outgoingBuffer.setLen(0) - rm.incomingBuffer.setLen(0) + rm.incomingBuffer.clear() rm.messageHistory.setLen(0) except Exception: error "Error during cleanup", error = getCurrentExceptionMsg() @@ -84,6 +97,15 @@ proc updateLamportTimestamp*( proc getRecentSdsMessageIDs*(rm: ReliabilityManager, n: int): seq[SdsMessageID] = result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] +proc checkDependencies*( + rm: ReliabilityManager, deps: seq[SdsMessageID] +): seq[SdsMessageID] = + var missingDeps: seq[SdsMessageID] = @[] + for depId in deps: + if depId notin rm.messageHistory: + missingDeps.add(depId) + return missingDeps + proc getMessageHistory*(rm: ReliabilityManager): seq[SdsMessageID] = withLock rm.lock: result = rm.messageHistory @@ -92,6 +114,8 @@ proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] = withLock rm.lock: result = rm.outgoingBuffer -proc getIncomingBuffer*(rm: ReliabilityManager): seq[SdsMessage] = +proc getIncomingBuffer*( + rm: ReliabilityManager +): Table[SdsMessageID, message.IncomingMessage] = withLock rm.lock: result = rm.incomingBuffer diff --git a/tests/test_bloom.nim b/tests/test_bloom.nim index 540735d..ad88bba 100644 --- a/tests/test_bloom.nim +++ b/tests/test_bloom.nim @@ -1,7 +1,6 @@ import unittest, results, strutils import ../src/bloom from random import rand, randomize -import ../src/[message, protobuf, protobufutil, reliability_utils, rolling_bloom_filter] suite "bloom filter": setup: diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim new file mode 100644 index 0000000..a0b2135 --- /dev/null +++ b/tests/test_reliability.nim @@ -0,0 +1,496 @@ +import unittest, results, chronos, std/[times, options, tables] +import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter] + +# Core functionality tests +suite "Core Operations": + var rm: ReliabilityManager + + setup: + let rmResult = newReliabilityManager(some("testChannel")) + check rmResult.isOk() + rm = rmResult.get() + + teardown: + if not rm.isNil: + rm.cleanup() + + test "can create with default config": + let config = defaultConfig() + check: + config.bloomFilterCapacity == DefaultBloomFilterCapacity + config.bloomFilterErrorRate == DefaultBloomFilterErrorRate + config.maxMessageHistory == DefaultMaxMessageHistory + + test "basic message wrapping and unwrapping": + let msg = @[byte(1), 2, 3] + let msgId = "test-msg-1" + + let wrappedResult = rm.wrapOutgoingMessage(msg, msgId) + check wrappedResult.isOk() + let wrapped = wrappedResult.get() + check wrapped.len > 0 + + let unwrapResult = rm.unwrapReceivedMessage(wrapped) + check unwrapResult.isOk() + let (unwrapped, missingDeps) = unwrapResult.get() + check: + unwrapped == msg + missingDeps.len == 0 + + test "message ordering": + # Create messages with different timestamps + let msg1 = SdsMessage( + messageId: "msg1", + lamportTimestamp: 1, + causalHistory: @[], + channelId: some("testChannel"), + content: @[byte(1)], + bloomFilter: @[], + ) + + let msg2 = SdsMessage( + messageId: "msg2", + lamportTimestamp: 5, + causalHistory: @[], + channelId: some("testChannel"), + content: @[byte(2)], + bloomFilter: @[], + ) + + let serialized1 = serializeMessage(msg1) + let serialized2 = serializeMessage(msg2) + check: + serialized1.isOk() + serialized2.isOk() + + # Process out of order + discard rm.unwrapReceivedMessage(serialized2.get()) + let timestamp1 = rm.lamportTimestamp + discard rm.unwrapReceivedMessage(serialized1.get()) + let timestamp2 = rm.lamportTimestamp + + check timestamp2 > timestamp1 + +# Reliability mechanism tests +suite "Reliability Mechanisms": + var rm: ReliabilityManager + + setup: + let rmResult = newReliabilityManager(some("testChannel")) + check rmResult.isOk() + rm = rmResult.get() + + teardown: + if not rm.isNil: + rm.cleanup() + + test "dependency detection and resolution": + var messageReadyCount = 0 + var messageSentCount = 0 + var missingDepsCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID) {.gcsafe.} = + messageReadyCount += 1, + proc(messageId: SdsMessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + missingDepsCount += 1, + ) + + # Create dependency chain: msg3 -> msg2 -> msg1 + let id1 = "msg1" + let id2 = "msg2" + let id3 = "msg3" + + # Create messages with dependencies + let msg2 = SdsMessage( + messageId: id2, + lamportTimestamp: 2, + causalHistory: @[id1], # msg2 depends on msg1 + channelId: some("testChannel"), + content: @[byte(2)], + bloomFilter: @[], + ) + + let msg3 = SdsMessage( + messageId: id3, + lamportTimestamp: 3, + causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2 + channelId: some("testChannel"), + content: @[byte(3)], + bloomFilter: @[], + ) + + let serialized2 = serializeMessage(msg2) + let serialized3 = serializeMessage(msg3) + check: + serialized2.isOk() + serialized3.isOk() + + # First try processing msg3 (which depends on msg2 which depends on msg1) + let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get()) + check unwrapResult3.isOk() + let (_, missingDeps3) = unwrapResult3.get() + + check: + missingDepsCount == 1 # Should trigger missing deps callback + missingDeps3.len == 2 # Should be missing both msg1 and msg2 + id1 in missingDeps3 + id2 in missingDeps3 + + # Then try processing msg2 (which only depends on msg1) + let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get()) + check unwrapResult2.isOk() + let (_, missingDeps2) = unwrapResult2.get() + + check: + missingDepsCount == 2 # Should have triggered another missing deps callback + missingDeps2.len == 1 # Should only be missing msg1 + id1 in missingDeps2 + messageReadyCount == 0 # No messages should be ready yet + + # Mark first dependency (msg1) as met + let markResult1 = rm.markDependenciesMet(@[id1]) + check markResult1.isOk() + + let incomingBuffer = rm.getIncomingBuffer() + + check: + incomingBuffer.len == 0 + messageReadyCount == 2 # Both msg2 and msg3 should be ready + missingDepsCount == 2 # Should still be 2 from the initial missing deps + + test "acknowledgment via causal history": + var messageReadyCount = 0 + var messageSentCount = 0 + var missingDepsCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID) {.gcsafe.} = + messageReadyCount += 1, + proc(messageId: SdsMessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + missingDepsCount += 1, + ) + + # Send our message + let msg1 = @[byte(1)] + let id1 = "msg1" + let wrap1 = rm.wrapOutgoingMessage(msg1, id1) + check wrap1.isOk() + + # Create a message that has our message in causal history + let msg2 = SdsMessage( + messageId: "msg2", + lamportTimestamp: rm.lamportTimestamp + 1, + causalHistory: @[id1], # Include our message in causal history + channelId: some("testChannel"), + content: @[byte(2)], + bloomFilter: @[] # Test with an empty bloom filter + , + ) + + let serializedMsg2 = serializeMessage(msg2) + check serializedMsg2.isOk() + + # Process the "received" message - should trigger callbacks + let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) + check unwrapResult.isOk() + + check: + messageReadyCount == 1 # For msg2 which we "received" + messageSentCount == 1 # For msg1 which was acknowledged via causal history + + test "acknowledgment via bloom filter": + var messageSentCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + discard, + ) + + # Send our message + let msg1 = @[byte(1)] + let id1 = "msg1" + let wrap1 = rm.wrapOutgoingMessage(msg1, id1) + check wrap1.isOk() + + # Create a message with bloom filter containing our message + var otherPartyBloomFilter = + newRollingBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) + otherPartyBloomFilter.add(id1) + + let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter) + check bfResult.isOk() + + let msg2 = SdsMessage( + messageId: "msg2", + lamportTimestamp: rm.lamportTimestamp + 1, + causalHistory: @[], # Empty causal history as we're using bloom filter + channelId: some("testChannel"), + content: @[byte(2)], + bloomFilter: bfResult.get(), + ) + + let serializedMsg2 = serializeMessage(msg2) + check serializedMsg2.isOk() + + let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) + check unwrapResult.isOk() + + check messageSentCount == 1 # Our message should be acknowledged via bloom filter + +# Periodic task & Buffer management tests +suite "Periodic Tasks & Buffer Management": + var rm: ReliabilityManager + + setup: + let rmResult = newReliabilityManager(some("testChannel")) + check rmResult.isOk() + rm = rmResult.get() + + teardown: + if not rm.isNil: + rm.cleanup() + + test "outgoing buffer management": + var messageSentCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + discard, + ) + + # Add multiple messages + for i in 0 .. 5: + let msg = @[byte(i)] + let id = "msg" & $i + let wrap = rm.wrapOutgoingMessage(msg, id) + check wrap.isOk() + + let outBuffer = rm.getOutgoingBuffer() + check outBuffer.len == 6 + + # Create message that acknowledges some messages + let ackMsg = SdsMessage( + messageId: "ack1", + lamportTimestamp: rm.lamportTimestamp + 1, + causalHistory: @["msg0", "msg2", "msg4"], + channelId: some("testChannel"), + content: @[byte(100)], + bloomFilter: @[], + ) + + let serializedAck = serializeMessage(ackMsg) + check serializedAck.isOk() + + # Process the acknowledgment + discard rm.unwrapReceivedMessage(serializedAck.get()) + + let finalBuffer = rm.getOutgoingBuffer() + check: + finalBuffer.len == 3 # Should have removed acknowledged messages + messageSentCount == 3 + # Should have triggered sent callback for acknowledged messages + + test "periodic buffer sweep and bloom clean": + var messageSentCount = 0 + + var config = defaultConfig() + config.resendInterval = initDuration(milliseconds = 100) # Short for testing + config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps + config.bloomFilterCapacity = 2 # Small capacity for testing + config.maxResendAttempts = 3 # Set a low number of max attempts + + let rmResultP = newReliabilityManager(some("testChannel"), config) + check rmResultP.isOk() + let rm = rmResultP.get() + + rm.setCallbacks( + proc(messageId: SdsMessageID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + discard, + ) + + # First message - should be cleaned from bloom filter later + let msg1 = @[byte(1)] + let id1 = "msg1" + let wrap1 = rm.wrapOutgoingMessage(msg1, id1) + check wrap1.isOk() + + let initialBuffer = rm.getOutgoingBuffer() + check: + initialBuffer[0].resendAttempts == 0 + rm.bloomFilter.contains(id1) + + rm.startPeriodicTasks() + + # Wait long enough for bloom filter + waitFor sleepAsync(chronos.milliseconds(500)) + + # Add new messages + let msg2 = @[byte(2)] + let id2 = "msg2" + let wrap2 = rm.wrapOutgoingMessage(msg2, id2) + check wrap2.isOk() + + let msg3 = @[byte(3)] + let id3 = "msg3" + let wrap3 = rm.wrapOutgoingMessage(msg3, id3) + check wrap3.isOk() + + let finalBuffer = rm.getOutgoingBuffer() + check: + finalBuffer.len == 2 + # Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries + finalBuffer[0].message.messageId == id2 # Verify it's the second message + finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts + not rm.bloomFilter.contains(id1) # Bloom filter cleaning check + rm.bloomFilter.contains(id3) # New message still in filter + + rm.cleanup() + + test "periodic sync callback": + var syncCallCount = 0 + rm.setCallbacks( + proc(messageId: SdsMessageID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + discard, + proc() {.gcsafe.} = + syncCallCount += 1, + ) + + rm.startPeriodicTasks() + waitFor sleepAsync(chronos.seconds(1)) + rm.cleanup() + + check syncCallCount > 0 + +# Special cases handling +suite "Special Cases Handling": + var rm: ReliabilityManager + + setup: + let rmResult = newReliabilityManager(some("testChannel")) + check rmResult.isOk() + rm = rmResult.get() + + teardown: + if not rm.isNil: + rm.cleanup() + + test "message history limits": + # Add messages up to max history size + for i in 0 .. rm.config.maxMessageHistory + 5: + let msg = @[byte(i)] + let id = "msg" & $i + let wrap = rm.wrapOutgoingMessage(msg, id) + check wrap.isOk() + + let history = rm.getMessageHistory() + check: + history.len <= rm.config.maxMessageHistory + history[^1] == "msg" & $(rm.config.maxMessageHistory + 5) + + test "invalid bloom filter handling": + let msgInvalid = SdsMessage( + messageId: "invalid-bf", + lamportTimestamp: 1, + causalHistory: @[], + channelId: some("testChannel"), + content: @[byte(1)], + bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data + , + ) + + let serializedInvalid = serializeMessage(msgInvalid) + check serializedInvalid.isOk() + + # Should handle invalid bloom filter gracefully + let result = rm.unwrapReceivedMessage(serializedInvalid.get()) + check: + result.isOk() + result.get()[1].len == 0 # No missing dependencies + + test "duplicate message handling": + var messageReadyCount = 0 + rm.setCallbacks( + proc(messageId: SdsMessageID) {.gcsafe.} = + messageReadyCount += 1, + proc(messageId: SdsMessageID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + discard, + ) + + # Create and process a message + let msg = SdsMessage( + messageId: "dup-msg", + lamportTimestamp: 1, + causalHistory: @[], + channelId: some("testChannel"), + content: @[byte(1)], + bloomFilter: @[], + ) + + let serialized = serializeMessage(msg) + check serialized.isOk() + + # Process same message twice + let result1 = rm.unwrapReceivedMessage(serialized.get()) + check result1.isOk() + let result2 = rm.unwrapReceivedMessage(serialized.get()) + check: + result2.isOk() + result2.get()[1].len == 0 # No missing deps on second process + messageReadyCount == 1 # Message should only be processed once + + test "error handling": + # Empty message + let emptyMsg: seq[byte] = @[] + let emptyResult = rm.wrapOutgoingMessage(emptyMsg, "empty") + check: + not emptyResult.isOk() + emptyResult.error == reInvalidArgument + + # Oversized message + let largeMsg = newSeq[byte](MaxMessageSize + 1) + let largeResult = rm.wrapOutgoingMessage(largeMsg, "large") + check: + not largeResult.isOk() + largeResult.error == reMessageTooLarge + +suite "cleanup": + test "cleanup works correctly": + let rmResult = newReliabilityManager(some("testChannel")) + check rmResult.isOk() + let rm = rmResult.get() + + # Add some messages + let msg = @[byte(1), 2, 3] + let msgId = "test-msg-1" + discard rm.wrapOutgoingMessage(msg, msgId) + + rm.cleanup() + + let outBuffer = rm.getOutgoingBuffer() + let history = rm.getMessageHistory() + check: + outBuffer.len == 0 + history.len == 0 diff --git a/vendor/nim-chronicles b/vendor/nim-chronicles new file mode 160000 index 0000000..a8fb38a --- /dev/null +++ b/vendor/nim-chronicles @@ -0,0 +1 @@ +Subproject commit a8fb38a10bcb548df78e9a70bd77b26bb50abd12 diff --git a/vendor/nim-chronos b/vendor/nim-chronos new file mode 160000 index 0000000..b55e281 --- /dev/null +++ b/vendor/nim-chronos @@ -0,0 +1 @@ +Subproject commit b55e2816eb45f698ddaca8d8473e401502562db2 diff --git a/vendor/nim-confutils b/vendor/nim-confutils new file mode 160000 index 0000000..e214b39 --- /dev/null +++ b/vendor/nim-confutils @@ -0,0 +1 @@ +Subproject commit e214b3992a31acece6a9aada7d0a1ad37c928f3b diff --git a/vendor/nim-faststreams b/vendor/nim-faststreams new file mode 160000 index 0000000..2b08c77 --- /dev/null +++ b/vendor/nim-faststreams @@ -0,0 +1 @@ +Subproject commit 2b08c774afaafd600cf4c6f994cf78b8aa090c0c diff --git a/vendor/nim-json-serialization b/vendor/nim-json-serialization new file mode 160000 index 0000000..2b1c5eb --- /dev/null +++ b/vendor/nim-json-serialization @@ -0,0 +1 @@ +Subproject commit 2b1c5eb11df3647a2cee107cd4cce3593cbb8bcf diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p new file mode 160000 index 0000000..ac25da6 --- /dev/null +++ b/vendor/nim-libp2p @@ -0,0 +1 @@ +Subproject commit ac25da6cea158768bbc060b7be2fbe004206f3bb diff --git a/vendor/nim-results b/vendor/nim-results new file mode 160000 index 0000000..df8113d --- /dev/null +++ b/vendor/nim-results @@ -0,0 +1 @@ +Subproject commit df8113dda4c2d74d460a8fa98252b0b771bf1f27 diff --git a/vendor/nim-serialization b/vendor/nim-serialization new file mode 160000 index 0000000..548d0ad --- /dev/null +++ b/vendor/nim-serialization @@ -0,0 +1 @@ +Subproject commit 548d0adc9797a10b2db7f788b804330306293088 diff --git a/vendor/nim-stew b/vendor/nim-stew new file mode 160000 index 0000000..d7a6868 --- /dev/null +++ b/vendor/nim-stew @@ -0,0 +1 @@ +Subproject commit d7a6868ba84165e7fdde427af9a1fc3f5f5cc151 diff --git a/vendor/nim-taskpools b/vendor/nim-taskpools new file mode 160000 index 0000000..7b74a71 --- /dev/null +++ b/vendor/nim-taskpools @@ -0,0 +1 @@ +Subproject commit 7b74a716a40249720fd7da428113147942b9642d diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system new file mode 160000 index 0000000..5f10509 --- /dev/null +++ b/vendor/nimbus-build-system @@ -0,0 +1 @@ +Subproject commit 5f10509cf880dc035e517ca7bac3163cd5206ba8