adding WrapOutgoingMessage

This commit is contained in:
Gabriel mermelstein 2025-04-15 13:13:56 +03:00
parent f31ac1a8f7
commit 8ec2883889
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D
2 changed files with 90 additions and 0 deletions

View File

@ -85,12 +85,27 @@ package sds
static void cGoResetReliabilityManager(void* rmCtx, void* resp) { static void cGoResetReliabilityManager(void* rmCtx, void* resp) {
ResetReliabilityManager(rmCtx, (SdsCallBack) GoCallback, resp); ResetReliabilityManager(rmCtx, (SdsCallBack) GoCallback, resp);
} }
static void cGoWrapOutgoingMessage(void* rmCtx,
void* message,
size_t messageLen,
const char* messageId,
void* resp) {
WrapOutgoingMessage(rmCtx,
message,
messageLen,
messageId,
(SdsCallBack) GoCallback,
resp);
}
*/ */
import "C" import "C"
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings"
"sync" "sync"
"time" "time"
"unsafe" "unsafe"
@ -117,6 +132,8 @@ type ReliabilityManager struct {
channelId string channelId string
} }
type MessageID string
func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, error) { func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, error) {
Debug("Creating new Reliability Manager: %v", name) Debug("Creating new Reliability Manager: %v", name)
rm := &ReliabilityManager{ rm := &ReliabilityManager{
@ -267,3 +284,60 @@ func (rm *ReliabilityManager) Reset() error {
return errors.New(errMsg) return errors.New(errMsg)
} }
func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId MessageID) ([]byte, error) {
if rm == nil {
err := errors.New("reliability manager is nil")
Error("Failed to wrap outgoing message %v", err)
return nil, err
}
Debug("Wraping outgoing message %v", messageId)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
cMessageId := C.CString(string(messageId))
defer C.free(unsafe.Pointer(cMessageId))
var cMessagePtr unsafe.Pointer
if len(message) > 0 {
cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed
defer C.free(cMessagePtr)
} else {
cMessagePtr = nil
}
cMessageLen := C.size_t(len(message))
wg.Add(1)
C.cGoWrapOutgoingMessage(rm.rmCtx, cMessagePtr, cMessageLen, cMessageId, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
resStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if resStr == "" {
Debug("Received empty res string for messageId: %v", messageId)
return nil, nil
}
Debug("Successfully wrapped message %s", messageId)
parts := strings.Split(resStr, ",")
bytes := make([]byte, len(parts))
for i, part := range parts {
n, err := strconv.Atoi(strings.TrimSpace(part))
if err != nil {
panic(err)
}
bytes[i] = byte(n)
}
return bytes, nil
}
errMsg := "error WrapOutgoingMessage: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to wrap message %v: %v", messageId, errMsg)
return nil, errors.New(errMsg)
}

View File

@ -1,6 +1,7 @@
package sds package sds
import ( import (
"fmt"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -33,3 +34,18 @@ func TestReset(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func TestWrap(t *testing.T) {
rm, err := NewReliabilityManager("my-channel-id", "rm")
require.NoError(t, err)
defer rm.Cleanup()
msg := []byte{1, 2, 3, 4, 5}
res, err := rm.WrapOutgoingMessage(msg, "my-message-id")
require.NoError(t, err)
fmt.Println("---------- len(res): ", len(res))
}