mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
consul: thread the target DC through the RPC path
This commit is contained in:
parent
92e5548b23
commit
6cf2efca0c
@ -364,7 +364,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
|||||||
|
|
||||||
// Forward to remote Consul
|
// Forward to remote Consul
|
||||||
TRY_RPC:
|
TRY_RPC:
|
||||||
if err := c.connPool.RPC(server.Addr, server.Version, method, args, reply); err != nil {
|
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
|
||||||
c.lastServer = nil
|
c.lastServer = nil
|
||||||
c.lastRPCTime = time.Time{}
|
c.lastRPCTime = time.Time{}
|
||||||
return err
|
return err
|
||||||
|
@ -182,14 +182,14 @@ func (p *ConnPool) Shutdown() error {
|
|||||||
|
|
||||||
// Acquire is used to get a connection that is
|
// Acquire is used to get a connection that is
|
||||||
// pooled or to return a new connection
|
// pooled or to return a new connection
|
||||||
func (p *ConnPool) acquire(addr net.Addr, version int) (*Conn, error) {
|
func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) {
|
||||||
// Check for a pooled ocnn
|
// Check for a pooled ocnn
|
||||||
if conn := p.getPooled(addr, version); conn != nil {
|
if conn := p.getPooled(addr, version); conn != nil {
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new connection
|
// Create a new connection
|
||||||
return p.getNewConn(addr, version)
|
return p.getNewConn(dc, addr, version)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPooled is used to return a pooled connection
|
// getPooled is used to return a pooled connection
|
||||||
@ -205,7 +205,7 @@ func (p *ConnPool) getPooled(addr net.Addr, version int) *Conn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getNewConn is used to return a new connection
|
// getNewConn is used to return a new connection
|
||||||
func (p *ConnPool) getNewConn(addr net.Addr, version int) (*Conn, error) {
|
func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, error) {
|
||||||
// Try to dial the conn
|
// Try to dial the conn
|
||||||
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
|
conn, err := net.DialTimeout("tcp", addr.String(), 10*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -313,11 +313,11 @@ func (p *ConnPool) releaseConn(conn *Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getClient is used to get a usable client for an address and protocol version
|
// getClient is used to get a usable client for an address and protocol version
|
||||||
func (p *ConnPool) getClient(addr net.Addr, version int) (*Conn, *StreamClient, error) {
|
func (p *ConnPool) getClient(dc string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
|
||||||
retries := 0
|
retries := 0
|
||||||
START:
|
START:
|
||||||
// Try to get a conn first
|
// Try to get a conn first
|
||||||
conn, err := p.acquire(addr, version)
|
conn, err := p.acquire(dc, addr, version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
|
return nil, nil, fmt.Errorf("failed to get conn: %v", err)
|
||||||
}
|
}
|
||||||
@ -339,9 +339,9 @@ START:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RPC is used to make an RPC call to a remote host
|
// RPC is used to make an RPC call to a remote host
|
||||||
func (p *ConnPool) RPC(addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
|
func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, args interface{}, reply interface{}) error {
|
||||||
// Get a usable client
|
// Get a usable client
|
||||||
conn, sc, err := p.getClient(addr, version)
|
conn, sc, err := p.getClient(dc, addr, version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("rpc error: %v", err)
|
return fmt.Errorf("rpc error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -208,7 +208,7 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{
|
|||||||
if server == nil {
|
if server == nil {
|
||||||
return structs.ErrNoLeader
|
return structs.ErrNoLeader
|
||||||
}
|
}
|
||||||
return s.connPool.RPC(server.Addr, server.Version, method, args, reply)
|
return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
|
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
|
||||||
@ -229,7 +229,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
|
|||||||
|
|
||||||
// Forward to remote Consul
|
// Forward to remote Consul
|
||||||
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
|
metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
|
||||||
return s.connPool.RPC(server.Addr, server.Version, method, args, reply)
|
return s.connPool.RPC(dc, server.Addr, server.Version, method, args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
// globalRPC is used to forward an RPC request to one server in each datacenter.
|
// globalRPC is used to forward an RPC request to one server in each datacenter.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user