From c395599ceaf8aca07d19948ef7f9f35ab437d386 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 6 Jul 2017 12:40:54 +0200 Subject: [PATCH] agent: move agent/consul/servers to agent/router --- agent/consul/client.go | 24 +++++++++---------- agent/consul/client_test.go | 12 +++++----- agent/consul/flood.go | 6 ++--- agent/consul/server.go | 10 ++++---- agent/{consul/servers => router}/manager.go | 2 +- .../manager_internal_test.go | 2 +- .../servers => router}/manager_test.go | 14 +++++------ agent/{consul/servers => router}/router.go | 2 +- .../{consul/servers => router}/router_test.go | 2 +- .../servers => router}/serf_adapter.go | 2 +- .../servers => router}/serf_flooder.go | 2 +- 11 files changed, 39 insertions(+), 39 deletions(-) rename agent/{consul/servers => router}/manager.go (99%) rename agent/{consul/servers => router}/manager_internal_test.go (99%) rename agent/{consul/servers => router}/manager_test.go (95%) rename agent/{consul/servers => router}/router.go (99%) rename agent/{consul/servers => router}/router_test.go (99%) rename agent/{consul/servers => router}/serf_adapter.go (99%) rename agent/{consul/servers => router}/serf_flooder.go (99%) diff --git a/agent/consul/client.go b/agent/consul/client.go index 8e23e4b884..dc562e1313 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -12,8 +12,8 @@ import ( "time" "github.com/hashicorp/consul/agent/consul/agent" - "github.com/hashicorp/consul/agent/consul/servers" "github.com/hashicorp/consul/agent/pool" + "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/serf/coordinate" @@ -52,9 +52,9 @@ type Client struct { // Connection pool to consul servers connPool *pool.ConnPool - // servers is responsible for the selection and maintenance of + // routers is responsible for the selection and maintenance of // Consul servers this agent uses for RPC requests - servers *servers.Manager + routers *router.Manager // eventCh is used to receive events from the // serf cluster in the datacenter @@ -140,8 +140,8 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) { } // Start maintenance task for servers - c.servers = servers.New(c.logger, c.shutdownCh, c.serf, c.connPool) - go c.servers.Start() + c.routers = router.New(c.logger, c.shutdownCh, c.serf, c.connPool) + go c.routers.Start() return c, nil } @@ -285,7 +285,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { continue } c.logger.Printf("[INFO] consul: adding server %s", parts) - c.servers.AddServer(parts) + c.routers.AddServer(parts) // Trigger the callback if c.config.ServerUp != nil { @@ -302,7 +302,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) { continue } c.logger.Printf("[INFO] consul: removing server %s", parts) - c.servers.RemoveServer(parts) + c.routers.RemoveServer(parts) } } @@ -336,14 +336,14 @@ func (c *Client) localEvent(event serf.UserEvent) { // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { - server := c.servers.FindServer() + server := c.routers.FindServer() if server == nil { return structs.ErrNoServers } // Forward to remote Consul if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil { - c.servers.NotifyFailedServer(server) + c.routers.NotifyFailedServer(server) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) return err } @@ -358,7 +358,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io replyFn structs.SnapshotReplyFn) error { // Locate a server to make the request to. - server := c.servers.FindServer() + server := c.routers.FindServer() if server == nil { return structs.ErrNoServers } @@ -395,7 +395,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { - numServers := c.servers.NumServers() + numServers := c.routers.NumServers() toString := func(v uint64) string { return strconv.FormatUint(v, 10) @@ -412,7 +412,7 @@ func (c *Client) Stats() map[string]map[string]string { } func (c *Client) ServerAddrs() map[string]string { - return c.servers.GetServerAddrs() + return c.routers.GetServerAddrs() } // GetLANCoordinate returns the network coordinate of the current node, as diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index e5a79d1aa6..ba29912750 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -84,7 +84,7 @@ func TestClient_JoinLAN(t *testing.T) { // Try to join joinLAN(t, c1, s1) retry.Run(t, func(r *retry.R) { - if got, want := c1.servers.NumServers(), 1; got != want { + if got, want := c1.routers.NumServers(), 1; got != want { r.Fatalf("got %d servers want %d", got, want) } if got, want := len(s1.LANMembers()), 2; got != want { @@ -254,7 +254,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { } // Sleep to allow Serf to sync, shuffle, and let the shuffle complete - c.servers.ResetRebalanceTimer() + c.routers.ResetRebalanceTimer() time.Sleep(time.Second) if len(c.LANMembers()) != numServers+numClients { @@ -270,7 +270,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { var pingCount int for range servers { time.Sleep(200 * time.Millisecond) - s := c.servers.FindServer() + s := c.routers.FindServer() ok, err := c.connPool.Ping(s.Datacenter, s.Addr, s.Version, s.UseTLS) if !ok { t.Errorf("Unable to ping server %v: %s", s.String(), err) @@ -279,7 +279,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { // Artificially fail the server in order to rotate the server // list - c.servers.NotifyFailedServer(s) + c.routers.NotifyFailedServer(s) } if pingCount != numServers { @@ -354,7 +354,7 @@ func TestClient_SnapshotRPC(t *testing.T) { // Wait until we've got a healthy server. retry.Run(t, func(r *retry.R) { - if got, want := c1.servers.NumServers(), 1; got != want { + if got, want := c1.routers.NumServers(), 1; got != want { r.Fatalf("got %d servers want %d", got, want) } }) @@ -413,7 +413,7 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) { } // Wait until we've got a healthy server. - if got, want := c1.servers.NumServers(), 1; got != want { + if got, want := c1.routers.NumServers(), 1; got != want { r.Fatalf("got %d servers want %d", got, want) } }) diff --git a/agent/consul/flood.go b/agent/consul/flood.go index 12705c8b9d..0e233eab9c 100644 --- a/agent/consul/flood.go +++ b/agent/consul/flood.go @@ -3,7 +3,7 @@ package consul import ( "time" - "github.com/hashicorp/consul/agent/consul/servers" + "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/serf/serf" ) @@ -24,7 +24,7 @@ func (s *Server) FloodNotify() { // Flood is a long-running goroutine that floods servers from the LAN to the // given global Serf instance, such as the WAN. This will exit once either of // the Serf instances are shut down. -func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) { +func (s *Server) Flood(portFn router.FloodPortFn, global *serf.Serf) { s.floodLock.Lock() floodCh := make(chan struct{}) s.floodCh = append(s.floodCh, floodCh) @@ -61,6 +61,6 @@ func (s *Server) Flood(portFn servers.FloodPortFn, global *serf.Serf) { } FLOOD: - servers.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global) + router.FloodJoins(s.logger, portFn, s.config.Datacenter, s.serfLAN, global) } } diff --git a/agent/consul/server.go b/agent/consul/server.go index 5c89e9e051..f72795a107 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -19,9 +19,9 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/agent" - "github.com/hashicorp/consul/agent/consul/servers" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" + "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/lib" @@ -154,7 +154,7 @@ type Server struct { // router is used to map out Consul servers in the WAN and in Consul // Enterprise user-defined areas. - router *servers.Router + router *router.Router // Listener is used to listen for incoming connections Listener net.Listener @@ -181,7 +181,7 @@ type Server struct { sessionTimers *SessionTimers // statsFetcher is used by autopilot to check the status of the other - // Consul servers. + // Consul router. statsFetcher *StatsFetcher // reassertLeaderCh is used to signal the leader loop should re-run @@ -298,7 +298,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* localConsuls: make(map[raft.ServerAddress]*agent.Server), logger: logger, reconcileCh: make(chan serf.Member, 32), - router: servers.NewRouter(logger, config.Datacenter), + router: router.NewRouter(logger, config.Datacenter), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, reassertLeaderCh: make(chan chan error), @@ -382,7 +382,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* s.Shutdown() return nil, fmt.Errorf("Failed to add WAN serf route: %v", err) } - go servers.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) + go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) // Fire up the LAN <-> WAN join flooder. portFn := func(s *agent.Server) (int, bool) { diff --git a/agent/consul/servers/manager.go b/agent/router/manager.go similarity index 99% rename from agent/consul/servers/manager.go rename to agent/router/manager.go index 092efadd7b..497191ba80 100644 --- a/agent/consul/servers/manager.go +++ b/agent/router/manager.go @@ -3,7 +3,7 @@ // client's perspective (i.e. a list of servers that a client talks with for // RPCs). The servers package does not provide any API guarantees and should // be called only by `hashicorp/consul`. -package servers +package router import ( "log" diff --git a/agent/consul/servers/manager_internal_test.go b/agent/router/manager_internal_test.go similarity index 99% rename from agent/consul/servers/manager_internal_test.go rename to agent/router/manager_internal_test.go index c6c993697d..c11a3c49e8 100644 --- a/agent/consul/servers/manager_internal_test.go +++ b/agent/router/manager_internal_test.go @@ -1,4 +1,4 @@ -package servers +package router import ( "bytes" diff --git a/agent/consul/servers/manager_test.go b/agent/router/manager_test.go similarity index 95% rename from agent/consul/servers/manager_test.go rename to agent/router/manager_test.go index 8c8041bd5a..35e17d463f 100644 --- a/agent/consul/servers/manager_test.go +++ b/agent/router/manager_test.go @@ -1,4 +1,4 @@ -package servers_test +package router_test import ( "fmt" @@ -10,7 +10,7 @@ import ( "testing" "github.com/hashicorp/consul/agent/consul/agent" - "github.com/hashicorp/consul/agent/consul/servers" + "github.com/hashicorp/consul/agent/router" ) type fauxConnPool struct { @@ -34,17 +34,17 @@ func (s *fauxSerf) NumNodes() int { return 16384 } -func testManager() (m *servers.Manager) { +func testManager() (m *router.Manager) { logger := log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) + m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) return m } -func testManagerFailProb(failPct float64) (m *servers.Manager) { +func testManagerFailProb(failPct float64) (m *router.Manager) { logger := log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) + m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) return m } @@ -169,7 +169,7 @@ func TestServers_FindServer(t *testing.T) { func TestServers_New(t *testing.T) { logger := log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - m := servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) + m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) if m == nil { t.Fatalf("Manager nil") } diff --git a/agent/consul/servers/router.go b/agent/router/router.go similarity index 99% rename from agent/consul/servers/router.go rename to agent/router/router.go index b295cca83a..ac043d9393 100644 --- a/agent/consul/servers/router.go +++ b/agent/router/router.go @@ -1,4 +1,4 @@ -package servers +package router import ( "fmt" diff --git a/agent/consul/servers/router_test.go b/agent/router/router_test.go similarity index 99% rename from agent/consul/servers/router_test.go rename to agent/router/router_test.go index c6f430079c..f6cf2a08ed 100644 --- a/agent/consul/servers/router_test.go +++ b/agent/router/router_test.go @@ -1,4 +1,4 @@ -package servers +package router import ( "fmt" diff --git a/agent/consul/servers/serf_adapter.go b/agent/router/serf_adapter.go similarity index 99% rename from agent/consul/servers/serf_adapter.go rename to agent/router/serf_adapter.go index 052696b4ac..768834c0da 100644 --- a/agent/consul/servers/serf_adapter.go +++ b/agent/router/serf_adapter.go @@ -1,4 +1,4 @@ -package servers +package router import ( "log" diff --git a/agent/consul/servers/serf_flooder.go b/agent/router/serf_flooder.go similarity index 99% rename from agent/consul/servers/serf_flooder.go rename to agent/router/serf_flooder.go index bf49aee67b..f13e48fcf3 100644 --- a/agent/consul/servers/serf_flooder.go +++ b/agent/router/serf_flooder.go @@ -1,4 +1,4 @@ -package servers +package router import ( "fmt"