mirror of
https://github.com/status-im/consul.git
synced 2025-01-24 12:40:17 +00:00
Changes pool's dial address to a string and adds a timeout.
This commit is contained in:
parent
b333f3ea04
commit
376fdde779
@ -331,7 +331,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Forward to remote Consul
|
// Forward to remote Consul
|
||||||
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
|
if err := c.connPool.RPC(c.config.Datacenter, server.Addr.String(), server.Version, method, args, reply); err != nil {
|
||||||
c.servers.NotifyFailedServer(server)
|
c.servers.NotifyFailedServer(server)
|
||||||
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
|
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
|
||||||
return err
|
return err
|
||||||
@ -358,7 +358,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
|
|||||||
|
|
||||||
// Request the operation.
|
// Request the operation.
|
||||||
var reply structs.SnapshotResponse
|
var reply structs.SnapshotResponse
|
||||||
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr, args, in, &reply)
|
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.Addr.String(), args, in, &reply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ type Conn struct {
|
|||||||
refCount int32
|
refCount int32
|
||||||
shouldClose int32
|
shouldClose int32
|
||||||
|
|
||||||
addr net.Addr
|
addr string
|
||||||
session muxSession
|
session muxSession
|
||||||
lastUsed time.Time
|
lastUsed time.Time
|
||||||
version int
|
version int
|
||||||
@ -187,12 +187,12 @@ func (p *ConnPool) Shutdown() error {
|
|||||||
// wait for an existing connection attempt to finish, if one if in progress,
|
// wait for an existing connection attempt to finish, if one if in progress,
|
||||||
// and will return that one if it succeeds. If all else fails, it will return a
|
// and will return that one if it succeeds. If all else fails, it will return a
|
||||||
// newly-created connection and add it to the pool.
|
// newly-created connection and add it to the pool.
|
||||||
func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) {
|
func (p *ConnPool) acquire(dc string, addr string, version int) (*Conn, error) {
|
||||||
// Check to see if there's a pooled connection available. This is up
|
// Check to see if there's a pooled connection available. This is up
|
||||||
// here since it should the the vastly more common case than the rest
|
// here since it should the the vastly more common case than the rest
|
||||||
// of the code here.
|
// of the code here.
|
||||||
p.Lock()
|
p.Lock()
|
||||||
c := p.pool[addr.String()]
|
c := p.pool[addr]
|
||||||
if c != nil {
|
if c != nil {
|
||||||
c.markForUse()
|
c.markForUse()
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
@ -204,9 +204,9 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
|
|||||||
// attempt is done.
|
// attempt is done.
|
||||||
var wait chan struct{}
|
var wait chan struct{}
|
||||||
var ok bool
|
var ok bool
|
||||||
if wait, ok = p.limiter[addr.String()]; !ok {
|
if wait, ok = p.limiter[addr]; !ok {
|
||||||
wait = make(chan struct{})
|
wait = make(chan struct{})
|
||||||
p.limiter[addr.String()] = wait
|
p.limiter[addr] = wait
|
||||||
}
|
}
|
||||||
isLeadThread := !ok
|
isLeadThread := !ok
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
@ -216,14 +216,14 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
|
|||||||
if isLeadThread {
|
if isLeadThread {
|
||||||
c, err := p.getNewConn(dc, addr, version)
|
c, err := p.getNewConn(dc, addr, version)
|
||||||
p.Lock()
|
p.Lock()
|
||||||
delete(p.limiter, addr.String())
|
delete(p.limiter, addr)
|
||||||
close(wait)
|
close(wait)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
p.pool[addr.String()] = c
|
p.pool[addr] = c
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
@ -238,7 +238,7 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error)
|
|||||||
|
|
||||||
// See if the lead thread was able to get us a connection.
|
// See if the lead thread was able to get us a connection.
|
||||||
p.Lock()
|
p.Lock()
|
||||||
if c := p.pool[addr.String()]; c != nil {
|
if c := p.pool[addr]; c != nil {
|
||||||
c.markForUse()
|
c.markForUse()
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
return c, nil
|
return c, nil
|
||||||
@ -257,10 +257,11 @@ type HalfCloser interface {
|
|||||||
CloseWrite() error
|
CloseWrite() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dial is used to establish a raw connection to the given server.
|
// DialTimeout is used to establish a raw connection to the given server, with a
|
||||||
func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error) {
|
// given connection timeout.
|
||||||
|
func (p *ConnPool) DialTimeout(dc string, addr string, timeout time.Duration) (net.Conn, HalfCloser, error) {
|
||||||
// Try to dial the conn
|
// Try to dial the conn
|
||||||
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
|
conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -294,9 +295,9 @@ func (p *ConnPool) Dial(dc string, addr net.Addr) (net.Conn, HalfCloser, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getNewConn is used to return a new connection
|
// getNewConn is used to return a new connection
|
||||||
func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) {
|
func (p *ConnPool) getNewConn(dc string, addr string, version int) (*Conn, error) {
|
||||||
// Get a new, raw connection.
|
// Get a new, raw connection.
|
||||||
conn, _, err := p.Dial(dc, addr)
|
conn, _, err := p.DialTimeout(dc, addr, 10*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -341,8 +342,8 @@ func (p *ConnPool) clearConn(conn *Conn) {
|
|||||||
|
|
||||||
// Clear from the cache
|
// Clear from the cache
|
||||||
p.Lock()
|
p.Lock()
|
||||||
if c, ok := p.pool[conn.addr.String()]; ok && c == conn {
|
if c, ok := p.pool[conn.addr]; ok && c == conn {
|
||||||
delete(p.pool, conn.addr.String())
|
delete(p.pool, conn.addr)
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
@ -361,7 +362,7 @@ func (p *ConnPool) releaseConn(conn *Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getClient is used to get a usable client for an address and protocol version
|
// getClient is used to get a usable client for an address and protocol version
|
||||||
func (p *ConnPool) getClient(dc string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
|
func (p *ConnPool) getClient(dc string, addr string, version int) (*Conn, *StreamClient, error) {
|
||||||
retries := 0
|
retries := 0
|
||||||
START:
|
START:
|
||||||
// Try to get a conn first
|
// Try to get a conn first
|
||||||
@ -387,7 +388,7 @@ START:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RPC is used to make an RPC call to a remote host
|
// RPC is used to make an RPC call to a remote host
|
||||||
func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
|
func (p *ConnPool) RPC(dc string, addr string, version int, method string, args interface{}, reply interface{}) error {
|
||||||
// Get a usable client
|
// Get a usable client
|
||||||
conn, sc, err := p.getClient(dc, addr, version)
|
conn, sc, err := p.getClient(dc, addr, version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -412,7 +413,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg
|
|||||||
// returns true if healthy, false if an error occurred
|
// returns true if healthy, false if an error occurred
|
||||||
func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) {
|
func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) {
|
||||||
// Get a usable client
|
// Get a usable client
|
||||||
conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version)
|
conn, sc, err := p.getClient(s.Datacenter, s.Addr.String(), s.Version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -263,7 +263,7 @@ func (s *Server) forwardLeader(server *agent.Server, method string, args interfa
|
|||||||
if server == nil {
|
if server == nil {
|
||||||
return structs.ErrNoLeader
|
return structs.ErrNoLeader
|
||||||
}
|
}
|
||||||
return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply)
|
return s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version, method, args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getRemoteServer returns a random server from a remote datacenter. This uses
|
// getRemoteServer returns a random server from a remote datacenter. This uses
|
||||||
@ -290,7 +290,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
|
|||||||
}
|
}
|
||||||
|
|
||||||
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
|
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
|
||||||
return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply)
|
return s.connPool.RPC(dc, server.Addr.String(), server.Version, method, args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
// globalRPC is used to forward an RPC request to one server in each datacenter.
|
// globalRPC is used to forward an RPC request to one server in each datacenter.
|
||||||
|
@ -249,7 +249,7 @@ func (s *Server) maybeBootstrap() {
|
|||||||
|
|
||||||
// Retry with exponential backoff to get peer status from this server
|
// Retry with exponential backoff to get peer status from this server
|
||||||
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
|
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
|
||||||
if err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version,
|
if err := s.connPool.RPC(s.config.Datacenter, server.Addr.String(), server.Version,
|
||||||
"Status.Peers", &struct{}{}, &peers); err != nil {
|
"Status.Peers", &struct{}{}, &peers); err != nil {
|
||||||
nextRetry := time.Duration((1 << attempt) * peerRetryBase)
|
nextRetry := time.Duration((1 << attempt) * peerRetryBase)
|
||||||
s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+
|
s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/snapshot"
|
"github.com/hashicorp/consul/snapshot"
|
||||||
@ -33,7 +34,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil, structs.ErrNoDCPath
|
return nil, structs.ErrNoDCPath
|
||||||
}
|
}
|
||||||
return SnapshotRPC(s.connPool, dc, server.Addr, args, in, reply)
|
return SnapshotRPC(s.connPool, dc, server.Addr.String(), args, in, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform leader forwarding if required.
|
// Perform leader forwarding if required.
|
||||||
@ -42,7 +43,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
|
|||||||
if server == nil {
|
if server == nil {
|
||||||
return nil, structs.ErrNoLeader
|
return nil, structs.ErrNoLeader
|
||||||
}
|
}
|
||||||
return SnapshotRPC(s.connPool, args.Datacenter, server.Addr, args, in, reply)
|
return SnapshotRPC(s.connPool, args.Datacenter, server.Addr.String(), args, in, reply)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,10 +153,10 @@ RESPOND:
|
|||||||
// the streaming output (for a snapshot). If the reply contains an error, this
|
// the streaming output (for a snapshot). If the reply contains an error, this
|
||||||
// will always return an error as well, so you don't need to check the error
|
// will always return an error as well, so you don't need to check the error
|
||||||
// inside the filled-in reply.
|
// inside the filled-in reply.
|
||||||
func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr,
|
func SnapshotRPC(pool *ConnPool, dc string, addr string,
|
||||||
args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {
|
args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {
|
||||||
|
|
||||||
conn, hc, err := pool.Dial(dc, addr)
|
conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user