agent: Adding support to edge trigger consul server coming up for state sync

This commit is contained in:
Armon Dadgar 2014-02-07 12:03:31 -08:00
parent e27aa3e21d
commit f8bd1a1ac3
1 changed files with 17 additions and 0 deletions

View File

@ -46,6 +46,10 @@ type localState struct {
checks map[string]*structs.HealthCheck checks map[string]*structs.HealthCheck
checkStatus map[string]syncStatus checkStatus map[string]syncStatus
// consulCh is used to inform of a change to the known
// consul nodes. This may be used to retry a sync run
consulCh chan struct{}
// triggerCh is used to inform of a change to local state // triggerCh is used to inform of a change to local state
// that requires anti-entropy with the server // that requires anti-entropy with the server
triggerCh chan struct{} triggerCh chan struct{}
@ -60,6 +64,7 @@ func (l *localState) Init(config *Config, iface consul.Interface, logger *log.Lo
l.serviceStatus = make(map[string]syncStatus) l.serviceStatus = make(map[string]syncStatus)
l.checks = make(map[string]*structs.HealthCheck) l.checks = make(map[string]*structs.HealthCheck)
l.checkStatus = make(map[string]syncStatus) l.checkStatus = make(map[string]syncStatus)
l.consulCh = make(chan struct{}, 1)
l.triggerCh = make(chan struct{}, 1) l.triggerCh = make(chan struct{}, 1)
} }
@ -71,6 +76,16 @@ func (l *localState) changeMade() {
} }
} }
// ConsulServerUp is used to inform that a new consul server is now
// up. This can be used to speed up the sync process if we are blocking
// waiting to discover a consul server
func (l *localState) ConsulServerUp() {
select {
case l.consulCh <- struct{}{}:
default:
}
}
// Pause is used to pause state syncronization, this can be // Pause is used to pause state syncronization, this can be
// used to make batch changes // used to make batch changes
func (l *localState) Pause() { func (l *localState) Pause() {
@ -199,6 +214,8 @@ SYNC:
if err := l.setSyncState(); err != nil { if err := l.setSyncState(); err != nil {
l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err) l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
select { select {
case <-l.consulCh:
continue
case <-time.After(aeScale(syncRetryIntv, len(l.iface.LANMembers()))): case <-time.After(aeScale(syncRetryIntv, len(l.iface.LANMembers()))):
continue continue
case <-shutdownCh: case <-shutdownCh: