mirror of https://github.com/status-im/consul.git
server: conditionally avoid writing a config entry to raft if it was already the same (#12321)
This will both save on unnecessary raft operations as well as unnecessarily incrementing the raft modify index of config entries subject to no-op updates.
This commit is contained in:
parent
80dfcb1bcd
commit
115946da99
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
server: conditionally avoid writing a config entry to raft if it was already the same
|
||||
```
|
|
@ -2,6 +2,7 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
|
@ -93,6 +94,14 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
|
|||
if args.Op != structs.ConfigEntryUpsert && args.Op != structs.ConfigEntryUpsertCAS {
|
||||
args.Op = structs.ConfigEntryUpsert
|
||||
}
|
||||
|
||||
if skip, err := c.shouldSkipOperation(args); err != nil {
|
||||
return err
|
||||
} else if skip {
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -104,6 +113,62 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error
|
|||
return nil
|
||||
}
|
||||
|
||||
// shouldSkipOperation returns true if the result of the operation has
|
||||
// already happened and is safe to skip.
|
||||
//
|
||||
// It is ok if this incorrectly detects something as changed when it
|
||||
// in fact has not, the important thing is that it doesn't do
|
||||
// the reverse and incorrectly detect a change as a no-op.
|
||||
func (c *ConfigEntry) shouldSkipOperation(args *structs.ConfigEntryRequest) (bool, error) {
|
||||
state := c.srv.fsm.State()
|
||||
_, currentEntry, err := state.ConfigEntry(nil, args.Entry.GetKind(), args.Entry.GetName(), args.Entry.GetEnterpriseMeta())
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error reading current config entry value: %w", err)
|
||||
}
|
||||
|
||||
switch args.Op {
|
||||
case structs.ConfigEntryUpsert, structs.ConfigEntryUpsertCAS:
|
||||
return c.shouldSkipUpsertOperation(currentEntry, args.Entry)
|
||||
case structs.ConfigEntryDelete, structs.ConfigEntryDeleteCAS:
|
||||
return (currentEntry == nil), nil
|
||||
default:
|
||||
return false, fmt.Errorf("invalid config entry operation type: %v", args.Op)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigEntry) shouldSkipUpsertOperation(currentEntry, updatedEntry structs.ConfigEntry) (bool, error) {
|
||||
if currentEntry == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if currentEntry.GetKind() != updatedEntry.GetKind() ||
|
||||
currentEntry.GetName() != updatedEntry.GetName() ||
|
||||
!currentEntry.GetEnterpriseMeta().IsSame(updatedEntry.GetEnterpriseMeta()) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// The only reason a fully Normalized and Validated config entry may
|
||||
// legitimately differ from the persisted one is due to the embedded
|
||||
// RaftIndex.
|
||||
//
|
||||
// So, to intercept more no-op upserts we temporarily set the new config
|
||||
// entry's raft index field to that of the existing data for the purposes
|
||||
// of comparison, and then restore it.
|
||||
var (
|
||||
currentRaftIndex = currentEntry.GetRaftIndex()
|
||||
userProvidedRaftIndex = updatedEntry.GetRaftIndex()
|
||||
|
||||
currentRaftIndexCopy = *currentRaftIndex
|
||||
userProvidedRaftIndexCopy = *userProvidedRaftIndex
|
||||
)
|
||||
|
||||
*userProvidedRaftIndex = currentRaftIndexCopy // change
|
||||
same := reflect.DeepEqual(currentEntry, updatedEntry) // compare
|
||||
*userProvidedRaftIndex = userProvidedRaftIndexCopy // restore
|
||||
|
||||
return same, nil
|
||||
}
|
||||
|
||||
// Get returns a single config entry by Kind/Name.
|
||||
func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigEntryResponse) error {
|
||||
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
||||
|
@ -309,6 +374,13 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *structs.Co
|
|||
args.Op = structs.ConfigEntryDelete
|
||||
}
|
||||
|
||||
if skip, err := c.shouldSkipOperation(args); err != nil {
|
||||
return err
|
||||
} else if skip {
|
||||
reply.Deleted = true
|
||||
return nil
|
||||
}
|
||||
|
||||
rsp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -45,22 +45,24 @@ func TestConfigEntry_Apply(t *testing.T) {
|
|||
// wait for cross-dc queries to work
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
runStep(t, "send the apply request to dc2 - it should get forwarded to dc1", func(t *testing.T) {
|
||||
updated := &structs.ServiceConfigEntry{
|
||||
Name: "foo",
|
||||
}
|
||||
// originally target this as going to dc2
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc2",
|
||||
Entry: updated,
|
||||
}
|
||||
out := false
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Apply", &args, &out))
|
||||
require.True(t, out)
|
||||
})
|
||||
|
||||
var originalModifyIndex uint64
|
||||
runStep(t, "verify the entry was updated in the primary and secondary", func(t *testing.T) {
|
||||
// the previous RPC should not return until the primary has been updated but will return
|
||||
// before the secondary has the data.
|
||||
state := s1.fsm.State()
|
||||
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
_, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
|
||||
|
@ -70,43 +72,88 @@ func TestConfigEntry_Apply(t *testing.T) {
|
|||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// wait for replication to happen
|
||||
state := s2.fsm.State()
|
||||
_, entry, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
_, entry, err := s2.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
require.NoError(r, err)
|
||||
require.NotNil(r, entry)
|
||||
// this test is not testing that the config entries that are replicated are correct as thats done elsewhere.
|
||||
})
|
||||
originalModifyIndex = serviceConf.ModifyIndex
|
||||
})
|
||||
|
||||
updated = &structs.ServiceConfigEntry{
|
||||
runStep(t, "update the entry again in the primary", func(t *testing.T) {
|
||||
updated := &structs.ServiceConfigEntry{
|
||||
Name: "foo",
|
||||
MeshGateway: structs.MeshGatewayConfig{
|
||||
Mode: structs.MeshGatewayModeLocal,
|
||||
},
|
||||
}
|
||||
|
||||
args = structs.ConfigEntryRequest{
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ConfigEntryUpsertCAS,
|
||||
Entry: updated,
|
||||
}
|
||||
|
||||
runStep(t, "with the wrong CAS", func(t *testing.T) {
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
||||
require.False(t, out)
|
||||
|
||||
args.Entry.GetRaftIndex().ModifyIndex = serviceConf.ModifyIndex
|
||||
})
|
||||
runStep(t, "with the correct CAS", func(t *testing.T) {
|
||||
var out bool
|
||||
args.Entry.GetRaftIndex().ModifyIndex = originalModifyIndex
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
||||
require.True(t, out)
|
||||
})
|
||||
})
|
||||
|
||||
state = s1.fsm.State()
|
||||
_, entry, err = state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
runStep(t, "verify the entry was updated in the state store", func(t *testing.T) {
|
||||
_, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
serviceConf, ok = entry.(*structs.ServiceConfigEntry)
|
||||
serviceConf, ok := entry.(*structs.ServiceConfigEntry)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
||||
require.Equal(t, "foo", serviceConf.Name)
|
||||
require.Equal(t, "", serviceConf.Protocol)
|
||||
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
|
||||
})
|
||||
|
||||
runStep(t, "verify no-op updates do not advance the raft indexes", func(t *testing.T) {
|
||||
var modifyIndex uint64
|
||||
for i := 0; i < 3; i++ {
|
||||
runStep(t, fmt.Sprintf("iteration %d", i), func(t *testing.T) {
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ConfigEntryUpsert,
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "noop",
|
||||
Protocol: "grpc",
|
||||
},
|
||||
}
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &args, &out))
|
||||
require.True(t, out)
|
||||
|
||||
getIndex, entry, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "noop", nil)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, entry)
|
||||
|
||||
listIndex, entries, err := s1.fsm.State().ConfigEntries(nil, nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 2)
|
||||
|
||||
if i == 0 {
|
||||
modifyIndex = entry.GetRaftIndex().ModifyIndex
|
||||
} else {
|
||||
require.Equal(t, modifyIndex, entry.GetRaftIndex().ModifyIndex)
|
||||
require.Equal(t, modifyIndex, getIndex)
|
||||
require.Equal(t, modifyIndex, listIndex)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigEntry_ProxyDefaultsMeshGateway(t *testing.T) {
|
||||
|
@ -623,16 +670,17 @@ func TestConfigEntry_Delete(t *testing.T) {
|
|||
// wait for cross-dc queries to work
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
// Create a dummy service in the state store to look up.
|
||||
runStep(t, "create a dummy service in the state store to look up", func(t *testing.T) {
|
||||
entry := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
}
|
||||
state := s1.fsm.State()
|
||||
require.NoError(t, state.EnsureConfigEntry(1, entry))
|
||||
require.NoError(t, s1.fsm.State().EnsureConfigEntry(1, entry))
|
||||
})
|
||||
|
||||
runStep(t, "verify it exists in the primary and is replicated to the secondary", func(t *testing.T) {
|
||||
// Verify it's there.
|
||||
_, existing, err := state.ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
_, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
serviceConf, ok := existing.(*structs.ServiceConfigEntry)
|
||||
|
@ -646,17 +694,24 @@ func TestConfigEntry_Delete(t *testing.T) {
|
|||
require.NoError(r, err)
|
||||
require.NotNil(r, existing)
|
||||
})
|
||||
})
|
||||
|
||||
// send the delete request to dc2 - it should get forwarded to dc1.
|
||||
runStep(t, "send the delete request to dc2 - it should get forwarded to dc1", func(t *testing.T) {
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc2",
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
},
|
||||
}
|
||||
args.Entry = entry
|
||||
var out struct{}
|
||||
var out structs.ConfigEntryDeleteResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec2, "ConfigEntry.Delete", &args, &out))
|
||||
require.True(t, out.Deleted)
|
||||
})
|
||||
|
||||
runStep(t, "verify the entry was deleted in the primary and secondary", func(t *testing.T) {
|
||||
// Verify the entry was deleted.
|
||||
_, existing, err = s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
_, existing, err := s1.fsm.State().ConfigEntry(nil, structs.ServiceDefaults, "foo", nil)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, existing)
|
||||
|
||||
|
@ -666,6 +721,20 @@ func TestConfigEntry_Delete(t *testing.T) {
|
|||
require.NoError(r, err)
|
||||
require.Nil(r, existing)
|
||||
})
|
||||
})
|
||||
|
||||
runStep(t, "delete in dc1 again - should be fine", func(t *testing.T) {
|
||||
args := structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
},
|
||||
}
|
||||
var out structs.ConfigEntryDeleteResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Delete", &args, &out))
|
||||
require.True(t, out.Deleted)
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigEntry_DeleteCAS(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue