From 81133570e4f77a43724a7df1fbff2dd90ce0bbf4 Mon Sep 17 00:00:00 2001 From: frank Date: Fri, 6 Dec 2024 17:29:11 +0800 Subject: [PATCH] fix_: status backend server websocket IO wait (#6154) --- cmd/status-backend/server/server.go | 23 ++++++++++++++++++++--- cmd/statusd/main.go | 2 +- mobile/status.go | 12 ++++++++++++ profiling/profiler.go | 5 ++--- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/cmd/status-backend/server/server.go b/cmd/status-backend/server/server.go index e2608659a..bfc05f85e 100644 --- a/cmd/status-backend/server/server.go +++ b/cmd/status-backend/server/server.go @@ -53,10 +53,26 @@ func (s *Server) signalHandler(data []byte) { s.lock.Lock() defer s.lock.Unlock() - for connection := range s.connections { - err := connection.WriteMessage(websocket.TextMessage, data) + deleteConnection := func(connection *websocket.Conn) { + delete(s.connections, connection) + err := connection.Close() if err != nil { - log.Error("failed to write message: %w", err) + log.Error("failed to close connection", "error", err) + } + } + + for connection := range s.connections { + err := connection.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err != nil { + log.Error("failed to set write deadline", "error", err) + deleteConnection(connection) + continue + } + + err = connection.WriteMessage(websocket.TextMessage, data) + if err != nil { + log.Error("failed to write signal message", "error", err) + deleteConnection(connection) } } } @@ -130,6 +146,7 @@ func (s *Server) signals(w http.ResponseWriter, r *http.Request) { log.Error("failed to upgrade connection: %w", err) return } + log.Debug("new websocket connection") s.connections[connection] = struct{}{} } diff --git a/cmd/statusd/main.go b/cmd/statusd/main.go index 982055937..541fdc9a5 100644 --- a/cmd/statusd/main.go +++ b/cmd/statusd/main.go @@ -318,7 +318,7 @@ func main() { // Check if profiling shall be enabled. if *pprofEnabled { - profiling.NewProfiler(*pprofPort).Go() + profiling.NewProfiler(fmt.Sprintf(":%d", *pprofPort)).Go() } if config.PushNotificationServerConfig.Enabled { diff --git a/mobile/status.go b/mobile/status.go index 1255bc588..cde0f6d05 100644 --- a/mobile/status.go +++ b/mobile/status.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "runtime" "unsafe" "go.uber.org/zap" @@ -1071,6 +1072,17 @@ func writeHeapProfile(dataDir string) string { //nolint: deadcode return makeJSONResponse(err) } +// StartProfiling starts profiling and HTTP server for pprof +func StartProfiling(address string) string { + return callWithResponse(startProfiling, address) +} + +func startProfiling(address string) string { + runtime.SetMutexProfileFraction(5) + profiling.NewProfiler(address).Go() + return makeJSONResponse(nil) +} + func makeJSONResponse(err error) string { errString := "" if err != nil { diff --git a/profiling/profiler.go b/profiling/profiler.go index 0820d0433..a3a50b6b1 100644 --- a/profiling/profiler.go +++ b/profiling/profiler.go @@ -1,7 +1,6 @@ package profiling import ( - "fmt" "net/http" hpprof "net/http/pprof" "time" @@ -19,7 +18,7 @@ type Profiler struct { // NewProfiler creates an instance of the profiler with // the given port. -func NewProfiler(port int) *Profiler { +func NewProfiler(address string) *Profiler { mux := http.NewServeMux() mux.HandleFunc("/debug/pprof/", hpprof.Index) mux.HandleFunc("/debug/pprof/cmdline", hpprof.Cmdline) @@ -28,7 +27,7 @@ func NewProfiler(port int) *Profiler { mux.HandleFunc("/debug/pprof/trace", hpprof.Trace) p := Profiler{ server: &http.Server{ - Addr: fmt.Sprintf(":%d", port), + Addr: address, ReadHeaderTimeout: 5 * time.Second, Handler: mux, },