mirror of https://github.com/status-im/consul.git
Ensure that config entry writes are forwarded to the primary DC (#6339)
This commit is contained in:
parent
9a5b258edf
commit
0c3cf47266
|
@ -19,6 +19,10 @@ type ConfigEntry struct {
|
||||||
|
|
||||||
// Apply does an upsert of the given config entry.
|
// Apply does an upsert of the given config entry.
|
||||||
func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error {
|
func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error {
|
||||||
|
// Ensure that all config entry writes go to the primary datacenter. These will then
|
||||||
|
// be replicated to all the other datacenters.
|
||||||
|
args.Datacenter = c.srv.config.PrimaryDatacenter
|
||||||
|
|
||||||
if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done {
|
if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -181,6 +185,10 @@ func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.In
|
||||||
|
|
||||||
// Delete deletes a config entry.
|
// Delete deletes a config entry.
|
||||||
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error {
|
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error {
|
||||||
|
// Ensure that all config entry writes go to the primary datacenter. These will then
|
||||||
|
// be replicated to all the other datacenters.
|
||||||
|
args.Datacenter = c.srv.config.PrimaryDatacenter
|
||||||
|
|
||||||
if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done {
|
if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -15,34 +16,59 @@ import (
|
||||||
func TestConfigEntry_Apply(t *testing.T) {
|
func TestConfigEntry_Apply(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
require := require.New(t)
|
|
||||||
|
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
defer s1.Shutdown()
|
defer s1.Shutdown()
|
||||||
codec := rpcClient(t, s1)
|
codec := rpcClient(t, s1)
|
||||||
defer codec.Close()
|
defer codec.Close()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.Datacenter = "dc2"
|
||||||
|
c.PrimaryDatacenter = "dc1"
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
codec2 := rpcClient(t, s2)
|
||||||
|
defer codec2.Close()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||||
|
joinWAN(t, s2, s1)
|
||||||
|
// wait for cross-dc queries to work
|
||||||
|
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||||
|
|
||||||
updated := &structs.ServiceConfigEntry{
|
updated := &structs.ServiceConfigEntry{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
}
|
}
|
||||||
|
// originally target this as going to dc2
|
||||||
args := structs.ConfigEntryRequest{
|
args := structs.ConfigEntryRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc2",
|
||||||
Entry: updated,
|
Entry: updated,
|
||||||
}
|
}
|
||||||
out := false
|
out := false
|
||||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Apply", &args, &out))
|
||||||
require.True(out)
|
require.True(t, out)
|
||||||
|
|
||||||
|
// the previous RPC should not return until the primary has been updated but will return
|
||||||
|
// before the secondary has the data.
|
||||||
state := s1.fsm.State()
|
state := s1.fsm.State()
|
||||||
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||||
require.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
|
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
|
||||||
require.True(ok)
|
require.True(t, ok)
|
||||||
require.Equal("foo", serviceConf.Name)
|
require.Equal(t, "foo", serviceConf.Name)
|
||||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
||||||
|
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
// wait for replication to happen
|
||||||
|
state := s2.fsm.State()
|
||||||
|
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.NotNil(r, entry)
|
||||||
|
// this test is not testing that the config entries that are replicated are correct as thats done elsewhere.
|
||||||
|
})
|
||||||
|
|
||||||
updated = &structs.ServiceConfigEntry{
|
updated = &structs.ServiceConfigEntry{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
|
@ -57,23 +83,23 @@ func TestConfigEntry_Apply(t *testing.T) {
|
||||||
Entry: updated,
|
Entry: updated,
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
||||||
require.False(out)
|
require.False(t, out)
|
||||||
|
|
||||||
args.Entry.GetRaftIndex().ModifyIndex = serviceConf.ModifyIndex
|
args.Entry.GetRaftIndex().ModifyIndex = serviceConf.ModifyIndex
|
||||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
||||||
require.True(out)
|
require.True(t, out)
|
||||||
|
|
||||||
state = s1.fsm.State()
|
state = s1.fsm.State()
|
||||||
_, entry, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
_, entry, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||||
require.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
serviceConf, ok = entry.(*structs.ServiceConfigEntry)
|
serviceConf, ok = entry.(*structs.ServiceConfigEntry)
|
||||||
require.True(ok)
|
require.True(t, ok)
|
||||||
|
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
||||||
updated.Kind = structs.ServiceDefaults // the server adds this
|
require.Equal(t, "foo", serviceConf.Name)
|
||||||
updated.RaftIndex = serviceConf.RaftIndex // these will always be different
|
require.Equal(t, "", serviceConf.Protocol)
|
||||||
require.Equal(updated, serviceConf)
|
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigEntry_ProxyDefaultsMeshGateway(t *testing.T) {
|
func TestConfigEntry_ProxyDefaultsMeshGateway(t *testing.T) {
|
||||||
|
@ -546,42 +572,71 @@ operator = "read"
|
||||||
func TestConfigEntry_Delete(t *testing.T) {
|
func TestConfigEntry_Delete(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
require := require.New(t)
|
|
||||||
|
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
defer s1.Shutdown()
|
defer s1.Shutdown()
|
||||||
codec := rpcClient(t, s1)
|
codec := rpcClient(t, s1)
|
||||||
defer codec.Close()
|
defer codec.Close()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.Datacenter = "dc2"
|
||||||
|
c.PrimaryDatacenter = "dc1"
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
codec2 := rpcClient(t, s2)
|
||||||
|
defer codec2.Close()
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||||
|
joinWAN(t, s2, s1)
|
||||||
|
// wait for cross-dc queries to work
|
||||||
|
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||||
|
|
||||||
// Create a dummy service in the state store to look up.
|
// Create a dummy service in the state store to look up.
|
||||||
entry := &structs.ServiceConfigEntry{
|
entry := &structs.ServiceConfigEntry{
|
||||||
Kind: structs.ServiceDefaults,
|
Kind: structs.ServiceDefaults,
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
}
|
}
|
||||||
state := s1.fsm.State()
|
state := s1.fsm.State()
|
||||||
require.NoError(state.EnsureConfigEntry(1, entry))
|
require.NoError(t, state.EnsureConfigEntry(1, entry))
|
||||||
|
|
||||||
// Verify it's there.
|
// Verify it's there.
|
||||||
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||||
require.NoError(err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
serviceConf, ok := existing.(*structs.ServiceConfigEntry)
|
serviceConf, ok := existing.(*structs.ServiceConfigEntry)
|
||||||
require.True(ok)
|
require.True(t, ok)
|
||||||
require.Equal("foo", serviceConf.Name)
|
require.Equal(t, "foo", serviceConf.Name)
|
||||||
require.Equal(structs.ServiceDefaults, serviceConf.Kind)
|
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
||||||
|
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
// wait for it to be replicated into the secondary dc
|
||||||
|
_, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.NotNil(r, existing)
|
||||||
|
})
|
||||||
|
|
||||||
|
// send the delete request to dc2 - it should get forwarded to dc1.
|
||||||
args := structs.ConfigEntryRequest{
|
args := structs.ConfigEntryRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc2",
|
||||||
}
|
}
|
||||||
args.Entry = entry
|
args.Entry = entry
|
||||||
var out struct{}
|
var out struct{}
|
||||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
|
require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Delete", &args, &out))
|
||||||
|
|
||||||
// Verify the entry was deleted.
|
// Verify the entry was deleted.
|
||||||
_, existing, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
_, existing, err = s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||||
require.NoError(err)
|
require.NoError(t, err)
|
||||||
require.Nil(existing)
|
require.Nil(t, existing)
|
||||||
|
|
||||||
|
// verify it gets deleted from the secondary too
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
_, existing, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo")
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.Nil(r, existing)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConfigEntry_Delete_ACLDeny(t *testing.T) {
|
func TestConfigEntry_Delete_ACLDeny(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue