From 86c8595b0cf255dc5ca3e81ab25f510332172b64 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Apr 2025 17:07:47 +0300 Subject: [PATCH] adding UnwrapReceivedMessage and removing rm name --- sds/sds.go | 87 +++++++++++++++++++++++++++++++++++++++++-------- sds/sds_test.go | 30 +++++++++-------- sds/types.go | 8 +++++ 3 files changed, 97 insertions(+), 28 deletions(-) create mode 100644 sds/types.go diff --git a/sds/sds.go b/sds/sds.go index 62a3e13..ed06581 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -98,6 +98,16 @@ package sds (SdsCallBack) GoCallback, resp); } + static void cGoUnwrapReceivedMessage(void* rmCtx, + void* message, + size_t messageLen, + void* resp) { + UnwrapReceivedMessage(rmCtx, + message, + messageLen, + (SdsCallBack) GoCallback, + resp); + } */ import "C" import ( @@ -128,17 +138,13 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // ReliabilityManager represents an instance of a nim-sds ReliabilityManager type ReliabilityManager struct { rmCtx unsafe.Pointer - name string channelId string } -type MessageID string - -func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, error) { - Debug("Creating new Reliability Manager: %v", name) +func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { + Debug("Creating new Reliability Manager") rm := &ReliabilityManager{ channelId: channelId, - name: name, } wg := sync.WaitGroup{} @@ -151,7 +157,7 @@ func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, if C.getRet(resp) != C.RET_OK { errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("error NewReliabilityManager for %s: %v", name, errMsg) + Error("error NewReliabilityManager: %v", errMsg) return nil, errors.New(errMsg) } @@ -162,7 +168,7 @@ func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) - Debug("Successfully created Reliability Manager: %s", name) + Debug("Successfully created Reliability Manager") return rm, nil } @@ -235,7 +241,7 @@ func (rm *ReliabilityManager) Cleanup() error { return err } - Debug("Cleaning up %v", rm.name) + Debug("Cleaning up reliability manager") wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -247,12 +253,12 @@ func (rm *ReliabilityManager) Cleanup() error { if C.getRet(resp) == C.RET_OK { unregisterReliabilityManager(rm) - Debug("Successfully cleaned up %s", rm.name) + Debug("Successfully cleaned up reliability manager") return nil } errMsg := "error CleanupReliabilityManager: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to cleanup %v: %v", rm.name, errMsg) + Error("Failed to cleanup reliability manager: %v", errMsg) return errors.New(errMsg) } @@ -264,7 +270,7 @@ func (rm *ReliabilityManager) Reset() error { return err } - Debug("Resetting %v", rm.name) + Debug("Resetting reliability manager") wg := sync.WaitGroup{} var resp = C.allocResp(unsafe.Pointer(&wg)) @@ -275,12 +281,12 @@ func (rm *ReliabilityManager) Reset() error { wg.Wait() if C.getRet(resp) == C.RET_OK { - Debug("Successfully resetted %s", rm.name) + Debug("Successfully resetted reliability manager") return nil } errMsg := "error ResetReliabilityManager: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - Error("Failed to reset %v: %v", rm.name, errMsg) + Error("Failed to reset reliability manager: %v", errMsg) return errors.New(errMsg) } @@ -341,3 +347,56 @@ func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId Mess return nil, errors.New(errMsg) } + +func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedMessage, error) { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to unwrap received message %v", err) + return nil, err + } + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + var cMessagePtr unsafe.Pointer + if len(message) > 0 { + cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed + defer C.free(cMessagePtr) + } else { + cMessagePtr = nil + } + cMessageLen := C.size_t(len(message)) + + wg.Add(1) + C.cGoUnwrapReceivedMessage(rm.rmCtx, cMessagePtr, cMessageLen, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + resStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if resStr == "" { + Debug("Received empty res string") + return nil, nil + } + Debug("Successfully unwrapped message") + + fmt.Println("------------ UnwrapReceivedMessage res: ", resStr) + + unwrappedMessage := UnwrappedMessage{} + err := json.Unmarshal([]byte(resStr), &unwrappedMessage) + if err != nil { + Error("Failed to unmarshal unwrapped message") + return nil, err + } + + fmt.Println(unwrappedMessage.Message) + fmt.Println(unwrappedMessage.MissingDeps) + + return &unwrappedMessage, nil + } + + errMsg := "error UnwrapReceivedMessage: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to unwrap message: %v", errMsg) + + return nil, errors.New(errMsg) +} diff --git a/sds/sds_test.go b/sds/sds_test.go index 12dfe23..d17fbfb 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -1,7 +1,6 @@ package sds import ( - "fmt" "testing" "github.com/stretchr/testify/require" @@ -9,22 +8,17 @@ import ( func TestCreateAndCleanup(t *testing.T) { - rm1, err := NewReliabilityManager("my-channel-id-1", "rm1") - require.NoError(t, err) - - rm2, err := NewReliabilityManager("my-channel-id-2", "rm2") + rm1, err := NewReliabilityManager("my-channel-id-1") require.NoError(t, err) err = rm1.Cleanup() require.NoError(t, err) - err = rm2.Cleanup() - require.NoError(t, err) } func TestReset(t *testing.T) { - rm, err := NewReliabilityManager("my-channel-id", "rm") + rm, err := NewReliabilityManager("my-channel-id") require.NoError(t, err) err = rm.Reset() @@ -35,17 +29,25 @@ func TestReset(t *testing.T) { } -func TestWrap(t *testing.T) { - - rm, err := NewReliabilityManager("my-channel-id", "rm") +// Test wrapping and unwrapping a simple message +func TestWrapUnwrap(t *testing.T) { + channelID := "test-wrap-unwrap" + rm, err := NewReliabilityManager(channelID) require.NoError(t, err) defer rm.Cleanup() - msg := []byte{1, 2, 3, 4, 5} + originalPayload := []byte("hello reliability") + messageID := MessageID("msg-wrap-1") - res, err := rm.WrapOutgoingMessage(msg, "my-message-id") + wrappedMsg, err := rm.WrapOutgoingMessage(originalPayload, messageID) require.NoError(t, err) - fmt.Println("---------- len(res): ", len(res)) + require.Greater(t, len(wrappedMsg), 0, "Expected non-empty wrapped message") + // Simulate receiving the wrapped message + unwrappedMessage, err := rm.UnwrapReceivedMessage(wrappedMsg) + require.NoError(t, err) + + require.Equal(t, string(*unwrappedMessage.Message), string(originalPayload), "Expected unwrapped and original payloads to be equal") + require.Equal(t, len(*unwrappedMessage.MissingDeps), 0, "Expexted to be no missing dependencies") } diff --git a/sds/types.go b/sds/types.go new file mode 100644 index 0000000..cbe3b9b --- /dev/null +++ b/sds/types.go @@ -0,0 +1,8 @@ +package sds + +type MessageID string + +type UnwrappedMessage struct { + Message *[]byte `json:"message"` + MissingDeps *[]MessageID `json:"missingDeps"` +}