From db4bf65e2eda361d4e5ced9caa5d31638d87fe87 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 3 Feb 2014 11:53:04 -0800 Subject: [PATCH] consul: client re-uses the last connection if error free --- consul/client.go | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/consul/client.go b/consul/client.go index adbab9e258..adca9da5b7 100644 --- a/consul/client.go +++ b/consul/client.go @@ -13,6 +13,10 @@ import ( "time" ) +var ( + clientRPCCache = 30 * time.Second +) + // Interface is used to provide either a Client or Server, // both of which can be used to perform certain common // Consul methods @@ -37,6 +41,11 @@ type Client struct { // serf cluster in the datacenter eventCh chan serf.Event + // lastServer is the last server we made an RPC call to, + // this is used to re-use the last connection + lastServer net.Addr + lastRPCTime time.Time + // Logger uses the provided LogOutput logger *log.Logger @@ -68,7 +77,7 @@ func NewClient(config *Config) (*Client, error) { // Create server c := &Client{ config: config, - connPool: NewPool(8, 30*time.Second), + connPool: NewPool(8, clientRPCCache), eventCh: make(chan serf.Event, 256), logger: logger, shutdownCh: make(chan struct{}), @@ -238,6 +247,15 @@ func (c *Client) nodeFail(me serf.MemberEvent) { // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { + // Check the last rpc time + var server net.Addr + if time.Now().Sub(c.lastRPCTime) < clientRPCCache { + server = c.lastServer + if server != nil { + goto TRY_RPC + } + } + // Bail if we can't find any servers c.consulLock.RLock() if len(c.consuls) == 0 { @@ -246,10 +264,19 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { } // Select a random addr - offset := rand.Int31() % int32(len(c.consuls)) - server := c.consuls[offset] + server = c.consuls[rand.Int31()%int32(len(c.consuls))] c.consulLock.RUnlock() // Forward to remote Consul - return c.connPool.RPC(server, method, args, reply) +TRY_RPC: + if err := c.connPool.RPC(server, method, args, reply); err != nil { + c.lastServer = nil + c.lastRPCTime = time.Time{} + return err + } + + // Cache the last server + c.lastServer = server + c.lastRPCTime = time.Now() + return nil }