Compare commits

..

17 Commits

Author SHA1 Message Date
e301dad197
nix: use Nix Flake from NBS repo to provide Nim
This way we can track same Nim as in vendor folder.

Notably this upgrades from Nim 2.2.4 to 2.2.6.

Depends on:
https://github.com/status-im/nimbus-build-system/pull/112

Signed-off-by: Jakub Sokołowski <jakub@status.im>
2026-02-01 20:53:14 +01:00
Ivan FB
19c48ef602
Merge pull request #46 from logos-messaging/use-epoll-in-android
force epoll is being used in Android
2026-01-29 23:28:00 +01:00
shash256
9f7ae0c7df
feat: support retrieval hints for efficient message retrieval from store nodes (#18)
* feat: updates for retrieval hint

* use HistoryEntry for deps

* chore: rearrange helper funcs

* chore: address review comments

* fix: simplify with mapIt
2026-01-29 09:52:40 +00:00
Ivan FB
a8a5e42530
Merge pull request #45 from logos-messaging/fix-shebangs
fix: use env instead of hardcoding bash path
2026-01-29 10:23:17 +01:00
Ivan Folgueira Bande
4aa800ad2b
force epoll is being used in Android 2026-01-27 22:41:46 +01:00
c6e54b70ee
fix: use env instead of hardcoding bash path
Causes issues in Nix shells and derivations.

Signed-off-by: Jakub Sokołowski <jakub@status.im>
2026-01-27 15:28:57 +01:00
Ivan FB
239f619625
Merge pull request #42 from logos-messaging/initialize-lock
initialize ctxPoolLock to avoid crash on Windows/iOS
2026-01-14 12:16:15 +01:00
Ivan Folgueira Bande
be4c283581
initialize ctxPoolLock 2026-01-14 11:49:04 +01:00
Siddarth Kumar
fb8039c5a5
chore: fix iOS build
otherwise iOS linker fails with
  Undefined symbols for architecture arm64
2025-12-23 19:47:02 +04:00
Igor Sirotin
8d33a7f7da
feat: thread pool (#40)
* feat: thread pool

* proper pass ARCH in Makefile when building for Android

---------

Co-authored-by: Ivan Folgueira Bande <ivansete@status.im>
2025-12-22 18:10:45 +00:00
Ivan Folgueira Bande
e67639ee08
get arch from uname -m if ARCH env var is not set 2025-12-17 16:02:53 +01:00
Ivan FB
ac31e5adf2
Merge pull request #37 from logos-messaging/fix/buildForAppleSilicon
fix: Add condition to check hostCpu and then build based on that
2025-12-15 12:06:17 +01:00
Khushboo Mehta
13c3c348fa fix: Add condition to check hostCpu and then build based on that 2025-12-15 12:01:51 +01:00
Ivan FB
0b4d3cc03f
Merge pull request #38 from logos-messaging/rm-log
rm UNWRAP_MESSAGE failed error
2025-12-10 09:51:41 +01:00
Ivan Folgueira Bande
191928adc6
rm UNWRAP_MESSAGE failed error 2025-12-09 11:45:35 +01:00
Ivan Folgueira Bande
ae445d5585
rename ANDROID_NDK_HOME to ANDROID_NDK_ROOT 2025-11-28 19:34:51 +01:00
Ivan Folgueira Bande
024b8c50e9
adapt Makefile and sds.nimble to support iOS target 2025-11-27 23:01:39 +01:00
25 changed files with 538 additions and 184 deletions

47
.github/workflows/ci-nix.yml vendored Normal file
View File

@ -0,0 +1,47 @@
name: ci / nix
permissions:
contents: read
pull-requests: read
checks: write
on:
pull_request:
branches: [master]
jobs:
build:
strategy:
fail-fast: false
matrix:
system:
- aarch64-darwin
- x86_64-linux
nixpkg:
- libsds
- libsds-android-arm64
- libsds-android-amd64
- libsds-android-x86
- libsds-android-arm
include:
- system: aarch64-darwin
runs_on: [self-hosted, macOS, ARM64]
- system: x86_64-linux
runs_on: [self-hosted, Linux, X64]
name: '${{ matrix.system }} / ${{ matrix.nixpkg }}'
runs-on: ${{ matrix.runs_on }}
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: 'Run Nix build for ${{ matrix.nixpkg }}'
shell: bash
run: |
nix build -L '.?submodules=1#${{ matrix.nixpkg }}' \
--print-out-paths --accept-flake-config
- name: 'Show result contents'
shell: bash
run: find result/ -type f

View File

@ -1,25 +0,0 @@
---
name: ci / nix-builds
on:
pull_request:
branches: [master]
jobs:
build:
name: Build Nix Flake packages
runs-on: [self-hosted, Linux]
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: Build library
shell: bash
run: |
nix build '.?submodules=1#libsds' \
--print-out-paths --accept-flake-config
- name: Build Android library
shell: bash
run: |
nix build '.?submodules=1#libsds-android-arm64' \
--print-out-paths --accept-flake-config

View File

@ -73,6 +73,7 @@ ifeq ($(detected_OS),Windows)
BUILD_COMMAND := $(BUILD_COMMAND)Windows BUILD_COMMAND := $(BUILD_COMMAND)Windows
else ifeq ($(detected_OS),Darwin) else ifeq ($(detected_OS),Darwin)
BUILD_COMMAND := $(BUILD_COMMAND)Mac BUILD_COMMAND := $(BUILD_COMMAND)Mac
export IOS_SDK_PATH := $(shell xcrun --sdk iphoneos --show-sdk-path)
else ifeq ($(detected_OS),Linux) else ifeq ($(detected_OS),Linux)
BUILD_COMMAND := $(BUILD_COMMAND)Linux BUILD_COMMAND := $(BUILD_COMMAND)Linux
endif endif
@ -93,16 +94,16 @@ libsds: | deps
ANDROID_TARGET ?= 30 ANDROID_TARGET ?= 30
ifeq ($(detected_OS),Darwin) ifeq ($(detected_OS),Darwin)
ANDROID_TOOLCHAIN_DIR := $(ANDROID_NDK_HOME)/toolchains/llvm/prebuilt/darwin-x86_64 ANDROID_TOOLCHAIN_DIR := $(ANDROID_NDK_ROOT)/toolchains/llvm/prebuilt/darwin-x86_64
else else
ANDROID_TOOLCHAIN_DIR := $(ANDROID_NDK_HOME)/toolchains/llvm/prebuilt/linux-x86_64 ANDROID_TOOLCHAIN_DIR := $(ANDROID_NDK_ROOT)/toolchains/llvm/prebuilt/linux-x86_64
endif endif
# Fixes "clang: not found" errors # Fixes "clang: not found" errors
PATH := $(ANDROID_TOOLCHAIN_DIR)/bin:$(PATH) PATH := $(ANDROID_TOOLCHAIN_DIR)/bin:$(PATH)
libsds-android-precheck: libsds-android-precheck:
ifndef ANDROID_NDK_HOME ifndef ANDROID_NDK_ROOT
$(error ANDROID_NDK_HOME is not set) $(error ANDROID_NDK_ROOT is not set)
endif endif
build-libsds-for-android-arch: NIM_PARAMS := $(NIM_PARAMS) --passC="--sysroot=$(ANDROID_TOOLCHAIN_DIR)/sysroot" build-libsds-for-android-arch: NIM_PARAMS := $(NIM_PARAMS) --passC="--sysroot=$(ANDROID_TOOLCHAIN_DIR)/sysroot"
@ -114,7 +115,7 @@ build-libsds-for-android-arch: NIM_PARAMS := $(NIM_PARAMS) --passC="-I$(ANDROID_
build-libsds-for-android-arch: NIM_PARAMS := $(NIM_PARAMS) --passL="-L$(ANDROID_TOOLCHAIN_DIR)/sysroot/usr/lib/$(ARCH_DIRNAME)/$(ANDROID_TARGET)" build-libsds-for-android-arch: NIM_PARAMS := $(NIM_PARAMS) --passL="-L$(ANDROID_TOOLCHAIN_DIR)/sysroot/usr/lib/$(ARCH_DIRNAME)/$(ANDROID_TARGET)"
build-libsds-for-android-arch: build-libsds-for-android-arch:
CC=$(ANDROID_TOOLCHAIN_DIR)/bin/$(ANDROID_ARCH)$(ANDROID_TARGET)-clang \ CC=$(ANDROID_TOOLCHAIN_DIR)/bin/$(ANDROID_ARCH)$(ANDROID_TARGET)-clang \
CPU=$(CPU) ABIDIR=$(ABIDIR) \ ARCH=$(ARCH) ABIDIR=$(ABIDIR) \
ARCH_DIRNAME=$(ARCH_DIRNAME) \ ARCH_DIRNAME=$(ARCH_DIRNAME) \
ANDROID_ARCH=$(ANDROID_ARCH) \ ANDROID_ARCH=$(ANDROID_ARCH) \
ANDROID_TOOLCHAIN_DIR=$(ANDROID_TOOLCHAIN_DIR) \ ANDROID_TOOLCHAIN_DIR=$(ANDROID_TOOLCHAIN_DIR) \
@ -122,37 +123,37 @@ build-libsds-for-android-arch:
nim libsdsAndroid $(NIM_PARAMS) sds.nims nim libsdsAndroid $(NIM_PARAMS) sds.nims
libsds-android-arm64: ANDROID_ARCH=aarch64-linux-android libsds-android-arm64: ANDROID_ARCH=aarch64-linux-android
libsds-android-arm64: CPU=arm64 libsds-android-arm64: ARCH=arm64
libsds-android-arm64: ABIDIR=arm64-v8a libsds-android-arm64: ABIDIR=arm64-v8a
libsds-android-arm64: ARCH_DIRNAME=aarch64-linux-android libsds-android-arm64: ARCH_DIRNAME=aarch64-linux-android
libsds-android-arm64: | libsds-android-precheck build deps libsds-android-arm64: | libsds-android-precheck build deps
$(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \ $(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \
CPU=$(CPU) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME) ARCH=$(ARCH) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME)
libsds-android-amd64: ANDROID_ARCH=x86_64-linux-android libsds-android-amd64: ANDROID_ARCH=x86_64-linux-android
libsds-android-amd64: CPU=amd64 libsds-android-amd64: ARCH=amd64
libsds-android-amd64: ABIDIR=x86_64 libsds-android-amd64: ABIDIR=x86_64
libsds-android-amd64: ARCH_DIRNAME=x86_64-linux-android libsds-android-amd64: ARCH_DIRNAME=x86_64-linux-android
libsds-android-amd64: | libsds-android-precheck build deps libsds-android-amd64: | libsds-android-precheck build deps
$(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \ $(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \
CPU=$(CPU) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME) ARCH=$(ARCH) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME)
libsds-android-x86: ANDROID_ARCH=i686-linux-android libsds-android-x86: ANDROID_ARCH=i686-linux-android
libsds-android-x86: CPU=i386 libsds-android-x86: ARCH=i386
libsds-android-x86: ABIDIR=x86 libsds-android-x86: ABIDIR=x86
libsds-android-x86: ARCH_DIRNAME=i686-linux-android libsds-android-x86: ARCH_DIRNAME=i686-linux-android
libsds-android-x86: | libsds-android-precheck build deps libsds-android-x86: | libsds-android-precheck build deps
$(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \ $(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \
CPU=$(CPU) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME) ARCH=$(ARCH) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME)
libsds-android-arm: ANDROID_ARCH=armv7a-linux-androideabi libsds-android-arm: ANDROID_ARCH=armv7a-linux-androideabi
libsds-android-arm: CPU=arm libsds-android-arm: ARCH=arm
libsds-android-arm: ABIDIR=armeabi-v7a libsds-android-arm: ABIDIR=armeabi-v7a
libsds-android-arm: ARCH_DIRNAME=arm-linux-androideabi libsds-android-arm: ARCH_DIRNAME=arm-linux-androideabi
libsds-android-arm: | libsds-android-precheck build deps libsds-android-arm: | libsds-android-precheck build deps
# cross-rs target architecture name does not match the one used in android # cross-rs target architecture name does not match the one used in android
$(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \ $(MAKE) build-libsds-for-android-arch ANDROID_ARCH=$(ANDROID_ARCH) \
CPU=$(CPU) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME) \ ARCH=$(ARCH) ABIDIR=$(ABIDIR) ARCH_DIRNAME=$(ARCH_DIRNAME) \
libsds-android: libsds-android:
ifeq ($(ARCH),arm64) ifeq ($(ARCH),arm64)
@ -171,3 +172,9 @@ else
endif endif
endif endif
# Target iOS
libsds-ios: | deps
$(ENV_SCRIPT) nim libsdsIOS $(NIM_PARAMS) sds.nims

View File

@ -30,8 +30,8 @@ unzip android-ndk-r27c-linux.zip
Then, add the following to your ~/.bashrc file: Then, add the following to your ~/.bashrc file:
```code ```code
export ANDROID_NDK_HOME=$HOME/android-ndk-r27c export ANDROID_NDK_ROOT=$HOME/android-ndk-r27c
export PATH=$ANDROID_NDK_HOME/toolchains/llvm/prebuilt/linux-x86_64/bin:$PATH export PATH=$ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/linux-x86_64/bin:$PATH
``` ```
Then, use one of the following commands, according to the current architecture: Then, use one of the following commands, according to the current architecture:

3
env.sh
View File

@ -1,8 +1,7 @@
#!/bin/bash #!/usr/bin/env bash
# We use ${BASH_SOURCE[0]} instead of $0 to allow sourcing this file # We use ${BASH_SOURCE[0]} instead of $0 to allow sourcing this file
# and we fall back to a Zsh-specific special var to also support Zsh. # and we fall back to a Zsh-specific special var to also support Zsh.
REL_PATH="$(dirname ${BASH_SOURCE[0]:-${(%):-%x}})" REL_PATH="$(dirname ${BASH_SOURCE[0]:-${(%):-%x}})"
ABS_PATH="$(cd ${REL_PATH}; pwd)" ABS_PATH="$(cd ${REL_PATH}; pwd)"
source ${ABS_PATH}/vendor/nimbus-build-system/scripts/env.sh source ${ABS_PATH}/vendor/nimbus-build-system/scripts/env.sh

23
flake.lock generated
View File

@ -1,5 +1,27 @@
{ {
"nodes": { "nodes": {
"nimbusBuildSystem": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1769685810,
"narHash": "sha256-mANqcQcRXb08ZR8WF6SoyVrL/nXUePtMZfpNx25BlLI=",
"ref": "refs/heads/master",
"rev": "0e7a764edae92b224326b3700f062702489ce2b4",
"revCount": 238,
"submodules": true,
"type": "git",
"url": "file:./vendor/nimbus-build-system"
},
"original": {
"submodules": true,
"type": "git",
"url": "file:./vendor/nimbus-build-system"
}
},
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1757590060, "lastModified": 1757590060,
@ -18,6 +40,7 @@
}, },
"root": { "root": {
"inputs": { "inputs": {
"nimbusBuildSystem": "nimbusBuildSystem",
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
} }
} }

View File

@ -7,10 +7,17 @@
}; };
inputs = { inputs = {
# We are pinning the commit because ultimately we want to use same commit across different projects.
# A commit from nixpkgs 24.11 release : https://github.com/NixOS/nixpkgs/tree/release-24.11
nixpkgs.url = "github:NixOS/nixpkgs?rev=0ef228213045d2cdb5a169a95d63ded38670b293"; nixpkgs.url = "github:NixOS/nixpkgs?rev=0ef228213045d2cdb5a169a95d63ded38670b293";
# WARNING: Remember to update commit and use 'nix flake update' to update flake.lock.
nimbusBuildSystem = {
url = "git+file:./vendor/nimbus-build-system?submodules=1";
inputs.nixpkgs.follows = "nixpkgs";
};
}; };
outputs = { self, nixpkgs }: outputs = { self, nixpkgs, nimbusBuildSystem }:
let let
stableSystems = [ stableSystems = [
"x86_64-linux" "aarch64-linux" "x86_64-linux" "aarch64-linux"
@ -40,38 +47,36 @@
in rec { in rec {
packages = forAllSystems (system: let packages = forAllSystems (system: let
pkgs = pkgsFor.${system}; pkgs = pkgsFor.${system};
targets = builtins.filter nim = nimbusBuildSystem.packages.${system}.nim;
(t: !(pkgs.stdenv.isDarwin && builtins.match "libsds-android.*" t != null))
[ buildTargets = pkgs.callPackage ./nix/default.nix {
"libsds-android-arm64" inherit stableSystems nim;
"libsds-android-amd64" src = self;
"libsds-android-x86" };
"libsds-android-arm"
]; skipAndroidOnDarwin = t: !(pkgs.stdenv.isDarwin);
targets = [
"libsds-android-arm64"
"libsds-android-amd64"
"libsds-android-x86"
"libsds-android-arm"
];
in rec { in rec {
# non-Android package # non-Android package
libsds = pkgs.callPackage ./nix/default.nix { libsds = buildTargets.override { targets = [ "libsds" ]; };
inherit stableSystems;
src = self;
targets = [ "libsds" ];
};
default = libsds; default = libsds;
} }
# Generate a package for each target dynamically # Generate a package for each target dynamically
// builtins.listToAttrs (map (name: { // builtins.listToAttrs (map (name: {
inherit name; inherit name;
value = pkgs.callPackage ./nix/default.nix { value = buildTargets.override { targets = [ name ]; };
inherit stableSystems;
src = self;
targets = [ name ];
};
}) targets)); }) targets));
devShells = forAllSystems (system: let devShells = forAllSystems (system: {
pkgs = pkgsFor.${system}; default = pkgsFor.${system}.callPackage ./nix/shell.nix {
in { inherit (nimbusBuildSystem.packages.${system}) nim;
default = pkgs.callPackage ./nix/shell.nix { } ; };
}); });
}; };

