mirror of https://github.com/status-im/consul.git
server: skip deleted and deleting namespaces when migrating intentions to config entries (#9186)
This commit is contained in:
parent
7af643ac37
commit
c52bc632df
|
@ -0,0 +1,18 @@
|
||||||
|
```release-note:bug
|
||||||
|
server: skip deleted and deleting namespaces when migrating intentions to config entries
|
||||||
|
```
|
||||||
|
|
||||||
|
```release-note:breaking-change
|
||||||
|
server: **(Enterprise only)** Pre-existing intentions defined with
|
||||||
|
non-existent destination namespaces were non-functional and are erased during
|
||||||
|
the upgrade process. This should not matter as these intentions had nothing to
|
||||||
|
enforce.
|
||||||
|
```
|
||||||
|
|
||||||
|
```release-note:breaking-change
|
||||||
|
server: **(OSS only)** Pre-existing intentions defined with either a source or
|
||||||
|
destination namespace value that is not "default" are rewritten or deleted
|
||||||
|
during the upgrade process. Wildcards first attempt to downgrade to "default"
|
||||||
|
unless an intention already exists, otherwise these non-functional intentions
|
||||||
|
are deleted.
|
||||||
|
```
|
|
@ -100,6 +100,11 @@ func (s *Server) legacyIntentionMigration(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
entries, err = s.filterMigratedLegacyIntentions(entries)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Totally cheat and repurpose one part of config entry replication
|
// Totally cheat and repurpose one part of config entry replication
|
||||||
// here so we automatically get our writes rate limited.
|
// here so we automatically get our writes rate limited.
|
||||||
_, err = s.reconcileLocalConfig(ctx, entries, structs.ConfigEntryUpsert)
|
_, err = s.reconcileLocalConfig(ctx, entries, structs.ConfigEntryUpsert)
|
||||||
|
|
|
@ -84,3 +84,7 @@ func migrateIntentionsToConfigEntries(ixns structs.Intentions) []*structs.Servic
|
||||||
|
|
||||||
return structs.MigrateIntentions(output)
|
return structs.MigrateIntentions(output)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) filterMigratedLegacyIntentions(entries []structs.ConfigEntry) ([]structs.ConfigEntry, error) {
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
|
|
@ -11,6 +11,13 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func testLeader_LegacyIntentionMigrationHookEnterprise(_ *testing.T, _ *Server, _ bool) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendLegacyIntentionsForMigrationTestEnterprise(_ *testing.T, _ *Server, ixns []*structs.Intention) []*structs.Intention {
|
||||||
|
return ixns
|
||||||
|
}
|
||||||
|
|
||||||
func TestMigrateIntentionsToConfigEntries(t *testing.T) {
|
func TestMigrateIntentionsToConfigEntries(t *testing.T) {
|
||||||
compare := func(t *testing.T, got structs.Intentions, expect [][]string) {
|
compare := func(t *testing.T, got structs.Intentions, expect [][]string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
|
@ -425,6 +425,11 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
||||||
makeIxn("contractor", "*", false),
|
makeIxn("contractor", "*", false),
|
||||||
makeIxn("*", "*", true),
|
makeIxn("*", "*", true),
|
||||||
}
|
}
|
||||||
|
ixns = appendLegacyIntentionsForMigrationTestEnterprise(t, s1pre, ixns)
|
||||||
|
|
||||||
|
testLeader_LegacyIntentionMigrationHookEnterprise(t, s1pre, true)
|
||||||
|
|
||||||
|
var retained []*structs.Intention
|
||||||
for _, ixn := range ixns {
|
for _, ixn := range ixns {
|
||||||
ixn2 := *ixn
|
ixn2 := *ixn
|
||||||
resp, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
|
resp, err := s1pre.raftApply(structs.IntentionRequestType, &structs.IntentionRequest{
|
||||||
|
@ -435,6 +440,10 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
||||||
if respErr, ok := resp.(error); ok {
|
if respErr, ok := resp.(error); ok {
|
||||||
t.Fatalf("respErr: %v", respErr)
|
t.Fatalf("respErr: %v", respErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, present := ixn.Meta["unit-test-discarded"]; !present {
|
||||||
|
retained = append(retained, ixn)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mapify := func(ixns []*structs.Intention) map[string]*structs.Intention {
|
mapify := func(ixns []*structs.Intention) map[string]*structs.Intention {
|
||||||
|
@ -465,7 +474,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
||||||
for k, expectV := range expect {
|
for k, expectV := range expect {
|
||||||
gotV, ok := gotM[k]
|
gotV, ok := gotM[k]
|
||||||
if !ok {
|
if !ok {
|
||||||
r.Errorf("results are missing key %q", k)
|
r.Errorf("results are missing key %q: %v", k, expectV)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -483,8 +492,14 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
expectM := mapify(ixns)
|
expectM := mapify(ixns)
|
||||||
checkIntentions(t, s1pre, false, expectM)
|
expectRetainedM := mapify(retained)
|
||||||
checkIntentions(t, s1pre, true, expectM)
|
|
||||||
|
require.True(t, t.Run("check initial intentions", func(t *testing.T) {
|
||||||
|
checkIntentions(t, s1pre, false, expectM)
|
||||||
|
}))
|
||||||
|
require.True(t, t.Run("check initial legacy intentions", func(t *testing.T) {
|
||||||
|
checkIntentions(t, s1pre, true, expectM)
|
||||||
|
}))
|
||||||
|
|
||||||
// Shutdown s1pre and restart it to trigger migration.
|
// Shutdown s1pre and restart it to trigger migration.
|
||||||
s1pre.Shutdown()
|
s1pre.Shutdown()
|
||||||
|
@ -500,8 +515,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
||||||
|
|
||||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
// check that all 7 intentions are present before migration
|
testLeader_LegacyIntentionMigrationHookEnterprise(t, s1, false)
|
||||||
checkIntentions(t, s1, false, expectM)
|
|
||||||
|
|
||||||
// Wait until the migration routine is complete.
|
// Wait until the migration routine is complete.
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
@ -513,9 +527,13 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// check that all 7 intentions are present the general way after migration
|
// check that all 7 intentions are present the general way after migration
|
||||||
checkIntentions(t, s1, false, expectM)
|
require.True(t, t.Run("check migrated intentions", func(t *testing.T) {
|
||||||
// check that no intentions exist in the legacy table
|
checkIntentions(t, s1, false, expectRetainedM)
|
||||||
checkIntentions(t, s1, true, map[string]*structs.Intention{})
|
}))
|
||||||
|
require.True(t, t.Run("check migrated legacy intentions", func(t *testing.T) {
|
||||||
|
// check that no intentions exist in the legacy table
|
||||||
|
checkIntentions(t, s1, true, map[string]*structs.Intention{})
|
||||||
|
}))
|
||||||
|
|
||||||
mapifyConfigs := func(entries interface{}) map[structs.ConfigEntryKindName]*structs.ServiceIntentionsConfigEntry {
|
mapifyConfigs := func(entries interface{}) map[structs.ConfigEntryKindName]*structs.ServiceIntentionsConfigEntry {
|
||||||
m := make(map[structs.ConfigEntryKindName]*structs.ServiceIntentionsConfigEntry)
|
m := make(map[structs.ConfigEntryKindName]*structs.ServiceIntentionsConfigEntry)
|
||||||
|
@ -541,7 +559,7 @@ func TestLeader_LegacyIntentionMigration(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
gotConfigsM := mapifyConfigs(gotConfigs)
|
gotConfigsM := mapifyConfigs(gotConfigs)
|
||||||
|
|
||||||
expectConfigs := structs.MigrateIntentions(ixns)
|
expectConfigs := structs.MigrateIntentions(retained)
|
||||||
for _, entry := range expectConfigs {
|
for _, entry := range expectConfigs {
|
||||||
require.NoError(t, entry.LegacyNormalize()) // tidy them up the same way the write would
|
require.NoError(t, entry.LegacyNormalize()) // tidy them up the same way the write would
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,8 +12,17 @@ import (
|
||||||
type LeaderRoutine func(ctx context.Context) error
|
type LeaderRoutine func(ctx context.Context) error
|
||||||
|
|
||||||
type leaderRoutine struct {
|
type leaderRoutine struct {
|
||||||
running bool
|
cancel context.CancelFunc
|
||||||
cancel context.CancelFunc
|
stoppedCh chan struct{} // closed when no longer running
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *leaderRoutine) running() bool {
|
||||||
|
select {
|
||||||
|
case <-r.stoppedCh:
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type LeaderRoutineManager struct {
|
type LeaderRoutineManager struct {
|
||||||
|
@ -41,7 +50,7 @@ func (m *LeaderRoutineManager) IsRunning(name string) bool {
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
if routine, ok := m.routines[name]; ok {
|
if routine, ok := m.routines[name]; ok {
|
||||||
return routine.running
|
return routine.running()
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
|
@ -55,7 +64,7 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
if instance, ok := m.routines[name]; ok && instance.running {
|
if instance, ok := m.routines[name]; ok && instance.running() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,11 +74,15 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(parentCtx)
|
ctx, cancel := context.WithCancel(parentCtx)
|
||||||
instance := &leaderRoutine{
|
instance := &leaderRoutine{
|
||||||
running: true,
|
cancel: cancel,
|
||||||
cancel: cancel,
|
stoppedCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
close(instance.stoppedCh)
|
||||||
|
}()
|
||||||
|
|
||||||
err := routine(ctx)
|
err := routine(ctx)
|
||||||
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||||||
m.logger.Error("routine exited with error",
|
m.logger.Error("routine exited with error",
|
||||||
|
@ -79,10 +92,6 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
||||||
} else {
|
} else {
|
||||||
m.logger.Debug("stopped routine", "routine", name)
|
m.logger.Debug("stopped routine", "routine", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.lock.Lock()
|
|
||||||
instance.running = false
|
|
||||||
m.lock.Unlock()
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
m.routines[name] = instance
|
m.routines[name] = instance
|
||||||
|
@ -90,7 +99,19 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LeaderRoutineManager) Stop(name string) error {
|
func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} {
|
||||||
|
instance := m.stopInstance(name)
|
||||||
|
if instance == nil {
|
||||||
|
// Fabricate a closed channel so it won't block forever.
|
||||||
|
ch := make(chan struct{})
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
return instance.stoppedCh
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine {
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
|
@ -100,15 +121,16 @@ func (m *LeaderRoutineManager) Stop(name string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !instance.running {
|
if !instance.running() {
|
||||||
return nil
|
return instance
|
||||||
}
|
}
|
||||||
|
|
||||||
m.logger.Debug("stopping routine", "routine", name)
|
m.logger.Debug("stopping routine", "routine", name)
|
||||||
instance.cancel()
|
instance.cancel()
|
||||||
|
|
||||||
delete(m.routines, name)
|
delete(m.routines, name)
|
||||||
return nil
|
|
||||||
|
return instance
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *LeaderRoutineManager) StopAll() {
|
func (m *LeaderRoutineManager) StopAll() {
|
||||||
|
@ -116,7 +138,7 @@ func (m *LeaderRoutineManager) StopAll() {
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
for name, routine := range m.routines {
|
for name, routine := range m.routines {
|
||||||
if !routine.running {
|
if !routine.running() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
m.logger.Debug("stopping routine", "routine", name)
|
m.logger.Debug("stopping routine", "routine", name)
|
||||||
|
|
|
@ -36,7 +36,9 @@ func TestLeaderRoutineManager(t *testing.T) {
|
||||||
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
|
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
|
||||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||||
})
|
})
|
||||||
require.NoError(t, mgr.Stop("run"))
|
doneCh := mgr.Stop("run")
|
||||||
|
require.NotNil(t, doneCh)
|
||||||
|
<-doneCh
|
||||||
|
|
||||||
// ensure the background go routine was actually cancelled
|
// ensure the background go routine was actually cancelled
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
@ -51,7 +53,10 @@ func TestLeaderRoutineManager(t *testing.T) {
|
||||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, mgr.Stop("run"))
|
doneCh = mgr.Stop("run")
|
||||||
|
require.NotNil(t, doneCh)
|
||||||
|
<-doneCh
|
||||||
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
|
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue