mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
parent
c94751ad43
commit
7e1a860978
@ -534,7 +534,13 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error {
|
|||||||
// Compile the watches
|
// Compile the watches
|
||||||
var watchPlans []*watch.Plan
|
var watchPlans []*watch.Plan
|
||||||
for _, params := range cfg.Watches {
|
for _, params := range cfg.Watches {
|
||||||
// Parse the watches, excluding the handler
|
if handlerType, ok := params["handler_type"]; !ok {
|
||||||
|
params["handler_type"] = "script"
|
||||||
|
} else if handlerType != "http" && handlerType != "script" {
|
||||||
|
return fmt.Errorf("Handler type '%s' not recognized", params["handler_type"])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the watches, excluding 'handler' and 'args'
|
||||||
wp, err := watch.ParseExempt(params, []string{"handler", "args"})
|
wp, err := watch.ParseExempt(params, []string{"handler", "args"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to parse watch (%#v): %v", params, err)
|
return fmt.Errorf("Failed to parse watch (%#v): %v", params, err)
|
||||||
@ -563,11 +569,11 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error {
|
|||||||
} else if hasArgs && !ok {
|
} else if hasArgs && !ok {
|
||||||
return fmt.Errorf("Watch args must be a list of strings")
|
return fmt.Errorf("Watch args must be a list of strings")
|
||||||
}
|
}
|
||||||
if hasHandler && hasArgs {
|
if hasHandler && hasArgs || hasHandler && wp.HandlerType == "http" || hasArgs && wp.HandlerType == "http" {
|
||||||
return fmt.Errorf("Cannot define both watch handler and args")
|
return fmt.Errorf("Only one watch handler allowed")
|
||||||
}
|
}
|
||||||
if !hasHandler && !hasArgs {
|
if !hasHandler && !hasArgs && wp.HandlerType != "http" {
|
||||||
return fmt.Errorf("Must define either watch handler or args")
|
return fmt.Errorf("Must define a watch handler")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the watch plan
|
// Store the watch plan
|
||||||
@ -590,13 +596,14 @@ func (a *Agent) reloadWatches(cfg *config.RuntimeConfig) error {
|
|||||||
for _, wp := range watchPlans {
|
for _, wp := range watchPlans {
|
||||||
a.watchPlans = append(a.watchPlans, wp)
|
a.watchPlans = append(a.watchPlans, wp)
|
||||||
go func(wp *watch.Plan) {
|
go func(wp *watch.Plan) {
|
||||||
var handler interface{}
|
|
||||||
if h, ok := wp.Exempt["handler"]; ok {
|
if h, ok := wp.Exempt["handler"]; ok {
|
||||||
handler = h
|
wp.Handler = makeWatchHandler(a.LogOutput, h)
|
||||||
|
} else if h, ok := wp.Exempt["args"]; ok {
|
||||||
|
wp.Handler = makeWatchHandler(a.LogOutput, h)
|
||||||
} else {
|
} else {
|
||||||
handler = wp.Exempt["args"]
|
httpConfig := wp.Exempt["http_handler_config"].(*watch.HttpHandlerConfig)
|
||||||
|
wp.Handler = makeHTTPWatchHandler(a.LogOutput, httpConfig)
|
||||||
}
|
}
|
||||||
wp.Handler = makeWatchHandler(a.LogOutput, handler)
|
|
||||||
wp.LogOutput = a.LogOutput
|
wp.LogOutput = a.LogOutput
|
||||||
if err := wp.Run(addr); err != nil {
|
if err := wp.Run(addr); err != nil {
|
||||||
a.logger.Printf("[ERR] Failed to run watch: %v", err)
|
a.logger.Printf("[ERR] Failed to run watch: %v", err)
|
||||||
|
@ -10,8 +10,12 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"crypto/tls"
|
||||||
"github.com/armon/circbuf"
|
"github.com/armon/circbuf"
|
||||||
"github.com/hashicorp/consul/watch"
|
"github.com/hashicorp/consul/watch"
|
||||||
|
"github.com/hashicorp/go-cleanhttp"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -87,3 +91,77 @@ func makeWatchHandler(logOutput io.Writer, handler interface{}) watch.HandlerFun
|
|||||||
}
|
}
|
||||||
return fn
|
return fn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeHTTPWatchHandler(logOutput io.Writer, config *watch.HttpHandlerConfig) watch.HandlerFunc {
|
||||||
|
logger := log.New(logOutput, "", log.LstdFlags)
|
||||||
|
|
||||||
|
fn := func(idx uint64, data interface{}) {
|
||||||
|
trans := cleanhttp.DefaultTransport()
|
||||||
|
|
||||||
|
// Skip SSL certificate verification if TLSSkipVerify is true
|
||||||
|
if trans.TLSClientConfig == nil {
|
||||||
|
trans.TLSClientConfig = &tls.Config{
|
||||||
|
InsecureSkipVerify: config.TLSSkipVerify,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
trans.TLSClientConfig.InsecureSkipVerify = config.TLSSkipVerify
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, config.Timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Create the HTTP client.
|
||||||
|
httpClient := &http.Client{
|
||||||
|
Transport: trans,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup the input
|
||||||
|
var inp bytes.Buffer
|
||||||
|
enc := json.NewEncoder(&inp)
|
||||||
|
if err := enc.Encode(data); err != nil {
|
||||||
|
logger.Printf("[ERR] agent: Failed to encode data for http watch '%s': %v", config.Path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest(config.Method, config.Path, &inp)
|
||||||
|
if err != nil {
|
||||||
|
logger.Printf("[ERR] agent: Failed to setup http watch: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
req.Header.Add("Content-Type", "application/json")
|
||||||
|
req.Header.Add("X-Consul-Index", strconv.FormatUint(idx, 10))
|
||||||
|
for key, values := range config.Header {
|
||||||
|
for _, val := range values {
|
||||||
|
req.Header.Add(key, val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp, err := httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
logger.Printf("[ERR] agent: Failed to invoke http watch handler '%s': %v", config.Path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// Collect the output
|
||||||
|
output, _ := circbuf.NewBuffer(WatchBufSize)
|
||||||
|
io.Copy(output, resp.Body)
|
||||||
|
|
||||||
|
// Get the output, add a message about truncation
|
||||||
|
outputStr := string(output.Bytes())
|
||||||
|
if output.TotalWritten() > output.Size() {
|
||||||
|
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
|
||||||
|
output.Size(), output.TotalWritten(), outputStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
|
||||||
|
// Log the output
|
||||||
|
logger.Printf("[TRACE] agent: http watch handler '%s' output: %s", config.Path, outputStr)
|
||||||
|
} else {
|
||||||
|
logger.Printf("[ERR] agent: http watch handler '%s' got '%s' with output: %s",
|
||||||
|
config.Path, resp.Status, outputStr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fn
|
||||||
|
}
|
||||||
|
@ -1,9 +1,13 @@
|
|||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/hashicorp/consul/watch"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMakeWatchHandler(t *testing.T) {
|
func TestMakeWatchHandler(t *testing.T) {
|
||||||
@ -28,3 +32,34 @@ func TestMakeWatchHandler(t *testing.T) {
|
|||||||
t.Fatalf("bad: %s", raw)
|
t.Fatalf("bad: %s", raw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMakeHTTPWatchHandler(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
idx := r.Header.Get("X-Consul-Index")
|
||||||
|
if idx != "100" {
|
||||||
|
t.Fatalf("bad: %s", idx)
|
||||||
|
}
|
||||||
|
// Get the first one
|
||||||
|
customHeader := r.Header.Get("X-Custom")
|
||||||
|
if customHeader != "abc" {
|
||||||
|
t.Fatalf("bad: %s", idx)
|
||||||
|
}
|
||||||
|
body, err := ioutil.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if string(body) != "[\"foo\",\"bar\",\"baz\"]\n" {
|
||||||
|
t.Fatalf("bad: %s", body)
|
||||||
|
}
|
||||||
|
w.Write([]byte("Ok, i see"))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
config := watch.HttpHandlerConfig{
|
||||||
|
Path: server.URL,
|
||||||
|
Header: map[string][]string{"X-Custom": {"abc", "def"}},
|
||||||
|
Timeout: time.Minute,
|
||||||
|
}
|
||||||
|
handler := makeHTTPWatchHandler(os.Stderr, &config)
|
||||||
|
handler(100, []string{"foo", "bar", "baz"})
|
||||||
|
}
|
||||||
|
@ -7,17 +7,22 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
|
"github.com/mitchellh/mapstructure"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const DefaultTimeout = 10 * time.Second
|
||||||
|
|
||||||
// Plan is the parsed version of a watch specification. A watch provides
|
// Plan is the parsed version of a watch specification. A watch provides
|
||||||
// the details of a query, which generates a view into the Consul data store.
|
// the details of a query, which generates a view into the Consul data store.
|
||||||
// This view is watched for changes and a handler is invoked to take any
|
// This view is watched for changes and a handler is invoked to take any
|
||||||
// appropriate actions.
|
// appropriate actions.
|
||||||
type Plan struct {
|
type Plan struct {
|
||||||
Datacenter string
|
Datacenter string
|
||||||
Token string
|
Token string
|
||||||
Type string
|
Type string
|
||||||
Exempt map[string]interface{}
|
HandlerType string
|
||||||
|
Exempt map[string]interface{}
|
||||||
|
|
||||||
Watcher WatcherFunc
|
Watcher WatcherFunc
|
||||||
Handler HandlerFunc
|
Handler HandlerFunc
|
||||||
@ -34,6 +39,15 @@ type Plan struct {
|
|||||||
cancelFunc context.CancelFunc
|
cancelFunc context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type HttpHandlerConfig struct {
|
||||||
|
Path string `mapstructure:"path"`
|
||||||
|
Method string `mapstructure:"method"`
|
||||||
|
Timeout time.Duration `mapstructure:"-"`
|
||||||
|
TimeoutRaw string `mapstructure:"timeout"`
|
||||||
|
Header map[string][]string `mapstructure:"header"`
|
||||||
|
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
|
||||||
|
}
|
||||||
|
|
||||||
// WatcherFunc is used to watch for a diff
|
// WatcherFunc is used to watch for a diff
|
||||||
type WatcherFunc func(*Plan) (uint64, interface{}, error)
|
type WatcherFunc func(*Plan) (uint64, interface{}, error)
|
||||||
|
|
||||||
@ -50,6 +64,7 @@ func Parse(params map[string]interface{}) (*Plan, error) {
|
|||||||
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
|
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
|
||||||
plan := &Plan{
|
plan := &Plan{
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
|
Exempt: make(map[string]interface{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse the generic parameters
|
// Parse the generic parameters
|
||||||
@ -62,12 +77,31 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error)
|
|||||||
if err := assignValue(params, "type", &plan.Type); err != nil {
|
if err := assignValue(params, "type", &plan.Type); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure there is a watch type
|
// Ensure there is a watch type
|
||||||
if plan.Type == "" {
|
if plan.Type == "" {
|
||||||
return nil, fmt.Errorf("Watch type must be specified")
|
return nil, fmt.Errorf("Watch type must be specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the specific handler
|
||||||
|
if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
switch plan.HandlerType {
|
||||||
|
case "http":
|
||||||
|
if _, ok := params["http_handler_config"]; !ok {
|
||||||
|
return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
|
||||||
|
}
|
||||||
|
config, err := parseHttpHandlerConfig(params["http_handler_config"])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err))
|
||||||
|
}
|
||||||
|
plan.Exempt["http_handler_config"] = config
|
||||||
|
delete(params, "http_handler_config")
|
||||||
|
|
||||||
|
case "script":
|
||||||
|
// Let the caller check for configuration in exempt parameters
|
||||||
|
}
|
||||||
|
|
||||||
// Look for a factory function
|
// Look for a factory function
|
||||||
factory := watchFuncFactory[plan.Type]
|
factory := watchFuncFactory[plan.Type]
|
||||||
if factory == nil {
|
if factory == nil {
|
||||||
@ -83,7 +117,6 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error)
|
|||||||
|
|
||||||
// Remove the exempt parameters
|
// Remove the exempt parameters
|
||||||
if len(exempt) > 0 {
|
if len(exempt) > 0 {
|
||||||
plan.Exempt = make(map[string]interface{})
|
|
||||||
for _, ex := range exempt {
|
for _, ex := range exempt {
|
||||||
val, ok := params[ex]
|
val, ok := params[ex]
|
||||||
if ok {
|
if ok {
|
||||||
@ -129,3 +162,27 @@ func assignValueBool(params map[string]interface{}, name string, out *bool) erro
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parse the 'http_handler_config' parameters
|
||||||
|
func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
|
||||||
|
var config HttpHandlerConfig
|
||||||
|
if err := mapstructure.Decode(configParams, &config); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Path == "" {
|
||||||
|
return nil, fmt.Errorf("Requires 'path' to be set")
|
||||||
|
}
|
||||||
|
if config.Method == "" {
|
||||||
|
config.Method = "POST"
|
||||||
|
}
|
||||||
|
if config.TimeoutRaw == "" {
|
||||||
|
config.Timeout = DefaultTimeout
|
||||||
|
} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil {
|
||||||
|
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err))
|
||||||
|
} else {
|
||||||
|
config.Timeout = timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
return &config, nil
|
||||||
|
}
|
||||||
|
@ -10,7 +10,7 @@ description: |-
|
|||||||
|
|
||||||
Watches are a way of specifying a view of data (e.g. list of nodes, KV pairs, health
|
Watches are a way of specifying a view of data (e.g. list of nodes, KV pairs, health
|
||||||
checks) which is monitored for updates. When an update is detected, an external handler
|
checks) which is monitored for updates. When an update is detected, an external handler
|
||||||
is invoked. A handler can be any executable. As an example, you could watch the status
|
is invoked. A handler can be any executable or HTTP endpoint. As an example, you could watch the status
|
||||||
of health checks and notify an external system when a check is critical.
|
of health checks and notify an external system when a check is critical.
|
||||||
|
|
||||||
Watches are implemented using blocking queries in the [HTTP API](/api/index.html).
|
Watches are implemented using blocking queries in the [HTTP API](/api/index.html).
|
||||||
@ -32,24 +32,67 @@ in a JSON body when using agent configuration or as CLI flags for the watch comm
|
|||||||
## Handlers
|
## Handlers
|
||||||
|
|
||||||
The watch configuration specifies the view of data to be monitored.
|
The watch configuration specifies the view of data to be monitored.
|
||||||
Once that view is updated, the specified handler is invoked. The handler
|
Once that view is updated, the specified handler is invoked. Supported handlers
|
||||||
can be any executable.
|
are any executable or HTTP endpoint. A handler receives JSON formatted data
|
||||||
|
with invocation info, following a format that depends on the type of the watch.
|
||||||
A handler should read its input from stdin and expect to read
|
Each watch type documents the format type. Because they map directly to an HTTP
|
||||||
JSON formatted data. The format of the data depends on the type of the
|
API, handlers should expect the input to match the format of the API. A Consul
|
||||||
watch. Each watch type documents the format type. Because they
|
index is also given, corresponding to the responses from the
|
||||||
map directly to an HTTP API, handlers should expect the input to
|
|
||||||
match the format of the API.
|
|
||||||
|
|
||||||
Additionally, the `CONSUL_INDEX` environment variable will be set.
|
|
||||||
This maps to the `X-Consul-Index` value in responses from the
|
|
||||||
[HTTP API](/api/index.html).
|
[HTTP API](/api/index.html).
|
||||||
|
|
||||||
|
### Executable
|
||||||
|
|
||||||
|
An executable handler reads the JSON invocation info from stdin. Additionally,
|
||||||
|
the `CONSUL_INDEX` environment variable will be set to the Consul index
|
||||||
|
Anything written to stdout is logged.
|
||||||
|
|
||||||
|
Here is an example configuration, where `handler_type` is optionally set to
|
||||||
|
`script`:
|
||||||
|
|
||||||
|
```javascript
|
||||||
|
{
|
||||||
|
"type": "key",
|
||||||
|
"key": "foo/bar/baz",
|
||||||
|
"handler_type": "script",
|
||||||
|
"args": ["/usr/bin/my-service-handler.sh", "-redis"]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
Prior to Consul 1.0, watches used a single `handler` field to define the command to run, and
|
Prior to Consul 1.0, watches used a single `handler` field to define the command to run, and
|
||||||
would always run in a shell. In Consul 1.0, the `args` array was added so that handlers can be
|
would always run in a shell. In Consul 1.0, the `args` array was added so that handlers can be
|
||||||
run without a shell. The `handler` field is deprecated, and you should include the shell in
|
run without a shell. The `handler` field is deprecated, and you should include the shell in
|
||||||
the `args` to run under a shell, eg. `"args": ["sh", "-c", "..."]`.
|
the `args` to run under a shell, eg. `"args": ["sh", "-c", "..."]`.
|
||||||
|
|
||||||
|
### HTTP endpoint
|
||||||
|
|
||||||
|
A HTTP handler sends a HTTP request when a watch is invoked. The JSON
|
||||||
|
invocation info is sent as a payload along the request. Consul index is sent in
|
||||||
|
the header `X-Consul-Index`. Any response is logged.
|
||||||
|
|
||||||
|
The HTTP handler can be configured by setting `handler_type` to `http`. The
|
||||||
|
`http_handler_config` map must provide a `path` field with a URL to the HTTP
|
||||||
|
endpoint. HTTP method is `POST` as a default, but can be set to any method.
|
||||||
|
Though a JSON payload is sent in all cases. The `header`, `timeout` and
|
||||||
|
`tls_skip_verify` field is also optional and configured the same way as in
|
||||||
|
[HTTP checks](/docs/agent/checks.html).
|
||||||
|
|
||||||
|
Here is an example configuration:
|
||||||
|
|
||||||
|
```javascript
|
||||||
|
{
|
||||||
|
"type": "key",
|
||||||
|
"key": "foo/bar/baz",
|
||||||
|
"handler_type": "http",
|
||||||
|
"http_handler_config": {
|
||||||
|
"path":"https://localhost:8000/watch",
|
||||||
|
"method": "POST",
|
||||||
|
"header": {"x-foo":["bar", "baz"]},
|
||||||
|
"timeout": "10s",
|
||||||
|
"tls_skip_verify": false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
## Global Parameters
|
## Global Parameters
|
||||||
|
|
||||||
In addition to the parameters supported by each option type, there
|
In addition to the parameters supported by each option type, there
|
||||||
|
Loading…
x
Reference in New Issue
Block a user