From 975fd83dd1e068163b2d02348ba7560bf652abf1 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 9 Apr 2025 18:20:28 +0300 Subject: [PATCH 01/20] adding makefile --- .gitignore | 25 +++++++++++++++++++++++++ go.mod | 3 +++ sds/Makefile | 43 +++++++++++++++++++++++++++++++++++++++++++ sds/sds.go | 1 + 4 files changed, 72 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 sds/Makefile create mode 100644 sds/sds.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eea2e3e --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# Generated dependencies and cache +third_party +nimcache \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1e64416 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/waku-org/waku-go-bindings + +go 1.22.10 diff --git a/sds/Makefile b/sds/Makefile new file mode 100644 index 0000000..8976491 --- /dev/null +++ b/sds/Makefile @@ -0,0 +1,43 @@ +# Makefile for Waku Go Bindings + +# Directories +THIRD_PARTY_DIR := ../third_party +NIM_SDS_REPO := https://github.com/waku-org/nim-sds +NIM_SDS_DIR := $(THIRD_PARTY_DIR)/nim-sds + +.PHONY: all clean prepare build-libsds build + +# Default target +all: build + +# Prepare third_party directory and clone nim-sds +# TODO: remove the "git checkout gabrielmer-feat-init-implementation" part +prepare: + @echo "Creating third_party directory..." + @mkdir -p $(THIRD_PARTY_DIR) + + @echo "Cloning nim-sds repository..." + @if [ ! -d "$(NIM_SDS_DIR)" ]; then \ + cd $(THIRD_PARTY_DIR) && \ + git clone $(NIM_SDS_REPO) && \ + cd $(NIM_SDS_DIR) && \ + git checkout gabrielmer-feat-init-implementation; \ + else \ + echo "nim-sds repository already exists."; \ + fi + +# Build libsds +build-libsds: prepare + @echo "Building libsds..." + @cd $(NIM_SDS_DIR) && make libsds + +# Build SDS Go Bindings +build: build-libsds + @echo "Building SDS Go Bindings..." + go build ./... + +# Clean up generated files +clean: + @echo "Cleaning up..." + @rm -rf $(THIRD_PARTY_DIR) + @rm -f sds-go-bindings \ No newline at end of file diff --git a/sds/sds.go b/sds/sds.go new file mode 100644 index 0000000..bc13b40 --- /dev/null +++ b/sds/sds.go @@ -0,0 +1 @@ +package sds From 355de25fec1578e519895eb4beabb69a41cdddbc Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 9 Apr 2025 19:04:18 +0300 Subject: [PATCH 02/20] initial NewReliabilityManager implementation --- go.mod | 5 ++ go.sum | 4 ++ sds/logging.go | 47 +++++++++++++ sds/sds.go | 187 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 243 insertions(+) create mode 100644 go.sum create mode 100644 sds/logging.go diff --git a/go.mod b/go.mod index 1e64416..7eec51b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module github.com/waku-org/waku-go-bindings go 1.22.10 + +require ( + go.uber.org/multierr v1.10.0 // indirect + go.uber.org/zap v1.27.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e1a34f3 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= diff --git a/sds/logging.go b/sds/logging.go new file mode 100644 index 0000000..71e7295 --- /dev/null +++ b/sds/logging.go @@ -0,0 +1,47 @@ +package sds + +import ( + "sync" + + "go.uber.org/zap" +) + +var ( + once sync.Once + sugar *zap.SugaredLogger +) + +func _getLogger() *zap.SugaredLogger { + once.Do(func() { + + config := zap.NewDevelopmentConfig() + l, err := config.Build() + if err != nil { + panic(err) + } + sugar = l.Sugar() + }) + return sugar +} + +func SetLogger(newLogger *zap.Logger) { + once.Do(func() {}) + + sugar = newLogger.Sugar() +} + +func Debug(msg string, args ...interface{}) { + _getLogger().Debugf(msg, args...) +} + +func Info(msg string, args ...interface{}) { + _getLogger().Infof(msg, args...) +} + +func Warn(msg string, args ...interface{}) { + _getLogger().Warnf(msg, args...) +} + +func Error(msg string, args ...interface{}) { + _getLogger().Errorf(msg, args...) +} diff --git a/sds/sds.go b/sds/sds.go index bc13b40..981cc86 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -1 +1,188 @@ package sds + +/* + #cgo LDFLAGS: -L../third_party/nim-sds/build/ -lsds + #cgo LDFLAGS: -L../third_party/nim-sds -Wl,-rpath,../third_party/nim-sds/build/ + + #include "../third_party/nim-sds/library/libsds.h" + #include + #include + + extern void globalEventCallback(int ret, char* msg, size_t len, void* userData); + + typedef struct { + int ret; + char* msg; + size_t len; + void* ffiWg; + } Resp; + + static void* allocResp(void* wg) { + Resp* r = calloc(1, sizeof(Resp)); + r->ffiWg = wg; + return r; + } + + static void freeResp(void* resp) { + if (resp != NULL) { + free(resp); + } + } + + static char* getMyCharPtr(void* resp) { + if (resp == NULL) { + return NULL; + } + Resp* m = (Resp*) resp; + return m->msg; + } + + static size_t getMyCharLen(void* resp) { + if (resp == NULL) { + return 0; + } + Resp* m = (Resp*) resp; + return m->len; + } + + static int getRet(void* resp) { + if (resp == NULL) { + return 0; + } + Resp* m = (Resp*) resp; + return m->ret; + } + + // resp must be set != NULL in case interest on retrieving data from the callback + void GoCallback(int ret, char* msg, size_t len, void* resp); + + static void* cGoNewReliabilityManager(const char* channelId, void* resp) { + // We pass NULL because we are not interested in retrieving data from this callback + void* ret = NewReliabilityManager(channelId, (SdsCallBack) GoCallback, resp); + return ret; + } +*/ +import "C" +import ( + "encoding/json" + "errors" + "fmt" + "sync" + "time" + "unsafe" +) + +const requestTimeout = 30 * time.Second + +//export GoCallback +func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { + if resp != nil { + m := (*C.Resp)(resp) + m.ret = ret + m.msg = msg + m.len = len + wg := (*sync.WaitGroup)(m.ffiWg) + wg.Done() + } +} + +// ReliabilityManager represents an instance of a nim-sds ReliabilityManager +type ReliabilityManager struct { + rmCtx unsafe.Pointer + rmName string + channelId string +} + +func NewReliabilityManager(channelId string, rmName string) (*ReliabilityManager, error) { + Debug("Creating new WakuNode: %v", rmName) + rm := &ReliabilityManager{ + channelId: channelId, + rmName: rmName, + } + + wg := sync.WaitGroup{} + + var cChannelId = C.CString(string(channelId)) + var resp = C.allocResp(unsafe.Pointer(&wg)) + + defer C.free(unsafe.Pointer(cChannelId)) + defer C.freeResp(resp) + + if C.getRet(resp) != C.RET_OK { + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("error NewReliabilityManager for %s: %v", rmName, errMsg) + return nil, errors.New(errMsg) + } + + wg.Add(1) + rm.rmCtx = C.cGoNewReliabilityManager(cChannelId, resp) + wg.Wait() + + // C.cGoWakuSetEventCallback(n.wakuCtx) TODO + registerReliabilityManager(rm) + + Debug("Successfully created WakuNode: %s", rmName) + return rm, nil +} + +// The event callback sends back the rm ctx to know to which +// rm is the event being emited for. Since we only have a global +// callback in the go side, We register all the rm's that we create +// so we can later obtain which instance of `ReliabilityManager` it should +// be invoked depending on the ctx received + +var rmRegistry map[unsafe.Pointer]*ReliabilityManager + +func init() { + rmRegistry = make(map[unsafe.Pointer]*ReliabilityManager) +} + +func registerReliabilityManager(rm *ReliabilityManager) { + _, ok := rmRegistry[rm.rmCtx] + if !ok { + rmRegistry[rm.rmCtx] = rm + } +} + +func unregisterReliabilityManager(rm *ReliabilityManager) { + delete(rmRegistry, rm.rmCtx) +} + +//export globalEventCallback +func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { + if callerRet == C.RET_OK { + eventStr := C.GoStringN(msg, C.int(len)) + rm, ok := rmRegistry[userData] // userData contains node's ctx + if ok { + rm.OnEvent(eventStr) + } + } else { + if len != 0 { + errMsg := C.GoStringN(msg, C.int(len)) + Error("globalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg) + } else { + Error("globalEventCallback retCode not ok, retCode: %v", callerRet) + } + } +} + +type jsonEvent struct { + EventType string `json:"eventType"` +} + +func (n *ReliabilityManager) OnEvent(eventStr string) { + jsonEvent := jsonEvent{} + err := json.Unmarshal([]byte(eventStr), &jsonEvent) + if err != nil { + Error("could not unmarshal sds event string: %v", err) + + return + } + + switch jsonEvent.EventType { + case "event 1": + fmt.Println("-------- received event 1") + case "event 2": + fmt.Println("-------- received event 1") + } +} From e4d22bd6cfa4968a50b34c1c15f597f13fae3d8f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 10 Apr 2025 12:45:33 +0300 Subject: [PATCH 03/20] adding event callback --- sds/sds.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/sds/sds.go b/sds/sds.go index 981cc86..fffccf4 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -61,6 +61,22 @@ package sds void* ret = NewReliabilityManager(channelId, (SdsCallBack) GoCallback, resp); return ret; } + + static void cGoSetEventCallback(void* rmCtx) { + // The 'globalEventCallback' Go function is shared amongst all possible Reliability Manager instances. + + // Given that the 'globalEventCallback' is shared, we pass again the + // rmCtx instance but in this case is needed to pick up the correct method + // that will handle the event. + + // In other words, for every call libsds makes to globalEventCallback, + // the 'userData' parameter will bring the context of the rm that registered + // that globalEventCallback. + + // This technique is needed because cgo only allows to export Go functions and not methods. + + SetEventCallback(rmCtx, (SdsCallBack) globalEventCallback, rmCtx); + } */ import "C" import ( @@ -118,7 +134,7 @@ func NewReliabilityManager(channelId string, rmName string) (*ReliabilityManager rm.rmCtx = C.cGoNewReliabilityManager(cChannelId, resp) wg.Wait() - // C.cGoWakuSetEventCallback(n.wakuCtx) TODO + C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) Debug("Successfully created WakuNode: %s", rmName) From 74cffc39dd49d900a5a4f2ece5e56c934e7a4bf6 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 10 Apr 2025 14:11:29 +0300 Subject: [PATCH 04/20] adding destroy function and test --- go.mod | 11 +++++++++-- go.sum | 16 ++++++++++++++++ sds/sds.go | 49 +++++++++++++++++++++++++++++++++++++++++-------- sds/sds_test.go | 22 ++++++++++++++++++++++ 4 files changed, 88 insertions(+), 10 deletions(-) create mode 100644 sds/sds_test.go diff --git a/go.mod b/go.mod index 7eec51b..07b99e5 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,13 @@ module github.com/waku-org/waku-go-bindings go 1.22.10 require ( - go.uber.org/multierr v1.10.0 // indirect - go.uber.org/zap v1.27.0 // indirect + github.com/stretchr/testify v1.8.1 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index e1a34f3..ea719a5 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,20 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sds/sds.go b/sds/sds.go index fffccf4..17bae21 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -77,6 +77,10 @@ package sds SetEventCallback(rmCtx, (SdsCallBack) globalEventCallback, rmCtx); } + + static void cGoCleanupReliabilityManager(void* rmCtx, void* resp) { + CleanupReliabilityManager(rmCtx, (SdsCallBack) GoCallback, resp); + } */ import "C" import ( @@ -105,15 +109,15 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // ReliabilityManager represents an instance of a nim-sds ReliabilityManager type ReliabilityManager struct { rmCtx unsafe.Pointer - rmName string + name string channelId string } -func NewReliabilityManager(channelId string, rmName string) (*ReliabilityManager, error) { - Debug("Creating new WakuNode: %v", rmName) +func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, error) { + Debug("Creating new Reliability Manager: %v", name) rm := &ReliabilityManager{ channelId: channelId, - rmName: rmName, + name: name, } wg := sync.WaitGroup{} @@ -126,7 +130,7 @@ func NewReliabilityManager(channelId string, rmName string) (*ReliabilityManager if C.getRet(resp) != C.RET_OK { errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("error NewReliabilityManager for %s: %v", rmName, errMsg) + Error("error NewReliabilityManager for %s: %v", name, errMsg) return nil, errors.New(errMsg) } @@ -137,7 +141,7 @@ func NewReliabilityManager(channelId string, rmName string) (*ReliabilityManager C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) - Debug("Successfully created WakuNode: %s", rmName) + Debug("Successfully created Reliability Manager: %s", name) return rm, nil } @@ -168,7 +172,7 @@ func unregisterReliabilityManager(rm *ReliabilityManager) { func globalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) { if callerRet == C.RET_OK { eventStr := C.GoStringN(msg, C.int(len)) - rm, ok := rmRegistry[userData] // userData contains node's ctx + rm, ok := rmRegistry[userData] // userData contains rm's ctx if ok { rm.OnEvent(eventStr) } @@ -186,7 +190,7 @@ type jsonEvent struct { EventType string `json:"eventType"` } -func (n *ReliabilityManager) OnEvent(eventStr string) { +func (rm *ReliabilityManager) OnEvent(eventStr string) { jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) if err != nil { @@ -202,3 +206,32 @@ func (n *ReliabilityManager) OnEvent(eventStr string) { fmt.Println("-------- received event 1") } } + +func (rm *ReliabilityManager) Cleanup() error { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to destroy %v", err) + return err + } + + Debug("Destroying %v", rm.name) + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoCleanupReliabilityManager(rm.rmCtx, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + unregisterReliabilityManager(rm) + Debug("Successfully destroyed %s", rm.name) + return nil + } + + errMsg := "error CleanupReliabilityManager: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to destroy %v: %v", rm.name, errMsg) + + return errors.New(errMsg) +} diff --git a/sds/sds_test.go b/sds/sds_test.go new file mode 100644 index 0000000..8158de8 --- /dev/null +++ b/sds/sds_test.go @@ -0,0 +1,22 @@ +package sds + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCreateAndCleanup(t *testing.T) { + + rm1, err := NewReliabilityManager("my-channel-id", "rm1") + require.NoError(t, err) + + rm2, err := NewReliabilityManager("my-channel-id-2", "rm2") + require.NoError(t, err) + + err = rm1.Cleanup() + require.NoError(t, err) + + err = rm2.Cleanup() + require.NoError(t, err) +} From f31ac1a8f7da01f70bc7143e0e1292fa4f47d627 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 10 Apr 2025 14:33:17 +0300 Subject: [PATCH 05/20] adding reset function --- sds/sds.go | 40 ++++++++++++++++++++++++++++++++++++---- sds/sds_test.go | 15 ++++++++++++++- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index 17bae21..65be925 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -81,6 +81,10 @@ package sds static void cGoCleanupReliabilityManager(void* rmCtx, void* resp) { CleanupReliabilityManager(rmCtx, (SdsCallBack) GoCallback, resp); } + + static void cGoResetReliabilityManager(void* rmCtx, void* resp) { + ResetReliabilityManager(rmCtx, (SdsCallBack) GoCallback, resp); + } */ import "C" import ( @@ -210,11 +214,11 @@ func (rm *ReliabilityManager) OnEvent(eventStr string) { func (rm *ReliabilityManager) Cleanup() error { if rm == nil { err := errors.New("reliability manager is nil") - Error("Failed to destroy %v", err) + Error("Failed to cleanup %v", err) return err } - Debug("Destroying %v", rm.name) + Debug("Cleaning up %v", rm.name) wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -226,12 +230,40 @@ func (rm *ReliabilityManager) Cleanup() error { if C.getRet(resp) == C.RET_OK { unregisterReliabilityManager(rm) - Debug("Successfully destroyed %s", rm.name) + Debug("Successfully cleaned up %s", rm.name) return nil } errMsg := "error CleanupReliabilityManager: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to destroy %v: %v", rm.name, errMsg) + Error("Failed to cleanup %v: %v", rm.name, errMsg) + + return errors.New(errMsg) +} + +func (rm *ReliabilityManager) Reset() error { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to reset %v", err) + return err + } + + Debug("Resetting %v", rm.name) + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoResetReliabilityManager(rm.rmCtx, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + Debug("Successfully resetted %s", rm.name) + return nil + } + + errMsg := "error ResetReliabilityManager: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to reset %v: %v", rm.name, errMsg) return errors.New(errMsg) } diff --git a/sds/sds_test.go b/sds/sds_test.go index 8158de8..e71e53e 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -8,7 +8,7 @@ import ( func TestCreateAndCleanup(t *testing.T) { - rm1, err := NewReliabilityManager("my-channel-id", "rm1") + rm1, err := NewReliabilityManager("my-channel-id-1", "rm1") require.NoError(t, err) rm2, err := NewReliabilityManager("my-channel-id-2", "rm2") @@ -20,3 +20,16 @@ func TestCreateAndCleanup(t *testing.T) { err = rm2.Cleanup() require.NoError(t, err) } + +func TestReset(t *testing.T) { + + rm, err := NewReliabilityManager("my-channel-id", "rm") + require.NoError(t, err) + + err = rm.Reset() + require.NoError(t, err) + + err = rm.Cleanup() + require.NoError(t, err) + +} From 8ec288388947f28fba8470673e370fd63316abb3 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Apr 2025 13:13:56 +0300 Subject: [PATCH 06/20] adding WrapOutgoingMessage --- sds/sds.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++ sds/sds_test.go | 16 +++++++++++ 2 files changed, 90 insertions(+) diff --git a/sds/sds.go b/sds/sds.go index 65be925..62a3e13 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -85,12 +85,27 @@ package sds static void cGoResetReliabilityManager(void* rmCtx, void* resp) { ResetReliabilityManager(rmCtx, (SdsCallBack) GoCallback, resp); } + + static void cGoWrapOutgoingMessage(void* rmCtx, + void* message, + size_t messageLen, + const char* messageId, + void* resp) { + WrapOutgoingMessage(rmCtx, + message, + messageLen, + messageId, + (SdsCallBack) GoCallback, + resp); + } */ import "C" import ( "encoding/json" "errors" "fmt" + "strconv" + "strings" "sync" "time" "unsafe" @@ -117,6 +132,8 @@ type ReliabilityManager struct { channelId string } +type MessageID string + func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, error) { Debug("Creating new Reliability Manager: %v", name) rm := &ReliabilityManager{ @@ -267,3 +284,60 @@ func (rm *ReliabilityManager) Reset() error { return errors.New(errMsg) } + +func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId MessageID) ([]byte, error) { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to wrap outgoing message %v", err) + return nil, err + } + + Debug("Wraping outgoing message %v", messageId) + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + cMessageId := C.CString(string(messageId)) + defer C.free(unsafe.Pointer(cMessageId)) + + var cMessagePtr unsafe.Pointer + if len(message) > 0 { + cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed + defer C.free(cMessagePtr) + } else { + cMessagePtr = nil + } + cMessageLen := C.size_t(len(message)) + + wg.Add(1) + C.cGoWrapOutgoingMessage(rm.rmCtx, cMessagePtr, cMessageLen, cMessageId, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + resStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if resStr == "" { + Debug("Received empty res string for messageId: %v", messageId) + return nil, nil + } + Debug("Successfully wrapped message %s", messageId) + + parts := strings.Split(resStr, ",") + bytes := make([]byte, len(parts)) + + for i, part := range parts { + n, err := strconv.Atoi(strings.TrimSpace(part)) + if err != nil { + panic(err) + } + bytes[i] = byte(n) + } + + return bytes, nil + } + + errMsg := "error WrapOutgoingMessage: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to wrap message %v: %v", messageId, errMsg) + + return nil, errors.New(errMsg) +} diff --git a/sds/sds_test.go b/sds/sds_test.go index e71e53e..12dfe23 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -1,6 +1,7 @@ package sds import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -33,3 +34,18 @@ func TestReset(t *testing.T) { require.NoError(t, err) } + +func TestWrap(t *testing.T) { + + rm, err := NewReliabilityManager("my-channel-id", "rm") + require.NoError(t, err) + defer rm.Cleanup() + + msg := []byte{1, 2, 3, 4, 5} + + res, err := rm.WrapOutgoingMessage(msg, "my-message-id") + require.NoError(t, err) + + fmt.Println("---------- len(res): ", len(res)) + +} From 86c8595b0cf255dc5ca3e81ab25f510332172b64 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Apr 2025 17:07:47 +0300 Subject: [PATCH 07/20] adding UnwrapReceivedMessage and removing rm name --- sds/sds.go | 87 +++++++++++++++++++++++++++++++++++++++++-------- sds/sds_test.go | 30 +++++++++-------- sds/types.go | 8 +++++ 3 files changed, 97 insertions(+), 28 deletions(-) create mode 100644 sds/types.go diff --git a/sds/sds.go b/sds/sds.go index 62a3e13..ed06581 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -98,6 +98,16 @@ package sds (SdsCallBack) GoCallback, resp); } + static void cGoUnwrapReceivedMessage(void* rmCtx, + void* message, + size_t messageLen, + void* resp) { + UnwrapReceivedMessage(rmCtx, + message, + messageLen, + (SdsCallBack) GoCallback, + resp); + } */ import "C" import ( @@ -128,17 +138,13 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // ReliabilityManager represents an instance of a nim-sds ReliabilityManager type ReliabilityManager struct { rmCtx unsafe.Pointer - name string channelId string } -type MessageID string - -func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, error) { - Debug("Creating new Reliability Manager: %v", name) +func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { + Debug("Creating new Reliability Manager") rm := &ReliabilityManager{ channelId: channelId, - name: name, } wg := sync.WaitGroup{} @@ -151,7 +157,7 @@ func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, if C.getRet(resp) != C.RET_OK { errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("error NewReliabilityManager for %s: %v", name, errMsg) + Error("error NewReliabilityManager: %v", errMsg) return nil, errors.New(errMsg) } @@ -162,7 +168,7 @@ func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) - Debug("Successfully created Reliability Manager: %s", name) + Debug("Successfully created Reliability Manager") return rm, nil } @@ -235,7 +241,7 @@ func (rm *ReliabilityManager) Cleanup() error { return err } - Debug("Cleaning up %v", rm.name) + Debug("Cleaning up reliability manager") wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -247,12 +253,12 @@ func (rm *ReliabilityManager) Cleanup() error { if C.getRet(resp) == C.RET_OK { unregisterReliabilityManager(rm) - Debug("Successfully cleaned up %s", rm.name) + Debug("Successfully cleaned up reliability manager") return nil } errMsg := "error CleanupReliabilityManager: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to cleanup %v: %v", rm.name, errMsg) + Error("Failed to cleanup reliability manager: %v", errMsg) return errors.New(errMsg) } @@ -264,7 +270,7 @@ func (rm *ReliabilityManager) Reset() error { return err } - Debug("Resetting %v", rm.name) + Debug("Resetting reliability manager") wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -275,12 +281,12 @@ func (rm *ReliabilityManager) Reset() error { wg.Wait() if C.getRet(resp) == C.RET_OK { - Debug("Successfully resetted %s", rm.name) + Debug("Successfully resetted reliability manager") return nil } errMsg := "error ResetReliabilityManager: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to reset %v: %v", rm.name, errMsg) + Error("Failed to reset reliability manager: %v", errMsg) return errors.New(errMsg) } @@ -341,3 +347,56 @@ func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId Mess return nil, errors.New(errMsg) } + +func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedMessage, error) { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to unwrap received message %v", err) + return nil, err + } + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + var cMessagePtr unsafe.Pointer + if len(message) > 0 { + cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed + defer C.free(cMessagePtr) + } else { + cMessagePtr = nil + } + cMessageLen := C.size_t(len(message)) + + wg.Add(1) + C.cGoUnwrapReceivedMessage(rm.rmCtx, cMessagePtr, cMessageLen, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + resStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if resStr == "" { + Debug("Received empty res string") + return nil, nil + } + Debug("Successfully unwrapped message") + + fmt.Println("------------ UnwrapReceivedMessage res: ", resStr) + + unwrappedMessage := UnwrappedMessage{} + err := json.Unmarshal([]byte(resStr), &unwrappedMessage) + if err != nil { + Error("Failed to unmarshal unwrapped message") + return nil, err + } + + fmt.Println(unwrappedMessage.Message) + fmt.Println(unwrappedMessage.MissingDeps) + + return &unwrappedMessage, nil + } + + errMsg := "error UnwrapReceivedMessage: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to unwrap message: %v", errMsg) + + return nil, errors.New(errMsg) +} diff --git a/sds/sds_test.go b/sds/sds_test.go index 12dfe23..d17fbfb 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -1,7 +1,6 @@ package sds import ( - "fmt" "testing" "github.com/stretchr/testify/require" @@ -9,22 +8,17 @@ import ( func TestCreateAndCleanup(t *testing.T) { - rm1, err := NewReliabilityManager("my-channel-id-1", "rm1") - require.NoError(t, err) - - rm2, err := NewReliabilityManager("my-channel-id-2", "rm2") + rm1, err := NewReliabilityManager("my-channel-id-1") require.NoError(t, err) err = rm1.Cleanup() require.NoError(t, err) - err = rm2.Cleanup() - require.NoError(t, err) } func TestReset(t *testing.T) { - rm, err := NewReliabilityManager("my-channel-id", "rm") + rm, err := NewReliabilityManager("my-channel-id") require.NoError(t, err) err = rm.Reset() @@ -35,17 +29,25 @@ func TestReset(t *testing.T) { } -func TestWrap(t *testing.T) { - - rm, err := NewReliabilityManager("my-channel-id", "rm") +// Test wrapping and unwrapping a simple message +func TestWrapUnwrap(t *testing.T) { + channelID := "test-wrap-unwrap" + rm, err := NewReliabilityManager(channelID) require.NoError(t, err) defer rm.Cleanup() - msg := []byte{1, 2, 3, 4, 5} + originalPayload := []byte("hello reliability") + messageID := MessageID("msg-wrap-1") - res, err := rm.WrapOutgoingMessage(msg, "my-message-id") + wrappedMsg, err := rm.WrapOutgoingMessage(originalPayload, messageID) require.NoError(t, err) - fmt.Println("---------- len(res): ", len(res)) + require.Greater(t, len(wrappedMsg), 0, "Expected non-empty wrapped message") + // Simulate receiving the wrapped message + unwrappedMessage, err := rm.UnwrapReceivedMessage(wrappedMsg) + require.NoError(t, err) + + require.Equal(t, string(*unwrappedMessage.Message), string(originalPayload), "Expected unwrapped and original payloads to be equal") + require.Equal(t, len(*unwrappedMessage.MissingDeps), 0, "Expexted to be no missing dependencies") } diff --git a/sds/types.go b/sds/types.go new file mode 100644 index 0000000..cbe3b9b --- /dev/null +++ b/sds/types.go @@ -0,0 +1,8 @@ +package sds + +type MessageID string + +type UnwrappedMessage struct { + Message *[]byte `json:"message"` + MissingDeps *[]MessageID `json:"missingDeps"` +} From 188ef6faf013fe79a08fd9fbbc7de7a314b27775 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Apr 2025 17:12:39 +0300 Subject: [PATCH 08/20] lifecycle test --- sds/sds_test.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/sds/sds_test.go b/sds/sds_test.go index d17fbfb..e1c681d 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -6,27 +6,17 @@ import ( "github.com/stretchr/testify/require" ) -func TestCreateAndCleanup(t *testing.T) { - - rm1, err := NewReliabilityManager("my-channel-id-1") +// Test basic creation, cleanup, and reset +func TestLifecycle(t *testing.T) { + channelID := "test-lifecycle" + rm, err := NewReliabilityManager(channelID) require.NoError(t, err) + require.NotNil(t, rm, "Expected ReliabilityManager to be not nil") - err = rm1.Cleanup() - require.NoError(t, err) - -} - -func TestReset(t *testing.T) { - - rm, err := NewReliabilityManager("my-channel-id") - require.NoError(t, err) + defer rm.Cleanup() // Ensure cleanup even on test failure err = rm.Reset() require.NoError(t, err) - - err = rm.Cleanup() - require.NoError(t, err) - } // Test wrapping and unwrapping a simple message From 8b5c79b35136cbcd902aee74583f2075927b801e Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Apr 2025 18:28:27 +0300 Subject: [PATCH 09/20] initial implementation of dependencies met (not working yet) --- sds/sds.go | 64 +++++++++++++++++++++++++++++++++++++++++++++---- sds/sds_test.go | 48 +++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 5 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index ed06581..e626b86 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -108,6 +108,18 @@ package sds (SdsCallBack) GoCallback, resp); } + + static void cGoMarkDependenciesMet(void* rmCtx, + char** messageIDs, + size_t count, + void* resp) { + UnwrapReceivedMessage(rmCtx, + messageIDs, + count, + (SdsCallBack) GoCallback, + resp); + } + */ import "C" import ( @@ -380,8 +392,6 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM } Debug("Successfully unwrapped message") - fmt.Println("------------ UnwrapReceivedMessage res: ", resStr) - unwrappedMessage := UnwrappedMessage{} err := json.Unmarshal([]byte(resStr), &unwrappedMessage) if err != nil { @@ -389,9 +399,6 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM return nil, err } - fmt.Println(unwrappedMessage.Message) - fmt.Println(unwrappedMessage.MissingDeps) - return &unwrappedMessage, nil } @@ -400,3 +407,50 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM return nil, errors.New(errMsg) } + +// MarkDependenciesMet informs the library that dependencies are met +func (rm *ReliabilityManager) MarkDependenciesMet(messageIDs []MessageID) error { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to mark dependencies met %v", err) + return err + } + + if len(messageIDs) == 0 { + return nil // Nothing to do + } + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + // Convert Go string slice to C array of C strings (char**) + cMessageIDs := make([]*C.char, len(messageIDs)) + for i, id := range messageIDs { + cMessageIDs[i] = C.CString(string(id)) + defer C.free(unsafe.Pointer(cMessageIDs[i])) // Ensure each CString is freed + } + + // Create a pointer (**C.char) to the first element of the slice + var cMessageIDsPtr **C.char + if len(cMessageIDs) > 0 { + cMessageIDsPtr = &cMessageIDs[0] + } else { + cMessageIDsPtr = nil // Handle empty slice case + } + + wg.Add(1) + // Pass the pointer variable (cMessageIDsPtr) directly, which is of type **C.char + C.cGoMarkDependenciesMet(rm.rmCtx, cMessageIDsPtr, C.size_t(len(messageIDs)), resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + Debug("Successfully marked dependencies as met") + return nil + } + + errMsg := "error MarkDependenciesMet: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to mark dependencies as met: %v", errMsg) + + return errors.New(errMsg) +} diff --git a/sds/sds_test.go b/sds/sds_test.go index e1c681d..cab0155 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -41,3 +41,51 @@ func TestWrapUnwrap(t *testing.T) { require.Equal(t, string(*unwrappedMessage.Message), string(originalPayload), "Expected unwrapped and original payloads to be equal") require.Equal(t, len(*unwrappedMessage.MissingDeps), 0, "Expexted to be no missing dependencies") } + +// Test dependency handling +func TestDependencies(t *testing.T) { + channelID := "test-deps" + rm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer rm.Cleanup() + + // 1. Send message 1 (will become a dependency) + payload1 := []byte("message one") + msgID1 := MessageID("msg-dep-1") + wrappedMsg1, err := rm.WrapOutgoingMessage(payload1, msgID1) + require.NoError(t, err) + + // Simulate receiving msg1 to add it to history (implicitly acknowledges it) + _, err = rm.UnwrapReceivedMessage(wrappedMsg1) + require.NoError(t, err) + + // 2. Send message 2 (depends on message 1 implicitly via causal history) + payload2 := []byte("message two") + msgID2 := MessageID("msg-dep-2") + wrappedMsg2, err := rm.WrapOutgoingMessage(payload2, msgID2) + require.NoError(t, err) + + // 3. Create a new manager to simulate a different peer receiving msg2 without msg1 + rm2, err := NewReliabilityManager(channelID) // Same channel ID + require.NoError(t, err) + defer rm2.Cleanup() + + // 4. Unwrap message 2 on the second manager - should report msg1 as missing + unwrappedMessage2, err := rm2.UnwrapReceivedMessage(wrappedMsg2) + require.NoError(t, err) + + require.Greater(t, len(*unwrappedMessage2.MissingDeps), 0, "Expected missing dependencies, got none") + + foundDep1 := false + for _, dep := range *unwrappedMessage2.MissingDeps { + if dep == msgID1 { + foundDep1 = true + break + } + } + require.True(t, foundDep1, "Expected missing dependency %q, got %v", msgID1, *unwrappedMessage2.MissingDeps) + + // 5. Mark the dependency as met + err = rm2.MarkDependenciesMet([]MessageID{msgID1}) + require.NoError(t, err) +} From 45e4f122b2deb00e34a33787e0537725b74372ae Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Apr 2025 19:03:29 +0300 Subject: [PATCH 10/20] fix copy-paste bug --- sds/sds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sds/sds.go b/sds/sds.go index e626b86..82a7bc4 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -113,7 +113,7 @@ package sds char** messageIDs, size_t count, void* resp) { - UnwrapReceivedMessage(rmCtx, + MarkDependenciesMet(rmCtx, messageIDs, count, (SdsCallBack) GoCallback, From 94fe0d4919108f15601e63b49d157add45ad0c6f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 11:51:28 +0300 Subject: [PATCH 11/20] adding StartPeriodicTasks --- sds/sds.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/sds/sds.go b/sds/sds.go index 82a7bc4..18f9ed2 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -120,6 +120,10 @@ package sds resp); } + static void cGoStartPeriodicTasks(void* rmCtx, void* resp) { + StartPeriodicTasks(rmCtx, (SdsCallBack) GoCallback, resp); + } + */ import "C" import ( @@ -454,3 +458,31 @@ func (rm *ReliabilityManager) MarkDependenciesMet(messageIDs []MessageID) error return errors.New(errMsg) } + +func (rm *ReliabilityManager) StartPeriodicTasks() error { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to start periodic tasks %v", err) + return err + } + + Debug("Starting periodic tasks") + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoStartPeriodicTasks(rm.rmCtx, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + Debug("Successfully started periodic tasks") + return nil + } + + errMsg := "error StartPeriodicTasks: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to start periodic tasks: %v", errMsg) + + return errors.New(errMsg) +} From 53419ac8199d37a012e67b69145e18566ab98c99 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 12:20:11 +0300 Subject: [PATCH 12/20] parsing message ready event --- sds/sds.go | 43 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index 18f9ed2..40b8080 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -138,6 +138,7 @@ import ( ) const requestTimeout = 30 * time.Second +const EventChanBufferSize = 1024 //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { @@ -153,8 +154,9 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // ReliabilityManager represents an instance of a nim-sds ReliabilityManager type ReliabilityManager struct { - rmCtx unsafe.Pointer - channelId string + rmCtx unsafe.Pointer + channelId string + MsgReadyChan chan MessageID } func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { @@ -181,6 +183,8 @@ func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { rm.rmCtx = C.cGoNewReliabilityManager(cChannelId, resp) wg.Wait() + rm.MsgReadyChan = make(chan MessageID, EventChanBufferSize) + C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) @@ -233,7 +237,14 @@ type jsonEvent struct { EventType string `json:"eventType"` } +type msgReady struct { + MessageId MessageID `json:"messageId"` +} + func (rm *ReliabilityManager) OnEvent(eventStr string) { + + fmt.Println("------------------- received event: ", eventStr) + jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) if err != nil { @@ -243,10 +254,30 @@ func (rm *ReliabilityManager) OnEvent(eventStr string) { } switch jsonEvent.EventType { - case "event 1": - fmt.Println("-------- received event 1") - case "event 2": - fmt.Println("-------- received event 1") + case "message_ready": + rm.parseMessageReadyEvent(eventStr) + case "message_sent": + fmt.Println("-------- received event 2") + case "missing_dependencies": + fmt.Println("-------- received event 3") + case "periodic_sync": + fmt.Println("-------- received event 4") + } + +} + +func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) { + + msgReady := msgReady{} + err := json.Unmarshal([]byte(eventStr), &msgReady) + if err != nil { + Error("could not parse message ready event %v", err) + } + + select { + case rm.MsgReadyChan <- msgReady.MessageId: + default: + Warn("Can't deliver message ready event, MsgReadyChan is full") } } From 8eef70bd673b6135fb74b29611a2a0afe0b62dc0 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 13:02:00 +0300 Subject: [PATCH 13/20] including make update to the makefile --- sds/Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/sds/Makefile b/sds/Makefile index 8976491..d03d7ec 100644 --- a/sds/Makefile +++ b/sds/Makefile @@ -22,6 +22,7 @@ prepare: git clone $(NIM_SDS_REPO) && \ cd $(NIM_SDS_DIR) && \ git checkout gabrielmer-feat-init-implementation; \ + make update; \ else \ echo "nim-sds repository already exists."; \ fi From b9f1d74a78f907fe69480bdee711e39e12776fcd Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 14:06:55 +0300 Subject: [PATCH 14/20] registering callbacks --- sds/sds.go | 70 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index 40b8080..b1ed01c 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -152,11 +152,18 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { } } +type EventCallbacks struct { + OnMessageReady func(messageId MessageID) + OnMessageSent func(messageId MessageID) + OnMissingDependencies func(messageId MessageID, missingDeps []MessageID) + OnPeriodicSync func() +} + // ReliabilityManager represents an instance of a nim-sds ReliabilityManager type ReliabilityManager struct { - rmCtx unsafe.Pointer - channelId string - MsgReadyChan chan MessageID + rmCtx unsafe.Pointer + channelId string + callbacks EventCallbacks } func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { @@ -183,8 +190,6 @@ func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { rm.rmCtx = C.cGoNewReliabilityManager(cChannelId, resp) wg.Wait() - rm.MsgReadyChan = make(chan MessageID, EventChanBufferSize) - C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) @@ -237,10 +242,19 @@ type jsonEvent struct { EventType string `json:"eventType"` } -type msgReady struct { +type msgEvent struct { MessageId MessageID `json:"messageId"` } +type missingDepsEvent struct { + MessageId MessageID `json:"messageId"` + MissingDeps []MessageID `json:"missingDeps"` +} + +func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) { + rm.callbacks = callbacks +} + func (rm *ReliabilityManager) OnEvent(eventStr string) { fmt.Println("------------------- received event: ", eventStr) @@ -257,27 +271,53 @@ func (rm *ReliabilityManager) OnEvent(eventStr string) { case "message_ready": rm.parseMessageReadyEvent(eventStr) case "message_sent": - fmt.Println("-------- received event 2") + rm.parseMessageSentEvent(eventStr) case "missing_dependencies": - fmt.Println("-------- received event 3") + rm.parseMissingDepsEvent(eventStr) case "periodic_sync": - fmt.Println("-------- received event 4") + if rm.callbacks.OnPeriodicSync != nil { + rm.callbacks.OnPeriodicSync() + } } } func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) { - msgReady := msgReady{} - err := json.Unmarshal([]byte(eventStr), &msgReady) + msgEvent := msgEvent{} + err := json.Unmarshal([]byte(eventStr), &msgEvent) if err != nil { Error("could not parse message ready event %v", err) } - select { - case rm.MsgReadyChan <- msgReady.MessageId: - default: - Warn("Can't deliver message ready event, MsgReadyChan is full") + if rm.callbacks.OnMessageReady != nil { + rm.callbacks.OnMessageReady(msgEvent.MessageId) + } +} + +func (rm *ReliabilityManager) parseMessageSentEvent(eventStr string) { + + msgEvent := msgEvent{} + err := json.Unmarshal([]byte(eventStr), &msgEvent) + if err != nil { + Error("could not parse message sent event %v", err) + } + + if rm.callbacks.OnMessageSent != nil { + rm.callbacks.OnMessageSent(msgEvent.MessageId) + } +} + +func (rm *ReliabilityManager) parseMissingDepsEvent(eventStr string) { + + missingDepsEvent := missingDepsEvent{} + err := json.Unmarshal([]byte(eventStr), &missingDepsEvent) + if err != nil { + Error("could not parse missing dependencies event %v", err) + } + + if rm.callbacks.OnMissingDependencies != nil { + rm.callbacks.OnMissingDependencies(missingDepsEvent.MessageId, missingDepsEvent.MissingDeps) } } From d3781a636a75f5a3e356b5ab7785054317fde7fc Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 17:24:37 +0300 Subject: [PATCH 15/20] adding callbacks test (not yet working) and little fixes --- sds/sds.go | 4 +- sds/sds_test.go | 124 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 2 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index b1ed01c..8d77e82 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -257,7 +257,7 @@ func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) { func (rm *ReliabilityManager) OnEvent(eventStr string) { - fmt.Println("------------------- received event: ", eventStr) + fmt.Println("------------------- received event: ", eventStr) // TODO: remove after debugging jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) @@ -385,7 +385,7 @@ func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId Mess return nil, err } - Debug("Wraping outgoing message %v", messageId) + Debug("Wrapping outgoing message %v", messageId) wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) diff --git a/sds/sds_test.go b/sds/sds_test.go index cab0155..7407fd3 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -1,7 +1,10 @@ package sds import ( + "fmt" + "sync" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -89,3 +92,124 @@ func TestDependencies(t *testing.T) { err = rm2.MarkDependenciesMet([]MessageID{msgID1}) require.NoError(t, err) } + +// Test callbacks +func TestCallbacks(t *testing.T) { + channelID := "test-callbacks" + rm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer rm.Cleanup() + + var wg sync.WaitGroup + receivedReady := make(map[MessageID]bool) + receivedSent := make(map[MessageID]bool) + receivedMissing := make(map[MessageID][]MessageID) + syncRequested := false + var cbMutex sync.Mutex // Protect access to callback tracking maps/vars + + callbacks := EventCallbacks{ + OnMessageReady: func(messageId MessageID) { + fmt.Printf("Test: OnMessageReady received: %s\n", messageId) + cbMutex.Lock() + receivedReady[messageId] = true + cbMutex.Unlock() + wg.Done() + }, + OnMessageSent: func(messageId MessageID) { + fmt.Printf("Test: OnMessageSent received: %s\n", messageId) + cbMutex.Lock() + receivedSent[messageId] = true + cbMutex.Unlock() + wg.Done() + }, + OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { + fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps) + cbMutex.Lock() + receivedMissing[messageId] = missingDeps + cbMutex.Unlock() + wg.Done() + }, + OnPeriodicSync: func() { + fmt.Println("Test: OnPeriodicSync received") + cbMutex.Lock() + syncRequested = true + cbMutex.Unlock() + // Don't wg.Done() here, it might be called multiple times + }, + } + + rm.RegisterCallbacks(callbacks) + + // Start tasks AFTER registering callbacks + err = rm.StartPeriodicTasks() + require.NoError(t, err) + + // --- Test Scenario --- + + // 1. Send msg1 + wg.Add(1) // Expect OnMessageSent for msg1 eventually + payload1 := []byte("callback test 1") + msgID1 := MessageID("cb-msg-1") + wrappedMsg1, err := rm.WrapOutgoingMessage(payload1, msgID1) + require.NoError(t, err) + + // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history) + wg.Add(1) // Expect OnMessageReady for msg1 + _, err = rm.UnwrapReceivedMessage(wrappedMsg1) + require.NoError(t, err) + + // 3. Send msg2 (depends on msg1) + wg.Add(1) // Expect OnMessageSent for msg2 eventually + payload2 := []byte("callback test 2") + msgID2 := MessageID("cb-msg-2") + wrappedMsg2, err := rm.WrapOutgoingMessage(payload2, msgID2) + require.NoError(t, err) + + // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2) + wg.Add(1) // Expect OnMessageReady for msg2 + _, err = rm.UnwrapReceivedMessage(wrappedMsg2) + require.NoError(t, err) + + // --- Verification --- + // Wait for expected callbacks with a timeout + waitTimeout(&wg, 5*time.Second, t) + + cbMutex.Lock() + defer cbMutex.Unlock() + + if !receivedReady[msgID1] { + t.Errorf("OnMessageReady not called for %s", msgID1) + } + if !receivedReady[msgID2] { + t.Errorf("OnMessageReady not called for %s", msgID2) + } + if !receivedSent[msgID1] { + t.Errorf("OnMessageSent not called for %s", msgID1) + } + if !receivedSent[msgID2] { + t.Errorf("OnMessageSent not called for %s", msgID2) + } + // We didn't explicitly test missing deps in this path + if len(receivedMissing) > 0 { + t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing) + } + // Periodic sync is harder to guarantee in a short test, just check if it was ever true + if !syncRequested { + t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout") + } +} + +// Helper function to wait for WaitGroup with a timeout +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + // Completed normally + case <-time.After(timeout): + t.Fatalf("Timed out waiting for callbacks") + } +} From ef141fa42045226fec2d20e520fa5f8cab288aad Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 17:30:52 +0300 Subject: [PATCH 16/20] updating readme --- README.md | 46 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c3d6c7a..3dc1a91 100644 --- a/README.md +++ b/README.md @@ -1 +1,45 @@ -# sds-go-bindings \ No newline at end of file +# SDS Go Bindings + +This repository provides Go bindings for the SDS library, enabling seamless integration with Go projects. + +## Installation + +To build the required dependencies for this module, the `make` command needs to be executed. If you are integrating this module into another project via `go get`, ensure that you navigate to the `sds-go-bindings/sds` directory and run `make`. + +### Steps to Install + +Follow these steps to install and set up the module: + +1. Retrieve the module using `go get`: + ``` + go get -u github.com/waku-org/waku-go-bindings + ``` +2. Navigate to the module's directory: + ``` + cd $(go list -m -f '{{.Dir}}' github.com/waku-org/sds-go-bindings) + ``` +3. Prepare third_party directory which will clone `nim-sds` + ``` + sudo mkdir third_party + sudo chown $USER third_party + ``` +4. Build the dependencies: + ``` + make -C sds + ``` + +Now the module is ready for use in your project. + +### Note + +In order to easily build the libsds library on demand, it is recommended to add the following target in your project's Makefile: + +``` +LIBSDS_DEP_PATH=$(shell go list -m -f '{{.Dir}}' github.com/waku-org/sds-go-bindings) + +buildlib: + cd $(LIBSDS_DEP_PATH) &&\ + sudo mkdir -p third_party &&\ + sudo chown $(USER) third_party &&\ + make -C sds +``` From a785ba85ede7701cfb0054972fb40e145ca5edd6 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 18:38:09 +0300 Subject: [PATCH 17/20] removing waku references --- README.md | 2 +- go.mod | 2 +- sds/Makefile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3dc1a91..4178753 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Follow these steps to install and set up the module: 1. Retrieve the module using `go get`: ``` - go get -u github.com/waku-org/waku-go-bindings + go get -u github.com/waku-org/sds-go-bindings ``` 2. Navigate to the module's directory: ``` diff --git a/go.mod b/go.mod index 07b99e5..9c1a6e2 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/waku-org/waku-go-bindings +module github.com/waku-org/sds-go-bindings go 1.22.10 diff --git a/sds/Makefile b/sds/Makefile index d03d7ec..edd1bf8 100644 --- a/sds/Makefile +++ b/sds/Makefile @@ -1,4 +1,4 @@ -# Makefile for Waku Go Bindings +# Makefile for SDS Go Bindings # Directories THIRD_PARTY_DIR := ../third_party From 32ea974f2ed1e62dfb3c43298de925185f56d49b Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 21 Apr 2025 13:04:46 +0300 Subject: [PATCH 18/20] adding event tests --- sds/sds_test.go | 488 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 406 insertions(+), 82 deletions(-) diff --git a/sds/sds_test.go b/sds/sds_test.go index 7407fd3..58caac2 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -1,7 +1,6 @@ package sds import ( - "fmt" "sync" "testing" "time" @@ -93,112 +92,437 @@ func TestDependencies(t *testing.T) { require.NoError(t, err) } -// Test callbacks -func TestCallbacks(t *testing.T) { - channelID := "test-callbacks" +// Test OnMessageReady callback +func TestCallback_OnMessageReady(t *testing.T) { + channelID := "test-cb-ready" + + // Create sender and receiver RMs + senderRm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer senderRm.Cleanup() + + receiverRm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer receiverRm.Cleanup() + + // Use a channel for signaling + readyChan := make(chan MessageID, 1) + + callbacks := EventCallbacks{ + OnMessageReady: func(messageId MessageID) { + // Non-blocking send to channel + select { + case readyChan <- messageId: + default: + // Avoid blocking if channel is full or test already timed out + } + }, + } + + // Register callback only on the receiver + receiverRm.RegisterCallbacks(callbacks) + + // Scenario: Wrap message on sender, unwrap on receiver + payload := []byte("ready test") + msgID := MessageID("cb-ready-1") + + // Wrap on sender + wrappedMsg, err := senderRm.WrapOutgoingMessage(payload, msgID) + require.NoError(t, err) + + // Unwrap on receiver + _, err = receiverRm.UnwrapReceivedMessage(wrappedMsg) + require.NoError(t, err) + + // Verification - Wait on channel with timeout + select { + case receivedMsgID := <-readyChan: + // Mark as called implicitly since we received on channel + if receivedMsgID != msgID { + t.Errorf("OnMessageReady called with wrong ID: got %q, want %q", receivedMsgID, msgID) + } + case <-time.After(2 * time.Second): + // If timeout occurs, the channel receive failed. + t.Errorf("Timed out waiting for OnMessageReady callback on readyChan") + } +} + +// Test OnMessageSent callback (via causal history ACK) +func TestCallback_OnMessageSent(t *testing.T) { + channelID := "test-cb-sent" + + // Create two RMs + rm1, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer rm1.Cleanup() + + rm2, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer rm2.Cleanup() + + var wg sync.WaitGroup + sentCalled := false + var sentMsgID MessageID + var cbMutex sync.Mutex + + callbacks := EventCallbacks{ + OnMessageSent: func(messageId MessageID) { + cbMutex.Lock() + sentCalled = true + sentMsgID = messageId + cbMutex.Unlock() + wg.Done() + }, + } + + // Register callback on rm1 (the original sender) + rm1.RegisterCallbacks(callbacks) + + // Scenario: rm1 sends msg1, rm2 receives msg1, + // rm2 sends msg2 (acking msg1), rm1 receives msg2. + + // 1. rm1 sends msg1 + payload1 := []byte("sent test 1") + msgID1 := MessageID("cb-sent-1") + wrappedMsg1, err := rm1.WrapOutgoingMessage(payload1, msgID1) + require.NoError(t, err) + // Note: msg1 is now in rm1's outgoing buffer + + // 2. rm2 receives msg1 (to update its state) + _, err = rm2.UnwrapReceivedMessage(wrappedMsg1) + require.NoError(t, err) + + // 3. rm2 sends msg2 (will include msg1 in causal history) + payload2 := []byte("sent test 2") + msgID2 := MessageID("cb-sent-2") + wrappedMsg2, err := rm2.WrapOutgoingMessage(payload2, msgID2) + require.NoError(t, err) + + // 4. rm1 receives msg2 (should trigger ACK for msg1) + wg.Add(1) // Expect OnMessageSent for msg1 on rm1 + _, err = rm1.UnwrapReceivedMessage(wrappedMsg2) + require.NoError(t, err) + + // Verification + waitTimeout(&wg, 2*time.Second, t) + + cbMutex.Lock() + defer cbMutex.Unlock() + if !sentCalled { + t.Errorf("OnMessageSent was not called") + } + // We primarily care that msg1 was ACKed. + if sentMsgID != msgID1 { + t.Errorf("OnMessageSent called with wrong ID: got %q, want %q", sentMsgID, msgID1) + } +} + +// Test OnMissingDependencies callback +func TestCallback_OnMissingDependencies(t *testing.T) { + channelID := "test-cb-missing" + + // Use separate sender/receiver RMs explicitly + senderRm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer senderRm.Cleanup() + + receiverRm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer receiverRm.Cleanup() + + var wg sync.WaitGroup + missingCalled := false + var missingMsgID MessageID + var missingDepsList []MessageID + var cbMutex sync.Mutex + + callbacks := EventCallbacks{ + OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { + cbMutex.Lock() + missingCalled = true + missingMsgID = messageId + missingDepsList = missingDeps // Copy slice + cbMutex.Unlock() + wg.Done() + }, + } + + // Register callback only on the receiver rm + receiverRm.RegisterCallbacks(callbacks) + + // Scenario: Sender sends msg1, then sender sends msg2 (depends on msg1), + // then receiver receives msg2 (which hasn't seen msg1). + + // 1. Sender sends msg1 + payload1 := []byte("missing test 1") + msgID1 := MessageID("cb-miss-1") + _, err = senderRm.WrapOutgoingMessage(payload1, msgID1) + require.NoError(t, err) + + // 2. Sender sends msg2 (depends on msg1) + payload2 := []byte("missing test 2") + msgID2 := MessageID("cb-miss-2") + wrappedMsg2, err := senderRm.WrapOutgoingMessage(payload2, msgID2) + require.NoError(t, err) + + // 3. Receiver receives msg2 (haven't seen msg1) + wg.Add(1) // Expect OnMissingDependencies + _, err = receiverRm.UnwrapReceivedMessage(wrappedMsg2) + require.NoError(t, err) + + // Verification + waitTimeout(&wg, 2*time.Second, t) + + cbMutex.Lock() + defer cbMutex.Unlock() + if !missingCalled { + t.Errorf("OnMissingDependencies was not called") + } + if missingMsgID != msgID2 { + t.Errorf("OnMissingDependencies called for wrong ID: got %q, want %q", missingMsgID, msgID2) + } + foundDep := false + for _, dep := range missingDepsList { + if dep == msgID1 { + foundDep = true + break + } + } + if !foundDep { + t.Errorf("OnMissingDependencies did not report %q as missing, got: %v", msgID1, missingDepsList) + } +} + +// Test OnPeriodicSync callback +func TestCallback_OnPeriodicSync(t *testing.T) { + channelID := "test-cb-sync" rm, err := NewReliabilityManager(channelID) require.NoError(t, err) defer rm.Cleanup() - var wg sync.WaitGroup - receivedReady := make(map[MessageID]bool) - receivedSent := make(map[MessageID]bool) - receivedMissing := make(map[MessageID][]MessageID) - syncRequested := false - var cbMutex sync.Mutex // Protect access to callback tracking maps/vars + syncCalled := false + var cbMutex sync.Mutex + // Use a channel to signal when the callback is hit + syncChan := make(chan bool, 1) callbacks := EventCallbacks{ - OnMessageReady: func(messageId MessageID) { - fmt.Printf("Test: OnMessageReady received: %s\n", messageId) - cbMutex.Lock() - receivedReady[messageId] = true - cbMutex.Unlock() - wg.Done() - }, - OnMessageSent: func(messageId MessageID) { - fmt.Printf("Test: OnMessageSent received: %s\n", messageId) - cbMutex.Lock() - receivedSent[messageId] = true - cbMutex.Unlock() - wg.Done() - }, - OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { - fmt.Printf("Test: OnMissingDependencies received for %s: %v\n", messageId, missingDeps) - cbMutex.Lock() - receivedMissing[messageId] = missingDeps - cbMutex.Unlock() - wg.Done() - }, OnPeriodicSync: func() { - fmt.Println("Test: OnPeriodicSync received") cbMutex.Lock() - syncRequested = true + if !syncCalled { // Only signal the first time + syncCalled = true + syncChan <- true + } cbMutex.Unlock() - // Don't wg.Done() here, it might be called multiple times }, } rm.RegisterCallbacks(callbacks) - // Start tasks AFTER registering callbacks + // Start periodic tasks err = rm.StartPeriodicTasks() require.NoError(t, err) - // --- Test Scenario --- - - // 1. Send msg1 - wg.Add(1) // Expect OnMessageSent for msg1 eventually - payload1 := []byte("callback test 1") - msgID1 := MessageID("cb-msg-1") - wrappedMsg1, err := rm.WrapOutgoingMessage(payload1, msgID1) - require.NoError(t, err) - - // 2. Receive msg1 (triggers OnMessageReady for msg1, OnMessageSent for msg1 via causal history) - wg.Add(1) // Expect OnMessageReady for msg1 - _, err = rm.UnwrapReceivedMessage(wrappedMsg1) - require.NoError(t, err) - - // 3. Send msg2 (depends on msg1) - wg.Add(1) // Expect OnMessageSent for msg2 eventually - payload2 := []byte("callback test 2") - msgID2 := MessageID("cb-msg-2") - wrappedMsg2, err := rm.WrapOutgoingMessage(payload2, msgID2) - require.NoError(t, err) - - // 4. Receive msg2 (triggers OnMessageReady for msg2, OnMessageSent for msg2) - wg.Add(1) // Expect OnMessageReady for msg2 - _, err = rm.UnwrapReceivedMessage(wrappedMsg2) - require.NoError(t, err) - // --- Verification --- - // Wait for expected callbacks with a timeout - waitTimeout(&wg, 5*time.Second, t) + // Wait for the periodic sync callback with a timeout (needs to be longer than sync interval) + select { + case <-syncChan: + // Success + case <-time.After(10 * time.Second): + t.Errorf("Timed out waiting for OnPeriodicSync callback") + } cbMutex.Lock() defer cbMutex.Unlock() - - if !receivedReady[msgID1] { - t.Errorf("OnMessageReady not called for %s", msgID1) - } - if !receivedReady[msgID2] { - t.Errorf("OnMessageReady not called for %s", msgID2) - } - if !receivedSent[msgID1] { - t.Errorf("OnMessageSent not called for %s", msgID1) - } - if !receivedSent[msgID2] { - t.Errorf("OnMessageSent not called for %s", msgID2) - } - // We didn't explicitly test missing deps in this path - if len(receivedMissing) > 0 { - t.Errorf("Unexpected OnMissingDependencies calls: %v", receivedMissing) - } - // Periodic sync is harder to guarantee in a short test, just check if it was ever true - if !syncRequested { + if !syncCalled { + // This might happen if the timeout was too short t.Logf("Warning: OnPeriodicSync might not have been called within the test timeout") } } +// Combined Test for multiple callbacks +func TestCallbacks_Combined(t *testing.T) { + channelID := "test-cb-combined" + + // Create sender and receiver handles + senderRm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer senderRm.Cleanup() + + receiverRm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer receiverRm.Cleanup() + + // Channels for synchronization + readyChan1 := make(chan bool, 1) + sentChan1 := make(chan bool, 1) + missingChan := make(chan []MessageID, 1) + + // Use maps for verification + receivedReady := make(map[MessageID]bool) + receivedSent := make(map[MessageID]bool) + var cbMutex sync.Mutex + + callbacksReceiver := EventCallbacks{ + OnMessageReady: func(messageId MessageID) { + cbMutex.Lock() + receivedReady[messageId] = true + cbMutex.Unlock() + if messageId == "cb-comb-1" { + // Use non-blocking send + select { + case readyChan1 <- true: + default: + } + } + }, + OnMessageSent: func(messageId MessageID) { + // This callback is registered on Receiver, but Sent events + // are typically relevant to the Sender. We don't expect this. + t.Errorf("Unexpected OnMessageSent call on Receiver for %s", messageId) + }, + OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { + // This callback is registered on Receiver, used for handleReceiver2 below + }, + } + + callbacksSender := EventCallbacks{ + OnMessageReady: func(messageId MessageID) { + // Not expected on sender in this test flow + }, + OnMessageSent: func(messageId MessageID) { + cbMutex.Lock() + receivedSent[messageId] = true + cbMutex.Unlock() + if messageId == "cb-comb-1" { + select { + case sentChan1 <- true: + default: + } + } + }, + OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { + // Not expected on sender + }, + } + + // Register callbacks + receiverRm.RegisterCallbacks(callbacksReceiver) + senderRm.RegisterCallbacks(callbacksSender) + + // --- Test Scenario --- + msgID1 := MessageID("cb-comb-1") + msgID2 := MessageID("cb-comb-2") + msgID3 := MessageID("cb-comb-3") + payload1 := []byte("combined test 1") + payload2 := []byte("combined test 2") + payload3 := []byte("combined test 3") + + // 1. Sender sends msg1 + wrappedMsg1, err := senderRm.WrapOutgoingMessage(payload1, msgID1) + require.NoError(t, err) + + // 2. Receiver receives msg1 + _, err = receiverRm.UnwrapReceivedMessage(wrappedMsg1) + require.NoError(t, err) + + // 3. Receiver sends msg2 (depends on msg1 implicitly via state) + wrappedMsg2, err := receiverRm.WrapOutgoingMessage(payload2, msgID2) + require.NoError(t, err) + + // 4. Sender receives msg2 from Receiver (acks msg1 for sender) + _, err = senderRm.UnwrapReceivedMessage(wrappedMsg2) + require.NoError(t, err) + + // 5. Sender sends msg3 (depends on msg2) + wrappedMsg3, err := senderRm.WrapOutgoingMessage(payload3, msgID3) + require.NoError(t, err) + + // 6. Create Receiver2, register missing deps callback + receiverRm2, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer receiverRm2.Cleanup() + + callbacksReceiver2 := EventCallbacks{ + OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { + if messageId == msgID3 { + select { + case missingChan <- missingDeps: + default: + } + } + }, + } + + receiverRm2.RegisterCallbacks(callbacksReceiver2) + + // 7. Receiver2 receives msg3 (should report missing msg1, msg2) + _, err = receiverRm2.UnwrapReceivedMessage(wrappedMsg3) + require.NoError(t, err) + + // --- Verification --- + timeout := 5 * time.Second + expectedReady1 := false + expectedSent1 := false + var reportedMissingDeps []MessageID + missingDepsReceived := false + + receivedCount := 0 + expectedCount := 3 // ready1, sent1, missingDeps + timer := time.NewTimer(timeout) + defer timer.Stop() + + for receivedCount < expectedCount { + select { + case <-readyChan1: + if !expectedReady1 { // Avoid double counting if signaled twice + expectedReady1 = true + receivedCount++ + } + case <-sentChan1: + if !expectedSent1 { + expectedSent1 = true + receivedCount++ + } + case deps := <-missingChan: + if !missingDepsReceived { + reportedMissingDeps = deps + missingDepsReceived = true + receivedCount++ + } + case <-timer.C: + t.Fatalf("Timed out waiting for combined callbacks (received %d out of %d)", receivedCount, expectedCount) + } + } + + // Check results + cbMutex.Lock() + defer cbMutex.Unlock() + + if !expectedReady1 || !receivedReady[msgID1] { + t.Errorf("OnMessageReady not called/verified for %s", msgID1) + } + if !expectedSent1 || !receivedSent[msgID1] { + t.Errorf("OnMessageSent not called/verified for %s", msgID1) + } + if !missingDepsReceived { + t.Errorf("OnMissingDependencies not called/verified for %s", msgID3) + } else { + foundDep1 := false + foundDep2 := false + for _, dep := range reportedMissingDeps { + if dep == msgID1 { + foundDep1 = true + } + if dep == msgID2 { + foundDep2 = true + } + } + if !foundDep1 || !foundDep2 { + t.Errorf("OnMissingDependencies for %s reported wrong deps: got %v, want %s and %s", msgID3, reportedMissingDeps, msgID1, msgID2) + } + } +} + // Helper function to wait for WaitGroup with a timeout func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, t *testing.T) { c := make(chan struct{}) From 780e0423e44fb038406799711c6dc1ededcb314e Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Mon, 21 Apr 2025 13:07:23 +0300 Subject: [PATCH 19/20] removing print and fixing comments --- sds/sds.go | 3 --- sds/sds_test.go | 4 ++-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index 8d77e82..4d45aad 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -129,7 +129,6 @@ import "C" import ( "encoding/json" "errors" - "fmt" "strconv" "strings" "sync" @@ -257,8 +256,6 @@ func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) { func (rm *ReliabilityManager) OnEvent(eventStr string) { - fmt.Println("------------------- received event: ", eventStr) // TODO: remove after debugging - jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) if err != nil { diff --git a/sds/sds_test.go b/sds/sds_test.go index 58caac2..36e37f2 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -343,7 +343,7 @@ func TestCallback_OnPeriodicSync(t *testing.T) { func TestCallbacks_Combined(t *testing.T) { channelID := "test-cb-combined" - // Create sender and receiver handles + // Create sender and receiver RMs senderRm, err := NewReliabilityManager(channelID) require.NoError(t, err) defer senderRm.Cleanup() @@ -381,7 +381,7 @@ func TestCallbacks_Combined(t *testing.T) { t.Errorf("Unexpected OnMessageSent call on Receiver for %s", messageId) }, OnMissingDependencies: func(messageId MessageID, missingDeps []MessageID) { - // This callback is registered on Receiver, used for handleReceiver2 below + // This callback is registered on Receiver, used for receiverRm2 below }, } From ccf3beb5eec365da8946072d904e456fc448ce76 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 25 Apr 2025 11:17:25 +0200 Subject: [PATCH 20/20] adding extra context for reliability manager nil errors --- sds/sds.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index 4d45aad..fec7caf 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -320,7 +320,7 @@ func (rm *ReliabilityManager) parseMissingDepsEvent(eventStr string) { func (rm *ReliabilityManager) Cleanup() error { if rm == nil { - err := errors.New("reliability manager is nil") + err := errors.New("reliability manager is nil in Cleanup") Error("Failed to cleanup %v", err) return err } @@ -349,7 +349,7 @@ func (rm *ReliabilityManager) Cleanup() error { func (rm *ReliabilityManager) Reset() error { if rm == nil { - err := errors.New("reliability manager is nil") + err := errors.New("reliability manager is nil in Reset") Error("Failed to reset %v", err) return err } @@ -377,7 +377,7 @@ func (rm *ReliabilityManager) Reset() error { func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId MessageID) ([]byte, error) { if rm == nil { - err := errors.New("reliability manager is nil") + err := errors.New("reliability manager is nil in WrapOutgoingMessage") Error("Failed to wrap outgoing message %v", err) return nil, err } @@ -434,7 +434,7 @@ func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId Mess func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedMessage, error) { if rm == nil { - err := errors.New("reliability manager is nil") + err := errors.New("reliability manager is nil in UnwrapReceivedMessage") Error("Failed to unwrap received message %v", err) return nil, err } @@ -483,7 +483,7 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM // MarkDependenciesMet informs the library that dependencies are met func (rm *ReliabilityManager) MarkDependenciesMet(messageIDs []MessageID) error { if rm == nil { - err := errors.New("reliability manager is nil") + err := errors.New("reliability manager is nil in MarkDependenciesMet") Error("Failed to mark dependencies met %v", err) return err } @@ -529,7 +529,7 @@ func (rm *ReliabilityManager) MarkDependenciesMet(messageIDs []MessageID) error func (rm *ReliabilityManager) StartPeriodicTasks() error { if rm == nil { - err := errors.New("reliability manager is nil") + err := errors.New("reliability manager is nil in StartPeriodicTasks") Error("Failed to start periodic tasks %v", err) return err }