diff --git a/agent/consul/auto_encrypt.go b/agent/consul/auto_encrypt.go index e5dd7be16e..3beace5984 100644 --- a/agent/consul/auto_encrypt.go +++ b/agent/consul/auto_encrypt.go @@ -109,7 +109,7 @@ func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token strin for _, ip := range ips { addr := net.TCPAddr{IP: ip, Port: port} - if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, 0, "AutoEncrypt.Sign", &args, &reply); err == nil { + if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, "AutoEncrypt.Sign", &args, &reply); err == nil { return &reply, pkPEM, nil } else { c.logger.Warn("AutoEncrypt failed", "error", err) diff --git a/agent/consul/client.go b/agent/consul/client.go index bcaf5aac19..c7a36293ba 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -308,7 +308,7 @@ TRY: } // Make the request. - rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, server.Version, method, args, reply) + rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply) if rpcErr == nil { return nil } diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 8b037b18b4..5cb6b68438 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -425,7 +425,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { for range servers { time.Sleep(200 * time.Millisecond) s := c.routers.FindServer() - ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr, s.Version) + ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr) if !ok { t.Errorf("Unable to ping server %v: %s", s.String(), err) } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index f00b14ea82..6edaa54a4d 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -552,7 +552,7 @@ CHECK_LEADER: rpcErr := structs.ErrNoLeader if leader != nil { rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr, - leader.Version, method, args, reply) + method, args, reply) if rpcErr != nil && canRetry(info, rpcErr) { goto RETRY } @@ -617,7 +617,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1, []metrics.Label{{Name: "datacenter", Value: dc}}) - if err := s.connPool.RPC(dc, server.ShortName, server.Addr, server.Version, method, args, reply); err != nil { + if err := s.connPool.RPC(dc, server.ShortName, server.Addr, method, args, reply); err != nil { manager.NotifyFailedServer(server) s.rpcLogger().Error("RPC failed to server in DC", "server", server.Addr, diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 9c717a6008..00f88091aa 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -355,7 +355,7 @@ func (s *Server) maybeBootstrap() { // Retry with exponential backoff to get peer status from this server for attempt := uint(0); attempt < maxPeerRetries; attempt++ { - if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr, server.Version, + if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr, "Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil { nextRetry := (1 << attempt) * time.Second s.logger.Error("Failed to confirm peer status for server (will retry).", diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index a49d18e926..9cd5b4c7ce 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -1277,7 +1277,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) { if leader == nil { t.Fatal("no leader") } - return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr, leader.Version) + return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr) } func TestServer_TLSToNoTLS(t *testing.T) { diff --git a/agent/consul/stats_fetcher.go b/agent/consul/stats_fetcher.go index 1635126d55..bd283f9e83 100644 --- a/agent/consul/stats_fetcher.go +++ b/agent/consul/stats_fetcher.go @@ -43,7 +43,7 @@ func NewStatsFetcher(logger hclog.Logger, pool *pool.ConnPool, datacenter string func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) { var args struct{} var reply autopilot.ServerStats - err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, server.Version, "Status.RaftStats", &args, &reply) + err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, "Status.RaftStats", &args, &reply) if err != nil { f.logger.Warn("error getting server health from server", "server", server.Name, diff --git a/agent/pool/pool.go b/agent/pool/pool.go index 4ce7c1e460..16dcb7a91c 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -46,7 +46,6 @@ type Conn struct { addr net.Addr session muxSession lastUsed time.Time - version int pool *ConnPool @@ -209,7 +208,7 @@ func (p *ConnPool) Shutdown() error { // wait for an existing connection attempt to finish, if one if in progress, // and will return that one if it succeeds. If all else fails, it will return a // newly-created connection and add it to the pool. -func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, error) { +func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, useTLS bool) (*Conn, error) { if nodeName == "" { return nil, fmt.Errorf("pool: ConnPool.acquire requires a node name") } @@ -244,7 +243,7 @@ func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, version in // If we are the lead thread, make the new connection and then wake // everybody else up to see if we got it. if isLeadThread { - c, err := p.getNewConn(dc, nodeName, addr, version, useTLS) + c, err := p.getNewConn(dc, nodeName, addr, useTLS) p.Lock() delete(p.limiter, addrStr) close(wait) @@ -497,17 +496,11 @@ func DialTimeoutWithRPCTypeViaMeshGateway( } // getNewConn is used to return a new connection -func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, error) { +func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, useTLS bool) (*Conn, error) { if nodeName == "" { return nil, fmt.Errorf("pool: ConnPool.getNewConn requires a node name") } - // Switch the multiplexing based on version - var session muxSession - if version < 2 { - return nil, fmt.Errorf("cannot make client connection, unsupported protocol version %d", version) - } - // Get a new, raw connection and write the Consul multiplex byte to set the mode conn, _, err := p.DialTimeout(dc, nodeName, addr, defaultDialTimeout, useTLS, RPCMultiplexV2) if err != nil { @@ -519,7 +512,7 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version conf.LogOutput = p.LogOutput // Create a multiplexed session - session, _ = yamux.Client(conn, conf) + session, _ := yamux.Client(conn, conf) // Wrap the connection c := &Conn{ @@ -529,7 +522,6 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version session: session, clients: list.New(), lastUsed: time.Now(), - version: version, pool: p, } return c, nil @@ -567,12 +559,12 @@ func (p *ConnPool) releaseConn(conn *Conn) { } } -// getClient is used to get a usable client for an address and protocol version -func (p *ConnPool) getClient(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, *StreamClient, error) { +// getClient is used to get a usable client for an address +func (p *ConnPool) getClient(dc string, nodeName string, addr net.Addr, useTLS bool) (*Conn, *StreamClient, error) { retries := 0 START: // Try to get a conn first - conn, err := p.acquire(dc, nodeName, addr, version, useTLS) + conn, err := p.acquire(dc, nodeName, addr, useTLS) if err != nil { return nil, nil, fmt.Errorf("failed to get conn: %v", err) } @@ -598,7 +590,6 @@ func (p *ConnPool) RPC( dc string, nodeName string, addr net.Addr, - version int, method string, args interface{}, reply interface{}, @@ -610,7 +601,7 @@ func (p *ConnPool) RPC( if method == "AutoEncrypt.Sign" { return p.rpcInsecure(dc, nodeName, addr, method, args, reply) } else { - return p.rpc(dc, nodeName, addr, version, method, args, reply) + return p.rpc(dc, nodeName, addr, method, args, reply) } } @@ -636,12 +627,12 @@ func (p *ConnPool) rpcInsecure(dc string, nodeName string, addr net.Addr, method return nil } -func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error { +func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error { p.once.Do(p.init) // Get a usable client useTLS := p.TLSConfigurator.UseTLS(dc) - conn, sc, err := p.getClient(dc, nodeName, addr, version, useTLS) + conn, sc, err := p.getClient(dc, nodeName, addr, useTLS) if err != nil { return fmt.Errorf("rpc error getting client: %v", err) } @@ -671,9 +662,9 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, version int, m // Ping sends a Status.Ping message to the specified server and // returns true if healthy, false if an error occurred -func (p *ConnPool) Ping(dc string, nodeName string, addr net.Addr, version int) (bool, error) { +func (p *ConnPool) Ping(dc string, nodeName string, addr net.Addr) (bool, error) { var out struct{} - err := p.RPC(dc, nodeName, addr, version, "Status.Ping", struct{}{}, &out) + err := p.RPC(dc, nodeName, addr, "Status.Ping", struct{}{}, &out) return err == nil, err } diff --git a/agent/router/manager.go b/agent/router/manager.go index 7944e48d26..3715d85b4f 100644 --- a/agent/router/manager.go +++ b/agent/router/manager.go @@ -61,7 +61,7 @@ type ManagerSerfCluster interface { // Pinger is an interface wrapping client.ConnPool to prevent a cyclic import // dependency. type Pinger interface { - Ping(dc, nodeName string, addr net.Addr, version int) (bool, error) + Ping(dc, nodeName string, addr net.Addr) (bool, error) } // serverList is a local copy of the struct used to maintain the list of @@ -350,7 +350,7 @@ func (m *Manager) RebalanceServers() { if m.serverName != "" && srv.Name == m.serverName { continue } - ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.ShortName, srv.Addr, srv.Version) + ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.ShortName, srv.Addr) if ok { foundHealthyServer = true break diff --git a/agent/router/manager_internal_test.go b/agent/router/manager_internal_test.go index b06ccc98d5..10f39fbf9c 100644 --- a/agent/router/manager_internal_test.go +++ b/agent/router/manager_internal_test.go @@ -33,7 +33,7 @@ type fauxConnPool struct { failPct float64 } -func (cp *fauxConnPool) Ping(string, string, net.Addr, int) (bool, error) { +func (cp *fauxConnPool) Ping(string, string, net.Addr) (bool, error) { var success bool successProb := rand.Float64() if successProb > cp.failPct { @@ -179,7 +179,7 @@ func test_reconcileServerList(maxServers int) (bool, error) { // failPct of the servers for the reconcile. This // allows for the selected server to no longer be // healthy for the reconcile below. - if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.ShortName, node.Addr, node.Version); ok { + if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.ShortName, node.Addr); ok { // Will still be present healthyServers = append(healthyServers, node) } else { diff --git a/agent/router/manager_test.go b/agent/router/manager_test.go index 3b99bfe654..6888891c09 100644 --- a/agent/router/manager_test.go +++ b/agent/router/manager_test.go @@ -32,7 +32,7 @@ type fauxConnPool struct { failAddr net.Addr } -func (cp *fauxConnPool) Ping(dc string, nodeName string, addr net.Addr, version int) (bool, error) { +func (cp *fauxConnPool) Ping(dc string, nodeName string, addr net.Addr) (bool, error) { var success bool successProb := rand.Float64()