Merge pull request #7966 from hashicorp/pool_improvements

Agent connection pool cleanup
This commit is contained in:
Hans Hasselberg 2020-06-04 08:56:26 +02:00 committed by hashicorp-ci
parent 476a50769e
commit de3e68c577
15 changed files with 65 additions and 115 deletions

View File

@ -109,7 +109,7 @@ func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token strin
for _, ip := range ips { for _, ip := range ips {
addr := net.TCPAddr{IP: ip, Port: port} addr := net.TCPAddr{IP: ip, Port: port}
if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, 0, "AutoEncrypt.Sign", &args, &reply); err == nil { if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, "AutoEncrypt.Sign", &args, &reply); err == nil {
return &reply, pkPEM, nil return &reply, pkPEM, nil
} else { } else {
c.logger.Warn("AutoEncrypt failed", "error", err) c.logger.Warn("AutoEncrypt failed", "error", err)

View File

@ -137,7 +137,6 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat
MaxTime: clientRPCConnMaxIdle, MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams, MaxStreams: clientMaxStreams,
TLSConfigurator: tlsConfigurator, TLSConfigurator: tlsConfigurator,
ForceTLS: config.VerifyOutgoing,
Datacenter: config.Datacenter, Datacenter: config.Datacenter,
} }
@ -310,7 +309,7 @@ TRY:
} }
// Make the request. // Make the request.
rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, server.Version, method, args, reply) rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply)
if rpcErr == nil { if rpcErr == nil {
return nil return nil
} }
@ -358,7 +357,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
// Request the operation. // Request the operation.
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, server.UseTLS, args, in, &reply) snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, args, in, &reply)
if err != nil { if err != nil {
return err return err
} }

View File

@ -423,7 +423,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
for range servers { for range servers {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
s := c.routers.FindServer() s := c.routers.FindServer()
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr, s.Version) ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr)
if !ok { if !ok {
t.Errorf("Unable to ping server %v: %s", s.String(), err) t.Errorf("Unable to ping server %v: %s", s.String(), err)
} }

View File

