From 53419ac8199d37a012e67b69145e18566ab98c99 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 12:20:11 +0300 Subject: [PATCH] parsing message ready event --- sds/sds.go | 43 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index 18f9ed2..40b8080 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -138,6 +138,7 @@ import ( ) const requestTimeout = 30 * time.Second +const EventChanBufferSize = 1024 //export GoCallback func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { @@ -153,8 +154,9 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { // ReliabilityManager represents an instance of a nim-sds ReliabilityManager type ReliabilityManager struct { - rmCtx unsafe.Pointer - channelId string + rmCtx unsafe.Pointer + channelId string + MsgReadyChan chan MessageID } func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { @@ -181,6 +183,8 @@ func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { rm.rmCtx = C.cGoNewReliabilityManager(cChannelId, resp) wg.Wait() + rm.MsgReadyChan = make(chan MessageID, EventChanBufferSize) + C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) @@ -233,7 +237,14 @@ type jsonEvent struct { EventType string `json:"eventType"` } +type msgReady struct { + MessageId MessageID `json:"messageId"` +} + func (rm *ReliabilityManager) OnEvent(eventStr string) { + + fmt.Println("------------------- received event: ", eventStr) + jsonEvent := jsonEvent{} err := json.Unmarshal([]byte(eventStr), &jsonEvent) if err != nil { @@ -243,10 +254,30 @@ func (rm *ReliabilityManager) OnEvent(eventStr string) { } switch jsonEvent.EventType { - case "event 1": - fmt.Println("-------- received event 1") - case "event 2": - fmt.Println("-------- received event 1") + case "message_ready": + rm.parseMessageReadyEvent(eventStr) + case "message_sent": + fmt.Println("-------- received event 2") + case "missing_dependencies": + fmt.Println("-------- received event 3") + case "periodic_sync": + fmt.Println("-------- received event 4") + } + +} + +func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) { + + msgReady := msgReady{} + err := json.Unmarshal([]byte(eventStr), &msgReady) + if err != nil { + Error("could not parse message ready event %v", err) + } + + select { + case rm.MsgReadyChan <- msgReady.MessageId: + default: + Warn("Can't deliver message ready event, MsgReadyChan is full") } }