consul/watch/watch.go

130 lines
3.0 KiB
Go
Raw Normal View History

2014-08-20 18:19:43 +00:00
package watch
import (
"fmt"
"io"
2014-08-20 20:45:34 +00:00
"sync"
"github.com/armon/consul-api"
2014-08-20 18:19:43 +00:00
)
// WatchPlan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
type WatchPlan struct {
Datacenter string
Token string
Type string
2014-08-21 18:38:44 +00:00
Exempt map[string]interface{}
Func WatchFunc
Handler HandlerFunc
LogOutput io.Writer
2014-08-20 20:45:34 +00:00
address string
client *consulapi.Client
lastIndex uint64
lastResult interface{}
stop bool
stopCh chan struct{}
stopLock sync.Mutex
2014-08-20 18:19:43 +00:00
}
2014-08-20 20:45:34 +00:00
// WatchFunc is used to watch for a diff
type WatchFunc func(*WatchPlan) (uint64, interface{}, error)
// HandlerFunc is used to handle new data
type HandlerFunc func(uint64, interface{})
2014-08-20 18:19:43 +00:00
// Parse takes a watch query and compiles it into a WatchPlan or an error
2014-08-21 18:38:44 +00:00
func Parse(params map[string]interface{}) (*WatchPlan, error) {
return ParseExempt(params, nil)
2014-08-20 23:38:15 +00:00
}
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
// Any exempt parameters are stored in the Exempt map
2014-08-21 18:38:44 +00:00
func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, error) {
2014-08-20 20:45:34 +00:00
plan := &WatchPlan{
stopCh: make(chan struct{}),
}
2014-08-20 18:19:43 +00:00
2014-08-20 20:45:34 +00:00
// Parse the generic parameters
if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
return nil, err
}
if err := assignValue(params, "token", &plan.Token); err != nil {
return nil, err
}
2014-08-20 18:19:43 +00:00
if err := assignValue(params, "type", &plan.Type); err != nil {
return nil, err
}
2014-08-20 20:45:34 +00:00
// Ensure there is a watch type
2014-08-20 18:19:43 +00:00
if plan.Type == "" {
return nil, fmt.Errorf("Watch type must be specified")
}
2014-08-20 20:45:34 +00:00
// Look for a factory function
factory := watchFuncFactory[plan.Type]
if factory == nil {
return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
2014-08-20 18:19:43 +00:00
}
2014-08-20 20:45:34 +00:00
// Get the watch func
fn, err := factory(params)
if err != nil {
2014-08-20 18:19:43 +00:00
return nil, err
}
2014-08-20 20:45:34 +00:00
plan.Func = fn
2014-08-20 18:19:43 +00:00
2014-08-20 23:38:15 +00:00
// Remove the exempt parameters
if len(exempt) > 0 {
2014-08-21 18:38:44 +00:00
plan.Exempt = make(map[string]interface{})
2014-08-20 23:38:15 +00:00
for _, ex := range exempt {
val, ok := params[ex]
if ok {
plan.Exempt[ex] = val
delete(params, ex)
}
}
}
2014-08-20 20:45:34 +00:00
// Ensure all parameters are consumed
if len(params) != 0 {
var bad []string
for key := range params {
bad = append(bad, key)
}
return nil, fmt.Errorf("Invalid parameters: %v", bad)
}
2014-08-20 18:19:43 +00:00
return plan, nil
}
2014-08-21 18:38:44 +00:00
// assignValue is used to extract a value ensuring it is a string
func assignValue(params map[string]interface{}, name string, out *string) error {
if raw, ok := params[name]; ok {
val, ok := raw.(string)
if !ok {
return fmt.Errorf("Expecting %s to be a string")
2014-08-20 18:19:43 +00:00
}
2014-08-21 18:38:44 +00:00
*out = val
2014-08-20 18:19:43 +00:00
delete(params, name)
}
return nil
}
2014-08-21 18:38:44 +00:00
// assignValueBool is used to extract a value ensuring it is a bool
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
if raw, ok := params[name]; ok {
val, ok := raw.(bool)
if !ok {
return fmt.Errorf("Expecting %s to be a boolean")
2014-08-20 18:19:43 +00:00
}
2014-08-21 18:38:44 +00:00
*out = val
delete(params, name)
2014-08-20 18:19:43 +00:00
}
2014-08-21 18:38:44 +00:00
return nil
2014-08-20 18:19:43 +00:00
}