mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
Add some extra handling for destination deletes
This commit is contained in:
parent
6580566c3b
commit
fe1fcea34f
@ -11,6 +11,7 @@ import (
|
||||
"github.com/mitchellh/copystructure"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/configentry"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
@ -2000,7 +2001,8 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
||||
}
|
||||
|
||||
if svc.PeerName == "" {
|
||||
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
|
||||
sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta}
|
||||
if err := cleanupGatewayWildcards(tx, idx, sn, false); err != nil {
|
||||
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
|
||||
}
|
||||
}
|
||||
@ -3656,15 +3658,15 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
|
||||
continue
|
||||
}
|
||||
|
||||
supportsIngress, supportsTerminating, err := serviceHasConnectInstances(tx, sn.ServiceName, entMeta)
|
||||
hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, sn.ServiceName, entMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if service.GatewayKind == structs.ServiceKindIngressGateway && !supportsIngress {
|
||||
if service.GatewayKind == structs.ServiceKindIngressGateway && !hasConnectInstance {
|
||||
continue
|
||||
}
|
||||
if service.GatewayKind == structs.ServiceKindTerminatingGateway && !supportsTerminating {
|
||||
if service.GatewayKind == structs.ServiceKindTerminatingGateway && !hasNonConnectInstance {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -3872,12 +3874,11 @@ func checkGatewayAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, ki
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error {
|
||||
func cleanupGatewayWildcards(tx WriteTxn, idx uint64, sn structs.ServiceName, cleaningUpDestination bool) error {
|
||||
// Clean up association between service name and gateways if needed
|
||||
sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta}
|
||||
gateways, err := tx.Get(tableGatewayServices, indexService, sn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
|
||||
return fmt.Errorf("failed gateway lookup for %q: %s", sn.Name, err)
|
||||
}
|
||||
|
||||
mappings := make([]*structs.GatewayService, 0)
|
||||
@ -3891,11 +3892,27 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
|
||||
// If there are no connect instances left, ingress gateways with a wildcard entry can remove
|
||||
// their association with it (same with terminating gateways if there are no non-connect
|
||||
// instances left).
|
||||
hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, svc.ServiceName, &svc.EnterpriseMeta)
|
||||
hasConnectInstance, hasNonConnectInstance, err := serviceHasConnectInstances(tx, sn.Name, &sn.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If we're deleting a service instance but this service is defined as a destination via config entry,
|
||||
// keep the mapping around.
|
||||
hasDestination := false
|
||||
if !cleaningUpDestination {
|
||||
q := configentry.NewKindName(structs.ServiceDefaults, sn.Name, &sn.EnterpriseMeta)
|
||||
existing, err := tx.First(tableConfigEntries, indexID, q)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed config entry lookup: %s", err)
|
||||
}
|
||||
if existing != nil {
|
||||
if entry, ok := existing.(*structs.ServiceConfigEntry); ok && entry.Destination != nil {
|
||||
hasDestination = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Do the updates in a separate loop so we don't trash the iterator.
|
||||
for _, m := range mappings {
|
||||
// Only delete if association was created by a wildcard specifier.
|
||||
@ -3905,7 +3922,7 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
|
||||
if m.GatewayKind == structs.ServiceKindIngressGateway && hasConnectInstance {
|
||||
continue
|
||||
}
|
||||
if m.GatewayKind == structs.ServiceKindTerminatingGateway && hasNonConnectInstance {
|
||||
if m.GatewayKind == structs.ServiceKindTerminatingGateway && (hasNonConnectInstance || hasDestination) {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -3921,7 +3938,7 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
|
||||
} else {
|
||||
kind, err := GatewayServiceKind(tx, m.Service.Name, &m.Service.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get gateway service kind for service %s: %v", svc.ServiceName, err)
|
||||
return fmt.Errorf("failed to get gateway service kind for service %s: %v", sn.Name, err)
|
||||
}
|
||||
checkGatewayAndUpdate(tx, idx, &structs.ServiceName{Name: m.Service.Name, EnterpriseMeta: m.Service.EnterpriseMeta}, kind)
|
||||
}
|
||||
|
@ -5338,13 +5338,70 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, expect, out)
|
||||
|
||||
// Add a destination via config entry and make sure it's picked up by the wildcard.
|
||||
configEntryDest := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "destination1",
|
||||
Destination: &structs.DestinationConfig{Port: 9000, Addresses: []string{"kafka.test.com"}},
|
||||
}
|
||||
assert.NoError(t, s.EnsureConfigEntry(27, configEntryDest))
|
||||
|
||||
idx, out, err = s.GatewayServices(ws, "gateway2", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(27))
|
||||
assert.Len(t, out, 3)
|
||||
|
||||
expectWildcardIncludesDest := structs.GatewayServices{
|
||||
{
|
||||
Service: structs.NewServiceName("api", nil),
|
||||
Gateway: structs.NewServiceName("gateway2", nil),
|
||||
GatewayKind: structs.ServiceKindTerminatingGateway,
|
||||
FromWildcard: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 26,
|
||||
ModifyIndex: 26,
|
||||
},
|
||||
},
|
||||
{
|
||||
Service: structs.NewServiceName("db", nil),
|
||||
Gateway: structs.NewServiceName("gateway2", nil),
|
||||
GatewayKind: structs.ServiceKindTerminatingGateway,
|
||||
FromWildcard: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 26,
|
||||
ModifyIndex: 26,
|
||||
},
|
||||
},
|
||||
{
|
||||
Service: structs.NewServiceName("destination1", nil),
|
||||
Gateway: structs.NewServiceName("gateway2", nil),
|
||||
GatewayKind: structs.ServiceKindTerminatingGateway,
|
||||
ServiceKind: structs.GatewayServiceKindDestination,
|
||||
FromWildcard: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 27,
|
||||
ModifyIndex: 27,
|
||||
},
|
||||
},
|
||||
}
|
||||
assert.ElementsMatch(t, expectWildcardIncludesDest, out)
|
||||
|
||||
// Delete the destination.
|
||||
assert.NoError(t, s.DeleteConfigEntry(28, structs.ServiceDefaults, "destination1", nil))
|
||||
|
||||
idx, out, err = s.GatewayServices(ws, "gateway2", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(28))
|
||||
assert.Len(t, out, 2)
|
||||
assert.Equal(t, expect, out)
|
||||
|
||||
// Deleting the config entry should remove existing mappings
|
||||
assert.Nil(t, s.DeleteConfigEntry(27, "terminating-gateway", "gateway", nil))
|
||||
assert.Nil(t, s.DeleteConfigEntry(29, "terminating-gateway", "gateway", nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
idx, out, err = s.GatewayServices(ws, "gateway", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(27))
|
||||
assert.Equal(t, idx, uint64(29))
|
||||
assert.Len(t, out, 0)
|
||||
}
|
||||
|
||||
|
@ -374,6 +374,9 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a
|
||||
if err := checkGatewayWildcardsAndUpdate(tx, idx, &serviceName, nil, gsKind); err != nil {
|
||||
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
||||
}
|
||||
if err := cleanupGatewayWildcards(tx, idx, serviceName, true); err != nil {
|
||||
return fmt.Errorf("failed to cleanup gateway mapping: \"%s\"; err: %v", serviceName, err)
|
||||
}
|
||||
if err := checkGatewayAndUpdate(tx, idx, &serviceName, gsKind); err != nil {
|
||||
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
||||
}
|
||||
|
@ -372,7 +372,7 @@ func TestStore_ServiceDefaults_Kind_Destination(t *testing.T) {
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindUnknown)
|
||||
require.Equal(t, structs.GatewayServiceKindUnknown, gatewayServices[0].ServiceKind)
|
||||
|
||||
_, kindServices, err = s.ServiceNamesOfKind(ws, structs.ServiceKindDestination)
|
||||
require.NoError(t, err)
|
||||
@ -710,13 +710,141 @@ func TestStore_ServiceDefaults_Kind_Destination_Wildcard(t *testing.T) {
|
||||
|
||||
require.NoError(t, s.DeleteConfigEntry(6, structs.ServiceDefaults, destination.Name, &destination.EnterpriseMeta))
|
||||
|
||||
//Watch is fired because we transitioned to a destination, by default we assume it's not.
|
||||
// Watch is fired because we deleted the destination - now the mapping should be gone.
|
||||
require.True(t, watchFired(ws))
|
||||
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindUnknown)
|
||||
require.Len(t, gatewayServices, 0)
|
||||
|
||||
t.Run("delete service instance before config entry", func(t *testing.T) {
|
||||
// Set up a service with both a real instance and destination from a config entry.
|
||||
require.NoError(t, s.EnsureNode(7, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
||||
require.NoError(t, s.EnsureService(8, "foo", &structs.NodeService{ID: "dest2", Service: "dest2", Tags: nil, Address: "", Port: 5000}))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
require.Equal(t, structs.GatewayServiceKindService, gatewayServices[0].ServiceKind)
|
||||
|
||||
// Register destination; shouldn't change the gateway mapping.
|
||||
destination2 := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "dest2",
|
||||
Destination: &structs.DestinationConfig{},
|
||||
}
|
||||
require.NoError(t, s.EnsureConfigEntry(9, destination2))
|
||||
require.False(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
expected := structs.GatewayServices{
|
||||
{
|
||||
Service: structs.NewServiceName("dest2", nil),
|
||||
Gateway: structs.NewServiceName("Gtwy1", nil),
|
||||
ServiceKind: structs.GatewayServiceKindService,
|
||||
GatewayKind: structs.ServiceKindTerminatingGateway,
|
||||
FromWildcard: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 8,
|
||||
ModifyIndex: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Equal(t, expected, gatewayServices)
|
||||
|
||||
// Delete the service, mapping should still exist.
|
||||
require.NoError(t, s.DeleteService(10, "foo", "dest2", nil, ""))
|
||||
require.False(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
require.Equal(t, expected, gatewayServices)
|
||||
|
||||
// Delete the config entry, mapping should be gone.
|
||||
require.NoError(t, s.DeleteConfigEntry(11, structs.ServiceDefaults, "dest2", &destination.EnterpriseMeta))
|
||||
require.True(t, watchFired(ws))
|
||||
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, gatewayServices)
|
||||
})
|
||||
|
||||
t.Run("delete config entry before service instance", func(t *testing.T) {
|
||||
// Set up a service with both a real instance and destination from a config entry.
|
||||
destination2 := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "dest2",
|
||||
Destination: &structs.DestinationConfig{},
|
||||
}
|
||||
require.NoError(t, s.EnsureConfigEntry(7, destination2))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
expected := structs.GatewayServices{
|
||||
{
|
||||
Service: structs.NewServiceName("dest2", nil),
|
||||
Gateway: structs.NewServiceName("Gtwy1", nil),
|
||||
ServiceKind: structs.GatewayServiceKindDestination,
|
||||
GatewayKind: structs.ServiceKindTerminatingGateway,
|
||||
FromWildcard: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 7,
|
||||
ModifyIndex: 7,
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Equal(t, expected, gatewayServices)
|
||||
|
||||
// Register service, only ServiceKind should have changed on the gateway mapping.
|
||||
require.NoError(t, s.EnsureNode(8, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
||||
require.NoError(t, s.EnsureService(9, "foo", &structs.NodeService{ID: "dest2", Service: "dest2", Tags: nil, Address: "", Port: 5000}))
|
||||
require.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
expected = structs.GatewayServices{
|
||||
{
|
||||
Service: structs.NewServiceName("dest2", nil),
|
||||
Gateway: structs.NewServiceName("Gtwy1", nil),
|
||||
ServiceKind: structs.GatewayServiceKindService,
|
||||
GatewayKind: structs.ServiceKindTerminatingGateway,
|
||||
FromWildcard: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 7,
|
||||
ModifyIndex: 9,
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Equal(t, expected, gatewayServices)
|
||||
|
||||
// Delete the config entry, mapping should still exist.
|
||||
require.NoError(t, s.DeleteConfigEntry(10, structs.ServiceDefaults, "dest2", &destination.EnterpriseMeta))
|
||||
require.False(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
require.Equal(t, expected, gatewayServices)
|
||||
|
||||
// Delete the service, mapping should be gone.
|
||||
require.NoError(t, s.DeleteService(11, "foo", "dest2", nil, ""))
|
||||
require.True(t, watchFired(ws))
|
||||
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, gatewayServices)
|
||||
})
|
||||
}
|
||||
|
||||
func TestStore_Service_TerminatingGateway_Kind_Service_Wildcard(t *testing.T) {
|
||||
@ -774,74 +902,6 @@ func TestStore_Service_TerminatingGateway_Kind_Service_Wildcard(t *testing.T) {
|
||||
require.Len(t, gatewayServices, 0)
|
||||
}
|
||||
|
||||
func TestStore_Service_TerminatingGateway_Kind_Service_Destination_Wildcard(t *testing.T) {
|
||||
s := testConfigStateStore(t)
|
||||
|
||||
Gtwy := &structs.TerminatingGatewayConfigEntry{
|
||||
Kind: structs.TerminatingGateway,
|
||||
Name: "Gtwy1",
|
||||
Services: []structs.LinkedService{
|
||||
{
|
||||
Name: "*",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Create
|
||||
require.NoError(t, s.EnsureConfigEntry(0, Gtwy))
|
||||
|
||||
service := &structs.NodeService{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
Service: "web",
|
||||
}
|
||||
destination := &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "web",
|
||||
Destination: &structs.DestinationConfig{},
|
||||
}
|
||||
|
||||
_, gatewayServices, err := s.GatewayServices(nil, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 0)
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
_, _, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create
|
||||
require.NoError(t, s.EnsureConfigEntry(0, destination))
|
||||
|
||||
_, gatewayServices, err = s.GatewayServices(nil, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindDestination)
|
||||
|
||||
require.NoError(t, s.EnsureNode(0, &structs.Node{Node: "node1"}))
|
||||
require.NoError(t, s.EnsureService(0, "node1", service))
|
||||
|
||||
//Watch is fired because we transitioned to a destination, by default we assume it's not.
|
||||
require.True(t, watchFired(ws))
|
||||
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 1)
|
||||
require.Equal(t, gatewayServices[0].ServiceKind, structs.GatewayServiceKindService)
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
_, _, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, s.DeleteService(6, "node1", service.ID, &service.EnterpriseMeta, ""))
|
||||
|
||||
//Watch is fired because we transitioned to a destination, by default we assume it's not.
|
||||
require.True(t, watchFired(ws))
|
||||
|
||||
_, gatewayServices, err = s.GatewayServices(ws, "Gtwy1", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, gatewayServices, 0)
|
||||
|
||||
}
|
||||
|
||||
func TestStore_ConfigEntry_GraphValidation(t *testing.T) {
|
||||
ensureConfigEntry := func(s *Store, idx uint64, entry structs.ConfigEntry) error {
|
||||
if err := entry.Normalize(); err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user