consul: adding connection pool

This commit is contained in:
Armon Dadgar 2013-12-09 12:09:57 -08:00
parent 9e8cdb6c7a
commit 2ad674677c
2 changed files with 135 additions and 0 deletions

128
consul/pool.go Normal file
View File

@ -0,0 +1,128 @@
package consul
import (
"net"
"sync"
"time"
)
// Conn is a pooled connection to a Consul server
type Conn struct {
addr net.Addr
conn *net.TCPConn
}
// ConnPool is used to maintain a connection pool to other
// Consul servers. This is used to reduce the latency of
// RPC requests between servers
type ConnPool struct {
sync.Mutex
// The maximum connectsion to maintain per server
maxConns int
// Pool maps an address to a list of connections
pool map[string][]*Conn
// Used to indicate the pool is shutdown
shutdown bool
}
// NewPool is used to make a new connection pool
// Maintain at most maxConns per host
func NewPool(maxConns int) *ConnPool {
pool := &ConnPool{
maxConns: maxConns,
pool: make(map[string][]*Conn),
}
return pool
}
// Shutdown is used to close the connection pool
func (p *ConnPool) Shutdown() error {
p.Lock()
defer p.Unlock()
for _, conns := range p.pool {
for _, c := range conns {
c.conn.Close()
}
}
p.pool = make(map[string][]*Conn)
p.shutdown = true
return nil
}
// Acquire is used to get a connection that is
// pooled or to return a new connection
func (p *ConnPool) Acquire(addr net.Addr) (*Conn, error) {
// Check for a pooled ocnn
if conn := p.getPooled(addr); conn != nil {
return conn, nil
}
// Create a new connection
return p.getNewConn(addr)
}
// getPooled is used to return a pooled connection
func (p *ConnPool) getPooled(addr net.Addr) *Conn {
p.Lock()
defer p.Unlock()
// Look for an existing connection
conns := p.pool[addr.String()]
if len(conns) == 0 {
return nil
}
// Remove the last conn from the pool
conn := conns[len(conns)-1]
conns = conns[:len(conns)-1]
p.pool[addr.String()] = conns
return conn
}
// getNewConn is used to return a new connection
func (p *ConnPool) getNewConn(addr net.Addr) (*Conn, error) {
// Try to dial the conn
rawConn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
if err != nil {
return nil, err
}
// Cast to TCPConn
conn := rawConn.(*net.TCPConn)
// Enable keep alives
conn.SetKeepAlive(true)
conn.SetNoDelay(true)
// Wrap the connection
c := &Conn{
addr: addr,
conn: conn,
}
return c, nil
}
// Return is used to return a connection once done. Connections
// that are in an error state should not be returned
func (p *ConnPool) Return(conn *Conn) {
p.Lock()
defer p.Unlock()
// Look for existing connections
conns := p.pool[conn.addr.String()]
// Check for limit on connections or shutdown
if p.shutdown || len(conns) >= p.maxConns {
conn.conn.Close()
return
}
// Retain the connection
conns = append(conns, conn)
p.pool[conn.addr.String()] = conns
}

View File

@ -24,6 +24,9 @@ const (
type Server struct { type Server struct {
config *Config config *Config
// Connection pool to other consul servers
connPool *ConnPool
// eventChLAN is used to receive events from the // eventChLAN is used to receive events from the
// serf cluster in the datacenter // serf cluster in the datacenter
eventChLAN chan serf.Event eventChLAN chan serf.Event
@ -85,6 +88,7 @@ func NewServer(config *Config) (*Server, error) {
// Create server // Create server
s := &Server{ s := &Server{
config: config, config: config,
connPool: NewPool(3),
eventChLAN: make(chan serf.Event, 256), eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256),
logger: logger, logger: logger,
@ -249,6 +253,9 @@ func (s *Server) Shutdown() error {
} }
s.rpcClientLock.Unlock() s.rpcClientLock.Unlock()
// Close the connection pool
s.connPool.Shutdown()
return nil return nil
} }