diff --git a/.changelog/10368.txt b/.changelog/10368.txt new file mode 100644 index 0000000000..7df442df76 --- /dev/null +++ b/.changelog/10368.txt @@ -0,0 +1,3 @@ +```release-note:improvement +monitoring: optimize the monitoring endpoint to avoid losing logs when under high load. +``` diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 148026a93a..2f611a76fe 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -1191,6 +1192,9 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( // a gzip stream it will go ahead and write out the HTTP response header resp.Write([]byte("")) flusher.Flush() + const flushDelay = 200 * time.Millisecond + flushTicker := time.NewTicker(flushDelay) + defer flushTicker.Stop() // Stream logs until the connection is closed. for { @@ -1200,9 +1204,13 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( if droppedCount > 0 { s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount) } + flusher.Flush() return nil, nil + case log := <-logsCh: fmt.Fprint(resp, string(log)) + + case <-flushTicker.C: flusher.Flush() } } diff --git a/logging/monitor/monitor.go b/logging/monitor/monitor.go index a8e4e52324..e286bf76c7 100644 --- a/logging/monitor/monitor.go +++ b/logging/monitor/monitor.go @@ -2,6 +2,7 @@ package monitor import ( "errors" + "sync" log "github.com/hashicorp/go-hclog" ) @@ -80,21 +81,22 @@ func (d *monitor) Stop() int { // received log messages over the returned channel. func (d *monitor) Start() <-chan []byte { // register our sink with the logger - d.logger.RegisterSink(d.sink) streamCh := make(chan []byte, d.bufSize) - // run a go routine that listens for streamed // log messages and sends them to streamCh + + wg := new(sync.WaitGroup) + wg.Add(1) go func() { defer close(streamCh) - + wg.Done() for { select { - case log := <-d.logCh: + case logLine := <-d.logCh: select { case <-d.doneCh: return - case streamCh <- log: + case streamCh <- logLine: } case <-d.doneCh: return @@ -102,6 +104,9 @@ func (d *monitor) Start() <-chan []byte { } }() + //wait for the consumer loop to start before registering the sink to avoid filling the log channel + wg.Wait() + d.logger.RegisterSink(d.sink) return streamCh }