diff --git a/command/agent/agent.go b/command/agent/agent.go index 77fef1aa9b..a9cc52ec17 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -3,6 +3,7 @@ package agent import ( "fmt" "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/serf" "io" "log" @@ -36,6 +37,10 @@ type Agent struct { shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex + + // state stores a local representation of the node, + // services and checks. Used for anti-entropy. + state localState } // Create is used to create a new Agent. Returns @@ -77,6 +82,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { logger: log.New(logOutput, "", log.LstdFlags), logOutput: logOutput, shutdownCh: make(chan struct{}), + state: localState{ + delaySync: make(chan struct{}, 1), + services: make(map[string]*structs.NodeService), + serviceStatus: make(map[string]syncStatus), + checks: make(map[string]*structs.HealthCheck), + checkStatus: make(map[string]syncStatus), + triggerCh: make(chan struct{}, 1), + }, } // Setup either the client or the server @@ -90,6 +103,8 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { return nil, err } + // Start the anti entropy routine + go agent.antiEntropy() return agent, nil } diff --git a/command/agent/command.go b/command/agent/command.go index 89e67d4460..73a66514dc 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -196,6 +196,11 @@ func (c *Command) Run(args []string) int { defer c.httpServer.Shutdown() } + // TODO: Register services/checks + + // Let the agent know we've finished registration + c.agent.RegistrationDone() + c.Ui.Output("Consul agent running!") c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName)) c.Ui.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter)) diff --git a/command/agent/local.go b/command/agent/local.go new file mode 100644 index 0000000000..ff586a798e --- /dev/null +++ b/command/agent/local.go @@ -0,0 +1,322 @@ +package agent + +import ( + "github.com/hashicorp/consul/consul/structs" + "reflect" + "sync" + "time" +) + +const ( + syncRetryIntv = 30 * time.Second + maxDelaySync = 30 * time.Second +) + +// syncStatus is used to represent the difference between +// the local and remote state, and if action needs to be taken +type syncStatus struct { + remoteDelete bool // Should this be deleted from the server + inSync bool // Is this in sync with the server +} + +// localState is used to represent the node's services, +// and checks. We used it to perform anti-entropy with the +// catalog representation +type localState struct { + sync.Mutex + + // delaySync is used to delay the initial sync until + // the client has registered its services and checks. + delaySync chan struct{} + + // Services tracks the local services + services map[string]*structs.NodeService + serviceStatus map[string]syncStatus + + // Checks tracks the local checks + checks map[string]*structs.HealthCheck + checkStatus map[string]syncStatus + + // triggerCh is used to inform of a change to local state + // that requires anti-entropy with the server + triggerCh chan struct{} +} + +// changeMade is used to trigger an anti-entropy run +func (l *localState) changeMade() { + select { + case l.triggerCh <- struct{}{}: + default: + } +} + +// RegistrationDone is called by the Agent client once base Services +// and Checks are registered. This is called to prevent a race +// between clients and the anti-entropy routines +func (a *Agent) RegistrationDone() { + select { + case a.state.delaySync <- struct{}{}: + default: + } +} + +// AddService is used to add a service entry to the local state. +// This entry is persistent and the agent will make a best effort to +// ensure it is registered +func (a *Agent) AddService(service *structs.NodeService) { + a.state.Lock() + defer a.state.Unlock() + + a.state.services[service.ID] = service + a.state.serviceStatus[service.ID] = syncStatus{} + a.state.changeMade() +} + +// RemoveService is used to remove a service entry from the local state. +// The agent will make a best effort to ensure it is deregistered +func (a *Agent) RemoveService(serviceID string) { + a.state.Lock() + defer a.state.Unlock() + + delete(a.state.services, serviceID) + a.state.serviceStatus[serviceID] = syncStatus{remoteDelete: true} + a.state.changeMade() +} + +// AddCheck is used to add a health check to the local state. +// This entry is persistent and the agent will make a best effort to +// ensure it is registered +func (a *Agent) AddCheck(check *structs.HealthCheck) { + a.state.Lock() + defer a.state.Unlock() + + a.state.checks[check.CheckID] = check + a.state.checkStatus[check.CheckID] = syncStatus{} + a.state.changeMade() +} + +// RemoveCheck is used to remove a health check from the local state. +// The agent will make a best effort to ensure it is deregistered +func (a *Agent) RemoveCheck(checkID string) { + a.state.Lock() + defer a.state.Unlock() + + delete(a.state.checks, checkID) + a.state.checkStatus[checkID] = syncStatus{remoteDelete: true} + a.state.changeMade() +} + +// UpdateCheck is used to update the status of a check +func (a *Agent) UpdateCheck(checkID, status string) { + a.state.Lock() + defer a.state.Unlock() + + check, ok := a.state.checks[checkID] + if !ok { + return + } + + // Do nothing if update is idempotent + if check.Status == status { + return + } + + // Update status and mark out of sync + check.Status = status + a.state.checkStatus[checkID] = syncStatus{inSync: false} + a.state.changeMade() +} + +// antiEntropy is a long running method used to perform anti-entropy +// between local and remote state. +func (a *Agent) antiEntropy() { + // Delay the initial sync until client has a chance to register + select { + case <-a.state.delaySync: + case <-time.After(maxDelaySync): + a.logger.Printf("[WARN] Client failed to call RegisterDone within %v", maxDelaySync) + case <-a.shutdownCh: + return + } + +SYNC: + // Sync our state with the servers + for !a.shutdown { + if err := a.setSyncState(); err != nil { + a.logger.Printf("[ERR] agent: failed to sync remote state: %v", err) + time.Sleep(aeScale(syncRetryIntv, len(a.LANMembers()))) + continue + } + break + } + + // Force-trigger AE to pickup any changes + a.state.changeMade() + + // Schedule the next full sync, with a random stagger + aeIntv := aeScale(a.config.AEInterval, len(a.LANMembers())) + aeIntv = aeIntv + randomStagger(aeIntv) + aeTimer := time.After(aeIntv) + + // Wait for sync events + for { + select { + case <-aeTimer: + goto SYNC + case <-a.state.triggerCh: + if err := a.syncChanges(); err != nil { + a.logger.Printf("[ERR] agent: failed to sync changes: %v", err) + } + case <-a.shutdownCh: + return + } + } +} + +// setSyncState does a read of the server state, and updates +// the local syncStatus as appropriate +func (a *Agent) setSyncState() error { + req := structs.NodeSpecificRequest{ + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + } + var services structs.NodeServices + var checks structs.HealthChecks + if e := a.RPC("Catalog.NodeServices", &req, &services); e != nil { + return e + } + if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil { + return err + } + + a.state.Lock() + defer a.state.Unlock() + + for id, service := range services.Services { + // If we don't have the service locally, deregister it + existing, ok := a.state.services[id] + if !ok { + a.state.serviceStatus[id] = syncStatus{remoteDelete: true} + continue + } + + // If our definition is different, we need to update it + equal := !reflect.DeepEqual(existing, service) + a.state.serviceStatus[id] = syncStatus{inSync: equal} + } + + for _, check := range checks { + // If we don't have the check locally, deregister it + id := check.CheckID + existing, ok := a.state.checks[id] + if !ok { + a.state.checkStatus[id] = syncStatus{remoteDelete: true} + continue + } + + // If our definition is different, we need to update it + equal := !reflect.DeepEqual(existing, check) + a.state.checkStatus[id] = syncStatus{inSync: equal} + } + return nil +} + +// syncChanges is used to scan the status our local services and checks +// and update any that are out of sync with the server +func (a *Agent) syncChanges() error { + a.state.Lock() + defer a.state.Unlock() + + // Sync the services + for id, status := range a.state.serviceStatus { + if status.remoteDelete { + if err := a.deleteService(id); err != nil { + return err + } + } else if !status.inSync { + if err := a.syncService(id); err != nil { + return err + } + } + } + + // Sync the checks + for id, status := range a.state.checkStatus { + if status.remoteDelete { + if err := a.deleteCheck(id); err != nil { + return err + } + } else if !status.inSync { + if err := a.syncCheck(id); err != nil { + return err + } + } + } + return nil +} + +// deleteService is used to delete a service from the server +func (a *Agent) deleteService(id string) error { + req := structs.DeregisterRequest{ + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + ServiceID: id, + } + var out struct{} + err := a.RPC("Catalog.Deregister", &req, &out) + if err == nil { + delete(a.state.serviceStatus, id) + a.logger.Printf("[INFO] Deregistered service '%s'", id) + } + return err +} + +// deleteCheck is used to delete a service from the server +func (a *Agent) deleteCheck(id string) error { + req := structs.DeregisterRequest{ + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + CheckID: id, + } + var out struct{} + err := a.RPC("Catalog.Deregister", &req, &out) + if err == nil { + delete(a.state.checkStatus, id) + a.logger.Printf("[INFO] Deregistered check '%s'", id) + } + return err +} + +// syncService is used to sync a service to the server +func (a *Agent) syncService(id string) error { + req := structs.RegisterRequest{ + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + Address: a.config.AdvertiseAddr, + Service: a.state.services[id], + } + var out struct{} + err := a.RPC("Catalog.Register", &req, &out) + if err == nil { + a.state.serviceStatus[id] = syncStatus{inSync: true} + a.logger.Printf("[INFO] Synced service '%s'", id) + } + return err +} + +// syncCheck is used to sync a service to the server +func (a *Agent) syncCheck(id string) error { + req := structs.RegisterRequest{ + Datacenter: a.config.Datacenter, + Node: a.config.NodeName, + Address: a.config.AdvertiseAddr, + Check: a.state.checks[id], + } + var out struct{} + err := a.RPC("Catalog.Register", &req, &out) + if err == nil { + a.state.checkStatus[id] = syncStatus{inSync: true} + a.logger.Printf("[INFO] Synced check '%s'", id) + } + return err +}