fix_: status backend server websocket IO wait (#6154)

This commit is contained in:
frank 2024-12-06 17:29:11 +08:00 committed by GitHub
parent 50933aa328
commit 81133570e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 35 additions and 7 deletions

View File

@ -53,10 +53,26 @@ func (s *Server) signalHandler(data []byte) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
for connection := range s.connections { deleteConnection := func(connection *websocket.Conn) {
err := connection.WriteMessage(websocket.TextMessage, data) delete(s.connections, connection)
err := connection.Close()
if err != nil { if err != nil {
log.Error("failed to write message: %w", err) log.Error("failed to close connection", "error", err)
}
}
for connection := range s.connections {
err := connection.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err != nil {
log.Error("failed to set write deadline", "error", err)
deleteConnection(connection)
continue
}
err = connection.WriteMessage(websocket.TextMessage, data)
if err != nil {
log.Error("failed to write signal message", "error", err)
deleteConnection(connection)
} }
} }
} }
@ -130,6 +146,7 @@ func (s *Server) signals(w http.ResponseWriter, r *http.Request) {
log.Error("failed to upgrade connection: %w", err) log.Error("failed to upgrade connection: %w", err)
return return
} }
log.Debug("new websocket connection")
s.connections[connection] = struct{}{} s.connections[connection] = struct{}{}
} }

View File

@ -318,7 +318,7 @@ func main() {
// Check if profiling shall be enabled. // Check if profiling shall be enabled.
if *pprofEnabled { if *pprofEnabled {
profiling.NewProfiler(*pprofPort).Go() profiling.NewProfiler(fmt.Sprintf(":%d", *pprofPort)).Go()
} }
if config.PushNotificationServerConfig.Enabled { if config.PushNotificationServerConfig.Enabled {

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"runtime"
"unsafe" "unsafe"
"go.uber.org/zap" "go.uber.org/zap"
@ -1071,6 +1072,17 @@ func writeHeapProfile(dataDir string) string { //nolint: deadcode
return makeJSONResponse(err) return makeJSONResponse(err)
} }
// StartProfiling starts profiling and HTTP server for pprof
func StartProfiling(address string) string {
return callWithResponse(startProfiling, address)
}
func startProfiling(address string) string {
runtime.SetMutexProfileFraction(5)
profiling.NewProfiler(address).Go()
return makeJSONResponse(nil)
}
func makeJSONResponse(err error) string { func makeJSONResponse(err error) string {
errString := "" errString := ""
if err != nil { if err != nil {

View File

@ -1,7 +1,6 @@
package profiling package profiling
import ( import (
"fmt"
"net/http" "net/http"
hpprof "net/http/pprof" hpprof "net/http/pprof"
"time" "time"
@ -19,7 +18,7 @@ type Profiler struct {
// NewProfiler creates an instance of the profiler with // NewProfiler creates an instance of the profiler with
// the given port. // the given port.
func NewProfiler(port int) *Profiler { func NewProfiler(address string) *Profiler {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", hpprof.Index) mux.HandleFunc("/debug/pprof/", hpprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", hpprof.Cmdline) mux.HandleFunc("/debug/pprof/cmdline", hpprof.Cmdline)
@ -28,7 +27,7 @@ func NewProfiler(port int) *Profiler {
mux.HandleFunc("/debug/pprof/trace", hpprof.Trace) mux.HandleFunc("/debug/pprof/trace", hpprof.Trace)
p := Profiler{ p := Profiler{
server: &http.Server{ server: &http.Server{
Addr: fmt.Sprintf(":%d", port), Addr: address,
ReadHeaderTimeout: 5 * time.Second, ReadHeaderTimeout: 5 * time.Second,
Handler: mux, Handler: mux,
}, },