View File

@ -1,15 +1,15 @@
import std/json import std/json
import ./json_base_event, ../../src/[message] import ./json_base_event, ../../src/[message], std/base64
type JsonMissingDependenciesEvent* = ref object of JsonEvent type JsonMissingDependenciesEvent* = ref object of JsonEvent
messageId*: SdsMessageID messageId*: SdsMessageID
missingDeps: seq[SdsMessageID] missingDeps*: seq[HistoryEntry]
channelId*: SdsChannelID channelId*: SdsChannelID
proc new*( proc new*(
T: type JsonMissingDependenciesEvent, T: type JsonMissingDependenciesEvent,
messageId: SdsMessageID, messageId: SdsMessageID,
missingDeps: seq[SdsMessageID], missingDeps: seq[HistoryEntry],
channelId: SdsChannelID, channelId: SdsChannelID,
): T = ): T =
return JsonMissingDependenciesEvent( return JsonMissingDependenciesEvent(
@ -17,4 +17,15 @@ proc new*(
) )
method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string = method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string =
$(%*jsonMissingDependencies) var node = newJObject()
node["eventType"] = %*jsonMissingDependencies.eventType
node["messageId"] = %*jsonMissingDependencies.messageId
node["channelId"] = %*jsonMissingDependencies.channelId
var missingDepsNode = newJArray()
for dep in jsonMissingDependencies.missingDeps:
var depNode = newJObject()
depNode["messageId"] = %*dep.messageId
depNode["retrievalHint"] = %*encode(dep.retrievalHint)
missingDepsNode.add(depNode)
node["missingDeps"] = missingDepsNode
$node

View File

@ -5,6 +5,10 @@ type SdsCallBack* = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} ) {.cdecl, gcsafe, raises: [].}
type SdsRetrievalHintProvider* = proc(
messageId: cstring, hint: ptr cstring, hintLen: ptr csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
const RET_OK*: cint = 0 const RET_OK*: cint = 0
const RET_ERR*: cint = 1 const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2 const RET_MISSING_CALLBACK*: cint = 2

View File

@ -20,6 +20,8 @@ extern "C" {
typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData); typedef void (*SdsCallBack) (int callerRet, const char* msg, size_t len, void* userData);
typedef void (*SdsRetrievalHintProvider) (const char* messageId, char** hint, size_t* hintLen, void* userData);
// --- Core API Functions --- // --- Core API Functions ---
@ -28,6 +30,8 @@ void* SdsNewReliabilityManager(SdsCallBack callback, void* userData);
void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData); void SdsSetEventCallback(void* ctx, SdsCallBack callback, void* userData);
void SdsSetRetrievalHintProvider(void* ctx, SdsRetrievalHintProvider callback, void* userData);
int SdsCleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData); int SdsCleanupReliabilityManager(void* ctx, SdsCallBack callback, void* userData);
int SdsResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData); int SdsResetReliabilityManager(void* ctx, SdsCallBack callback, void* userData);

