diff --git a/.changelog/10239.txt b/.changelog/10239.txt new file mode 100644 index 0000000000..a6e943dc71 --- /dev/null +++ b/.changelog/10239.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: ensure that central service config flattening properly resets the state each time +``` diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index b2529133d5..ec9b3feeeb 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -7,11 +7,12 @@ import ( "github.com/armon/go-metrics/prometheus" metrics "github.com/armon/go-metrics" + memdb "github.com/hashicorp/go-memdb" + "github.com/mitchellh/copystructure" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" - "github.com/mitchellh/copystructure" ) var ConfigSummaries = []prometheus.SummaryDefinition{ @@ -328,9 +329,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - reply.Reset() + var thisReply structs.ServiceConfigResponse - reply.MeshGateway.Mode = structs.MeshGatewayModeDefault + thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault // Pass the WatchSet to both the service and proxy config lookups. If either is updated // during the blocking query, this function will be rerun and these state store lookups // will both be current. @@ -364,28 +365,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r if err != nil { return fmt.Errorf("failed to copy global proxy-defaults: %v", err) } - reply.ProxyConfig = mapCopy.(map[string]interface{}) - reply.MeshGateway = proxyConf.MeshGateway - reply.Expose = proxyConf.Expose + thisReply.ProxyConfig = mapCopy.(map[string]interface{}) + thisReply.MeshGateway = proxyConf.MeshGateway + thisReply.Expose = proxyConf.Expose } - reply.Index = index + thisReply.Index = index if serviceConf != nil { if serviceConf.Expose.Checks { - reply.Expose.Checks = true + thisReply.Expose.Checks = true } if len(serviceConf.Expose.Paths) >= 1 { - reply.Expose.Paths = serviceConf.Expose.Paths + thisReply.Expose.Paths = serviceConf.Expose.Paths } if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { - reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode + thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode } if serviceConf.Protocol != "" { - if reply.ProxyConfig == nil { - reply.ProxyConfig = make(map[string]interface{}) + if thisReply.ProxyConfig == nil { + thisReply.ProxyConfig = make(map[string]interface{}) } - reply.ProxyConfig["protocol"] = serviceConf.Protocol + thisReply.ProxyConfig["protocol"] = serviceConf.Protocol } } @@ -443,26 +444,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r // don't allocate the slices just to not fill them if len(usConfigs) == 0 { + *reply = thisReply return nil } if legacyUpstreams { - if reply.UpstreamConfigs == nil { - reply.UpstreamConfigs = make(map[string]map[string]interface{}) + if thisReply.UpstreamConfigs == nil { + thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) } for us, conf := range usConfigs { - reply.UpstreamConfigs[us.ID] = conf + thisReply.UpstreamConfigs[us.ID] = conf } } else { - if reply.UpstreamIDConfigs == nil { - reply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs)) + if thisReply.UpstreamIDConfigs == nil { + thisReply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs)) } for us, conf := range usConfigs { - reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf}) + thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf}) } } + *reply = thisReply return nil }) } diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index ec5f3781a3..a97adf0bd9 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -5,12 +5,13 @@ import ( "testing" "time" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/stretchr/testify/require" ) func TestConfigEntry_Apply(t *testing.T) { @@ -858,6 +859,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { // of the blocking query does NOT bleed over into the next run. Concretely // in this test the data present in the initial proxy-defaults should not // be present when we are woken up due to proxy-defaults being deleted. + // + // This test does not pertain to upstreams, see: + // TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking state := s1.fsm.State() require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ @@ -1009,6 +1013,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { } } +func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // The main thing this should test is that information from one iteration + // of the blocking query does NOT bleed over into the next run. Concretely + // in this test the data present in the initial proxy-defaults should not + // be present when we are woken up due to proxy-defaults being deleted. + // + // This test is about fields in upstreams, see: + // TestConfigEntry_ResolveServiceConfig_Blocking + + state := s1.fsm.State() + require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + }, nil)) + require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "http", + }, nil)) + + var index uint64 + + runStep(t, "foo and bar should be both http", func(t *testing.T) { + // Verify that we get the results of service-defaults for 'foo' and 'bar'. + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + UpstreamIDConfigs: []structs.UpstreamConfig{ + { + Upstream: structs.NewServiceID("bar", nil), + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) { + // Now setup a blocking query for 'foo' while we erase the + // service-defaults for bar. + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + err := state.DeleteConfigEntry(index+1, + structs.ServiceDefaults, + "bar", + nil, + ) + if err != nil { + t.Errorf("delete config entry failed: %v", err) + } + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "foo should be http and bar should be unset", func(t *testing.T) { + // Verify that we get the results of service-defaults for just 'foo'. + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) + + runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) { + // Now setup a blocking query for 'foo' while we erase the + // service-defaults for foo. + + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + err := state.DeleteConfigEntry(index+1, + structs.ServiceDefaults, + "foo", + nil, + ) + if err != nil { + t.Errorf("delete config entry failed: %v", err) + } + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + structs.NewServiceID("other", nil), + }, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + QueryMeta: out.QueryMeta, // don't care + } + + require.Equal(t, expected, out) + index = out.Index + }) +} + func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) { t.Parallel() @@ -1266,3 +1469,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) { require.True(t, ok) require.Equal(t, expose, proxyConf.Expose) } + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +} diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index b6a1c8be8b..23801cd08a 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -619,12 +619,6 @@ type ServiceConfigResponse struct { QueryMeta } -func (r *ServiceConfigResponse) Reset() { - r.ProxyConfig = nil - r.UpstreamConfigs = nil - r.MeshGateway = MeshGatewayConfig{} -} - // MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here // because we need custom decoding of the raw interface{} values. func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {