consul/watch/funcs.go

225 lines
5.9 KiB
Go
Raw Permalink Normal View History

2014-08-20 13:45:34 -07:00
package watch
import (
"fmt"
consulapi "github.com/hashicorp/consul/api"
2014-08-20 13:45:34 -07:00
)
// watchFactory is a function that can create a new WatchFunc
// from a parameter configuration
2014-08-21 11:38:44 -07:00
type watchFactory func(params map[string]interface{}) (WatchFunc, error)
2014-08-20 13:45:34 -07:00
// watchFuncFactory maps each type to a factory function
var watchFuncFactory map[string]watchFactory
func init() {
watchFuncFactory = map[string]watchFactory{
2014-08-20 15:22:22 -07:00
"key": keyWatch,
"keyprefix": keyPrefixWatch,
2014-08-20 15:29:31 -07:00
"services": servicesWatch,
2014-08-20 15:33:13 -07:00
"nodes": nodesWatch,
2014-08-20 15:50:32 -07:00
"service": serviceWatch,
2014-08-20 16:32:12 -07:00
"checks": checksWatch,
2014-08-28 15:41:06 -07:00
"event": eventWatch,
2014-08-20 13:45:34 -07:00
}
}
// keyWatch is used to return a key watching function
2014-08-21 11:38:44 -07:00
func keyWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 15:50:32 -07:00
var key string
if err := assignValue(params, "key", &key); err != nil {
return nil, err
}
if key == "" {
2014-08-20 13:45:34 -07:00
return nil, fmt.Errorf("Must specify a single key to watch")
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
2014-08-20 13:45:34 -07:00
pair, meta, err := kv.Get(key, &opts)
if err != nil {
return 0, nil, err
}
2014-08-20 15:18:08 -07:00
if pair == nil {
return meta.LastIndex, nil, err
}
2014-08-20 13:45:34 -07:00
return meta.LastIndex, pair, err
}
return fn, nil
}
2014-08-20 15:22:22 -07:00
// keyPrefixWatch is used to return a key prefix watching function
2014-08-21 11:38:44 -07:00
func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 15:50:32 -07:00
var prefix string
if err := assignValue(params, "prefix", &prefix); err != nil {
return nil, err
}
if prefix == "" {
2014-08-20 15:22:22 -07:00
return nil, fmt.Errorf("Must specify a single prefix to watch")
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
2014-08-20 15:22:22 -07:00
pairs, meta, err := kv.List(prefix, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, pairs, err
}
return fn, nil
}
2014-08-20 15:29:31 -07:00
// servicesWatch is used to watch the list of available services
2014-08-21 11:38:44 -07:00
func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 15:29:31 -07:00
fn := func(p *WatchPlan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
2014-08-20 15:29:31 -07:00
services, meta, err := catalog.Services(&opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, services, err
}
return fn, nil
}
2014-08-20 15:33:13 -07:00
// nodesWatch is used to watch the list of available nodes
2014-08-21 11:38:44 -07:00
func nodesWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 15:33:13 -07:00
fn := func(p *WatchPlan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
2014-08-20 15:33:13 -07:00
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, nodes, err
}
return fn, nil
}
2014-08-20 15:50:32 -07:00
// serviceWatch is used to watch a specific service for changes
2014-08-21 11:38:44 -07:00
func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-21 11:38:44 -07:00
var service, tag string
2014-08-20 15:50:32 -07:00
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if service == "" {
return nil, fmt.Errorf("Must specify a single service to watch")
}
if err := assignValue(params, "tag", &tag); err != nil {
return nil, err
}
passingOnly := false
2014-08-21 11:38:44 -07:00
if err := assignValueBool(params, "passingonly", &passingOnly); err != nil {
return nil, err
2014-08-20 15:50:32 -07:00
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
2014-08-20 15:50:32 -07:00
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, nodes, err
}
return fn, nil
}
2014-08-20 16:32:12 -07:00
// checksWatch is used to watch a specific checks in a given state
2014-08-21 11:38:44 -07:00
func checksWatch(params map[string]interface{}) (WatchFunc, error) {
stale := false
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
2014-08-20 16:32:12 -07:00
var service, state string
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if err := assignValue(params, "state", &state); err != nil {
return nil, err
}
if service != "" && state != "" {
return nil, fmt.Errorf("Cannot specify service and state")
}
if service == "" && state == "" {
state = "any"
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
2014-08-20 16:32:12 -07:00
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
if state != "" {
checks, meta, err = health.State(state, &opts)
} else {
checks, meta, err = health.Checks(service, &opts)
}
if err != nil {
return 0, nil, err
}
return meta.LastIndex, checks, err
}
return fn, nil
}
2014-08-28 15:41:06 -07:00
// eventWatch is used to watch for events, optionally filtering on name
func eventWatch(params map[string]interface{}) (WatchFunc, error) {
// The stale setting doesn't apply to events.
2014-08-28 15:41:06 -07:00
var name string
if err := assignValue(params, "name", &name); err != nil {
return nil, err
}
fn := func(p *WatchPlan) (uint64, interface{}, error) {
event := p.client.Event()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
events, meta, err := event.List(name, &opts)
if err != nil {
return 0, nil, err
}
// Prune to only the new events
for i := 0; i < len(events); i++ {
if event.IDToIndex(events[i].ID) == p.lastIndex {
events = events[i+1:]
break
}
}
return meta.LastIndex, events, err
}
return fn, nil
}