mirror of https://github.com/status-im/consul.git
Adding time based reaping to ConnPool
This commit is contained in:
parent
e20189e2c4
commit
862a838ff1
|
@ -10,6 +10,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Interface is used to provide either a Client or Server,
|
||||
|
@ -66,7 +67,7 @@ func NewClient(config *Config) (*Client, error) {
|
|||
// Create server
|
||||
c := &Client{
|
||||
config: config,
|
||||
connPool: NewPool(1),
|
||||
connPool: NewPool(3, 30*time.Second),
|
||||
eventCh: make(chan serf.Event, 256),
|
||||
logger: logger,
|
||||
shutdownCh: make(chan struct{}),
|
||||
|
|
|
@ -10,9 +10,10 @@ import (
|
|||
|
||||
// Conn is a pooled connection to a Consul server
|
||||
type Conn struct {
|
||||
addr net.Addr
|
||||
conn *net.TCPConn
|
||||
client *rpc.Client
|
||||
addr net.Addr
|
||||
conn *net.TCPConn
|
||||
client *rpc.Client
|
||||
lastUsed time.Time
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
|
@ -30,6 +31,9 @@ type ConnPool struct {
|
|||
// The maximum connectsion to maintain per server
|
||||
maxConns int
|
||||
|
||||
// The maximum time to keep a connection open
|
||||
maxTime time.Duration
|
||||
|
||||
// Pool maps an address to a list of connections
|
||||
pool map[string][]*Conn
|
||||
|
||||
|
@ -38,12 +42,17 @@ type ConnPool struct {
|
|||
}
|
||||
|
||||
// NewPool is used to make a new connection pool
|
||||
// Maintain at most maxConns per host
|
||||
func NewPool(maxConns int) *ConnPool {
|
||||
// Maintain at most maxConns per host, for up to maxTime.
|
||||
// Set maxTime to 0 to disable reaping.
|
||||
func NewPool(maxConns int, maxTime time.Duration) *ConnPool {
|
||||
pool := &ConnPool{
|
||||
maxConns: maxConns,
|
||||
maxTime: maxTime,
|
||||
pool: make(map[string][]*Conn),
|
||||
}
|
||||
if maxTime > 0 {
|
||||
go pool.reap()
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
|
@ -130,6 +139,9 @@ func (p *ConnPool) Return(conn *Conn) {
|
|||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
// Set the last used time
|
||||
conn.lastUsed = time.Now()
|
||||
|
||||
// Look for existing connections
|
||||
conns := p.pool[conn.addr.String()]
|
||||
|
||||
|
@ -169,3 +181,35 @@ func (p *ConnPool) RPC(addr net.Addr, method string, args interface{}, reply int
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Reap is used to close conns open over maxTime
|
||||
func (p *ConnPool) reap() {
|
||||
for !p.shutdown {
|
||||
// Sleep for a while
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// Reap all old conns
|
||||
p.Lock()
|
||||
now := time.Now()
|
||||
for host, conns := range p.pool {
|
||||
n := len(conns)
|
||||
for i := 0; i < n; i++ {
|
||||
// Skip new connections
|
||||
conn := conns[i]
|
||||
if now.Sub(conn.lastUsed) < p.maxTime {
|
||||
continue
|
||||
}
|
||||
|
||||
// Close the conn
|
||||
conn.Close()
|
||||
|
||||
// Remove from pool
|
||||
conns[i], conns[n-1] = conns[n-1], nil
|
||||
conns = conns[:n-1]
|
||||
p.pool[host] = conns
|
||||
n--
|
||||
}
|
||||
}
|
||||
p.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,7 +95,7 @@ func NewServer(config *Config) (*Server, error) {
|
|||
// Create server
|
||||
s := &Server{
|
||||
config: config,
|
||||
connPool: NewPool(5),
|
||||
connPool: NewPool(5, 0),
|
||||
eventChLAN: make(chan serf.Event, 256),
|
||||
eventChWAN: make(chan serf.Event, 256),
|
||||
logger: logger,
|
||||
|
|
Loading…
Reference in New Issue