style: cleanup and format

This commit is contained in:
Igor Sirotin 2025-11-19 18:49:07 +00:00
parent 3bcf73b531
commit e78f763079
No known key found for this signature in database
GPG Key ID: 0EABBCB40CB9AD4A
2 changed files with 85 additions and 87 deletions

View File

@ -135,16 +135,15 @@ import (
"strconv"
"strings"
"sync"
"time"
"unsafe"
)
//export SdsGoCallback
func SdsGoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
if resp != nil {
m := (*C.SdsResp)(resp)
m.ret = ret
m.msg = msg
if resp != nil {
m := (*C.SdsResp)(resp)
m.ret = ret
m.msg = msg
m.len = len
wg := (*sync.WaitGroup)(m.ffiWg)
wg.Done()
@ -152,10 +151,10 @@ func SdsGoCallback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) {
}
func NewReliabilityManager() (*ReliabilityManager, error) {
Debug("Creating new Reliability Manager")
rm := &ReliabilityManager{}
Debug("Creating new Reliability Manager")
rm := &ReliabilityManager{}
wg := sync.WaitGroup{}
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
@ -179,20 +178,20 @@ func NewReliabilityManager() (*ReliabilityManager, error) {
//export sdsGlobalEventCallback
func sdsGlobalEventCallback(callerRet C.int, msg *C.char, len C.size_t, userData unsafe.Pointer) {
if callerRet == C.RET_OK {
eventStr := C.GoStringN(msg, C.int(len))
rm, ok := rmRegistry[userData] // userData contains rm's ctx
if ok {
rm.OnEvent(eventStr)
}
} else {
if len != 0 {
errMsg := C.GoStringN(msg, C.int(len))
Error("sdsGlobalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg)
} else {
Error("sdsGlobalEventCallback retCode not ok, retCode: %v", callerRet)
}
}
if callerRet == C.RET_OK {
eventStr := C.GoStringN(msg, C.int(len))
rm, ok := rmRegistry[userData] // userData contains rm's ctx
if ok {
rm.OnEvent(eventStr)
}
} else {
if len != 0 {
errMsg := C.GoStringN(msg, C.int(len))
Error("sdsGlobalEventCallback retCode not ok, retCode: %v: %v", callerRet, errMsg)
} else {
Error("sdsGlobalEventCallback retCode not ok, retCode: %v", callerRet)
}
}
}
func (rm *ReliabilityManager) Cleanup() error {

View File

@ -1,25 +1,25 @@
package sds
import (
"encoding/json"
"unsafe"
"time"
"encoding/json"
"time"
"unsafe"
)
const requestTimeout = 30 * time.Second
const EventChanBufferSize = 1024
type EventCallbacks struct {
OnMessageReady func(messageId MessageID, channelId string)
OnMessageSent func(messageId MessageID, channelId string)
OnMissingDependencies func(messageId MessageID, missingDeps []MessageID, channelId string)
OnPeriodicSync func()
OnMessageReady func(messageId MessageID, channelId string)
OnMessageSent func(messageId MessageID, channelId string)
OnMissingDependencies func(messageId MessageID, missingDeps []MessageID, channelId string)
OnPeriodicSync func()
}
// ReliabilityManager represents an instance of a nim-sds ReliabilityManager
type ReliabilityManager struct {
rmCtx unsafe.Pointer
callbacks EventCallbacks
rmCtx unsafe.Pointer
callbacks EventCallbacks
}
// The event callback sends back the rm ctx to know to which
@ -30,94 +30,93 @@ type ReliabilityManager struct {
var rmRegistry map[unsafe.Pointer]*ReliabilityManager
func init() {
rmRegistry = make(map[unsafe.Pointer]*ReliabilityManager)
rmRegistry = make(map[unsafe.Pointer]*ReliabilityManager)
}
func registerReliabilityManager(rm *ReliabilityManager) {
_, ok := rmRegistry[rm.rmCtx]
if !ok {
rmRegistry[rm.rmCtx] = rm
}
_, ok := rmRegistry[rm.rmCtx]
if !ok {
rmRegistry[rm.rmCtx] = rm
}
}
func unregisterReliabilityManager(rm *ReliabilityManager) {
delete(rmRegistry, rm.rmCtx)
delete(rmRegistry, rm.rmCtx)
}
type jsonEvent struct {
EventType string `json:"eventType"`
EventType string `json:"eventType"`
}
type msgEvent struct {
MessageId MessageID `json:"messageId"`
ChannelId string `json:"channelId"`
MessageId MessageID `json:"messageId"`
ChannelId string `json:"channelId"`
}
type missingDepsEvent struct {
MessageId MessageID `json:"messageId"`
MissingDeps []MessageID `json:"missingDeps"`
ChannelId string `json:"channelId"`
MessageId MessageID `json:"messageId"`
MissingDeps []MessageID `json:"missingDeps"`
ChannelId string `json:"channelId"`
}
func (rm *ReliabilityManager) RegisterCallbacks(callbacks EventCallbacks) {
rm.callbacks = callbacks
rm.callbacks = callbacks
}
func (rm *ReliabilityManager) OnEvent(eventStr string) {
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
if err != nil {
Error("could not unmarshal sds event string: %v", err)
return
}
jsonEvent := jsonEvent{}
err := json.Unmarshal([]byte(eventStr), &jsonEvent)
if err != nil {
Error("could not unmarshal sds event string: %v", err)
return
}
switch jsonEvent.EventType {
case "message_ready":
rm.parseMessageReadyEvent(eventStr)
case "message_sent":
rm.parseMessageSentEvent(eventStr)
case "missing_dependencies":
rm.parseMissingDepsEvent(eventStr)
case "periodic_sync":
if rm.callbacks.OnPeriodicSync != nil {
rm.callbacks.OnPeriodicSync()
}
}
switch jsonEvent.EventType {
case "message_ready":
rm.parseMessageReadyEvent(eventStr)
case "message_sent":
rm.parseMessageSentEvent(eventStr)
case "missing_dependencies":
rm.parseMissingDepsEvent(eventStr)
case "periodic_sync":
if rm.callbacks.OnPeriodicSync != nil {
rm.callbacks.OnPeriodicSync()
}
}
}
func (rm *ReliabilityManager) parseMessageReadyEvent(eventStr string) {
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
Error("could not parse message ready event %v", err)
}
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
Error("could not parse message ready event %v", err)
}
if rm.callbacks.OnMessageReady != nil {
rm.callbacks.OnMessageReady(msgEvent.MessageId, msgEvent.ChannelId)
}
if rm.callbacks.OnMessageReady != nil {
rm.callbacks.OnMessageReady(msgEvent.MessageId, msgEvent.ChannelId)
}
}
func (rm *ReliabilityManager) parseMessageSentEvent(eventStr string) {
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
Error("could not parse message sent event %v", err)
}
msgEvent := msgEvent{}
err := json.Unmarshal([]byte(eventStr), &msgEvent)
if err != nil {
Error("could not parse message sent event %v", err)
}
if rm.callbacks.OnMessageSent != nil {
rm.callbacks.OnMessageSent(msgEvent.MessageId, msgEvent.ChannelId)
}
if rm.callbacks.OnMessageSent != nil {
rm.callbacks.OnMessageSent(msgEvent.MessageId, msgEvent.ChannelId)
}
}
func (rm *ReliabilityManager) parseMissingDepsEvent(eventStr string) {
missingDepsEvent := missingDepsEvent{}
err := json.Unmarshal([]byte(eventStr), &missingDepsEvent)
if err != nil {
Error("could not parse missing dependencies event %v", err)
}
missingDepsEvent := missingDepsEvent{}
err := json.Unmarshal([]byte(eventStr), &missingDepsEvent)
if err != nil {
Error("could not parse missing dependencies event %v", err)
}
if rm.callbacks.OnMissingDependencies != nil {
rm.callbacks.OnMissingDependencies(missingDepsEvent.MessageId, missingDepsEvent.MissingDeps, missingDepsEvent.ChannelId)
}
if rm.callbacks.OnMissingDependencies != nil {
rm.callbacks.OnMissingDependencies(missingDepsEvent.MessageId, missingDepsEvent.MissingDeps, missingDepsEvent.ChannelId)
}
}