View File

@ -5,7 +5,7 @@
when defined(linux): when defined(linux):
{.passl: "-Wl,-soname,libsds.so".} {.passl: "-Wl,-soname,libsds.so".}
import std/[typetraits, tables, atomics], chronos, chronicles import std/[typetraits, tables, atomics, locks], chronos, chronicles
import import
./sds_thread/sds_thread, ./sds_thread/sds_thread,
./alloc, ./alloc,
@ -57,6 +57,29 @@ template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
) )
var
ctxPool: seq[ptr SdsContext]
ctxPoolLock: Lock
proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext =
ctxPoolLock.acquire()
defer: ctxPoolLock.release()
if ctxPool.len > 0:
result = ctxPool.pop()
else:
result = sds_thread.createSdsThread().valueOr:
let msg = "Error in createSdsThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
proc releaseCtx(ctx: ptr SdsContext) =
ctxPoolLock.acquire()
defer: ctxPoolLock.release()
ctx.userData = nil
ctx.eventCallback = nil
ctx.eventUserData = nil
ctxPool.add(ctx)
proc handleRequest( proc handleRequest(
ctx: ptr SdsContext, ctx: ptr SdsContext,
requestType: RequestType, requestType: RequestType,
@ -82,7 +105,7 @@ proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback =
$JsonMessageSentEvent.new(messageId, channelId) $JsonMessageSentEvent.new(messageId, channelId)
proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback = proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback =
return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = return proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onMissingDependencies"): callEventCallback(ctx, "onMissingDependencies"):
$JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId) $JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId)
@ -91,6 +114,25 @@ proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback =
callEventCallback(ctx, "onPeriodicSync"): callEventCallback(ctx, "onPeriodicSync"):
$JsonPeriodicSyncEvent.new() $JsonPeriodicSyncEvent.new()
proc onRetrievalHint(ctx: ptr SdsContext): RetrievalHintProvider =
return proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} =
if isNil(ctx.retrievalHintProvider):
return @[]
var hint: cstring
var hintLen: csize_t
cast[SdsRetrievalHintProvider](ctx.retrievalHintProvider)(
messageId.cstring, addr hint, addr hintLen, ctx.retrievalHintUserData
)
if not isNil(hint) and hintLen > 0:
var hintBytes = newSeq[byte](hintLen)
copyMem(addr hintBytes[0], hint, hintLen)
deallocShared(hint)
return hintBytes
return @[]
### End of not-exported components ### End of not-exported components
################################################################################ ################################################################################
@ -117,6 +159,7 @@ proc initializeLibrary() {.exported.} =
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime. ## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix ## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
libsdsNimMain() libsdsNimMain()
ctxPoolLock.initLock() # ensure the lock is initialized once (fix Windows crash)
when declared(setupForeignThreadGc): when declared(setupForeignThreadGc):
setupForeignThreadGc() setupForeignThreadGc()
when declared(nimGC_setStackBottom): when declared(nimGC_setStackBottom):
@ -140,10 +183,9 @@ proc SdsNewReliabilityManager(
echo "error: missing callback in NewReliabilityManager" echo "error: missing callback in NewReliabilityManager"
return nil return nil
## Create the SDS thread that will keep waiting for req from the main thread. ## Create or reuse the SDS thread that will keep waiting for req from the main thread.
var ctx = sds_thread.createSdsThread().valueOr: var ctx = acquireCtx(callback, userData)
let msg = "Error in createSdsThread: " & $error if ctx.isNil():
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil return nil
ctx.userData = userData ctx.userData = userData
@ -153,6 +195,7 @@ proc SdsNewReliabilityManager(
messageSentCb: onMessageSent(ctx), messageSentCb: onMessageSent(ctx),
missingDependenciesCb: onMissingDependencies(ctx), missingDependenciesCb: onMissingDependencies(ctx),
periodicSyncCb: onPeriodicSync(ctx), periodicSyncCb: onPeriodicSync(ctx),
retrievalHintProvider: onRetrievalHint(ctx),
) )
let retCode = handleRequest( let retCode = handleRequest(
@ -177,20 +220,33 @@ proc SdsSetEventCallback(
ctx[].eventCallback = cast[pointer](callback) ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData ctx[].eventUserData = userData
proc SdsSetRetrievalHintProvider(
ctx: ptr SdsContext, callback: SdsRetrievalHintProvider, userData: pointer
) {.dynlib, exportc.} =
initializeLibrary()
ctx[].retrievalHintProvider = cast[pointer](callback)
ctx[].retrievalHintUserData = userData
proc SdsCleanupReliabilityManager( proc SdsCleanupReliabilityManager(
ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer ctx: ptr SdsContext, callback: SdsCallBack, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc.} =
initializeLibrary() initializeLibrary()
checkLibsdsParams(ctx, callback, userData) checkLibsdsParams(ctx, callback, userData)
sds_thread.destroySdsThread(ctx).isOkOr: let resetRes = handleRequest(
let msg = "libsds error: " & $error ctx,
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) RequestType.LIFECYCLE,
SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER),
callback,
userData,
)
if resetRes == RET_ERR:
return RET_ERR return RET_ERR
## always need to invoke the callback although we don't retrieve value to the caller releaseCtx(ctx)
callback(RET_OK, nil, 0, userData)
# handleRequest already invoked the callback; nothing else to signal here.
return RET_OK return RET_OK
proc SdsResetReliabilityManager( proc SdsResetReliabilityManager(

View File

@ -40,6 +40,7 @@ proc createReliabilityManager(
rm.setCallbacks( rm.setCallbacks(
appCallbacks.messageReadyCb, appCallbacks.messageSentCb, appCallbacks.messageReadyCb, appCallbacks.messageSentCb,
appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb, appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb,
appCallbacks.retrievalHintProvider,
) )
return ok(rm) return ok(rm)

View File

@ -1,4 +1,4 @@
import std/[json, strutils, net, sequtils] import std/[json, strutils, net, sequtils, base64]
import chronos, chronicles, results import chronos, chronicles, results
import ../../../alloc import ../../../alloc
@ -17,7 +17,7 @@ type SdsMessageRequest* = object
type SdsUnwrapResponse* = object type SdsUnwrapResponse* = object
message*: seq[byte] message*: seq[byte]
missingDeps*: seq[SdsMessageID] missingDeps*: seq[HistoryEntry]
channelId*: string channelId*: string
proc createShared*( proc createShared*(
@ -62,13 +62,22 @@ proc process*(
of UNWRAP_MESSAGE: of UNWRAP_MESSAGE:
let messageBytes = self.message.toSeq() let messageBytes = self.message.toSeq()
let (unwrappedMessage, missingDeps, channelId) = unwrapReceivedMessage(rm[], messageBytes).valueOr: let (unwrappedMessage, missingDeps, extractedChannelId) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
error "UNWRAP_MESSAGE failed", error = error
return err("error processing UNWRAP_MESSAGE request: " & $error) return err("error processing UNWRAP_MESSAGE request: " & $error)
let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps, channelId: channelId) let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps, channelId: extractedChannelId)
# return the result as a json string # return the result as a json string
return ok($(%*(res))) var node = newJObject()
node["message"] = %*res.message
node["channelId"] = %*extractedChannelId
var missingDepsNode = newJArray()
for dep in res.missingDeps:
var depNode = newJObject()
depNode["messageId"] = %*dep.messageId
depNode["retrievalHint"] = %*encode(dep.retrievalHint)
missingDepsNode.add(depNode)
node["missingDeps"] = missingDepsNode
return ok($node)
return ok("") return ok("")

View File

@ -20,6 +20,8 @@ type SdsContext* = object
userData*: pointer userData*: pointer
eventCallback*: pointer eventCallback*: pointer
eventUserdata*: pointer eventUserdata*: pointer
retrievalHintProvider*: pointer
retrievalHintUserData*: pointer
running: Atomic[bool] # To control when the thread is running running: Atomic[bool] # To control when the thread is running
proc runSds(ctx: ptr SdsContext) {.async.} = proc runSds(ctx: ptr SdsContext) {.async.} =

View File

@ -1,14 +1,14 @@
{ {
config ? {}, pkgs,
pkgs ? import <nixpkgs> { },
src ? ../., src ? ../.,
targets ? ["libsds-android-arm64"], # Nimbus-build-system package.
nim ? null,
# Options: 0,1,2
verbosity ? 2, verbosity ? 2,
useSystemNim ? true, # Make targets
quickAndDirty ? true, targets ? ["libsds-android-arm64"],
stableSystems ? [ # These are the only platforms tested in CI and considered stable.
"x86_64-linux" "aarch64-linux" stableSystems ? ["x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" "x86_64-windows"],
]
}: }:
assert pkgs.lib.assertMsg ((src.submodules or true) == true) assert pkgs.lib.assertMsg ((src.submodules or true) == true)
@ -22,55 +22,49 @@ let
containsAndroid = s: (match ".*android.*" s) != null; containsAndroid = s: (match ".*android.*" s) != null;
isAndroidBuild = any containsAndroid targets; isAndroidBuild = any containsAndroid targets;
version = substring 0 8 (src.sourceInfo.rev or "dirty"); tools = callPackage ./tools.nix {};
in stdenv.mkDerivation rec { revision = substring 0 8 (src.rev or src.dirtyRev or "00000000");
version = tools.findKeyValue "^version = \"([a-f0-9.-]+)\"$" ../sds.nimble;
in stdenv.mkDerivation {
pname = "nim-sds"; pname = "nim-sds";
inherit src version; inherit src;
version = "${version}-${revision}";
env = {
NIMFLAGS = "-d:disableMarchNative";
ANDROID_SDK_ROOT = optionalString isAndroidBuild pkgs.androidPkgs.sdk;
ANDROID_NDK_ROOT = optionalString isAndroidBuild pkgs.androidPkgs.ndk;
};
buildInputs = with pkgs; [ buildInputs = with pkgs; [
openssl openssl gmp zip
gmp
zip
]; ];
# Dependencies that should only exist in the build environment. # Dependencies that should only exist in the build environment.
nativeBuildInputs = let nativeBuildInputs = with pkgs; [
# Fix for Nim compiler calling 'git rev-parse' and 'lsb_release'. nim cmake which patchelf
fakeGit = writeScriptBin "git" "echo ${version}";
in with pkgs; [
cmake
which
nim-unwrapped-2_2
fakeGit
] ++ optionals stdenv.isLinux [ ] ++ optionals stdenv.isLinux [
pkgs.lsb-release pkgs.lsb-release
]; ];
ANDROID_SDK_ROOT = optionalString isAndroidBuild pkgs.androidPkgs.sdk;
ANDROID_NDK_HOME = optionalString isAndroidBuild pkgs.androidPkgs.ndk;
NIMFLAGS = "-d:disableMarchNative -d:git_revision_override=${version}";
XDG_CACHE_HOME = "/tmp";
makeFlags = targets ++ [ makeFlags = targets ++ [
"V=${toString verbosity}" "V=${toString verbosity}"
# Built from nimbus-build-system via flake.
"USE_SYSTEM_NIM=1" "USE_SYSTEM_NIM=1"
]; ];
configurePhase = '' configurePhase = ''
patchShebangs . vendor/nimbus-build-system > /dev/null # Avoid /tmp write errors.
make nimbus-build-system-paths export XDG_CACHE_HOME=$TMPDIR/cache
patchShebangs . vendor/nimbus-build-system/scripts
make nimbus-build-system-nimble-dir make nimbus-build-system-nimble-dir
''; '';
preBuild = ''
ln -s sds.nimble sds.nims
'';
installPhase = let installPhase = let
androidManifest = '' androidManifest = ''
<manifest xmlns:android=\"http://schemas.android.com/apk/res/android\" package=\"org.waku.${pname}\" /> <manifest xmlns:android=\"http://schemas.android.com/apk/res/android\" package=\"org.waku.nim-sds\" />
''; '';
in if isAndroidBuild then '' in if isAndroidBuild then ''
mkdir -p $out/jni mkdir -p $out/jni
@ -88,6 +82,6 @@ in stdenv.mkDerivation rec {
description = "Nim implementation of the e2e reliability protocol"; description = "Nim implementation of the e2e reliability protocol";
homepage = "https://github.com/status-im/nim-sds"; homepage = "https://github.com/status-im/nim-sds";
license = licenses.mit; license = licenses.mit;
platforms = ["x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" "x86_64-windows"]; platforms = stableSystems;
}; };
} }

View File

@ -8,13 +8,13 @@ mkShell {
export ANDROID_HOME="${androidPkgs.sdk}" export ANDROID_HOME="${androidPkgs.sdk}"
export ANDROID_NDK_ROOT="${androidPkgs.ndk}" export ANDROID_NDK_ROOT="${androidPkgs.ndk}"
export ANDROID_SDK_ROOT="$ANDROID_HOME" export ANDROID_SDK_ROOT="$ANDROID_HOME"
export ANDROID_NDK_HOME="${androidPkgs.ndk}" export ANDROID_NDK_ROOT="${androidPkgs.ndk}"
export PATH="$ANDROID_NDK_ROOT:$PATH" export PATH="$ANDROID_NDK_ROOT:$PATH"
export PATH="$ANDROID_SDK_ROOT/tools:$PATH" export PATH="$ANDROID_SDK_ROOT/tools:$PATH"
export PATH="$ANDROID_SDK_ROOT/tools/bin:$PATH" export PATH="$ANDROID_SDK_ROOT/tools/bin:$PATH"
export PATH="$ANDROID_SDK_ROOT/platform-tools:$PATH" export PATH="$ANDROID_SDK_ROOT/platform-tools:$PATH"
export PATH="$(echo $ANDROID_SDK_ROOT/cmdline-tools/*/bin):$PATH" export PATH="$(echo $ANDROID_SDK_ROOT/cmdline-tools/*/bin):$PATH"
export PATH="$(echo $ANDROID_NDK_HOME/toolchains/llvm/prebuilt/*/bin):$PATH" export PATH="$(echo $ANDROID_NDK_ROOT/toolchains/llvm/prebuilt/*/bin):$PATH"
''; '';
} }

View File

@ -1,27 +1,27 @@
{ {
pkgs ? import <nixpkgs> { }, pkgs ? import <nixpkgs> { },
nim ? null,
}: }:
let let
inherit (pkgs) lib stdenv; inherit (pkgs) lib stdenv;
/* No Android SDK for Darwin aarch64. */
isMacM1 = stdenv.isDarwin && stdenv.isAarch64;
in pkgs.mkShell { in pkgs.mkShell {
inputsFrom = lib.optionals (!isMacM1) [ inputsFrom = [
pkgs.androidShell pkgs.androidShell
]; ];
buildInputs = with pkgs; [ buildInputs = with pkgs; [
nim
which which
git git
cmake cmake
nim-unwrapped-2_2
] ++ lib.optionals stdenv.isDarwin [ ] ++ lib.optionals stdenv.isDarwin [
pkgs.libiconv pkgs.libiconv
]; ];
# Avoid compiling Nim itself. # Avoid compiling Nim itself.
shellHook = '' shellHook = ''
export MAKEFLAGS='USE_SYSTEM_NIM=1' export USE_SYSTEM_NIM=1
''; '';
} }

15
nix/tools.nix Normal file
View File

@ -0,0 +1,15 @@
{ pkgs ? import <nixpkgs> { } }:
let
inherit (pkgs.lib) fileContents last splitString flatten remove;
inherit (builtins) map match;
in {
findKeyValue = regex: sourceFile:
let
linesFrom = file: splitString "\n" (fileContents file);
matching = regex: lines: map (line: match regex line) lines;
extractMatch = matches: last (flatten (remove null matches));
in
extractMatch (matching regex (linesFrom sourceFile));
}

View File

@ -1,5 +1,7 @@
mode = ScriptMode.Verbose mode = ScriptMode.Verbose
import strutils, os
# Package # Package
version = "0.1.0" version = "0.1.0"
author = "Waku Team" author = "Waku Team"
@ -38,6 +40,12 @@ proc buildLibrary(
" --threads:on --app:lib --opt:size --noMain --mm:refc --header --nimMainPrefix:libsds --skipParentCfg:on " & " --threads:on --app:lib --opt:size --noMain --mm:refc --header --nimMainPrefix:libsds --skipParentCfg:on " &
extra_params & " " & srcDir & name & ".nim" extra_params & " " & srcDir & name & ".nim"
proc getArch(): string =
let arch = getEnv("ARCH")
if arch != "": return $arch
let (archFromUname, _) = gorgeEx("uname -m")
return $archFromUname
# Tasks # Tasks
task test, "Run the test suite": task test, "Run the test suite":
exec "nim c -r tests/test_bloom.nim" exec "nim c -r tests/test_bloom.nim"
@ -62,9 +70,14 @@ task libsdsDynamicLinux, "Generate bindings":
task libsdsDynamicMac, "Generate bindings": task libsdsDynamicMac, "Generate bindings":
let outLibNameAndExt = "libsds.dylib" let outLibNameAndExt = "libsds.dylib"
let name = "libsds" let name = "libsds"
let arch = getArch()
let sdkPath = staticExec("xcrun --show-sdk-path").strip()
let archFlags = (if arch == "arm64": "--cpu:arm64 --passC:\"-arch arm64\" --passL:\"-arch arm64\" --passC:\"-isysroot " & sdkPath & "\" --passL:\"-isysroot " & sdkPath & "\""
else: "--cpu:amd64 --passC:\"-arch x86_64\" --passL:\"-arch x86_64\" --passC:\"-isysroot " & sdkPath & "\" --passL:\"-isysroot " & sdkPath & "\"")
buildLibrary outLibNameAndExt, buildLibrary outLibNameAndExt,
name, "library/", name, "library/",
"""-d:chronicles_line_numbers --warning:Deprecated:off --warning:UnusedImport:on -d:chronicles_log_level=TRACE """, archFlags & " -d:chronicles_line_numbers --warning:Deprecated:off --warning:UnusedImport:on -d:chronicles_log_level=TRACE",
"dynamic" "dynamic"
task libsdsStaticWindows, "Generate bindings": task libsdsStaticWindows, "Generate bindings":
@ -86,14 +99,78 @@ task libsdsStaticLinux, "Generate bindings":
task libsdsStaticMac, "Generate bindings": task libsdsStaticMac, "Generate bindings":
let outLibNameAndExt = "libsds.a" let outLibNameAndExt = "libsds.a"
let name = "libsds" let name = "libsds"
let arch = getArch()
let sdkPath = staticExec("xcrun --show-sdk-path").strip()
let archFlags = (if arch == "arm64": "--cpu:arm64 --passC:\"-arch arm64\" --passL:\"-arch arm64\" --passC:\"-isysroot " & sdkPath & "\" --passL:\"-isysroot " & sdkPath & "\""
else: "--cpu:amd64 --passC:\"-arch x86_64\" --passL:\"-arch x86_64\" --passC:\"-isysroot " & sdkPath & "\" --passL:\"-isysroot " & sdkPath & "\"")
buildLibrary outLibNameAndExt, buildLibrary outLibNameAndExt,
name, "library/", name, "library/",
"""-d:chronicles_line_numbers --warning:Deprecated:off --warning:UnusedImport:on -d:chronicles_log_level=TRACE """, archFlags & " -d:chronicles_line_numbers --warning:Deprecated:off --warning:UnusedImport:on -d:chronicles_log_level=TRACE",
"static" "static"
# Build Mobile iOS
proc buildMobileIOS(srcDir = ".", sdkPath = "") =
echo "Building iOS libsds library"
let outDir = "build"
let nimcacheDir = outDir & "/nimcache"
if not dirExists outDir:
mkDir outDir
if sdkPath.len == 0:
quit "Error: Xcode/iOS SDK not found"
let aFile = outDir & "/libsds.a"
let aFileTmp = outDir & "/libsds_tmp.a"
let arch = getArch()
# 1) Generate C sources from Nim (no linking)
# Use unique symbol prefix to avoid conflicts with other Nim libraries
exec "nim c" &
" --nimcache:" & nimcacheDir & " --os:ios --cpu:" & arch &
" --compileOnly:on" &
" --noMain --mm:orc" &
" --threads:on --opt:size --header" &
" --nimMainPrefix:libsds --skipParentCfg:on" &
" --cc:clang" &
" -d:useMalloc" &
" " & srcDir & "/libsds.nim"
# 2) Compile all generated C files to object files with hidden visibility
# This prevents symbol conflicts with other Nim libraries (e.g., libnim_status_client)
let clangFlags = "-arch " & arch & " -isysroot " & sdkPath &
" -I./vendor/nimbus-build-system/vendor/Nim/lib/" &
" -fembed-bitcode -miphoneos-version-min=16.0 -O2" &
" -fvisibility=hidden"
var objectFiles: seq[string] = @[]
for cFile in listFiles(nimcacheDir):
if cFile.endsWith(".c"):
let oFile = cFile.changeFileExt("o")
exec "clang " & clangFlags & " -c " & cFile & " -o " & oFile
objectFiles.add(oFile)
# 3) Create static library from all object files
exec "ar rcs " & aFileTmp & " " & objectFiles.join(" ")
# 4) Use libtool to localize all non-public symbols
# Keep only Sds* functions as global, hide everything else to prevent conflicts
# with nim runtime symbols from libnim_status_client
let keepSymbols = "_Sds*:_libsdsNimMain:_libsdsDatInit*:_libsdsInit*:_NimMainModule__libsds*"
exec "xcrun libtool -static -o " & aFile & " " & aFileTmp &
" -exported_symbols_list /dev/stdin <<< '" & keepSymbols & "' 2>/dev/null || cp " & aFileTmp & " " & aFile
echo "✔ iOS library created: " & aFile
task libsdsIOS, "Build the mobile bindings for iOS":
let srcDir = "./library"
let sdkPath = getEnv("IOS_SDK_PATH")
buildMobileIOS srcDir, sdkPath
### Mobile Android ### Mobile Android
proc buildMobileAndroid(srcDir = ".", params = "") = proc buildMobileAndroid(srcDir = ".", params = "") =
let cpu = getEnv("CPU") let cpu = getArch()
let outDir = "build/" let outDir = "build/"
if not dirExists outDir: if not dirExists outDir:
@ -106,7 +183,7 @@ proc buildMobileAndroid(srcDir = ".", params = "") =
exec "nim c" & " --out:" & outDir & exec "nim c" & " --out:" & outDir &
"/libsds.so --threads:on --app:lib --opt:size --noMain --mm:refc --nimMainPrefix:libsds " & "/libsds.so --threads:on --app:lib --opt:size --noMain --mm:refc --nimMainPrefix:libsds " &
"-d:chronicles_sinks=textlines[dynamic] --header --passL:-L" & outdir & "-d:chronicles_sinks=textlines[dynamic] --header --passL:-L" & outdir &
" --passL:-llog --cpu:" & cpu & " --os:android -d:androidNDK " & extra_params & " " & " --passL:-llog --cpu:" & cpu & " --os:android -d:androidNDK -d:chronosEventEngine=epoll " & extra_params & " " &
srcDir & "/libsds.nim" srcDir & "/libsds.nim"
task libsdsAndroid, "Build the mobile bindings for Android": task libsdsAndroid, "Build the mobile bindings for Android":

View File

@ -4,10 +4,14 @@ type
SdsMessageID* = string SdsMessageID* = string
SdsChannelID* = string SdsChannelID* = string
HistoryEntry* = object
messageId*: SdsMessageID
retrievalHint*: seq[byte] ## Optional hint for efficient retrieval (e.g., Waku message hash)
SdsMessage* = object SdsMessage* = object
messageId*: SdsMessageID messageId*: SdsMessageID
lamportTimestamp*: int64 lamportTimestamp*: int64
causalHistory*: seq[SdsMessageID] causalHistory*: seq[HistoryEntry]
channelId*: SdsChannelID channelId*: SdsChannelID
content*: seq[byte] content*: seq[byte]
bloomFilter*: seq[byte] bloomFilter*: seq[byte]

View File

@ -1,5 +1,4 @@
import libp2p/protobuf/minprotobuf import libp2p/protobuf/minprotobuf
import std/options
import endians import endians
import ../src/[message, protobufutil, bloom, reliability_utils] import ../src/[message, protobufutil, bloom, reliability_utils]
@ -9,8 +8,13 @@ proc encode*(msg: SdsMessage): ProtoBuffer =
pb.write(1, msg.messageId) pb.write(1, msg.messageId)
pb.write(2, uint64(msg.lamportTimestamp)) pb.write(2, uint64(msg.lamportTimestamp))
for hist in msg.causalHistory: for entry in msg.causalHistory:
pb.write(3, hist) var entryPb = initProtoBuffer()
entryPb.write(1, entry.messageId)
if entry.retrievalHint.len > 0:
entryPb.write(2, entry.retrievalHint)
entryPb.finish()
pb.write(3, entryPb.buffer)
pb.write(4, msg.channelId) pb.write(4, msg.channelId)
pb.write(5, msg.content) pb.write(5, msg.content)
@ -31,10 +35,24 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
return err(ProtobufError.missingRequiredField("lamportTimestamp")) return err(ProtobufError.missingRequiredField("lamportTimestamp"))
msg.lamportTimestamp = int64(timestamp) msg.lamportTimestamp = int64(timestamp)
var causalHistory: seq[SdsMessageID] # Handle both old and new causal history formats
let histResult = pb.getRepeatedField(3, causalHistory) var historyBuffers: seq[seq[byte]]
if histResult.isOk: if pb.getRepeatedField(3, historyBuffers).isOk():
msg.causalHistory = causalHistory # New format: repeated HistoryEntry
for histBuffer in historyBuffers:
let entryPb = initProtoBuffer(histBuffer)
var entry: HistoryEntry
if not ?entryPb.getField(1, entry.messageId):
return err(ProtobufError.missingRequiredField("HistoryEntry.messageId"))
# retrievalHint is optional
discard entryPb.getField(2, entry.retrievalHint)
msg.causalHistory.add(entry)
else:
# Try old format: repeated string
var causalHistory: seq[SdsMessageID]
let histResult = pb.getRepeatedField(3, causalHistory)
if histResult.isOk():
msg.causalHistory = toCausalHistory(causalHistory)
if not ?pb.getField(4, msg.channelId): if not ?pb.getField(4, msg.channelId):
return err(ProtobufError.missingRequiredField("channelId")) return err(ProtobufError.missingRequiredField("channelId"))

View File

@ -24,10 +24,10 @@ proc newReliabilityManager*(
proc isAcknowledged*( proc isAcknowledged*(
msg: UnacknowledgedMessage, msg: UnacknowledgedMessage,
causalHistory: seq[SdsMessageID], causalHistory: seq[HistoryEntry],
rbf: Option[RollingBloomFilter], rbf: Option[RollingBloomFilter],
): bool = ): bool =
if msg.message.messageId in causalHistory: if msg.message.messageId in causalHistory.getMessageIds():
return true return true
if rbf.isSome(): if rbf.isSome():
@ -112,7 +112,7 @@ proc wrapOutgoingMessage*(
let msg = SdsMessage( let msg = SdsMessage(
messageId: messageId, messageId: messageId,
lamportTimestamp: channel.lamportTimestamp, lamportTimestamp: channel.lamportTimestamp,
causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory, channelId), causalHistory: rm.getRecentHistoryEntries(rm.config.maxCausalHistory, channelId),
channelId: channelId, channelId: channelId,
content: message, content: message,
bloomFilter: bfResult.get(), bloomFilter: bfResult.get(),
@ -176,7 +176,7 @@ proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gc
proc unwrapReceivedMessage*( proc unwrapReceivedMessage*(
rm: ReliabilityManager, message: seq[byte] rm: ReliabilityManager, message: seq[byte]
): Result[ ): Result[
tuple[message: seq[byte], missingDeps: seq[SdsMessageID], channelId: SdsChannelID], tuple[message: seq[byte], missingDeps: seq[HistoryEntry], channelId: SdsChannelID],
ReliabilityError, ReliabilityError,
] = ] =
## Unwraps a received message and processes its reliability metadata. ## Unwraps a received message and processes its reliability metadata.
@ -209,7 +209,7 @@ proc unwrapReceivedMessage*(
if missingDeps.len == 0: if missingDeps.len == 0:
var depsInBuffer = false var depsInBuffer = false
for msgId, entry in channel.incomingBuffer.pairs(): for msgId, entry in channel.incomingBuffer.pairs():
if msgId in msg.causalHistory: if msgId in msg.causalHistory.getMessageIds():
depsInBuffer = true depsInBuffer = true
break break
# Check if any dependencies are still in incoming buffer # Check if any dependencies are still in incoming buffer
@ -224,7 +224,7 @@ proc unwrapReceivedMessage*(
rm.onMessageReady(msg.messageId, channelId) rm.onMessageReady(msg.messageId, channelId)
else: else:
channel.incomingBuffer[msg.messageId] = channel.incomingBuffer[msg.messageId] =
IncomingMessage(message: msg, missingDeps: missingDeps.toHashSet()) IncomingMessage(message: msg, missingDeps: missingDeps.getMessageIds().toHashSet())
if not rm.onMissingDependencies.isNil(): if not rm.onMissingDependencies.isNil():
rm.onMissingDependencies(msg.messageId, missingDeps, channelId) rm.onMissingDependencies(msg.messageId, missingDeps, channelId)
@ -271,6 +271,7 @@ proc setCallbacks*(
onMessageSent: MessageSentCallback, onMessageSent: MessageSentCallback,
onMissingDependencies: MissingDependenciesCallback, onMissingDependencies: MissingDependenciesCallback,
onPeriodicSync: PeriodicSyncCallback = nil, onPeriodicSync: PeriodicSyncCallback = nil,
onRetrievalHint: RetrievalHintProvider = nil
) = ) =
## Sets the callback functions for various events in the ReliabilityManager. ## Sets the callback functions for various events in the ReliabilityManager.
## ##
@ -279,11 +280,13 @@ proc setCallbacks*(
## - onMessageSent: Callback function called when a message is confirmed as sent. ## - onMessageSent: Callback function called when a message is confirmed as sent.
## - onMissingDependencies: Callback function called when a message has missing dependencies. ## - onMissingDependencies: Callback function called when a message has missing dependencies.
## - onPeriodicSync: Callback function called to notify about periodic sync ## - onPeriodicSync: Callback function called to notify about periodic sync
## - onRetrievalHint: Callback function called to get a retrieval hint for a message ID.
withLock rm.lock: withLock rm.lock:
rm.onMessageReady = onMessageReady rm.onMessageReady = onMessageReady
rm.onMessageSent = onMessageSent rm.onMessageSent = onMessageSent
rm.onMissingDependencies = onMissingDependencies rm.onMissingDependencies = onMissingDependencies
rm.onPeriodicSync = onPeriodicSync rm.onPeriodicSync = onPeriodicSync
rm.onRetrievalHint = onRetrievalHint
proc checkUnacknowledgedMessages( proc checkUnacknowledgedMessages(
rm: ReliabilityManager, channelId: SdsChannelID rm: ReliabilityManager, channelId: SdsChannelID

View File

@ -1,4 +1,4 @@
import std/[times, locks, tables] import std/[times, locks, tables, sequtils]
import chronicles, results import chronicles, results
import ./[rolling_bloom_filter, message] import ./[rolling_bloom_filter, message]
@ -10,9 +10,11 @@ type
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
MissingDependenciesCallback* = proc( MissingDependenciesCallback* = proc(
messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.} ) {.gcsafe.}
RetrievalHintProvider* = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.}
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
AppCallbacks* = ref object AppCallbacks* = ref object
@ -20,6 +22,7 @@ type
messageSentCb*: MessageSentCallback messageSentCb*: MessageSentCallback
missingDependenciesCb*: MissingDependenciesCallback missingDependenciesCb*: MissingDependenciesCallback
periodicSyncCb*: PeriodicSyncCallback periodicSyncCb*: PeriodicSyncCallback
retrievalHintProvider*: RetrievalHintProvider
ReliabilityConfig* = object ReliabilityConfig* = object
bloomFilterCapacity*: int bloomFilterCapacity*: int
@ -45,9 +48,10 @@ type
onMessageReady*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} onMessageReady*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
onMessageSent*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} onMessageSent*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
onMissingDependencies*: proc( onMissingDependencies*: proc(
messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.} ) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback onPeriodicSync*: PeriodicSyncCallback
onRetrievalHint*: RetrievalHintProvider
ReliabilityError* {.pure.} = enum ReliabilityError* {.pure.} = enum
reInvalidArgument reInvalidArgument
@ -120,31 +124,56 @@ proc updateLamportTimestamp*(
error "Failed to update lamport timestamp", error "Failed to update lamport timestamp",
channelId = channelId, msgTs = msgTs, error = getCurrentExceptionMsg() channelId = channelId, msgTs = msgTs, error = getCurrentExceptionMsg()
proc getRecentSdsMessageIDs*( # Helper functions for HistoryEntry
proc newHistoryEntry*(messageId: SdsMessageID, retrievalHint: seq[byte] = @[]): HistoryEntry =
## Creates a new HistoryEntry with optional retrieval hint
HistoryEntry(messageId: messageId, retrievalHint: retrievalHint)
proc toCausalHistory*(messageIds: seq[SdsMessageID]): seq[HistoryEntry] =
## Converts a sequence of message IDs to HistoryEntry sequence (for backward compatibility)
return messageIds.mapIt(newHistoryEntry(it))
proc getMessageIds*(causalHistory: seq[HistoryEntry]): seq[SdsMessageID] =
## Extracts message IDs from HistoryEntry sequence
return causalHistory.mapIt(it.messageId)
proc getRecentHistoryEntries*(
rm: ReliabilityManager, n: int, channelId: SdsChannelID rm: ReliabilityManager, n: int, channelId: SdsChannelID
): seq[SdsMessageID] = ): seq[HistoryEntry] =
## Get recent history entries for sending in causal history.
## Populates retrieval hints for our own messages using the provider callback.
try: try:
if channelId in rm.channels: if channelId in rm.channels:
let channel = rm.channels[channelId] let channel = rm.channels[channelId]
result = channel.messageHistory[max(0, channel.messageHistory.len - n) .. ^1] let recentMessageIds = channel.messageHistory[max(0, channel.messageHistory.len - n) .. ^1]
if rm.onRetrievalHint.isNil():
return toCausalHistory(recentMessageIds)
else:
var entries: seq[HistoryEntry] = @[]
for msgId in recentMessageIds:
let hint = rm.onRetrievalHint(msgId)
entries.add(newHistoryEntry(msgId, hint))
return entries
else: else:
result = @[] return @[]
except Exception: except Exception:
error "Failed to get recent message IDs", error "Failed to get recent history entries",
channelId = channelId, n = n, error = getCurrentExceptionMsg() channelId = channelId, n = n, error = getCurrentExceptionMsg()
result = @[] return @[]
proc checkDependencies*( proc checkDependencies*(
rm: ReliabilityManager, deps: seq[SdsMessageID], channelId: SdsChannelID rm: ReliabilityManager, deps: seq[HistoryEntry], channelId: SdsChannelID
): seq[SdsMessageID] = ): seq[HistoryEntry] =
var missingDeps: seq[SdsMessageID] = @[] ## Check which dependencies are missing from our message history.
var missingDeps: seq[HistoryEntry] = @[]
try: try:
if channelId in rm.channels: if channelId in rm.channels:
let channel = rm.channels[channelId] let channel = rm.channels[channelId]
for depId in deps: for dep in deps:
if depId notin channel.messageHistory: if dep.messageId notin channel.messageHistory:
missingDeps.add(depId) missingDeps.add(dep)
else: else:
# Channel doesn't exist, all deps are missing
missingDeps = deps missingDeps = deps
except Exception: except Exception:
error "Failed to check dependencies", error "Failed to check dependencies",

View File

@ -99,7 +99,7 @@ suite "Reliability Mechanisms":
messageReadyCount += 1, messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1, messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1, missingDepsCount += 1,
) )
@ -112,7 +112,7 @@ suite "Reliability Mechanisms":
let msg2 = SdsMessage( let msg2 = SdsMessage(
messageId: id2, messageId: id2,
lamportTimestamp: 2, lamportTimestamp: 2,
causalHistory: @[id1], # msg2 depends on msg1 causalHistory: toCausalHistory(@[id1]), # msg2 depends on msg1
channelId: testChannel, channelId: testChannel,
content: @[byte(2)], content: @[byte(2)],
bloomFilter: @[], bloomFilter: @[],
@ -121,7 +121,7 @@ suite "Reliability Mechanisms":
let msg3 = SdsMessage( let msg3 = SdsMessage(
messageId: id3, messageId: id3,
lamportTimestamp: 3, lamportTimestamp: 3,
causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2 causalHistory: toCausalHistory(@[id1, id2]), # msg3 depends on both msg1 and msg2
channelId: testChannel, channelId: testChannel,
content: @[byte(3)], content: @[byte(3)],
bloomFilter: @[], bloomFilter: @[],
@ -141,8 +141,8 @@ suite "Reliability Mechanisms":
check: check:
missingDepsCount == 1 # Should trigger missing deps callback missingDepsCount == 1 # Should trigger missing deps callback
missingDeps3.len == 2 # Should be missing both msg1 and msg2 missingDeps3.len == 2 # Should be missing both msg1 and msg2
id1 in missingDeps3 id1 in missingDeps3.getMessageIds()
id2 in missingDeps3 id2 in missingDeps3.getMessageIds()
# Then try processing msg2 (which only depends on msg1) # Then try processing msg2 (which only depends on msg1)
let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get()) let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get())
@ -152,7 +152,7 @@ suite "Reliability Mechanisms":
check: check:
missingDepsCount == 2 # Should have triggered another missing deps callback missingDepsCount == 2 # Should have triggered another missing deps callback
missingDeps2.len == 1 # Should only be missing msg1 missingDeps2.len == 1 # Should only be missing msg1
id1 in missingDeps2 id1 in missingDeps2.getMessageIds()
messageReadyCount == 0 # No messages should be ready yet messageReadyCount == 0 # No messages should be ready yet
# Mark first dependency (msg1) as met # Mark first dependency (msg1) as met
@ -176,7 +176,7 @@ suite "Reliability Mechanisms":
messageReadyCount += 1, messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1, messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1, missingDepsCount += 1,
) )
@ -190,7 +190,7 @@ suite "Reliability Mechanisms":
let msg2 = SdsMessage( let msg2 = SdsMessage(
messageId: "msg2", messageId: "msg2",
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1, lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
causalHistory: @[id1], # Include our message in causal history causalHistory: toCausalHistory(@[id1]), # Include our message in causal history
channelId: testChannel, channelId: testChannel,
content: @[byte(2)], content: @[byte(2)],
bloomFilter: @[] # Test with an empty bloom filter bloomFilter: @[] # Test with an empty bloom filter
@ -216,7 +216,7 @@ suite "Reliability Mechanisms":
discard, discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1, messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard, discard,
) )
@ -251,6 +251,77 @@ suite "Reliability Mechanisms":
check messageSentCount == 1 # Our message should be acknowledged via bloom filter check messageSentCount == 1 # Our message should be acknowledged via bloom filter
test "retrieval hints":
var messageReadyCount = 0
var messageSentCount = 0
var missingDepsCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1,
nil,
proc(messageId: SdsMessageID): seq[byte] =
return cast[seq[byte]]("hint:" & messageId)
)
# Send a first message to populate history
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
# Send a second message, which should have the first in its causal history
let msg2 = @[byte(2)]
let id2 = "msg2"
let wrap2 = rm.wrapOutgoingMessage(msg2, id2, testChannel)
check wrap2.isOk()
# Check that the wrapped message contains the hint
let unwrappedMsg2 = deserializeMessage(wrap2.get()).get()
check unwrappedMsg2.causalHistory.len > 0
check unwrappedMsg2.causalHistory[0].messageId == id1
check unwrappedMsg2.causalHistory[0].retrievalHint == cast[seq[byte]]("hint:" & id1)
# Create a message with a missing dependency (no retrieval hint)
let msg3 = SdsMessage(
messageId: "msg3",
lamportTimestamp: 3,
causalHistory: toCausalHistory(@["missing-dep"]),
channelId: testChannel,
content: @[byte(3)],
bloomFilter: @[],
)
let serialized3 = serializeMessage(msg3).get()
let unwrapResult3 = rm.unwrapReceivedMessage(serialized3)
check unwrapResult3.isOk()
let (_, missingDeps3, _) = unwrapResult3.get()
check missingDeps3.len == 1
check missingDeps3[0].messageId == "missing-dep"
# The hint is empty because it was not provided by the remote sender
check missingDeps3[0].retrievalHint.len == 0
# Test with a message that HAS a retrieval hint from remote
let msg4 = SdsMessage(
messageId: "msg4",
lamportTimestamp: 4,
causalHistory: @[newHistoryEntry("another-missing", cast[seq[byte]]("remote-hint"))],
channelId: testChannel,
content: @[byte(4)],
bloomFilter: @[],
)
let serialized4 = serializeMessage(msg4).get()
let unwrapResult4 = rm.unwrapReceivedMessage(serialized4)
check unwrapResult4.isOk()
let (_, missingDeps4, _) = unwrapResult4.get()
check missingDeps4.len == 1
check missingDeps4[0].messageId == "another-missing"
# The hint should be preserved from the remote sender
check missingDeps4[0].retrievalHint == cast[seq[byte]]("remote-hint")
# Periodic task & Buffer management tests # Periodic task & Buffer management tests
suite "Periodic Tasks & Buffer Management": suite "Periodic Tasks & Buffer Management":
var rm: ReliabilityManager var rm: ReliabilityManager
@ -273,7 +344,7 @@ suite "Periodic Tasks & Buffer Management":
discard, discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1, messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard, discard,
) )
@ -291,7 +362,7 @@ suite "Periodic Tasks & Buffer Management":
let ackMsg = SdsMessage( let ackMsg = SdsMessage(
messageId: "ack1", messageId: "ack1",
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1, lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
causalHistory: @["msg0", "msg2", "msg4"], causalHistory: toCausalHistory(@["msg0", "msg2", "msg4"]),
channelId: testChannel, channelId: testChannel,
content: @[byte(100)], content: @[byte(100)],
bloomFilter: @[], bloomFilter: @[],
@ -328,7 +399,7 @@ suite "Periodic Tasks & Buffer Management":
discard, discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1, messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard, discard,
) )
@ -377,7 +448,7 @@ suite "Periodic Tasks & Buffer Management":
discard, discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard, discard,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard, discard,
proc() {.gcsafe.} = proc() {.gcsafe.} =
syncCallCount += 1, syncCallCount += 1,
@ -420,7 +491,7 @@ suite "Special Cases Handling":
let msgInvalid = SdsMessage( let msgInvalid = SdsMessage(
messageId: "invalid-bf", messageId: "invalid-bf",
lamportTimestamp: 1, lamportTimestamp: 1,
causalHistory: @[], causalHistory: toCausalHistory(@[]),
channelId: testChannel, channelId: testChannel,
content: @[byte(1)], content: @[byte(1)],
bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
@ -443,7 +514,7 @@ suite "Special Cases Handling":
messageReadyCount += 1, messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard, discard,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard, discard,
) )
@ -451,7 +522,7 @@ suite "Special Cases Handling":
let msg = SdsMessage( let msg = SdsMessage(
messageId: "dup-msg", messageId: "dup-msg",
lamportTimestamp: 1, lamportTimestamp: 1,
causalHistory: @[], causalHistory: toCausalHistory(@[]),
channelId: testChannel, channelId: testChannel,
content: @[byte(1)], content: @[byte(1)],
bloomFilter: @[], bloomFilter: @[],
@ -601,7 +672,7 @@ suite "Multi-Channel ReliabilityManager Tests":
readyMessageCount += 1, readyMessageCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
sentMessageCount += 1, sentMessageCount += 1,
proc(messageId: SdsMessageID, deps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = proc(messageId: SdsMessageID, deps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1 missingDepsCount += 1
) )
@ -624,7 +695,7 @@ suite "Multi-Channel ReliabilityManager Tests":
let ackMsg1 = SdsMessage( let ackMsg1 = SdsMessage(
messageId: "ack1", messageId: "ack1",
lamportTimestamp: rm.channels[channel1].lamportTimestamp + 1, lamportTimestamp: rm.channels[channel1].lamportTimestamp + 1,
causalHistory: @[msgId1], # Acknowledge msg1 causalHistory: toCausalHistory(@[msgId1]), # Acknowledge msg1
channelId: channel1, channelId: channel1,
content: @[byte(100)], content: @[byte(100)],
bloomFilter: @[], bloomFilter: @[],
@ -633,7 +704,7 @@ suite "Multi-Channel ReliabilityManager Tests":
let ackMsg2 = SdsMessage( let ackMsg2 = SdsMessage(
messageId: "ack2", messageId: "ack2",
lamportTimestamp: rm.channels[channel2].lamportTimestamp + 1, lamportTimestamp: rm.channels[channel2].lamportTimestamp + 1,
causalHistory: @[msgId2], # Acknowledge msg2 causalHistory: toCausalHistory(@[msgId2]), # Acknowledge msg2
channelId: channel2, channelId: channel2,
content: @[byte(101)], content: @[byte(101)],
bloomFilter: @[], bloomFilter: @[],

@ -1 +1 @@
Subproject commit 0be0663e1af76e869837226a4ef3e586fcc737d3 Subproject commit 0e7a764edae92b224326b3700f062702489ce2b4