mirror of https://github.com/status-im/consul.git
improve monitor performance (#10368)
* remove flush for each write to http response in the agent monitor endpoint * fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover. * start log reading goroutine before adding the sink to avoid filling the log channel before getting a chance of reading from it * flush every 500ms to optimize log writing in the http server side. * add changelog file * add issue url to changelog * fix changelog url * Update changelog Co-authored-by: Daniel Nephin <dnephin@hashicorp.com> * use ticker to flush and avoid race condition when flushing in a different goroutine * stop the ticker when done Co-authored-by: Daniel Nephin <dnephin@hashicorp.com> * Revert "fix race condition when we stop and start monitor multiple times, the doneCh is closed and never recover." This reverts commit 1eeddf7a * wait for log consumer loop to start before registering the sink Co-authored-by: Daniel Nephin <dnephin@hashicorp.com>
This commit is contained in:
parent
2f1f98088e
commit
c8ba2d40fd
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
monitoring: optimize the monitoring endpoint to avoid losing logs when under high load.
|
||||||
|
```
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
|
@ -1204,6 +1205,9 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request)
|
||||||
// a gzip stream it will go ahead and write out the HTTP response header
|
// a gzip stream it will go ahead and write out the HTTP response header
|
||||||
resp.Write([]byte(""))
|
resp.Write([]byte(""))
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
|
const flushDelay = 200 * time.Millisecond
|
||||||
|
flushTicker := time.NewTicker(flushDelay)
|
||||||
|
defer flushTicker.Stop()
|
||||||
|
|
||||||
// Stream logs until the connection is closed.
|
// Stream logs until the connection is closed.
|
||||||
for {
|
for {
|
||||||
|
@ -1213,9 +1217,13 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request)
|
||||||
if droppedCount > 0 {
|
if droppedCount > 0 {
|
||||||
s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount)
|
s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount)
|
||||||
}
|
}
|
||||||
|
flusher.Flush()
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|
||||||
case log := <-logsCh:
|
case log := <-logsCh:
|
||||||
fmt.Fprint(resp, string(log))
|
fmt.Fprint(resp, string(log))
|
||||||
|
|
||||||
|
case <-flushTicker.C:
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package monitor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
log "github.com/hashicorp/go-hclog"
|
log "github.com/hashicorp/go-hclog"
|
||||||
)
|
)
|
||||||
|
@ -80,21 +81,22 @@ func (d *monitor) Stop() int {
|
||||||
// received log messages over the returned channel.
|
// received log messages over the returned channel.
|
||||||
func (d *monitor) Start() <-chan []byte {
|
func (d *monitor) Start() <-chan []byte {
|
||||||
// register our sink with the logger
|
// register our sink with the logger
|
||||||
d.logger.RegisterSink(d.sink)
|
|
||||||
streamCh := make(chan []byte, d.bufSize)
|
streamCh := make(chan []byte, d.bufSize)
|
||||||
|
|
||||||
// run a go routine that listens for streamed
|
// run a go routine that listens for streamed
|
||||||
// log messages and sends them to streamCh
|
// log messages and sends them to streamCh
|
||||||
|
|
||||||
|
wg := new(sync.WaitGroup)
|
||||||
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(streamCh)
|
defer close(streamCh)
|
||||||
|
wg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case log := <-d.logCh:
|
case logLine := <-d.logCh:
|
||||||
select {
|
select {
|
||||||
case <-d.doneCh:
|
case <-d.doneCh:
|
||||||
return
|
return
|
||||||
case streamCh <- log:
|
case streamCh <- logLine:
|
||||||
}
|
}
|
||||||
case <-d.doneCh:
|
case <-d.doneCh:
|
||||||
return
|
return
|
||||||
|
@ -102,6 +104,9 @@ func (d *monitor) Start() <-chan []byte {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
//wait for the consumer loop to start before registering the sink to avoid filling the log channel
|
||||||
|
wg.Wait()
|
||||||
|
d.logger.RegisterSink(d.sink)
|
||||||
return streamCh
|
return streamCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue