consul: client re-uses the last connection if error free

This commit is contained in:
Armon Dadgar 2014-02-03 11:53:04 -08:00
parent be04291fb2
commit db4bf65e2e

View File

@ -13,6 +13,10 @@ import (
"time"
)
var (
clientRPCCache = 30 * time.Second
)
// Interface is used to provide either a Client or Server,
// both of which can be used to perform certain common
// Consul methods
@ -37,6 +41,11 @@ type Client struct {
// serf cluster in the datacenter
eventCh chan serf.Event
// lastServer is the last server we made an RPC call to,
// this is used to re-use the last connection
lastServer net.Addr
lastRPCTime time.Time
// Logger uses the provided LogOutput
logger *log.Logger
@ -68,7 +77,7 @@ func NewClient(config *Config) (*Client, error) {
// Create server
c := &Client{
config: config,
connPool: NewPool(8, 30*time.Second),
connPool: NewPool(8, clientRPCCache),
eventCh: make(chan serf.Event, 256),
logger: logger,
shutdownCh: make(chan struct{}),
@ -238,6 +247,15 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Check the last rpc time
var server net.Addr
if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
server = c.lastServer
if server != nil {
goto TRY_RPC
}
}
// Bail if we can't find any servers
c.consulLock.RLock()
if len(c.consuls) == 0 {
@ -246,10 +264,19 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
}
// Select a random addr
offset := rand.Int31() % int32(len(c.consuls))
server := c.consuls[offset]
server = c.consuls[rand.Int31()%int32(len(c.consuls))]
c.consulLock.RUnlock()
// Forward to remote Consul
return c.connPool.RPC(server, method, args, reply)
TRY_RPC:
if err := c.connPool.RPC(server, method, args, reply); err != nil {
c.lastServer = nil
c.lastRPCTime = time.Time{}
return err
}
// Cache the last server
c.lastServer = server
c.lastRPCTime = time.Now()
return nil
}