registering callbacks

This commit is contained in:
Gabriel mermelstein 2025-04-16 14:06:55 +03:00
parent 8eef70bd67
commit b9f1d74a78
No known key found for this signature in database
GPG Key ID: 82B8134785FEAE0D

View File

@ -152,11 +152,18 @@ func GoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
}
}
type EventCallbacks struct {
OnMessageReady func(messageId MessageID)
OnMessageSent func(messageId MessageID)
OnMissingDependencies func(messageId MessageID, missingDeps []MessageID)
OnPeriodicSync func()
}
// ReliabilityManager represents an instance of a nim-sds ReliabilityManager
type ReliabilityManager struct {
rmCtx unsafe.Pointer
channelId string
MsgReadyChan chan MessageID
rmCtx unsafe.Pointer
channelId string
callbacks EventCallbacks
}
func NewReliabilityManager(channelId string) (*ReliabilityManager, error) {
@ -183,8 +190,6 @@ func NewReliabilityManager(channelId string) (*ReliabilityManager, error) {
rm.rmCtx = C.cGoNewReliabilityManager(cChannelId, resp)
wg.Wait()
rm.MsgReadyChan = make(chan MessageID, EventChanBufferSize)
C.cGoSetEventCallback(rm.rmCtx)
registerReliabilityManager(rm)
@ -237,10 +242,19 @@ type jsonEvent struct {
EventType string `json:"eventType"`
}
type msgReady struct {
type msgEvent struct {
MessageId MessageID `json:"messageId"`
}
type missingDepsEvent struct {
MessageId MessageID `json:"messageId"`
MissingDeps []MessageID `json:"missingDeps"`
}
func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) {
rm.callbacks = callbacks
}
func (rm *ReliabilityManager) OnEvent(eventStr string) {
fmt.Println("------------------- received event: ", eventStr)
@ -257,27 +271,53 @@ func (rm *ReliabilityManager) OnEvent(eventStr string) {
case "message_ready":
rm.parseMessageReadyEvent(eventStr)
case "message_sent":
fmt.Println("-------- received event 2")
rm.parseMessageSentEvent(eventStr)
case "missing_dependencies":
fmt.Println("-------- received event 3")
rm.parseMissingDepsEvent(eventStr)
case "periodic_sync":
fmt.Println("-------- received event 4")
if rm.callbacks.OnPeriodicSync != nil {
rm.callbacks.OnPeriodicSync()
}
}
}
func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) {
msgReady := msgReady{}
err := json.Unmarshal([]byte(eventStr), &msgReady)
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
Error("could not parse message ready event %v", err)
}
select {
case rm.MsgReadyChan <- msgReady.MessageId:
default:
Warn("Can't deliver message ready event, MsgReadyChan is full")
if rm.callbacks.OnMessageReady != nil {
rm.callbacks.OnMessageReady(msgEvent.MessageId)
}
}
func (rm *ReliabilityManager) parseMessageSentEvent(eventStr string) {
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
Error("could not parse message sent event %v", err)
}
if rm.callbacks.OnMessageSent != nil {
rm.callbacks.OnMessageSent(msgEvent.MessageId)
}
}
func (rm *ReliabilityManager) parseMissingDepsEvent(eventStr string) {
missingDepsEvent := missingDepsEvent{}
err := json.Unmarshal([]byte(eventStr), &missingDepsEvent)
if err != nil {
Error("could not parse missing dependencies event %v", err)
}
if rm.callbacks.OnMissingDependencies != nil {
rm.callbacks.OnMissingDependencies(missingDepsEvent.MessageId, missingDepsEvent.MissingDeps)
}
}