Add monitor http endpoint

This commit is contained in:
Kyle Havlovitz 2016-11-16 16:45:26 -05:00
parent 4dc8bb1426
commit 124f907063
9 changed files with 234 additions and 2 deletions

View File

@ -1,6 +1,7 @@
package api package api
import ( import (
"bufio"
"fmt" "fmt"
) )
@ -411,3 +412,37 @@ func (a *Agent) DisableNodeMaintenance() error {
resp.Body.Close() resp.Body.Close()
return nil return nil
} }
// Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop the
// log stream
func (a *Agent) Monitor(loglevel string, stopCh chan struct{}) (chan string, error) {
r := a.c.newRequest("GET", "/v1/agent/monitor")
if loglevel != "" {
r.params.Add("loglevel", loglevel)
}
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
logCh := make(chan string, 64)
go func() {
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for {
select {
case <-stopCh:
close(logCh)
return
default:
}
if scanner.Scan() {
logCh <- scanner.Text()
}
}
}()
return logCh, nil
}

View File

@ -3,6 +3,7 @@ package api
import ( import (
"strings" "strings"
"testing" "testing"
"time"
) )
func TestAgent_Self(t *testing.T) { func TestAgent_Self(t *testing.T) {
@ -558,6 +559,29 @@ func TestAgent_ForceLeave(t *testing.T) {
} }
} }
func TestAgent_Monitor(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
agent := c.Agent()
logCh, err := agent.Monitor("info", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Wait for the first log message and validate it
select {
case log := <-logCh:
if !strings.Contains(log, "[INFO] raft: Initial configuration") {
t.Fatalf("bad: %q", log)
}
case <-time.After(10 * time.Second):
t.Fatalf("failed to get a log message")
}
}
func TestServiceMaintenance(t *testing.T) { func TestServiceMaintenance(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
@ -66,6 +67,9 @@ type Agent struct {
// Output sink for logs // Output sink for logs
logOutput io.Writer logOutput io.Writer
// Used for streaming logs to
logWriter *logger.LogWriter
// We have one of a client or a server, depending // We have one of a client or a server, depending
// on our configuration // on our configuration
server *consul.Server server *consul.Server

View File

@ -2,12 +2,15 @@ package agent
import ( import (
"fmt" "fmt"
"log"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -393,6 +396,61 @@ func (s *HTTPServer) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Re
return nil, nil return nil, nil
} }
func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Only GET supported
if req.Method != "GET" {
resp.WriteHeader(405)
return nil, nil
}
// Get the provided loglevel
logLevel := req.URL.Query().Get("loglevel")
if logLevel == "" {
logLevel = "INFO"
}
// Upper case the log level
logLevel = strings.ToUpper(logLevel)
// Create a level filter
filter := logger.LevelFilter()
filter.MinLevel = logutils.LogLevel(logLevel)
if !logger.ValidateLevelFilter(filter.MinLevel, filter) {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Unknown log level: %s", filter.MinLevel)))
return nil, nil
}
flusher, ok := resp.(http.Flusher)
if !ok {
return nil, fmt.Errorf("Streaming not supported")
}
// Set up a log handler
handler := &httpLogHandler{
filter: filter,
logCh: make(chan string, 512),
logger: s.logger,
}
s.agent.logWriter.RegisterHandler(handler)
defer s.agent.logWriter.DeregisterHandler(handler)
notify := resp.(http.CloseNotifier).CloseNotify()
// Stream logs until the connection is closed
for {
select {
case <-notify:
return nil, nil
case log := <-handler.logCh:
resp.Write([]byte(log + "\n"))
flusher.Flush()
}
}
return nil, nil
}
// syncChanges is a helper function which wraps a blocking call to sync // syncChanges is a helper function which wraps a blocking call to sync
// services and checks to the server. If the operation fails, we only // services and checks to the server. If the operation fails, we only
// only warn because the write did succeed and anti-entropy will sync later. // only warn because the write did succeed and anti-entropy will sync later.
@ -401,3 +459,27 @@ func (s *HTTPServer) syncChanges() {
s.logger.Printf("[ERR] agent: failed to sync changes: %v", err) s.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
} }
} }
type httpLogHandler struct {
filter *logutils.LevelFilter
logCh chan string
logger *log.Logger
}
func (h *httpLogHandler) HandleLog(log string) {
// Check the log level
if !h.filter.Check([]byte(log)) {
return
}
// Do a non-blocking send
select {
case h.logCh <- log:
default:
// We can't log synchronously, since we are already being invoked
// from the logWriter, and a log will need to invoke Write() which
// already holds the lock. We must therefor do the log async, so
// as to not deadlock
go h.logger.Printf("[WARN] Dropping logs to monitor http endpoint")
}
}

