From 7cff50a4df02be8c2a63233c4c821ef3fdb1982c Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 6 Jul 2017 12:48:37 +0200 Subject: [PATCH] agent: move agent/consul/agent to agent/metadata --- agent/consul/autopilot.go | 12 +++--- agent/consul/client.go | 6 +-- agent/consul/leader.go | 14 +++---- agent/consul/merge.go | 6 +-- agent/consul/operator_raft_endpoint.go | 4 +- agent/consul/rpc.go | 8 ++-- agent/consul/serf.go | 10 ++--- agent/consul/server.go | 10 ++--- agent/consul/server_test.go | 4 +- agent/consul/stats_fetcher.go | 8 ++-- agent/consul/stats_fetcher_test.go | 6 +-- agent/consul/util.go | 4 +- agent/{consul/agent => metadata}/server.go | 2 +- .../server_internal_test.go | 2 +- .../{consul/agent => metadata}/server_test.go | 30 +++++++------- agent/router/manager.go | 34 ++++++++-------- agent/router/manager_internal_test.go | 22 +++++----- agent/router/manager_test.go | 40 +++++++++---------- agent/router/router.go | 22 +++++----- agent/router/serf_adapter.go | 6 +-- agent/router/serf_flooder.go | 10 ++--- 21 files changed, 130 insertions(+), 130 deletions(-) rename agent/{consul/agent => metadata}/server.go (99%) rename agent/{consul/agent => metadata}/server_internal_test.go (98%) rename agent/{consul/agent => metadata}/server_test.go (88%) diff --git a/agent/consul/autopilot.go b/agent/consul/autopilot.go index fe02256da3..ba6d68a69c 100644 --- a/agent/consul/autopilot.go +++ b/agent/consul/autopilot.go @@ -8,7 +8,7 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-version" "github.com/hashicorp/raft" @@ -90,7 +90,7 @@ func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) erro } for _, member := range s.serfLAN.Members() { - valid, parts := agent.IsConsulServer(member) + valid, parts := metadata.IsConsulServer(member) if valid { // Remove this server from the stale list; it has a serf entry @@ -275,13 +275,13 @@ func (s *Server) updateClusterHealth() error { } // Get the the serf members which are Consul servers - serverMap := make(map[string]*agent.Server) + serverMap := make(map[string]*metadata.Server) for _, member := range s.LANMembers() { if member.Status == serf.StatusLeft { continue } - valid, parts := agent.IsConsulServer(member) + valid, parts := metadata.IsConsulServer(member) if valid { serverMap[parts.ID] = parts } @@ -297,7 +297,7 @@ func (s *Server) updateClusterHealth() error { // consistent of a sample as possible. We capture the leader's index // here as well so it roughly lines up with the same point in time. targetLastIndex := s.raft.LastIndex() - var fetchList []*agent.Server + var fetchList []*metadata.Server for _, server := range servers { if parts, ok := serverMap[string(server.ID)]; ok { fetchList = append(fetchList, parts) @@ -377,7 +377,7 @@ func (s *Server) updateClusterHealth() error { // updateServerHealth computes the resulting health of the server based on its // fetched stats and the state of the leader. func (s *Server) updateServerHealth(health *structs.ServerHealth, - server *agent.Server, stats *structs.ServerStats, + server *metadata.Server, stats *structs.ServerStats, autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error { health.LastTerm = stats.LastTerm diff --git a/agent/consul/client.go b/agent/consul/client.go index dc562e1313..d669100ff7 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -11,7 +11,7 @@ import ( "sync" "time" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -275,7 +275,7 @@ func (c *Client) lanEventHandler() { // nodeJoin is used to handle join events on the serf cluster func (c *Client) nodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { continue } @@ -297,7 +297,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { // nodeFail is used to handle fail events on the serf cluster func (c *Client) nodeFail(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { continue } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 1d3c60c931..20749b286c 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -9,7 +9,7 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/types" @@ -440,7 +440,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { return true } - if valid, parts := agent.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { + if valid, parts := metadata.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { return true } return false @@ -451,7 +451,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { func (s *Server) handleAliveMember(member serf.Member) error { // Register consul service if a server var service *structs.NodeService - if valid, parts := agent.IsConsulServer(member); valid { + if valid, parts := metadata.IsConsulServer(member); valid { service = &structs.NodeService{ ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, @@ -595,7 +595,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error } // Remove from Raft peers if this was a server - if valid, parts := agent.IsConsulServer(member); valid { + if valid, parts := metadata.IsConsulServer(member); valid { if err := s.removeConsulServer(member, parts.Port); err != nil { return err } @@ -622,7 +622,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error } // joinConsulServer is used to try to join another consul server -func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { +func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error { // Do not join ourself if m.Name == s.config.NodeName { return nil @@ -632,7 +632,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *agent.Server) error { if parts.Bootstrap { members := s.serfLAN.Members() for _, member := range members { - valid, p := agent.IsConsulServer(member) + valid, p := metadata.IsConsulServer(member) if valid && member.Name != m.Name && p.Bootstrap { s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name) return nil @@ -732,7 +732,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { return err } - _, parts := agent.IsConsulServer(m) + _, parts := metadata.IsConsulServer(m) // Pick which remove API to use based on how the server was added. for _, server := range configFuture.Configuration().Servers { diff --git a/agent/consul/merge.go b/agent/consul/merge.go index b361772dcd..f1bc295676 100644 --- a/agent/consul/merge.go +++ b/agent/consul/merge.go @@ -3,7 +3,7 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" ) @@ -48,7 +48,7 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { continue } - ok, parts := agent.IsConsulServer(*m) + ok, parts := metadata.IsConsulServer(*m) if ok && parts.Datacenter != md.dc { return fmt.Errorf("Member '%s' part of wrong datacenter '%s'", m.Name, parts.Datacenter) @@ -65,7 +65,7 @@ type wanMergeDelegate struct { func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error { for _, m := range members { - ok, _ := agent.IsConsulServer(*m) + ok, _ := metadata.IsConsulServer(*m) if !ok { return fmt.Errorf("Member '%s' is not a server", m.Name) } diff --git a/agent/consul/operator_raft_endpoint.go b/agent/consul/operator_raft_endpoint.go index 84e7369607..dcda6eb994 100644 --- a/agent/consul/operator_raft_endpoint.go +++ b/agent/consul/operator_raft_endpoint.go @@ -4,7 +4,7 @@ import ( "fmt" "net" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -35,7 +35,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply // Index the Consul information about the servers. serverMap := make(map[raft.ServerAddress]serf.Member) for _, member := range op.srv.serfLAN.Members() { - valid, parts := agent.IsConsulServer(member) + valid, parts := metadata.IsConsulServer(member) if !valid { continue } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 3777e08c8a..41a1a6699a 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -9,12 +9,12 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/consul/agent/consul/agent" "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/go-memdb" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/memberlist" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" @@ -225,7 +225,7 @@ CHECK_LEADER: // getLeader returns if the current node is the leader, and if not then it // returns the leader which is potentially nil if the cluster has not yet // elected a leader. -func (s *Server) getLeader() (bool, *agent.Server) { +func (s *Server) getLeader() (bool, *metadata.Server) { // Check if we are the leader if s.IsLeader() { return true, nil @@ -247,7 +247,7 @@ func (s *Server) getLeader() (bool, *agent.Server) { } // forwardLeader is used to forward an RPC call to the leader, or fail if no leader -func (s *Server) forwardLeader(server *agent.Server, method string, args interface{}, reply interface{}) error { +func (s *Server) forwardLeader(server *metadata.Server, method string, args interface{}, reply interface{}) error { // Handle a missing server if server == nil { return structs.ErrNoLeader diff --git a/agent/consul/serf.go b/agent/consul/serf.go index b9e50e0763..16d3f7be31 100644 --- a/agent/consul/serf.go +++ b/agent/consul/serf.go @@ -4,7 +4,7 @@ import ( "strings" "time" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" ) @@ -125,7 +125,7 @@ func (s *Server) localEvent(event serf.UserEvent) { // lanNodeJoin is used to handle join events on the LAN pool. func (s *Server) lanNodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { continue } @@ -166,9 +166,9 @@ func (s *Server) maybeBootstrap() { // Scan for all the known servers. members := s.serfLAN.Members() - var servers []agent.Server + var servers []metadata.Server for _, member := range members { - valid, p := agent.IsConsulServer(member) + valid, p := metadata.IsConsulServer(member) if !valid { continue } @@ -265,7 +265,7 @@ func (s *Server) maybeBootstrap() { // lanNodeFailed is used to handle fail events on the LAN pool. func (s *Server) lanNodeFailed(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { continue } diff --git a/agent/consul/server.go b/agent/consul/server.go index f72795a107..06f843417e 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -18,8 +18,8 @@ import ( "time" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/agent" "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -28,7 +28,7 @@ import ( "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" "github.com/hashicorp/raft" - "github.com/hashicorp/raft-boltdb" + raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) @@ -125,7 +125,7 @@ type Server struct { // localConsuls is used to track the known consuls // in the local datacenter. Used to do leader forwarding. - localConsuls map[raft.ServerAddress]*agent.Server + localConsuls map[raft.ServerAddress]*metadata.Server localLock sync.RWMutex // Logger uses the provided LogOutput @@ -295,7 +295,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* connPool: connPool, eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), - localConsuls: make(map[raft.ServerAddress]*agent.Server), + localConsuls: make(map[raft.ServerAddress]*metadata.Server), logger: logger, reconcileCh: make(chan serf.Member, 32), router: router.NewRouter(logger, config.Datacenter), @@ -385,7 +385,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) // Fire up the LAN <-> WAN join flooder. - portFn := func(s *agent.Server) (int, bool) { + portFn := func(s *metadata.Server) (int, bool) { if s.WanJoinPort > 0 { return s.WanJoinPort, true } diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index dda19c484a..21dfe259f2 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testutil" @@ -667,7 +667,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) { // Have s2 make an RPC call to s1 s2.localLock.RLock() - var leader *agent.Server + var leader *metadata.Server for _, server := range s2.localConsuls { if server.Name == s1.config.NodeName { leader = server diff --git a/agent/consul/stats_fetcher.go b/agent/consul/stats_fetcher.go index e6f91821b2..25eefb9609 100644 --- a/agent/consul/stats_fetcher.go +++ b/agent/consul/stats_fetcher.go @@ -5,7 +5,7 @@ import ( "log" "sync" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" ) @@ -39,7 +39,7 @@ func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string) // cancel this when the context is canceled because we only want one in-flight // RPC to each server, so we let it finish and then clean up the in-flight // tracking. -func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerStats) { +func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *structs.ServerStats) { var args struct{} var reply structs.ServerStats err := f.pool.RPC(f.datacenter, server.Addr, server.Version, "Status.RaftStats", server.UseTLS, &args, &reply) @@ -56,9 +56,9 @@ func (f *StatsFetcher) fetch(server *agent.Server, replyCh chan *structs.ServerS } // Fetch will attempt to query all the servers in parallel. -func (f *StatsFetcher) Fetch(ctx context.Context, servers []*agent.Server) map[string]*structs.ServerStats { +func (f *StatsFetcher) Fetch(ctx context.Context, servers []*metadata.Server) map[string]*structs.ServerStats { type workItem struct { - server *agent.Server + server *metadata.Server replyCh chan *structs.ServerStats } var work []*workItem diff --git a/agent/consul/stats_fetcher_test.go b/agent/consul/stats_fetcher_test.go index 8f9601ebbd..516d08c5ea 100644 --- a/agent/consul/stats_fetcher_test.go +++ b/agent/consul/stats_fetcher_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/types" ) @@ -34,9 +34,9 @@ func TestStatsFetcher(t *testing.T) { t.Fatalf("bad len: %d", len(members)) } - var servers []*agent.Server + var servers []*metadata.Server for _, member := range members { - ok, server := agent.IsConsulServer(member) + ok, server := metadata.IsConsulServer(member) if !ok { t.Fatalf("bad: %#v", member) } diff --git a/agent/consul/util.go b/agent/consul/util.go index 92c0b66836..c13b6fe701 100644 --- a/agent/consul/util.go +++ b/agent/consul/util.go @@ -7,7 +7,7 @@ import ( "runtime" "strconv" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" ) @@ -302,7 +302,7 @@ func runtimeStats() map[string]string { // given Consul version func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool { for _, member := range members { - if valid, parts := agent.IsConsulServer(member); valid && parts.Status == serf.StatusAlive { + if valid, parts := metadata.IsConsulServer(member); valid && parts.Status == serf.StatusAlive { if parts.Build.LessThan(minVersion) { return false } diff --git a/agent/consul/agent/server.go b/agent/metadata/server.go similarity index 99% rename from agent/consul/agent/server.go rename to agent/metadata/server.go index 0c51424c63..97beb03158 100644 --- a/agent/consul/agent/server.go +++ b/agent/metadata/server.go @@ -3,7 +3,7 @@ // communicate Consul server information. Gossiped information that ends up // in Server contains the necessary metadata required for servers.Manager to // select which server an RPC request should be routed to. -package agent +package metadata import ( "fmt" diff --git a/agent/consul/agent/server_internal_test.go b/agent/metadata/server_internal_test.go similarity index 98% rename from agent/consul/agent/server_internal_test.go rename to agent/metadata/server_internal_test.go index 3b4f3d7e1f..01c7b41143 100644 --- a/agent/consul/agent/server_internal_test.go +++ b/agent/metadata/server_internal_test.go @@ -1,4 +1,4 @@ -package agent +package metadata import ( "testing" diff --git a/agent/consul/agent/server_test.go b/agent/metadata/server_test.go similarity index 88% rename from agent/consul/agent/server_test.go rename to agent/metadata/server_test.go index f12ca97371..bd67115242 100644 --- a/agent/consul/agent/server_test.go +++ b/agent/metadata/server_test.go @@ -1,10 +1,10 @@ -package agent_test +package metadata_test import ( "net" "testing" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/serf/serf" ) @@ -14,19 +14,19 @@ func TestServer_Key_params(t *testing.T) { tests := []struct { name string - sd1 *agent.Server - sd2 *agent.Server + sd1 *metadata.Server + sd2 *metadata.Server equal bool }{ { name: "Addr inequality", - sd1: &agent.Server{ + sd1: &metadata.Server{ Name: "s1", Datacenter: "dc1", Port: 8300, Addr: &net.IPAddr{IP: ipv4a}, }, - sd2: &agent.Server{ + sd2: &metadata.Server{ Name: "s1", Datacenter: "dc1", Port: 8300, @@ -42,7 +42,7 @@ func TestServer_Key_params(t *testing.T) { } // Test Key to make sure it actually works as a key - m := make(map[agent.Key]bool) + m := make(map[metadata.Key]bool) m[*test.sd1.Key()] = true if _, found := m[*test.sd2.Key()]; found != test.equal { t.Errorf("Expected a %v result from map test %s", test.equal, test.name) @@ -68,7 +68,7 @@ func TestIsConsulServer(t *testing.T) { }, Status: serf.StatusLeft, } - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { t.Fatalf("bad: %v %v", ok, parts) } @@ -101,7 +101,7 @@ func TestIsConsulServer(t *testing.T) { } m.Tags["bootstrap"] = "1" m.Tags["disabled"] = "1" - ok, parts = agent.IsConsulServer(m) + ok, parts = metadata.IsConsulServer(m) if !ok { t.Fatalf("expected a valid consul server") } @@ -117,7 +117,7 @@ func TestIsConsulServer(t *testing.T) { m.Tags["expect"] = "3" delete(m.Tags, "bootstrap") delete(m.Tags, "disabled") - ok, parts = agent.IsConsulServer(m) + ok, parts = metadata.IsConsulServer(m) if !ok || parts.Expect != 3 { t.Fatalf("bad: %v", parts.Expect) } @@ -126,7 +126,7 @@ func TestIsConsulServer(t *testing.T) { } delete(m.Tags, "role") - ok, parts = agent.IsConsulServer(m) + ok, parts = metadata.IsConsulServer(m) if ok { t.Fatalf("unexpected ok server") } @@ -147,7 +147,7 @@ func TestIsConsulServer_Optional(t *testing.T) { // should default to zero. }, } - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { t.Fatalf("bad: %v %v", ok, parts) } @@ -174,7 +174,7 @@ func TestIsConsulServer_Optional(t *testing.T) { } m.Tags["bootstrap"] = "1" m.Tags["disabled"] = "1" - ok, parts = agent.IsConsulServer(m) + ok, parts = metadata.IsConsulServer(m) if !ok { t.Fatalf("expected a valid consul server") } @@ -190,7 +190,7 @@ func TestIsConsulServer_Optional(t *testing.T) { m.Tags["expect"] = "3" delete(m.Tags, "bootstrap") delete(m.Tags, "disabled") - ok, parts = agent.IsConsulServer(m) + ok, parts = metadata.IsConsulServer(m) if !ok || parts.Expect != 3 { t.Fatalf("bad: %v", parts.Expect) } @@ -199,7 +199,7 @@ func TestIsConsulServer_Optional(t *testing.T) { } delete(m.Tags, "role") - ok, parts = agent.IsConsulServer(m) + ok, parts = metadata.IsConsulServer(m) if ok { t.Fatalf("unexpected ok server") } diff --git a/agent/router/manager.go b/agent/router/manager.go index 497191ba80..19c710e9f7 100644 --- a/agent/router/manager.go +++ b/agent/router/manager.go @@ -1,5 +1,5 @@ // Package servers provides a Manager interface for Manager managed -// agent.Server objects. The servers package manages servers from a Consul +// metadata.Server objects. The servers package manages servers from a Consul // client's perspective (i.e. a list of servers that a client talks with for // RPCs). The servers package does not provide any API guarantees and should // be called only by `hashicorp/consul`. @@ -13,7 +13,7 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/lib" ) @@ -71,7 +71,7 @@ type Pinger interface { type serverList struct { // servers tracks the locally known servers. List membership is // maintained by Serf. - servers []*agent.Server + servers []*metadata.Server } type Manager struct { @@ -111,7 +111,7 @@ type Manager struct { // begin seeing use after the rebalance timer fires or enough servers fail // organically. If the server is already known, merge the new server // details. -func (m *Manager) AddServer(s *agent.Server) { +func (m *Manager) AddServer(s *metadata.Server) { m.listLock.Lock() defer m.listLock.Unlock() l := m.getServerList() @@ -120,7 +120,7 @@ func (m *Manager) AddServer(s *agent.Server) { found := false for idx, existing := range l.servers { if existing.Name == s.Name { - newServers := make([]*agent.Server, len(l.servers)) + newServers := make([]*metadata.Server, len(l.servers)) copy(newServers, l.servers) // Overwrite the existing server details in order to @@ -135,7 +135,7 @@ func (m *Manager) AddServer(s *agent.Server) { // Add to the list if not known if !found { - newServers := make([]*agent.Server, len(l.servers), len(l.servers)+1) + newServers := make([]*metadata.Server, len(l.servers), len(l.servers)+1) copy(newServers, l.servers) newServers = append(newServers, s) l.servers = newServers @@ -156,13 +156,13 @@ func (m *Manager) AddServer(s *agent.Server) { // less desirable than just returning the next server in the firing line. If // the next server fails, it will fail fast enough and cycleServer will be // called again. -func (l *serverList) cycleServer() (servers []*agent.Server) { +func (l *serverList) cycleServer() (servers []*metadata.Server) { numServers := len(l.servers) if numServers < 2 { return servers // No action required } - newServers := make([]*agent.Server, 0, numServers) + newServers := make([]*metadata.Server, 0, numServers) newServers = append(newServers, l.servers[1:]...) newServers = append(newServers, l.servers[0]) @@ -170,7 +170,7 @@ func (l *serverList) cycleServer() (servers []*agent.Server) { } // removeServerByKey performs an inline removal of the first matching server -func (l *serverList) removeServerByKey(targetKey *agent.Key) { +func (l *serverList) removeServerByKey(targetKey *metadata.Key) { for i, s := range l.servers { if targetKey.Equal(s.Key()) { copy(l.servers[i:], l.servers[i+1:]) @@ -202,7 +202,7 @@ func (m *Manager) IsOffline() bool { // server list. If the server at the front of the list has failed or fails // during an RPC call, it is rotated to the end of the list. If there are no // servers available, return nil. -func (m *Manager) FindServer() *agent.Server { +func (m *Manager) FindServer() *metadata.Server { l := m.getServerList() numServers := len(l.servers) if numServers == 0 { @@ -249,14 +249,14 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCl atomic.StoreInt32(&m.offline, 1) l := serverList{} - l.servers = make([]*agent.Server, 0) + l.servers = make([]*metadata.Server, 0) m.saveServerList(l) return m } // NotifyFailedServer marks the passed in server as "failed" by rotating it // to the end of the server list. -func (m *Manager) NotifyFailedServer(s *agent.Server) { +func (m *Manager) NotifyFailedServer(s *metadata.Server) { l := m.getServerList() // If the server being failed is not the first server on the list, @@ -290,7 +290,7 @@ func (m *Manager) NumServers() int { return len(l.servers) } -// RebalanceServers shuffles the list of servers on this agent. The server +// RebalanceServers shuffles the list of servers on this metadata. The server // at the front of the list is selected for the next RPC. RPC calls that // fail for a particular server are rotated to the end of the list. This // method reshuffles the list periodically in order to redistribute work @@ -376,14 +376,14 @@ func (m *Manager) reconcileServerList(l *serverList) bool { } type targetServer struct { - server *agent.Server + server *metadata.Server // 'b' == both // 'o' == original // 'n' == new state byte } - mergedList := make(map[agent.Key]*targetServer, len(l.servers)) + mergedList := make(map[metadata.Key]*targetServer, len(l.servers)) for _, s := range l.servers { mergedList[*s.Key()] = &targetServer{server: s, state: 'o'} } @@ -425,7 +425,7 @@ func (m *Manager) reconcileServerList(l *serverList) bool { // RemoveServer takes out an internal write lock and removes a server from // the server list. -func (m *Manager) RemoveServer(s *agent.Server) { +func (m *Manager) RemoveServer(s *metadata.Server) { m.listLock.Lock() defer m.listLock.Unlock() l := m.getServerList() @@ -433,7 +433,7 @@ func (m *Manager) RemoveServer(s *agent.Server) { // Remove the server if known for i := range l.servers { if l.servers[i].Name == s.Name { - newServers := make([]*agent.Server, 0, len(l.servers)-1) + newServers := make([]*metadata.Server, 0, len(l.servers)-1) newServers = append(newServers, l.servers[:i]...) newServers = append(newServers, l.servers[i+1:]...) l.servers = newServers diff --git a/agent/router/manager_internal_test.go b/agent/router/manager_internal_test.go index c11a3c49e8..0425e14f68 100644 --- a/agent/router/manager_internal_test.go +++ b/agent/router/manager_internal_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" ) var ( @@ -64,14 +64,14 @@ func testManagerFailProb(failPct float64) (m *Manager) { return m } -// func (l *serverList) cycleServer() (servers []*agent.Server) { +// func (l *serverList) cycleServer() (servers []*metadata.Server) { func TestManagerInternal_cycleServer(t *testing.T) { m := testManager() l := m.getServerList() - server0 := &agent.Server{Name: "server1"} - server1 := &agent.Server{Name: "server2"} - server2 := &agent.Server{Name: "server3"} + server0 := &metadata.Server{Name: "server1"} + server1 := &metadata.Server{Name: "server2"} + server2 := &metadata.Server{Name: "server3"} l.servers = append(l.servers, server0, server1, server2) m.saveServerList(l) @@ -167,11 +167,11 @@ func test_reconcileServerList(maxServers int) (bool, error) { const failPct = 0.5 m := testManagerFailProb(failPct) - var failedServers, healthyServers []*agent.Server + var failedServers, healthyServers []*metadata.Server for i := 0; i < maxServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - node := &agent.Server{Name: nodeName} + node := &metadata.Server{Name: nodeName} // Add 66% of servers to Manager if rand.Float64() > 0.33 { m.AddServer(node) @@ -231,7 +231,7 @@ func test_reconcileServerList(maxServers int) (bool, error) { return true, nil } - resultingServerMap := make(map[agent.Key]bool) + resultingServerMap := make(map[metadata.Key]bool) for _, s := range m.getServerList().servers { resultingServerMap[*s.Key()] = true } @@ -303,7 +303,7 @@ func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) { m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) for i := 0; i < s.numServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - m.AddServer(&agent.Server{Name: nodeName}) + m.AddServer(&metadata.Server{Name: nodeName}) } d := m.refreshServerRebalanceTimer() @@ -324,7 +324,7 @@ func TestManagerInternal_saveServerList(t *testing.T) { t.Fatalf("Manager.saveServerList failed to load init config") } - newServer := new(agent.Server) + newServer := new(metadata.Server) l.servers = append(l.servers, newServer) m.saveServerList(l) }() @@ -340,7 +340,7 @@ func TestManagerInternal_saveServerList(t *testing.T) { // Verify mutation w/o a save doesn't alter the original func() { - newServer := new(agent.Server) + newServer := new(metadata.Server) l := m.getServerList() l.servers = append(l.servers, newServer) diff --git a/agent/router/manager_test.go b/agent/router/manager_test.go index 35e17d463f..00d63938de 100644 --- a/agent/router/manager_test.go +++ b/agent/router/manager_test.go @@ -9,7 +9,7 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/router" ) @@ -48,7 +48,7 @@ func testManagerFailProb(failPct float64) (m *router.Manager) { return m } -// func (m *Manager) AddServer(server *agent.Server) { +// func (m *Manager) AddServer(server *metadata.Server) { func TestServers_AddServer(t *testing.T) { m := testManager() var num int @@ -57,7 +57,7 @@ func TestServers_AddServer(t *testing.T) { t.Fatalf("Expected zero servers to start") } - s1 := &agent.Server{Name: "s1"} + s1 := &metadata.Server{Name: "s1"} m.AddServer(s1) num = m.NumServers() if num != 1 { @@ -70,7 +70,7 @@ func TestServers_AddServer(t *testing.T) { t.Fatalf("Expected one server (still)") } - s2 := &agent.Server{Name: "s2"} + s2 := &metadata.Server{Name: "s2"} m.AddServer(s2) num = m.NumServers() if num != 2 { @@ -85,7 +85,7 @@ func TestServers_IsOffline(t *testing.T) { t.Fatalf("bad") } - s1 := &agent.Server{Name: "s1"} + s1 := &metadata.Server{Name: "s1"} m.AddServer(s1) if m.IsOffline() { t.Fatalf("bad") @@ -117,7 +117,7 @@ func TestServers_IsOffline(t *testing.T) { } } -// func (m *Manager) FindServer() (server *agent.Server) { +// func (m *Manager) FindServer() (server *metadata.Server) { func TestServers_FindServer(t *testing.T) { m := testManager() @@ -125,7 +125,7 @@ func TestServers_FindServer(t *testing.T) { t.Fatalf("Expected nil return") } - m.AddServer(&agent.Server{Name: "s1"}) + m.AddServer(&metadata.Server{Name: "s1"}) if m.NumServers() != 1 { t.Fatalf("Expected one server") } @@ -143,7 +143,7 @@ func TestServers_FindServer(t *testing.T) { t.Fatalf("Expected s1 server (still)") } - m.AddServer(&agent.Server{Name: "s2"}) + m.AddServer(&metadata.Server{Name: "s2"}) if m.NumServers() != 2 { t.Fatalf("Expected two servers") } @@ -175,7 +175,7 @@ func TestServers_New(t *testing.T) { } } -// func (m *Manager) NotifyFailedServer(server *agent.Server) { +// func (m *Manager) NotifyFailedServer(server *metadata.Server) { func TestServers_NotifyFailedServer(t *testing.T) { m := testManager() @@ -183,8 +183,8 @@ func TestServers_NotifyFailedServer(t *testing.T) { t.Fatalf("Expected zero servers to start") } - s1 := &agent.Server{Name: "s1"} - s2 := &agent.Server{Name: "s2"} + s1 := &metadata.Server{Name: "s1"} + s2 := &metadata.Server{Name: "s2"} // Try notifying for a server that is not managed by Manager m.NotifyFailedServer(s1) @@ -237,7 +237,7 @@ func TestServers_NumServers(t *testing.T) { t.Fatalf("Expected zero servers to start") } - s := &agent.Server{} + s := &metadata.Server{} m.AddServer(s) num = m.NumServers() if num != 1 { @@ -256,7 +256,7 @@ func TestServers_RebalanceServers(t *testing.T) { // Make a huge list of nodes. for i := 0; i < maxServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - m.AddServer(&agent.Server{Name: nodeName}) + m.AddServer(&metadata.Server{Name: nodeName}) } // Keep track of how many unique shuffles we get. @@ -282,7 +282,7 @@ func TestServers_RebalanceServers(t *testing.T) { } } -// func (m *Manager) RemoveServer(server *agent.Server) { +// func (m *Manager) RemoveServer(server *metadata.Server) { func TestManager_RemoveServer(t *testing.T) { const nodeNameFmt = "s%02d" m := testManager() @@ -293,21 +293,21 @@ func TestManager_RemoveServer(t *testing.T) { // Test removing server before its added nodeName := fmt.Sprintf(nodeNameFmt, 1) - s1 := &agent.Server{Name: nodeName} + s1 := &metadata.Server{Name: nodeName} m.RemoveServer(s1) m.AddServer(s1) nodeName = fmt.Sprintf(nodeNameFmt, 2) - s2 := &agent.Server{Name: nodeName} + s2 := &metadata.Server{Name: nodeName} m.RemoveServer(s2) m.AddServer(s2) const maxServers = 19 - servers := make([]*agent.Server, maxServers) + servers := make([]*metadata.Server, maxServers) // Already added two servers above for i := maxServers; i > 2; i-- { nodeName := fmt.Sprintf(nodeNameFmt, i) - server := &agent.Server{Name: nodeName} + server := &metadata.Server{Name: nodeName} servers = append(servers, server) m.AddServer(server) } @@ -321,7 +321,7 @@ func TestManager_RemoveServer(t *testing.T) { t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers()) } - findServer := func(server *agent.Server) bool { + findServer := func(server *metadata.Server) bool { for i := m.NumServers(); i > 0; i-- { s := m.FindServer() if s == server { @@ -332,7 +332,7 @@ func TestManager_RemoveServer(t *testing.T) { } expectedNumServers := maxServers - removedServers := make([]*agent.Server, 0, maxServers) + removedServers := make([]*metadata.Server, 0, maxServers) // Remove servers from the front of the list for i := 3; i > 0; i-- { diff --git a/agent/router/router.go b/agent/router/router.go index ac043d9393..3299b781cd 100644 --- a/agent/router/router.go +++ b/agent/router/router.go @@ -6,7 +6,7 @@ import ( "sort" "sync" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/types" @@ -34,7 +34,7 @@ type Router struct { managers map[string][]*Manager // routeFn is a hook to actually do the routing. - routeFn func(datacenter string) (*Manager, *agent.Server, bool) + routeFn func(datacenter string) (*Manager, *metadata.Server, bool) // isShutdown prevents adding new routes to a router after it is shut // down. @@ -140,7 +140,7 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger // initially, and then will quickly detect that they are failed if we // can't reach them. for _, m := range cluster.Members() { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q", m.Name, areaID) @@ -206,7 +206,7 @@ func (r *Router) RemoveArea(areaID types.AreaID) error { } // addServer does the work of AddServer once the write lock is held. -func (r *Router) addServer(area *areaInfo, s *agent.Server) error { +func (r *Router) addServer(area *areaInfo, s *metadata.Server) error { // Make the manager on the fly if this is the first we've seen of it, // and add it to the index. info, ok := area.managers[s.Datacenter] @@ -236,7 +236,7 @@ func (r *Router) addServer(area *areaInfo, s *agent.Server) error { // AddServer should be called whenever a new server joins an area. This is // typically hooked into the Serf event handler area for this area. -func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error { +func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error { r.Lock() defer r.Unlock() @@ -249,7 +249,7 @@ func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error { // RemoveServer should be called whenever a server is removed from an area. This // is typically hooked into the Serf event handler area for this area. -func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error { +func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error { r.Lock() defer r.Unlock() @@ -282,7 +282,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error { // is typically hooked into the Serf event handler area for this area. We will // immediately shift traffic away from this server, but it will remain in the // list of servers. -func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error { +func (r *Router) FailServer(areaID types.AreaID, s *metadata.Server) error { r.RLock() defer r.RUnlock() @@ -309,13 +309,13 @@ func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error { // connection attempt. If any problem occurs with the given server, the caller // should feed that back to the manager associated with the server, which is // also returned, by calling NofifyFailedServer(). -func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) { +func (r *Router) FindRoute(datacenter string) (*Manager, *metadata.Server, bool) { return r.routeFn(datacenter) } // findDirectRoute looks for a route to the given datacenter if it's directly // adjacent to the server. -func (r *Router) findDirectRoute(datacenter string) (*Manager, *agent.Server, bool) { +func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, bool) { r.RLock() defer r.RUnlock() @@ -399,7 +399,7 @@ func (r *Router) GetDatacentersByDistance() ([]string, error) { } for _, m := range info.cluster.Members() { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q", m.Name, areaID) @@ -460,7 +460,7 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) { for areaID, info := range r.areas { index := make(map[string]structs.Coordinates) for _, m := range info.cluster.Members() { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q", m.Name, areaID) diff --git a/agent/router/serf_adapter.go b/agent/router/serf_adapter.go index 768834c0da..12941b140a 100644 --- a/agent/router/serf_adapter.go +++ b/agent/router/serf_adapter.go @@ -3,13 +3,13 @@ package router import ( "log" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/types" "github.com/hashicorp/serf/serf" ) // routerFn selects one of the router operations to map to incoming Serf events. -type routerFn func(types.AreaID, *agent.Server) error +type routerFn func(types.AreaID, *metadata.Server) error // handleMemberEvents attempts to apply the given Serf member event to the given // router function. @@ -21,7 +21,7 @@ func handleMemberEvent(logger *log.Logger, fn routerFn, areaID types.AreaID, e s } for _, m := range me.Members { - ok, parts := agent.IsConsulServer(m) + ok, parts := metadata.IsConsulServer(m) if !ok { logger.Printf("[WARN]: consul: Non-server %q in server-only area %q", m.Name, areaID) diff --git a/agent/router/serf_flooder.go b/agent/router/serf_flooder.go index f13e48fcf3..fee912dd65 100644 --- a/agent/router/serf_flooder.go +++ b/agent/router/serf_flooder.go @@ -6,13 +6,13 @@ import ( "net" "strings" - "github.com/hashicorp/consul/agent/consul/agent" + "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/serf/serf" ) // FloodPortFn gets the port to use for a given server when flood-joining. This // will return false if it doesn't have one. -type FloodPortFn func(*agent.Server) (int, bool) +type FloodPortFn func(*metadata.Server) (int, bool) // FloodJoins attempts to make sure all Consul servers in the local Serf // instance are joined in the global Serf instance. It assumes names in the @@ -27,9 +27,9 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn, // Index the global side so we can do one pass through the local side // with cheap lookups. - index := make(map[string]*agent.Server) + index := make(map[string]*metadata.Server) for _, m := range globalSerf.Members() { - ok, server := agent.IsConsulServer(m) + ok, server := metadata.IsConsulServer(m) if !ok { continue } @@ -48,7 +48,7 @@ func FloodJoins(logger *log.Logger, portFn FloodPortFn, continue } - ok, server := agent.IsConsulServer(m) + ok, server := metadata.IsConsulServer(m) if !ok { continue }