consul/watch/plan.go

130 lines
2.4 KiB
Go
Raw Normal View History

2014-08-20 13:45:34 -07:00
package watch
import (
"fmt"
"log"
"os"
2014-08-20 13:45:34 -07:00
"reflect"
"time"
consulapi "github.com/hashicorp/consul/api"
2014-08-20 13:45:34 -07:00
)
const (
	// retryInterval is the unit of delay between retries after a failed
	// watch; Run scales it by the square of the consecutive failure count.
	retryInterval = 5 * time.Second

	// maxBackoffTime is the ceiling applied to the scaled retry delay so
	// that a long run of failures cannot push the wait out indefinitely.
	maxBackoffTime = 3 * time.Minute
)
// Run is used to run a watch plan
2017-04-20 17:46:29 -07:00
func (p *Plan) Run(address string) error {
2014-08-20 13:45:34 -07:00
// Setup the client
p.address = address
conf := consulapi.DefaultConfig()
conf.Address = address
conf.Datacenter = p.Datacenter
2014-08-20 16:45:37 -07:00
conf.Token = p.Token
2014-08-20 13:45:34 -07:00
client, err := consulapi.NewClient(conf)
if err != nil {
return fmt.Errorf("Failed to connect to agent: %v", err)
}
p.client = client
// Create the logger
output := p.LogOutput
if output == nil {
output = os.Stderr
}
logger := log.New(output, "", log.LstdFlags)
2014-08-20 13:45:34 -07:00
// Loop until we are canceled
failures := 0
2014-08-20 15:18:08 -07:00
OUTER:
2014-08-20 13:45:34 -07:00
for !p.shouldStop() {
// Invoke the handler
2017-04-20 17:46:29 -07:00
index, result, err := p.Watcher(p)
2014-08-20 13:45:34 -07:00
// Check if we should terminate since the function
// could have blocked for a while
if p.shouldStop() {
break
}
// Handle an error in the watch function
if err != nil {
// Perform an exponential backoff
failures++
2017-01-17 09:02:39 +01:00
p.lastIndex = 0
2014-08-20 13:45:34 -07:00
retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime {
retry = maxBackoffTime
}
2018-03-21 15:56:14 +00:00
logger.Printf("[ERR] consul.watch: Watch (type: %s) errored: %v, retry in %v",
p.Type, err, retry)
2014-08-20 13:45:34 -07:00
select {
case <-time.After(retry):
2014-08-20 15:18:08 -07:00
continue OUTER
2014-08-20 13:45:34 -07:00
case <-p.stopCh:
return nil
}
}
// Clear the failures
failures = 0
// If the index is unchanged do nothing
if index == p.lastIndex {
continue
}
// Update the index, look for change
2014-08-21 17:24:20 -07:00
oldIndex := p.lastIndex
2014-08-20 13:45:34 -07:00
p.lastIndex = index
2014-08-21 17:24:20 -07:00
if oldIndex != 0 && reflect.DeepEqual(p.lastResult, result) {
2014-08-20 13:45:34 -07:00
continue
}
if p.lastIndex < oldIndex {
p.lastIndex = 0
}
2014-08-20 13:45:34 -07:00
// Handle the updated result
p.lastResult = result
2014-08-20 15:18:08 -07:00
if p.Handler != nil {
p.Handler(index, result)
}
2014-08-20 13:45:34 -07:00
}
return nil
}
// Stop is used to stop running the watch plan
2017-04-20 17:46:29 -07:00
func (p *Plan) Stop() {
2014-08-20 13:45:34 -07:00
p.stopLock.Lock()
defer p.stopLock.Unlock()
if p.stop {
return
}
p.stop = true
if p.cancelFunc != nil {
p.cancelFunc()
}
2014-08-20 13:45:34 -07:00
close(p.stopCh)
}
2017-04-20 17:46:29 -07:00
func (p *Plan) shouldStop() bool {
2014-08-20 13:45:34 -07:00
select {
case <-p.stopCh:
return true
default:
return false
}
}
// IsStopped returns the plan's stop flag, read while holding stopLock. The
// flag is set by Stop.
func (p *Plan) IsStopped() bool {
	p.stopLock.Lock()
	stopped := p.stop
	p.stopLock.Unlock()
	return stopped
}