mirror of
https://github.com/status-im/consul.git
synced 2025-01-18 17:52:17 +00:00
agent: add a test for streaming in the service health endpoint
Co-authored-by: Paul Banks <banks@banksco.de>
This commit is contained in:
parent
5b2300e63f
commit
1d2d15b1e1
@ -8,15 +8,19 @@ import (
|
|||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/serf/coordinate"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/serf/coordinate"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestHealthChecksInState(t *testing.T) {
|
func TestHealthChecksInState(t *testing.T) {
|
||||||
@ -716,6 +720,12 @@ func TestHealthServiceNodes(t *testing.T) {
|
|||||||
if len(nodes) != 2 {
|
if len(nodes) != 2 {
|
||||||
r.Fatalf("Want 2 nodes")
|
r.Fatalf("Want 2 nodes")
|
||||||
}
|
}
|
||||||
|
header := resp.Header().Get("X-Consul-Index")
|
||||||
|
if header == "" || header == "0" {
|
||||||
|
r.Fatalf("Want non-zero header: %q", header)
|
||||||
|
}
|
||||||
|
_, err = strconv.ParseUint(header, 10, 64)
|
||||||
|
r.Check(err)
|
||||||
|
|
||||||
// Should be a cache hit! The data should've updated in the cache
|
// Should be a cache hit! The data should've updated in the cache
|
||||||
// in the background so this should've been fetched directly from
|
// in the background so this should've been fetched directly from
|
||||||
@ -727,6 +737,166 @@ func TestHealthServiceNodes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHealthServiceNodes_Blocking(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
hcl string
|
||||||
|
grpcMetrics bool
|
||||||
|
}{
|
||||||
|
{name: "no streaming"},
|
||||||
|
{
|
||||||
|
name: "streaming",
|
||||||
|
grpcMetrics: true,
|
||||||
|
hcl: `
|
||||||
|
rpc { enable_streaming = true }
|
||||||
|
use_streaming_backend = true
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
|
||||||
|
sink := metrics.NewInmemSink(5*time.Second, time.Minute)
|
||||||
|
metrics.NewGlobal(&metrics.Config{
|
||||||
|
ServiceName: "testing",
|
||||||
|
AllowedPrefixes: []string{"testing.grpc."},
|
||||||
|
}, sink)
|
||||||
|
|
||||||
|
a := NewTestAgent(t, tc.hcl)
|
||||||
|
defer a.Shutdown()
|
||||||
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some initial service instances
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
args := &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: fmt.Sprintf("test%03d", i),
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var out struct{}
|
||||||
|
require.NoError(t, a.RPC("Catalog.Register", args, &out))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initial request should return two instances
|
||||||
|
req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1", nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
require.Len(t, nodes, 2)
|
||||||
|
|
||||||
|
idx := getIndex(t, resp)
|
||||||
|
require.True(t, idx > 0)
|
||||||
|
|
||||||
|
// errCh collects errors from goroutines since it's unsafe for them to use
|
||||||
|
// t to fail tests directly.
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
|
||||||
|
checkErrs := func() {
|
||||||
|
// Ensure no errors were sent on errCh and drain any nils we have
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.NoError(t, err)
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Blocking on that index should block. We test that by launching another
|
||||||
|
// goroutine that will wait a while before updating the registration and
|
||||||
|
// make sure that we unblock before timeout and see the update but that it
|
||||||
|
// takes at least as long as the sleep time.
|
||||||
|
sleep := 200 * time.Millisecond
|
||||||
|
start := time.Now()
|
||||||
|
go func() {
|
||||||
|
time.Sleep(sleep)
|
||||||
|
|
||||||
|
args := &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "zoo",
|
||||||
|
Address: "127.0.0.3",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: "test",
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var out struct{}
|
||||||
|
errCh <- a.RPC("Catalog.Register", args, &out)
|
||||||
|
}()
|
||||||
|
|
||||||
|
{
|
||||||
|
timeout := 30 * time.Second
|
||||||
|
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", idx, timeout)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
require.True(t, elapsed > sleep, "request should block for at "+
|
||||||
|
" least as long as sleep. sleep=%s, elapsed=%s", sleep, elapsed)
|
||||||
|
|
||||||
|
require.True(t, elapsed < timeout, "request should unblock before"+
|
||||||
|
" it timed out. timeout=%s, elapsed=%s", timeout, elapsed)
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
require.Len(t, nodes, 3)
|
||||||
|
|
||||||
|
newIdx := getIndex(t, resp)
|
||||||
|
require.True(t, idx < newIdx, "index should have increased."+
|
||||||
|
"idx=%d, newIdx=%d", idx, newIdx)
|
||||||
|
|
||||||
|
idx = newIdx
|
||||||
|
|
||||||
|
checkErrs()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Blocking should last until timeout in absence of updates
|
||||||
|
start = time.Now()
|
||||||
|
{
|
||||||
|
timeout := 200 * time.Millisecond
|
||||||
|
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s",
|
||||||
|
idx, timeout)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
// Note that servers add jitter to timeout requested but don't remove it
|
||||||
|
// so this should always be true.
|
||||||
|
require.True(t, elapsed > timeout, "request should block for at "+
|
||||||
|
" least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed)
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
require.Len(t, nodes, 3)
|
||||||
|
|
||||||
|
newIdx := getIndex(t, resp)
|
||||||
|
require.Equal(t, idx, newIdx)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.grpcMetrics {
|
||||||
|
data := sink.Data()
|
||||||
|
if l := len(data); l < 1 {
|
||||||
|
t.Errorf("expected at least 1 metrics interval, got :%v", l)
|
||||||
|
}
|
||||||
|
if count := len(data[0].Gauges); count < 2 {
|
||||||
|
t.Errorf("expected at least 2 grpc gauge metrics, got: %v", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
@ -43,7 +43,7 @@ func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger {
|
|||||||
"dc", req.Datacenter,
|
"dc", req.Datacenter,
|
||||||
"key", req.Key,
|
"key", req.Key,
|
||||||
"namespace", req.Namespace,
|
"namespace", req.Namespace,
|
||||||
"index", req.Index,
|
"request_index", req.Index,
|
||||||
"stream_id", &streamID{})
|
"stream_id", &streamID{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,8 +21,6 @@ import (
|
|||||||
uuid "github.com/hashicorp/go-uuid"
|
uuid "github.com/hashicorp/go-uuid"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/connect"
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
@ -30,6 +28,7 @@ import (
|
|||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/freeport"
|
"github.com/hashicorp/consul/sdk/freeport"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
)
|
)
|
||||||
@ -55,8 +54,7 @@ type TestAgent struct {
|
|||||||
// when Shutdown() is called.
|
// when Shutdown() is called.
|
||||||
Config *config.RuntimeConfig
|
Config *config.RuntimeConfig
|
||||||
|
|
||||||
// LogOutput is the sink for the logs. If nil, logs are written
|
// LogOutput is the sink for the logs. If nil, logs are written to os.Stderr.
|
||||||
// to os.Stderr.
|
|
||||||
LogOutput io.Writer
|
LogOutput io.Writer
|
||||||
|
|
||||||
// DataDir may be set to a directory which exists. If is it not set,
|
// DataDir may be set to a directory which exists. If is it not set,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user