View File

@ -1,8 +1,10 @@
package agent package agent
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
@ -12,6 +14,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -1019,3 +1022,69 @@ func TestHTTPAgentRegisterServiceCheck(t *testing.T) {
t.Fatalf("bad: %#v", result["memcached_check2"]) t.Fatalf("bad: %#v", result["memcached_check2"])
} }
} }
func TestHTTPAgent_Monitor(t *testing.T) {
logWriter := logger.NewLogWriter(512)
expectedLogs := bytes.Buffer{}
logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter)
dir, srv := makeHTTPServerWithConfigLog(t, nil, logger)
srv.agent.logWriter = logWriter
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Begin streaming logs from the monitor endpoint
req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil)
resp := newClosableRecorder()
go func() {
if _, err := srv.AgentMonitor(resp, req); err != nil {
t.Fatalf("err: %s", err)
}
}()
// Write the incoming logs to a channel for reading
logCh := make(chan string, 0)
go func() {
for {
line, err := resp.Body.ReadString('\n')
if err != nil && err != io.EOF {
t.Fatalf("err: %v", err)
}
if line != "" {
logCh <- line
}
}
}()
// Verify that the first 5 logs we get match the expected stream
for i := 0; i < 5; i++ {
select {
case log := <-logCh:
expected, err := expectedLogs.ReadString('\n')
if err != nil {
t.Fatalf("err: %v", err)
}
if log != expected {
t.Fatalf("bad: %q %q", expected, log)
}
case <-time.After(10 * time.Second):
t.Fatalf("failed to get log within timeout")
}
}
}
type closableRecorder struct {
*httptest.ResponseRecorder
closer chan bool
}
func newClosableRecorder() *closableRecorder {
r := httptest.NewRecorder()
closer := make(chan bool)
return &closableRecorder{r, closer}
}
func (r *closableRecorder) CloseNotify() <-chan bool {
return r.closer
}

View File

@ -471,6 +471,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
return err return err
} }
agent.logWriter = logWriter
c.agent = agent c.agent = agent
// Setup the RPC listener // Setup the RPC listener

View File

@ -251,6 +251,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
} }
s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))

View File

@ -28,6 +28,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) {
} }
func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) { func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) {
return makeHTTPServerWithConfigLog(t, cb, nil)
}
func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer) (string, *HTTPServer) {
configTry := 0 configTry := 0
RECONF: RECONF:
configTry += 1 configTry += 1
@ -36,7 +40,7 @@ RECONF:
cb(conf) cb(conf)
} }
dir, agent := makeAgent(t, conf) dir, agent := makeAgentLog(t, conf, l)
servers, err := NewHTTPServers(agent, conf, agent.logOutput) servers, err := NewHTTPServers(agent, conf, agent.logOutput)
if err != nil { if err != nil {
if configTry < 3 { if configTry < 3 {

View File

@ -21,6 +21,7 @@ The following endpoints are supported:
* [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent * [`/v1/agent/members`](#agent_members) : Returns the members as seen by the local serf agent
* [`/v1/agent/self`](#agent_self) : Returns the local node configuration * [`/v1/agent/self`](#agent_self) : Returns the local node configuration
* [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode * [`/v1/agent/maintenance`](#agent_maintenance) : Manages node maintenance mode
* [`/v1/agent/monitor`](#agent_monitor) : Streams logs from the local agent
* [`/v1/agent/join/<address>`](#agent_join) : Triggers the local agent to join a node * [`/v1/agent/join/<address>`](#agent_join) : Triggers the local agent to join a node
* [`/v1/agent/force-leave/<node>`](#agent_force_leave): Forces removal of a node * [`/v1/agent/force-leave/<node>`](#agent_force_leave): Forces removal of a node
* [`/v1/agent/check/register`](#agent_check_register) : Registers a new local check * [`/v1/agent/check/register`](#agent_check_register) : Registers a new local check
@ -211,6 +212,17 @@ to aid human operators. If no reason is provided, a default value will be used i
The return code is 200 on success. The return code is 200 on success.
### <a name="agent_monitor"></a> /v1/agent/monitor
Added in Consul 0.7.2, This endpoint is hit with a GET and will stream logs from the
local agent until the connection is closed.
The `?loglevel` flag is optional. If provided, its value should be a text string
containing a log level to filter on, such as `info`. If no loglevel is provided,
`info` will be used as a default.
The return code is 200 on success.
### <a name="agent_join"></a> /v1/agent/join/\<address\> ### <a name="agent_join"></a> /v1/agent/join/\<address\>
This endpoint is hit with a GET and is used to instruct the agent to attempt to This endpoint is hit with a GET and is used to instruct the agent to attempt to