diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index 975c42d556..d54f464614 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -8,15 +8,19 @@ import ( "net/http/httptest" "net/url" "reflect" + "strconv" "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/serf/coordinate" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/serf/coordinate" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestHealthChecksInState(t *testing.T) { @@ -716,6 +720,12 @@ func TestHealthServiceNodes(t *testing.T) { if len(nodes) != 2 { r.Fatalf("Want 2 nodes") } + header := resp.Header().Get("X-Consul-Index") + if header == "" || header == "0" { + r.Fatalf("Want non-zero header: %q", header) + } + _, err = strconv.ParseUint(header, 10, 64) + r.Check(err) // Should be a cache hit! 
The data should've updated in the cache
 			// in the background so this should've been fetched directly from
@@ -727,6 +737,174 @@ func TestHealthServiceNodes(t *testing.T) {
 	}
 }
 
+// TestHealthServiceNodes_Blocking exercises blocking queries against the
+// /v1/health/service/<name> handler, once with the default backend and once
+// with the streaming backend enabled. Each case checks that a query blocked on
+// the current index returns when a new instance is registered, and that a
+// query with no intervening update blocks for at least the requested wait and
+// returns the same index.
+func TestHealthServiceNodes_Blocking(t *testing.T) {
+	cases := []struct {
+		name        string
+		hcl         string
+		grpcMetrics bool
+	}{
+		{name: "no streaming"},
+		{
+			name:        "streaming",
+			grpcMetrics: true,
+			hcl: `
+rpc { enable_streaming = true }
+use_streaming_backend = true
+`,
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+
+			sink := metrics.NewInmemSink(5*time.Second, time.Minute)
+			metrics.NewGlobal(&metrics.Config{
+				ServiceName:     "testing",
+				AllowedPrefixes: []string{"testing.grpc."},
+			}, sink)
+
+			a := NewTestAgent(t, tc.hcl)
+			defer a.Shutdown()
+			testrpc.WaitForTestAgent(t, a.RPC, "dc1")
+
+			// Register some initial service instances
+			for i := 0; i < 2; i++ {
+				args := &structs.RegisterRequest{
+					Datacenter: "dc1",
+					Node:       "bar",
+					Address:    "127.0.0.1",
+					Service: &structs.NodeService{
+						ID:      fmt.Sprintf("test%03d", i),
+						Service: "test",
+					},
+				}
+
+				var out struct{}
+				require.NoError(t, a.RPC("Catalog.Register", args, &out))
+			}
+
+			// Initial request should return two instances
+			req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1", nil)
+			resp := httptest.NewRecorder()
+			obj, err := a.srv.HealthServiceNodes(resp, req)
+			require.NoError(t, err)
+
+			nodes := obj.(structs.CheckServiceNodes)
+			require.Len(t, nodes, 2)
+
+			idx := getIndex(t, resp)
+			require.True(t, idx > 0)
+
+			// errCh collects errors from goroutines since it's unsafe for them to use
+			// t to fail tests directly.
+			errCh := make(chan error, 1)
+
+			checkErrs := func() {
+				// Ensure no errors were sent on errCh and drain any nils we have
+				for {
+					select {
+					case err := <-errCh:
+						require.NoError(t, err)
+					default:
+						return
+					}
+				}
+			}
+
+			// Blocking on that index should block. We test that by launching another
+			// goroutine that will wait a while before updating the registration and
+			// make sure that we unblock before timeout and see the update but that it
+			// takes at least as long as the sleep time.
+			sleep := 200 * time.Millisecond
+			start := time.Now()
+			go func() {
+				time.Sleep(sleep)
+
+				args := &structs.RegisterRequest{
+					Datacenter: "dc1",
+					Node:       "zoo",
+					Address:    "127.0.0.3",
+					Service: &structs.NodeService{
+						ID:      "test",
+						Service: "test",
+					},
+				}
+
+				var out struct{}
+				errCh <- a.RPC("Catalog.Register", args, &out)
+			}()
+
+			{
+				timeout := 30 * time.Second
+				url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", idx, timeout)
+				req, _ := http.NewRequest("GET", url, nil)
+				resp := httptest.NewRecorder()
+				obj, err := a.srv.HealthServiceNodes(resp, req)
+				require.NoError(t, err)
+				elapsed := time.Since(start)
+				require.True(t, elapsed > sleep, "request should block for at "+
+					" least as long as sleep. sleep=%s, elapsed=%s", sleep, elapsed)
+
+				require.True(t, elapsed < timeout, "request should unblock before"+
+					" it timed out. timeout=%s, elapsed=%s", timeout, elapsed)
+
+				nodes := obj.(structs.CheckServiceNodes)
+				require.Len(t, nodes, 3)
+
+				newIdx := getIndex(t, resp)
+				require.True(t, idx < newIdx, "index should have increased."+
+					"idx=%d, newIdx=%d", idx, newIdx)
+
+				idx = newIdx
+
+				checkErrs()
+			}
+
+			// Blocking should last until timeout in absence of updates
+			start = time.Now()
+			{
+				timeout := 200 * time.Millisecond
+				url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s",
+					idx, timeout)
+				req, _ := http.NewRequest("GET", url, nil)
+				resp := httptest.NewRecorder()
+				obj, err := a.srv.HealthServiceNodes(resp, req)
+				require.NoError(t, err)
+				elapsed := time.Since(start)
+				// Note that servers add jitter to timeout requested but don't remove it
+				// so this should always be true.
+				require.True(t, elapsed > timeout, "request should block for at "+
+					" least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed)
+
+				nodes := obj.(structs.CheckServiceNodes)
+				require.Len(t, nodes, 3)
+
+				newIdx := getIndex(t, resp)
+				require.Equal(t, idx, newIdx)
+			}
+
+			if tc.grpcMetrics {
+				data := sink.Data()
+				if l := len(data); l < 1 {
+					// Fatalf, not Errorf: indexing data[0] below would panic if
+					// no interval had been recorded.
+					t.Fatalf("expected at least 1 metrics interval, got :%v", l)
+				}
+				if count := len(data[0].Gauges); count < 2 {
+					t.Errorf("expected at least 2 grpc gauge metrics, got: %v", count)
+				}
+			}
+		})
+	}
+}
+
 func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go
index 99394f5465..693c8604af 100644
--- a/agent/rpc/subscribe/logger.go
+++ b/agent/rpc/subscribe/logger.go
@@ -43,7 +43,7 @@ func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger {
 		"dc", req.Datacenter,
 		"key", req.Key,
 		"namespace", req.Namespace,
-		"index", req.Index,
+		"request_index", req.Index,
 		"stream_id", &streamID{})
 }
 
diff --git a/agent/testagent.go b/agent/testagent.go
index c58c6a6c5d..3bbfe0cbe3 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -21,8 +21,6 @@ import (
 	uuid "github.com/hashicorp/go-uuid"
 	"github.com/stretchr/testify/require"
 
-	"github.com/hashicorp/consul/sdk/testutil"
-
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/connect"
@@ -30,6 +28,7 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/sdk/freeport"
+	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/tlsutil"
 )
@@ -55,8 +54,7 @@ type TestAgent struct {
 	// when Shutdown() is called.
 	Config *config.RuntimeConfig
 
-	// LogOutput is the sink for the logs. If nil, logs are written
-	// to os.Stderr.
+	// LogOutput is the sink for the logs. If nil, logs are written to os.Stderr.
LogOutput io.Writer // DataDir may be set to a directory which exists. If is it not set,