diff --git a/watch/funcs.go b/watch/funcs.go new file mode 100644 index 0000000000..7593a468fb --- /dev/null +++ b/watch/funcs.go @@ -0,0 +1,41 @@ +package watch + +import ( + "fmt" + + "github.com/armon/consul-api" +) + +// watchFactory is a function that can create a new WatchFunc +// from a parameter configuration +type watchFactory func(params map[string][]string) (WatchFunc, error) + +// watchFuncFactory maps each type to a factory function +var watchFuncFactory map[string]watchFactory + +func init() { + watchFuncFactory = map[string]watchFactory{ + "key": keyWatch, + } +} + +// keyWatch is used to return a key watching function +func keyWatch(params map[string][]string) (WatchFunc, error) { + keys := params["key"] + delete(params, "key") + if len(keys) != 1 { + return nil, fmt.Errorf("Must specify a single key to watch") + } + key := keys[0] + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + kv := p.client.KV() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + pair, meta, err := kv.Get(key, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, pair, err + } + return fn, nil +} diff --git a/watch/plan.go b/watch/plan.go new file mode 100644 index 0000000000..a43849fc30 --- /dev/null +++ b/watch/plan.go @@ -0,0 +1,104 @@ +package watch + +import ( + "fmt" + "log" + "reflect" + "time" + + "github.com/armon/consul-api" +) + +const ( + // retryInterval is the base retry value + retryInterval = 5 * time.Second + + // maximum back off time, this is to prevent + // exponential runaway + maxBackoffTime = 180 * time.Second +) + +// Run is used to run a watch plan +func (p *WatchPlan) Run(address string) error { + // Setup the client + p.address = address + conf := consulapi.DefaultConfig() + conf.Address = address + conf.Datacenter = p.Datacenter + // TODO: conf.Token = p.Token + client, err := consulapi.NewClient(conf) + if err != nil { + return fmt.Errorf("Failed to connect to agent: %v", err) + } + p.client = client + + // Loop until we are canceled + failures := 0 + for !p.shouldStop() { + // Invoke the handler + index, result, err := p.Func(p) + + // Check if we should terminate since the function + // could have blocked for a while + if p.shouldStop() { + break + } + + // Handle an error in the watch function + if err != nil { + log.Printf("consul.watch: Watch '%s' errored: %v", p.Query, err) + + // Perform an exponential backoff + failures++ + retry := retryInterval * time.Duration(failures*failures) + if retry > maxBackoffTime { + retry = maxBackoffTime + } + select { + case <-time.After(retry): + continue + case <-p.stopCh: + return nil + } + } + + // Clear the failures + failures = 0 + + // If the index is unchanged do nothing + if index == p.lastIndex { + continue + } + + // Update the index, look for change + p.lastIndex = index + if reflect.DeepEqual(p.lastResult, result) { + continue + } + + // Handle the updated result + p.lastResult = result + p.Handler(index, result) + } + return nil +} + +// Stop is used to stop running the watch plan +func (p *WatchPlan) Stop() { + p.stopLock.Lock() + defer p.stopLock.Unlock() + if p.stop { + return + } + p.stop = true + close(p.stopCh) +} + +func (p *WatchPlan) shouldStop() bool { + select { + case <-p.stopCh: + return true + default: + return false + } +} diff --git a/watch/plan_test.go b/watch/plan_test.go new file mode 100644 index 0000000000..99cae3df53 --- /dev/null +++ b/watch/plan_test.go @@ -0,0 +1,48 @@ +package watch + +import ( + "testing" + "time" +) + +func init() { + watchFuncFactory["noop"] = noopWatch +} + +func noopWatch(params map[string][]string) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + idx := p.lastIndex + 1 + return idx, idx, nil + } + return fn, nil +} + +func TestRun_Stop(t *testing.T) { + plan, err := Parse("type:noop") + if err != nil { + t.Fatalf("err: %v", err) + } + var expect uint64 = 1 + plan.Handler = func(idx uint64, val interface{}) { + if idx != expect { + t.Fatalf("Bad: %d %d", expect, idx) + } + if val != expect { + t.Fatalf("Bad: %d %d", expect, val) + } + expect++ + } + + time.AfterFunc(10*time.Millisecond, func() { + plan.Stop() + }) + + err = plan.Run("127.0.0.1:8500") + if err != nil { + t.Fatalf("err: %v", err) + } + + if expect == 1 { + t.Fatalf("Bad: %d", expect) + } +} diff --git a/watch/watch.go b/watch/watch.go index c2785bfe1f..9d88f9a35d 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -3,6 +3,9 @@ package watch import ( "fmt" "strings" + "sync" + + "github.com/armon/consul-api" ) // WatchPlan is the parsed version of a watch specification. A watch provides @@ -10,11 +13,29 @@ import ( // This view is watched for changes and a handler is invoked to take any // appropriate actions. type WatchPlan struct { + Query string Datacenter string Token string Type string + Func WatchFunc + Handler HandlerFunc + + address string + client *consulapi.Client + lastIndex uint64 + lastResult interface{} + + stop bool + stopCh chan struct{} + stopLock sync.Mutex } +// WatchFunc is used to watch for a diff +type WatchFunc func(*WatchPlan) (uint64, interface{}, error) + +// HandlerFunc is used to handle new data +type HandlerFunc func(uint64, interface{}) + // Parse takes a watch query and compiles it into a WatchPlan or an error func Parse(query string) (*WatchPlan, error) { tokens, err := tokenize(query) @@ -22,21 +43,48 @@ func Parse(query string) (*WatchPlan, error) { return nil, fmt.Errorf("Failed to parse: %v", err) } params := collapse(tokens) - plan := &WatchPlan{} + plan := &WatchPlan{ + Query: query, + stopCh: make(chan struct{}), + } - if err := assignValue(params, "type", &plan.Type); err != nil { - return nil, err - } - if plan.Type == "" { - return nil, fmt.Errorf("Watch type must be specified") - } + // Parse the generic parameters if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil { return nil, err } if err := assignValue(params, "token", &plan.Token); err != nil { return nil, err } + if err := assignValue(params, "type", &plan.Type); err != nil { + return nil, err + } + // Ensure there is a watch type + if plan.Type == "" { + return nil, fmt.Errorf("Watch type must be specified") + } + + // Look for a factory function + factory := watchFuncFactory[plan.Type] + if factory == nil { + return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type) + } + + // Get the watch func + fn, err := factory(params) + if err != nil { + return nil, err + } + plan.Func = fn + + // Ensure all parameters are consumed + if len(params) != 0 { + var bad []string + for key := range params { + bad = append(bad, key) + } + return nil, fmt.Errorf("Invalid parameters: %v", bad) + } return plan, nil } diff --git a/watch/watch_test.go b/watch/watch_test.go index 35dc6151eb..121da7378c 100644 --- a/watch/watch_test.go +++ b/watch/watch_test.go @@ -90,7 +90,7 @@ func TestCollapse(t *testing.T) { } func TestParseBasic(t *testing.T) { - p, err := Parse("type:key datacenter:dc2 token:12345") + p, err := Parse("type:key datacenter:dc2 token:12345 key:foo") if err != nil { t.Fatalf("err: %v", err) }