From 6cf2efca0c883284a985607a3fc57a353cee3fa0 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 8 May 2015 18:42:19 -0700 Subject: [PATCH] consul: thread the target DC through the RPC path --- consul/client.go | 2 +- consul/pool.go | 14 +++++++------- consul/rpc.go | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/consul/client.go b/consul/client.go index 3459274a32..766c188ab5 100644 --- a/consul/client.go +++ b/consul/client.go @@ -364,7 +364,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Forward to remote Consul TRY_RPC: - if err := c.connPool.RPC(server.Addr, server.Version, method, args, reply); err != nil { + if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { c.lastServer = nil c.lastRPCTime = time.Time{} return err diff --git a/consul/pool.go b/consul/pool.go index 53f546a532..7799bf6303 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -182,14 +182,14 @@ func (p *ConnPool) Shutdown() error { // Acquire is used to get a connection that is // pooled or to return a new connection -func (p *ConnPool) acquire(addr net.Addr, version int) (*Conn, error) { +func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) { // Check for a pooled ocnn if conn := p.getPooled(addr, version); conn != nil { return conn, nil } // Create a new connection - return p.getNewConn(addr, version) + return p.getNewConn(dc, addr, version) } // getPooled is used to return a pooled connection @@ -205,7 +205,7 @@ func (p *ConnPool) getPooled(addr net.Addr, version int) *Conn { } // getNewConn is used to return a new connection -func (p *ConnPool) getNewConn(addr net.Addr, version int) (*Conn, error) { +func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) { // Try to dial the conn conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second) if err != nil { @@ -313,11 +313,11 @@ func (p *ConnPool) releaseConn(conn *Conn) { } // getClient is used to get a usable client for an address and protocol version -func (p *ConnPool) getClient(addr net.Addr, version int) (*Conn, *StreamClient, error) { +func (p *ConnPool) getClient(dc string, addr net.Addr, version int) (*Conn, *StreamClient, error) { retries := 0 START: // Try to get a conn first - conn, err := p.acquire(addr, version) + conn, err := p.acquire(dc, addr, version) if err != nil { return nil, nil, fmt.Errorf("failed to get conn: %v", err) } @@ -339,9 +339,9 @@ START: } // RPC is used to make an RPC call to a remote host -func (p *ConnPool) RPC(addr net.Addr, version int, method string, args interface{}, reply interface{}) error { +func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error { // Get a usable client - conn, sc, err := p.getClient(addr, version) + conn, sc, err := p.getClient(dc, addr, version) if err != nil { return fmt.Errorf("rpc error: %v", err) } diff --git a/consul/rpc.go b/consul/rpc.go index 412c627de7..6359d1cf6f 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -208,7 +208,7 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{ if server == nil { return structs.ErrNoLeader } - return s.connPool.RPC(server.Addr, server.Version, method, args, reply) + return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply) } // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers @@ -229,7 +229,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ // Forward to remote Consul metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1) - return s.connPool.RPC(server.Addr, server.Version, method, args, reply) + return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply) } // globalRPC is used to forward an RPC request to one server in each datacenter.