From 6027a9e2a51860f7a90f99a539215faa17b8b768 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Mon, 28 Aug 2017 14:17:12 +0200 Subject: [PATCH] local state: move to separate package This patch moves the local state to a separate package to further decouple it from the agent code. The code compiles but the tests do not yet. --- agent/agent.go | 58 +++---- agent/{local.go => local/state.go} | 150 ++++++++++++------- agent/{local_test.go => local/state_test.go} | 2 +- 3 files changed, 128 insertions(+), 82 deletions(-) rename agent/{local.go => local/state.go} (85%) rename agent/{local_test.go => local/state_test.go} (99%) diff --git a/agent/agent.go b/agent/agent.go index 40f98c585c..600432e8b3 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/agent/ae" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/systemd" "github.com/hashicorp/consul/agent/token" @@ -108,7 +109,7 @@ type Agent struct { // state stores a local representation of the node, // services and checks. Used for anti-entropy. - state *localState + state *local.State // sync manages the synchronization of the local // and the remote state. @@ -255,7 +256,19 @@ func (a *Agent) Start() error { triggerCh := make(chan struct{}, 1) // create the local state - a.state = NewLocalState(c, a.logger, a.tokens, triggerCh) + lc := local.Config{ + AdvertiseAddr: c.AdvertiseAddrLAN.String(), + CheckUpdateInterval: c.CheckUpdateInterval, + Datacenter: c.Datacenter, + DiscardCheckOutput: c.DiscardCheckOutput, + NodeID: c.NodeID, + NodeName: c.NodeName, + TaggedAddresses: map[string]string{}, + } + for k, v := range c.TaggedAddresses { + lc.TaggedAddresses[k] = v + } + a.state = local.NewState(lc, a.logger, a.tokens, triggerCh) // create the state synchronization manager which performs // regular and on-demand state synchronizations (anti-entropy). @@ -293,7 +306,7 @@ func (a *Agent) Start() error { } a.delegate = server - a.state.delegate = server + a.state.SetDelegate(server) a.sync.ClusterSize = func() int { return len(server.LANMembers()) } } else { client, err := consul.NewClientLogger(consulCfg, a.logger) @@ -302,7 +315,7 @@ func (a *Agent) Start() error { } a.delegate = client - a.state.delegate = client + a.state.SetDelegate(client) a.sync.ClusterSize = func() int { return len(client.LANMembers()) } } @@ -2005,15 +2018,13 @@ func (a *Agent) GossipEncrypted() bool { // Stats is used to get various debugging state from the sub-systems func (a *Agent) Stats() map[string]map[string]string { - toString := func(v uint64) string { - return strconv.FormatUint(v, 10) - } stats := a.delegate.Stats() stats["agent"] = map[string]string{ - "check_monitors": toString(uint64(len(a.checkMonitors))), - "check_ttls": toString(uint64(len(a.checkTTLs))), - "checks": toString(uint64(len(a.state.checks))), - "services": toString(uint64(len(a.state.services))), + "check_monitors": strconv.Itoa(len(a.checkMonitors)), + "check_ttls": strconv.Itoa(len(a.checkTTLs)), + } + for k, v := range a.state.Stats() { + stats["agent"][k] = v } revision := a.config.Revision @@ -2136,7 +2147,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { } serviceID := p.Service.ID - if _, ok := a.state.services[serviceID]; ok { + if a.state.Service(serviceID) != nil { // Purge previously persisted service. This allows config to be // preferred over services persisted from the API. a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q", @@ -2215,7 +2226,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error { } checkID := p.Check.CheckID - if _, ok := a.state.checks[checkID]; ok { + if a.state.Check(checkID) != nil { // Purge previously persisted check. This allows config to be // preferred over persisted checks from the API. a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q", @@ -2273,26 +2284,17 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) { // loadMetadata loads node metadata fields from the agent config and // updates them on the local agent. func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error { - a.state.Lock() - defer a.state.Unlock() - - for key, value := range conf.NodeMeta { - a.state.metadata[key] = value + meta := map[string]string{} + for k, v := range conf.NodeMeta { + meta[k] = v } - - a.state.metadata[structs.MetaSegmentKey] = conf.SegmentName - - a.state.changeMade() - - return nil + meta[structs.MetaSegmentKey] = conf.SegmentName + return a.state.LoadMetadata(meta) } // unloadMetadata resets the local metadata state func (a *Agent) unloadMetadata() { - a.state.Lock() - defer a.state.Unlock() - - a.state.metadata = make(map[string]string) + a.state.UnloadMetadata() } // serviceMaintCheckID returns the ID of a given service's maintenance check diff --git a/agent/local.go b/agent/local/state.go similarity index 85% rename from agent/local.go rename to agent/local/state.go index 58da5f7f38..3e8379696b 100644 --- a/agent/local.go +++ b/agent/local/state.go @@ -1,16 +1,16 @@ -package agent +package local import ( "fmt" "log" "reflect" + "strconv" "strings" "sync" "sync/atomic" "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" @@ -18,34 +18,41 @@ import ( "github.com/hashicorp/consul/types" ) +// permissionDenied is returned when an ACL based rejection happens. +const permissionDenied = "Permission denied" + // syncStatus is used to represent the difference between // the local and remote state, and if action needs to be taken type syncStatus struct { inSync bool // Is this in sync with the server } -// localStateConfig is the configuration for the localState. It is +// Config is the configuration for the State. It is // populated during NewLocalAgent from the agent configuration to avoid // race conditions with the agent configuration. -type localStateConfig struct { +type Config struct { AdvertiseAddr string CheckUpdateInterval time.Duration Datacenter string + DiscardCheckOutput bool NodeID types.NodeID NodeName string TaggedAddresses map[string]string - Tokens *token.Store } -// localState is used to represent the node's services, +type delegate interface { + RPC(method string, args interface{}, reply interface{}) error +} + +// State is used to represent the node's services, // and checks. We used it to perform anti-entropy with the // catalog representation -type localState struct { +type State struct { sync.RWMutex logger *log.Logger // Config is the agent config - config localStateConfig + config Config // delegate is the consul interface to use for keeping in sync delegate delegate @@ -78,25 +85,14 @@ type localState struct { // discardCheckOutput stores whether the output of health checks // is stored in the raft log. discardCheckOutput atomic.Value // bool + + tokens *token.Store } // NewLocalState creates a is used to initialize the local state -func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState { - lc := localStateConfig{ - AdvertiseAddr: c.AdvertiseAddrLAN.String(), - CheckUpdateInterval: c.CheckUpdateInterval, - Datacenter: c.Datacenter, - NodeID: c.NodeID, - NodeName: c.NodeName, - TaggedAddresses: map[string]string{}, - Tokens: tokens, - } - for k, v := range c.TaggedAddresses { - lc.TaggedAddresses[k] = v - } - - l := &localState{ - config: lc, +func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State { + l := &State{ + config: c, logger: lg, services: make(map[string]*structs.NodeService), serviceStatus: make(map[string]syncStatus), @@ -108,13 +104,18 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, deferCheck: make(map[types.CheckID]*time.Timer), metadata: make(map[string]string), triggerCh: triggerCh, + tokens: tokens, } l.discardCheckOutput.Store(c.DiscardCheckOutput) return l } +func (l *State) SetDelegate(d delegate) { + l.delegate = d +} + // changeMade is used to trigger an anti-entropy run -func (l *localState) changeMade() { +func (l *State) changeMade() { // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer select { case l.triggerCh <- struct{}{}: @@ -122,23 +123,23 @@ func (l *localState) changeMade() { } } -func (l *localState) SetDiscardCheckOutput(b bool) { +func (l *State) SetDiscardCheckOutput(b bool) { l.discardCheckOutput.Store(b) } // ServiceToken returns the configured ACL token for the given // service ID. If none is present, the agent's token is returned. -func (l *localState) ServiceToken(id string) string { +func (l *State) ServiceToken(id string) string { l.RLock() defer l.RUnlock() return l.serviceToken(id) } // serviceToken returns an ACL token associated with a service. -func (l *localState) serviceToken(id string) string { +func (l *State) serviceToken(id string) string { token := l.serviceTokens[id] if token == "" { - token = l.config.Tokens.UserToken() + token = l.tokens.UserToken() } return token } @@ -146,7 +147,7 @@ func (l *localState) serviceToken(id string) string { // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddService(service *structs.NodeService, token string) { +func (l *State) AddService(service *structs.NodeService, token string) { // Assign the ID if none given if service.ID == "" && service.Service != "" { service.ID = service.Service @@ -163,7 +164,7 @@ func (l *localState) AddService(service *structs.NodeService, token string) { // RemoveService is used to remove a service entry from the local state. // The agent will make a best effort to ensure it is deregistered -func (l *localState) RemoveService(serviceID string) error { +func (l *State) RemoveService(serviceID string) error { l.Lock() defer l.Unlock() @@ -180,9 +181,17 @@ func (l *localState) RemoveService(serviceID string) error { return nil } +// Service returns the locally registered service that the +// agent is aware of and are being kept in sync with the server +func (l *State) Service(id string) *structs.NodeService { + l.RLock() + defer l.RUnlock() + return l.services[id] +} + // Services returns the locally registered services that the // agent is aware of and are being kept in sync with the server -func (l *localState) Services() map[string]*structs.NodeService { +func (l *State) Services() map[string]*structs.NodeService { services := make(map[string]*structs.NodeService) l.RLock() defer l.RUnlock() @@ -195,17 +204,17 @@ func (l *localState) Services() map[string]*structs.NodeService { // CheckToken is used to return the configured health check token for a // Check, or if none is configured, the default agent ACL token. -func (l *localState) CheckToken(checkID types.CheckID) string { +func (l *State) CheckToken(checkID types.CheckID) string { l.RLock() defer l.RUnlock() return l.checkToken(checkID) } // checkToken returns an ACL token associated with a check. -func (l *localState) checkToken(checkID types.CheckID) string { +func (l *State) checkToken(checkID types.CheckID) string { token := l.checkTokens[checkID] if token == "" { - token = l.config.Tokens.UserToken() + token = l.tokens.UserToken() } return token } @@ -213,7 +222,7 @@ func (l *localState) checkToken(checkID types.CheckID) string { // AddCheck is used to add a health check to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered -func (l *localState) AddCheck(check *structs.HealthCheck, token string) error { +func (l *State) AddCheck(check *structs.HealthCheck, token string) error { l.Lock() defer l.Unlock() @@ -240,7 +249,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) error { // RemoveCheck is used to remove a health check from the local state. // The agent will make a best effort to ensure it is deregistered -func (l *localState) RemoveCheck(checkID types.CheckID) { +func (l *State) RemoveCheck(checkID types.CheckID) { l.Lock() defer l.Unlock() @@ -253,7 +262,7 @@ func (l *localState) RemoveCheck(checkID types.CheckID) { } // UpdateCheck is used to update the status of a check -func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) { +func (l *State) UpdateCheck(checkID types.CheckID, status, output string) { l.Lock() defer l.Unlock() @@ -311,9 +320,17 @@ func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) { l.changeMade() } +// Check returns the locally registered check that the +// agent is aware of and are being kept in sync with the server +func (l *State) Check(id types.CheckID) *structs.HealthCheck { + l.RLock() + defer l.RUnlock() + return l.checks[id] +} + // Checks returns the locally registered checks that the // agent is aware of and are being kept in sync with the server -func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck { +func (l *State) Checks() map[types.CheckID]*structs.HealthCheck { l.RLock() defer l.RUnlock() @@ -337,7 +354,7 @@ type CriticalCheck struct { // aware of and are being kept in sync with the server, and that are in a // critical state. This also returns information about how long each check has // been critical. -func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck { +func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck { checks := make(map[types.CheckID]CriticalCheck) l.RLock() @@ -356,7 +373,7 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck { // Metadata returns the local node metadata fields that the // agent is aware of and are being kept in sync with the server -func (l *localState) Metadata() map[string]string { +func (l *State) Metadata() map[string]string { metadata := make(map[string]string) l.RLock() defer l.RUnlock() @@ -369,14 +386,11 @@ func (l *localState) Metadata() map[string]string { // UpdateSyncState does a read of the server state, and updates // the local sync status as appropriate -func (l *localState) UpdateSyncState() error { - if l == nil { - panic("config == nil") - } +func (l *State) UpdateSyncState() error { req := structs.NodeSpecificRequest{ Datacenter: l.config.Datacenter, Node: l.config.NodeName, - QueryOptions: structs.QueryOptions{Token: l.config.Tokens.AgentToken()}, + QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()}, } var out1 structs.IndexedNodeServices var out2 structs.IndexedHealthChecks @@ -498,7 +512,7 @@ func (l *localState) UpdateSyncState() error { // SyncChanges is used to scan the status our local services and checks // and update any that are out of sync with the server -func (l *localState) SyncChanges() error { +func (l *State) SyncChanges() error { l.Lock() defer l.Unlock() @@ -555,8 +569,38 @@ func (l *localState) SyncChanges() error { return nil } +// LoadMetadata loads node metadata fields from the agent config and +// updates them on the local agent. +func (l *State) LoadMetadata(data map[string]string) error { + l.Lock() + defer l.Unlock() + + for k, v := range data { + l.metadata[k] = v + } + l.changeMade() + return nil +} + +// UnloadMetadata resets the local metadata state +func (l *State) UnloadMetadata() { + l.Lock() + defer l.Unlock() + l.metadata = make(map[string]string) +} + +// Stats is used to get various debugging state from the sub-systems +func (l *State) Stats() map[string]string { + l.RLock() + defer l.RUnlock() + return map[string]string{ + "services": strconv.Itoa(len(l.services)), + "checks": strconv.Itoa(len(l.checks)), + } +} + // deleteService is used to delete a service from the server -func (l *localState) deleteService(id string) error { +func (l *State) deleteService(id string) error { if id == "" { return fmt.Errorf("ServiceID missing") } @@ -583,7 +627,7 @@ func (l *localState) deleteService(id string) error { } // deleteCheck is used to delete a check from the server -func (l *localState) deleteCheck(id types.CheckID) error { +func (l *State) deleteCheck(id types.CheckID) error { if id == "" { return fmt.Errorf("CheckID missing") } @@ -610,7 +654,7 @@ func (l *localState) deleteCheck(id types.CheckID) error { } // syncService is used to sync a service to the server -func (l *localState) syncService(id string) error { +func (l *State) syncService(id string) error { req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, ID: l.config.NodeID, @@ -667,7 +711,7 @@ func (l *localState) syncService(id string) error { } // syncCheck is used to sync a check to the server -func (l *localState) syncCheck(id types.CheckID) error { +func (l *State) syncCheck(id types.CheckID) error { // Pull in the associated service if any check := l.checks[id] var service *structs.NodeService @@ -704,7 +748,7 @@ func (l *localState) syncCheck(id types.CheckID) error { return err } -func (l *localState) syncNodeInfo() error { +func (l *State) syncNodeInfo() error { req := structs.RegisterRequest{ Datacenter: l.config.Datacenter, ID: l.config.NodeID, @@ -712,7 +756,7 @@ func (l *localState) syncNodeInfo() error { Address: l.config.AdvertiseAddr, TaggedAddresses: l.config.TaggedAddresses, NodeMeta: l.metadata, - WriteRequest: structs.WriteRequest{Token: l.config.Tokens.AgentToken()}, + WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()}, } var out struct{} err := l.delegate.RPC("Catalog.Register", &req, &out) diff --git a/agent/local_test.go b/agent/local/state_test.go similarity index 99% rename from agent/local_test.go rename to agent/local/state_test.go index baad74a1e5..2e27c4475a 100644 --- a/agent/local_test.go +++ b/agent/local/state_test.go @@ -1,4 +1,4 @@ -package agent +package local import ( "reflect"