From 0c3cf47266187612ee23efbdfa19c4306976748e Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Tue, 20 Aug 2019 12:01:13 -0400 Subject: [PATCH] Ensure that config entry writes are forwarded to the primary DC (#6339) --- agent/consul/config_endpoint.go | 8 ++ agent/consul/config_endpoint_test.go | 119 ++++++++++++++++++++------- 2 files changed, 95 insertions(+), 32 deletions(-) diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 8f9b005260..01834f8af0 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -19,6 +19,10 @@ type ConfigEntry struct { // Apply does an upsert of the given config entry. func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error { + // Ensure that all config entry writes go to the primary datacenter. These will then + // be replicated to all the other datacenters. + args.Datacenter = c.srv.config.PrimaryDatacenter + if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done { return err } @@ -181,6 +185,10 @@ func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.In // Delete deletes a config entry. func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error { + // Ensure that all config entry writes go to the primary datacenter. These will then + // be replicated to all the other datacenters. + args.Datacenter = c.srv.config.PrimaryDatacenter + if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done { return err } diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index ec32be119b..9fdcd847be 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" @@ -15,34 +16,59 @@ import ( func TestConfigEntry_Apply(t *testing.T) { t.Parallel() - require := require.New(t) - dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() codec := rpcClient(t, s1) defer codec.Close() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + codec2 := rpcClient(t, s2) + defer codec2.Close() + + testrpc.WaitForLeader(t, s2.RPC, "dc2") + joinWAN(t, s2, s1) + // wait for cross-dc queries to work + testrpc.WaitForLeader(t, s2.RPC, "dc1") + updated := &structs.ServiceConfigEntry{ Name: "foo", } - + // originally target this as going to dc2 args := structs.ConfigEntryRequest{ - Datacenter: "dc1", + Datacenter: "dc2", Entry: updated, } out := false - require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) - require.True(out) + require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Apply", &args, &out)) + require.True(t, out) + // the previous RPC should not return until the primary has been updated but will return + // before the secondary has the data. state := s1.fsm.State() _, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo") - require.NoError(err) + require.NoError(t, err) serviceConf, ok := entry.(*structs.ServiceConfigEntry) - require.True(ok) - require.Equal("foo", serviceConf.Name) - require.Equal(structs.ServiceDefaults, serviceConf.Kind) + require.True(t, ok) + require.Equal(t, "foo", serviceConf.Name) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) + + retry.Run(t, func(r *retry.R) { + // wait for replication to happen + state := s2.fsm.State() + _, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(r, err) + require.NotNil(r, entry) + // this test is not testing that the config entries that are replicated are correct as thats done elsewhere. + }) updated = &structs.ServiceConfigEntry{ Name: "foo", @@ -57,23 +83,23 @@ func TestConfigEntry_Apply(t *testing.T) { Entry: updated, } - require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) - require.False(out) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) + require.False(t, out) args.Entry.GetRaftIndex().ModifyIndex = serviceConf.ModifyIndex - require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) - require.True(out) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out)) + require.True(t, out) state = s1.fsm.State() _, entry, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo") - require.NoError(err) + require.NoError(t, err) serviceConf, ok = entry.(*structs.ServiceConfigEntry) - require.True(ok) - - updated.Kind = structs.ServiceDefaults // the server adds this - updated.RaftIndex = serviceConf.RaftIndex // these will always be different - require.Equal(updated, serviceConf) + require.True(t, ok) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) + require.Equal(t, "foo", serviceConf.Name) + require.Equal(t, "", serviceConf.Protocol) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) } func TestConfigEntry_ProxyDefaultsMeshGateway(t *testing.T) { @@ -546,42 +572,71 @@ operator = "read" func TestConfigEntry_Delete(t *testing.T) { t.Parallel() - require := require.New(t) - dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() codec := rpcClient(t, s1) defer codec.Close() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + codec2 := rpcClient(t, s2) + defer codec2.Close() + + testrpc.WaitForLeader(t, s2.RPC, "dc2") + joinWAN(t, s2, s1) + // wait for cross-dc queries to work + testrpc.WaitForLeader(t, s2.RPC, "dc1") + // Create a dummy service in the state store to look up. entry := &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: "foo", } state := s1.fsm.State() - require.NoError(state.EnsureConfigEntry(1, entry)) + require.NoError(t, state.EnsureConfigEntry(1, entry)) // Verify it's there. _, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo") - require.NoError(err) + require.NoError(t, err) serviceConf, ok := existing.(*structs.ServiceConfigEntry) - require.True(ok) - require.Equal("foo", serviceConf.Name) - require.Equal(structs.ServiceDefaults, serviceConf.Kind) + require.True(t, ok) + require.Equal(t, "foo", serviceConf.Name) + require.Equal(t, structs.ServiceDefaults, serviceConf.Kind) + retry.Run(t, func(r *retry.R) { + // wait for it to be replicated into the secondary dc + _, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(r, err) + require.NotNil(r, existing) + }) + + // send the delete request to dc2 - it should get forwarded to dc1. args := structs.ConfigEntryRequest{ - Datacenter: "dc1", + Datacenter: "dc2", } args.Entry = entry var out struct{} - require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out)) + require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Delete", &args, &out)) // Verify the entry was deleted. - _, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo") - require.NoError(err) - require.Nil(existing) + _, existing, err = s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(t, err) + require.Nil(t, existing) + + // verify it gets deleted from the secondary too + retry.Run(t, func(r *retry.R) { + _, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo") + require.NoError(r, err) + require.Nil(r, existing) + }) } func TestConfigEntry_Delete_ACLDeny(t *testing.T) {