consul/api/watch/watch.go
Matt Keeler 222afeae4c
Move the watch package into the api module (#5664)
* Move the watch package into the api module

It was already just a thin wrapper around the API anyway. The biggest change was to the testing: instead of using a test agent directly from the agent package, it now uses the binary on the PATH, just like the other API tests.

The other big changes were to fix up the connect-based watch tests so that we didn’t need to pull in the connect package (and therefore all of Consul).
2019-04-26 12:33:01 -04:00

259 lines
7.5 KiB
Go

package watch
import (
"context"
"fmt"
"io"
"sync"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/mitchellh/mapstructure"
)
// DefaultTimeout is the timeout assigned to an HttpHandlerConfig when its
// "timeout" setting is left empty.
const DefaultTimeout = 10 * time.Second
// Plan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
//
// Plans are built by Parse / ParseExempt, which fill in the exported
// configuration fields from the supplied parameter map.
type Plan struct {
// Datacenter, Token, Type and HandlerType are copied from the "datacenter",
// "token", "type" and "handler_type" parameters respectively. Type is
// mandatory; parsing fails when it is empty.
Datacenter string
Token string
Type string
HandlerType string
// Exempt holds parameters that were set aside during parsing: the keys the
// caller asked to exempt and, for the "http" handler type, the decoded
// *HttpHandlerConfig stored under "http_handler_config".
Exempt map[string]interface{}
// Watcher is the query function returned by the factory registered for Type.
Watcher WatcherFunc
// Handler is kept for backward compatibility but only supports watches based
// on index param. To support hash based watches, set HybridHandler instead.
Handler HandlerFunc
// HybridHandler receives the BlockingParamVal itself, so it can serve both
// index-based and hash-based watches.
HybridHandler HybridHandlerFunc
// LogOutput is not referenced in this part of the file; presumably the
// destination for the plan's log output — TODO confirm.
LogOutput io.Writer
// The unexported fields below are runtime state. Apart from stopCh, which
// ParseExempt allocates, none of them is touched in this part of the file.
// NOTE(review): confirm their exact roles against the code that runs the plan.
address string
client *consulapi.Client
lastParamVal BlockingParamVal
lastResult interface{}
stop bool
stopCh chan struct{}
stopLock sync.Mutex
cancelFunc context.CancelFunc
}
// HttpHandlerConfig is the decoded form of the "http_handler_config" watch
// parameter, which is required when the handler type is "http".
type HttpHandlerConfig struct {
// Path is required; parsing fails when it is empty.
Path string `mapstructure:"path"`
// Method defaults to "POST" when not set.
Method string `mapstructure:"method"`
// Timeout is never decoded directly (note the "-" tag). It is derived from
// TimeoutRaw, or set to DefaultTimeout when TimeoutRaw is empty.
Timeout time.Duration `mapstructure:"-"`
// TimeoutRaw is the textual duration supplied under the "timeout" key; it is
// parsed with time.ParseDuration.
TimeoutRaw string `mapstructure:"timeout"`
Header map[string][]string `mapstructure:"header"`
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
}
// BlockingParamVal is an interface representing the common operations needed for
// different styles of blocking. It's used to abstract the core watch plan from
// whether we are performing index-based or hash-based blocking. WaitIndexVal
// and WaitHashVal are the two implementations provided in this file.
type BlockingParamVal interface {
// Equal returns whether the other param value should be considered equal
// (i.e. representing no change in the watched resource). Equal must not panic
// if other is nil.
Equal(other BlockingParamVal) bool
// Next is called when deciding which value to use on the next blocking call.
// It assumes the BlockingParamVal value it is called on is the most recent one
// returned and passes the previous one, which may be nil, as context. This
// allows types to customize logic around ordering without assuming there is
// an order. For example WaitIndexVal can check that the index didn't go
// backwards and if it did then reset to 0. Most other cases should just
// return themselves (the most recent value) to be used in the next request.
Next(previous BlockingParamVal) BlockingParamVal
}
// WaitIndexVal is a type representing a Consul index that implements
// BlockingParamVal. Its underlying type is uint64, so index values convert
// to and from it directly.
type WaitIndexVal uint64
// Equal implements BlockingParamVal. It reports true only when other is also
// a WaitIndexVal carrying the same index; a nil other, or a value of any other
// implementation, is never equal.
func (idx WaitIndexVal) Equal(other BlockingParamVal) bool {
	otherIdx, isIndex := other.(WaitIndexVal)
	return isIndex && idx == otherIdx
}
// Next implements BlockingParamVal. It returns idx itself unless previous is
// a WaitIndexVal strictly greater than idx, in which case the index has gone
// backwards and WaitIndexVal(0) is returned to reset the watch.
func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal {
	// The comma-ok assertion yields false for a nil previous (and for any other
	// implementation), so those cases fall through to returning idx.
	if prevIdx, isIndex := previous.(WaitIndexVal); isIndex && prevIdx > idx {
		// This value is smaller than the previous index, reset.
		return WaitIndexVal(0)
	}
	return idx
}
// WaitHashVal is a type representing a Consul content hash that implements
// BlockingParamVal. Its underlying type is string.
type WaitHashVal string
// Equal implements BlockingParamVal. Two values are equal only when other is
// also a WaitHashVal holding the identical hash string; nil and values of any
// other implementation compare unequal.
func (h WaitHashVal) Equal(other BlockingParamVal) bool {
	switch otherHash := other.(type) {
	case WaitHashVal:
		return h == otherHash
	default:
		return false
	}
}
// Next implements BlockingParamVal. Hashes carry no ordering, so previous is
// ignored and the most recent value (the receiver) is always the one to use on
// the next blocking call.
func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal {
	return h
}
// WatcherFunc is used to watch for a diff. It is handed a *Plan and returns a
// BlockingParamVal, a result value and an error. ParseExempt obtains the
// WatcherFunc for a plan from the factory registered for the plan's Type.
type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error)
// HandlerFunc is used to handle new data. It only works for index-based watches
// (which is almost all end points currently) and is kept for backwards
// compatibility until more places can make use of hash-based watches too.
// It is the type of Plan.Handler; use HybridHandlerFunc for hash-based watches.
type HandlerFunc func(uint64, interface{})
// HybridHandlerFunc is used to handle new data. It can support either
// index-based or hash-based watches via the BlockingParamVal. It is the type
// of Plan.HybridHandler.
type HybridHandlerFunc func(BlockingParamVal, interface{})
// Parse takes a watch query and compiles it into a Plan or an error. It is
// shorthand for ParseExempt with no exempt parameters.
func Parse(params map[string]interface{}) (*Plan, error) {
	var noExempt []string
	return ParseExempt(params, noExempt)
}
// ParseExempt takes a watch query and compiles it into a Plan or an error.
//
// params is consumed destructively: each recognized key is deleted from the
// map as it is processed, and any key still present at the end is reported as
// an invalid parameter. Keys listed in exempt are not treated as invalid; when
// present they are moved into the returned plan's Exempt map instead.
//
// The "type" parameter is mandatory and selects the watch factory. When
// "handler_type" is "http", "http_handler_config" is required as well; its
// decoded *HttpHandlerConfig is stored in plan.Exempt["http_handler_config"].
// All of "datacenter", "token", "type" and "handler_type" must be strings.
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
	plan := &Plan{
		stopCh: make(chan struct{}),
		Exempt: make(map[string]interface{}),
	}

	// Parse the generic parameters
	if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
		return nil, err
	}
	if err := assignValue(params, "token", &plan.Token); err != nil {
		return nil, err
	}
	if err := assignValue(params, "type", &plan.Type); err != nil {
		return nil, err
	}

	// Ensure there is a watch type
	if plan.Type == "" {
		return nil, fmt.Errorf("Watch type must be specified")
	}

	// Get the specific handler
	if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
		return nil, err
	}
	switch plan.HandlerType {
	case "http":
		rawConfig, ok := params["http_handler_config"]
		if !ok {
			return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
		}
		config, err := parseHttpHandlerConfig(rawConfig)
		if err != nil {
			// err is passed as an argument rather than pre-rendered into the
			// format string, so a '%' inside its message is reported verbatim
			// instead of being reinterpreted as a formatting verb.
			return nil, fmt.Errorf("Failed to parse 'http_handler_config': %v", err)
		}
		plan.Exempt["http_handler_config"] = config
		delete(params, "http_handler_config")

	case "script":
		// Let the caller check for configuration in exempt parameters
	}

	// Look for a factory function
	factory := watchFuncFactory[plan.Type]
	if factory == nil {
		return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
	}

	// Get the watch func. The factory is handed the same params map and runs
	// before the exempt keys are removed, so that ordering is preserved here.
	fn, err := factory(params)
	if err != nil {
		return nil, err
	}
	plan.Watcher = fn

	// Remove the exempt parameters (ranging over a nil/empty slice is a no-op)
	for _, ex := range exempt {
		if val, ok := params[ex]; ok {
			plan.Exempt[ex] = val
			delete(params, ex)
		}
	}

	// Ensure all parameters are consumed
	if len(params) != 0 {
		bad := make([]string, 0, len(params))
		for key := range params {
			bad = append(bad, key)
		}
		return nil, fmt.Errorf("Invalid parameters: %v", bad)
	}
	return plan, nil
}
// assignValue extracts the string stored under name in params. When the key
// is absent nothing happens and nil is returned. When it is present but not a
// string an error is returned and neither params nor *out is modified.
// Otherwise the value is written to *out and the key is deleted from params.
func assignValue(params map[string]interface{}, name string, out *string) error {
	raw, present := params[name]
	if !present {
		return nil
	}

	str, isString := raw.(string)
	if !isString {
		return fmt.Errorf("Expecting %s to be a string", name)
	}

	*out = str
	delete(params, name)
	return nil
}
// assignValueBool extracts the bool stored under name in params. When the key
// is absent nothing happens and nil is returned. When it is present but not a
// bool an error is returned and neither params nor *out is modified.
// Otherwise the value is written to *out and the key is deleted from params.
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
	raw, present := params[name]
	if !present {
		return nil
	}

	flag, isBool := raw.(bool)
	if !isBool {
		return fmt.Errorf("Expecting %s to be a boolean", name)
	}

	*out = flag
	delete(params, name)
	return nil
}
// parseHttpHandlerConfig decodes the 'http_handler_config' parameters into an
// HttpHandlerConfig and fills in defaults.
//
// configParams is decoded with mapstructure. The result must have a non-empty
// Path. An empty Method defaults to "POST". An empty TimeoutRaw yields
// DefaultTimeout; otherwise TimeoutRaw must be a valid time.ParseDuration
// string. On any failure a nil config and a non-nil error are returned.
func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
	var config HttpHandlerConfig
	if err := mapstructure.Decode(configParams, &config); err != nil {
		return nil, err
	}

	if config.Path == "" {
		return nil, fmt.Errorf("Requires 'path' to be set")
	}
	if config.Method == "" {
		config.Method = "POST"
	}

	// An empty timeout selects the package default; there is nothing to parse.
	if config.TimeoutRaw == "" {
		config.Timeout = DefaultTimeout
		return &config, nil
	}

	timeout, err := time.ParseDuration(config.TimeoutRaw)
	if err != nil {
		// The parse error echoes the user-supplied text, which may contain '%'.
		// Passing err as an argument (not as part of the format string) keeps
		// it from being reinterpreted as a formatting verb.
		return nil, fmt.Errorf("Failed to parse timeout: %v", err)
	}
	config.Timeout = timeout
	return &config, nil
}