mirror of https://github.com/status-im/consul.git
local state: replace multi-map state with structs
The state of the service and health check records was spread out over multiple maps guarded by a single lock. Access to the maps has to happen in a coordinated effort and the tests often violated this which made them brittle and racy. This patch replaces the multiple maps with a single one for both checks and services to make the code less fragile. This is also necessary since moving the local state into its own package creates circular dependencies for the tests. To avoid this the tests can no longer access internal data structures which they should not be doing in the first place. The tests still don't compile but this is a ncessary step in that direction.
This commit is contained in:
parent
6027a9e2a5
commit
0a9ac9749e
|
@ -1386,29 +1386,31 @@ OUTER:
|
||||||
|
|
||||||
// reapServicesInternal does a single pass, looking for services to reap.
|
// reapServicesInternal does a single pass, looking for services to reap.
|
||||||
func (a *Agent) reapServicesInternal() {
|
func (a *Agent) reapServicesInternal() {
|
||||||
reaped := make(map[string]struct{})
|
reaped := make(map[string]bool)
|
||||||
for checkID, check := range a.state.CriticalChecks() {
|
for checkID, cs := range a.state.CriticalCheckStates() {
|
||||||
|
serviceID := cs.Check.ServiceID
|
||||||
|
|
||||||
// There's nothing to do if there's no service.
|
// There's nothing to do if there's no service.
|
||||||
if check.Check.ServiceID == "" {
|
if serviceID == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// There might be multiple checks for one service, so
|
// There might be multiple checks for one service, so
|
||||||
// we don't need to reap multiple times.
|
// we don't need to reap multiple times.
|
||||||
serviceID := check.Check.ServiceID
|
if reaped[serviceID] {
|
||||||
if _, ok := reaped[serviceID]; ok {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// See if there's a timeout.
|
// See if there's a timeout.
|
||||||
|
// todo(fs): this looks fishy... why is there anoter data structure in the agent with its own lock?
|
||||||
a.checkLock.Lock()
|
a.checkLock.Lock()
|
||||||
timeout, ok := a.checkReapAfter[checkID]
|
timeout := a.checkReapAfter[checkID]
|
||||||
a.checkLock.Unlock()
|
a.checkLock.Unlock()
|
||||||
|
|
||||||
// Reap, if necessary. We keep track of which service
|
// Reap, if necessary. We keep track of which service
|
||||||
// this is so that we won't try to remove it again.
|
// this is so that we won't try to remove it again.
|
||||||
if ok && check.CriticalFor > timeout {
|
if timeout > 0 && cs.CriticalFor() > timeout {
|
||||||
reaped[serviceID] = struct{}{}
|
reaped[serviceID] = true
|
||||||
a.RemoveService(serviceID, true)
|
a.RemoveService(serviceID, true)
|
||||||
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
|
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
|
||||||
checkID, serviceID)
|
checkID, serviceID)
|
||||||
|
|
|
@ -21,12 +21,6 @@ import (
|
||||||
// permissionDenied is returned when an ACL based rejection happens.
|
// permissionDenied is returned when an ACL based rejection happens.
|
||||||
const permissionDenied = "Permission denied"
|
const permissionDenied = "Permission denied"
|
||||||
|
|
||||||
// syncStatus is used to represent the difference between
|
|
||||||
// the local and remote state, and if action needs to be taken
|
|
||||||
type syncStatus struct {
|
|
||||||
inSync bool // Is this in sync with the server
|
|
||||||
}
|
|
||||||
|
|
||||||
// Config is the configuration for the State. It is
|
// Config is the configuration for the State. It is
|
||||||
// populated during NewLocalAgent from the agent configuration to avoid
|
// populated during NewLocalAgent from the agent configuration to avoid
|
||||||
// race conditions with the agent configuration.
|
// race conditions with the agent configuration.
|
||||||
|
@ -40,6 +34,62 @@ type Config struct {
|
||||||
TaggedAddresses map[string]string
|
TaggedAddresses map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServiceState describes the state of a service record.
|
||||||
|
type ServiceState struct {
|
||||||
|
// Service is the local copy of the service record.
|
||||||
|
Service *structs.NodeService
|
||||||
|
|
||||||
|
// Token is the ACL to update the service record on the server.
|
||||||
|
Token string
|
||||||
|
|
||||||
|
// InSync contains whether the local state of the service record
|
||||||
|
// is in sync with the remote state on the server.
|
||||||
|
InSync bool
|
||||||
|
|
||||||
|
// Deleted is true when the service record has been marked as deleted
|
||||||
|
// but has not been removed on the server yet.
|
||||||
|
Deleted bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckState describes the state of a health check record.
|
||||||
|
type CheckState struct {
|
||||||
|
// Check is the local copy of the health check record.
|
||||||
|
Check *structs.HealthCheck
|
||||||
|
|
||||||
|
// Token is the ACL record to update the health check record
|
||||||
|
// on the server.
|
||||||
|
Token string
|
||||||
|
|
||||||
|
// CriticalTime is the last time the health check status went
|
||||||
|
// from non-critical to critical. When the health check is not
|
||||||
|
// in critical state the value is the zero value.
|
||||||
|
CriticalTime time.Time
|
||||||
|
|
||||||
|
// DeferCheck is used to delay the sync of a health check when
|
||||||
|
// only the status has changed.
|
||||||
|
// todo(fs): ^^ this needs double checking...
|
||||||
|
DeferCheck *time.Timer
|
||||||
|
|
||||||
|
// InSync contains whether the local state of the health check
|
||||||
|
// record is in sync with the remote state on the server.
|
||||||
|
InSync bool
|
||||||
|
|
||||||
|
// Deleted is true when the health check record has been marked as
|
||||||
|
// deleted but has not been removed on the server yet.
|
||||||
|
Deleted bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Critical returns true when the health check is in critical state.
|
||||||
|
func (c *CheckState) Critical() bool {
|
||||||
|
return !c.CriticalTime.IsZero()
|
||||||
|
}
|
||||||
|
|
||||||
|
// CriticalFor returns the amount of time the service has been in critical
|
||||||
|
// state. Its value is undefined when the service is not in critical state.
|
||||||
|
func (c *CheckState) CriticalFor() time.Duration {
|
||||||
|
return time.Since(c.CriticalTime)
|
||||||
|
}
|
||||||
|
|
||||||
type delegate interface {
|
type delegate interface {
|
||||||
RPC(method string, args interface{}, reply interface{}) error
|
RPC(method string, args interface{}, reply interface{}) error
|
||||||
}
|
}
|
||||||
|
@ -62,18 +112,10 @@ type State struct {
|
||||||
nodeInfoInSync bool
|
nodeInfoInSync bool
|
||||||
|
|
||||||
// Services tracks the local services
|
// Services tracks the local services
|
||||||
services map[string]*structs.NodeService
|
services map[string]*ServiceState
|
||||||
serviceStatus map[string]syncStatus
|
|
||||||
serviceTokens map[string]string
|
|
||||||
|
|
||||||
// Checks tracks the local checks
|
// Checks tracks the local checks
|
||||||
checks map[types.CheckID]*structs.HealthCheck
|
checks map[types.CheckID]*CheckState
|
||||||
checkStatus map[types.CheckID]syncStatus
|
|
||||||
checkTokens map[types.CheckID]string
|
|
||||||
checkCriticalTime map[types.CheckID]time.Time
|
|
||||||
|
|
||||||
// Used to track checks that are being deferred
|
|
||||||
deferCheck map[types.CheckID]*time.Timer
|
|
||||||
|
|
||||||
// metadata tracks the local metadata fields
|
// metadata tracks the local metadata fields
|
||||||
metadata map[string]string
|
metadata map[string]string
|
||||||
|
@ -86,6 +128,7 @@ type State struct {
|
||||||
// is stored in the raft log.
|
// is stored in the raft log.
|
||||||
discardCheckOutput atomic.Value // bool
|
discardCheckOutput atomic.Value // bool
|
||||||
|
|
||||||
|
// tokens contains the ACL tokens
|
||||||
tokens *token.Store
|
tokens *token.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,14 +137,8 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan stru
|
||||||
l := &State{
|
l := &State{
|
||||||
config: c,
|
config: c,
|
||||||
logger: lg,
|
logger: lg,
|
||||||
services: make(map[string]*structs.NodeService),
|
services: make(map[string]*ServiceState),
|
||||||
serviceStatus: make(map[string]syncStatus),
|
checks: make(map[types.CheckID]*CheckState),
|
||||||
serviceTokens: make(map[string]string),
|
|
||||||
checks: make(map[types.CheckID]*structs.HealthCheck),
|
|
||||||
checkStatus: make(map[types.CheckID]syncStatus),
|
|
||||||
checkTokens: make(map[types.CheckID]string),
|
|
||||||
checkCriticalTime: make(map[types.CheckID]time.Time),
|
|
||||||
deferCheck: make(map[types.CheckID]*time.Timer),
|
|
||||||
metadata: make(map[string]string),
|
metadata: make(map[string]string),
|
||||||
triggerCh: triggerCh,
|
triggerCh: triggerCh,
|
||||||
tokens: tokens,
|
tokens: tokens,
|
||||||
|
@ -137,7 +174,10 @@ func (l *State) ServiceToken(id string) string {
|
||||||
|
|
||||||
// serviceToken returns an ACL token associated with a service.
|
// serviceToken returns an ACL token associated with a service.
|
||||||
func (l *State) serviceToken(id string) string {
|
func (l *State) serviceToken(id string) string {
|
||||||
token := l.serviceTokens[id]
|
var token string
|
||||||
|
if s := l.services[id]; s != nil {
|
||||||
|
token = s.Token
|
||||||
|
}
|
||||||
if token == "" {
|
if token == "" {
|
||||||
token = l.tokens.UserToken()
|
token = l.tokens.UserToken()
|
||||||
}
|
}
|
||||||
|
@ -147,37 +187,48 @@ func (l *State) serviceToken(id string) string {
|
||||||
// AddService is used to add a service entry to the local state.
|
// AddService is used to add a service entry to the local state.
|
||||||
// This entry is persistent and the agent will make a best effort to
|
// This entry is persistent and the agent will make a best effort to
|
||||||
// ensure it is registered
|
// ensure it is registered
|
||||||
func (l *State) AddService(service *structs.NodeService, token string) {
|
// todo(fs): where is the persistence happening?
|
||||||
// Assign the ID if none given
|
func (l *State) AddService(service *structs.NodeService, token string) error {
|
||||||
if service.ID == "" && service.Service != "" {
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
|
||||||
|
if service == nil {
|
||||||
|
return fmt.Errorf("no service")
|
||||||
|
}
|
||||||
|
|
||||||
|
// use the service name as id if the id was omitted
|
||||||
|
// todo(fs): is this for backwards compatibility?
|
||||||
|
if service.ID == "" {
|
||||||
service.ID = service.Service
|
service.ID = service.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Lock()
|
l.services[service.ID] = &ServiceState{
|
||||||
defer l.Unlock()
|
Service: service,
|
||||||
|
Token: token,
|
||||||
l.services[service.ID] = service
|
}
|
||||||
l.serviceStatus[service.ID] = syncStatus{}
|
|
||||||
l.serviceTokens[service.ID] = token
|
|
||||||
l.changeMade()
|
l.changeMade()
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveService is used to remove a service entry from the local state.
|
// RemoveService is used to remove a service entry from the local state.
|
||||||
// The agent will make a best effort to ensure it is deregistered
|
// The agent will make a best effort to ensure it is deregistered.
|
||||||
func (l *State) RemoveService(serviceID string) error {
|
func (l *State) RemoveService(id string) error {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
if _, ok := l.services[serviceID]; ok {
|
s := l.services[id]
|
||||||
delete(l.services, serviceID)
|
if s == nil || s.Deleted {
|
||||||
// Leave the service token around, if any, until we successfully
|
return fmt.Errorf("Service %q does not exist", id)
|
||||||
// delete the service.
|
|
||||||
l.serviceStatus[serviceID] = syncStatus{inSync: false}
|
|
||||||
l.changeMade()
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("Service does not exist")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// To remove the service on the server we need the token.
|
||||||
|
// Therefore, we mark the service as deleted and keep the
|
||||||
|
// entry around until it is actually removed.
|
||||||
|
s.InSync = false
|
||||||
|
s.Deleted = true
|
||||||
|
l.changeMade()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,20 +237,28 @@ func (l *State) RemoveService(serviceID string) error {
|
||||||
func (l *State) Service(id string) *structs.NodeService {
|
func (l *State) Service(id string) *structs.NodeService {
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
return l.services[id]
|
|
||||||
|
s := l.services[id]
|
||||||
|
if s == nil || s.Deleted {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return s.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// Services returns the locally registered services that the
|
// Services returns the locally registered services that the
|
||||||
// agent is aware of and are being kept in sync with the server
|
// agent is aware of and are being kept in sync with the server
|
||||||
func (l *State) Services() map[string]*structs.NodeService {
|
func (l *State) Services() map[string]*structs.NodeService {
|
||||||
services := make(map[string]*structs.NodeService)
|
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
|
|
||||||
for name, serv := range l.services {
|
m := make(map[string]*structs.NodeService)
|
||||||
services[name] = serv
|
for id, s := range l.services {
|
||||||
|
if s.Deleted {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
return services
|
m[id] = s.Service
|
||||||
|
}
|
||||||
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckToken is used to return the configured health check token for a
|
// CheckToken is used to return the configured health check token for a
|
||||||
|
@ -211,8 +270,12 @@ func (l *State) CheckToken(checkID types.CheckID) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkToken returns an ACL token associated with a check.
|
// checkToken returns an ACL token associated with a check.
|
||||||
func (l *State) checkToken(checkID types.CheckID) string {
|
func (l *State) checkToken(id types.CheckID) string {
|
||||||
token := l.checkTokens[checkID]
|
var token string
|
||||||
|
c := l.checks[id]
|
||||||
|
if c != nil {
|
||||||
|
token = c.Token
|
||||||
|
}
|
||||||
if token == "" {
|
if token == "" {
|
||||||
token = l.tokens.UserToken()
|
token = l.tokens.UserToken()
|
||||||
}
|
}
|
||||||
|
@ -226,8 +289,9 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
// Set the node name
|
if check == nil {
|
||||||
check.Node = l.config.NodeName
|
return fmt.Errorf("no check")
|
||||||
|
}
|
||||||
|
|
||||||
if l.discardCheckOutput.Load().(bool) {
|
if l.discardCheckOutput.Load().(bool) {
|
||||||
check.Output = ""
|
check.Output = ""
|
||||||
|
@ -236,38 +300,51 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
|
||||||
// if there is a serviceID associated with the check, make sure it exists before adding it
|
// if there is a serviceID associated with the check, make sure it exists before adding it
|
||||||
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
|
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
|
||||||
if check.ServiceID != "" && l.services[check.ServiceID] == nil {
|
if check.ServiceID != "" && l.services[check.ServiceID] == nil {
|
||||||
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
|
return fmt.Errorf("Check %q refers to non-existent service %q does not exist", check.CheckID, check.ServiceID)
|
||||||
}
|
}
|
||||||
|
|
||||||
l.checks[check.CheckID] = check
|
// hard-set the node name
|
||||||
l.checkStatus[check.CheckID] = syncStatus{}
|
check.Node = l.config.NodeName
|
||||||
l.checkTokens[check.CheckID] = token
|
|
||||||
delete(l.checkCriticalTime, check.CheckID)
|
l.checks[check.CheckID] = &CheckState{
|
||||||
|
Check: check,
|
||||||
|
Token: token,
|
||||||
|
}
|
||||||
l.changeMade()
|
l.changeMade()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveCheck is used to remove a health check from the local state.
|
// RemoveCheck is used to remove a health check from the local state.
|
||||||
// The agent will make a best effort to ensure it is deregistered
|
// The agent will make a best effort to ensure it is deregistered
|
||||||
func (l *State) RemoveCheck(checkID types.CheckID) {
|
// todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well.
|
||||||
|
// todo(fs): Check code that calls this to handle the error.
|
||||||
|
func (l *State) RemoveCheck(id types.CheckID) error {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
delete(l.checks, checkID)
|
c := l.checks[id]
|
||||||
// Leave the check token around, if any, until we successfully delete
|
if c == nil || c.Deleted {
|
||||||
// the check.
|
return fmt.Errorf("Check %q does not exist", id)
|
||||||
delete(l.checkCriticalTime, checkID)
|
}
|
||||||
l.checkStatus[checkID] = syncStatus{inSync: false}
|
|
||||||
|
// To remove the check on the server we need the token.
|
||||||
|
// Therefore, we mark the service as deleted and keep the
|
||||||
|
// entry around until it is actually removed.
|
||||||
|
c.InSync = false
|
||||||
|
c.Deleted = true
|
||||||
l.changeMade()
|
l.changeMade()
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateCheck is used to update the status of a check
|
// UpdateCheck is used to update the status of a check
|
||||||
func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
|
func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
check, ok := l.checks[checkID]
|
c := l.checks[id]
|
||||||
if !ok {
|
if c == nil || c.Deleted {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,16 +355,15 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
|
||||||
// Update the critical time tracking (this doesn't cause a server updates
|
// Update the critical time tracking (this doesn't cause a server updates
|
||||||
// so we can always keep this up to date).
|
// so we can always keep this up to date).
|
||||||
if status == api.HealthCritical {
|
if status == api.HealthCritical {
|
||||||
_, wasCritical := l.checkCriticalTime[checkID]
|
if !c.Critical() {
|
||||||
if !wasCritical {
|
c.CriticalTime = time.Now()
|
||||||
l.checkCriticalTime[checkID] = time.Now()
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
delete(l.checkCriticalTime, checkID)
|
c.CriticalTime = time.Time{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do nothing if update is idempotent
|
// Do nothing if update is idempotent
|
||||||
if check.Status == status && check.Output == output {
|
if c.Check.Status == status && c.Check.Output == output {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -295,28 +371,34 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
|
||||||
// frequent updates of output. Instead, we update the output internally,
|
// frequent updates of output. Instead, we update the output internally,
|
||||||
// and periodically do a write-back to the servers. If there is a status
|
// and periodically do a write-back to the servers. If there is a status
|
||||||
// change we do the write immediately.
|
// change we do the write immediately.
|
||||||
if l.config.CheckUpdateInterval > 0 && check.Status == status {
|
if l.config.CheckUpdateInterval > 0 && c.Check.Status == status {
|
||||||
check.Output = output
|
c.Check.Output = output
|
||||||
if _, ok := l.deferCheck[checkID]; !ok {
|
if c.DeferCheck == nil {
|
||||||
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
|
d := l.config.CheckUpdateInterval
|
||||||
deferSync := time.AfterFunc(intv, func() {
|
intv := time.Duration(uint64(d)/2) + lib.RandomStagger(d)
|
||||||
|
c.DeferCheck = time.AfterFunc(intv, func() {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
if _, ok := l.checkStatus[checkID]; ok {
|
defer l.Unlock()
|
||||||
l.checkStatus[checkID] = syncStatus{inSync: false}
|
|
||||||
l.changeMade()
|
c := l.checks[id]
|
||||||
|
if c == nil {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
delete(l.deferCheck, checkID)
|
c.DeferCheck = nil
|
||||||
l.Unlock()
|
if c.Deleted {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.InSync = false
|
||||||
|
l.changeMade()
|
||||||
})
|
})
|
||||||
l.deferCheck[checkID] = deferSync
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update status and mark out of sync
|
// Update status and mark out of sync
|
||||||
check.Status = status
|
c.Check.Status = status
|
||||||
check.Output = output
|
c.Check.Output = output
|
||||||
l.checkStatus[checkID] = syncStatus{inSync: false}
|
c.InSync = false
|
||||||
l.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,7 +407,12 @@ func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
|
||||||
func (l *State) Check(id types.CheckID) *structs.HealthCheck {
|
func (l *State) Check(id types.CheckID) *structs.HealthCheck {
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
return l.checks[id]
|
|
||||||
|
c := l.checks[id]
|
||||||
|
if c == nil || c.Deleted {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.Check
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks returns the locally registered checks that the
|
// Checks returns the locally registered checks that the
|
||||||
|
@ -334,78 +421,83 @@ func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
|
|
||||||
checks := make(map[types.CheckID]*structs.HealthCheck)
|
m := make(map[types.CheckID]*structs.HealthCheck)
|
||||||
for id, c := range l.checks {
|
for id, c := range l.checks {
|
||||||
|
if c.Deleted {
|
||||||
|
continue
|
||||||
|
}
|
||||||
c2 := new(structs.HealthCheck)
|
c2 := new(structs.HealthCheck)
|
||||||
*c2 = *c
|
*c2 = *c.Check
|
||||||
checks[id] = c2
|
m[id] = c2
|
||||||
}
|
}
|
||||||
return checks
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// CriticalCheck is used to return the duration a check has been critical along
|
// CriticalCheckStates returns the locally registered checks that the
|
||||||
// with its associated health check.
|
// agent is aware of and are being kept in sync with the server
|
||||||
type CriticalCheck struct {
|
func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
|
||||||
CriticalFor time.Duration
|
|
||||||
Check *structs.HealthCheck
|
|
||||||
}
|
|
||||||
|
|
||||||
// CriticalChecks returns locally registered health checks that the agent is
|
|
||||||
// aware of and are being kept in sync with the server, and that are in a
|
|
||||||
// critical state. This also returns information about how long each check has
|
|
||||||
// been critical.
|
|
||||||
func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck {
|
|
||||||
checks := make(map[types.CheckID]CriticalCheck)
|
|
||||||
|
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
|
|
||||||
now := time.Now()
|
m := make(map[types.CheckID]*CheckState)
|
||||||
for checkID, criticalTime := range l.checkCriticalTime {
|
for id, c := range l.checks {
|
||||||
checks[checkID] = CriticalCheck{
|
if c.Deleted || !c.Critical() {
|
||||||
CriticalFor: now.Sub(criticalTime),
|
continue
|
||||||
Check: l.checks[checkID],
|
|
||||||
}
|
}
|
||||||
|
m[id] = c
|
||||||
}
|
}
|
||||||
|
return m
|
||||||
return checks
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metadata returns the local node metadata fields that the
|
// Metadata returns the local node metadata fields that the
|
||||||
// agent is aware of and are being kept in sync with the server
|
// agent is aware of and are being kept in sync with the server
|
||||||
func (l *State) Metadata() map[string]string {
|
func (l *State) Metadata() map[string]string {
|
||||||
metadata := make(map[string]string)
|
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
|
m := make(map[string]string)
|
||||||
for key, value := range l.metadata {
|
for k, v := range l.metadata {
|
||||||
metadata[key] = value
|
m[k] = v
|
||||||
}
|
}
|
||||||
return metadata
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateSyncState does a read of the server state, and updates
|
// UpdateSyncState does a read of the server state, and updates
|
||||||
// the local sync status as appropriate
|
// the local sync status as appropriate
|
||||||
func (l *State) UpdateSyncState() error {
|
func (l *State) UpdateSyncState() error {
|
||||||
|
// 1. get all checks and services from the master
|
||||||
req := structs.NodeSpecificRequest{
|
req := structs.NodeSpecificRequest{
|
||||||
Datacenter: l.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: l.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()},
|
QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()},
|
||||||
}
|
}
|
||||||
|
|
||||||
var out1 structs.IndexedNodeServices
|
var out1 structs.IndexedNodeServices
|
||||||
var out2 structs.IndexedHealthChecks
|
if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
|
||||||
if e := l.delegate.RPC("Catalog.NodeServices", &req, &out1); e != nil {
|
return err
|
||||||
return e
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var out2 structs.IndexedHealthChecks
|
||||||
if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
checks := out2.HealthChecks
|
|
||||||
|
// 2. create useful data structures for traversal
|
||||||
|
remoteServices := make(map[string]*structs.NodeService)
|
||||||
|
if out1.NodeServices != nil {
|
||||||
|
remoteServices = out1.NodeServices.Services
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteChecks := make(map[types.CheckID]*structs.HealthCheck, len(out2.HealthChecks))
|
||||||
|
for _, rc := range out2.HealthChecks {
|
||||||
|
remoteChecks[rc.CheckID] = rc
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. perform sync
|
||||||
|
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
// Check the node info
|
// sync node info
|
||||||
if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
|
if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
|
||||||
out1.NodeServices.Node.ID != l.config.NodeID ||
|
out1.NodeServices.Node.ID != l.config.NodeID ||
|
||||||
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
|
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
|
||||||
|
@ -413,82 +505,90 @@ func (l *State) UpdateSyncState() error {
|
||||||
l.nodeInfoInSync = false
|
l.nodeInfoInSync = false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check all our services
|
// sync services
|
||||||
services := make(map[string]*structs.NodeService)
|
|
||||||
if out1.NodeServices != nil {
|
|
||||||
services = out1.NodeServices.Services
|
|
||||||
}
|
|
||||||
|
|
||||||
for id := range l.services {
|
// sync local services that do not exist remotely
|
||||||
// If the local service doesn't exist remotely, then sync it
|
for id, s := range l.services {
|
||||||
if _, ok := services[id]; !ok {
|
if remoteServices[id] == nil {
|
||||||
l.serviceStatus[id] = syncStatus{inSync: false}
|
s.InSync = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for id, service := range services {
|
for id, rs := range remoteServices {
|
||||||
// If we don't have the service locally, deregister it
|
// If we don't have the service locally, deregister it
|
||||||
existing, ok := l.services[id]
|
ls := l.services[id]
|
||||||
if !ok {
|
if ls == nil {
|
||||||
// The consul service is created automatically, and does
|
// The consul service is created automatically and does
|
||||||
// not need to be deregistered.
|
// not need to be deregistered.
|
||||||
if id == structs.ConsulServiceID {
|
if id == structs.ConsulServiceID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
l.serviceStatus[id] = syncStatus{inSync: false}
|
|
||||||
|
l.services[id] = &ServiceState{Deleted: true}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the service is scheduled for removal skip it.
|
||||||
|
// todo(fs): is this correct?
|
||||||
|
if ls.Deleted {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// If our definition is different, we need to update it. Make a
|
// If our definition is different, we need to update it. Make a
|
||||||
// copy so that we don't retain a pointer to any actual state
|
// copy so that we don't retain a pointer to any actual state
|
||||||
// store info for in-memory RPCs.
|
// store info for in-memory RPCs.
|
||||||
if existing.EnableTagOverride {
|
if ls.Service.EnableTagOverride {
|
||||||
existing.Tags = make([]string, len(service.Tags))
|
ls.Service.Tags = make([]string, len(rs.Tags))
|
||||||
copy(existing.Tags, service.Tags)
|
copy(ls.Service.Tags, rs.Tags)
|
||||||
}
|
}
|
||||||
equal := existing.IsSame(service)
|
ls.InSync = ls.Service.IsSame(rs)
|
||||||
l.serviceStatus[id] = syncStatus{inSync: equal}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Index the remote health checks to improve efficiency
|
// sync checks
|
||||||
checkIndex := make(map[types.CheckID]*structs.HealthCheck, len(checks))
|
|
||||||
for _, check := range checks {
|
|
||||||
checkIndex[check.CheckID] = check
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync any check which doesn't exist on the remote side
|
// sync local checks which do not exist remotely
|
||||||
for id := range l.checks {
|
for id, c := range l.checks {
|
||||||
if _, ok := checkIndex[id]; !ok {
|
if remoteChecks[id] == nil {
|
||||||
l.checkStatus[id] = syncStatus{inSync: false}
|
c.InSync = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, check := range checks {
|
for id, rc := range remoteChecks {
|
||||||
|
|
||||||
|
lc := l.checks[id]
|
||||||
|
|
||||||
// If we don't have the check locally, deregister it
|
// If we don't have the check locally, deregister it
|
||||||
id := check.CheckID
|
if lc == nil {
|
||||||
existing, ok := l.checks[id]
|
// The Serf check is created automatically and does not
|
||||||
if !ok {
|
|
||||||
// The Serf check is created automatically, and does not
|
|
||||||
// need to be deregistered.
|
// need to be deregistered.
|
||||||
if id == structs.SerfCheckID {
|
if id == structs.SerfCheckID {
|
||||||
|
l.logger.Printf("Skipping remote check %q since it is managed automatically", id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
l.checkStatus[id] = syncStatus{inSync: false}
|
|
||||||
|
l.checks[id] = &CheckState{Deleted: true}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the check is scheduled for removal skip it.
|
||||||
|
// todo(fs): is this correct?
|
||||||
|
if lc.Deleted {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// If our definition is different, we need to update it
|
// If our definition is different, we need to update it
|
||||||
var equal bool
|
|
||||||
if l.config.CheckUpdateInterval == 0 {
|
if l.config.CheckUpdateInterval == 0 {
|
||||||
equal = existing.IsSame(check)
|
lc.InSync = lc.Check.IsSame(rc)
|
||||||
} else {
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Copy the existing check before potentially modifying
|
// Copy the existing check before potentially modifying
|
||||||
// it before the compare operation.
|
// it before the compare operation.
|
||||||
eCopy := existing.Clone()
|
lcCopy := lc.Check.Clone()
|
||||||
|
|
||||||
// Copy the server's check before modifying, otherwise
|
// Copy the server's check before modifying, otherwise
|
||||||
// in-memory RPCs will have side effects.
|
// in-memory RPCs will have side effects.
|
||||||
cCopy := check.Clone()
|
rcCopy := rc.Clone()
|
||||||
|
|
||||||
// If there's a defer timer active then we've got a
|
// If there's a defer timer active then we've got a
|
||||||
// potentially spammy check so we don't sync the output
|
// potentially spammy check so we don't sync the output
|
||||||
|
@ -497,15 +597,11 @@ func (l *State) UpdateSyncState() error {
|
||||||
// output now. This is especially important for checks
|
// output now. This is especially important for checks
|
||||||
// that don't change state after they are created, in
|
// that don't change state after they are created, in
|
||||||
// which case we'd never see their output synced back ever.
|
// which case we'd never see their output synced back ever.
|
||||||
if _, ok := l.deferCheck[id]; ok {
|
if lc.DeferCheck != nil {
|
||||||
eCopy.Output = ""
|
lcCopy.Output = ""
|
||||||
cCopy.Output = ""
|
rcCopy.Output = ""
|
||||||
}
|
}
|
||||||
equal = eCopy.IsSame(cCopy)
|
lc.InSync = lcCopy.IsSame(rcCopy)
|
||||||
}
|
|
||||||
|
|
||||||
// Update the status
|
|
||||||
l.checkStatus[id] = syncStatus{inSync: equal}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -521,39 +617,38 @@ func (l *State) SyncChanges() error {
|
||||||
// API works.
|
// API works.
|
||||||
|
|
||||||
// Sync the services
|
// Sync the services
|
||||||
for id, status := range l.serviceStatus {
|
for id, s := range l.services {
|
||||||
if _, ok := l.services[id]; !ok {
|
var err error
|
||||||
if err := l.deleteService(id); err != nil {
|
switch {
|
||||||
return err
|
case s.Deleted:
|
||||||
}
|
err = l.deleteService(id)
|
||||||
} else if !status.inSync {
|
case !s.InSync:
|
||||||
if err := l.syncService(id); err != nil {
|
err = l.syncService(id)
|
||||||
return err
|
default:
|
||||||
}
|
|
||||||
} else {
|
|
||||||
l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id)
|
l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id)
|
||||||
}
|
}
|
||||||
}
|
if err != nil {
|
||||||
|
|
||||||
// Sync the checks
|
|
||||||
for id, status := range l.checkStatus {
|
|
||||||
if _, ok := l.checks[id]; !ok {
|
|
||||||
if err := l.deleteCheck(id); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else if !status.inSync {
|
|
||||||
// Cancel a deferred sync
|
|
||||||
if timer := l.deferCheck[id]; timer != nil {
|
|
||||||
timer.Stop()
|
|
||||||
delete(l.deferCheck, id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := l.syncCheck(id); err != nil {
|
for id, c := range l.checks {
|
||||||
return err
|
var err error
|
||||||
|
switch {
|
||||||
|
case c.Deleted:
|
||||||
|
err = l.deleteCheck(id)
|
||||||
|
case !c.InSync:
|
||||||
|
if c.DeferCheck != nil {
|
||||||
|
c.DeferCheck.Stop()
|
||||||
|
c.DeferCheck = nil
|
||||||
}
|
}
|
||||||
} else {
|
err = l.syncCheck(id)
|
||||||
|
default:
|
||||||
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
|
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now sync the node level info if we need to, and didn't do any of
|
// Now sync the node level info if we need to, and didn't do any of
|
||||||
|
@ -593,9 +688,26 @@ func (l *State) UnloadMetadata() {
|
||||||
func (l *State) Stats() map[string]string {
|
func (l *State) Stats() map[string]string {
|
||||||
l.RLock()
|
l.RLock()
|
||||||
defer l.RUnlock()
|
defer l.RUnlock()
|
||||||
|
|
||||||
|
services := 0
|
||||||
|
for _, s := range l.services {
|
||||||
|
if s.Deleted {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
services++
|
||||||
|
}
|
||||||
|
|
||||||
|
checks := 0
|
||||||
|
for _, c := range l.checks {
|
||||||
|
if c.Deleted {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
checks++
|
||||||
|
}
|
||||||
|
|
||||||
return map[string]string{
|
return map[string]string{
|
||||||
"services": strconv.Itoa(len(l.services)),
|
"services": strconv.Itoa(services),
|
||||||
"checks": strconv.Itoa(len(l.checks)),
|
"checks": strconv.Itoa(checks),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -614,12 +726,13 @@ func (l *State) deleteService(id string) error {
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
||||||
if err == nil || strings.Contains(err.Error(), "Unknown service") {
|
if err == nil || strings.Contains(err.Error(), "Unknown service") {
|
||||||
delete(l.serviceStatus, id)
|
delete(l.services, id)
|
||||||
delete(l.serviceTokens, id)
|
|
||||||
l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
|
l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
|
||||||
return nil
|
return nil
|
||||||
} else if acl.IsErrPermissionDenied(err) {
|
}
|
||||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
if acl.IsErrPermissionDenied(err) {
|
||||||
|
// todo(fs): why is the service in sync here?
|
||||||
|
l.services[id].InSync = true
|
||||||
l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
|
l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -641,12 +754,14 @@ func (l *State) deleteCheck(id types.CheckID) error {
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
||||||
if err == nil || strings.Contains(err.Error(), "Unknown check") {
|
if err == nil || strings.Contains(err.Error(), "Unknown check") {
|
||||||
delete(l.checkStatus, id)
|
// todo(fs): do we need to stop the deferCheck timer here?
|
||||||
delete(l.checkTokens, id)
|
delete(l.checks, id)
|
||||||
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
|
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
|
||||||
return nil
|
return nil
|
||||||
} else if acl.IsErrPermissionDenied(err) {
|
}
|
||||||
l.checkStatus[id] = syncStatus{inSync: true}
|
if acl.IsErrPermissionDenied(err) {
|
||||||
|
// todo(fs): why is the check in sync here?
|
||||||
|
l.checks[id].InSync = true
|
||||||
l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
|
l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -655,17 +770,6 @@ func (l *State) deleteCheck(id types.CheckID) error {
|
||||||
|
|
||||||
// syncService is used to sync a service to the server
|
// syncService is used to sync a service to the server
|
||||||
func (l *State) syncService(id string) error {
|
func (l *State) syncService(id string) error {
|
||||||
req := structs.RegisterRequest{
|
|
||||||
Datacenter: l.config.Datacenter,
|
|
||||||
ID: l.config.NodeID,
|
|
||||||
Node: l.config.NodeName,
|
|
||||||
Address: l.config.AdvertiseAddr,
|
|
||||||
TaggedAddresses: l.config.TaggedAddresses,
|
|
||||||
NodeMeta: l.metadata,
|
|
||||||
Service: l.services[id],
|
|
||||||
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the service has associated checks that are out of sync,
|
// If the service has associated checks that are out of sync,
|
||||||
// piggyback them on the service sync so they are part of the
|
// piggyback them on the service sync so they are part of the
|
||||||
// same transaction and are registered atomically. We only let
|
// same transaction and are registered atomically. We only let
|
||||||
|
@ -673,12 +777,28 @@ func (l *State) syncService(id string) error {
|
||||||
// otherwise we need to register them separately so they don't
|
// otherwise we need to register them separately so they don't
|
||||||
// pick up privileges from the service token.
|
// pick up privileges from the service token.
|
||||||
var checks structs.HealthChecks
|
var checks structs.HealthChecks
|
||||||
for _, check := range l.checks {
|
for checkID, c := range l.checks {
|
||||||
if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) {
|
if c.Deleted || c.InSync {
|
||||||
if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
|
continue
|
||||||
checks = append(checks, check)
|
|
||||||
}
|
}
|
||||||
|
if c.Check.ServiceID != id {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
if l.serviceToken(id) != l.checkToken(checkID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
checks = append(checks, c.Check)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: l.config.Datacenter,
|
||||||
|
ID: l.config.NodeID,
|
||||||
|
Node: l.config.NodeName,
|
||||||
|
Address: l.config.AdvertiseAddr,
|
||||||
|
TaggedAddresses: l.config.TaggedAddresses,
|
||||||
|
NodeMeta: l.metadata,
|
||||||
|
Service: l.services[id].Service,
|
||||||
|
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backwards-compatibility for Consul < 0.5
|
// Backwards-compatibility for Consul < 0.5
|
||||||
|
@ -691,20 +811,24 @@ func (l *State) syncService(id string) error {
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
l.services[id].InSync = true
|
||||||
// Given how the register API works, this info is also updated
|
// Given how the register API works, this info is also updated
|
||||||
// every time we sync a service.
|
// every time we sync a service.
|
||||||
l.nodeInfoInSync = true
|
l.nodeInfoInSync = true
|
||||||
|
for _, check := range checks {
|
||||||
|
l.checks[check.CheckID].InSync = true
|
||||||
|
}
|
||||||
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
||||||
for _, check := range checks {
|
return nil
|
||||||
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
|
}
|
||||||
|
if acl.IsErrPermissionDenied(err) {
|
||||||
|
// todo(fs): why are the service and the checks in sync here?
|
||||||
|
// todo(fs): why is the node info not in sync here?
|
||||||
|
l.services[id].InSync = true
|
||||||
|
for _, check := range checks {
|
||||||
|
l.checks[check.CheckID].InSync = true
|
||||||
}
|
}
|
||||||
} else if acl.IsErrPermissionDenied(err) {
|
|
||||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
|
||||||
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
|
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
|
||||||
for _, check := range checks {
|
|
||||||
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -712,14 +836,7 @@ func (l *State) syncService(id string) error {
|
||||||
|
|
||||||
// syncCheck is used to sync a check to the server
|
// syncCheck is used to sync a check to the server
|
||||||
func (l *State) syncCheck(id types.CheckID) error {
|
func (l *State) syncCheck(id types.CheckID) error {
|
||||||
// Pull in the associated service if any
|
c := l.checks[id]
|
||||||
check := l.checks[id]
|
|
||||||
var service *structs.NodeService
|
|
||||||
if check.ServiceID != "" {
|
|
||||||
if serv, ok := l.services[check.ServiceID]; ok {
|
|
||||||
service = serv
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
req := structs.RegisterRequest{
|
req := structs.RegisterRequest{
|
||||||
Datacenter: l.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
|
@ -728,20 +845,29 @@ func (l *State) syncCheck(id types.CheckID) error {
|
||||||
Address: l.config.AdvertiseAddr,
|
Address: l.config.AdvertiseAddr,
|
||||||
TaggedAddresses: l.config.TaggedAddresses,
|
TaggedAddresses: l.config.TaggedAddresses,
|
||||||
NodeMeta: l.metadata,
|
NodeMeta: l.metadata,
|
||||||
Service: service,
|
Check: c.Check,
|
||||||
Check: l.checks[id],
|
|
||||||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pull in the associated service if any
|
||||||
|
s := l.services[c.Check.ServiceID]
|
||||||
|
if s != nil && !s.Deleted {
|
||||||
|
req.Service = s.Service
|
||||||
|
}
|
||||||
|
|
||||||
var out struct{}
|
var out struct{}
|
||||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l.checkStatus[id] = syncStatus{inSync: true}
|
l.checks[id].InSync = true
|
||||||
// Given how the register API works, this info is also updated
|
// Given how the register API works, this info is also updated
|
||||||
// every time we sync a check.
|
// every time we sync a check.
|
||||||
l.nodeInfoInSync = true
|
l.nodeInfoInSync = true
|
||||||
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||||
} else if acl.IsErrPermissionDenied(err) {
|
return nil
|
||||||
l.checkStatus[id] = syncStatus{inSync: true}
|
}
|
||||||
|
if acl.IsErrPermissionDenied(err) {
|
||||||
|
// todo(fs): why is the check in sync here?
|
||||||
|
l.checks[id].InSync = true
|
||||||
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -763,7 +889,10 @@ func (l *State) syncNodeInfo() error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l.nodeInfoInSync = true
|
l.nodeInfoInSync = true
|
||||||
l.logger.Printf("[INFO] agent: Synced node info")
|
l.logger.Printf("[INFO] agent: Synced node info")
|
||||||
} else if acl.IsErrPermissionDenied(err) {
|
return nil
|
||||||
|
}
|
||||||
|
if acl.IsErrPermissionDenied(err) {
|
||||||
|
// todo(fs): why is the node info in sync here?
|
||||||
l.nodeInfoInSync = true
|
l.nodeInfoInSync = true
|
||||||
l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
|
l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in New Issue