diff --git a/sds/sds.go b/sds/sds.go index 65be925..62a3e13 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -85,12 +85,27 @@ package sds static void cGoResetReliabilityManager(void* rmCtx, void* resp) { ResetReliabilityManager(rmCtx, (SdsCallBack) GoCallback, resp); } + + static void cGoWrapOutgoingMessage(void* rmCtx, + void* message, + size_t messageLen, + const char* messageId, + void* resp) { + WrapOutgoingMessage(rmCtx, + message, + messageLen, + messageId, + (SdsCallBack) GoCallback, + resp); + } */ import "C" import ( "encoding/json" "errors" "fmt" + "strconv" + "strings" "sync" "time" "unsafe" @@ -117,6 +132,8 @@ type ReliabilityManager struct { channelId string } +type MessageID string + func NewReliabilityManager(channelId string, name string) (*ReliabilityManager, error) { Debug("Creating new Reliability Manager: %v", name) rm := &ReliabilityManager{ @@ -267,3 +284,60 @@ func (rm *ReliabilityManager) Reset() error { return errors.New(errMsg) } + +func (rm *ReliabilityManager) WrapOutgoingMessage(message []byte, messageId MessageID) ([]byte, error) { + if rm == nil { + err := errors.New("reliability manager is nil") + Error("Failed to wrap outgoing message %v", err) + return nil, err + } + + Debug("Wraping outgoing message %v", messageId) + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + cMessageId := C.CString(string(messageId)) + defer C.free(unsafe.Pointer(cMessageId)) + + var cMessagePtr unsafe.Pointer + if len(message) > 0 { + cMessagePtr = C.CBytes(message) // C.CBytes allocates memory that needs to be freed + defer C.free(cMessagePtr) + } else { + cMessagePtr = nil + } + cMessageLen := C.size_t(len(message)) + + wg.Add(1) + C.cGoWrapOutgoingMessage(rm.rmCtx, cMessagePtr, cMessageLen, cMessageId, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + resStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if resStr == "" { + Debug("Received empty res string for messageId: %v", messageId) + return nil, nil + } + Debug("Successfully wrapped message %s", messageId) + + parts := strings.Split(resStr, ",") + bytes := make([]byte, len(parts)) + + for i, part := range parts { + n, err := strconv.Atoi(strings.TrimSpace(part)) + if err != nil { + panic(err) + } + bytes[i] = byte(n) + } + + return bytes, nil + } + + errMsg := "error WrapOutgoingMessage: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to wrap message %v: %v", messageId, errMsg) + + return nil, errors.New(errMsg) +} diff --git a/sds/sds_test.go b/sds/sds_test.go index e71e53e..12dfe23 100644 --- a/sds/sds_test.go +++ b/sds/sds_test.go @@ -1,6 +1,7 @@ package sds import ( + "fmt" "testing" "github.com/stretchr/testify/require" @@ -33,3 +34,18 @@ func TestReset(t *testing.T) { require.NoError(t, err) } + +func TestWrap(t *testing.T) { + + rm, err := NewReliabilityManager("my-channel-id", "rm") + require.NoError(t, err) + defer rm.Cleanup() + + msg := []byte{1, 2, 3, 4, 5} + + res, err := rm.WrapOutgoingMessage(msg, "my-message-id") + require.NoError(t, err) + + fmt.Println("---------- len(res): ", len(res)) + +}