Mirror of https://github.com/logos-messaging/nim-sds.git (synced 2026-01-02 14:13:07 +00:00)

feat: extensive set of initial features.

This commit is contained in: parent 89160b58d4, commit 074eafe895
.gitignore (vendored, 19 lines changed)
@@ -1,7 +1,16 @@
nimcache/*   (previously: nimcache)
tests/test_bloom
nim-bloom/bloom
.DS_Store
src/.DS_Store
tests/test_reliability
tests/bloom
nph
docs
for_reference
do_not_commit
build/*
sds.nims
/.update.timestamp

# Nimbus Build System
nimbus-build-system.paths

# Nimble packages
/vendor/.nimble
.gitmodules (vendored, new file, 55 lines)
@@ -0,0 +1,55 @@
[submodule "vendor/nimbus-build-system"]
  path = vendor/nimbus-build-system
  url = https://github.com/status-im/nimbus-build-system.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-chronos"]
  path = vendor/nim-chronos
  url = https://github.com/status-im/nim-chronos.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-results"]
  path = vendor/nim-results
  url = https://github.com/arnetheduck/nim-results.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-stew"]
  path = vendor/nim-stew
  url = https://github.com/status-im/nim-stew.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-chronicles"]
  path = vendor/nim-chronicles
  url = https://github.com/status-im/nim-chronicles.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-faststreams"]
  path = vendor/nim-faststreams
  url = https://github.com/status-im/nim-faststreams.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-json-serialization"]
  path = vendor/nim-json-serialization
  url = https://github.com/status-im/nim-json-serialization.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-serialization"]
  path = vendor/nim-serialization
  url = https://github.com/status-im/nim-serialization.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-taskpools"]
  path = vendor/nim-taskpools
  url = https://github.com/status-im/nim-taskpools.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-confutils"]
  path = vendor/nim-confutils
  url = https://github.com/status-im/nim-confutils.git
  ignore = untracked
  branch = master
[submodule "vendor/nim-libp2p"]
  path = vendor/nim-libp2p
  url = https://github.com/vacp2p/nim-libp2p.git
  ignore = untracked
  branch = master
Makefile (new file, 51 lines)
@@ -0,0 +1,51 @@
.PHONY: libsds

export BUILD_SYSTEM_DIR := vendor/nimbus-build-system
# we don't want an error here, so we can handle things later, in the ".DEFAULT" target
-include $(BUILD_SYSTEM_DIR)/makefiles/variables.mk

ifeq ($(NIM_PARAMS),)
# "variables.mk" was not included, so we update the submodules.
GIT_SUBMODULE_UPDATE := git submodule update --init --recursive
.DEFAULT:
	+@ echo -e "Git submodules not found. Running '$(GIT_SUBMODULE_UPDATE)'.\n"; \
		$(GIT_SUBMODULE_UPDATE); \
		echo
# Now that the included *.mk files appeared, and are newer than this file, Make will restart itself:
# https://www.gnu.org/software/make/manual/make.html#Remaking-Makefiles
#
# After restarting, it will execute its original goal, so we don't have to start a child Make here
# with "$(MAKE) $(MAKECMDGOALS)". Isn't hidden control flow great?

else # "variables.mk" was included. Business as usual until the end of this file.

# default target, because it's the first one that doesn't start with '.'
all: | libsds

sds.nims:
	ln -s sds.nimble $@

update: | update-common
	rm -rf sds.nims && \
		$(MAKE) sds.nims $(HANDLE_OUTPUT)

clean:
	rm -rf build

deps: | sds.nims

# must be included after the default target
-include $(BUILD_SYSTEM_DIR)/makefiles/targets.mk

STATIC ?= 0

libsds: deps
	rm -f build/libsds*
ifeq ($(STATIC), 1)
	echo -e $(BUILD_MSG) "build/$@.a" && \
		$(ENV_SCRIPT) nim libsdsStatic $(NIM_PARAMS) sds.nims
else
	echo -e $(BUILD_MSG) "build/$@.so" && \
		$(ENV_SCRIPT) nim libsdsDynamic $(NIM_PARAMS) sds.nims
endif

endif
env.sh (new file, 8 lines)
@@ -0,0 +1,8 @@
#!/bin/bash

# We use ${BASH_SOURCE[0]} instead of $0 to allow sourcing this file
# and we fall back to a Zsh-specific special var to also support Zsh.
REL_PATH="$(dirname ${BASH_SOURCE[0]:-${(%):-%x}})"
ABS_PATH="$(cd ${REL_PATH}; pwd)"
source ${ABS_PATH}/vendor/nimbus-build-system/scripts/env.sh
library/alloc.nim (new file, 73 lines)
@@ -0,0 +1,73 @@
## Can be shared safely between threads
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]

proc alloc*(str: cstring): cstring =
  # Byte allocation from the given address.
  # There should be the corresponding manual deallocation with deallocShared !
  if str.isNil():
    var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
    ret[0] = '\0' # Set the null terminator
    return ret

  let ret = cast[cstring](allocShared(len(str) + 1))
  copyMem(ret, str, len(str) + 1)
  return ret

proc alloc*(str: string): cstring =
  ## Byte allocation from the given address.
  ## There should be the corresponding manual deallocation with deallocShared !
  var ret = cast[cstring](allocShared(str.len + 1))
  let s = cast[seq[char]](str)
  for i in 0 ..< str.len:
    ret[i] = s[i]
  ret[str.len] = '\0'
  return ret

proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
  let data = allocShared(sizeof(T) * s.len)
  if s.len != 0:
    copyMem(data, unsafeAddr s[0], s.len)
  return (cast[ptr UncheckedArray[T]](data), s.len)

proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
  if not s.data.isNil:
    when T is cstring:
      # For array of cstrings, deallocate each string first
      for i in 0 ..< s.len:
        if not s.data[i].isNil:
          # Deallocate each cstring
          deallocShared(s.data[i])

    deallocShared(s.data)
  s.len = 0

proc toSeq*[T](s: SharedSeq[T]): seq[T] =
  ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
  ## as seq[T] is a GC managed type.
  var ret = newSeq[T]()
  for i in 0 ..< s.len:
    ret.add(s.data[i])
  return ret

proc allocSharedSeqFromCArray*[T](arr: ptr T, len: int): SharedSeq[T] =
  ## Creates a SharedSeq[T] from a C array pointer and length.
  ## The data is copied to shared memory.
  ## There should be a corresponding manual deallocation with deallocSharedSeq!
  if arr.isNil or len <= 0:
    return (nil, 0)

  when T is cstring:
    # Special handling for arrays of cstrings
    let data = cast[ptr UncheckedArray[cstring]](allocShared(sizeof(cstring) * len))
    let cstrArr = cast[ptr UncheckedArray[cstring]](arr)

    for i in 0 ..< len:
      # Use the existing alloc proc to properly allocate each cstring
      data[i] = cstrArr[i].alloc()

    return (data, len)
  else:
    # Original handling for non-cstring types
    let data = allocShared(sizeof(T) * len)
    copyMem(data, arr, sizeof(T) * len)
    return (cast[ptr UncheckedArray[T]](data), len)
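
These helpers exist so that request payloads can be copied into manually managed shared memory before crossing a thread boundary, and released explicitly afterwards. A minimal usage sketch of the intended pattern (illustrative only; mirrors the pairing of alloc/deallocShared and allocSharedSeq/deallocSharedSeq documented above):

import ./alloc

proc allocExample() =
  # Copy a GC-managed string into shared memory; must be freed manually.
  let cstr: cstring = alloc("hello")
  deallocShared(cstr)

  # Copy a seq into a SharedSeq and back into a GC-managed seq.
  var shared = allocSharedSeq(@[byte 1, 2, 3])
  let roundTrip = shared.toSeq()   # no manual free needed for the returned seq
  assert roundTrip == @[byte 1, 2, 3]
  deallocSharedSeq(shared)         # frees the shared buffer itself
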
library/events/json_base_event.nim (new file, 6 lines)
@@ -0,0 +1,6 @@
type JsonEvent* = ref object of RootObj # https://rfc.vac.dev/spec/36/#jsonsignal-type
  eventType* {.requiresInit.}: string

method `$`*(jsonEvent: JsonEvent): string {.base.} =
  discard
  # All events should implement this

library/events/json_message_ready_event.nim (new file, 11 lines)
@@ -0,0 +1,11 @@
import std/json
import ./json_base_event, ../../src/[message]

type JsonMessageReadyEvent* = ref object of JsonEvent
  messageId*: SdsMessageID

proc new*(T: type JsonMessageReadyEvent, messageId: SdsMessageID): T =
  return JsonMessageReadyEvent(eventType: "message_ready", messageId: messageId)

method `$`*(jsonMessageReady: JsonMessageReadyEvent): string =
  $(%*jsonMessageReady)

library/events/json_message_sent_event.nim (new file, 11 lines)
@@ -0,0 +1,11 @@
import std/json
import ./json_base_event, ../../src/[message]

type JsonMessageSentEvent* = ref object of JsonEvent
  messageId*: SdsMessageID

proc new*(T: type JsonMessageSentEvent, messageId: SdsMessageID): T =
  return JsonMessageSentEvent(eventType: "message_sent", messageId: messageId)

method `$`*(jsonMessageSent: JsonMessageSentEvent): string =
  $(%*jsonMessageSent)

library/events/json_missing_dependencies_event.nim (new file, 18 lines)
@@ -0,0 +1,18 @@
import std/json
import ./json_base_event, ../../src/[message]

type JsonMissingDependenciesEvent* = ref object of JsonEvent
  messageId*: SdsMessageID
  missingDeps: seq[SdsMessageID]

proc new*(
    T: type JsonMissingDependenciesEvent,
    messageId: SdsMessageID,
    missingDeps: seq[SdsMessageID],
): T =
  return JsonMissingDependenciesEvent(
    eventType: "missing_dependencies", messageId: messageId, missingDeps: missingDeps
  )

method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string =
  $(%*jsonMissingDependencies)

library/events/json_periodic_sync_event.nim (new file, 10 lines)
@@ -0,0 +1,10 @@
import std/json
import ./json_base_event

type JsonPeriodicSyncEvent* = ref object of JsonEvent

proc new*(T: type JsonPeriodicSyncEvent): T =
  return JsonPeriodicSyncEvent(eventType: "periodic_sync")

method `$`*(jsonPeriodicSync: JsonPeriodicSyncEvent): string =
  $(%*jsonPeriodicSync)
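
Each event type serializes itself to a small JSON object through `$`, and that string is what reaches the host application via the registered event callback. A quick sketch of the payload shape for the ready event (the message ID is a placeholder and field order may vary):

import ./json_message_ready_event

let readyEvent = JsonMessageReadyEvent.new("msg-1")
echo $readyEvent
# prints something like: {"eventType":"message_ready","messageId":"msg-1"}
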
library/ffi_types.nim (new file, 30 lines)
@@ -0,0 +1,30 @@
################################################################################
### Exported types

type SdsCallBack* = proc(
  callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}

const RET_OK*: cint = 0
const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2

### End of exported types
################################################################################

################################################################################
### FFI utils

template foreignThreadGc*(body: untyped) =
  when declared(setupForeignThreadGc):
    setupForeignThreadGc()

  body

  when declared(tearDownForeignThreadGc):
    tearDownForeignThreadGc()

type onDone* = proc()

### End of FFI utils
################################################################################
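
Every exported libsds proc reports its outcome through an SdsCallBack rather than a return buffer: callerRet carries RET_OK, RET_ERR or RET_MISSING_CALLBACK, msg/len point at a string payload, and userData is handed back untouched. A minimal callback sketch (illustrative; copying the payload before returning is an assumption about how a caller would keep it, since the buffer belongs to the library):

import ./ffi_types

proc onLibsdsResponse(callerRet: cint, msg: ptr cchar, len: csize_t,
    userData: pointer) {.cdecl, gcsafe, raises: [].} =
  # Copy the transient payload out before the call returns.
  var payload = newString(len.int)
  if len > 0 and not msg.isNil:
    copyMem(addr payload[0], msg, len.int)
  # callerRet tells whether `payload` is the requested value (e.g. JSON or a
  # comma-separated byte string) or an error description.

let cb: SdsCallBack = onLibsdsResponse  # type-checks against the exported proc type
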
library/libsds.h (new file, 62 lines)
@@ -0,0 +1,62 @@
// Generated manually and inspired by the one generated by the Nim Compiler.
// In order to see the header file generated by Nim just run `make libsds`
// from the root repo folder and the header should be created in
// nimcache/release/libsds/libsds.h
#ifndef __libsds__
#define __libsds__

#include <stddef.h>
#include <stdint.h>

// The possible returned values for the functions that return int
#define RET_OK 0
#define RET_ERR 1
#define RET_MISSING_CALLBACK 2

#ifdef __cplusplus
extern "C" {
#endif

typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData);

// --- Core API Functions ---

void* NewReliabilityManager(const char* channelId, SdsCallBack callback, void* userData);

void SetEventCallback(void* ctx, SdsCallBack callback, void* userData);

int CleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData);

int ResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData);

int WrapOutgoingMessage(void* ctx,
                        void* message,
                        size_t messageLen,
                        const char* messageId,
                        SdsCallBack callback,
                        void* userData);

int UnwrapReceivedMessage(void* ctx,
                          void* message,
                          size_t messageLen,
                          SdsCallBack callback,
                          void* userData);

int MarkDependenciesMet(void* ctx,
                        char** messageIDs,
                        size_t count,
                        SdsCallBack callback,
                        void* userData);

int StartPeriodicTasks(void* ctx, SdsCallBack callback, void* userData);

#ifdef __cplusplus
}
#endif

#endif /* __libsds__ */
library/libsds.nim (new file, 304 lines)
@@ -0,0 +1,304 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}

when defined(linux):
  {.passl: "-Wl,-soname,libsds.so".}

import std/[locks, typetraits, tables, atomics], chronos, chronicles
import
  ./sds_thread/sds_thread,
  ./alloc,
  ./ffi_types,
  ./sds_thread/inter_thread_communication/sds_thread_request,
  ./sds_thread/inter_thread_communication/requests/
    [sds_lifecycle_request, sds_message_request, sds_dependencies_request],
  ../src/[reliability, reliability_utils, message],
  ./events/[
    json_message_ready_event, json_message_sent_event, json_missing_dependencies_event,
    json_periodic_sync_event,
  ]

################################################################################
### Wrapper around the reliability manager
################################################################################

################################################################################
### Not-exported components

template checkLibsdsParams*(
    ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
) =
  ctx[].userData = userData

  if isNil(callback):
    return RET_MISSING_CALLBACK

template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped) =
  if isNil(ctx[].eventCallback):
    error eventName & " - eventCallback is nil"
    return

  if isNil(ctx[].eventUserData):
    error eventName & " - eventUserData is nil"
    return

  foreignThreadGc:
    try:
      let event = body
      cast[SdsCallBack](ctx[].eventCallback)(
        RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
      )
    except Exception, CatchableError:
      let msg =
        "Exception " & eventName & " when calling 'eventCallBack': " &
        getCurrentExceptionMsg()
      cast[SdsCallBack](ctx[].eventCallback)(
        RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
      )

proc handleRequest(
    ctx: ptr SdsContext,
    requestType: RequestType,
    content: pointer,
    callback: SdsCallBack,
    userData: pointer,
): cint =
  sds_thread.sendRequestToSdsThread(ctx, requestType, content, callback, userData).isOkOr:
    let msg = "libsds error: " & $error
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return RET_ERR

  return RET_OK

proc onMessageReady(ctx: ptr SdsContext): MessageReadyCallback =
  return proc(messageId: SdsMessageID) {.gcsafe.} =
    callEventCallback(ctx, "onMessageReady"):
      $JsonMessageReadyEvent.new(messageId)

proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback =
  return proc(messageId: SdsMessageID) {.gcsafe.} =
    callEventCallback(ctx, "onMessageSent"):
      $JsonMessageSentEvent.new(messageId)

proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback =
  return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
    callEventCallback(ctx, "onMissingDependencies"):
      $JsonMissingDependenciesEvent.new(messageId, missingDeps)

proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback =
  return proc() {.gcsafe.} =
    callEventCallback(ctx, "onPeriodicSync"):
      $JsonPeriodicSyncEvent.new()

### End of not-exported components
################################################################################

################################################################################
### Library setup

# Every Nim library must have this function called - the name is derived from
# the `--nimMainPrefix` command line option
proc libsdsNimMain() {.importc.}

# To control when the library has been initialized
var initialized: Atomic[bool]

if defined(android):
  # Redirect chronicles to Android System logs
  when compiles(defaultChroniclesStream.outputs[0].writer):
    defaultChroniclesStream.outputs[0].writer = proc(
        logLevel: LogLevel, msg: LogOutputStr
    ) {.raises: [].} =
      echo logLevel, msg

proc initializeLibrary() {.exported.} =
  if not initialized.exchange(true):
    ## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
    ## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
    libsdsNimMain()
  when declared(setupForeignThreadGc):
    setupForeignThreadGc()
  when declared(nimGC_setStackBottom):
    var locals {.volatile, noinit.}: pointer
    locals = addr(locals)
    nimGC_setStackBottom(locals)

### End of library setup
################################################################################

################################################################################
### Exported procs

proc NewReliabilityManager(
    channelId: cstring, callback: SdsCallBack, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
  initializeLibrary()

  ## Creates a new instance of the Reliability Manager.
  if isNil(callback):
    echo "error: missing callback in NewReliabilityManager"
    return nil

  ## Create the SDS thread that will keep waiting for req from the main thread.
  var ctx = sds_thread.createSdsThread().valueOr:
    let msg = "Error in createSdsThread: " & $error
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return nil

  ctx.userData = userData

  let appCallbacks = AppCallbacks(
    messageReadyCb: onMessageReady(ctx),
    messageSentCb: onMessageSent(ctx),
    missingDependenciesCb: onMissingDependencies(ctx),
    periodicSyncCb: onPeriodicSync(ctx),
  )

  let retCode = handleRequest(
    ctx,
    RequestType.LIFECYCLE,
    SdsLifecycleRequest.createShared(
      SdsLifecycleMsgType.CREATE_RELIABILITY_MANAGER, channelId, appCallbacks
    ),
    callback,
    userData,
  )

  if retCode == RET_ERR:
    return nil

  return ctx

proc SetEventCallback(
    ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
) {.dynlib, exportc.} =
  initializeLibrary()
  ctx[].eventCallback = cast[pointer](callback)
  ctx[].eventUserData = userData

proc CleanupReliabilityManager(
    ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
): cint {.dynlib, exportc.} =
  initializeLibrary()
  checkLibsdsParams(ctx, callback, userData)

  sds_thread.destroySdsThread(ctx).isOkOr:
    let msg = "libsds error: " & $error
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return RET_ERR

  ## always need to invoke the callback although we don't retrieve value to the caller
  callback(RET_OK, nil, 0, userData)

  return RET_OK

proc ResetReliabilityManager(
    ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
): cint {.dynlib, exportc.} =
  initializeLibrary()
  checkLibsdsParams(ctx, callback, userData)
  handleRequest(
    ctx,
    RequestType.LIFECYCLE,
    SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER),
    callback,
    userData,
  )

proc WrapOutgoingMessage(
    ctx: ptr SdsContext,
    message: pointer,
    messageLen: csize_t,
    messageId: cstring,
    callback: SdsCallBack,
    userData: pointer,
): cint {.dynlib, exportc.} =
  initializeLibrary()
  checkLibsdsParams(ctx, callback, userData)

  if message == nil and messageLen > 0:
    let msg = "libsds error: " & "message pointer is NULL but length > 0"
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return RET_ERR

  if messageId == nil:
    let msg = "libsds error: " & "message ID pointer is NULL"
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return RET_ERR

  handleRequest(
    ctx,
    RequestType.MESSAGE,
    SdsMessageRequest.createShared(
      SdsMessageMsgType.WRAP_MESSAGE, message, messageLen, messageId
    ),
    callback,
    userData,
  )

proc UnwrapReceivedMessage(
    ctx: ptr SdsContext,
    message: pointer,
    messageLen: csize_t,
    callback: SdsCallBack,
    userData: pointer,
): cint {.dynlib, exportc.} =
  initializeLibrary()
  checkLibsdsParams(ctx, callback, userData)

  if message == nil and messageLen > 0:
    let msg = "libsds error: " & "message pointer is NULL but length > 0"
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return RET_ERR

  handleRequest(
    ctx,
    RequestType.MESSAGE,
    SdsMessageRequest.createShared(
      SdsMessageMsgType.UNWRAP_MESSAGE, message, messageLen
    ),
    callback,
    userData,
  )

proc MarkDependenciesMet(
    ctx: ptr SdsContext,
    messageIds: pointer,
    count: csize_t,
    callback: SdsCallBack,
    userData: pointer,
): cint {.dynlib, exportc.} =
  initializeLibrary()
  checkLibsdsParams(ctx, callback, userData)

  if messageIds == nil and count > 0:
    let msg = "libsds error: " & "MessageIDs pointer is NULL but count > 0"
    callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
    return RET_ERR

  handleRequest(
    ctx,
    RequestType.DEPENDENCIES,
    SdsDependenciesRequest.createShared(
      SdsDependenciesMsgType.MARK_DEPENDENCIES_MET, messageIds, count
    ),
    callback,
    userData,
  )

proc StartPeriodicTasks(
    ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
): cint {.dynlib, exportc.} =
  initializeLibrary()
  checkLibsdsParams(ctx, callback, userData)
  handleRequest(
    ctx,
    RequestType.LIFECYCLE,
    SdsLifecycleRequest.createShared(SdsLifecycleMsgType.START_PERIODIC_TASKS),
    callback,
    userData,
  )

### End of exported procs
################################################################################
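
Putting the exported procs together, a host drives the library in a fairly fixed order: create the manager (which spawns the SDS thread), register the event callback, start the periodic tasks, wrap or unwrap messages, and finally clean up. A minimal sketch of that order (illustrative only; myCallback and myEventCallback stand for hypothetical procs with the SdsCallBack signature, and return codes are not checked):

let ctx = NewReliabilityManager("my-channel", myCallback, nil)
if ctx != nil:
  let sdsCtx = cast[ptr SdsContext](ctx)
  SetEventCallback(sdsCtx, myEventCallback, nil)       # receives the JSON events shown earlier
  discard StartPeriodicTasks(sdsCtx, myCallback, nil)  # buffer sweep + periodic sync

  var payload = @[byte 1, 2, 3]
  discard WrapOutgoingMessage(
    sdsCtx, addr payload[0], csize_t(payload.len), "msg-1", myCallback, nil
  )  # the wrapped bytes are delivered through myCallback

  discard CleanupReliabilityManager(sdsCtx, myCallback, nil)
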
library/sds_thread/inter_thread_communication/requests/sds_dependencies_request.nim (new file, 48 lines)
@@ -0,0 +1,48 @@
import std/[options, json, strutils, net, sequtils]
import chronos, chronicles, results, confutils, confutils/std/net

import ../../../alloc
import ../../../../src/[reliability_utils, reliability, message]

type SdsDependenciesMsgType* = enum
  MARK_DEPENDENCIES_MET

type SdsDependenciesRequest* = object
  operation: SdsDependenciesMsgType
  messageIds: SharedSeq[cstring]
  count: csize_t

proc createShared*(
    T: type SdsDependenciesRequest,
    op: SdsDependenciesMsgType,
    messageIds: pointer,
    count: csize_t = 0,
): ptr type T =
  var ret = createShared(T)
  ret[].operation = op
  ret[].count = count
  ret[].messageIds = allocSharedSeqFromCArray(cast[ptr cstring](messageIds), count.int)
  return ret

proc destroyShared(self: ptr SdsDependenciesRequest) =
  deallocSharedSeq(self[].messageIds)
  deallocShared(self)

proc process*(
    self: ptr SdsDependenciesRequest, rm: ptr ReliabilityManager
): Future[Result[string, string]] {.async.} =
  defer:
    destroyShared(self)

  case self.operation
  of MARK_DEPENDENCIES_MET:
    let messageIdsC = self.messageIds.toSeq()
    let messageIds = messageIdsC.mapIt($it)

    markDependenciesMet(rm[], messageIds).isOkOr:
      error "MARK_DEPENDENCIES_MET failed", error = error
      return err("error processing MARK_DEPENDENCIES_MET request: " & $error)

    return ok("")

  return ok("")
library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim (new file, 70 lines)
@@ -0,0 +1,70 @@
import std/[options, json, strutils, net]
import chronos, chronicles, results, confutils, confutils/std/net

import ../../../alloc
import ../../../../src/[reliability_utils, reliability, message]

type SdsLifecycleMsgType* = enum
  CREATE_RELIABILITY_MANAGER
  RESET_RELIABILITY_MANAGER
  START_PERIODIC_TASKS

type SdsLifecycleRequest* = object
  operation: SdsLifecycleMsgType
  channelId: cstring
  appCallbacks: AppCallbacks

proc createShared*(
    T: type SdsLifecycleRequest,
    op: SdsLifecycleMsgType,
    channelId: cstring = "",
    appCallbacks: AppCallbacks = nil,
): ptr type T =
  var ret = createShared(T)
  ret[].operation = op
  ret[].appCallbacks = appCallbacks
  ret[].channelId = channelId.alloc()
  return ret

proc destroyShared(self: ptr SdsLifecycleRequest) =
  deallocShared(self[].channelId)
  deallocShared(self)

proc createReliabilityManager(
    channelIdCStr: cstring, appCallbacks: AppCallbacks = nil
): Future[Result[ReliabilityManager, string]] {.async.} =
  let channelId = $channelIdCStr
  if channelId.len == 0:
    error "Failed creating ReliabilityManager: Channel ID cannot be empty"
    return err("Failed creating ReliabilityManager: Channel ID cannot be empty")

  let rm = newReliabilityManager(some(channelId)).valueOr:
    error "Failed creating reliability manager", error = error
    return err("Failed creating reliability manager: " & $error)

  rm.setCallbacks(
    appCallbacks.messageReadyCb, appCallbacks.messageSentCb,
    appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb,
  )

  return ok(rm)

proc process*(
    self: ptr SdsLifecycleRequest, rm: ptr ReliabilityManager
): Future[Result[string, string]] {.async.} =
  defer:
    destroyShared(self)

  case self.operation
  of CREATE_RELIABILITY_MANAGER:
    rm[] = (await createReliabilityManager(self.channelId, self.appCallbacks)).valueOr:
      error "CREATE_RELIABILITY_MANAGER failed", error = error
      return err("error processing CREATE_RELIABILITY_MANAGER request: " & $error)
  of RESET_RELIABILITY_MANAGER:
    resetReliabilityManager(rm[]).isOkOr:
      error "RESET_RELIABILITY_MANAGER failed", error = error
      return err("error processing RESET_RELIABILITY_MANAGER request: " & $error)
  of START_PERIODIC_TASKS:
    rm[].startPeriodicTasks()

  return ok("")
library/sds_thread/inter_thread_communication/requests/sds_message_request.nim (new file, 69 lines)
@@ -0,0 +1,69 @@
import std/[options, json, strutils, net, sequtils]
import chronos, chronicles, results, confutils, confutils/std/net

import ../../../alloc
import ../../../../src/[reliability_utils, reliability, message]

type SdsMessageMsgType* = enum
  WRAP_MESSAGE
  UNWRAP_MESSAGE

type SdsMessageRequest* = object
  operation: SdsMessageMsgType
  message: SharedSeq[byte]
  messageLen: csize_t
  messageId: cstring

type SdsUnwrapResponse* = object
  message*: seq[byte]
  missingDeps*: seq[SdsMessageID]

proc createShared*(
    T: type SdsMessageRequest,
    op: SdsMessageMsgType,
    message: pointer,
    messageLen: csize_t = 0,
    messageId: cstring = "",
): ptr type T =
  var ret = createShared(T)
  ret[].operation = op
  ret[].messageLen = messageLen
  ret[].messageId = messageId.alloc()
  ret[].message = allocSharedSeqFromCArray(cast[ptr byte](message), messageLen.int)

  return ret

proc destroyShared(self: ptr SdsMessageRequest) =
  deallocSharedSeq(self[].message)
  deallocShared(self[].messageId)
  deallocShared(self)

proc process*(
    self: ptr SdsMessageRequest, rm: ptr ReliabilityManager
): Future[Result[string, string]] {.async.} =
  defer:
    destroyShared(self)

  case self.operation
  of WRAP_MESSAGE:
    let messageBytes = self.message.toSeq()

    let wrappedMessage = wrapOutgoingMessage(rm[], messageBytes, $self.messageId).valueOr:
      error "WRAP_MESSAGE failed", error = error
      return err("error processing WRAP_MESSAGE request: " & $error)

    # returns a comma-separated string of bytes
    return ok(wrappedMessage.mapIt($it).join(","))
  of UNWRAP_MESSAGE:
    let messageBytes = self.message.toSeq()

    let (unwrappedMessage, missingDeps) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
      error "UNWRAP_MESSAGE failed", error = error
      return err("error processing UNWRAP_MESSAGE request: " & $error)

    let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps)

    # return the result as a json string
    return ok($(%*(res)))

  return ok("")
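
The two operations answer through the caller's SdsCallBack in different encodings: WRAP_MESSAGE returns the wrapped message as a comma-separated list of decimal byte values, while UNWRAP_MESSAGE returns a JSON-encoded SdsUnwrapResponse. A sketch of how a consumer might decode those payloads (illustrative; `payload` stands for the string received in the callback):

import std/[json, sequtils, strutils]

proc decodeWrapResponse(payload: string): seq[byte] =
  # WRAP_MESSAGE answers with "12,255,0,..." (one decimal number per byte).
  payload.split(',').mapIt(byte(parseInt(it)))

proc decodeUnwrapResponse(payload: string): (seq[byte], seq[string]) =
  # UNWRAP_MESSAGE answers with {"message":[...],"missingDeps":[...]}.
  let node = parseJson(payload)
  let content = node["message"].getElems().mapIt(byte(it.getInt()))
  let missing = node["missingDeps"].getElems().mapIt(it.getStr())
  (content, missing)
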
library/sds_thread/inter_thread_communication/sds_thread_request.nim (new file, 78 lines)
@@ -0,0 +1,78 @@
## This file contains the base message request type that will be handled.
## The requests are created by the main thread and processed by
## the SDS Thread.

import std/json, results
import chronos, chronos/threadsync
import
  ../../ffi_types,
  ./requests/[sds_lifecycle_request, sds_message_request, sds_dependencies_request],
  ../../../src/[reliability_utils]

type RequestType* {.pure.} = enum
  LIFECYCLE
  MESSAGE
  DEPENDENCIES

type SdsThreadRequest* = object
  reqType: RequestType
  reqContent: pointer
  callback: SdsCallBack
  userData: pointer

proc createShared*(
    T: type SdsThreadRequest,
    reqType: RequestType,
    reqContent: pointer,
    callback: SdsCallBack,
    userData: pointer,
): ptr type T =
  var ret = createShared(T)
  ret[].reqType = reqType
  ret[].reqContent = reqContent
  ret[].callback = callback
  ret[].userData = userData
  return ret

proc handleRes[T: string | void](
    res: Result[T, string], request: ptr SdsThreadRequest
) =
  ## Handles the Result responses, which can either be Result[string, string] or
  ## Result[void, string].

  defer:
    deallocShared(request)

  if res.isErr():
    foreignThreadGc:
      let msg = "libsds error: handleRes fireSyncRes error: " & $res.error
      request[].callback(
        RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
      )
    return

  foreignThreadGc:
    var msg: cstring = ""
    when T is string:
      msg = res.get().cstring()
    request[].callback(
      RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
    )
  return

proc process*(
    T: type SdsThreadRequest, request: ptr SdsThreadRequest, rm: ptr ReliabilityManager
) {.async.} =
  let retFut =
    case request[].reqType
    of LIFECYCLE:
      cast[ptr SdsLifecycleRequest](request[].reqContent).process(rm)
    of MESSAGE:
      cast[ptr SdsMessageRequest](request[].reqContent).process(rm)
    of DEPENDENCIES:
      cast[ptr SdsDependenciesRequest](request[].reqContent).process(rm)

  handleRes(await retFut, request)

proc `$`*(self: SdsThreadRequest): string =
  return $self.reqType
library/sds_thread/sds_thread.nim (new file, 132 lines)
@@ -0,0 +1,132 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}

import std/[options, atomics, os, net, locks]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import
  ../ffi_types,
  ./inter_thread_communication/sds_thread_request,
  ../../src/[reliability_utils]

type SdsContext* = object
  thread: Thread[(ptr SdsContext)]
  lock: Lock
  reqChannel: ChannelSPSCSingle[ptr SdsThreadRequest]
  reqSignal: ThreadSignalPtr
    # to inform The SDS Thread (a.k.a TST) that a new request is sent
  reqReceivedSignal: ThreadSignalPtr
    # to inform the main thread that the request is rx by TST
  userData*: pointer
  eventCallback*: pointer
  eventUserdata*: pointer
  running: Atomic[bool] # To control when the thread is running

proc runSds(ctx: ptr SdsContext) {.async.} =
  ## This is the worker body. This runs the SDS instance
  ## and attends library user requests (stop, connect_to, etc.)

  var rm: ReliabilityManager

  while true:
    await ctx.reqSignal.wait()

    if ctx.running.load == false:
      break

    ## Trying to get a request from the libsds requestor thread
    var request: ptr SdsThreadRequest
    let recvOk = ctx.reqChannel.tryRecv(request)
    if not recvOk:
      error "sds thread could not receive a request"
      continue

    let fireRes = ctx.reqReceivedSignal.fireSync()
    if fireRes.isErr():
      error "could not fireSync back to requester thread", error = fireRes.error

    ## Handle the request
    asyncSpawn SdsThreadRequest.process(request, addr rm)

proc run(ctx: ptr SdsContext) {.thread.} =
  ## Launch sds worker
  waitFor runSds(ctx)

proc createSdsThread*(): Result[ptr SdsContext, string] =
  ## This proc is called from the main thread and it creates
  ## the SDS working thread.
  var ctx = createShared(SdsContext, 1)
  ctx.reqSignal = ThreadSignalPtr.new().valueOr:
    return err("couldn't create reqSignal ThreadSignalPtr")
  ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
    return err("couldn't create reqReceivedSignal ThreadSignalPtr")
  ctx.lock.initLock()

  ctx.running.store(true)

  try:
    createThread(ctx.thread, run, ctx)
  except ValueError, ResourceExhaustedError:
    # and freeShared for typed allocations!
    freeShared(ctx)

    return err("failed to create the SDS thread: " & getCurrentExceptionMsg())

  return ok(ctx)

proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] =
  ctx.running.store(false)

  let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
    return err("error in destroySdsThread: " & $error)
  if not signaledOnTime:
    return err("failed to signal reqSignal on time in destroySdsThread")

  joinThread(ctx.thread)
  ctx.lock.deinitLock()
  ?ctx.reqSignal.close()
  ?ctx.reqReceivedSignal.close()
  freeShared(ctx)

  return ok()

proc sendRequestToSdsThread*(
    ctx: ptr SdsContext,
    reqType: RequestType,
    reqContent: pointer,
    callback: SdsCallBack,
    userData: pointer,
): Result[void, string] =
  let req = SdsThreadRequest.createShared(reqType, reqContent, callback, userData)

  # This lock is only necessary while we use a SP Channel and while the signalling
  # between threads assumes that there aren't concurrent requests.
  # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
  # requests concurrently and spare us the need of locks
  ctx.lock.acquire()
  defer:
    ctx.lock.release()
  ## Sending the request
  let sentOk = ctx.reqChannel.trySend(req)
  if not sentOk:
    deallocShared(req)
    return err("Couldn't send a request to the sds thread: " & $req[])

  let fireSyncRes = ctx.reqSignal.fireSync()
  if fireSyncRes.isErr():
    deallocShared(req)
    return err("failed fireSync: " & $fireSyncRes.error)

  if fireSyncRes.get() == false:
    deallocShared(req)
    return err("Couldn't fireSync in time")

  ## wait until the SDS Thread properly received the request
  let res = ctx.reqReceivedSignal.waitSync()
  if res.isErr():
    deallocShared(req)
    return err("Couldn't receive reqReceivedSignal signal")

  ## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the
  ## process proc.
  ok()
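
In short, every call from the host crosses the thread boundary with a two-signal handshake: the requester enqueues on the SPSC channel and fires reqSignal, the SDS thread picks the request up and fires reqReceivedSignal, and the eventual outcome is delivered through the request's callback. A condensed sketch of one round trip using the procs above (illustrative; `lifecycleReq` and `cb` are placeholders for a request created with SdsLifecycleRequest.createShared and an SdsCallBack):

let ctx = createSdsThread().expect("sds thread")   # spawns `run`, which waits on reqSignal
let sendRes = sendRequestToSdsThread(
  ctx, RequestType.LIFECYCLE, lifecycleReq, cb, nil
)
# sendRequestToSdsThread: trySend on the SPSC channel, fire reqSignal, then block on
# reqReceivedSignal until runSds has picked the request up. runSds: tryRecv the request,
# fire reqReceivedSignal, asyncSpawn its process(); the result reaches the caller via `cb`.
discard destroySdsThread(ctx)                      # running = false, fire reqSignal, joinThread
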
sds.nimble (new file, 39 lines)
@@ -0,0 +1,39 @@
# Package
version = "0.1.0"
author = "Waku Team"
description = "E2E Reliability Protocol API"
license = "MIT"
srcDir = "src"

# Dependencies
requires "nim >= 2.0.8"
requires "chronicles"
requires "libp2p"

proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") =
  if not dirExists "build":
    mkDir "build"
  # allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims"
  var extra_params = params
  for i in 2 ..< paramCount():
    extra_params &= " " & paramStr(i)
  if `type` == "static":
    exec "nim c" & " --out:build/" & name &
      ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " &
      extra_params & " " & srcDir & name & ".nim"
  else:
    exec "nim c" & " --out:build/" & name &
      ".so --threads:on --app:lib --opt:size --noMain --mm:refc --header --undef:metrics --nimMainPrefix:libsds --skipParentCfg:on " &
      extra_params & " " & srcDir & name & ".nim"

# Tasks
task test, "Run the test suite":
  exec "nim c -r tests/test_bloom.nim"
  exec "nim c -r tests/test_reliability.nim"

task libsdsDynamic, "Generate bindings":
  let name = "libsds"
  buildLibrary name,
    "library/",
    "",
    "dynamic"
src/message.nim (14 lines changed)
@@ -1,14 +1,14 @@
-import std/times
+import std/[times, options, sets]

 type
-  SdsMessageID* = seq[byte]
-  SdsChannelID* = seq[byte]
+  SdsMessageID* = string
+  SdsChannelID* = string

   SdsMessage* = object
     messageId*: SdsMessageID
     lamportTimestamp*: int64
     causalHistory*: seq[SdsMessageID]
-    channelId*: SdsChannelID
+    channelId*: Option[SdsChannelID]
     content*: seq[byte]
     bloomFilter*: seq[byte]
@@ -17,6 +17,10 @@ type
     sendTime*: Time
     resendAttempts*: int

+  IncomingMessage* = object
+    message*: SdsMessage
+    missingDeps*: HashSet[SdsMessageID]
+
 const
   DefaultMaxMessageHistory* = 1000
   DefaultMaxCausalHistory* = 10
src/protobuf.nim (changed)
@@ -12,7 +12,8 @@ proc encode*(msg: SdsMessage): ProtoBuffer =
   for hist in msg.causalHistory:
     pb.write(3, hist)

-  pb.write(4, msg.channelId)
+  if msg.channelId.isSome():
+    pb.write(4, msg.channelId.get())
   pb.write(5, msg.content)
   pb.write(6, msg.bloomFilter)
   pb.finish()
@@ -31,13 +32,16 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
     return err(ProtobufError.missingRequiredField("lamportTimestamp"))
   msg.lamportTimestamp = int64(timestamp)

-  var causalHistory: seq[seq[byte]]
+  var causalHistory: seq[SdsMessageID]
   let histResult = pb.getRepeatedField(3, causalHistory)
   if histResult.isOk:
     msg.causalHistory = causalHistory

-  if not ?pb.getField(4, msg.channelId):
-    return err(ProtobufError.missingRequiredField("channelId"))
+  var channelId: SdsChannelID
+  if ?pb.getField(4, channelId):
+    msg.channelId = some(channelId)
+  else:
+    msg.channelId = none[SdsChannelID]()

   if not ?pb.getField(5, msg.content):
     return err(ProtobufError.missingRequiredField("content"))
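
Since the channel field is now optional, an absent protobuf field 4 decodes to none() instead of failing. A round-trip sketch (illustrative; `original` is a placeholder SdsMessage, and the encoded bytes are assumed to be read from the ProtoBuffer's buffer field):

import ./[message, protobuf]

let encoded: seq[byte] = encode(original).buffer
let decoded = SdsMessage.decode(encoded)
assert decoded.isOk()
assert decoded.get().messageId == original.messageId
assert decoded.get().channelId == original.channelId  # none() round-trips as an absent field
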
349
src/reliability.nim
Normal file
349
src/reliability.nim
Normal file
@ -0,0 +1,349 @@
|
|||||||
|
import std/[times, locks, tables, sets, options]
|
||||||
|
import chronos, results, chronicles
|
||||||
|
import ./[message, protobuf, reliability_utils, rolling_bloom_filter]
|
||||||
|
|
||||||
|
proc newReliabilityManager*(
|
||||||
|
channelId: Option[SdsChannelID], config: ReliabilityConfig = defaultConfig()
|
||||||
|
): Result[ReliabilityManager, ReliabilityError] =
|
||||||
|
## Creates a new ReliabilityManager with the specified channel ID and configuration.
|
||||||
|
##
|
||||||
|
## Parameters:
|
||||||
|
## - channelId: A unique identifier for the communication channel.
|
||||||
|
## - config: Configuration options for the ReliabilityManager. If not provided, default configuration is used.
|
||||||
|
##
|
||||||
|
## Returns:
|
||||||
|
## A Result containing either a new ReliabilityManager instance or an error.
|
||||||
|
if not channelId.isSome():
|
||||||
|
return err(ReliabilityError.reInvalidArgument)
|
||||||
|
|
||||||
|
try:
|
||||||
|
let bloomFilter =
|
||||||
|
newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate)
|
||||||
|
|
||||||
|
let rm = ReliabilityManager(
|
||||||
|
lamportTimestamp: 0,
|
||||||
|
messageHistory: @[],
|
||||||
|
bloomFilter: bloomFilter,
|
||||||
|
outgoingBuffer: @[],
|
||||||
|
incomingBuffer: initTable[SdsMessageID, IncomingMessage](),
|
||||||
|
channelId: channelId,
|
||||||
|
config: config,
|
||||||
|
)
|
||||||
|
initLock(rm.lock)
|
||||||
|
return ok(rm)
|
||||||
|
except Exception:
|
||||||
|
error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg()
|
||||||
|
return err(ReliabilityError.reOutOfMemory)
|
||||||
|
|
||||||
|
proc isAcknowledged*(
|
||||||
|
msg: UnacknowledgedMessage,
|
||||||
|
causalHistory: seq[SdsMessageID],
|
||||||
|
rbf: Option[RollingBloomFilter],
|
||||||
|
): bool =
|
||||||
|
if msg.message.messageId in causalHistory:
|
||||||
|
return true
|
||||||
|
|
||||||
|
if rbf.isSome():
|
||||||
|
return rbf.get().contains(msg.message.messageId)
|
||||||
|
|
||||||
|
false
|
||||||
|
|
||||||
|
proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} =
|
||||||
|
# Parse bloom filter
|
||||||
|
var rbf: Option[RollingBloomFilter]
|
||||||
|
if msg.bloomFilter.len > 0:
|
||||||
|
let bfResult = deserializeBloomFilter(msg.bloomFilter)
|
||||||
|
if bfResult.isOk():
|
||||||
|
rbf = some(
|
||||||
|
RollingBloomFilter(
|
||||||
|
filter: bfResult.get(),
|
||||||
|
capacity: bfResult.get().capacity,
|
||||||
|
minCapacity: (
|
||||||
|
bfResult.get().capacity.float * (100 - CapacityFlexPercent).float / 100.0
|
||||||
|
).int,
|
||||||
|
maxCapacity: (
|
||||||
|
bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0
|
||||||
|
).int,
|
||||||
|
messages: @[],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
error "Failed to deserialize bloom filter", error = bfResult.error
|
||||||
|
rbf = none[RollingBloomFilter]()
|
||||||
|
else:
|
||||||
|
rbf = none[RollingBloomFilter]()
|
||||||
|
|
||||||
|
# Keep track of indices to delete
|
||||||
|
var toDelete: seq[int] = @[]
|
||||||
|
var i = 0
|
||||||
|
|
||||||
|
while i < rm.outgoingBuffer.len:
|
||||||
|
let outMsg = rm.outgoingBuffer[i]
|
||||||
|
if outMsg.isAcknowledged(msg.causalHistory, rbf):
|
||||||
|
if not rm.onMessageSent.isNil():
|
||||||
|
rm.onMessageSent(outMsg.message.messageId)
|
||||||
|
toDelete.add(i)
|
||||||
|
inc i
|
||||||
|
|
||||||
|
for i in countdown(toDelete.high, 0): # Delete in reverse order to maintain indices
|
||||||
|
rm.outgoingBuffer.delete(toDelete[i])
|
||||||
|
|
||||||
|
proc wrapOutgoingMessage*(
|
||||||
|
rm: ReliabilityManager, message: seq[byte], messageId: SdsMessageID
|
||||||
|
): Result[seq[byte], ReliabilityError] =
|
||||||
|
## Wraps an outgoing message with reliability metadata.
|
||||||
|
##
|
||||||
|
## Parameters:
|
||||||
|
## - message: The content of the message to be sent.
|
||||||
|
## - messageId: Unique identifier for the message
|
||||||
|
##
|
||||||
|
## Returns:
|
||||||
|
## A Result containing either wrapped message bytes or an error.
|
||||||
|
if message.len == 0:
|
||||||
|
return err(ReliabilityError.reInvalidArgument)
|
||||||
|
if message.len > MaxMessageSize:
|
||||||
|
return err(ReliabilityError.reMessageTooLarge)
|
||||||
|
|
||||||
|
withLock rm.lock:
|
||||||
|
try:
|
||||||
|
rm.updateLamportTimestamp(getTime().toUnix)
|
||||||
|
|
||||||
|
let bfResult = serializeBloomFilter(rm.bloomFilter.filter)
|
||||||
|
if bfResult.isErr:
|
||||||
|
error "Failed to serialize bloom filter"
|
||||||
|
return err(ReliabilityError.reSerializationError)
|
||||||
|
|
||||||
|
let msg = SdsMessage(
|
||||||
|
messageId: messageId,
|
||||||
|
lamportTimestamp: rm.lamportTimestamp,
|
||||||
|
causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory),
|
||||||
|
channelId: rm.channelId,
|
||||||
|
content: message,
|
||||||
|
bloomFilter: bfResult.get(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add to outgoing buffer
|
||||||
|
rm.outgoingBuffer.add(
|
||||||
|
UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add to causal history and bloom filter
|
||||||
|
rm.bloomFilter.add(msg.messageId)
|
||||||
|
rm.addToHistory(msg.messageId)
|
||||||
|
|
||||||
|
return serializeMessage(msg)
|
||||||
|
except Exception:
|
||||||
|
error "Failed to wrap message", msg = getCurrentExceptionMsg()
|
||||||
|
return err(ReliabilityError.reSerializationError)
|
||||||
|
|
||||||
|
proc processIncomingBuffer(rm: ReliabilityManager) {.gcsafe.} =
|
||||||
|
withLock rm.lock:
|
||||||
|
if rm.incomingBuffer.len == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
var processed = initHashSet[SdsMessageID]()
|
||||||
|
var readyToProcess = newSeq[SdsMessageID]()
|
||||||
|
|
||||||
|
# Find initially ready messages
|
||||||
|
for msgId, entry in rm.incomingBuffer:
|
||||||
|
if entry.missingDeps.len == 0:
|
||||||
|
readyToProcess.add(msgId)
|
||||||
|
|
||||||
|
while readyToProcess.len > 0:
|
||||||
|
let msgId = readyToProcess.pop()
|
||||||
|
if msgId in processed:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if msgId in rm.incomingBuffer:
|
||||||
|
rm.addToHistory(msgId)
|
||||||
|
if not rm.onMessageReady.isNil():
|
||||||
|
rm.onMessageReady(msgId)
|
||||||
|
processed.incl(msgId)
|
||||||
|
|
||||||
|
# Update dependencies for remaining messages
|
||||||
|
for remainingId, entry in rm.incomingBuffer:
|
||||||
|
if remainingId notin processed:
|
||||||
|
if msgId in entry.missingDeps:
|
||||||
|
rm.incomingBuffer[remainingId].missingDeps.excl(msgId)
|
||||||
|
if rm.incomingBuffer[remainingId].missingDeps.len == 0:
|
||||||
|
readyToProcess.add(remainingId)
|
||||||
|
|
||||||
|
# Remove processed messages
|
||||||
|
for msgId in processed:
|
||||||
|
rm.incomingBuffer.del(msgId)
|
||||||
|
|
||||||
|
proc unwrapReceivedMessage*(
    rm: ReliabilityManager, message: seq[byte]
): Result[tuple[message: seq[byte], missingDeps: seq[SdsMessageID]], ReliabilityError] =
  ## Unwraps a received message and processes its reliability metadata.
  ##
  ## Parameters:
  ## - message: The received message bytes
  ##
  ## Returns:
  ## A Result containing either a tuple of (processed message, missing dependencies) or an error.
  try:
    let msg = deserializeMessage(message).valueOr:
      return err(ReliabilityError.reDeserializationError)

    if msg.messageId in rm.messageHistory:
      return ok((msg.content, @[]))

    rm.bloomFilter.add(msg.messageId)

    # Update Lamport timestamp
    rm.updateLamportTimestamp(msg.lamportTimestamp)

    # Review ACK status for outgoing messages
    rm.reviewAckStatus(msg)

    var missingDeps = rm.checkDependencies(msg.causalHistory)

    if missingDeps.len == 0:
      # Check if any dependencies are still in incoming buffer
      var depsInBuffer = false
      for msgId, entry in rm.incomingBuffer.pairs():
        if msgId in msg.causalHistory:
          depsInBuffer = true
          break

      if depsInBuffer:
        rm.incomingBuffer[msg.messageId] =
          IncomingMessage(message: msg, missingDeps: initHashSet[SdsMessageID]())
      else:
        # All dependencies met, add to history
        rm.addToHistory(msg.messageId)
        rm.processIncomingBuffer()
        if not rm.onMessageReady.isNil():
          rm.onMessageReady(msg.messageId)
    else:
      rm.incomingBuffer[msg.messageId] =
        IncomingMessage(message: msg, missingDeps: missingDeps.toHashSet())
      if not rm.onMissingDependencies.isNil():
        rm.onMissingDependencies(msg.messageId, missingDeps)

    return ok((msg.content, missingDeps))
  except Exception:
    error "Failed to unwrap message", msg = getCurrentExceptionMsg()
    return err(ReliabilityError.reDeserializationError)

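For orientation, a minimal usage sketch of this wrap/unwrap pair (illustrative only, not part of the committed sources; it follows the same calls exercised in tests/test_reliability.nim below, and the import paths are assumptions):

import std/options
import results
import ./[reliability, message]

let rm = newReliabilityManager(some("testChannel")).expect("manager")

# Sender side: wrap the payload before handing it to the transport.
let wireBytes = rm.wrapOutgoingMessage(@[byte(1), 2, 3], "example-msg-1").expect("wrap")

# Receiver side: unwrap and check which causal dependencies are still missing.
let (payload, missingDeps) = rm.unwrapReceivedMessage(wireBytes).expect("unwrap")
if missingDeps.len > 0:
  echo "delivery deferred until these arrive: ", missingDeps
else:
  echo "deliverable payload: ", payload
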
proc markDependenciesMet*(
    rm: ReliabilityManager, messageIds: seq[SdsMessageID]
): Result[void, ReliabilityError] =
  ## Marks the specified message dependencies as met.
  ##
  ## Parameters:
  ## - messageIds: A sequence of message IDs to mark as met.
  ##
  ## Returns:
  ## A Result indicating success or an error.
  try:
    # Add all messageIds to bloom filter
    for msgId in messageIds:
      if not rm.bloomFilter.contains(msgId):
        rm.bloomFilter.add(msgId)
        # rm.addToHistory(msgId) -- not needed, as this proc is usually called once the message is already in the application's long-term storage

      # Update any pending messages that depend on this one
      for pendingId, entry in rm.incomingBuffer:
        if msgId in entry.missingDeps:
          rm.incomingBuffer[pendingId].missingDeps.excl(msgId)

    rm.processIncomingBuffer()
    return ok()
  except Exception:
    error "Failed to mark dependencies as met", msg = getCurrentExceptionMsg()
    return err(ReliabilityError.reInternalError)

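Illustrative call pattern (not part of the diff): once the application has retrieved the messages reported by onMissingDependencies from its own storage, it clears them here so that buffered messages can be released:

# The IDs below are placeholders for dependencies the application already holds.
let resolved = @["msg1", "msg2"]
let markResult = rm.markDependenciesMet(resolved)
if markResult.isErr():
  echo "could not mark dependencies as met: ", markResult.error
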
proc setCallbacks*(
    rm: ReliabilityManager,
    onMessageReady: MessageReadyCallback,
    onMessageSent: MessageSentCallback,
    onMissingDependencies: MissingDependenciesCallback,
    onPeriodicSync: PeriodicSyncCallback = nil,
) =
  ## Sets the callback functions for various events in the ReliabilityManager.
  ##
  ## Parameters:
  ## - onMessageReady: Callback function called when a message is ready to be processed.
  ## - onMessageSent: Callback function called when a message is confirmed as sent.
  ## - onMissingDependencies: Callback function called when a message has missing dependencies.
  ## - onPeriodicSync: Callback function called to notify about periodic sync
  withLock rm.lock:
    rm.onMessageReady = onMessageReady
    rm.onMessageSent = onMessageSent
    rm.onMissingDependencies = onMissingDependencies
    rm.onPeriodicSync = onPeriodicSync

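For reference, the registration shape used throughout the tests below (a sketch; the counters stand in for real application handlers):

var readyCount, sentCount, missingCount, syncCount = 0

rm.setCallbacks(
  proc(messageId: SdsMessageID) {.gcsafe.} =
    readyCount += 1,
  proc(messageId: SdsMessageID) {.gcsafe.} =
    sentCount += 1,
  proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
    missingCount += 1,
  proc() {.gcsafe.} =
    syncCount += 1,
)
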
proc checkUnacknowledgedMessages(rm: ReliabilityManager) {.gcsafe.} =
  ## Checks and processes unacknowledged messages in the outgoing buffer.
  withLock rm.lock:
    let now = getTime()
    var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]

    for unackMsg in rm.outgoingBuffer:
      let elapsed = now - unackMsg.sendTime
      if elapsed > rm.config.resendInterval:
        # Time to attempt resend
        if unackMsg.resendAttempts < rm.config.maxResendAttempts:
          var updatedMsg = unackMsg
          updatedMsg.resendAttempts += 1
          updatedMsg.sendTime = now
          newOutgoingBuffer.add(updatedMsg)
        else:
          if not rm.onMessageSent.isNil():
            rm.onMessageSent(unackMsg.message.messageId)
      else:
        newOutgoingBuffer.add(unackMsg)

    rm.outgoingBuffer = newOutgoingBuffer

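The resend policy above is driven entirely by ReliabilityConfig; a sketch of tuning it (the field names match those exercised by the tests below, the durations are arbitrary):

import std/[times, options]

var config = defaultConfig()
config.resendInterval = initDuration(seconds = 5)      # retry after 5 s without an ACK
config.maxResendAttempts = 3                           # then drop the entry and fire onMessageSent
config.bufferSweepInterval = initDuration(seconds = 1) # how often the sweep runs

let rm = newReliabilityManager(some("testChannel"), config).expect("manager")
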
proc periodicBufferSweep(
    rm: ReliabilityManager
) {.async: (raises: [CancelledError]), gcsafe.} =
  ## Periodically sweeps the buffer to clean up and check unacknowledged messages.
  while true:
    try:
      rm.checkUnacknowledgedMessages()
      rm.cleanBloomFilter()
    except Exception:
      error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg()

    await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))

proc periodicSyncMessage(
    rm: ReliabilityManager
) {.async: (raises: [CancelledError]), gcsafe.} =
  ## Periodically notifies the application to send a sync message to maintain connectivity.
  while true:
    try:
      if not rm.onPeriodicSync.isNil():
        rm.onPeriodicSync()
    except Exception:
      error "Error in periodic sync", msg = getCurrentExceptionMsg()
    await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds))

proc startPeriodicTasks*(rm: ReliabilityManager) =
  ## Starts the periodic tasks for buffer sweeping and sync message sending.
  ##
  ## This procedure should be called after creating a ReliabilityManager to enable automatic maintenance.
  asyncSpawn rm.periodicBufferSweep()
  asyncSpawn rm.periodicSyncMessage()

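An illustrative end-to-end setup (not part of the diff; onReady, onSent, onMissing and onSync are hypothetical handlers supplied by the application). The chronos dispatcher has to keep running for the spawned tasks to fire:

import chronos
import std/options

let rm = newReliabilityManager(some("testChannel")).expect("manager")
rm.setCallbacks(onReady, onSent, onMissing, onSync)

rm.startPeriodicTasks()

# Keep the event loop alive so periodicBufferSweep and periodicSyncMessage run.
waitFor sleepAsync(chronos.seconds(10))
rm.cleanup()
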
proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] =
  ## Resets the ReliabilityManager to its initial state.
  ##
  ## This procedure clears all buffers and resets the Lamport timestamp.
  withLock rm.lock:
    try:
      rm.lamportTimestamp = 0
      rm.messageHistory.setLen(0)
      rm.outgoingBuffer.setLen(0)
      rm.incomingBuffer.clear()
      rm.bloomFilter = newRollingBloomFilter(
        rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate
      )
      return ok()
    except Exception:
      error "Failed to reset ReliabilityManager", msg = getCurrentExceptionMsg()
      return err(ReliabilityError.reInternalError)

@ -1,10 +1,23 @@
-import std/[times, locks]
+import std/[times, locks, options]
 import chronicles
 import ./[rolling_bloom_filter, message]

 type
+  MessageReadyCallback* = proc(messageId: SdsMessageID) {.gcsafe.}
+
+  MessageSentCallback* = proc(messageId: SdsMessageID) {.gcsafe.}
+
+  MissingDependenciesCallback* =
+    proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.}
+
   PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
+
+  AppCallbacks* = ref object
+    messageReadyCb*: MessageReadyCallback
+    messageSentCb*: MessageSentCallback
+    missingDependenciesCb*: MissingDependenciesCallback
+    periodicSyncCb*: PeriodicSyncCallback

   ReliabilityConfig* = object
     bloomFilterCapacity*: int
     bloomFilterErrorRate*: float
@ -20,8 +33,8 @@ type
     messageHistory*: seq[SdsMessageID]
     bloomFilter*: RollingBloomFilter
     outgoingBuffer*: seq[UnacknowledgedMessage]
-    incomingBuffer*: seq[SdsMessage]
+    incomingBuffer*: Table[SdsMessageID, IncomingMessage]
-    channelId*: SdsChannelID
+    channelId*: Option[SdsChannelID]
     config*: ReliabilityConfig
     lock*: Lock
     onMessageReady*: proc(messageId: SdsMessageID) {.gcsafe.}
@ -59,7 +72,7 @@ proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
   try:
     withLock rm.lock:
       rm.outgoingBuffer.setLen(0)
-      rm.incomingBuffer.setLen(0)
+      rm.incomingBuffer.clear()
       rm.messageHistory.setLen(0)
   except Exception:
     error "Error during cleanup", error = getCurrentExceptionMsg()
@ -84,6 +97,15 @@ proc updateLamportTimestamp*(
 proc getRecentSdsMessageIDs*(rm: ReliabilityManager, n: int): seq[SdsMessageID] =
   result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1]

+proc checkDependencies*(
+    rm: ReliabilityManager, deps: seq[SdsMessageID]
+): seq[SdsMessageID] =
+  var missingDeps: seq[SdsMessageID] = @[]
+  for depId in deps:
+    if depId notin rm.messageHistory:
+      missingDeps.add(depId)
+  return missingDeps
+
 proc getMessageHistory*(rm: ReliabilityManager): seq[SdsMessageID] =
   withLock rm.lock:
     result = rm.messageHistory
@ -92,6 +114,8 @@ proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] =
   withLock rm.lock:
     result = rm.outgoingBuffer

-proc getIncomingBuffer*(rm: ReliabilityManager): seq[SdsMessage] =
+proc getIncomingBuffer*(
+    rm: ReliabilityManager
+): Table[SdsMessageID, message.IncomingMessage] =
   withLock rm.lock:
     result = rm.incomingBuffer

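A small illustrative sketch of bundling application handlers with the new AppCallbacks object (nothing shown in this diff consumes it directly; the handler names and the imports of the defining modules are assumptions):

proc onReady(messageId: SdsMessageID) {.gcsafe.} =
  discard # deliver messageId to the application

proc onSent(messageId: SdsMessageID) {.gcsafe.} =
  discard # messageId was acknowledged by peers

proc onMissing(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
  discard # fetch missingDeps from application storage

proc onSync() {.gcsafe, raises: [].} =
  discard # broadcast a sync message

let callbacks = AppCallbacks(
  messageReadyCb: onReady,
  messageSentCb: onSent,
  missingDependenciesCb: onMissing,
  periodicSyncCb: onSync,
)
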
@ -1,7 +1,6 @@
 import unittest, results, strutils
 import ../src/bloom
 from random import rand, randomize
-import ../src/[message, protobuf, protobufutil, reliability_utils, rolling_bloom_filter]

 suite "bloom filter":
   setup:
496 tests/test_reliability.nim Normal file
@ -0,0 +1,496 @@
import unittest, results, chronos, std/[times, options, tables]
import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter]

# Core functionality tests
suite "Core Operations":
  var rm: ReliabilityManager

  setup:
    let rmResult = newReliabilityManager(some("testChannel"))
    check rmResult.isOk()
    rm = rmResult.get()

  teardown:
    if not rm.isNil:
      rm.cleanup()

  test "can create with default config":
    let config = defaultConfig()
    check:
      config.bloomFilterCapacity == DefaultBloomFilterCapacity
      config.bloomFilterErrorRate == DefaultBloomFilterErrorRate
      config.maxMessageHistory == DefaultMaxMessageHistory

  test "basic message wrapping and unwrapping":
    let msg = @[byte(1), 2, 3]
    let msgId = "test-msg-1"

    let wrappedResult = rm.wrapOutgoingMessage(msg, msgId)
    check wrappedResult.isOk()
    let wrapped = wrappedResult.get()
    check wrapped.len > 0

    let unwrapResult = rm.unwrapReceivedMessage(wrapped)
    check unwrapResult.isOk()
    let (unwrapped, missingDeps) = unwrapResult.get()
    check:
      unwrapped == msg
      missingDeps.len == 0

  test "message ordering":
    # Create messages with different timestamps
    let msg1 = SdsMessage(
      messageId: "msg1",
      lamportTimestamp: 1,
      causalHistory: @[],
      channelId: some("testChannel"),
      content: @[byte(1)],
      bloomFilter: @[],
    )

    let msg2 = SdsMessage(
      messageId: "msg2",
      lamportTimestamp: 5,
      causalHistory: @[],
      channelId: some("testChannel"),
      content: @[byte(2)],
      bloomFilter: @[],
    )

    let serialized1 = serializeMessage(msg1)
    let serialized2 = serializeMessage(msg2)
    check:
      serialized1.isOk()
      serialized2.isOk()

    # Process out of order
    discard rm.unwrapReceivedMessage(serialized2.get())
    let timestamp1 = rm.lamportTimestamp
    discard rm.unwrapReceivedMessage(serialized1.get())
    let timestamp2 = rm.lamportTimestamp

    check timestamp2 > timestamp1

# Reliability mechanism tests
suite "Reliability Mechanisms":
  var rm: ReliabilityManager

  setup:
    let rmResult = newReliabilityManager(some("testChannel"))
    check rmResult.isOk()
    rm = rmResult.get()

  teardown:
    if not rm.isNil:
      rm.cleanup()

  test "dependency detection and resolution":
    var messageReadyCount = 0
    var messageSentCount = 0
    var missingDepsCount = 0

    rm.setCallbacks(
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageSentCount += 1,
      proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
        missingDepsCount += 1,
    )

    # Create dependency chain: msg3 -> msg2 -> msg1
    let id1 = "msg1"
    let id2 = "msg2"
    let id3 = "msg3"

    # Create messages with dependencies
    let msg2 = SdsMessage(
      messageId: id2,
      lamportTimestamp: 2,
      causalHistory: @[id1], # msg2 depends on msg1
      channelId: some("testChannel"),
      content: @[byte(2)],
      bloomFilter: @[],
    )

    let msg3 = SdsMessage(
      messageId: id3,
      lamportTimestamp: 3,
      causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2
      channelId: some("testChannel"),
      content: @[byte(3)],
      bloomFilter: @[],
    )

    let serialized2 = serializeMessage(msg2)
    let serialized3 = serializeMessage(msg3)
    check:
      serialized2.isOk()
      serialized3.isOk()

    # First try processing msg3 (which depends on msg2 which depends on msg1)
    let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get())
    check unwrapResult3.isOk()
    let (_, missingDeps3) = unwrapResult3.get()

    check:
      missingDepsCount == 1 # Should trigger missing deps callback
      missingDeps3.len == 2 # Should be missing both msg1 and msg2
      id1 in missingDeps3
      id2 in missingDeps3

    # Then try processing msg2 (which only depends on msg1)
    let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get())
    check unwrapResult2.isOk()
    let (_, missingDeps2) = unwrapResult2.get()

    check:
      missingDepsCount == 2 # Should have triggered another missing deps callback
      missingDeps2.len == 1 # Should only be missing msg1
      id1 in missingDeps2
      messageReadyCount == 0 # No messages should be ready yet

    # Mark first dependency (msg1) as met
    let markResult1 = rm.markDependenciesMet(@[id1])
    check markResult1.isOk()

    let incomingBuffer = rm.getIncomingBuffer()

    check:
      incomingBuffer.len == 0
      messageReadyCount == 2 # Both msg2 and msg3 should be ready
      missingDepsCount == 2 # Should still be 2 from the initial missing deps

  test "acknowledgment via causal history":
    var messageReadyCount = 0
    var messageSentCount = 0
    var missingDepsCount = 0

    rm.setCallbacks(
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageSentCount += 1,
      proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
        missingDepsCount += 1,
    )

    # Send our message
    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
    check wrap1.isOk()

    # Create a message that has our message in causal history
    let msg2 = SdsMessage(
      messageId: "msg2",
      lamportTimestamp: rm.lamportTimestamp + 1,
      causalHistory: @[id1], # Include our message in causal history
      channelId: some("testChannel"),
      content: @[byte(2)],
      bloomFilter: @[] # Test with an empty bloom filter
      ,
    )

    let serializedMsg2 = serializeMessage(msg2)
    check serializedMsg2.isOk()

    # Process the "received" message - should trigger callbacks
    let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
    check unwrapResult.isOk()

    check:
      messageReadyCount == 1 # For msg2 which we "received"
      messageSentCount == 1 # For msg1 which was acknowledged via causal history

  test "acknowledgment via bloom filter":
    var messageSentCount = 0

    rm.setCallbacks(
      proc(messageId: SdsMessageID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageSentCount += 1,
      proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
        discard,
    )

    # Send our message
    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
    check wrap1.isOk()

    # Create a message with bloom filter containing our message
    var otherPartyBloomFilter =
      newRollingBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
    otherPartyBloomFilter.add(id1)

    let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
    check bfResult.isOk()

    let msg2 = SdsMessage(
      messageId: "msg2",
      lamportTimestamp: rm.lamportTimestamp + 1,
      causalHistory: @[], # Empty causal history as we're using bloom filter
      channelId: some("testChannel"),
      content: @[byte(2)],
      bloomFilter: bfResult.get(),
    )

    let serializedMsg2 = serializeMessage(msg2)
    check serializedMsg2.isOk()

    let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
    check unwrapResult.isOk()

    check messageSentCount == 1 # Our message should be acknowledged via bloom filter

# Periodic task & Buffer management tests
suite "Periodic Tasks & Buffer Management":
  var rm: ReliabilityManager

  setup:
    let rmResult = newReliabilityManager(some("testChannel"))
    check rmResult.isOk()
    rm = rmResult.get()

  teardown:
    if not rm.isNil:
      rm.cleanup()

  test "outgoing buffer management":
    var messageSentCount = 0

    rm.setCallbacks(
      proc(messageId: SdsMessageID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageSentCount += 1,
      proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
        discard,
    )

    # Add multiple messages
    for i in 0 .. 5:
      let msg = @[byte(i)]
      let id = "msg" & $i
      let wrap = rm.wrapOutgoingMessage(msg, id)
      check wrap.isOk()

    let outBuffer = rm.getOutgoingBuffer()
    check outBuffer.len == 6

    # Create message that acknowledges some messages
    let ackMsg = SdsMessage(
      messageId: "ack1",
      lamportTimestamp: rm.lamportTimestamp + 1,
      causalHistory: @["msg0", "msg2", "msg4"],
      channelId: some("testChannel"),
      content: @[byte(100)],
      bloomFilter: @[],
    )

    let serializedAck = serializeMessage(ackMsg)
    check serializedAck.isOk()

    # Process the acknowledgment
    discard rm.unwrapReceivedMessage(serializedAck.get())

    let finalBuffer = rm.getOutgoingBuffer()
    check:
      finalBuffer.len == 3 # Should have removed acknowledged messages
      messageSentCount == 3
        # Should have triggered sent callback for acknowledged messages

  test "periodic buffer sweep and bloom clean":
    var messageSentCount = 0

    var config = defaultConfig()
    config.resendInterval = initDuration(milliseconds = 100) # Short for testing
    config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
    config.bloomFilterCapacity = 2 # Small capacity for testing
    config.maxResendAttempts = 3 # Set a low number of max attempts

    let rmResultP = newReliabilityManager(some("testChannel"), config)
    check rmResultP.isOk()
    let rm = rmResultP.get()

    rm.setCallbacks(
      proc(messageId: SdsMessageID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageSentCount += 1,
      proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
        discard,
    )

    # First message - should be cleaned from bloom filter later
    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
    check wrap1.isOk()

    let initialBuffer = rm.getOutgoingBuffer()
    check:
      initialBuffer[0].resendAttempts == 0
      rm.bloomFilter.contains(id1)

    rm.startPeriodicTasks()

    # Wait long enough for bloom filter
    waitFor sleepAsync(chronos.milliseconds(500))

    # Add new messages
    let msg2 = @[byte(2)]
    let id2 = "msg2"
    let wrap2 = rm.wrapOutgoingMessage(msg2, id2)
    check wrap2.isOk()

    let msg3 = @[byte(3)]
    let id3 = "msg3"
    let wrap3 = rm.wrapOutgoingMessage(msg3, id3)
    check wrap3.isOk()

    let finalBuffer = rm.getOutgoingBuffer()
    check:
      finalBuffer.len == 2
        # Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries
      finalBuffer[0].message.messageId == id2 # Verify it's the second message
      finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
      not rm.bloomFilter.contains(id1) # Bloom filter cleaning check
      rm.bloomFilter.contains(id3) # New message still in filter

    rm.cleanup()

  test "periodic sync callback":
    var syncCallCount = 0
    rm.setCallbacks(
      proc(messageId: SdsMessageID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
        discard,
      proc() {.gcsafe.} =
        syncCallCount += 1,
    )

    rm.startPeriodicTasks()
    waitFor sleepAsync(chronos.seconds(1))
    rm.cleanup()

    check syncCallCount > 0

# Special cases handling
suite "Special Cases Handling":
  var rm: ReliabilityManager

  setup:
    let rmResult = newReliabilityManager(some("testChannel"))
    check rmResult.isOk()
    rm = rmResult.get()

  teardown:
    if not rm.isNil:
      rm.cleanup()

  test "message history limits":
    # Add messages up to max history size
    for i in 0 .. rm.config.maxMessageHistory + 5:
      let msg = @[byte(i)]
      let id = "msg" & $i
      let wrap = rm.wrapOutgoingMessage(msg, id)
      check wrap.isOk()

    let history = rm.getMessageHistory()
    check:
      history.len <= rm.config.maxMessageHistory
      history[^1] == "msg" & $(rm.config.maxMessageHistory + 5)

  test "invalid bloom filter handling":
    let msgInvalid = SdsMessage(
      messageId: "invalid-bf",
      lamportTimestamp: 1,
      causalHistory: @[],
      channelId: some("testChannel"),
      content: @[byte(1)],
      bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
      ,
    )

    let serializedInvalid = serializeMessage(msgInvalid)
    check serializedInvalid.isOk()

    # Should handle invalid bloom filter gracefully
    let result = rm.unwrapReceivedMessage(serializedInvalid.get())
    check:
      result.isOk()
      result.get()[1].len == 0 # No missing dependencies

  test "duplicate message handling":
    var messageReadyCount = 0
    rm.setCallbacks(
      proc(messageId: SdsMessageID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
        discard,
    )

    # Create and process a message
    let msg = SdsMessage(
      messageId: "dup-msg",
      lamportTimestamp: 1,
      causalHistory: @[],
      channelId: some("testChannel"),
      content: @[byte(1)],
      bloomFilter: @[],
    )

    let serialized = serializeMessage(msg)
    check serialized.isOk()

    # Process same message twice
    let result1 = rm.unwrapReceivedMessage(serialized.get())
    check result1.isOk()
    let result2 = rm.unwrapReceivedMessage(serialized.get())
    check:
      result2.isOk()
      result2.get()[1].len == 0 # No missing deps on second process
      messageReadyCount == 1 # Message should only be processed once

  test "error handling":
    # Empty message
    let emptyMsg: seq[byte] = @[]
    let emptyResult = rm.wrapOutgoingMessage(emptyMsg, "empty")
    check:
      not emptyResult.isOk()
      emptyResult.error == reInvalidArgument

    # Oversized message
    let largeMsg = newSeq[byte](MaxMessageSize + 1)
    let largeResult = rm.wrapOutgoingMessage(largeMsg, "large")
    check:
      not largeResult.isOk()
      largeResult.error == reMessageTooLarge

suite "cleanup":
  test "cleanup works correctly":
    let rmResult = newReliabilityManager(some("testChannel"))
    check rmResult.isOk()
    let rm = rmResult.get()

    # Add some messages
    let msg = @[byte(1), 2, 3]
    let msgId = "test-msg-1"
    discard rm.wrapOutgoingMessage(msg, msgId)

    rm.cleanup()

    let outBuffer = rm.getOutgoingBuffer()
    let history = rm.getMessageHistory()
    check:
      outBuffer.len == 0
      history.len == 0
1 vendor/nim-chronicles vendored Submodule
@ -0,0 +1 @@
Subproject commit a8fb38a10bcb548df78e9a70bd77b26bb50abd12

1 vendor/nim-chronos vendored Submodule
@ -0,0 +1 @@
Subproject commit b55e2816eb45f698ddaca8d8473e401502562db2

1 vendor/nim-confutils vendored Submodule
@ -0,0 +1 @@
Subproject commit e214b3992a31acece6a9aada7d0a1ad37c928f3b

1 vendor/nim-faststreams vendored Submodule
@ -0,0 +1 @@
Subproject commit 2b08c774afaafd600cf4c6f994cf78b8aa090c0c

1 vendor/nim-json-serialization vendored Submodule
@ -0,0 +1 @@
Subproject commit 2b1c5eb11df3647a2cee107cd4cce3593cbb8bcf

1 vendor/nim-libp2p vendored Submodule
@ -0,0 +1 @@
Subproject commit ac25da6cea158768bbc060b7be2fbe004206f3bb

1 vendor/nim-results vendored Submodule
@ -0,0 +1 @@
Subproject commit df8113dda4c2d74d460a8fa98252b0b771bf1f27

1 vendor/nim-serialization vendored Submodule
@ -0,0 +1 @@
Subproject commit 548d0adc9797a10b2db7f788b804330306293088

1 vendor/nim-stew vendored Submodule
@ -0,0 +1 @@
Subproject commit d7a6868ba84165e7fdde427af9a1fc3f5f5cc151

1 vendor/nim-taskpools vendored Submodule
@ -0,0 +1 @@
Subproject commit 7b74a716a40249720fd7da428113147942b9642d

1 vendor/nimbus-build-system vendored Submodule
@ -0,0 +1 @@
Subproject commit 5f10509cf880dc035e517ca7bac3163cd5206ba8