diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index bfe187ea7b..b822bdde82 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -1128,76 +1128,84 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { } for _, tst := range tests { t.Run(tst.name, func(t *testing.T) { - a := NewTestAgent(t, tst.config) - defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + waitForStreamingToBeReady(t, a) - req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil) - resp := httptest.NewRecorder() - obj, err := a.srv.HealthServiceNodes(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } + encodedMeta := url.QueryEscape("somekey:somevalue") - assertIndex(t, resp) + var lastIndex uint64 + testutil.RunStep(t, "do initial read", func(t *testing.T) { + u := fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=%s", encodedMeta) - cIndex, err := strconv.ParseUint(resp.Header().Get("X-Consul-Index"), 10, 64) - require.NoError(t, err) + req, err := http.NewRequest("GET", u, nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthServiceNodes(resp, req) + require.NoError(t, err) - // Should be a non-nil empty list - nodes := obj.(structs.CheckServiceNodes) - if nodes == nil || len(nodes) != 0 { - t.Fatalf("bad: %v", obj) - } + lastIndex = getIndex(t, resp) + require.True(t, lastIndex > 0) - args := &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "bar", - Address: "127.0.0.1", - NodeMeta: map[string]string{"somekey": "somevalue"}, - Service: &structs.NodeService{ - ID: "test", - Service: "test", - }, - } + // Should be a non-nil empty list + nodes := obj.(structs.CheckServiceNodes) + require.NotNil(t, nodes) + require.Empty(t, nodes) + }) - var out struct{} - if err := a.RPC("Catalog.Register", args, &out); err != nil { - t.Fatalf("err: %v", err) - } + require.True(t, lastIndex > 0, "lastindex = %d", lastIndex) - args = &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "bar2", - Address: "127.0.0.1", - NodeMeta: map[string]string{"somekey": "othervalue"}, - Service: &structs.NodeService{ - ID: "test2", - Service: "test", - }, - } + testutil.RunStep(t, "register item 1", func(t *testing.T) { + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.1", + NodeMeta: map[string]string{"somekey": "somevalue"}, + Service: &structs.NodeService{ + ID: "test", + Service: "test", + }, + } - if err := a.RPC("Catalog.Register", args, &out); err != nil { - t.Fatalf("err: %v", err) - } + var ignored struct{} + require.NoError(t, a.RPC("Catalog.Register", args, &ignored)) + }) - req, _ = http.NewRequest("GET", fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue&index=%d&wait=10ms", cIndex), nil) - resp = httptest.NewRecorder() - obj, err = a.srv.HealthServiceNodes(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } + testutil.RunStep(t, "register item 2", func(t *testing.T) { + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar2", + Address: "127.0.0.1", + NodeMeta: map[string]string{"somekey": "othervalue"}, + Service: &structs.NodeService{ + ID: "test2", + Service: "test", + }, + } + var ignored struct{} + require.NoError(t, a.RPC("Catalog.Register", args, &ignored)) + }) - assertIndex(t, resp) + testutil.RunStep(t, "do blocking read", func(t *testing.T) { + u := fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=%s&index=%d&wait=100ms&cached", encodedMeta, lastIndex) - // Should be a non-nil empty list for checks - nodes = obj.(structs.CheckServiceNodes) - if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { - t.Fatalf("bad: %v", obj) - } + req, err := http.NewRequest("GET", u, nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthServiceNodes(resp, req) + require.NoError(t, err) - require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) + assertIndex(t, resp) + + // Should be a non-nil empty list for checks + nodes := obj.(structs.CheckServiceNodes) + require.Len(t, nodes, 1) + require.NotNil(t, nodes[0].Checks) + require.Empty(t, nodes[0].Checks) + + require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) + }) }) } } @@ -1637,6 +1645,7 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { a := NewTestAgent(t, agentHCL) defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") + waitForStreamingToBeReady(t, a) // Register gateway gatewayArgs := structs.TestRegisterIngressGateway(t) @@ -2038,3 +2047,9 @@ func peerQuerySuffix(peerName string) string { } return "&peer=" + peerName } + +func waitForStreamingToBeReady(t *testing.T, a *TestAgent) { + retry.Run(t, func(r *retry.R) { + require.True(r, a.rpcClientHealth.IsReadyForStreaming()) + }) +} diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index a4bdae78a9..9b24a164dd 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -3,6 +3,8 @@ package health import ( "context" + "google.golang.org/grpc/connectivity" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/submatview" @@ -34,6 +36,16 @@ type MaterializedViewStore interface { NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error } +// IsReadyForStreaming will indicate if the underlying gRPC connection is ready. +func (c *Client) IsReadyForStreaming() bool { + conn := c.MaterializerDeps.Conn + if conn == nil { + return false + } + + return conn.GetState() == connectivity.Ready +} + func (c *Client) ServiceNodes( ctx context.Context, req structs.ServiceSpecificRequest,