mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
agent: move conn pool for muxed connections into separate pkg
This commit is contained in:
parent
80971c8a85
commit
82a132da60
@ -14,6 +14,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/servers"
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
@ -49,7 +50,7 @@ type Client struct {
|
||||
config *Config
|
||||
|
||||
// Connection pool to consul servers
|
||||
connPool *ConnPool
|
||||
connPool *pool.ConnPool
|
||||
|
||||
// servers is responsible for the selection and maintenance of
|
||||
// Consul servers this agent uses for RPC requests
|
||||
@ -109,10 +110,19 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
|
||||
logger = log.New(config.LogOutput, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
connPool := &pool.ConnPool{
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
LogOutput: config.LogOutput,
|
||||
MaxTime: clientRPCConnMaxIdle,
|
||||
MaxStreams: clientMaxStreams,
|
||||
TLSWrapper: tlsWrap,
|
||||
ForceTLS: config.VerifyOutgoing,
|
||||
}
|
||||
|
||||
// Create server
|
||||
c := &Client{
|
||||
config: config,
|
||||
connPool: NewPool(config.RPCSrcAddr, config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap, config.VerifyOutgoing),
|
||||
connPool: connPool,
|
||||
eventCh: make(chan serf.Event, serfEventBacklog),
|
||||
logger: logger,
|
||||
shutdownCh: make(chan struct{}),
|
||||
|
@ -265,7 +265,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
||||
for range servers {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
s := c.servers.FindServer()
|
||||
ok, err := c.connPool.PingConsulServer(s)
|
||||
ok, err := c.connPool.Ping(s.Datacenter, s.Addr, s.Version, s.UseTLS)
|
||||
if !ok {
|
||||
t.Errorf("Unable to ping server %v: %s", s.String(), err)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
@ -100,7 +101,7 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net
|
||||
// Check for tls mode
|
||||
if l.tlsFunc(address) && l.tlsWrap != nil {
|
||||
// Switch the connection into TLS mode
|
||||
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
|
||||
if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
@ -113,7 +114,7 @@ func (l *RaftLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net
|
||||
}
|
||||
|
||||
// Write the Raft byte to set the mode
|
||||
_, err = conn.Write([]byte{byte(rpcRaft)})
|
||||
_, err = conn.Write([]byte{byte(pool.RPCRaft)})
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/memberlist"
|
||||
@ -19,18 +20,6 @@ import (
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
type RPCType byte
|
||||
|
||||
const (
|
||||
rpcConsul RPCType = iota
|
||||
rpcRaft
|
||||
rpcMultiplex // Old Muxado byte, no longer supported.
|
||||
rpcTLS
|
||||
rpcMultiplexV2
|
||||
rpcSnapshot
|
||||
rpcGossip
|
||||
)
|
||||
|
||||
const (
|
||||
// maxQueryTime is used to bound the limit of a blocking query
|
||||
maxQueryTime = 600 * time.Second
|
||||
@ -92,24 +81,25 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
typ := pool.RPCType(buf[0])
|
||||
|
||||
// Enforce TLS if VerifyIncoming is set
|
||||
if s.config.VerifyIncoming && !isTLS && RPCType(buf[0]) != rpcTLS {
|
||||
if s.config.VerifyIncoming && !isTLS && typ != pool.RPCTLS {
|
||||
s.logger.Printf("[WARN] consul.rpc: Non-TLS connection attempted with VerifyIncoming set %s", logConn(conn))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Switch on the byte
|
||||
switch RPCType(buf[0]) {
|
||||
case rpcConsul:
|
||||
switch typ {
|
||||
case pool.RPCConsul:
|
||||
s.handleConsulConn(conn)
|
||||
|
||||
case rpcRaft:
|
||||
case pool.RPCRaft:
|
||||
metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1)
|
||||
s.raftLayer.Handoff(conn)
|
||||
|
||||
case rpcTLS:
|
||||
case pool.RPCTLS:
|
||||
if s.rpcTLS == nil {
|
||||
s.logger.Printf("[WARN] consul.rpc: TLS connection attempted, server not configured for TLS %s", logConn(conn))
|
||||
conn.Close()
|
||||
@ -118,14 +108,14 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
|
||||
conn = tls.Server(conn, s.rpcTLS)
|
||||
s.handleConn(conn, true)
|
||||
|
||||
case rpcMultiplexV2:
|
||||
case pool.RPCMultiplexV2:
|
||||
s.handleMultiplexV2(conn)
|
||||
|
||||
case rpcSnapshot:
|
||||
case pool.RPCSnapshot:
|
||||
s.handleSnapshotConn(conn)
|
||||
|
||||
default:
|
||||
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", buf[0], logConn(conn))
|
||||
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v %s", typ, logConn(conn))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/consul/servers"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
@ -99,7 +100,7 @@ type Server struct {
|
||||
config *Config
|
||||
|
||||
// Connection pool to other consul servers
|
||||
connPool *ConnPool
|
||||
connPool *pool.ConnPool
|
||||
|
||||
// Endpoints holds our RPC endpoints
|
||||
endpoints endpoints
|
||||
@ -271,12 +272,21 @@ func NewServerLogger(config *Config, logger *log.Logger) (*Server, error) {
|
||||
// Create the shutdown channel - this is closed but never written to.
|
||||
shutdownCh := make(chan struct{})
|
||||
|
||||
connPool := &pool.ConnPool{
|
||||
SrcAddr: config.RPCSrcAddr,
|
||||
LogOutput: config.LogOutput,
|
||||
MaxTime: serverRPCCache,
|
||||
MaxStreams: serverMaxStreams,
|
||||
TLSWrapper: tlsWrap,
|
||||
ForceTLS: config.VerifyOutgoing,
|
||||
}
|
||||
|
||||
// Create server.
|
||||
s := &Server{
|
||||
autopilotRemoveDeadCh: make(chan struct{}),
|
||||
autopilotShutdownCh: make(chan struct{}),
|
||||
config: config,
|
||||
connPool: NewPool(config.RPCSrcAddr, config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap, config.VerifyOutgoing),
|
||||
connPool: connPool,
|
||||
eventChLAN: make(chan serf.Event, 256),
|
||||
eventChWAN: make(chan serf.Event, 256),
|
||||
localConsuls: make(map[raft.ServerAddress]*agent.Server),
|
||||
|
@ -593,7 +593,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
|
||||
if leader == nil {
|
||||
t.Fatal("no leader")
|
||||
}
|
||||
return s2.connPool.PingConsulServer(leader)
|
||||
return s2.connPool.Ping(leader.Datacenter, leader.Addr, leader.Version, leader.UseTLS)
|
||||
}
|
||||
|
||||
func TestServer_TLSToNoTLS(t *testing.T) {
|
||||
|
@ -8,6 +8,7 @@ package servers
|
||||
import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -59,7 +60,7 @@ type ManagerSerfCluster interface {
|
||||
// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
|
||||
// dependency.
|
||||
type Pinger interface {
|
||||
PingConsulServer(s *agent.Server) (bool, error)
|
||||
Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error)
|
||||
}
|
||||
|
||||
// serverList is a local copy of the struct used to maintain the list of
|
||||
@ -306,14 +307,14 @@ func (m *Manager) RebalanceServers() {
|
||||
for i := 0; i < len(l.servers); i++ {
|
||||
// Always test the first server. Failed servers are cycled
|
||||
// while Serf detects the node has failed.
|
||||
selectedServer := l.servers[0]
|
||||
srv := l.servers[0]
|
||||
|
||||
ok, err := m.connPoolPinger.PingConsulServer(selectedServer)
|
||||
ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.Addr, srv.Version, srv.UseTLS)
|
||||
if ok {
|
||||
foundHealthyServer = true
|
||||
break
|
||||
}
|
||||
m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, selectedServer.String(), err)
|
||||
m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, srv, err)
|
||||
|
||||
l.cycleServer()
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
@ -31,7 +32,7 @@ type fauxConnPool struct {
|
||||
failPct float64
|
||||
}
|
||||
|
||||
func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) {
|
||||
func (cp *fauxConnPool) Ping(string, net.Addr, int, bool) (bool, error) {
|
||||
var success bool
|
||||
successProb := rand.Float64()
|
||||
if successProb > cp.failPct {
|
||||
@ -179,7 +180,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
|
||||
// failPct of the servers for the reconcile. This
|
||||
// allows for the selected server to no longer be
|
||||
// healthy for the reconcile below.
|
||||
if ok, _ := m.connPoolPinger.PingConsulServer(node); ok {
|
||||
if ok, _ := m.connPoolPinger.Ping(node.Datacenter, node.Addr, node.Version, node.UseTLS); ok {
|
||||
// Will still be present
|
||||
healthyServers = append(healthyServers, node)
|
||||
} else {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -17,7 +18,7 @@ type fauxConnPool struct {
|
||||
failPct float64
|
||||
}
|
||||
|
||||
func (cp *fauxConnPool) PingConsulServer(server *agent.Server) (bool, error) {
|
||||
func (cp *fauxConnPool) Ping(string, net.Addr, int, bool) (bool, error) {
|
||||
var success bool
|
||||
successProb := rand.Float64()
|
||||
if successProb > cp.failPct {
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/snapshot"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
)
|
||||
@ -187,10 +188,10 @@ RESPOND:
|
||||
// the streaming output (for a snapshot). If the reply contains an error, this
|
||||
// will always return an error as well, so you don't need to check the error
|
||||
// inside the filled-in reply.
|
||||
func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, useTLS bool,
|
||||
func SnapshotRPC(connPool *pool.ConnPool, dc string, addr net.Addr, useTLS bool,
|
||||
args *structs.SnapshotRequest, in io.Reader, reply *structs.SnapshotResponse) (io.ReadCloser, error) {
|
||||
|
||||
conn, hc, err := pool.DialTimeout(dc, addr, 10*time.Second, useTLS)
|
||||
conn, hc, err := connPool.DialTimeout(dc, addr, 10*time.Second, useTLS)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -206,7 +207,7 @@ func SnapshotRPC(pool *ConnPool, dc string, addr net.Addr, useTLS bool,
|
||||
|
||||
// Write the snapshot RPC byte to set the mode, then perform the
|
||||
// request.
|
||||
if _, err := conn.Write([]byte{byte(rpcSnapshot)}); err != nil {
|
||||
if _, err := conn.Write([]byte{byte(pool.RPCSnapshot)}); err != nil {
|
||||
return nil, fmt.Errorf("failed to write stream type: %v", err)
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/consul/structs"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
)
|
||||
|
||||
// StatsFetcher has two functions for autopilot. First, lets us fetch all the
|
||||
@ -18,14 +19,14 @@ import (
|
||||
// as we run the health check fairly frequently.
|
||||
type StatsFetcher struct {
|
||||
logger *log.Logger
|
||||
pool *ConnPool
|
||||
pool *pool.ConnPool
|
||||
datacenter string
|
||||
inflight map[string]struct{}
|
||||
inflightLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewStatsFetcher returns a stats fetcher.
|
||||
func NewStatsFetcher(logger *log.Logger, pool *ConnPool, datacenter string) *StatsFetcher {
|
||||
func NewStatsFetcher(logger *log.Logger, pool *pool.ConnPool, datacenter string) *StatsFetcher {
|
||||
return &StatsFetcher{
|
||||
logger: logger,
|
||||
pool: pool,
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
)
|
||||
@ -19,7 +20,7 @@ func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
|
||||
}
|
||||
|
||||
// Write the Consul RPC byte to set the mode
|
||||
conn.Write([]byte{byte(rpcConsul)})
|
||||
conn.Write([]byte{byte(pool.RPCConsul)})
|
||||
return msgpackrpc.NewClientCodec(conn)
|
||||
}
|
||||
|
||||
|
15
agent/pool/conn.go
Normal file
15
agent/pool/conn.go
Normal file
@ -0,0 +1,15 @@
|
||||
package pool
|
||||
|
||||
type RPCType byte
|
||||
|
||||
const (
|
||||
// keep numbers unique.
|
||||
// iota depends on order
|
||||
RPCConsul RPCType = 0
|
||||
RPCRaft = 1
|
||||
RPCMultiplex = 2 // Old Muxado byte, no longer supported.
|
||||
RPCTLS = 3
|
||||
RPCMultiplexV2 = 4
|
||||
RPCSnapshot = 5
|
||||
RPCGossip = 6
|
||||
)
|
@ -1,4 +1,4 @@
|
||||
package consul
|
||||
package pool
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
@ -10,7 +10,6 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/agent"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/yamux"
|
||||
@ -90,7 +89,7 @@ func (c *Conn) getClient() (*StreamClient, error) {
|
||||
func (c *Conn) returnClient(client *StreamClient) {
|
||||
didSave := false
|
||||
c.clientLock.Lock()
|
||||
if c.clients.Len() < c.pool.maxStreams && atomic.LoadInt32(&c.shouldClose) == 0 {
|
||||
if c.clients.Len() < c.pool.MaxStreams && atomic.LoadInt32(&c.shouldClose) == 0 {
|
||||
c.clients.PushFront(client)
|
||||
didSave = true
|
||||
|
||||
@ -112,25 +111,34 @@ func (c *Conn) markForUse() {
|
||||
atomic.AddInt32(&c.refCount, 1)
|
||||
}
|
||||
|
||||
// ConnPool is used to maintain a connection pool to other
|
||||
// Consul servers. This is used to reduce the latency of
|
||||
// RPC requests between servers. It is only used to pool
|
||||
// connections in the rpcConsul mode. Raft connections
|
||||
// are pooled separately.
|
||||
// ConnPool is used to maintain a connection pool to other Consul
|
||||
// servers. This is used to reduce the latency of RPC requests between
|
||||
// servers. It is only used to pool connections in the rpcConsul mode.
|
||||
// Raft connections are pooled separately. Maintain at most one
|
||||
// connection per host, for up to MaxTime. When MaxTime connection
|
||||
// reaping is disabled. MaxStreams is used to control the number of idle
|
||||
// streams allowed. If TLS settings are provided outgoing connections
|
||||
// use TLS.
|
||||
type ConnPool struct {
|
||||
sync.Mutex
|
||||
|
||||
// src is the source address for outgoing connections.
|
||||
src *net.TCPAddr
|
||||
// SrcAddr is the source address for outgoing connections.
|
||||
SrcAddr *net.TCPAddr
|
||||
|
||||
// LogOutput is used to control logging
|
||||
logOutput io.Writer
|
||||
LogOutput io.Writer
|
||||
|
||||
// The maximum time to keep a connection open
|
||||
maxTime time.Duration
|
||||
MaxTime time.Duration
|
||||
|
||||
// The maximum number of open streams to keep
|
||||
maxStreams int
|
||||
MaxStreams int
|
||||
|
||||
// TLS wrapper
|
||||
TLSWrapper tlsutil.DCWrapper
|
||||
|
||||
// ForceTLS is used to enforce outgoing TLS verification
|
||||
ForceTLS bool
|
||||
|
||||
sync.Mutex
|
||||
|
||||
// pool maps an address to a open connection
|
||||
pool map[string]*Conn
|
||||
@ -141,42 +149,30 @@ type ConnPool struct {
|
||||
// on to close.
|
||||
limiter map[string]chan struct{}
|
||||
|
||||
// TLS wrapper
|
||||
tlsWrap tlsutil.DCWrapper
|
||||
|
||||
// forceTLS is used to enforce outgoing TLS verification
|
||||
forceTLS bool
|
||||
|
||||
// Used to indicate the pool is shutdown
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
|
||||
// once initializes the internal data structures and connection
|
||||
// reaping on first use.
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
// NewPool is used to make a new connection pool
|
||||
// Maintain at most one connection per host, for up to maxTime.
|
||||
// Set maxTime to 0 to disable reaping. maxStreams is used to control
|
||||
// the number of idle streams allowed.
|
||||
// If TLS settings are provided outgoing connections use TLS.
|
||||
func NewPool(src *net.TCPAddr, logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap tlsutil.DCWrapper, forceTLS bool) *ConnPool {
|
||||
pool := &ConnPool{
|
||||
src: src,
|
||||
logOutput: logOutput,
|
||||
maxTime: maxTime,
|
||||
maxStreams: maxStreams,
|
||||
pool: make(map[string]*Conn),
|
||||
limiter: make(map[string]chan struct{}),
|
||||
tlsWrap: tlsWrap,
|
||||
forceTLS: forceTLS,
|
||||
shutdownCh: make(chan struct{}),
|
||||
// init configures the initial data structures. It should be called
|
||||
// by p.once.Do(p.init) in all public methods.
|
||||
func (p *ConnPool) init() {
|
||||
p.pool = make(map[string]*Conn)
|
||||
p.limiter = make(map[string]chan struct{})
|
||||
p.shutdownCh = make(chan struct{})
|
||||
if p.MaxTime > 0 {
|
||||
go p.reap()
|
||||
}
|
||||
if maxTime > 0 {
|
||||
go pool.reap()
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
// Shutdown is used to close the connection pool
|
||||
func (p *ConnPool) Shutdown() error {
|
||||
p.once.Do(p.init)
|
||||
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
@ -272,8 +268,10 @@ type HalfCloser interface {
|
||||
// DialTimeout is used to establish a raw connection to the given server, with a
|
||||
// given connection timeout.
|
||||
func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration, useTLS bool) (net.Conn, HalfCloser, error) {
|
||||
p.once.Do(p.init)
|
||||
|
||||
// Try to dial the conn
|
||||
d := &net.Dialer{LocalAddr: p.src, Timeout: timeout}
|
||||
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: timeout}
|
||||
conn, err := d.Dial("tcp", addr.String())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@ -288,15 +286,15 @@ func (p *ConnPool) DialTimeout(dc string, addr net.Addr, timeout time.Duration,
|
||||
}
|
||||
|
||||
// Check if TLS is enabled
|
||||
if (useTLS || p.forceTLS) && p.tlsWrap != nil {
|
||||
if (useTLS || p.ForceTLS) && p.TLSWrapper != nil {
|
||||
// Switch the connection into TLS mode
|
||||
if _, err := conn.Write([]byte{byte(rpcTLS)}); err != nil {
|
||||
if _, err := conn.Write([]byte{byte(RPCTLS)}); err != nil {
|
||||
conn.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Wrap the connection in a TLS client
|
||||
tlsConn, err := p.tlsWrap(dc, conn)
|
||||
tlsConn, err := p.TLSWrapper(dc, conn)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, nil, err
|
||||
@ -323,14 +321,14 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int, useTLS bool
|
||||
}
|
||||
|
||||
// Write the Consul multiplex byte to set the mode
|
||||
if _, err := conn.Write([]byte{byte(rpcMultiplexV2)}); err != nil {
|
||||
if _, err := conn.Write([]byte{byte(RPCMultiplexV2)}); err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Setup the logger
|
||||
conf := yamux.DefaultConfig()
|
||||
conf.LogOutput = p.logOutput
|
||||
conf.LogOutput = p.LogOutput
|
||||
|
||||
// Create a multiplexed session
|
||||
session, _ = yamux.Client(conn, conf)
|
||||
@ -403,6 +401,8 @@ START:
|
||||
|
||||
// RPC is used to make an RPC call to a remote host
|
||||
func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, useTLS bool, args interface{}, reply interface{}) error {
|
||||
p.once.Do(p.init)
|
||||
|
||||
// Get a usable client
|
||||
conn, sc, err := p.getClient(dc, addr, version, useTLS)
|
||||
if err != nil {
|
||||
@ -423,28 +423,12 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use
|
||||
return nil
|
||||
}
|
||||
|
||||
// PingConsulServer sends a Status.Ping message to the specified server and
|
||||
// Ping sends a Status.Ping message to the specified server and
|
||||
// returns true if healthy, false if an error occurred
|
||||
func (p *ConnPool) PingConsulServer(s *agent.Server) (bool, error) {
|
||||
// Get a usable client
|
||||
conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version, s.UseTLS)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Make the RPC call
|
||||
func (p *ConnPool) Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error) {
|
||||
var out struct{}
|
||||
err = msgpackrpc.CallWithCodec(sc.codec, "Status.Ping", struct{}{}, &out)
|
||||
if err != nil {
|
||||
sc.Close()
|
||||
p.releaseConn(conn)
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Done with the connection
|
||||
conn.returnClient(sc)
|
||||
p.releaseConn(conn)
|
||||
return true, nil
|
||||
err := p.RPC(dc, addr, version, "Status.Ping", useTLS, struct{}{}, &out)
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
// Reap is used to close conns open over maxTime
|
||||
@ -463,7 +447,7 @@ func (p *ConnPool) reap() {
|
||||
now := time.Now()
|
||||
for host, conn := range p.pool {
|
||||
// Skip recently used connections
|
||||
if now.Sub(conn.lastUsed) < p.maxTime {
|
||||
if now.Sub(conn.lastUsed) < p.MaxTime {
|
||||
continue
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user