From 5626d3526cd1e196a4e024c39631d0cf669deaef Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 15 Mar 2017 09:06:21 -0700 Subject: [PATCH] Walks back the changes to change pool address interface into strings. --- consul/autopilot.go | 2 +- consul/client.go | 4 ++-- consul/pool.go | 35 +++++++++++++++++--------------- consul/rpc.go | 4 ++-- consul/serf.go | 2 +- consul/snapshot_endpoint.go | 6 +++--- consul/snapshot_endpoint_test.go | 16 +++++++-------- 7 files changed, 36 insertions(+), 33 deletions(-) diff --git a/consul/autopilot.go b/consul/autopilot.go index 87e186a68e..cc0632a1a0 100644 --- a/consul/autopilot.go +++ b/consul/autopilot.go @@ -361,6 +361,6 @@ func (s *Server) getServerHealth(id string) *structs.ServerHealth { func (s *Server) getServerStats(server *agent.Server) (structs.ServerStats, error) { var args struct{} var reply structs.ServerStats - err := s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, "Status.RaftStats", &args, &reply) + err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.RaftStats", &args, &reply) return reply, err } diff --git a/consul/client.go b/consul/client.go index 6c4b21b211..959fde26b6 100644 --- a/consul/client.go +++ b/consul/client.go @@ -330,7 +330,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { } // Forward to remote Consul - if err := c.connPool.RPC(c.config.Datacenter, server.Addr.String(), server.Version, method, args, reply); err != nil { + if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { c.servers.NotifyFailedServer(server) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) return err @@ -357,7 +357,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io // Request the operation. var reply structs.SnapshotResponse - snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr.String(), args, in, &reply) + snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, args, in, &reply) if err != nil { return err } diff --git a/consul/pool.go b/consul/pool.go index 2e83e7a1be..900d9a6037 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -40,7 +40,7 @@ type Conn struct { refCount int32 shouldClose int32 - addr string + addr net.Addr session muxSession lastUsed time.Time version int @@ -189,12 +189,14 @@ func (p *ConnPool) Shutdown() error { // wait for an existing connection attempt to finish, if one if in progress, // and will return that one if it succeeds. If all else fails, it will return a // newly-created connection and add it to the pool. -func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) { +func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) { + addrStr := addr.String() + // Check to see if there's a pooled connection available. This is up // here since it should the the vastly more common case than the rest // of the code here. p.Lock() - c := p.pool[addr] + c := p.pool[addrStr] if c != nil { c.markForUse() p.Unlock() @@ -206,9 +208,9 @@ func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) { // attempt is done. var wait chan struct{} var ok bool - if wait, ok = p.limiter[addr]; !ok { + if wait, ok = p.limiter[addrStr]; !ok { wait = make(chan struct{}) - p.limiter[addr] = wait + p.limiter[addrStr] = wait } isLeadThread := !ok p.Unlock() @@ -218,14 +220,14 @@ func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) { if isLeadThread { c, err := p.getNewConn(dc, addr, version) p.Lock() - delete(p.limiter, addr) + delete(p.limiter, addrStr) close(wait) if err != nil { p.Unlock() return nil, err } - p.pool[addr] = c + p.pool[addrStr] = c p.Unlock() return c, nil } @@ -240,7 +242,7 @@ func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) { // See if the lead thread was able to get us a connection. p.Lock() - if c := p.pool[addr]; c != nil { + if c := p.pool[addrStr]; c != nil { c.markForUse() p.Unlock() return c, nil @@ -261,9 +263,9 @@ type HalfCloser interface { // DialTimeout is used to establish a raw connection to the given server, with a // given connection timeout. -func (p *ConnPool) DialTimeout(dc string, addr string, timeout time.Duration) (net.Conn, HalfCloser, error) { +func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration) (net.Conn, HalfCloser, error) { // Try to dial the conn - conn, err := net.DialTimeout("tcp", addr, defaultDialTimeout) + conn, err := net.DialTimeout("tcp", addr.String(), defaultDialTimeout) if err != nil { return nil, nil, err } @@ -297,7 +299,7 @@ func (p *ConnPool) DialTimeout(dc string, addr string, timeout time.Duration) (n } // getNewConn is used to return a new connection -func (p *ConnPool) getNewConn(dc string, addr string, version int) (*Conn, error) { +func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) { // Get a new, raw connection. conn, _, err := p.DialTimeout(dc, addr, defaultDialTimeout) if err != nil { @@ -343,9 +345,10 @@ func (p *ConnPool) clearConn(conn *Conn) { atomic.StoreInt32(&conn.shouldClose, 1) // Clear from the cache + addrStr := conn.addr.String() p.Lock() - if c, ok := p.pool[conn.addr]; ok && c == conn { - delete(p.pool, conn.addr) + if c, ok := p.pool[addrStr]; ok && c == conn { + delete(p.pool, addrStr) } p.Unlock() @@ -364,7 +367,7 @@ func (p *ConnPool) releaseConn(conn *Conn) { } // getClient is used to get a usable client for an address and protocol version -func (p *ConnPool) getClient(dc string, addr string, version int) (*Conn, *StreamClient, error) { +func (p *ConnPool) getClient(dc string, addr net.Addr, version int) (*Conn, *StreamClient, error) { retries := 0 START: // Try to get a conn first @@ -390,7 +393,7 @@ START: } // RPC is used to make an RPC call to a remote host -func (p *ConnPool) RPC(dc string, addr string, version int, method string, args interface{}, reply interface{}) error { +func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error { // Get a usable client conn, sc, err := p.getClient(dc, addr, version) if err != nil { @@ -415,7 +418,7 @@ func (p *ConnPool) RPC(dc string, addr string, version int, method string, args // returns true if healthy, false if an error occurred func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) { // Get a usable client - conn, sc, err := p.getClient(s.Datacenter, s.Addr.String(), s.Version) + conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version) if err != nil { return false, err } diff --git a/consul/rpc.go b/consul/rpc.go index c8423b071a..4531f2c466 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -262,7 +262,7 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa if server == nil { return structs.ErrNoLeader } - return s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, method, args, reply) + return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply) } // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers @@ -274,7 +274,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ } metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) - if err := s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply); err != nil { + if err := s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply); err != nil { manager.NotifyFailedServer(server) s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err) return err diff --git a/consul/serf.go b/consul/serf.go index 89fff58ef6..eee7f7e0a6 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -195,7 +195,7 @@ func (s *Server) maybeBootstrap() { // Retry with exponential backoff to get peer status from this server for attempt := uint(0); attempt < maxPeerRetries; attempt++ { - if err := s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, + if err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, "Status.Peers", &struct{}{}, &peers); err != nil { nextRetry := time.Duration((1 << attempt) * peerRetryBase) s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+ diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go index 6bad80c9fb..b00d7b1434 100644 --- a/consul/snapshot_endpoint.go +++ b/consul/snapshot_endpoint.go @@ -35,7 +35,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re return nil, structs.ErrNoDCPath } - snap, err := SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply) + snap, err := SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply) if err != nil { manager.NotifyFailedServer(server) return nil, err @@ -50,7 +50,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re if server == nil { return nil, structs.ErrNoLeader } - return SnapshotRPC(s.connPool, args.Datacenter, server.Addr.String(), args, in, reply) + return SnapshotRPC(s.connPool, args.Datacenter, server.Addr, args, in, reply) } } @@ -160,7 +160,7 @@ RESPOND: // the streaming output (for a snapshot). If the reply contains an error, this // will always return an error as well, so you don't need to check the error // inside the filled-in reply. -func SnapshotRPC(pool *ConnPool, dc string, addr string, +func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) { conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second) diff --git a/consul/snapshot_endpoint_test.go b/consul/snapshot_endpoint_test.go index 753c0c6e34..b72b715bd8 100644 --- a/consul/snapshot_endpoint_test.go +++ b/consul/snapshot_endpoint_test.go @@ -43,7 +43,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) { Op: structs.SnapshotSave, } var reply structs.SnapshotResponse - snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), + snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr, &args, bytes.NewReader([]byte("")), &reply) if err != nil { t.Fatalf("err: %v", err) @@ -115,7 +115,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) { // Restore the snapshot. args.Op = structs.SnapshotRestore - restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), + restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr, &args, snap, &reply) if err != nil { t.Fatalf("err: %v", err) @@ -186,7 +186,7 @@ func TestSnapshot_LeaderState(t *testing.T) { Op: structs.SnapshotSave, } var reply structs.SnapshotResponse - snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), + snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr, &args, bytes.NewReader([]byte("")), &reply) if err != nil { t.Fatalf("err: %v", err) @@ -219,7 +219,7 @@ func TestSnapshot_LeaderState(t *testing.T) { // Restore the snapshot. args.Op = structs.SnapshotRestore - restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), + restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr, &args, snap, &reply) if err != nil { t.Fatalf("err: %v", err) @@ -256,7 +256,7 @@ func TestSnapshot_ACLDeny(t *testing.T) { Op: structs.SnapshotSave, } var reply structs.SnapshotResponse - _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), + _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr, &args, bytes.NewReader([]byte("")), &reply) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) @@ -270,7 +270,7 @@ func TestSnapshot_ACLDeny(t *testing.T) { Op: structs.SnapshotRestore, } var reply structs.SnapshotResponse - _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr.String(), + _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.RPCAddr, &args, bytes.NewReader([]byte("")), &reply) if err == nil || !strings.Contains(err.Error(), permissionDenied) { t.Fatalf("err: %v", err) @@ -367,7 +367,7 @@ func TestSnapshot_AllowStale(t *testing.T) { Op: structs.SnapshotSave, } var reply structs.SnapshotResponse - _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), + _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr, &args, bytes.NewReader([]byte("")), &reply) if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) { t.Fatalf("err: %v", err) @@ -384,7 +384,7 @@ func TestSnapshot_AllowStale(t *testing.T) { Op: structs.SnapshotSave, } var reply structs.SnapshotResponse - _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr.String(), + _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.RPCAddr, &args, bytes.NewReader([]byte("")), &reply) if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") { t.Fatalf("err: %v", err)