local state: move to separate package

This patch moves the local state to a separate package to further
decouple it from the agent code.

The code compiles but the tests do not yet.
This commit is contained in:
Frank Schroeder 2017-08-28 14:17:12 +02:00
parent c00bbdb5e4
commit 6027a9e2a5
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
3 changed files with 128 additions and 82 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/hashicorp/consul/agent/ae" "github.com/hashicorp/consul/agent/ae"
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd" "github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
@ -108,7 +109,7 @@ type Agent struct {
// state stores a local representation of the node, // state stores a local representation of the node,
// services and checks. Used for anti-entropy. // services and checks. Used for anti-entropy.
state *localState state *local.State
// sync manages the synchronization of the local // sync manages the synchronization of the local
// and the remote state. // and the remote state.
@ -255,7 +256,19 @@ func (a *Agent) Start() error {
triggerCh := make(chan struct{}, 1) triggerCh := make(chan struct{}, 1)
// create the local state // create the local state
a.state = NewLocalState(c, a.logger, a.tokens, triggerCh) lc := local.Config{
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
CheckUpdateInterval: c.CheckUpdateInterval,
Datacenter: c.Datacenter,
DiscardCheckOutput: c.DiscardCheckOutput,
NodeID: c.NodeID,
NodeName: c.NodeName,
TaggedAddresses: map[string]string{},
}
for k, v := range c.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
a.state = local.NewState(lc, a.logger, a.tokens, triggerCh)
// create the state synchronization manager which performs // create the state synchronization manager which performs
// regular and on-demand state synchronizations (anti-entropy). // regular and on-demand state synchronizations (anti-entropy).
@ -293,7 +306,7 @@ func (a *Agent) Start() error {
} }
a.delegate = server a.delegate = server
a.state.delegate = server a.state.SetDelegate(server)
a.sync.ClusterSize = func() int { return len(server.LANMembers()) } a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
} else { } else {
client, err := consul.NewClientLogger(consulCfg, a.logger) client, err := consul.NewClientLogger(consulCfg, a.logger)
@ -302,7 +315,7 @@ func (a *Agent) Start() error {
} }
a.delegate = client a.delegate = client
a.state.delegate = client a.state.SetDelegate(client)
a.sync.ClusterSize = func() int { return len(client.LANMembers()) } a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
} }
@ -2005,15 +2018,13 @@ func (a *Agent) GossipEncrypted() bool {
// Stats is used to get various debugging state from the sub-systems // Stats is used to get various debugging state from the sub-systems
func (a *Agent) Stats() map[string]map[string]string { func (a *Agent) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
stats := a.delegate.Stats() stats := a.delegate.Stats()
stats["agent"] = map[string]string{ stats["agent"] = map[string]string{
"check_monitors": toString(uint64(len(a.checkMonitors))), "check_monitors": strconv.Itoa(len(a.checkMonitors)),
"check_ttls": toString(uint64(len(a.checkTTLs))), "check_ttls": strconv.Itoa(len(a.checkTTLs)),
"checks": toString(uint64(len(a.state.checks))), }
"services": toString(uint64(len(a.state.services))), for k, v := range a.state.Stats() {
stats["agent"][k] = v
} }
revision := a.config.Revision revision := a.config.Revision
@ -2136,7 +2147,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
} }
serviceID := p.Service.ID serviceID := p.Service.ID
if _, ok := a.state.services[serviceID]; ok { if a.state.Service(serviceID) != nil {
// Purge previously persisted service. This allows config to be // Purge previously persisted service. This allows config to be
// preferred over services persisted from the API. // preferred over services persisted from the API.
a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q", a.logger.Printf("[DEBUG] agent: service %q exists, not restoring from %q",
@ -2215,7 +2226,7 @@ func (a *Agent) loadChecks(conf *config.RuntimeConfig) error {
} }
checkID := p.Check.CheckID checkID := p.Check.CheckID
if _, ok := a.state.checks[checkID]; ok { if a.state.Check(checkID) != nil {
// Purge previously persisted check. This allows config to be // Purge previously persisted check. This allows config to be
// preferred over persisted checks from the API. // preferred over persisted checks from the API.
a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q", a.logger.Printf("[DEBUG] agent: check %q exists, not restoring from %q",
@ -2273,26 +2284,17 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
// loadMetadata loads node metadata fields from the agent config and // loadMetadata loads node metadata fields from the agent config and
// updates them on the local agent. // updates them on the local agent.
func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error { func (a *Agent) loadMetadata(conf *config.RuntimeConfig) error {
a.state.Lock() meta := map[string]string{}
defer a.state.Unlock() for k, v := range conf.NodeMeta {
meta[k] = v
for key, value := range conf.NodeMeta {
a.state.metadata[key] = value
} }
meta[structs.MetaSegmentKey] = conf.SegmentName
a.state.metadata[structs.MetaSegmentKey] = conf.SegmentName return a.state.LoadMetadata(meta)
a.state.changeMade()
return nil
} }
// unloadMetadata resets the local metadata state // unloadMetadata resets the local metadata state
func (a *Agent) unloadMetadata() { func (a *Agent) unloadMetadata() {
a.state.Lock() a.state.UnloadMetadata()
defer a.state.Unlock()
a.state.metadata = make(map[string]string)
} }
// serviceMaintCheckID returns the ID of a given service's maintenance check // serviceMaintCheckID returns the ID of a given service's maintenance check

View File

@ -1,16 +1,16 @@
package agent package local
import ( import (
"fmt" "fmt"
"log" "log"
"reflect" "reflect"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
@ -18,34 +18,41 @@ import (
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
) )
// permissionDenied is returned when an ACL based rejection happens.
const permissionDenied = "Permission denied"
// syncStatus is used to represent the difference between // syncStatus is used to represent the difference between
// the local and remote state, and if action needs to be taken // the local and remote state, and if action needs to be taken
type syncStatus struct { type syncStatus struct {
inSync bool // Is this in sync with the server inSync bool // Is this in sync with the server
} }
// localStateConfig is the configuration for the localState. It is // Config is the configuration for the State. It is
// populated during NewLocalAgent from the agent configuration to avoid // populated during NewLocalAgent from the agent configuration to avoid
// race conditions with the agent configuration. // race conditions with the agent configuration.
type localStateConfig struct { type Config struct {
AdvertiseAddr string AdvertiseAddr string
CheckUpdateInterval time.Duration CheckUpdateInterval time.Duration
Datacenter string Datacenter string
DiscardCheckOutput bool
NodeID types.NodeID NodeID types.NodeID
NodeName string NodeName string
TaggedAddresses map[string]string TaggedAddresses map[string]string
Tokens *token.Store
} }
// localState is used to represent the node's services, type delegate interface {
RPC(method string, args interface{}, reply interface{}) error
}
// State is used to represent the node's services,
// and checks. We used it to perform anti-entropy with the // and checks. We used it to perform anti-entropy with the
// catalog representation // catalog representation
type localState struct { type State struct {
sync.RWMutex sync.RWMutex
logger *log.Logger logger *log.Logger
// Config is the agent config // Config is the agent config
config localStateConfig config Config
// delegate is the consul interface to use for keeping in sync // delegate is the consul interface to use for keeping in sync
delegate delegate delegate delegate
@ -78,25 +85,14 @@ type localState struct {
// discardCheckOutput stores whether the output of health checks // discardCheckOutput stores whether the output of health checks
// is stored in the raft log. // is stored in the raft log.
discardCheckOutput atomic.Value // bool discardCheckOutput atomic.Value // bool
tokens *token.Store
} }
// NewLocalState creates a is used to initialize the local state // NewLocalState creates a is used to initialize the local state
func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState { func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State {
lc := localStateConfig{ l := &State{
AdvertiseAddr: c.AdvertiseAddrLAN.String(), config: c,
CheckUpdateInterval: c.CheckUpdateInterval,
Datacenter: c.Datacenter,
NodeID: c.NodeID,
NodeName: c.NodeName,
TaggedAddresses: map[string]string{},
Tokens: tokens,
}
for k, v := range c.TaggedAddresses {
lc.TaggedAddresses[k] = v
}
l := &localState{
config: lc,
logger: lg, logger: lg,
services: make(map[string]*structs.NodeService), services: make(map[string]*structs.NodeService),
serviceStatus: make(map[string]syncStatus), serviceStatus: make(map[string]syncStatus),
@ -108,13 +104,18 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store,
deferCheck: make(map[types.CheckID]*time.Timer), deferCheck: make(map[types.CheckID]*time.Timer),
metadata: make(map[string]string), metadata: make(map[string]string),
triggerCh: triggerCh, triggerCh: triggerCh,
tokens: tokens,
} }
l.discardCheckOutput.Store(c.DiscardCheckOutput) l.discardCheckOutput.Store(c.DiscardCheckOutput)
return l return l
} }
func (l *State) SetDelegate(d delegate) {
l.delegate = d
}
// changeMade is used to trigger an anti-entropy run // changeMade is used to trigger an anti-entropy run
func (l *localState) changeMade() { func (l *State) changeMade() {
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
select { select {
case l.triggerCh <- struct{}{}: case l.triggerCh <- struct{}{}:
@ -122,23 +123,23 @@ func (l *localState) changeMade() {
} }
} }
func (l *localState) SetDiscardCheckOutput(b bool) { func (l *State) SetDiscardCheckOutput(b bool) {
l.discardCheckOutput.Store(b) l.discardCheckOutput.Store(b)
} }
// ServiceToken returns the configured ACL token for the given // ServiceToken returns the configured ACL token for the given
// service ID. If none is present, the agent's token is returned. // service ID. If none is present, the agent's token is returned.
func (l *localState) ServiceToken(id string) string { func (l *State) ServiceToken(id string) string {
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
return l.serviceToken(id) return l.serviceToken(id)
} }
// serviceToken returns an ACL token associated with a service. // serviceToken returns an ACL token associated with a service.
func (l *localState) serviceToken(id string) string { func (l *State) serviceToken(id string) string {
token := l.serviceTokens[id] token := l.serviceTokens[id]
if token == "" { if token == "" {
token = l.config.Tokens.UserToken() token = l.tokens.UserToken()
} }
return token return token
} }
@ -146,7 +147,7 @@ func (l *localState) serviceToken(id string) string {
// AddService is used to add a service entry to the local state. // AddService is used to add a service entry to the local state.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (l *localState) AddService(service *structs.NodeService, token string) { func (l *State) AddService(service *structs.NodeService, token string) {
// Assign the ID if none given // Assign the ID if none given
if service.ID == "" && service.Service != "" { if service.ID == "" && service.Service != "" {
service.ID = service.Service service.ID = service.Service
@ -163,7 +164,7 @@ func (l *localState) AddService(service *structs.NodeService, token string) {
// RemoveService is used to remove a service entry from the local state. // RemoveService is used to remove a service entry from the local state.
// The agent will make a best effort to ensure it is deregistered // The agent will make a best effort to ensure it is deregistered
func (l *localState) RemoveService(serviceID string) error { func (l *State) RemoveService(serviceID string) error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -180,9 +181,17 @@ func (l *localState) RemoveService(serviceID string) error {
return nil return nil
} }
// Service returns the locally registered service that the
// agent is aware of and are being kept in sync with the server
func (l *State) Service(id string) *structs.NodeService {
l.RLock()
defer l.RUnlock()
return l.services[id]
}
// Services returns the locally registered services that the // Services returns the locally registered services that the
// agent is aware of and are being kept in sync with the server // agent is aware of and are being kept in sync with the server
func (l *localState) Services() map[string]*structs.NodeService { func (l *State) Services() map[string]*structs.NodeService {
services := make(map[string]*structs.NodeService) services := make(map[string]*structs.NodeService)
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
@ -195,17 +204,17 @@ func (l *localState) Services() map[string]*structs.NodeService {
// CheckToken is used to return the configured health check token for a // CheckToken is used to return the configured health check token for a
// Check, or if none is configured, the default agent ACL token. // Check, or if none is configured, the default agent ACL token.
func (l *localState) CheckToken(checkID types.CheckID) string { func (l *State) CheckToken(checkID types.CheckID) string {
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
return l.checkToken(checkID) return l.checkToken(checkID)
} }
// checkToken returns an ACL token associated with a check. // checkToken returns an ACL token associated with a check.
func (l *localState) checkToken(checkID types.CheckID) string { func (l *State) checkToken(checkID types.CheckID) string {
token := l.checkTokens[checkID] token := l.checkTokens[checkID]
if token == "" { if token == "" {
token = l.config.Tokens.UserToken() token = l.tokens.UserToken()
} }
return token return token
} }
@ -213,7 +222,7 @@ func (l *localState) checkToken(checkID types.CheckID) string {
// AddCheck is used to add a health check to the local state. // AddCheck is used to add a health check to the local state.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (l *localState) AddCheck(check *structs.HealthCheck, token string) error { func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -240,7 +249,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck, token string) error {
// RemoveCheck is used to remove a health check from the local state. // RemoveCheck is used to remove a health check from the local state.
// The agent will make a best effort to ensure it is deregistered // The agent will make a best effort to ensure it is deregistered
func (l *localState) RemoveCheck(checkID types.CheckID) { func (l *State) RemoveCheck(checkID types.CheckID) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -253,7 +262,7 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
} }
// UpdateCheck is used to update the status of a check // UpdateCheck is used to update the status of a check
func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) { func (l *State) UpdateCheck(checkID types.CheckID, status, output string) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -311,9 +320,17 @@ func (l *localState) UpdateCheck(checkID types.CheckID, status, output string) {
l.changeMade() l.changeMade()
} }
// Check returns the locally registered check that the
// agent is aware of and are being kept in sync with the server
func (l *State) Check(id types.CheckID) *structs.HealthCheck {
l.RLock()
defer l.RUnlock()
return l.checks[id]
}
// Checks returns the locally registered checks that the // Checks returns the locally registered checks that the
// agent is aware of and are being kept in sync with the server // agent is aware of and are being kept in sync with the server
func (l *localState) Checks() map[types.CheckID]*structs.HealthCheck { func (l *State) Checks() map[types.CheckID]*structs.HealthCheck {
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
@ -337,7 +354,7 @@ type CriticalCheck struct {
// aware of and are being kept in sync with the server, and that are in a // aware of and are being kept in sync with the server, and that are in a
// critical state. This also returns information about how long each check has // critical state. This also returns information about how long each check has
// been critical. // been critical.
func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck { func (l *State) CriticalChecks() map[types.CheckID]CriticalCheck {
checks := make(map[types.CheckID]CriticalCheck) checks := make(map[types.CheckID]CriticalCheck)
l.RLock() l.RLock()
@ -356,7 +373,7 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
// Metadata returns the local node metadata fields that the // Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server // agent is aware of and are being kept in sync with the server
func (l *localState) Metadata() map[string]string { func (l *State) Metadata() map[string]string {
metadata := make(map[string]string) metadata := make(map[string]string)
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
@ -369,14 +386,11 @@ func (l *localState) Metadata() map[string]string {
// UpdateSyncState does a read of the server state, and updates // UpdateSyncState does a read of the server state, and updates
// the local sync status as appropriate // the local sync status as appropriate
func (l *localState) UpdateSyncState() error { func (l *State) UpdateSyncState() error {
if l == nil {
panic("config == nil")
}
req := structs.NodeSpecificRequest{ req := structs.NodeSpecificRequest{
Datacenter: l.config.Datacenter, Datacenter: l.config.Datacenter,
Node: l.config.NodeName, Node: l.config.NodeName,
QueryOptions: structs.QueryOptions{Token: l.config.Tokens.AgentToken()}, QueryOptions: structs.QueryOptions{Token: l.tokens.AgentToken()},
} }
var out1 structs.IndexedNodeServices var out1 structs.IndexedNodeServices
var out2 structs.IndexedHealthChecks var out2 structs.IndexedHealthChecks
@ -498,7 +512,7 @@ func (l *localState) UpdateSyncState() error {
// SyncChanges is used to scan the status our local services and checks // SyncChanges is used to scan the status our local services and checks
// and update any that are out of sync with the server // and update any that are out of sync with the server
func (l *localState) SyncChanges() error { func (l *State) SyncChanges() error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
@ -555,8 +569,38 @@ func (l *localState) SyncChanges() error {
return nil return nil
} }
// LoadMetadata loads node metadata fields from the agent config and
// updates them on the local agent.
func (l *State) LoadMetadata(data map[string]string) error {
l.Lock()
defer l.Unlock()
for k, v := range data {
l.metadata[k] = v
}
l.changeMade()
return nil
}
// UnloadMetadata resets the local metadata state
func (l *State) UnloadMetadata() {
l.Lock()
defer l.Unlock()
l.metadata = make(map[string]string)
}
// Stats is used to get various debugging state from the sub-systems
func (l *State) Stats() map[string]string {
l.RLock()
defer l.RUnlock()
return map[string]string{
"services": strconv.Itoa(len(l.services)),
"checks": strconv.Itoa(len(l.checks)),
}
}
// deleteService is used to delete a service from the server // deleteService is used to delete a service from the server
func (l *localState) deleteService(id string) error { func (l *State) deleteService(id string) error {
if id == "" { if id == "" {
return fmt.Errorf("ServiceID missing") return fmt.Errorf("ServiceID missing")
} }
@ -583,7 +627,7 @@ func (l *localState) deleteService(id string) error {
} }
// deleteCheck is used to delete a check from the server // deleteCheck is used to delete a check from the server
func (l *localState) deleteCheck(id types.CheckID) error { func (l *State) deleteCheck(id types.CheckID) error {
if id == "" { if id == "" {
return fmt.Errorf("CheckID missing") return fmt.Errorf("CheckID missing")
} }
@ -610,7 +654,7 @@ func (l *localState) deleteCheck(id types.CheckID) error {
} }
// syncService is used to sync a service to the server // syncService is used to sync a service to the server
func (l *localState) syncService(id string) error { func (l *State) syncService(id string) error {
req := structs.RegisterRequest{ req := structs.RegisterRequest{
Datacenter: l.config.Datacenter, Datacenter: l.config.Datacenter,
ID: l.config.NodeID, ID: l.config.NodeID,
@ -667,7 +711,7 @@ func (l *localState) syncService(id string) error {
} }
// syncCheck is used to sync a check to the server // syncCheck is used to sync a check to the server
func (l *localState) syncCheck(id types.CheckID) error { func (l *State) syncCheck(id types.CheckID) error {
// Pull in the associated service if any // Pull in the associated service if any
check := l.checks[id] check := l.checks[id]
var service *structs.NodeService var service *structs.NodeService
@ -704,7 +748,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
return err return err
} }
func (l *localState) syncNodeInfo() error { func (l *State) syncNodeInfo() error {
req := structs.RegisterRequest{ req := structs.RegisterRequest{
Datacenter: l.config.Datacenter, Datacenter: l.config.Datacenter,
ID: l.config.NodeID, ID: l.config.NodeID,
@ -712,7 +756,7 @@ func (l *localState) syncNodeInfo() error {
Address: l.config.AdvertiseAddr, Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses, TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata, NodeMeta: l.metadata,
WriteRequest: structs.WriteRequest{Token: l.config.Tokens.AgentToken()}, WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()},
} }
var out struct{} var out struct{}
err := l.delegate.RPC("Catalog.Register", &req, &out) err := l.delegate.RPC("Catalog.Register", &req, &out)

View File

@ -1,4 +1,4 @@
package agent package local
import ( import (
"reflect" "reflect"