mirror of https://github.com/status-im/consul.git
[OSS] Fix merge central config tests (#13309)
Setting the right enterprise meta to fix the merge central config tests. Re-added the tests that were failing on the OSS to ENT merge.
This commit is contained in:
parent
364758ef2f
commit
1a901953e2
|
@ -8,6 +8,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
|
@ -1052,6 +1053,24 @@ func TestCatalogServiceNodes_ConnectProxy(t *testing.T) {
|
|||
assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
|
||||
}
|
||||
|
||||
func registerService(t *testing.T, a *TestAgent) (registerServiceReq *structs.RegisterRequest) {
|
||||
t.Helper()
|
||||
entMeta := acl.DefaultEnterpriseMeta()
|
||||
registerServiceReq = structs.TestRegisterRequestProxy(t)
|
||||
registerServiceReq.EnterpriseMeta = *entMeta
|
||||
registerServiceReq.Service.EnterpriseMeta = *entMeta
|
||||
registerServiceReq.Service.Proxy.Upstreams = structs.TestAddDefaultsToUpstreams(t, registerServiceReq.Service.Proxy.Upstreams, *entMeta)
|
||||
registerServiceReq.Check = &structs.HealthCheck{
|
||||
Node: registerServiceReq.Node,
|
||||
Name: "check1",
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
require.NoError(t, a.RPC("Catalog.Register", registerServiceReq, &out))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs.ProxyConfigEntry) {
|
||||
t.Helper()
|
||||
// Register proxy-defaults
|
||||
|
@ -1063,6 +1082,7 @@ func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs
|
|||
"local_connect_timeout_ms": uint64(1000),
|
||||
"handshake_timeout_ms": uint64(1000),
|
||||
},
|
||||
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
||||
}
|
||||
proxyDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
|
||||
Op: structs.ConfigEntryUpsert,
|
||||
|
@ -1070,7 +1090,7 @@ func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs
|
|||
Entry: &proxyGlobalEntry,
|
||||
}
|
||||
var proxyDefaultsConfigEntryResp bool
|
||||
assert.Nil(t, a.RPC("ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp))
|
||||
require.NoError(t, a.RPC("ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1091,6 +1111,7 @@ func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (se
|
|||
},
|
||||
},
|
||||
},
|
||||
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
||||
}
|
||||
serviceDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
|
||||
Op: structs.ConfigEntryUpsert,
|
||||
|
@ -1098,7 +1119,7 @@ func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (se
|
|||
Entry: &serviceDefaultsConfigEntry,
|
||||
}
|
||||
var serviceDefaultsConfigEntryResp bool
|
||||
assert.Nil(t, a.RPC("ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp))
|
||||
require.NoError(t, a.RPC("ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1108,24 +1129,24 @@ func validateMergeCentralConfigResponse(t *testing.T, v *structs.ServiceNode,
|
|||
serviceDefaultsConfigEntry structs.ServiceConfigEntry) {
|
||||
|
||||
t.Helper()
|
||||
assert.Equal(t, registerServiceReq.Service.Service, v.ServiceName)
|
||||
require.Equal(t, registerServiceReq.Service.Service, v.ServiceName)
|
||||
// validate proxy global defaults are resolved in the merged service config
|
||||
assert.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config)
|
||||
require.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config)
|
||||
// validate service defaults override proxy-defaults/global
|
||||
assert.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode)
|
||||
assert.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode)
|
||||
require.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode)
|
||||
require.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode)
|
||||
// validate service defaults are resolved in the merged service config
|
||||
// expected number of upstreams = (number of upstreams defined in the register request proxy config +
|
||||
// 1 default from service defaults)
|
||||
assert.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams))
|
||||
// 1 centrally configured default from service defaults)
|
||||
require.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams))
|
||||
for _, up := range v.ServiceProxy.Upstreams {
|
||||
if up.DestinationType != "" {
|
||||
if up.DestinationType != "" && up.DestinationType != structs.UpstreamDestTypeService {
|
||||
continue
|
||||
}
|
||||
assert.Contains(t, up.Config, "limits")
|
||||
require.Contains(t, up.Config, "limits")
|
||||
upstreamLimits := up.Config["limits"].(*structs.UpstreamLimits)
|
||||
assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections)
|
||||
assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode)
|
||||
require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections)
|
||||
require.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1141,15 +1162,11 @@ func TestListServiceNodes_MergeCentralConfig(t *testing.T) {
|
|||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
|
||||
// Register the service
|
||||
registerServiceReq := structs.TestRegisterRequestProxy(t)
|
||||
var out struct{}
|
||||
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
|
||||
|
||||
registerServiceReq := registerService(t, a)
|
||||
// Register proxy-defaults
|
||||
_ = registerProxyDefaults(t, a)
|
||||
|
||||
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||
// Register service-defaults
|
||||
_ = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||
|
||||
type testCase struct {
|
||||
testCaseName string
|
||||
|
@ -1172,16 +1189,16 @@ func TestListServiceNodes_MergeCentralConfig(t *testing.T) {
|
|||
obj, err = a.srv.CatalogServiceNodes(resp, req)
|
||||
}
|
||||
|
||||
assert.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
assertIndex(t, resp)
|
||||
|
||||
serviceNodes := obj.(structs.ServiceNodes)
|
||||
|
||||
// validate response
|
||||
assert.Len(t, serviceNodes, 1)
|
||||
// v := serviceNodes[0]
|
||||
require.Len(t, serviceNodes, 1)
|
||||
v := serviceNodes[0]
|
||||
|
||||
// validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||
}
|
||||
testCases := []testCase{
|
||||
{
|
||||
|
@ -1201,6 +1218,84 @@ func TestListServiceNodes_MergeCentralConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalogServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t, "")
|
||||
defer a.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
|
||||
// Register the service
|
||||
registerServiceReq := registerService(t, a)
|
||||
// Register proxy-defaults
|
||||
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||
|
||||
// Run the query
|
||||
rpcReq := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: registerServiceReq.Service.Service,
|
||||
MergeCentralConfig: true,
|
||||
}
|
||||
var rpcResp structs.IndexedServiceNodes
|
||||
require.NoError(t, a.RPC("Catalog.ServiceNodes", &rpcReq, &rpcResp))
|
||||
|
||||
require.Len(t, rpcResp.ServiceNodes, 1)
|
||||
serviceNode := rpcResp.ServiceNodes[0]
|
||||
require.Equal(t, registerServiceReq.Service.Service, serviceNode.ServiceName)
|
||||
// validate proxy global defaults are resolved in the merged service config
|
||||
require.Equal(t, proxyGlobalEntry.Config, serviceNode.ServiceProxy.Config)
|
||||
require.Equal(t, proxyGlobalEntry.Mode, serviceNode.ServiceProxy.Mode)
|
||||
|
||||
// Async cause a change - register service defaults
|
||||
waitIndex := rpcResp.Index
|
||||
start := time.Now()
|
||||
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Register service-defaults
|
||||
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||
}()
|
||||
|
||||
const waitDuration = 3 * time.Second
|
||||
RUN_BLOCKING_QUERY:
|
||||
url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config&wait=%s&index=%d",
|
||||
registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.CatalogServiceNodes(resp, req)
|
||||
|
||||
require.NoError(t, err)
|
||||
assertIndex(t, resp)
|
||||
|
||||
elapsed := time.Since(start)
|
||||
idx := getIndex(t, resp)
|
||||
if idx < waitIndex {
|
||||
t.Fatalf("bad index returned: %v", idx)
|
||||
} else if idx == waitIndex {
|
||||
if elapsed > waitDuration {
|
||||
// This should prevent the loop from running longer than the waitDuration
|
||||
t.Fatalf("too slow: %v", elapsed)
|
||||
}
|
||||
goto RUN_BLOCKING_QUERY
|
||||
}
|
||||
// Should block at least 100ms before getting the changed results
|
||||
if elapsed < 100*time.Millisecond {
|
||||
t.Fatalf("too fast: %v", elapsed)
|
||||
}
|
||||
|
||||
serviceNodes := obj.(structs.ServiceNodes)
|
||||
|
||||
// validate response
|
||||
require.Len(t, serviceNodes, 1)
|
||||
v := serviceNodes[0]
|
||||
|
||||
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||
}
|
||||
|
||||
// Test that the Connect-compatible endpoints can be queried for a
|
||||
// service via /v1/catalog/connect/:service.
|
||||
func TestCatalogConnectServiceNodes_good(t *testing.T) {
|
||||
|
|
|
@ -1894,19 +1894,11 @@ func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) {
|
|||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
|
||||
// Register the service
|
||||
registerServiceReq := structs.TestRegisterRequestProxy(t)
|
||||
registerServiceReq.Check = &structs.HealthCheck{
|
||||
Node: registerServiceReq.Node,
|
||||
Name: "check1",
|
||||
}
|
||||
var out struct{}
|
||||
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
|
||||
|
||||
registerServiceReq := registerService(t, a)
|
||||
// Register proxy-defaults
|
||||
_ = registerProxyDefaults(t, a)
|
||||
|
||||
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||
// Register service-defaults
|
||||
_ = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||
|
||||
type testCase struct {
|
||||
testCaseName string
|
||||
|
@ -1929,16 +1921,16 @@ func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) {
|
|||
obj, err = a.srv.HealthServiceNodes(resp, req)
|
||||
}
|
||||
|
||||
assert.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
assertIndex(t, resp)
|
||||
|
||||
checkServiceNodes := obj.(structs.CheckServiceNodes)
|
||||
|
||||
// validate response
|
||||
assert.Len(t, checkServiceNodes, 1)
|
||||
// v := checkServiceNodes[0]
|
||||
require.Len(t, checkServiceNodes, 1)
|
||||
v := checkServiceNodes[0]
|
||||
|
||||
// validateMergeCentralConfigResponse(t, v.Service.ToServiceNode(registerServiceReq.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||
validateMergeCentralConfigResponse(t, v.Service.ToServiceNode(registerServiceReq.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||
}
|
||||
testCases := []testCase{
|
||||
{
|
||||
|
@ -1958,6 +1950,84 @@ func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t, "")
|
||||
defer a.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
|
||||
// Register the service
|
||||
registerServiceReq := registerService(t, a)
|
||||
// Register proxy-defaults
|
||||
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||
|
||||
// Run the query
|
||||
rpcReq := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: registerServiceReq.Service.Service,
|
||||
MergeCentralConfig: true,
|
||||
}
|
||||
var rpcResp structs.IndexedCheckServiceNodes
|
||||
require.NoError(t, a.RPC("Health.ServiceNodes", &rpcReq, &rpcResp))
|
||||
|
||||
require.Len(t, rpcResp.Nodes, 1)
|
||||
nodeService := rpcResp.Nodes[0].Service
|
||||
require.Equal(t, registerServiceReq.Service.Service, nodeService.Service)
|
||||
// validate proxy global defaults are resolved in the merged service config
|
||||
require.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config)
|
||||
require.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode)
|
||||
|
||||
// Async cause a change - register service defaults
|
||||
waitIndex := rpcResp.Index
|
||||
start := time.Now()
|
||||
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Register service-defaults
|
||||
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||
}()
|
||||
|
||||
const waitDuration = 3 * time.Second
|
||||
RUN_BLOCKING_QUERY:
|
||||
url := fmt.Sprintf("/v1/health/service/%s?merge-central-config&wait=%s&index=%d",
|
||||
registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||
|
||||
require.NoError(t, err)
|
||||
assertIndex(t, resp)
|
||||
|
||||
elapsed := time.Since(start)
|
||||
idx := getIndex(t, resp)
|
||||
if idx < waitIndex {
|
||||
t.Fatalf("bad index returned: %v", idx)
|
||||
} else if idx == waitIndex {
|
||||
if elapsed > waitDuration {
|
||||
// This should prevent the loop from running longer than the waitDuration
|
||||
t.Fatalf("too slow: %v", elapsed)
|
||||
}
|
||||
goto RUN_BLOCKING_QUERY
|
||||
}
|
||||
// Should block at least 100ms before getting the changed results
|
||||
if elapsed < 100*time.Millisecond {
|
||||
t.Fatalf("too fast: %v", elapsed)
|
||||
}
|
||||
|
||||
checkServiceNodes := obj.(structs.CheckServiceNodes)
|
||||
|
||||
// validate response
|
||||
require.Len(t, checkServiceNodes, 1)
|
||||
v := checkServiceNodes[0].Service.ToServiceNode(registerServiceReq.Node)
|
||||
|
||||
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||
}
|
||||
|
||||
func peerQuerySuffix(peerName string) string {
|
||||
if peerName == "" {
|
||||
return ""
|
||||
|
|
Loading…
Reference in New Issue