sds-go-bindings/sds/sds_common.go

134 lines
3.6 KiB
Go
Raw Permalink Normal View History

2025-11-19 18:36:37 +00:00
package sds
import (
2025-11-19 18:49:07 +00:00
"encoding/json"
"time"
"unsafe"
"go.uber.org/zap"
2025-11-19 18:36:37 +00:00
)
const requestTimeout = 30 * time.Second
const EventChanBufferSize = 1024
type EventCallbacks struct {
2025-11-19 18:49:07 +00:00
OnMessageReady func(messageId MessageID, channelId string)
OnMessageSent func(messageId MessageID, channelId string)
OnMissingDependencies func(messageId MessageID, missingDeps []MessageID, channelId string)
OnPeriodicSync func()
2025-11-19 18:36:37 +00:00
}
// ReliabilityManager represents an instance of a nim-sds ReliabilityManager
type ReliabilityManager struct {
logger *zap.Logger
2025-11-19 18:49:07 +00:00
rmCtx unsafe.Pointer
callbacks EventCallbacks
2025-11-19 18:36:37 +00:00
}
// The event callback sends back the rm ctx to know to which
// rm is the event being emited for. Since we only have a global
// callback in the go side, We register all the rm's that we create
// so we can later obtain which instance of `ReliabilityManager` it should
// be invoked depending on the ctx received
var rmRegistry map[unsafe.Pointer]*ReliabilityManager
func init() {
2025-11-19 18:49:07 +00:00
rmRegistry = make(map[unsafe.Pointer]*ReliabilityManager)
2025-11-19 18:36:37 +00:00
}
func registerReliabilityManager(rm *ReliabilityManager) {
2025-11-19 18:49:07 +00:00
_, ok := rmRegistry[rm.rmCtx]
if !ok {
rmRegistry[rm.rmCtx] = rm
}
2025-11-19 18:36:37 +00:00
}
func unregisterReliabilityManager(rm *ReliabilityManager) {
2025-11-19 18:49:07 +00:00
delete(rmRegistry, rm.rmCtx)
2025-11-19 18:36:37 +00:00
}
type jsonEvent struct {
2025-11-19 18:49:07 +00:00
EventType string `json:"eventType"`
2025-11-19 18:36:37 +00:00
}
type msgEvent struct {
2025-11-19 18:49:07 +00:00
MessageId MessageID `json:"messageId"`
ChannelId string `json:"channelId"`
2025-11-19 18:36:37 +00:00
}
type missingDepsEvent struct {
2025-11-19 18:49:07 +00:00
MessageId MessageID `json:"messageId"`
MissingDeps []MessageID `json:"missingDeps"`
ChannelId string `json:"channelId"`
2025-11-19 18:36:37 +00:00
}
func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) {
2025-11-19 18:49:07 +00:00
rm.callbacks = callbacks
2025-11-19 18:36:37 +00:00
}
func (rm *ReliabilityManager) OnEvent(eventStr string) {
2025-11-19 18:49:07 +00:00
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
if err != nil {
rm.logger.Error("failed to unmarshal sds event string", zap.Error(err))
2025-11-19 18:49:07 +00:00
return
}
switch jsonEvent.EventType {
case "message_ready":
rm.parseMessageReadyEvent(eventStr)
case "message_sent":
rm.parseMessageSentEvent(eventStr)
case "missing_dependencies":
rm.parseMissingDepsEvent(eventStr)
case "periodic_sync":
if rm.callbacks.OnPeriodicSync != nil {
rm.callbacks.OnPeriodicSync()
}
}
2025-11-19 18:36:37 +00:00
}
func (rm *ReliabilityManager) OnCallbackError(callerRet int, err string) {
rm.logger.Error("sds callback error",
zap.Int("retCode", callerRet),
zap.String("errMsg", err))
}
2025-11-19 18:36:37 +00:00
func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) {
2025-11-19 18:49:07 +00:00
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
rm.logger.Error("failed to parse message ready event", zap.Error(err))
2025-11-19 18:49:07 +00:00
}
if rm.callbacks.OnMessageReady != nil {
rm.callbacks.OnMessageReady(msgEvent.MessageId, msgEvent.ChannelId)
}
2025-11-19 18:36:37 +00:00
}
func (rm *ReliabilityManager) parseMessageSentEvent(eventStr string) {
2025-11-19 18:49:07 +00:00
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
rm.logger.Error("failed to parse message sent event", zap.Error(err))
return
2025-11-19 18:49:07 +00:00
}
if rm.callbacks.OnMessageSent != nil {
rm.callbacks.OnMessageSent(msgEvent.MessageId, msgEvent.ChannelId)
}
2025-11-19 18:36:37 +00:00
}
func (rm *ReliabilityManager) parseMissingDepsEvent(eventStr string) {
2025-11-19 18:49:07 +00:00
missingDepsEvent := missingDepsEvent{}
err := json.Unmarshal([]byte(eventStr), &missingDepsEvent)
if err != nil {
rm.logger.Error("failed to parse missing dependencies event", zap.Error(err))
return
2025-11-19 18:49:07 +00:00
}
if rm.callbacks.OnMissingDependencies != nil {
rm.callbacks.OnMissingDependencies(missingDepsEvent.MessageId, missingDepsEvent.MissingDeps, missingDepsEvent.ChannelId)
}
2025-11-19 18:36:37 +00:00
}