mirror of
https://github.com/status-im/consul.git
synced 2025-02-23 10:58:25 +00:00
agent: Replace client/server with delegate interface
This patch adds a new internal interface clientServer which defines the common methods of consul.Client and consul.Server. This allows to replace the following code if a.server != nil { a.server.do() } else { a.client.do() } with a.delegate.do() In case a specific type is required a type check can be performed: if srv, ok := a.delegate.(*consul.Server); ok { srv.doSrv() }
This commit is contained in:
parent
9b1bd5197b
commit
e2c37b47ee
@ -48,19 +48,31 @@ const (
|
||||
"service, but no reason was provided. This is a default message."
|
||||
)
|
||||
|
||||
var (
|
||||
// dnsNameRe checks if a name or tag is dns-compatible.
|
||||
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
|
||||
)
|
||||
// dnsNameRe checks if a name or tag is dns-compatible.
|
||||
var dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
|
||||
|
||||
/*
|
||||
The agent is the long running process that is run on every machine.
|
||||
It exposes an RPC interface that is used by the CLI to control the
|
||||
agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
|
||||
However, it can run in either a client, or server mode. In server
|
||||
mode, it runs a full Consul server. In client-only mode, it only forwards
|
||||
requests to other Consul servers.
|
||||
*/
|
||||
// clientServer defines the interface shared by both
|
||||
// consul.Client and consul.Server.
|
||||
type clientServer interface {
|
||||
Encrypted() bool
|
||||
GetLANCoordinate() (*coordinate.Coordinate, error)
|
||||
Leave() error
|
||||
LANMembers() []serf.Member
|
||||
LocalMember() serf.Member
|
||||
JoinLAN(addrs []string) (n int, err error)
|
||||
RemoveFailedNode(node string) error
|
||||
RPC(method string, args interface{}, reply interface{}) error
|
||||
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn consul.SnapshotReplyFn) error
|
||||
Shutdown() error
|
||||
Stats() map[string]map[string]string
|
||||
}
|
||||
|
||||
// The agent is the long running process that is run on every machine.
|
||||
// It exposes an RPC interface that is used by the CLI to control the
|
||||
// agent. The agent runs the query interfaces like HTTP, DNS, and RPC.
|
||||
// However, it can run in either a client, or server mode. In server
|
||||
// mode, it runs a full Consul server. In client-only mode, it only forwards
|
||||
// requests to other Consul servers.
|
||||
type Agent struct {
|
||||
config *Config
|
||||
|
||||
@ -73,10 +85,9 @@ type Agent struct {
|
||||
// Used for streaming logs to
|
||||
logWriter *logger.LogWriter
|
||||
|
||||
// We have one of a client or a server, depending
|
||||
// on our configuration
|
||||
server *consul.Server
|
||||
client *consul.Client
|
||||
// delegate is either a *consul.Server or *consul.Client
|
||||
// depending on the configuration
|
||||
delegate clientServer
|
||||
|
||||
// acls is an object that helps manage local ACL enforcement.
|
||||
acls *aclManager
|
||||
@ -187,7 +198,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
|
||||
// Setup either the client or the server.
|
||||
if config.Server {
|
||||
err = agent.setupServer()
|
||||
agent.state.SetIface(agent.server)
|
||||
agent.state.SetIface(agent.delegate)
|
||||
|
||||
// Automatically register the "consul" service on server nodes
|
||||
consulService := structs.NodeService{
|
||||
@ -200,7 +211,7 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
|
||||
agent.state.AddService(&consulService, agent.config.GetTokenForAgent())
|
||||
} else {
|
||||
err = agent.setupClient()
|
||||
agent.state.SetIface(agent.client)
|
||||
agent.state.SetIface(agent.delegate)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -605,7 +616,7 @@ func (a *Agent) setupServer() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to start Consul server: %v", err)
|
||||
}
|
||||
a.server = server
|
||||
a.delegate = server
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -622,7 +633,7 @@ func (a *Agent) setupClient() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to start Consul client: %v", err)
|
||||
}
|
||||
a.client = client
|
||||
a.delegate = client
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -784,10 +795,7 @@ LOAD:
|
||||
// RPC is used to make an RPC call to the Consul servers
|
||||
// This allows the agent to implement the Consul.Interface
|
||||
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
|
||||
if a.server != nil {
|
||||
return a.server.RPC(method, args, reply)
|
||||
}
|
||||
return a.client.RPC(method, args, reply)
|
||||
return a.delegate.RPC(method, args, reply)
|
||||
}
|
||||
|
||||
// SnapshotRPC performs the requested snapshot RPC against the Consul server in
|
||||
@ -796,19 +804,12 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
|
||||
// return payload will be written to out.
|
||||
func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
|
||||
replyFn consul.SnapshotReplyFn) error {
|
||||
|
||||
if a.server != nil {
|
||||
return a.server.SnapshotRPC(args, in, out, replyFn)
|
||||
}
|
||||
return a.client.SnapshotRPC(args, in, out, replyFn)
|
||||
return a.delegate.SnapshotRPC(args, in, out, replyFn)
|
||||
}
|
||||
|
||||
// Leave is used to prepare the agent for a graceful shutdown
|
||||
func (a *Agent) Leave() error {
|
||||
if a.server != nil {
|
||||
return a.server.Leave()
|
||||
}
|
||||
return a.client.Leave()
|
||||
return a.delegate.Leave()
|
||||
}
|
||||
|
||||
// Shutdown is used to hard stop the agent. Should be
|
||||
@ -840,12 +841,7 @@ func (a *Agent) Shutdown() error {
|
||||
}
|
||||
|
||||
a.logger.Println("[INFO] agent: requesting shutdown")
|
||||
var err error
|
||||
if a.server != nil {
|
||||
err = a.server.Shutdown()
|
||||
} else {
|
||||
err = a.client.Shutdown()
|
||||
}
|
||||
err := a.delegate.Shutdown()
|
||||
|
||||
pidErr := a.deletePid()
|
||||
if pidErr != nil {
|
||||
@ -867,11 +863,7 @@ func (a *Agent) ShutdownCh() <-chan struct{} {
|
||||
// JoinLAN is used to have the agent join a LAN cluster
|
||||
func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
|
||||
a.logger.Printf("[INFO] agent: (LAN) joining: %v", addrs)
|
||||
if a.server != nil {
|
||||
n, err = a.server.JoinLAN(addrs)
|
||||
} else {
|
||||
n, err = a.client.JoinLAN(addrs)
|
||||
}
|
||||
n, err = a.delegate.JoinLAN(addrs)
|
||||
a.logger.Printf("[INFO] agent: (LAN) joined: %d Err: %v", n, err)
|
||||
return
|
||||
}
|
||||
@ -879,8 +871,8 @@ func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
|
||||
// JoinWAN is used to have the agent join a WAN cluster
|
||||
func (a *Agent) JoinWAN(addrs []string) (n int, err error) {
|
||||
a.logger.Printf("[INFO] agent: (WAN) joining: %v", addrs)
|
||||
if a.server != nil {
|
||||
n, err = a.server.JoinWAN(addrs)
|
||||
if srv, ok := a.delegate.(*consul.Server); ok {
|
||||
n, err = srv.JoinWAN(addrs)
|
||||
} else {
|
||||
err = fmt.Errorf("Must be a server to join WAN cluster")
|
||||
}
|
||||
@ -891,11 +883,7 @@ func (a *Agent) JoinWAN(addrs []string) (n int, err error) {
|
||||
// ForceLeave is used to remove a failed node from the cluster
|
||||
func (a *Agent) ForceLeave(node string) (err error) {
|
||||
a.logger.Printf("[INFO] Force leaving node: %v", node)
|
||||
if a.server != nil {
|
||||
err = a.server.RemoveFailedNode(node)
|
||||
} else {
|
||||
err = a.client.RemoveFailedNode(node)
|
||||
}
|
||||
err = a.delegate.RemoveFailedNode(node)
|
||||
if err != nil {
|
||||
a.logger.Printf("[WARN] Failed to remove node: %v", err)
|
||||
}
|
||||
@ -904,24 +892,18 @@ func (a *Agent) ForceLeave(node string) (err error) {
|
||||
|
||||
// LocalMember is used to return the local node
|
||||
func (a *Agent) LocalMember() serf.Member {
|
||||
if a.server != nil {
|
||||
return a.server.LocalMember()
|
||||
}
|
||||
return a.client.LocalMember()
|
||||
return a.delegate.LocalMember()
|
||||
}
|
||||
|
||||
// LANMembers is used to retrieve the LAN members
|
||||
func (a *Agent) LANMembers() []serf.Member {
|
||||
if a.server != nil {
|
||||
return a.server.LANMembers()
|
||||
}
|
||||
return a.client.LANMembers()
|
||||
return a.delegate.LANMembers()
|
||||
}
|
||||
|
||||
// WANMembers is used to retrieve the WAN members
|
||||
func (a *Agent) WANMembers() []serf.Member {
|
||||
if a.server != nil {
|
||||
return a.server.WANMembers()
|
||||
if srv, ok := a.delegate.(*consul.Server); ok {
|
||||
return srv.WANMembers()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -943,13 +925,10 @@ func (a *Agent) ResumeSync() {
|
||||
a.state.Resume()
|
||||
}
|
||||
|
||||
// Returns the coordinate of this node in the local pool (assumes coordinates
|
||||
// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates
|
||||
// are enabled, so check that before calling).
|
||||
func (a *Agent) GetCoordinate() (*coordinate.Coordinate, error) {
|
||||
if a.config.Server {
|
||||
return a.server.GetLANCoordinate()
|
||||
}
|
||||
return a.client.GetCoordinate()
|
||||
func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) {
|
||||
return a.delegate.GetLANCoordinate()
|
||||
}
|
||||
|
||||
// sendCoordinate is a long-running loop that periodically sends our coordinate
|
||||
@ -974,7 +953,7 @@ func (a *Agent) sendCoordinate() {
|
||||
continue
|
||||
}
|
||||
|
||||
c, err := a.GetCoordinate()
|
||||
c, err := a.GetLANCoordinate()
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err)
|
||||
continue
|
||||
@ -1205,7 +1184,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe
|
||||
// The agent will make a best effort to ensure it is deregistered
|
||||
func (a *Agent) RemoveService(serviceID string, persist bool) error {
|
||||
// Protect "consul" service from deletion by a user
|
||||
if a.server != nil && serviceID == consul.ConsulServiceID {
|
||||
if _, ok := a.delegate.(*consul.Server); ok && serviceID == consul.ConsulServiceID {
|
||||
return fmt.Errorf(
|
||||
"Deregistering the %s service is not allowed",
|
||||
consul.ConsulServiceID)
|
||||
@ -1563,12 +1542,7 @@ func (a *Agent) Stats() map[string]map[string]string {
|
||||
toString := func(v uint64) string {
|
||||
return strconv.FormatUint(v, 10)
|
||||
}
|
||||
var stats map[string]map[string]string
|
||||
if a.server != nil {
|
||||
stats = a.server.Stats()
|
||||
} else {
|
||||
stats = a.client.Stats()
|
||||
}
|
||||
stats := a.delegate.Stats()
|
||||
stats["agent"] = map[string]string{
|
||||
"check_monitors": toString(uint64(len(a.checkMonitors))),
|
||||
"check_ttls": toString(uint64(len(a.checkTTLs))),
|
||||
@ -1955,11 +1929,11 @@ func (a *Agent) DisableNodeMaintenance() {
|
||||
// that not all agent methods use this mechanism, and that is should only
|
||||
// be used for testing.
|
||||
func (a *Agent) InjectEndpoint(endpoint string, handler interface{}) error {
|
||||
if a.server == nil {
|
||||
srv, ok := a.delegate.(*consul.Server)
|
||||
if !ok {
|
||||
return fmt.Errorf("agent must be a server")
|
||||
}
|
||||
|
||||
if err := a.server.InjectEndpoint(handler); err != nil {
|
||||
if err := srv.InjectEndpoint(handler); err != nil {
|
||||
return err
|
||||
}
|
||||
name := reflect.Indirect(reflect.ValueOf(handler)).Type().Name()
|
||||
|
@ -28,7 +28,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
|
||||
var c *coordinate.Coordinate
|
||||
if !s.agent.config.DisableCoordinates {
|
||||
var err error
|
||||
if c, err = s.agent.GetCoordinate(); err != nil {
|
||||
if c, err = s.agent.GetLANCoordinate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -192,7 +192,7 @@ func TestAgent_Self(t *testing.T) {
|
||||
t.Fatalf("incorrect port: %v", obj)
|
||||
}
|
||||
|
||||
c, err := srv.agent.server.GetLANCoordinate()
|
||||
c, err := srv.agent.GetLANCoordinate()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1978,7 +1978,7 @@ func TestAgent_GetCoordinate(t *testing.T) {
|
||||
// sure that the agent chooses the correct Serf instance,
|
||||
// depending on how it's configured as a client or a server.
|
||||
// If it chooses the wrong one, this will crash.
|
||||
if _, err := agent.GetCoordinate(); err != nil {
|
||||
if _, err := agent.GetLANCoordinate(); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"github.com/armon/go-metrics/circonus"
|
||||
"github.com/armon/go-metrics/datadog"
|
||||
"github.com/hashicorp/consul/command/base"
|
||||
"github.com/hashicorp/consul/consul"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logger"
|
||||
@ -657,13 +658,15 @@ func (c *Command) gossipEncrypted() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
server := c.agent.server
|
||||
if server != nil {
|
||||
server, ok := c.agent.delegate.(*consul.Server)
|
||||
if ok {
|
||||
return server.KeyManagerLAN() != nil || server.KeyManagerWAN() != nil
|
||||
}
|
||||
|
||||
client := c.agent.client
|
||||
return client != nil && client.KeyManagerLAN() != nil
|
||||
client, ok := c.agent.delegate.(*consul.Client)
|
||||
if ok {
|
||||
return client != nil && client.KeyManagerLAN() != nil
|
||||
}
|
||||
panic(fmt.Sprintf("delegate is neither server nor client: %T", c.agent.delegate))
|
||||
}
|
||||
|
||||
func (c *Command) Run(args []string) int {
|
||||
@ -846,12 +849,7 @@ func (c *Command) Run(args []string) int {
|
||||
}
|
||||
|
||||
// Figure out if gossip is encrypted
|
||||
var gossipEncrypted bool
|
||||
if config.Server {
|
||||
gossipEncrypted = c.agent.server.Encrypted()
|
||||
} else {
|
||||
gossipEncrypted = c.agent.client.Encrypted()
|
||||
}
|
||||
gossipEncrypted := c.agent.delegate.Encrypted()
|
||||
|
||||
// Let the agent know we've finished registration
|
||||
c.agent.StartSync()
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hashicorp/consul/consul"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
@ -111,7 +112,8 @@ func loadKeyringFile(c *serf.Config) error {
|
||||
// performing various operations on the encryption keyring.
|
||||
func (a *Agent) keyringProcess(args *structs.KeyringRequest) (*structs.KeyringResponses, error) {
|
||||
var reply structs.KeyringResponses
|
||||
if a.server == nil {
|
||||
|
||||
if _, ok := a.delegate.(*consul.Server); !ok {
|
||||
return nil, fmt.Errorf("keyring operations must run against a server node")
|
||||
}
|
||||
if err := a.RPC("Internal.KeyringOperation", args, &reply); err != nil {
|
||||
|
@ -407,8 +407,8 @@ func (c *Client) Stats() map[string]map[string]string {
|
||||
return stats
|
||||
}
|
||||
|
||||
// GetCoordinate returns the network coordinate of the current node, as
|
||||
// GetLANCoordinate returns the network coordinate of the current node, as
|
||||
// maintained by Serf.
|
||||
func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) {
|
||||
func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) {
|
||||
return c.serf.GetCoordinate()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user