@ -552,7 +552,7 @@ CHECK_LEADER:
rpcErr := structs.ErrNoLeader rpcErr := structs.ErrNoLeader
if leader != nil { if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr, rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
leader.Version, method, args, reply) method, args, reply)
if rpcErr != nil && canRetry(info, rpcErr) { if rpcErr != nil && canRetry(info, rpcErr) {
goto RETRY goto RETRY
} }
@ -617,7 +617,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1, metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1,
[]metrics.Label{{Name: "datacenter", Value: dc}}) []metrics.Label{{Name: "datacenter", Value: dc}})
if err := s.connPool.RPC(dc, server.ShortName, server.Addr, server.Version, method, args, reply); err != nil { if err := s.connPool.RPC(dc, server.ShortName, server.Addr, method, args, reply); err != nil {
manager.NotifyFailedServer(server) manager.NotifyFailedServer(server)
s.rpcLogger().Error("RPC failed to server in DC", s.rpcLogger().Error("RPC failed to server in DC",
"server", server.Addr, "server", server.Addr,

View File

@ -382,7 +382,6 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
MaxTime: serverRPCCache, MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams, MaxStreams: serverMaxStreams,
TLSConfigurator: tlsConfigurator, TLSConfigurator: tlsConfigurator,
ForceTLS: config.VerifyOutgoing,
Datacenter: config.Datacenter, Datacenter: config.Datacenter,
} }

View File

@ -358,7 +358,7 @@ func (s *Server) maybeBootstrap() {
// Retry with exponential backoff to get peer status from this server // Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ { for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr, server.Version, if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr,
"Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil { "Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil {
nextRetry := (1 << attempt) * time.Second nextRetry := (1 << attempt) * time.Second
s.logger.Error("Failed to confirm peer status for server (will retry).", s.logger.Error("Failed to confirm peer status for server (will retry).",

View File

@ -1276,7 +1276,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
if leader == nil { if leader == nil {
t.Fatal("no leader") t.Fatal("no leader")
} }
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr, leader.Version) return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr)
} }
func TestServer_TLSToNoTLS(t *testing.T) { func TestServer_TLSToNoTLS(t *testing.T) {

View File

@ -37,7 +37,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
return nil, structs.ErrNoDCPath return nil, structs.ErrNoDCPath
} }
snap, err := SnapshotRPC(s.connPool, dc, server.ShortName, server.Addr, server.UseTLS, args, in, reply) snap, err := SnapshotRPC(s.connPool, dc, server.ShortName, server.Addr, args, in, reply)
if err != nil { if err != nil {
manager.NotifyFailedServer(server) manager.NotifyFailedServer(server)
return nil, err return nil, err
@ -52,7 +52,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
if server == nil { if server == nil {
return nil, structs.ErrNoLeader return nil, structs.ErrNoLeader
} }
return SnapshotRPC(s.connPool, args.Datacenter, server.ShortName, server.Addr, server.UseTLS, args, in, reply) return SnapshotRPC(s.connPool, args.Datacenter, server.ShortName, server.Addr, args, in, reply)
} }
} }
@ -194,14 +194,13 @@ func SnapshotRPC(
dc string, dc string,
nodeName string, nodeName string,
addr net.Addr, addr net.Addr,
useTLS bool,
args *structs.SnapshotRequest, args *structs.SnapshotRequest,
in io.Reader, in io.Reader,
reply *structs.SnapshotResponse, reply *structs.SnapshotResponse,
) (io.ReadCloser, error) { ) (io.ReadCloser, error) {
// Write the snapshot RPC byte to set the mode, then perform the // Write the snapshot RPC byte to set the mode, then perform the
// request. // request.
conn, hc, err := connPool.DialTimeout(dc, nodeName, addr, 10*time.Second, useTLS, pool.RPCSnapshot) conn, hc, err := connPool.DialTimeout(dc, nodeName, addr, pool.RPCSnapshot)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -46,7 +46,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false, snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -121,7 +121,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {
// Restore the snapshot. // Restore the snapshot.
args.Op = structs.SnapshotRestore args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false, restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, snap, &reply) &args, snap, &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -196,7 +196,7 @@ func TestSnapshot_LeaderState(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false, snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -229,7 +229,7 @@ func TestSnapshot_LeaderState(t *testing.T) {
// Restore the snapshot. // Restore the snapshot.
args.Op = structs.SnapshotRestore args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false, restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, snap, &reply) &args, snap, &reply)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -268,7 +268,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false, _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if !acl.IsErrPermissionDenied(err) { if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -282,7 +282,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotRestore, Op: structs.SnapshotRestore,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false, _, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if !acl.IsErrPermissionDenied(err) { if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -391,7 +391,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false, _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) { if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -408,7 +408,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave, Op: structs.SnapshotSave,
} }
var reply structs.SnapshotResponse var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false, _, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply) &args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") { if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -43,7 +43,7 @@ func NewStatsFetcher(logger hclog.Logger, pool *pool.ConnPool, datacenter string
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) { func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
var args struct{} var args struct{}
var reply autopilot.ServerStats var reply autopilot.ServerStats
err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, server.Version, "Status.RaftStats", &args, &reply) err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, "Status.RaftStats", &args, &reply)
if err != nil { if err != nil {
f.logger.Warn("error getting server health from server", f.logger.Warn("error getting server health from server",
"server", server.Name, "server", server.Name,

View File

@ -37,20 +37,25 @@ func insecureRPCClient(s *Server, c tlsutil.Config) (rpc.ClientCodec, error) {
if wrapper == nil { if wrapper == nil {
return nil, err return nil, err
} }
conn, _, err := pool.DialTimeoutWithRPCTypeDirectly( d := &net.Dialer{Timeout: time.Second}
s.config.Datacenter, conn, err := d.Dial("tcp", addr.String())
s.config.NodeName,
addr,
nil,
time.Second,
true,
wrapper,
pool.RPCTLSInsecure,
pool.RPCTLSInsecure,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Switch the connection into TLS mode
if _, err = conn.Write([]byte{byte(pool.RPCTLSInsecure)}); err != nil {
conn.Close()
return nil, err
}
// Wrap the connection in a TLS client
tlsConn, err := wrapper(s.config.Datacenter, conn)
if err != nil {
conn.Close()
return nil, err
}
conn = tlsConn
return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle), nil return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle), nil
} }

View File

@ -46,7 +46,6 @@ type Conn struct {
addr net.Addr addr net.Addr
session muxSession session muxSession
lastUsed time.Time lastUsed time.Time
version int
pool *ConnPool pool *ConnPool
@ -147,9 +146,6 @@ type ConnPool struct {
// Datacenter is the datacenter of the current agent. // Datacenter is the datacenter of the current agent.
Datacenter string Datacenter string
// ForceTLS is used to enforce outgoing TLS verification
ForceTLS bool
// Server should be set to true if this connection pool is configured in a // Server should be set to true if this connection pool is configured in a
// server instead of a client. // server instead of a client.
Server bool Server bool
@ -209,7 +205,7 @@ func (p *ConnPool) Shutdown() error {
// wait for an existing connection attempt to finish, if one if in progress, // wait for an existing connection attempt to finish, if one if in progress,
// and will return that one if it succeeds. If all else fails, it will return a // and will return that one if it succeeds. If all else fails, it will return a
// newly-created connection and add it to the pool. // newly-created connection and add it to the pool.
func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, error) { func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr) (*Conn, error) {
if nodeName == "" { if nodeName == "" {
return nil, fmt.Errorf("pool: ConnPool.acquire requires a node name") return nil, fmt.Errorf("pool: ConnPool.acquire requires a node name")
} }
@ -244,7 +240,7 @@ func (p *ConnPool) acquire(dc string, nodeName string, addr net.Addr, version in
// If we are the lead thread, make the new connection and then wake // If we are the lead thread, make the new connection and then wake
// everybody else up to see if we got it. // everybody else up to see if we got it.
if isLeadThread { if isLeadThread {
c, err := p.getNewConn(dc, nodeName, addr, version, useTLS) c, err := p.getNewConn(dc, nodeName, addr)
p.Lock() p.Lock()
delete(p.limiter, addrStr) delete(p.limiter, addrStr)
close(wait) close(wait)
@ -290,8 +286,6 @@ func (p *ConnPool) DialTimeout(
dc string, dc string,
nodeName string, nodeName string,
addr net.Addr, addr net.Addr,
timeout time.Duration,
useTLS bool,
actualRPCType RPCType, actualRPCType RPCType,
) (net.Conn, HalfCloser, error) { ) (net.Conn, HalfCloser, error) {
p.once.Do(p.init) p.once.Do(p.init)
@ -303,7 +297,6 @@ func (p *ConnPool) DialTimeout(
nodeName, nodeName,
addr, addr,
p.SrcAddr, p.SrcAddr,
timeout,
p.TLSConfigurator.OutgoingALPNRPCWrapper(), p.TLSConfigurator.OutgoingALPNRPCWrapper(),
actualRPCType, actualRPCType,
RPCTLS, RPCTLS,
@ -315,64 +308,24 @@ func (p *ConnPool) DialTimeout(
) )
} }
return DialTimeoutWithRPCTypeDirectly( return p.dial(
dc, dc,
nodeName, nodeName,
addr, addr,
p.SrcAddr,
timeout,
useTLS || p.ForceTLS,
p.TLSConfigurator.OutgoingRPCWrapper(),
actualRPCType, actualRPCType,
RPCTLS, RPCTLS,
) )
} }
// DialTimeoutInsecure is used to establish a raw connection to the given func (p *ConnPool) dial(
// server, with given connection timeout. It also writes RPCTLSInsecure as the
// first byte to indicate that the client cannot provide a certificate. This is
// so far only used for AutoEncrypt.Sign.
func (p *ConnPool) DialTimeoutInsecure(
dc string, dc string,
nodeName string, nodeName string,
addr net.Addr, addr net.Addr,
timeout time.Duration,
wrapper tlsutil.DCWrapper,
) (net.Conn, HalfCloser, error) {
p.once.Do(p.init)
if wrapper == nil {
return nil, nil, fmt.Errorf("wrapper cannot be nil")
} else if dc != p.Datacenter {
return nil, nil, fmt.Errorf("insecure dialing prohibited between datacenters")
}
return DialTimeoutWithRPCTypeDirectly(
dc,
nodeName,
addr,
p.SrcAddr,
timeout,
true,
wrapper,
RPCTLSInsecure,
RPCTLSInsecure,
)
}
func DialTimeoutWithRPCTypeDirectly(
dc string,
nodeName string,
addr net.Addr,
src *net.TCPAddr,
timeout time.Duration,
useTLS bool,
wrapper tlsutil.DCWrapper,
actualRPCType RPCType, actualRPCType RPCType,
tlsRPCType RPCType, tlsRPCType RPCType,
) (net.Conn, HalfCloser, error) { ) (net.Conn, HalfCloser, error) {
// Try to dial the conn // Try to dial the conn
d := &net.Dialer{LocalAddr: src, Timeout: timeout} d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: defaultDialTimeout}
conn, err := d.Dial("tcp", addr.String()) conn, err := d.Dial("tcp", addr.String())
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -389,7 +342,8 @@ func DialTimeoutWithRPCTypeDirectly(
} }
// Check if TLS is enabled // Check if TLS is enabled
if useTLS && wrapper != nil { if p.TLSConfigurator.UseTLS(dc) {
wrapper := p.TLSConfigurator.OutgoingRPCWrapper()
// Switch the connection into TLS mode // Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(tlsRPCType)}); err != nil { if _, err := conn.Write([]byte{byte(tlsRPCType)}); err != nil {
conn.Close() conn.Close()
@ -435,7 +389,6 @@ func DialTimeoutWithRPCTypeViaMeshGateway(
nodeName string, nodeName string,
addr net.Addr, addr net.Addr,
src *net.TCPAddr, src *net.TCPAddr,
timeout time.Duration,
wrapper tlsutil.ALPNWrapper, wrapper tlsutil.ALPNWrapper,
actualRPCType RPCType, actualRPCType RPCType,
tlsRPCType RPCType, tlsRPCType RPCType,
@ -467,7 +420,7 @@ func DialTimeoutWithRPCTypeViaMeshGateway(
return nil, nil, structs.ErrDCNotAvailable return nil, nil, structs.ErrDCNotAvailable
} }
dialer := &net.Dialer{LocalAddr: src, Timeout: timeout} dialer := &net.Dialer{LocalAddr: src, Timeout: defaultDialTimeout}
rawConn, err := dialer.Dial("tcp", gwAddr) rawConn, err := dialer.Dial("tcp", gwAddr)
if err != nil { if err != nil {
@ -497,19 +450,13 @@ func DialTimeoutWithRPCTypeViaMeshGateway(
} }
// getNewConn is used to return a new connection // getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, error) { func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr) (*Conn, error) {
if nodeName == "" { if nodeName == "" {
return nil, fmt.Errorf("pool: ConnPool.getNewConn requires a node name") return nil, fmt.Errorf("pool: ConnPool.getNewConn requires a node name")
} }
// Switch the multiplexing based on version
var session muxSession
if version < 2 {
return nil, fmt.Errorf("cannot make client connection, unsupported protocol version %d", version)
}
// Get a new, raw connection and write the Consul multiplex byte to set the mode // Get a new, raw connection and write the Consul multiplex byte to set the mode
conn, _, err := p.DialTimeout(dc, nodeName, addr, defaultDialTimeout, useTLS, RPCMultiplexV2) conn, _, err := p.DialTimeout(dc, nodeName, addr, RPCMultiplexV2)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -519,7 +466,7 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version
conf.LogOutput = p.LogOutput conf.LogOutput = p.LogOutput
// Create a multiplexed session // Create a multiplexed session
session, _ = yamux.Client(conn, conf) session, _ := yamux.Client(conn, conf)
// Wrap the connection // Wrap the connection
c := &Conn{ c := &Conn{
@ -529,7 +476,6 @@ func (p *ConnPool) getNewConn(dc string, nodeName string, addr net.Addr, version
session: session, session: session,
clients: list.New(), clients: list.New(),
lastUsed: time.Now(), lastUsed: time.Now(),
version: version,
pool: p, pool: p,
} }
return c, nil return c, nil
@ -567,12 +513,12 @@ func (p *ConnPool) releaseConn(conn *Conn) {
} }
} }
// getClient is used to get a usable client for an address and protocol version // getClient is used to get a usable client for an address
func (p *ConnPool) getClient(dc string, nodeName string, addr net.Addr, version int, useTLS bool) (*Conn, *StreamClient, error) { func (p *ConnPool) getClient(dc string, nodeName string, addr net.Addr) (*Conn, *StreamClient, error) {
retries := 0 retries := 0
START: START:
// Try to get a conn first // Try to get a conn first
conn, err := p.acquire(dc, nodeName, addr, version, useTLS) conn, err := p.acquire(dc, nodeName, addr)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to get conn: %v", err) return nil, nil, fmt.Errorf("failed to get conn: %v", err)
} }
@ -598,7 +544,6 @@ func (p *ConnPool) RPC(
dc string, dc string,
nodeName string, nodeName string,
addr net.Addr, addr net.Addr,
version int,
method string, method string,
args interface{}, args interface{},
reply interface{}, reply interface{},
@ -610,7 +555,7 @@ func (p *ConnPool) RPC(
if method == "AutoEncrypt.Sign" { if method == "AutoEncrypt.Sign" {
return p.rpcInsecure(dc, nodeName, addr, method, args, reply) return p.rpcInsecure(dc, nodeName, addr, method, args, reply)
} else { } else {
return p.rpc(dc, nodeName, addr, version, method, args, reply) return p.rpc(dc, nodeName, addr, method, args, reply)
} }
} }
@ -620,8 +565,12 @@ func (p *ConnPool) RPC(
// AutoEncrypt.Sign is a one-off call and it doesn't make sense to pool that // AutoEncrypt.Sign is a one-off call and it doesn't make sense to pool that
// connection if it is not being reused. // connection if it is not being reused.
func (p *ConnPool) rpcInsecure(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error { func (p *ConnPool) rpcInsecure(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
if dc != p.Datacenter {
return fmt.Errorf("insecure dialing prohibited between datacenters")
}
var codec rpc.ClientCodec var codec rpc.ClientCodec
conn, _, err := p.DialTimeoutInsecure(dc, nodeName, addr, 1*time.Second, p.TLSConfigurator.OutgoingRPCWrapper()) conn, _, err := p.dial(dc, nodeName, addr, 0, RPCTLSInsecure)
if err != nil { if err != nil {
return fmt.Errorf("rpcinsecure error establishing connection: %v", err) return fmt.Errorf("rpcinsecure error establishing connection: %v", err)
} }
@ -636,12 +585,11 @@ func (p *ConnPool) rpcInsecure(dc string, nodeName string, addr net.Addr, method
return nil return nil
} }
func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error { func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
p.once.Do(p.init) p.once.Do(p.init)
// Get a usable client // Get a usable client
useTLS := p.TLSConfigurator.UseTLS(dc) conn, sc, err := p.getClient(dc, nodeName, addr)
conn, sc, err := p.getClient(dc, nodeName, addr, version, useTLS)
if err != nil { if err != nil {
return fmt.Errorf("rpc error getting client: %v", err) return fmt.Errorf("rpc error getting client: %v", err)
} }
@ -671,9 +619,9 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, version int, m
// Ping sends a Status.Ping message to the specified server and // Ping sends a Status.Ping message to the specified server and
// returns true if healthy, false if an error occurred // returns true if healthy, false if an error occurred
func (p *ConnPool) Ping(dc string, nodeName string, addr net.Addr, version int) (bool, error) { func (p *ConnPool) Ping(dc string, nodeName string, addr net.Addr) (bool, error) {
var out struct{} var out struct{}
err := p.RPC(dc, nodeName, addr, version, "Status.Ping", struct{}{}, &out) err := p.RPC(dc, nodeName, addr, "Status.Ping", struct{}{}, &out)
return err == nil, err return err == nil, err
} }

View File

@ -61,7 +61,7 @@ type ManagerSerfCluster interface {
// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import // Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
// dependency. // dependency.
type Pinger interface { type Pinger interface {
Ping(dc, nodeName string, addr net.Addr, version int) (bool, error) Ping(dc, nodeName string, addr net.Addr) (bool, error)
} }
// serverList is a local copy of the struct used to maintain the list of // serverList is a local copy of the struct used to maintain the list of
@ -354,7 +354,7 @@ func (m *Manager) RebalanceServers() {
if m.serverName != "" && srv.Name == m.serverName { if m.serverName != "" && srv.Name == m.serverName {
continue continue
} }
ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.ShortName, srv.Addr, srv.Version) ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.ShortName, srv.Addr)
if ok { if ok {
foundHealthyServer = true foundHealthyServer = true
break break

View File

@ -33,7 +33,7 @@ type fauxConnPool struct {
failPct float64 failPct float64
} }
func (cp *fauxConnPool) Ping(string, string, net.Addr, int) (bool, error) { func (cp *fauxConnPool) Ping(string, string, net.Addr) (bool, error) {
var success bool var success bool
successProb := rand.Float64() successProb := rand.Float64()
if successProb > cp.failPct { if successProb > cp.failPct {
@ -179,7 +179,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
// failPct of the servers for the reconcile. This // failPct of the servers for the reconcile. This
// allows for the selected server to no longer be // allows for the selected server to no longer be
// healthy for the reconcile below. // healthy for the reconcile below.
if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.ShortName, node.Addr, node.Version); ok { if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.ShortName, node.Addr); ok {
// Will still be present // Will still be present
healthyServers = append(healthyServers, node) healthyServers = append(healthyServers, node)
} else { } else {

View File

@ -32,7 +32,7 @@ type fauxConnPool struct {
failAddr net.Addr failAddr net.Addr
} }
func (cp *fauxConnPool) Ping(dc string, nodeName string, addr net.Addr, version int) (bool, error) { func (cp *fauxConnPool) Ping(dc string, nodeName string, addr net.Addr) (bool, error) {
var success bool var success bool
successProb := rand.Float64() successProb := rand.Float64()