From b9f1d74a78f907fe69480bdee711e39e12776fcd Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 16 Apr 2025 14:06:55 +0300 Subject: [PATCH] registering callbacks --- sds/sds.go | 70 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/sds/sds.go b/sds/sds.go index 40b8080..b1ed01c 100644 --- a/sds/sds.go +++ b/sds/sds.go @@ -152,11 +152,18 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { } } +type EventCallbacks struct { + OnMessageReady func(messageId MessageID) + OnMessageSent func(messageId MessageID) + OnMissingDependencies func(messageId MessageID, missingDeps []MessageID) + OnPeriodicSync func() +} + // ReliabilityManager represents an instance of a nim-sds ReliabilityManager type ReliabilityManager struct { - rmCtx unsafe.Pointer - channelId string - MsgReadyChan chan MessageID + rmCtx unsafe.Pointer + channelId string + callbacks EventCallbacks } func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { @@ -183,8 +190,6 @@ func NewReliabilityManager(channelId string) (*ReliabilityManager, error) { rm.rmCtx = C.cGoNewReliabilityManager(cChannelId, resp) wg.Wait() - rm.MsgReadyChan = make(chan MessageID, EventChanBufferSize) - C.cGoSetEventCallback(rm.rmCtx) registerReliabilityManager(rm) @@ -237,10 +242,19 @@ type jsonEvent struct { EventType string `json:"eventType"` } -type msgReady struct { +type msgEvent struct { MessageId MessageID `json:"messageId"` } +type missingDepsEvent struct { + MessageId MessageID `json:"messageId"` + MissingDeps []MessageID `json:"missingDeps"` +} + +func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) { + rm.callbacks = callbacks +} + func (rm *ReliabilityManager) OnEvent(eventStr string) { fmt.Println("------------------- received event: ", eventStr) @@ -257,27 +271,53 @@ func (rm *ReliabilityManager) OnEvent(eventStr string) { case "message_ready": rm.parseMessageReadyEvent(eventStr) case "message_sent": - fmt.Println("-------- received event 2") + rm.parseMessageSentEvent(eventStr) case "missing_dependencies": - fmt.Println("-------- received event 3") + rm.parseMissingDepsEvent(eventStr) case "periodic_sync": - fmt.Println("-------- received event 4") + if rm.callbacks.OnPeriodicSync != nil { + rm.callbacks.OnPeriodicSync() + } } } func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) { - msgReady := msgReady{} - err := json.Unmarshal([]byte(eventStr), &msgReady) + msgEvent := msgEvent{} + err := json.Unmarshal([]byte(eventStr), &msgEvent) if err != nil { Error("could not parse message ready event %v", err) } - select { - case rm.MsgReadyChan <- msgReady.MessageId: - default: - Warn("Can't deliver message ready event, MsgReadyChan is full") + if rm.callbacks.OnMessageReady != nil { + rm.callbacks.OnMessageReady(msgEvent.MessageId) + } +} + +func (rm *ReliabilityManager) parseMessageSentEvent(eventStr string) { + + msgEvent := msgEvent{} + err := json.Unmarshal([]byte(eventStr), &msgEvent) + if err != nil { + Error("could not parse message sent event %v", err) + } + + if rm.callbacks.OnMessageSent != nil { + rm.callbacks.OnMessageSent(msgEvent.MessageId) + } +} + +func (rm *ReliabilityManager) parseMissingDepsEvent(eventStr string) { + + missingDepsEvent := missingDepsEvent{} + err := json.Unmarshal([]byte(eventStr), &missingDepsEvent) + if err != nil { + Error("could not parse missing dependencies event %v", err) + } + + if rm.callbacks.OnMissingDependencies != nil { + rm.callbacks.OnMissingDependencies(missingDepsEvent.MessageId, missingDepsEvent.MissingDeps) } }