From 1a901953e2b33c3c75420d919c212c5378008e0f Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Tue, 31 May 2022 12:04:19 -0700 Subject: [PATCH] [OSS] Fix merge central config tests (#13309) Setting the right enterprise meta to fix the merge central config tests. Re-added the tests that were failing on the OSS to ENT merge. --- agent/catalog_endpoint_test.go | 141 +++++++++++++++++++++++++++------ agent/health_endpoint_test.go | 100 +++++++++++++++++++---- 2 files changed, 203 insertions(+), 38 deletions(-) diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index d7688e46b4..ff45a10cbd 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/api" "github.com/hashicorp/serf/coordinate" @@ -1052,6 +1053,24 @@ func TestCatalogServiceNodes_ConnectProxy(t *testing.T) { assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy) } +func registerService(t *testing.T, a *TestAgent) (registerServiceReq *structs.RegisterRequest) { + t.Helper() + entMeta := acl.DefaultEnterpriseMeta() + registerServiceReq = structs.TestRegisterRequestProxy(t) + registerServiceReq.EnterpriseMeta = *entMeta + registerServiceReq.Service.EnterpriseMeta = *entMeta + registerServiceReq.Service.Proxy.Upstreams = structs.TestAddDefaultsToUpstreams(t, registerServiceReq.Service.Proxy.Upstreams, *entMeta) + registerServiceReq.Check = &structs.HealthCheck{ + Node: registerServiceReq.Node, + Name: "check1", + } + + var out struct{} + require.NoError(t, a.RPC("Catalog.Register", registerServiceReq, &out)) + + return +} + func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs.ProxyConfigEntry) { t.Helper() // Register proxy-defaults @@ -1063,6 +1082,7 @@ func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs "local_connect_timeout_ms": uint64(1000), "handshake_timeout_ms": uint64(1000), }, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), } proxyDefaultsConfigEntryReq := &structs.ConfigEntryRequest{ Op: structs.ConfigEntryUpsert, @@ -1070,7 +1090,7 @@ func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs Entry: &proxyGlobalEntry, } var proxyDefaultsConfigEntryResp bool - assert.Nil(t, a.RPC("ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp)) + require.NoError(t, a.RPC("ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp)) return } @@ -1091,6 +1111,7 @@ func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (se }, }, }, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), } serviceDefaultsConfigEntryReq := &structs.ConfigEntryRequest{ Op: structs.ConfigEntryUpsert, @@ -1098,7 +1119,7 @@ func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (se Entry: &serviceDefaultsConfigEntry, } var serviceDefaultsConfigEntryResp bool - assert.Nil(t, a.RPC("ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp)) + require.NoError(t, a.RPC("ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp)) return } @@ -1108,24 +1129,24 @@ func validateMergeCentralConfigResponse(t *testing.T, v *structs.ServiceNode, serviceDefaultsConfigEntry structs.ServiceConfigEntry) { t.Helper() - assert.Equal(t, registerServiceReq.Service.Service, v.ServiceName) + require.Equal(t, registerServiceReq.Service.Service, v.ServiceName) // validate proxy global defaults are resolved in the merged service config - assert.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config) + require.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config) // validate service defaults override proxy-defaults/global - assert.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode) - assert.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode) + require.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode) + require.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode) // validate service defaults are resolved in the merged service config // expected number of upstreams = (number of upstreams defined in the register request proxy config + - // 1 default from service defaults) - assert.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams)) + // 1 centrally configured default from service defaults) + require.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams)) for _, up := range v.ServiceProxy.Upstreams { - if up.DestinationType != "" { + if up.DestinationType != "" && up.DestinationType != structs.UpstreamDestTypeService { continue } - assert.Contains(t, up.Config, "limits") + require.Contains(t, up.Config, "limits") upstreamLimits := up.Config["limits"].(*structs.UpstreamLimits) - assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections) - assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode) + require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections) + require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode) } } @@ -1141,15 +1162,11 @@ func TestListServiceNodes_MergeCentralConfig(t *testing.T) { testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the service - registerServiceReq := structs.TestRegisterRequestProxy(t) - var out struct{} - assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out)) - + registerServiceReq := registerService(t, a) // Register proxy-defaults - _ = registerProxyDefaults(t, a) - + proxyGlobalEntry := registerProxyDefaults(t, a) // Register service-defaults - _ = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) type testCase struct { testCaseName string @@ -1172,16 +1189,16 @@ func TestListServiceNodes_MergeCentralConfig(t *testing.T) { obj, err = a.srv.CatalogServiceNodes(resp, req) } - assert.Nil(t, err) + require.NoError(t, err) assertIndex(t, resp) serviceNodes := obj.(structs.ServiceNodes) // validate response - assert.Len(t, serviceNodes, 1) - // v := serviceNodes[0] + require.Len(t, serviceNodes, 1) + v := serviceNodes[0] - // validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) + validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) } testCases := []testCase{ { @@ -1201,6 +1218,84 @@ func TestListServiceNodes_MergeCentralConfig(t *testing.T) { } } +func TestCatalogServiceNodes_MergeCentralConfigBlocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := NewTestAgent(t, "") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := registerService(t, a) + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + + // Run the query + rpcReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: registerServiceReq.Service.Service, + MergeCentralConfig: true, + } + var rpcResp structs.IndexedServiceNodes + require.NoError(t, a.RPC("Catalog.ServiceNodes", &rpcReq, &rpcResp)) + + require.Len(t, rpcResp.ServiceNodes, 1) + serviceNode := rpcResp.ServiceNodes[0] + require.Equal(t, registerServiceReq.Service.Service, serviceNode.ServiceName) + // validate proxy global defaults are resolved in the merged service config + require.Equal(t, proxyGlobalEntry.Config, serviceNode.ServiceProxy.Config) + require.Equal(t, proxyGlobalEntry.Mode, serviceNode.ServiceProxy.Mode) + + // Async cause a change - register service defaults + waitIndex := rpcResp.Index + start := time.Now() + var serviceDefaultsConfigEntry structs.ServiceConfigEntry + go func() { + time.Sleep(100 * time.Millisecond) + // Register service-defaults + serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + }() + + const waitDuration = 3 * time.Second +RUN_BLOCKING_QUERY: + url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config&wait=%s&index=%d", + registerServiceReq.Service.Service, waitDuration.String(), waitIndex) + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + obj, err := a.srv.CatalogServiceNodes(resp, req) + + require.NoError(t, err) + assertIndex(t, resp) + + elapsed := time.Since(start) + idx := getIndex(t, resp) + if idx < waitIndex { + t.Fatalf("bad index returned: %v", idx) + } else if idx == waitIndex { + if elapsed > waitDuration { + // This should prevent the loop from running longer than the waitDuration + t.Fatalf("too slow: %v", elapsed) + } + goto RUN_BLOCKING_QUERY + } + // Should block at least 100ms before getting the changed results + if elapsed < 100*time.Millisecond { + t.Fatalf("too fast: %v", elapsed) + } + + serviceNodes := obj.(structs.ServiceNodes) + + // validate response + require.Len(t, serviceNodes, 1) + v := serviceNodes[0] + + validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) +} + // Test that the Connect-compatible endpoints can be queried for a // service via /v1/catalog/connect/:service. func TestCatalogConnectServiceNodes_good(t *testing.T) { diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index d41a1a0985..a339331051 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -1894,19 +1894,11 @@ func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) { testrpc.WaitForLeader(t, a.RPC, "dc1") // Register the service - registerServiceReq := structs.TestRegisterRequestProxy(t) - registerServiceReq.Check = &structs.HealthCheck{ - Node: registerServiceReq.Node, - Name: "check1", - } - var out struct{} - assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out)) - + registerServiceReq := registerService(t, a) // Register proxy-defaults - _ = registerProxyDefaults(t, a) - + proxyGlobalEntry := registerProxyDefaults(t, a) // Register service-defaults - _ = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) type testCase struct { testCaseName string @@ -1929,16 +1921,16 @@ func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) { obj, err = a.srv.HealthServiceNodes(resp, req) } - assert.Nil(t, err) + require.NoError(t, err) assertIndex(t, resp) checkServiceNodes := obj.(structs.CheckServiceNodes) // validate response - assert.Len(t, checkServiceNodes, 1) - // v := checkServiceNodes[0] + require.Len(t, checkServiceNodes, 1) + v := checkServiceNodes[0] - // validateMergeCentralConfigResponse(t, v.Service.ToServiceNode(registerServiceReq.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) + validateMergeCentralConfigResponse(t, v.Service.ToServiceNode(registerServiceReq.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) } testCases := []testCase{ { @@ -1958,6 +1950,84 @@ func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) { } } +func TestHealthServiceNodes_MergeCentralConfigBlocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := NewTestAgent(t, "") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := registerService(t, a) + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + + // Run the query + rpcReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: registerServiceReq.Service.Service, + MergeCentralConfig: true, + } + var rpcResp structs.IndexedCheckServiceNodes + require.NoError(t, a.RPC("Health.ServiceNodes", &rpcReq, &rpcResp)) + + require.Len(t, rpcResp.Nodes, 1) + nodeService := rpcResp.Nodes[0].Service + require.Equal(t, registerServiceReq.Service.Service, nodeService.Service) + // validate proxy global defaults are resolved in the merged service config + require.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config) + require.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode) + + // Async cause a change - register service defaults + waitIndex := rpcResp.Index + start := time.Now() + var serviceDefaultsConfigEntry structs.ServiceConfigEntry + go func() { + time.Sleep(100 * time.Millisecond) + // Register service-defaults + serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + }() + + const waitDuration = 3 * time.Second +RUN_BLOCKING_QUERY: + url := fmt.Sprintf("/v1/health/service/%s?merge-central-config&wait=%s&index=%d", + registerServiceReq.Service.Service, waitDuration.String(), waitIndex) + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthServiceNodes(resp, req) + + require.NoError(t, err) + assertIndex(t, resp) + + elapsed := time.Since(start) + idx := getIndex(t, resp) + if idx < waitIndex { + t.Fatalf("bad index returned: %v", idx) + } else if idx == waitIndex { + if elapsed > waitDuration { + // This should prevent the loop from running longer than the waitDuration + t.Fatalf("too slow: %v", elapsed) + } + goto RUN_BLOCKING_QUERY + } + // Should block at least 100ms before getting the changed results + if elapsed < 100*time.Millisecond { + t.Fatalf("too fast: %v", elapsed) + } + + checkServiceNodes := obj.(structs.CheckServiceNodes) + + // validate response + require.Len(t, checkServiceNodes, 1) + v := checkServiceNodes[0].Service.ToServiceNode(registerServiceReq.Node) + + validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) +} + func peerQuerySuffix(peerName string) string { if peerName == "" { return ""