From 8b5c79b35136cbcd902aee74583f2075927b801e Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 15 Apr 2025 18:28:27 +0300 Subject: [PATCH] initial implementation of dependencies met (not working yet) --- sds/sds.go | 64 +++++++++++++++++++++++++++++++++++++++++++++---- sds/sds_test.go | 48 +++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 5 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index ed06581..e626b86 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -108,6 +108,18 @@ package sds (SdsCallBack) GoCallback, resp); } + + static void cGoMarkDependenciesMet(void* rmCtx, + char** messageIDs, + size_t count, + void* resp) { + UnwrapReceivedMessage(rmCtx, + messageIDs, + count, + (SdsCallBack) GoCallback, + resp); + } + */ import "C" import ( @@ -380,8 +392,6 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM } Debug("Successfully unwrapped message") - fmt.Println("------------ UnwrapReceivedMessage res: ", resStr) - unwrappedMessage := UnwrappedMessage{} err := json.Unmarshal([]byte(resStr), &unwrappedMessage) if err != nil { @@ -389,9 +399,6 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM return nil, err } - fmt.Println(unwrappedMessage.Message) - fmt.Println(unwrappedMessage.MissingDeps) - return &unwrappedMessage, nil } @@ -400,3 +407,50 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM return nil, errors.New(errMsg) } + +// MarkDependenciesMet informs the library that dependencies are met +func (rm *ReliabilityManager) MarkDependenciesMet(messageIDs []MessageID) error { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to mark dependencies met %v", err) + return err + } + + if len(messageIDs) == 0 { + return nil // Nothing to do + } + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + // Convert Go string slice to C array of C strings (char**) + cMessageIDs := make([]*C.char, len(messageIDs)) + for i, id := range messageIDs { + cMessageIDs[i] = C.CString(string(id)) + defer C.free(unsafe.Pointer(cMessageIDs[i])) // Ensure each CString is freed + } + + // Create a pointer (**C.char) to the first element of the slice + var cMessageIDsPtr **C.char + if len(cMessageIDs) > 0 { + cMessageIDsPtr = &cMessageIDs[0] + } else { + cMessageIDsPtr = nil // Handle empty slice case + } + + wg.Add(1) + // Pass the pointer variable (cMessageIDsPtr) directly, which is of type **C.char + C.cGoMarkDependenciesMet(rm.rmCtx, cMessageIDsPtr, C.size_t(len(messageIDs)), resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + Debug("Successfully marked dependencies as met") + return nil + } + + errMsg := "error MarkDependenciesMet: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to mark dependencies as met: %v", errMsg) + + return errors.New(errMsg) +} diff --git a/sds/sds_test.go b/sds/sds_test.go index e1c681d..cab0155 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -41,3 +41,51 @@ func TestWrapUnwrap(t *testing.T) { require.Equal(t, string(*unwrappedMessage.Message), string(originalPayload), "Expected unwrapped and original payloads to be equal") require.Equal(t, len(*unwrappedMessage.MissingDeps), 0, "Expexted to be no missing dependencies") } + +// Test dependency handling +func TestDependencies(t *testing.T) { + channelID := "test-deps" + rm, err := NewReliabilityManager(channelID) + require.NoError(t, err) + defer rm.Cleanup() + + // 1. Send message 1 (will become a dependency) + payload1 := []byte("message one") + msgID1 := MessageID("msg-dep-1") + wrappedMsg1, err := rm.WrapOutgoingMessage(payload1, msgID1) + require.NoError(t, err) + + // Simulate receiving msg1 to add it to history (implicitly acknowledges it) + _, err = rm.UnwrapReceivedMessage(wrappedMsg1) + require.NoError(t, err) + + // 2. Send message 2 (depends on message 1 implicitly via causal history) + payload2 := []byte("message two") + msgID2 := MessageID("msg-dep-2") + wrappedMsg2, err := rm.WrapOutgoingMessage(payload2, msgID2) + require.NoError(t, err) + + // 3. Create a new manager to simulate a different peer receiving msg2 without msg1 + rm2, err := NewReliabilityManager(channelID) // Same channel ID + require.NoError(t, err) + defer rm2.Cleanup() + + // 4. Unwrap message 2 on the second manager - should report msg1 as missing + unwrappedMessage2, err := rm2.UnwrapReceivedMessage(wrappedMsg2) + require.NoError(t, err) + + require.Greater(t, len(*unwrappedMessage2.MissingDeps), 0, "Expected missing dependencies, got none") + + foundDep1 := false + for _, dep := range *unwrappedMessage2.MissingDeps { + if dep == msgID1 { + foundDep1 = true + break + } + } + require.True(t, foundDep1, "Expected missing dependency %q, got %v", msgID1, *unwrappedMessage2.MissingDeps) + + // 5. Mark the dependency as met + err = rm2.MarkDependenciesMet([]MessageID{msgID1}) + require.NoError(t, err) +}