mirror of
https://github.com/logos-messaging/sds-go-bindings.git
synced 2026-01-02 06:03:12 +00:00
initial implementation of dependencies met (not working yet)
This commit is contained in:
parent
188ef6faf0
commit
8b5c79b351
64
sds/sds.go
64
sds/sds.go
@ -108,6 +108,18 @@ package sds
|
||||
(SdsCallBack) GoCallback,
|
||||
resp);
|
||||
}
|
||||
|
||||
static void cGoMarkDependenciesMet(void* rmCtx,
|
||||
char** messageIDs,
|
||||
size_t count,
|
||||
void* resp) {
|
||||
UnwrapReceivedMessage(rmCtx,
|
||||
messageIDs,
|
||||
count,
|
||||
(SdsCallBack) GoCallback,
|
||||
resp);
|
||||
}
|
||||
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
@ -380,8 +392,6 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM
|
||||
}
|
||||
Debug("Successfully unwrapped message")
|
||||
|
||||
fmt.Println("------------ UnwrapReceivedMessage res: ", resStr)
|
||||
|
||||
unwrappedMessage := UnwrappedMessage{}
|
||||
err := json.Unmarshal([]byte(resStr), &unwrappedMessage)
|
||||
if err != nil {
|
||||
@ -389,9 +399,6 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Println(unwrappedMessage.Message)
|
||||
fmt.Println(unwrappedMessage.MissingDeps)
|
||||
|
||||
return &unwrappedMessage, nil
|
||||
}
|
||||
|
||||
@ -400,3 +407,50 @@ func (rm *ReliabilityManager) UnwrapReceivedMessage(message []byte) (*UnwrappedM
|
||||
|
||||
return nil, errors.New(errMsg)
|
||||
}
|
||||
|
||||
// MarkDependenciesMet informs the library that dependencies are met
|
||||
func (rm *ReliabilityManager) MarkDependenciesMet(messageIDs []MessageID) error {
|
||||
if rm == nil {
|
||||
err := errors.New("reliability manager is nil")
|
||||
Error("Failed to mark dependencies met %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if len(messageIDs) == 0 {
|
||||
return nil // Nothing to do
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
var resp = C.allocResp(unsafe.Pointer(&wg))
|
||||
defer C.freeResp(resp)
|
||||
|
||||
// Convert Go string slice to C array of C strings (char**)
|
||||
cMessageIDs := make([]*C.char, len(messageIDs))
|
||||
for i, id := range messageIDs {
|
||||
cMessageIDs[i] = C.CString(string(id))
|
||||
defer C.free(unsafe.Pointer(cMessageIDs[i])) // Ensure each CString is freed
|
||||
}
|
||||
|
||||
// Create a pointer (**C.char) to the first element of the slice
|
||||
var cMessageIDsPtr **C.char
|
||||
if len(cMessageIDs) > 0 {
|
||||
cMessageIDsPtr = &cMessageIDs[0]
|
||||
} else {
|
||||
cMessageIDsPtr = nil // Handle empty slice case
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
// Pass the pointer variable (cMessageIDsPtr) directly, which is of type **C.char
|
||||
C.cGoMarkDependenciesMet(rm.rmCtx, cMessageIDsPtr, C.size_t(len(messageIDs)), resp)
|
||||
wg.Wait()
|
||||
|
||||
if C.getRet(resp) == C.RET_OK {
|
||||
Debug("Successfully marked dependencies as met")
|
||||
return nil
|
||||
}
|
||||
|
||||
errMsg := "error MarkDependenciesMet: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
|
||||
Error("Failed to mark dependencies as met: %v", errMsg)
|
||||
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
@ -41,3 +41,51 @@ func TestWrapUnwrap(t *testing.T) {
|
||||
require.Equal(t, string(*unwrappedMessage.Message), string(originalPayload), "Expected unwrapped and original payloads to be equal")
|
||||
require.Equal(t, len(*unwrappedMessage.MissingDeps), 0, "Expexted to be no missing dependencies")
|
||||
}
|
||||
|
||||
// Test dependency handling
|
||||
func TestDependencies(t *testing.T) {
|
||||
channelID := "test-deps"
|
||||
rm, err := NewReliabilityManager(channelID)
|
||||
require.NoError(t, err)
|
||||
defer rm.Cleanup()
|
||||
|
||||
// 1. Send message 1 (will become a dependency)
|
||||
payload1 := []byte("message one")
|
||||
msgID1 := MessageID("msg-dep-1")
|
||||
wrappedMsg1, err := rm.WrapOutgoingMessage(payload1, msgID1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Simulate receiving msg1 to add it to history (implicitly acknowledges it)
|
||||
_, err = rm.UnwrapReceivedMessage(wrappedMsg1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// 2. Send message 2 (depends on message 1 implicitly via causal history)
|
||||
payload2 := []byte("message two")
|
||||
msgID2 := MessageID("msg-dep-2")
|
||||
wrappedMsg2, err := rm.WrapOutgoingMessage(payload2, msgID2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// 3. Create a new manager to simulate a different peer receiving msg2 without msg1
|
||||
rm2, err := NewReliabilityManager(channelID) // Same channel ID
|
||||
require.NoError(t, err)
|
||||
defer rm2.Cleanup()
|
||||
|
||||
// 4. Unwrap message 2 on the second manager - should report msg1 as missing
|
||||
unwrappedMessage2, err := rm2.UnwrapReceivedMessage(wrappedMsg2)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Greater(t, len(*unwrappedMessage2.MissingDeps), 0, "Expected missing dependencies, got none")
|
||||
|
||||
foundDep1 := false
|
||||
for _, dep := range *unwrappedMessage2.MissingDeps {
|
||||
if dep == msgID1 {
|
||||
foundDep1 = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, foundDep1, "Expected missing dependency %q, got %v", msgID1, *unwrappedMessage2.MissingDeps)
|
||||
|
||||
// 5. Mark the dependency as met
|
||||
err = rm2.MarkDependenciesMet([]MessageID{msgID1})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user