diff --git a/consul/client.go b/consul/client.go index caf2979d9a..0c93f0f0c5 100644 --- a/consul/client.go +++ b/consul/client.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "sync" + "time" ) // Interface is used to provide either a Client or Server, @@ -66,7 +67,7 @@ func NewClient(config *Config) (*Client, error) { // Create server c := &Client{ config: config, - connPool: NewPool(1), + connPool: NewPool(3, 30*time.Second), eventCh: make(chan serf.Event, 256), logger: logger, shutdownCh: make(chan struct{}), diff --git a/consul/pool.go b/consul/pool.go index 1cc0c5326a..3a4ae502a8 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -10,9 +10,10 @@ import ( // Conn is a pooled connection to a Consul server type Conn struct { - addr net.Addr - conn *net.TCPConn - client *rpc.Client + addr net.Addr + conn *net.TCPConn + client *rpc.Client + lastUsed time.Time } func (c *Conn) Close() error { @@ -30,6 +31,9 @@ type ConnPool struct { // The maximum connectsion to maintain per server maxConns int + // The maximum time to keep a connection open + maxTime time.Duration + // Pool maps an address to a list of connections pool map[string][]*Conn @@ -38,12 +42,17 @@ type ConnPool struct { } // NewPool is used to make a new connection pool -// Maintain at most maxConns per host -func NewPool(maxConns int) *ConnPool { +// Maintain at most maxConns per host, for up to maxTime. +// Set maxTime to 0 to disable reaping. +func NewPool(maxConns int, maxTime time.Duration) *ConnPool { pool := &ConnPool{ maxConns: maxConns, + maxTime: maxTime, pool: make(map[string][]*Conn), } + if maxTime > 0 { + go pool.reap() + } return pool } @@ -130,6 +139,9 @@ func (p *ConnPool) Return(conn *Conn) { p.Lock() defer p.Unlock() + // Set the last used time + conn.lastUsed = time.Now() + // Look for existing connections conns := p.pool[conn.addr.String()] @@ -169,3 +181,35 @@ func (p *ConnPool) RPC(addr net.Addr, method string, args interface{}, reply int } return err } + +// Reap is used to close conns open over maxTime +func (p *ConnPool) reap() { + for !p.shutdown { + // Sleep for a while + time.Sleep(time.Second) + + // Reap all old conns + p.Lock() + now := time.Now() + for host, conns := range p.pool { + n := len(conns) + for i := 0; i < n; i++ { + // Skip new connections + conn := conns[i] + if now.Sub(conn.lastUsed) < p.maxTime { + continue + } + + // Close the conn + conn.Close() + + // Remove from pool + conns[i], conns[n-1] = conns[n-1], nil + conns = conns[:n-1] + p.pool[host] = conns + n-- + } + } + p.Unlock() + } +} diff --git a/consul/server.go b/consul/server.go index b8a870de87..cb30d2335a 100644 --- a/consul/server.go +++ b/consul/server.go @@ -95,7 +95,7 @@ func NewServer(config *Config) (*Server, error) { // Create server s := &Server{ config: config, - connPool: NewPool(5), + connPool: NewPool(5, 0), eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), logger: logger,