mirror of https://github.com/status-im/consul.git
Revert "local state: replace multi-map state with structs"
This reverts commit ccbae7da5b
.
This commit is contained in:
parent
9ed4b2d631
commit
623e07760a
|
@ -1386,31 +1386,29 @@ OUTER:
|
|||
|
||||
// reapServicesInternal does a single pass, looking for services to reap.
|
||||
func (a *Agent) reapServicesInternal() {
|
||||
reaped := make(map[string]bool)
|
||||
for checkID, cs := range a.state.CriticalCheckStates() {
|
||||
serviceID := cs.Check.ServiceID
|
||||
|
||||
reaped := make(map[string]struct{})
|
||||
for checkID, check := range a.state.CriticalChecks() {
|
||||
// There's nothing to do if there's no service.
|
||||
if serviceID == "" {
|
||||
if check.Check.ServiceID == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// There might be multiple checks for one service, so
|
||||
// we don't need to reap multiple times.
|
||||
if reaped[serviceID] {
|
||||
serviceID := check.Check.ServiceID
|
||||
if _, ok := reaped[serviceID]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// See if there's a timeout.
|
||||
// todo(fs): this looks fishy... why is there anoter data structure in the agent with its own lock?
|
||||
a.checkLock.Lock()
|
||||
timeout := a.checkReapAfter[checkID]
|
||||
timeout, ok := a.checkReapAfter[checkID]
|
||||
a.checkLock.Unlock()
|
||||
|
||||
// Reap, if necessary. We keep track of which service
|
||||
// this is so that we won't try to remove it again.
|
||||
if timeout > 0 && cs.CriticalFor() > timeout {
|
||||
reaped[serviceID] = true
|
||||
if ok && check.CriticalFor > timeout {
|
||||
reaped[serviceID] = struct{}{}
|
||||
a.RemoveService(serviceID, true)
|
||||
a.logger.Printf("[INFO] agent: Check %q for service %q has been critical for too long; deregistered service",
|
||||
checkID, serviceID)
|
||||
|
|
|
@ -21,6 +21,12 @@ import (
|
|||
// permissionDenied is returned when an ACL based rejection happens.
|
||||
const permissionDenied = "Permission denied"
|
||||
|
||||
// syncStatus is used to represent the difference between
|
||||
// the local and remote state, and if action needs to be taken
|
||||
type syncStatus struct {
|
||||
inSync bool // Is this in sync with the server
|
||||
}
|
||||
|
||||
// Config is the configuration for the State. It is
|
||||
// populated during NewLocalAgent from the agent configuration to avoid
|
||||
// race conditions with the agent configuration.
|
||||
|
@ -34,62 +40,6 @@ type Config struct {
|
|||
TaggedAddresses map[string]string
|
||||
}
|
||||
|
||||
// ServiceState describes the state of a service record.
|
||||
type ServiceState struct {
|
||||
// Service is the local copy of the service record.
|
||||
Service *structs.NodeService
|
||||
|
||||
// Token is the ACL to update the service record on the server.
|
||||
Token string
|
||||
|
||||
// InSync contains whether the local state of the service record
|
||||
// is in sync with the remote state on the server.
|
||||
InSync bool
|
||||
|
||||
// Deleted is true when the service record has been marked as deleted
|
||||
// but has not been removed on the server yet.
|
||||
Deleted bool
|
||||
}
|
||||
|
||||
// CheckState describes the state of a health check record.
|
||||
type CheckState struct {
|
||||
// Check is the local copy of the health check record.
|
||||
Check *structs.HealthCheck
|
||||
|
||||
// Token is the ACL record to update the health check record
|
||||
// on the server.
|
||||
Token string
|
||||
|
||||
// CriticalTime is the last time the health check status went
|
||||
// from non-critical to critical. When the health check is not
|
||||
// in critical state the value is the zero value.
|
||||
CriticalTime time.Time
|
||||
|
||||
// DeferCheck is used to delay the sync of a health check when
|
||||
// only the status has changed.
|
||||
// todo(fs): ^^ this needs double checking...
|
||||
DeferCheck *time.Timer
|
||||
|
||||
// InSync contains whether the local state of the health check
|
||||
// record is in sync with the remote state on the server.
|
||||
InSync bool
|
||||
|
||||
// Deleted is true when the health check record has been marked as
|
||||
// deleted but has not been removed on the server yet.
|
||||
Deleted bool
|
||||
}
|
||||
|
||||
// Critical returns true when the health check is in critical state.
|
||||
func (c *CheckState) Critical() bool {
|
||||
return !c.CriticalTime.IsZero()
|
||||
}
|
||||
|
||||
// CriticalFor returns the amount of time the service has been in critical
|
||||
// state. Its value is undefined when the service is not in critical state.
|
||||
func (c *CheckState) CriticalFor() time.Duration {
|
||||
return time.Since(c.CriticalTime)
|
||||
}
|
||||
|
||||
type delegate interface {
|
||||
RPC(method string, args interface{}, reply interface{}) error
|
||||
}
|
||||
|
@ -112,10 +62,18 @@ type State struct {
|
|||
nodeInfoInSync bool
|
||||
|
||||
// Services tracks the local services
|
||||
services map[string]*ServiceState
|
||||
services map[string]*structs.NodeService
|
||||
serviceStatus map[string]syncStatus
|
||||
serviceTokens map[string]string
|
||||
|
||||
// Checks tracks the local checks
|
||||
checks map[types.CheckID]*CheckState
|
||||
checks map[types.CheckID]*structs.HealthCheck
|
||||
checkStatus map[types.CheckID]syncStatus
|
||||
checkTokens map[types.CheckID]string
|
||||
checkCriticalTime map[types.CheckID]time.Time
|
||||
|
||||
// Used to track checks that are being deferred
|
||||
deferCheck map[types.CheckID]*time.Timer
|
||||
|
||||
// metadata tracks the local metadata fields
|
||||
metadata map[string]string
|
||||
|
@ -128,20 +86,25 @@ type State struct {
|
|||
// is stored in the raft log.
|
||||
discardCheckOutput atomic.Value // bool
|
||||
|
||||
// tokens contains the ACL tokens
|
||||
tokens *token.Store
|
||||
}
|
||||
|
||||
// NewLocalState creates a is used to initialize the local state
|
||||
func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State {
|
||||
l := &State{
|
||||
config: c,
|
||||
logger: lg,
|
||||
services: make(map[string]*ServiceState),
|
||||
checks: make(map[types.CheckID]*CheckState),
|
||||
metadata: make(map[string]string),
|
||||
triggerCh: triggerCh,
|
||||
tokens: tokens,
|
||||
config: c,
|
||||
logger: lg,
|
||||
services: make(map[string]*structs.NodeService),
|
||||
serviceStatus: make(map[string]syncStatus),
|
||||
serviceTokens: make(map[string]string),
|
||||
checks: make(map[types.CheckID]*structs.HealthCheck),
|
||||
checkStatus: make(map[types.CheckID]syncStatus),
|
||||
checkTokens: make(map[types.CheckID]string),
|
||||
checkCriticalTime: make(map[types.CheckID]time.Time),
|
||||
deferCheck: make(map[types.CheckID]*time.Timer),
|
||||
metadata: make(map[string]string),
|
||||
triggerCh: triggerCh,
|
||||
tokens: tokens,
|
||||
}
|
||||
l.discardCheckOutput.Store(c.DiscardCheckOutput)
|
||||
return l
|
||||
|
@ -174,10 +137,7 @@ func (l *State) ServiceToken(id string) string {
|
|||
|
||||
// serviceToken returns an ACL token associated with a service.
|
||||
func (l *State) serviceToken(id string) string {
|
||||
var token string
|
||||
if s := l.services[id]; s != nil {
|
||||
token = s.Token
|
||||
}
|
||||
token := l.serviceTokens[id]
|
||||
if token == "" {
|
||||
token = l.tokens.UserToken()
|
||||
}
|
||||
|
@ -187,47 +147,36 @@ func (l *State) serviceToken(id string) string {
|
|||
// AddService is used to add a service entry to the local state.
|
||||
// This entry is persistent and the agent will make a best effort to
|
||||
// ensure it is registered
|
||||
// todo(fs): where is the persistence happening?
|
||||
func (l *State) AddService(service *structs.NodeService, token string) error {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if service == nil {
|
||||
return fmt.Errorf("no service")
|
||||
}
|
||||
|
||||
// use the service name as id if the id was omitted
|
||||
// todo(fs): is this for backwards compatibility?
|
||||
if service.ID == "" {
|
||||
func (l *State) AddService(service *structs.NodeService, token string) {
|
||||
// Assign the ID if none given
|
||||
if service.ID == "" && service.Service != "" {
|
||||
service.ID = service.Service
|
||||
}
|
||||
|
||||
l.services[service.ID] = &ServiceState{
|
||||
Service: service,
|
||||
Token: token,
|
||||
}
|
||||
l.changeMade()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveService is used to remove a service entry from the local state.
|
||||
// The agent will make a best effort to ensure it is deregistered.
|
||||
func (l *State) RemoveService(id string) error {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
s := l.services[id]
|
||||
if s == nil || s.Deleted {
|
||||
return fmt.Errorf("Service %q does not exist", id)
|
||||
}
|
||||
|
||||
// To remove the service on the server we need the token.
|
||||
// Therefore, we mark the service as deleted and keep the
|
||||
// entry around until it is actually removed.
|
||||
s.InSync = false
|
||||
s.Deleted = true
|
||||
l.services[service.ID] = service
|
||||
l.serviceStatus[service.ID] = syncStatus{}
|
||||
l.serviceTokens[service.ID] = token
|
||||
l.changeMade()
|
||||
}
|
||||
|
||||
// RemoveService is used to remove a service entry from the local state.
|
||||
// The agent will make a best effort to ensure it is deregistered
|
||||
func (l *State) RemoveService(serviceID string) error {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if _, ok := l.services[serviceID]; ok {
|
||||
delete(l.services, serviceID)
|
||||
// Leave the service token around, if any, until we successfully
|
||||
// delete the service.
|
||||
l.serviceStatus[serviceID] = syncStatus{inSync: false}
|
||||
l.changeMade()
|
||||
} else {
|
||||
return fmt.Errorf("Service does not exist")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -237,28 +186,20 @@ func (l *State) RemoveService(id string) error {
|
|||
func (l *State) Service(id string) *structs.NodeService {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
s := l.services[id]
|
||||
if s == nil || s.Deleted {
|
||||
return nil
|
||||
}
|
||||
return s.Service
|
||||
return l.services[id]
|
||||
}
|
||||
|
||||
// Services returns the locally registered services that the
|
||||
// agent is aware of and are being kept in sync with the server
|
||||
func (l *State) Services() map[string]*structs.NodeService {
|
||||
services := make(map[string]*structs.NodeService)
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
m := make(map[string]*structs.NodeService)
|
||||
for id, s := range l.services {
|
||||
if s.Deleted {
|
||||
continue
|
||||
}
|
||||
m[id] = s.Service
|
||||
for name, serv := range l.services {
|
||||
services[name] = serv
|
||||
}
|
||||
return m
|
||||
return services
|
||||
}
|
||||
|
||||
// CheckToken is used to return the configured health check token for a
|
||||
|
@ -270,12 +211,8 @@ func (l *State) CheckToken(checkID types.CheckID) string {
|
|||
}
|
||||
|
||||
// checkToken returns an ACL token associated with a check.
|
||||
func (l *State) checkToken(id types.CheckID) string {
|
||||
var token string
|
||||
c := l.checks[id]
|
||||
if c != nil {
|
||||
token = c.Token
|
||||
}
|
||||
func (l *State) checkToken(checkID types.CheckID) string {
|
||||
token := l.checkTokens[checkID]
|
||||
if token == "" {
|
||||
token = l.tokens.UserToken()
|
||||
}
|
||||
|
@ -289,9 +226,8 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
|
|||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
if check == nil {
|
||||
return fmt.Errorf("no check")
|
||||
}
|
||||
// Set the node name
|
||||
check.Node = l.config.NodeName
|
||||
|
||||
if l.discardCheckOutput.Load().(bool) {
|
||||
check.Output = ""
|
||||
|
@ -300,51 +236,38 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
|
|||
// if there is a serviceID associated with the check, make sure it exists before adding it
|
||||
// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
|
||||
if check.ServiceID != "" && l.services[check.ServiceID] == nil {
|
||||
return fmt.Errorf("Check %q refers to non-existent service %q does not exist", check.CheckID, check.ServiceID)
|
||||
return fmt.Errorf("ServiceID %q does not exist", check.ServiceID)
|
||||
}
|
||||
|
||||
// hard-set the node name
|
||||
check.Node = l.config.NodeName
|
||||
|
||||
l.checks[check.CheckID] = &CheckState{
|
||||
Check: check,
|
||||
Token: token,
|
||||
}
|
||||
l.checks[check.CheckID] = check
|
||||
l.checkStatus[check.CheckID] = syncStatus{}
|
||||
l.checkTokens[check.CheckID] = token
|
||||
delete(l.checkCriticalTime, check.CheckID)
|
||||
l.changeMade()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveCheck is used to remove a health check from the local state.
|
||||
// The agent will make a best effort to ensure it is deregistered
|
||||
// todo(fs): RemoveService returns an error for a non-existant service. RemoveCheck should as well.
|
||||
// todo(fs): Check code that calls this to handle the error.
|
||||
func (l *State) RemoveCheck(id types.CheckID) error {
|
||||
func (l *State) RemoveCheck(checkID types.CheckID) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
c := l.checks[id]
|
||||
if c == nil || c.Deleted {
|
||||
return fmt.Errorf("Check %q does not exist", id)
|
||||
}
|
||||
|
||||
// To remove the check on the server we need the token.
|
||||
// Therefore, we mark the service as deleted and keep the
|
||||
// entry around until it is actually removed.
|
||||
c.InSync = false
|
||||
c.Deleted = true
|
||||
delete(l.checks, checkID)
|
||||
// Leave the check token around, if any, until we successfully delete
|
||||
// the check.
|
||||
delete(l.checkCriticalTime, checkID)
|
||||
l.checkStatus[checkID] = syncStatus{inSync: false}
|
||||
l.changeMade()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateCheck is used to update the status of a check
|
||||
func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
||||
func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
c := l.checks[id]
|
||||
if c == nil || c.Deleted {
|
||||
check, ok := l.checks[checkID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -355,15 +278,16 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
|||
// Update the critical time tracking (this doesn't cause a server updates
|
||||
// so we can always keep this up to date).
|
||||
if status == api.HealthCritical {
|
||||
if !c.Critical() {
|
||||
c.CriticalTime = time.Now()
|
||||
_, wasCritical := l.checkCriticalTime[checkID]
|
||||
if !wasCritical {
|
||||
l.checkCriticalTime[checkID] = time.Now()
|
||||
}
|
||||
} else {
|
||||
c.CriticalTime = time.Time{}
|
||||
delete(l.checkCriticalTime, checkID)
|
||||
}
|
||||
|
||||
// Do nothing if update is idempotent
|
||||
if c.Check.Status == status && c.Check.Output == output {
|
||||
if check.Status == status && check.Output == output {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -371,34 +295,28 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
|||
// frequent updates of output. Instead, we update the output internally,
|
||||
// and periodically do a write-back to the servers. If there is a status
|
||||
// change we do the write immediately.
|
||||
if l.config.CheckUpdateInterval > 0 && c.Check.Status == status {
|
||||
c.Check.Output = output
|
||||
if c.DeferCheck == nil {
|
||||
d := l.config.CheckUpdateInterval
|
||||
intv := time.Duration(uint64(d)/2) + lib.RandomStagger(d)
|
||||
c.DeferCheck = time.AfterFunc(intv, func() {
|
||||
if l.config.CheckUpdateInterval > 0 && check.Status == status {
|
||||
check.Output = output
|
||||
if _, ok := l.deferCheck[checkID]; !ok {
|
||||
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
|
||||
deferSync := time.AfterFunc(intv, func() {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
c := l.checks[id]
|
||||
if c == nil {
|
||||
return
|
||||
if _, ok := l.checkStatus[checkID]; ok {
|
||||
l.checkStatus[checkID] = syncStatus{inSync: false}
|
||||
l.changeMade()
|
||||
}
|
||||
c.DeferCheck = nil
|
||||
if c.Deleted {
|
||||
return
|
||||
}
|
||||
c.InSync = false
|
||||
l.changeMade()
|
||||
delete(l.deferCheck, checkID)
|
||||
l.Unlock()
|
||||
})
|
||||
l.deferCheck[checkID] = deferSync
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Update status and mark out of sync
|
||||
c.Check.Status = status
|
||||
c.Check.Output = output
|
||||
c.InSync = false
|
||||
check.Status = status
|
||||
check.Output = output
|
||||
l.checkStatus[checkID] = syncStatus{inSync: false}
|
||||
l.changeMade()
|
||||
}
|
||||
|
||||
|
@ -407,12 +325,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) {
|
|||
func (l *State) Check(id types.CheckID) *structs.HealthCheck {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
c := l.checks[id]
|
||||
if c == nil || c.Deleted {
|
||||
return nil
|
||||
}
|
||||
return c.Check
|
||||
return l.checks[id]
|
||||
}
|
||||
|
||||
// Checks returns the locally registered checks that the
|
||||
|
@ -421,83 +334,78 @@ func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
|
|||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
m := make(map[types.CheckID]*structs.HealthCheck)
|
||||
checks := make(map[types.CheckID]*structs.HealthCheck)
|
||||
for id, c := range l.checks {
|
||||
if c.Deleted {
|
||||
continue
|
||||
}
|
||||
c2 := new(structs.HealthCheck)
|
||||
*c2 = *c.Check
|
||||
m[id] = c2
|
||||
*c2 = *c
|
||||
checks[id] = c2
|
||||
}
|
||||
return m
|
||||
return checks
|
||||
}
|
||||
|
||||
// CriticalCheckStates returns the locally registered checks that the
|
||||
// agent is aware of and are being kept in sync with the server
|
||||
func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
|
||||
// CriticalCheck is used to return the duration a check has been critical along
|
||||
// with its associated health check.
|
||||
type CriticalCheck struct {
|
||||
CriticalFor time.Duration
|
||||
Check *structs.HealthCheck
|
||||
}
|
||||
|
||||
// CriticalChecks returns locally registered health checks that the agent is
|
||||
// aware of and are being kept in sync with the server, and that are in a
|
||||
// critical state. This also returns information about how long each check has
|
||||
// been critical.
|
||||
func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck {
|
||||
checks := make(map[types.CheckID]CriticalCheck)
|
||||
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
m := make(map[types.CheckID]*CheckState)
|
||||
for id, c := range l.checks {
|
||||
if c.Deleted || !c.Critical() {
|
||||
continue
|
||||
now := time.Now()
|
||||
for checkID, criticalTime := range l.checkCriticalTime {
|
||||
checks[checkID] = CriticalCheck{
|
||||
CriticalFor: now.Sub(criticalTime),
|
||||
Check: l.checks[checkID],
|
||||
}
|
||||
m[id] = c
|
||||
}
|
||||
return m
|
||||
|
||||
return checks
|
||||
}
|
||||
|
||||
// Metadata returns the local node metadata fields that the
|
||||
// agent is aware of and are being kept in sync with the server
|
||||
func (l *State) Metadata() map[string]string {
|
||||
metadata := make(map[string]string)
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
m := make(map[string]string)
|
||||
for k, v := range l.metadata {
|
||||
m[k] = v
|
||||
|
||||
for key, value := range l.metadata {
|
||||
metadata[key] = value
|
||||
}
|
||||
return m
|
||||
return metadata
|
||||
}
|
||||
|
||||
// UpdateSyncState does a read of the server state, and updates
|
||||
// the local sync status as appropriate
|
||||
func (l *State) UpdateSyncState() error {
|
||||
// 1. get all checks and services from the master
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()},
|
||||
}
|
||||
|
||||
var out1 structs.IndexedNodeServices
|
||||
if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var out2 structs.IndexedHealthChecks
|
||||
if e := l.delegate.RPC("Catalog.NodeServices", &req, &out1); e != nil {
|
||||
return e
|
||||
}
|
||||
if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 2. create useful data structures for traversal
|
||||
remoteServices := make(map[string]*structs.NodeService)
|
||||
if out1.NodeServices != nil {
|
||||
remoteServices = out1.NodeServices.Services
|
||||
}
|
||||
|
||||
remoteChecks := make(map[types.CheckID]*structs.HealthCheck, len(out2.HealthChecks))
|
||||
for _, rc := range out2.HealthChecks {
|
||||
remoteChecks[rc.CheckID] = rc
|
||||
}
|
||||
|
||||
// 3. perform sync
|
||||
checks := out2.HealthChecks
|
||||
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
// sync node info
|
||||
// Check the node info
|
||||
if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
|
||||
out1.NodeServices.Node.ID != l.config.NodeID ||
|
||||
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
|
||||
|
@ -505,103 +413,99 @@ func (l *State) UpdateSyncState() error {
|
|||
l.nodeInfoInSync = false
|
||||
}
|
||||
|
||||
// sync services
|
||||
// Check all our services
|
||||
services := make(map[string]*structs.NodeService)
|
||||
if out1.NodeServices != nil {
|
||||
services = out1.NodeServices.Services
|
||||
}
|
||||
|
||||
// sync local services that do not exist remotely
|
||||
for id, s := range l.services {
|
||||
if remoteServices[id] == nil {
|
||||
s.InSync = false
|
||||
for id := range l.services {
|
||||
// If the local service doesn't exist remotely, then sync it
|
||||
if _, ok := services[id]; !ok {
|
||||
l.serviceStatus[id] = syncStatus{inSync: false}
|
||||
}
|
||||
}
|
||||
|
||||
for id, rs := range remoteServices {
|
||||
for id, service := range services {
|
||||
// If we don't have the service locally, deregister it
|
||||
ls := l.services[id]
|
||||
if ls == nil {
|
||||
// The consul service is created automatically and does
|
||||
existing, ok := l.services[id]
|
||||
if !ok {
|
||||
// The consul service is created automatically, and does
|
||||
// not need to be deregistered.
|
||||
if id == structs.ConsulServiceID {
|
||||
continue
|
||||
}
|
||||
|
||||
l.services[id] = &ServiceState{Deleted: true}
|
||||
continue
|
||||
}
|
||||
|
||||
// If the service is scheduled for removal skip it.
|
||||
// todo(fs): is this correct?
|
||||
if ls.Deleted {
|
||||
l.serviceStatus[id] = syncStatus{inSync: false}
|
||||
continue
|
||||
}
|
||||
|
||||
// If our definition is different, we need to update it. Make a
|
||||
// copy so that we don't retain a pointer to any actual state
|
||||
// store info for in-memory RPCs.
|
||||
if ls.Service.EnableTagOverride {
|
||||
ls.Service.Tags = make([]string, len(rs.Tags))
|
||||
copy(ls.Service.Tags, rs.Tags)
|
||||
if existing.EnableTagOverride {
|
||||
existing.Tags = make([]string, len(service.Tags))
|
||||
copy(existing.Tags, service.Tags)
|
||||
}
|
||||
ls.InSync = ls.Service.IsSame(rs)
|
||||
equal := existing.IsSame(service)
|
||||
l.serviceStatus[id] = syncStatus{inSync: equal}
|
||||
}
|
||||
|
||||
// sync checks
|
||||
// Index the remote health checks to improve efficiency
|
||||
checkIndex := make(map[types.CheckID]*structs.HealthCheck, len(checks))
|
||||
for _, check := range checks {
|
||||
checkIndex[check.CheckID] = check
|
||||
}
|
||||
|
||||
// sync local checks which do not exist remotely
|
||||
for id, c := range l.checks {
|
||||
if remoteChecks[id] == nil {
|
||||
c.InSync = false
|
||||
// Sync any check which doesn't exist on the remote side
|
||||
for id := range l.checks {
|
||||
if _, ok := checkIndex[id]; !ok {
|
||||
l.checkStatus[id] = syncStatus{inSync: false}
|
||||
}
|
||||
}
|
||||
|
||||
for id, rc := range remoteChecks {
|
||||
|
||||
lc := l.checks[id]
|
||||
|
||||
for _, check := range checks {
|
||||
// If we don't have the check locally, deregister it
|
||||
if lc == nil {
|
||||
// The Serf check is created automatically and does not
|
||||
id := check.CheckID
|
||||
existing, ok := l.checks[id]
|
||||
if !ok {
|
||||
// The Serf check is created automatically, and does not
|
||||
// need to be deregistered.
|
||||
if id == structs.SerfCheckID {
|
||||
l.logger.Printf("Skipping remote check %q since it is managed automatically", id)
|
||||
continue
|
||||
}
|
||||
|
||||
l.checks[id] = &CheckState{Deleted: true}
|
||||
continue
|
||||
}
|
||||
|
||||
// If the check is scheduled for removal skip it.
|
||||
// todo(fs): is this correct?
|
||||
if lc.Deleted {
|
||||
l.checkStatus[id] = syncStatus{inSync: false}
|
||||
continue
|
||||
}
|
||||
|
||||
// If our definition is different, we need to update it
|
||||
var equal bool
|
||||
if l.config.CheckUpdateInterval == 0 {
|
||||
lc.InSync = lc.Check.IsSame(rc)
|
||||
continue
|
||||
equal = existing.IsSame(check)
|
||||
} else {
|
||||
// Copy the existing check before potentially modifying
|
||||
// it before the compare operation.
|
||||
eCopy := existing.Clone()
|
||||
|
||||
// Copy the server's check before modifying, otherwise
|
||||
// in-memory RPCs will have side effects.
|
||||
cCopy := check.Clone()
|
||||
|
||||
// If there's a defer timer active then we've got a
|
||||
// potentially spammy check so we don't sync the output
|
||||
// during this sweep since the timer will mark the check
|
||||
// out of sync for us. Otherwise, it is safe to sync the
|
||||
// output now. This is especially important for checks
|
||||
// that don't change state after they are created, in
|
||||
// which case we'd never see their output synced back ever.
|
||||
if _, ok := l.deferCheck[id]; ok {
|
||||
eCopy.Output = ""
|
||||
cCopy.Output = ""
|
||||
}
|
||||
equal = eCopy.IsSame(cCopy)
|
||||
}
|
||||
|
||||
// Copy the existing check before potentially modifying
|
||||
// it before the compare operation.
|
||||
lcCopy := lc.Check.Clone()
|
||||
|
||||
// Copy the server's check before modifying, otherwise
|
||||
// in-memory RPCs will have side effects.
|
||||
rcCopy := rc.Clone()
|
||||
|
||||
// If there's a defer timer active then we've got a
|
||||
// potentially spammy check so we don't sync the output
|
||||
// during this sweep since the timer will mark the check
|
||||
// out of sync for us. Otherwise, it is safe to sync the
|
||||
// output now. This is especially important for checks
|
||||
// that don't change state after they are created, in
|
||||
// which case we'd never see their output synced back ever.
|
||||
if lc.DeferCheck != nil {
|
||||
lcCopy.Output = ""
|
||||
rcCopy.Output = ""
|
||||
}
|
||||
lc.InSync = lcCopy.IsSame(rcCopy)
|
||||
// Update the status
|
||||
l.checkStatus[id] = syncStatus{inSync: equal}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -617,38 +521,39 @@ func (l *State) SyncChanges() error {
|
|||
// API works.
|
||||
|
||||
// Sync the services
|
||||
for id, s := range l.services {
|
||||
var err error
|
||||
switch {
|
||||
case s.Deleted:
|
||||
err = l.deleteService(id)
|
||||
case !s.InSync:
|
||||
err = l.syncService(id)
|
||||
default:
|
||||
for id, status := range l.serviceStatus {
|
||||
if _, ok := l.services[id]; !ok {
|
||||
if err := l.deleteService(id); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !status.inSync {
|
||||
if err := l.syncService(id); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
l.logger.Printf("[DEBUG] agent: Service '%s' in sync", id)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for id, c := range l.checks {
|
||||
var err error
|
||||
switch {
|
||||
case c.Deleted:
|
||||
err = l.deleteCheck(id)
|
||||
case !c.InSync:
|
||||
if c.DeferCheck != nil {
|
||||
c.DeferCheck.Stop()
|
||||
c.DeferCheck = nil
|
||||
// Sync the checks
|
||||
for id, status := range l.checkStatus {
|
||||
if _, ok := l.checks[id]; !ok {
|
||||
if err := l.deleteCheck(id); err != nil {
|
||||
return err
|
||||
}
|
||||
err = l.syncCheck(id)
|
||||
default:
|
||||
} else if !status.inSync {
|
||||
// Cancel a deferred sync
|
||||
if timer := l.deferCheck[id]; timer != nil {
|
||||
timer.Stop()
|
||||
delete(l.deferCheck, id)
|
||||
}
|
||||
|
||||
if err := l.syncCheck(id); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Now sync the node level info if we need to, and didn't do any of
|
||||
|
@ -688,26 +593,9 @@ func (l *State) UnloadMetadata() {
|
|||
func (l *State) Stats() map[string]string {
|
||||
l.RLock()
|
||||
defer l.RUnlock()
|
||||
|
||||
services := 0
|
||||
for _, s := range l.services {
|
||||
if s.Deleted {
|
||||
continue
|
||||
}
|
||||
services++
|
||||
}
|
||||
|
||||
checks := 0
|
||||
for _, c := range l.checks {
|
||||
if c.Deleted {
|
||||
continue
|
||||
}
|
||||
checks++
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
"services": strconv.Itoa(services),
|
||||
"checks": strconv.Itoa(checks),
|
||||
"services": strconv.Itoa(len(l.services)),
|
||||
"checks": strconv.Itoa(len(l.checks)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -726,13 +614,12 @@ func (l *State) deleteService(id string) error {
|
|||
var out struct{}
|
||||
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
||||
if err == nil || strings.Contains(err.Error(), "Unknown service") {
|
||||
delete(l.services, id)
|
||||
delete(l.serviceStatus, id)
|
||||
delete(l.serviceTokens, id)
|
||||
l.logger.Printf("[INFO] agent: Deregistered service '%s'", id)
|
||||
return nil
|
||||
}
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// todo(fs): why is the service in sync here?
|
||||
l.services[id].InSync = true
|
||||
} else if acl.IsErrPermissionDenied(err) {
|
||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id)
|
||||
return nil
|
||||
}
|
||||
|
@ -754,14 +641,12 @@ func (l *State) deleteCheck(id types.CheckID) error {
|
|||
var out struct{}
|
||||
err := l.delegate.RPC("Catalog.Deregister", &req, &out)
|
||||
if err == nil || strings.Contains(err.Error(), "Unknown check") {
|
||||
// todo(fs): do we need to stop the deferCheck timer here?
|
||||
delete(l.checks, id)
|
||||
delete(l.checkStatus, id)
|
||||
delete(l.checkTokens, id)
|
||||
l.logger.Printf("[INFO] agent: Deregistered check '%s'", id)
|
||||
return nil
|
||||
}
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// todo(fs): why is the check in sync here?
|
||||
l.checks[id].InSync = true
|
||||
} else if acl.IsErrPermissionDenied(err) {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id)
|
||||
return nil
|
||||
}
|
||||
|
@ -770,26 +655,6 @@ func (l *State) deleteCheck(id types.CheckID) error {
|
|||
|
||||
// syncService is used to sync a service to the server
|
||||
func (l *State) syncService(id string) error {
|
||||
// If the service has associated checks that are out of sync,
|
||||
// piggyback them on the service sync so they are part of the
|
||||
// same transaction and are registered atomically. We only let
|
||||
// checks ride on service registrations with the same token,
|
||||
// otherwise we need to register them separately so they don't
|
||||
// pick up privileges from the service token.
|
||||
var checks structs.HealthChecks
|
||||
for checkID, c := range l.checks {
|
||||
if c.Deleted || c.InSync {
|
||||
continue
|
||||
}
|
||||
if c.Check.ServiceID != id {
|
||||
continue
|
||||
}
|
||||
if l.serviceToken(id) != l.checkToken(checkID) {
|
||||
continue
|
||||
}
|
||||
checks = append(checks, c.Check)
|
||||
}
|
||||
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
ID: l.config.NodeID,
|
||||
|
@ -797,10 +662,25 @@ func (l *State) syncService(id string) error {
|
|||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
NodeMeta: l.metadata,
|
||||
Service: l.services[id].Service,
|
||||
Service: l.services[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
||||
}
|
||||
|
||||
// If the service has associated checks that are out of sync,
|
||||
// piggyback them on the service sync so they are part of the
|
||||
// same transaction and are registered atomically. We only let
|
||||
// checks ride on service registrations with the same token,
|
||||
// otherwise we need to register them separately so they don't
|
||||
// pick up privileges from the service token.
|
||||
var checks structs.HealthChecks
|
||||
for _, check := range l.checks {
|
||||
if check.ServiceID == id && (l.serviceToken(id) == l.checkToken(check.CheckID)) {
|
||||
if stat, ok := l.checkStatus[check.CheckID]; !ok || !stat.inSync {
|
||||
checks = append(checks, check)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Backwards-compatibility for Consul < 0.5
|
||||
if len(checks) == 1 {
|
||||
req.Check = checks[0]
|
||||
|
@ -811,24 +691,20 @@ func (l *State) syncService(id string) error {
|
|||
var out struct{}
|
||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.services[id].InSync = true
|
||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||
// Given how the register API works, this info is also updated
|
||||
// every time we sync a service.
|
||||
l.nodeInfoInSync = true
|
||||
for _, check := range checks {
|
||||
l.checks[check.CheckID].InSync = true
|
||||
}
|
||||
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
||||
return nil
|
||||
}
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// todo(fs): why are the service and the checks in sync here?
|
||||
// todo(fs): why is the node info not in sync here?
|
||||
l.services[id].InSync = true
|
||||
for _, check := range checks {
|
||||
l.checks[check.CheckID].InSync = true
|
||||
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
|
||||
}
|
||||
} else if acl.IsErrPermissionDenied(err) {
|
||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id)
|
||||
for _, check := range checks {
|
||||
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
@ -836,7 +712,14 @@ func (l *State) syncService(id string) error {
|
|||
|
||||
// syncCheck is used to sync a check to the server
|
||||
func (l *State) syncCheck(id types.CheckID) error {
|
||||
c := l.checks[id]
|
||||
// Pull in the associated service if any
|
||||
check := l.checks[id]
|
||||
var service *structs.NodeService
|
||||
if check.ServiceID != "" {
|
||||
if serv, ok := l.services[check.ServiceID]; ok {
|
||||
service = serv
|
||||
}
|
||||
}
|
||||
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
|
@ -845,29 +728,20 @@ func (l *State) syncCheck(id types.CheckID) error {
|
|||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
NodeMeta: l.metadata,
|
||||
Check: c.Check,
|
||||
Service: service,
|
||||
Check: l.checks[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||
}
|
||||
|
||||
// Pull in the associated service if any
|
||||
s := l.services[c.Check.ServiceID]
|
||||
if s != nil && !s.Deleted {
|
||||
req.Service = s.Service
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
err := l.delegate.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.checks[id].InSync = true
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
// Given how the register API works, this info is also updated
|
||||
// every time we sync a check.
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||
return nil
|
||||
}
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// todo(fs): why is the check in sync here?
|
||||
l.checks[id].InSync = true
|
||||
} else if acl.IsErrPermissionDenied(err) {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
|
||||
return nil
|
||||
}
|
||||
|
@ -889,10 +763,7 @@ func (l *State) syncNodeInfo() error {
|
|||
if err == nil {
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[INFO] agent: Synced node info")
|
||||
return nil
|
||||
}
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// todo(fs): why is the node info in sync here?
|
||||
} else if acl.IsErrPermissionDenied(err) {
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue