From 376fdde779aa6d67138efc687cfe6c45e63aca54 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Sun, 5 Mar 2017 14:30:49 -0800 Subject: [PATCH] Changes pool's dial address to a string and adds a timeout. --- consul/client.go | 4 ++-- consul/pool.go | 37 +++++++++++++++++++------------------ consul/rpc.go | 4 ++-- consul/serf.go | 2 +- consul/snapshot_endpoint.go | 9 +++++---- 5 files changed, 29 insertions(+), 27 deletions(-) diff --git a/consul/client.go b/consul/client.go index 6ef697e7c3..fec3a571a8 100644 --- a/consul/client.go +++ b/consul/client.go @@ -331,7 +331,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { } // Forward to remote Consul - if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { + if err := c.connPool.RPC(c.config.Datacenter, server.Addr.String(), server.Version, method, args, reply); err != nil { c.servers.NotifyFailedServer(server) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) return err @@ -358,7 +358,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io // Request the operation. var reply structs.SnapshotResponse - snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, args, in, &reply) + snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr.String(), args, in, &reply) if err != nil { return err } diff --git a/consul/pool.go b/consul/pool.go index 2002b997a4..9c8a6125d9 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -38,7 +38,7 @@ type Conn struct { refCount int32 shouldClose int32 - addr net.Addr + addr string session muxSession lastUsed time.Time version int @@ -187,12 +187,12 @@ func (p *ConnPool) Shutdown() error { // wait for an existing connection attempt to finish, if one if in progress, // and will return that one if it succeeds. If all else fails, it will return a // newly-created connection and add it to the pool. -func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) { +func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) { // Check to see if there's a pooled connection available. This is up // here since it should the the vastly more common case than the rest // of the code here. p.Lock() - c := p.pool[addr.String()] + c := p.pool[addr] if c != nil { c.markForUse() p.Unlock() @@ -204,9 +204,9 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) // attempt is done. var wait chan struct{} var ok bool - if wait, ok = p.limiter[addr.String()]; !ok { + if wait, ok = p.limiter[addr]; !ok { wait = make(chan struct{}) - p.limiter[addr.String()] = wait + p.limiter[addr] = wait } isLeadThread := !ok p.Unlock() @@ -216,14 +216,14 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) if isLeadThread { c, err := p.getNewConn(dc, addr, version) p.Lock() - delete(p.limiter, addr.String()) + delete(p.limiter, addr) close(wait) if err != nil { p.Unlock() return nil, err } - p.pool[addr.String()] = c + p.pool[addr] = c p.Unlock() return c, nil } @@ -238,7 +238,7 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) // See if the lead thread was able to get us a connection. p.Lock() - if c := p.pool[addr.String()]; c != nil { + if c := p.pool[addr]; c != nil { c.markForUse() p.Unlock() return c, nil @@ -257,10 +257,11 @@ type HalfCloser interface { CloseWrite() error } -// Dial is used to establish a raw connection to the given server. -func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error) { +// DialTimeout is used to establish a raw connection to the given server, with a +// given connection timeout. +func (p *ConnPool) DialTimeout(dc string, addr string, timeout time.Duration) (net.Conn, HalfCloser, error) { // Try to dial the conn - conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second) + conn, err := net.DialTimeout("tcp", addr, 10*time.Second) if err != nil { return nil, nil, err } @@ -294,9 +295,9 @@ func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error) } // getNewConn is used to return a new connection -func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) { +func (p *ConnPool) getNewConn(dc string, addr string, version int) (*Conn, error) { // Get a new, raw connection. - conn, _, err := p.Dial(dc, addr) + conn, _, err := p.DialTimeout(dc, addr, 10*time.Second) if err != nil { return nil, err } @@ -341,8 +342,8 @@ func (p *ConnPool) clearConn(conn *Conn) { // Clear from the cache p.Lock() - if c, ok := p.pool[conn.addr.String()]; ok && c == conn { - delete(p.pool, conn.addr.String()) + if c, ok := p.pool[conn.addr]; ok && c == conn { + delete(p.pool, conn.addr) } p.Unlock() @@ -361,7 +362,7 @@ func (p *ConnPool) releaseConn(conn *Conn) { } // getClient is used to get a usable client for an address and protocol version -func (p *ConnPool) getClient(dc string, addr net.Addr, version int) (*Conn, *StreamClient, error) { +func (p *ConnPool) getClient(dc string, addr string, version int) (*Conn, *StreamClient, error) { retries := 0 START: // Try to get a conn first @@ -387,7 +388,7 @@ START: } // RPC is used to make an RPC call to a remote host -func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error { +func (p *ConnPool) RPC(dc string, addr string, version int, method string, args interface{}, reply interface{}) error { // Get a usable client conn, sc, err := p.getClient(dc, addr, version) if err != nil { @@ -412,7 +413,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg // returns true if healthy, false if an error occurred func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) { // Get a usable client - conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version) + conn, sc, err := p.getClient(s.Datacenter, s.Addr.String(), s.Version) if err != nil { return false, err } diff --git a/consul/rpc.go b/consul/rpc.go index ef11c376b4..ed4c418cf9 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -263,7 +263,7 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa if server == nil { return structs.ErrNoLeader } - return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply) + return s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, method, args, reply) } // getRemoteServer returns a random server from a remote datacenter. This uses @@ -290,7 +290,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ } metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) - return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply) + return s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply) } // globalRPC is used to forward an RPC request to one server in each datacenter. diff --git a/consul/serf.go b/consul/serf.go index c449d73a24..7e23134ace 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -249,7 +249,7 @@ func (s *Server) maybeBootstrap() { // Retry with exponential backoff to get peer status from this server for attempt := uint(0); attempt < maxPeerRetries; attempt++ { - if err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, + if err := s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, "Status.Peers", &struct{}{}, &peers); err != nil { nextRetry := time.Duration((1 << attempt) * peerRetryBase) s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+ diff --git a/consul/snapshot_endpoint.go b/consul/snapshot_endpoint.go index 274f071af6..a67cc2558b 100644 --- a/consul/snapshot_endpoint.go +++ b/consul/snapshot_endpoint.go @@ -14,6 +14,7 @@ import ( "io" "io/ioutil" "net" + "time" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/snapshot" @@ -33,7 +34,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re if !ok { return nil, structs.ErrNoDCPath } - return SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply) + return SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply) } // Perform leader forwarding if required. @@ -42,7 +43,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re if server == nil { return nil, structs.ErrNoLeader } - return SnapshotRPC(s.connPool, args.Datacenter, server.Addr, args, in, reply) + return SnapshotRPC(s.connPool, args.Datacenter, server.Addr.String(), args, in, reply) } } @@ -152,10 +153,10 @@ RESPOND: // the streaming output (for a snapshot). If the reply contains an error, this // will always return an error as well, so you don't need to check the error // inside the filled-in reply. -func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, +func SnapshotRPC(pool *ConnPool, dc string, addr string, args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) { - conn, hc, err := pool.Dial(dc, addr) + conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second) if err != nil { return nil, err }