diff --git a/command/agent/agent.go b/command/agent/agent.go index d3f81649e8..14360e04e6 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -48,19 +48,31 @@ const ( "service, but no reason was provided. This is a default message." ) -var ( - // dnsNameRe checks if a name or tag is dns-compatible. - dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`) -) +// dnsNameRe checks if a name or tag is dns-compatible. +var dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`) -/* - The agent is the long running process that is run on every machine. - It exposes an RPC interface that is used by the CLI to control the - agent. The agent runs the query interfaces like HTTP, DNS, and RPC. - However, it can run in either a client, or server mode. In server - mode, it runs a full Consul server. In client-only mode, it only forwards - requests to other Consul servers. -*/ +// clientServer defines the interface shared by both +// consul.Client and consul.Server. +type clientServer interface { + Encrypted() bool + GetLANCoordinate() (*coordinate.Coordinate, error) + Leave() error + LANMembers() []serf.Member + LocalMember() serf.Member + JoinLAN(addrs []string) (n int, err error) + RemoveFailedNode(node string) error + RPC(method string, args interface{}, reply interface{}) error + SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn consul.SnapshotReplyFn) error + Shutdown() error + Stats() map[string]map[string]string +} + +// The agent is the long running process that is run on every machine. +// It exposes an RPC interface that is used by the CLI to control the +// agent. The agent runs the query interfaces like HTTP, DNS, and RPC. +// However, it can run in either a client, or server mode. In server +// mode, it runs a full Consul server. In client-only mode, it only forwards +// requests to other Consul servers. type Agent struct { config *Config @@ -73,10 +85,9 @@ type Agent struct { // Used for streaming logs to logWriter *logger.LogWriter - // We have one of a client or a server, depending - // on our configuration - server *consul.Server - client *consul.Client + // delegate is either a *consul.Server or *consul.Client + // depending on the configuration + delegate clientServer // acls is an object that helps manage local ACL enforcement. acls *aclManager @@ -187,7 +198,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re // Setup either the client or the server. if config.Server { err = agent.setupServer() - agent.state.SetIface(agent.server) + agent.state.SetIface(agent.delegate) // Automatically register the "consul" service on server nodes consulService := structs.NodeService{ @@ -200,7 +211,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re agent.state.AddService(&consulService, agent.config.GetTokenForAgent()) } else { err = agent.setupClient() - agent.state.SetIface(agent.client) + agent.state.SetIface(agent.delegate) } if err != nil { return nil, err @@ -605,7 +616,7 @@ func (a *Agent) setupServer() error { if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } - a.server = server + a.delegate = server return nil } @@ -622,7 +633,7 @@ func (a *Agent) setupClient() error { if err != nil { return fmt.Errorf("Failed to start Consul client: %v", err) } - a.client = client + a.delegate = client return nil } @@ -784,10 +795,7 @@ LOAD: // RPC is used to make an RPC call to the Consul servers // This allows the agent to implement the Consul.Interface func (a *Agent) RPC(method string, args interface{}, reply interface{}) error { - if a.server != nil { - return a.server.RPC(method, args, reply) - } - return a.client.RPC(method, args, reply) + return a.delegate.RPC(method, args, reply) } // SnapshotRPC performs the requested snapshot RPC against the Consul server in @@ -796,19 +804,12 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error { // return payload will be written to out. func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn consul.SnapshotReplyFn) error { - - if a.server != nil { - return a.server.SnapshotRPC(args, in, out, replyFn) - } - return a.client.SnapshotRPC(args, in, out, replyFn) + return a.delegate.SnapshotRPC(args, in, out, replyFn) } // Leave is used to prepare the agent for a graceful shutdown func (a *Agent) Leave() error { - if a.server != nil { - return a.server.Leave() - } - return a.client.Leave() + return a.delegate.Leave() } // Shutdown is used to hard stop the agent. Should be @@ -840,12 +841,7 @@ func (a *Agent) Shutdown() error { } a.logger.Println("[INFO] agent: requesting shutdown") - var err error - if a.server != nil { - err = a.server.Shutdown() - } else { - err = a.client.Shutdown() - } + err := a.delegate.Shutdown() pidErr := a.deletePid() if pidErr != nil { @@ -867,11 +863,7 @@ func (a *Agent) ShutdownCh() <-chan struct{} { // JoinLAN is used to have the agent join a LAN cluster func (a *Agent) JoinLAN(addrs []string) (n int, err error) { a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs) - if a.server != nil { - n, err = a.server.JoinLAN(addrs) - } else { - n, err = a.client.JoinLAN(addrs) - } + n, err = a.delegate.JoinLAN(addrs) a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err) return } @@ -879,8 +871,8 @@ func (a *Agent) JoinLAN(addrs []string) (n int, err error) { // JoinWAN is used to have the agent join a WAN cluster func (a *Agent) JoinWAN(addrs []string) (n int, err error) { a.logger.Printf("[INFO] agent: (WAN) joining: %v", addrs) - if a.server != nil { - n, err = a.server.JoinWAN(addrs) + if srv, ok := a.delegate.(*consul.Server); ok { + n, err = srv.JoinWAN(addrs) } else { err = fmt.Errorf("Must be a server to join WAN cluster") } @@ -891,11 +883,7 @@ func (a *Agent) JoinWAN(addrs []string) (n int, err error) { // ForceLeave is used to remove a failed node from the cluster func (a *Agent) ForceLeave(node string) (err error) { a.logger.Printf("[INFO] Force leaving node: %v", node) - if a.server != nil { - err = a.server.RemoveFailedNode(node) - } else { - err = a.client.RemoveFailedNode(node) - } + err = a.delegate.RemoveFailedNode(node) if err != nil { a.logger.Printf("[WARN] Failed to remove node: %v", err) } @@ -904,24 +892,18 @@ func (a *Agent) ForceLeave(node string) (err error) { // LocalMember is used to return the local node func (a *Agent) LocalMember() serf.Member { - if a.server != nil { - return a.server.LocalMember() - } - return a.client.LocalMember() + return a.delegate.LocalMember() } // LANMembers is used to retrieve the LAN members func (a *Agent) LANMembers() []serf.Member { - if a.server != nil { - return a.server.LANMembers() - } - return a.client.LANMembers() + return a.delegate.LANMembers() } // WANMembers is used to retrieve the WAN members func (a *Agent) WANMembers() []serf.Member { - if a.server != nil { - return a.server.WANMembers() + if srv, ok := a.delegate.(*consul.Server); ok { + return srv.WANMembers() } return nil } @@ -943,13 +925,10 @@ func (a *Agent) ResumeSync() { a.state.Resume() } -// Returns the coordinate of this node in the local pool (assumes coordinates +// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates // are enabled, so check that before calling). -func (a *Agent) GetCoordinate() (*coordinate.Coordinate, error) { - if a.config.Server { - return a.server.GetLANCoordinate() - } - return a.client.GetCoordinate() +func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) { + return a.delegate.GetLANCoordinate() } // sendCoordinate is a long-running loop that periodically sends our coordinate @@ -974,7 +953,7 @@ func (a *Agent) sendCoordinate() { continue } - c, err := a.GetCoordinate() + c, err := a.GetLANCoordinate() if err != nil { a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err) continue @@ -1205,7 +1184,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe // The agent will make a best effort to ensure it is deregistered func (a *Agent) RemoveService(serviceID string, persist bool) error { // Protect "consul" service from deletion by a user - if a.server != nil && serviceID == consul.ConsulServiceID { + if _, ok := a.delegate.(*consul.Server); ok && serviceID == consul.ConsulServiceID { return fmt.Errorf( "Deregistering the %s service is not allowed", consul.ConsulServiceID) @@ -1563,12 +1542,7 @@ func (a *Agent) Stats() map[string]map[string]string { toString := func(v uint64) string { return strconv.FormatUint(v, 10) } - var stats map[string]map[string]string - if a.server != nil { - stats = a.server.Stats() - } else { - stats = a.client.Stats() - } + stats := a.delegate.Stats() stats["agent"] = map[string]string{ "check_monitors": toString(uint64(len(a.checkMonitors))), "check_ttls": toString(uint64(len(a.checkTTLs))), @@ -1955,11 +1929,11 @@ func (a *Agent) DisableNodeMaintenance() { // that not all agent methods use this mechanism, and that is should only // be used for testing. func (a *Agent) InjectEndpoint(endpoint string, handler interface{}) error { - if a.server == nil { + srv, ok := a.delegate.(*consul.Server) + if !ok { return fmt.Errorf("agent must be a server") } - - if err := a.server.InjectEndpoint(handler); err != nil { + if err := srv.InjectEndpoint(handler); err != nil { return err } name := reflect.Indirect(reflect.ValueOf(handler)).Type().Name() diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 5a25f891e9..bda65fd43e 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -28,7 +28,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int var c *coordinate.Coordinate if !s.agent.config.DisableCoordinates { var err error - if c, err = s.agent.GetCoordinate(); err != nil { + if c, err = s.agent.GetLANCoordinate(); err != nil { return nil, err } } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index edf47a47fd..87335826e1 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -192,7 +192,7 @@ func TestAgent_Self(t *testing.T) { t.Fatalf("incorrect port: %v", obj) } - c, err := srv.agent.server.GetLANCoordinate() + c, err := srv.agent.GetLANCoordinate() if err != nil { t.Fatalf("err: %v", err) } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 8425773a14..513719c307 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1978,7 +1978,7 @@ func TestAgent_GetCoordinate(t *testing.T) { // sure that the agent chooses the correct Serf instance, // depending on how it's configured as a client or a server. // If it chooses the wrong one, this will crash. - if _, err := agent.GetCoordinate(); err != nil { + if _, err := agent.GetLANCoordinate(); err != nil { t.Fatalf("err: %s", err) } } diff --git a/command/agent/command.go b/command/agent/command.go index 682a52474b..61d911e5be 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -16,6 +16,7 @@ import ( "github.com/armon/go-metrics/circonus" "github.com/armon/go-metrics/datadog" "github.com/hashicorp/consul/command/base" + "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logger" @@ -657,13 +658,15 @@ func (c *Command) gossipEncrypted() bool { return true } - server := c.agent.server - if server != nil { + server, ok := c.agent.delegate.(*consul.Server) + if ok { return server.KeyManagerLAN() != nil || server.KeyManagerWAN() != nil } - - client := c.agent.client - return client != nil && client.KeyManagerLAN() != nil + client, ok := c.agent.delegate.(*consul.Client) + if ok { + return client != nil && client.KeyManagerLAN() != nil + } + panic(fmt.Sprintf("delegate is neither server nor client: %T", c.agent.delegate)) } func (c *Command) Run(args []string) int { @@ -846,12 +849,7 @@ func (c *Command) Run(args []string) int { } // Figure out if gossip is encrypted - var gossipEncrypted bool - if config.Server { - gossipEncrypted = c.agent.server.Encrypted() - } else { - gossipEncrypted = c.agent.client.Encrypted() - } + gossipEncrypted := c.agent.delegate.Encrypted() // Let the agent know we've finished registration c.agent.StartSync() diff --git a/command/agent/keyring.go b/command/agent/keyring.go index c870e2dbcb..f06ca22aab 100644 --- a/command/agent/keyring.go +++ b/command/agent/keyring.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" + "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" @@ -111,7 +112,8 @@ func loadKeyringFile(c *serf.Config) error { // performing various operations on the encryption keyring. func (a *Agent) keyringProcess(args *structs.KeyringRequest) (*structs.KeyringResponses, error) { var reply structs.KeyringResponses - if a.server == nil { + + if _, ok := a.delegate.(*consul.Server); !ok { return nil, fmt.Errorf("keyring operations must run against a server node") } if err := a.RPC("Internal.KeyringOperation", args, &reply); err != nil { diff --git a/consul/client.go b/consul/client.go index 82723872f2..9d3429a734 100644 --- a/consul/client.go +++ b/consul/client.go @@ -407,8 +407,8 @@ func (c *Client) Stats() map[string]map[string]string { return stats } -// GetCoordinate returns the network coordinate of the current node, as +// GetLANCoordinate returns the network coordinate of the current node, as // maintained by Serf. -func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) { +func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) { return c.serf.GetCoordinate() }