mirror of
https://github.com/logos-messaging/sds-go-bindings.git
synced 2026-01-02 06:03:12 +00:00
134 lines
3.6 KiB
Go
134 lines
3.6 KiB
Go
package sds
|
|
|
|
import (
|
|
"encoding/json"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const requestTimeout = 30 * time.Second
|
|
const EventChanBufferSize = 1024
|
|
|
|
type EventCallbacks struct {
|
|
OnMessageReady func(messageId MessageID, channelId string)
|
|
OnMessageSent func(messageId MessageID, channelId string)
|
|
OnMissingDependencies func(messageId MessageID, missingDeps []MessageID, channelId string)
|
|
OnPeriodicSync func()
|
|
}
|
|
|
|
// ReliabilityManager represents an instance of a nim-sds ReliabilityManager
|
|
type ReliabilityManager struct {
|
|
logger *zap.Logger
|
|
rmCtx unsafe.Pointer
|
|
callbacks EventCallbacks
|
|
}
|
|
|
|
// The event callback sends back the rm ctx to know to which
|
|
// rm is the event being emited for. Since we only have a global
|
|
// callback in the go side, We register all the rm's that we create
|
|
// so we can later obtain which instance of `ReliabilityManager` it should
|
|
// be invoked depending on the ctx received
|
|
var rmRegistry map[unsafe.Pointer]*ReliabilityManager
|
|
|
|
func init() {
|
|
rmRegistry = make(map[unsafe.Pointer]*ReliabilityManager)
|
|
}
|
|
|
|
func registerReliabilityManager(rm *ReliabilityManager) {
|
|
_, ok := rmRegistry[rm.rmCtx]
|
|
if !ok {
|
|
rmRegistry[rm.rmCtx] = rm
|
|
}
|
|
}
|
|
|
|
func unregisterReliabilityManager(rm *ReliabilityManager) {
|
|
delete(rmRegistry, rm.rmCtx)
|
|
}
|
|
|
|
type jsonEvent struct {
|
|
EventType string `json:"eventType"`
|
|
}
|
|
|
|
type msgEvent struct {
|
|
MessageId MessageID `json:"messageId"`
|
|
ChannelId string `json:"channelId"`
|
|
}
|
|
|
|
type missingDepsEvent struct {
|
|
MessageId MessageID `json:"messageId"`
|
|
MissingDeps []MessageID `json:"missingDeps"`
|
|
ChannelId string `json:"channelId"`
|
|
}
|
|
|
|
func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) {
|
|
rm.callbacks = callbacks
|
|
}
|
|
|
|
func (rm *ReliabilityManager) OnEvent(eventStr string) {
|
|
jsonEvent := jsonEvent{}
|
|
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
|
|
if err != nil {
|
|
rm.logger.Error("failed to unmarshal sds event string", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
switch jsonEvent.EventType {
|
|
case "message_ready":
|
|
rm.parseMessageReadyEvent(eventStr)
|
|
case "message_sent":
|
|
rm.parseMessageSentEvent(eventStr)
|
|
case "missing_dependencies":
|
|
rm.parseMissingDepsEvent(eventStr)
|
|
case "periodic_sync":
|
|
if rm.callbacks.OnPeriodicSync != nil {
|
|
rm.callbacks.OnPeriodicSync()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rm *ReliabilityManager) OnCallbackError(callerRet int, err string) {
|
|
rm.logger.Error("sds callback error",
|
|
zap.Int("retCode", callerRet),
|
|
zap.String("errMsg", err))
|
|
}
|
|
|
|
func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) {
|
|
msgEvent := msgEvent{}
|
|
err := json.Unmarshal([]byte(eventStr), &msgEvent)
|
|
if err != nil {
|
|
rm.logger.Error("failed to parse message ready event", zap.Error(err))
|
|
}
|
|
|
|
if rm.callbacks.OnMessageReady != nil {
|
|
rm.callbacks.OnMessageReady(msgEvent.MessageId, msgEvent.ChannelId)
|
|
}
|
|
}
|
|
|
|
func (rm *ReliabilityManager) parseMessageSentEvent(eventStr string) {
|
|
msgEvent := msgEvent{}
|
|
err := json.Unmarshal([]byte(eventStr), &msgEvent)
|
|
if err != nil {
|
|
rm.logger.Error("failed to parse message sent event", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if rm.callbacks.OnMessageSent != nil {
|
|
rm.callbacks.OnMessageSent(msgEvent.MessageId, msgEvent.ChannelId)
|
|
}
|
|
}
|
|
|
|
func (rm *ReliabilityManager) parseMissingDepsEvent(eventStr string) {
|
|
missingDepsEvent := missingDepsEvent{}
|
|
err := json.Unmarshal([]byte(eventStr), &missingDepsEvent)
|
|
if err != nil {
|
|
rm.logger.Error("failed to parse missing dependencies event", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if rm.callbacks.OnMissingDependencies != nil {
|
|
rm.callbacks.OnMissingDependencies(missingDepsEvent.MessageId, missingDepsEvent.MissingDeps, missingDepsEvent.ChannelId)
|
|
}
|
|
}
|