From c8ba2d40fd8d6915c3e469bfa5bb252f7ab31b70 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 15 Jun 2021 12:05:52 -0400 Subject: [PATCH] improve monitor performance (#10368) * remove flush for each write to http response in the agent monitor endpoint * fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover. * start log reading goroutine before adding the sink to avoid filling the log channel before getting a chance of reading from it * flush every 500ms to optimize log writing in the http server side. * add changelog file * add issue url to changelog * fix changelog url * Update changelog Co-authored-by: Daniel Nephin * use ticker to flush and avoid race condition when flushing in a different goroutine * stop the ticker when done Co-authored-by: Daniel Nephin * Revert "fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover." This reverts commit 1eeddf7a * wait for log consumer loop to start before registering the sink Co-authored-by: Daniel Nephin --- .changelog/10368.txt | 3 +++ agent/agent_endpoint.go | 8 ++++++++ logging/monitor/monitor.go | 15 ++++++++++----- 3 files changed, 21 insertions(+), 5 deletions(-) create mode 100644 .changelog/10368.txt diff --git a/.changelog/10368.txt b/.changelog/10368.txt new file mode 100644 index 0000000000..7df442df76 --- /dev/null +++ b/.changelog/10368.txt @@ -0,0 +1,3 @@ +```release-note:improvement +monitoring: optimize the monitoring endpoint to avoid losing logs when under high load. +``` diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index add31531bb..8543c5d663 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -5,6 +5,7 @@ import ( "net/http" "strconv" "strings" + "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" @@ -1204,6 +1205,9 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) // a gzip stream it will go ahead and write out the HTTP response header resp.Write([]byte("")) flusher.Flush() + const flushDelay = 200 * time.Millisecond + flushTicker := time.NewTicker(flushDelay) + defer flushTicker.Stop() // Stream logs until the connection is closed. for { @@ -1213,9 +1217,13 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) if droppedCount > 0 { s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount) } + flusher.Flush() return nil, nil + case log := <-logsCh: fmt.Fprint(resp, string(log)) + + case <-flushTicker.C: flusher.Flush() } } diff --git a/logging/monitor/monitor.go b/logging/monitor/monitor.go index a8e4e52324..e286bf76c7 100644 --- a/logging/monitor/monitor.go +++ b/logging/monitor/monitor.go @@ -2,6 +2,7 @@ package monitor import ( "errors" + "sync" log "github.com/hashicorp/go-hclog" ) @@ -80,21 +81,22 @@ func (d *monitor) Stop() int { // received log messages over the returned channel. func (d *monitor) Start() <-chan []byte { // register our sink with the logger - d.logger.RegisterSink(d.sink) streamCh := make(chan []byte, d.bufSize) - // run a go routine that listens for streamed // log messages and sends them to streamCh + + wg := new(sync.WaitGroup) + wg.Add(1) go func() { defer close(streamCh) - + wg.Done() for { select { - case log := <-d.logCh: + case logLine := <-d.logCh: select { case <-d.doneCh: return - case streamCh <- log: + case streamCh <- logLine: } case <-d.doneCh: return @@ -102,6 +104,9 @@ func (d *monitor) Start() <-chan []byte { } }() + //wait for the consumer loop to start before registering the sink to avoid filling the log channel + wg.Wait() + d.logger.RegisterSink(d.sink) return streamCh }