Allow forwarding of some status RPCs (#6198)

* Allow forwarding of some status RPCs

* Update docs

* add comments about not using the regular forward
This commit is contained in:
Matt Keeler 2019-07-25 14:26:22 -04:00 committed by GitHub
parent 54ef3d5a40
commit 8b54307be2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 173 additions and 5 deletions

View File

@ -287,7 +287,7 @@ func (s *Server) maybeBootstrap() {
// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version,
"Status.Peers", server.UseTLS, &struct{}{}, &peers); err != nil {
"Status.Peers", server.UseTLS, &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil {
nextRetry := time.Duration((1 << attempt) * peerRetryBase)
s.logger.Printf("[ERR] consul: Failed to confirm peer status for %s: %v. Retrying in "+
"%v...", server.Name, err, nextRetry.String())

View File

@ -5,6 +5,7 @@ import (
"strconv"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
)
// Status endpoint is used to check on server status
@ -18,7 +19,14 @@ func (s *Status) Ping(args struct{}, reply *struct{}) error {
}
// Leader is used to get the address of the leader
func (s *Status) Leader(args struct{}, reply *string) error {
func (s *Status) Leader(args *structs.DCSpecificRequest, reply *string) error {
// not using the regular forward function as it does a bunch of stuff we
// dont want like verifying consistency etc. We just want to enable DC
// forwarding
if args.Datacenter != "" && args.Datacenter != s.server.config.Datacenter {
return s.server.forwardDC("Status.Leader", args.Datacenter, args, reply)
}
leader := string(s.server.raft.Leader())
if leader != "" {
*reply = leader
@ -29,7 +37,14 @@ func (s *Status) Leader(args struct{}, reply *string) error {
}
// Peers is used to get all the Raft peers
func (s *Status) Peers(args struct{}, reply *[]string) error {
func (s *Status) Peers(args *structs.DCSpecificRequest, reply *[]string) error {
// not using the regular forward function as it does a bunch of stuff we
// dont want like verifying consistency etc. We just want to enable DC
// forwarding
if args.Datacenter != "" && args.Datacenter != s.server.config.Datacenter {
return s.server.forwardDC("Status.Peers", args.Datacenter, args, reply)
}
future := s.server.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err

View File

@ -8,9 +8,11 @@ import (
"time"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
)
func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
@ -69,6 +71,32 @@ func TestStatusLeader(t *testing.T) {
}
}
func TestStatusLeader_ForwardDC(t *testing.T) {
t.Parallel()
dir1, s1 := testServerDC(t, "primary")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerDC(t, "secondary")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "secondary")
testrpc.WaitForLeader(t, s2.RPC, "primary")
args := structs.DCSpecificRequest{
Datacenter: "secondary",
}
var out string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.Leader", &args, &out))
require.Equal(t, s2.config.RPCAdvertise.String(), out)
}
func TestStatusPeers(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
@ -86,3 +114,29 @@ func TestStatusPeers(t *testing.T) {
t.Fatalf("no peers: %v", peers)
}
}
func TestStatusPeers_ForwardDC(t *testing.T) {
t.Parallel()
dir1, s1 := testServerDC(t, "primary")
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerDC(t, "secondary")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "secondary")
testrpc.WaitForLeader(t, s2.RPC, "primary")
args := structs.DCSpecificRequest{
Datacenter: "secondary",
}
var out []string
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.Peers", &args, &out))
require.Equal(t, []string{s2.config.RPCAdvertise.String()}, out)
}

View File

@ -2,19 +2,31 @@ package agent
import (
"net/http"
"github.com/hashicorp/consul/agent/structs"
)
func (s *HTTPServer) StatusLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var out string
if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil {
if err := s.agent.RPC("Status.Leader", &args, &out); err != nil {
return nil, err
}
return out, nil
}
func (s *HTTPServer) StatusPeers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var out []string
if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil {
if err := s.agent.RPC("Status.Peers", &args, &out); err != nil {
return nil, err
}
return out, nil

View File

@ -1,10 +1,13 @@
package agent
import (
"fmt"
"net/http"
"testing"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)
func TestStatusLeader(t *testing.T) {
@ -24,6 +27,42 @@ func TestStatusLeader(t *testing.T) {
}
}
func TestStatusLeaderSecondary(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t, t.Name(), "datacenter = \"primary\"")
defer a1.Shutdown()
a2 := NewTestAgent(t, t.Name(), "datacenter = \"secondary\"")
defer a2.Shutdown()
testrpc.WaitForTestAgent(t, a1.RPC, "primary")
testrpc.WaitForTestAgent(t, a2.RPC, "secondary")
a1SerfAddr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
a1Addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.ServerPort)
a2Addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.ServerPort)
_, err := a2.JoinWAN([]string{a1SerfAddr})
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
require.Len(r, a1.WANMembers(), 2)
require.Len(r, a2.WANMembers(), 2)
})
req, _ := http.NewRequest("GET", "/v1/status/leader?dc=secondary", nil)
obj, err := a1.srv.StatusLeader(nil, req)
require.NoError(t, err)
leader, ok := obj.(string)
require.True(t, ok)
require.Equal(t, a2Addr, leader)
req, _ = http.NewRequest("GET", "/v1/status/leader?dc=primary", nil)
obj, err = a2.srv.StatusLeader(nil, req)
require.NoError(t, err)
leader, ok = obj.(string)
require.True(t, ok)
require.Equal(t, a1Addr, leader)
}
func TestStatusPeers(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
@ -40,3 +79,39 @@ func TestStatusPeers(t *testing.T) {
t.Fatalf("bad peers: %v", peers)
}
}
func TestStatusPeersSecondary(t *testing.T) {
t.Parallel()
a1 := NewTestAgent(t, t.Name(), "datacenter = \"primary\"")
defer a1.Shutdown()
a2 := NewTestAgent(t, t.Name(), "datacenter = \"secondary\"")
defer a2.Shutdown()
testrpc.WaitForTestAgent(t, a1.RPC, "primary")
testrpc.WaitForTestAgent(t, a2.RPC, "secondary")
a1SerfAddr := fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortWAN)
a1Addr := fmt.Sprintf("127.0.0.1:%d", a1.Config.ServerPort)
a2Addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.ServerPort)
_, err := a2.JoinWAN([]string{a1SerfAddr})
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
require.Len(r, a1.WANMembers(), 2)
require.Len(r, a2.WANMembers(), 2)
})
req, _ := http.NewRequest("GET", "/v1/status/peers?dc=secondary", nil)
obj, err := a1.srv.StatusPeers(nil, req)
require.NoError(t, err)
peers, ok := obj.([]string)
require.True(t, ok)
require.Equal(t, []string{a2Addr}, peers)
req, _ = http.NewRequest("GET", "/v1/status/peers?dc=primary", nil)
obj, err = a2.srv.StatusPeers(nil, req)
require.NoError(t, err)
peers, ok = obj.([]string)
require.True(t, ok)
require.Equal(t, []string{a1Addr}, peers)
}

View File

@ -33,6 +33,12 @@ The table below shows this endpoint's support for
| ---------------- | ----------------- | ------------- | ------------ |
| `NO` | `none` | `none` | `none` |
### Parameters
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried. This is specified as part of the
URL as a query parameter.
### Sample Request
```text
@ -65,6 +71,12 @@ The table below shows this endpoint's support for
| ---------------- | ----------------- | ------------- | ------------ |
| `NO` | `none` | `none` | `none` |
### Parameters
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried. This is specified as part of the
URL as a query parameter.
### Sample Request
```text