mirror of https://github.com/status-im/consul.git
Watch fallback channel for gateways that do not exist (#7715)
Also ensure that WatchSets in tests are reset between calls to watchFired. Any time a watch fires, subsequent calls to watchFired on the same WatchSet will also return true even if there were no changes.
This commit is contained in:
parent
dbbaf135ff
commit
021f0ee36e
|
@ -1000,6 +1000,31 @@ func (s *Store) maxIndexAndWatchChForService(tx *memdb.Txn, serviceName string,
|
|||
return s.catalogMaxIndex(tx, entMeta, checks), nil
|
||||
}
|
||||
|
||||
// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes
|
||||
func (s *Store) maxIndexAndWatchChsForServiceNodes(tx *memdb.Txn,
|
||||
nodes structs.ServiceNodes, watchChecks bool, entMeta *structs.EnterpriseMeta) (uint64, []<-chan struct{}) {
|
||||
|
||||
var watchChans []<-chan struct{}
|
||||
var maxIdx uint64
|
||||
|
||||
seen := make(map[string]bool)
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
svc := nodes[i].ServiceName
|
||||
if ok := seen[svc]; !ok {
|
||||
idx, svcCh := s.maxIndexAndWatchChForService(tx, svc, true, watchChecks, entMeta)
|
||||
if idx > maxIdx {
|
||||
maxIdx = idx
|
||||
}
|
||||
if svcCh != nil {
|
||||
watchChans = append(watchChans, svcCh)
|
||||
}
|
||||
seen[svc] = true
|
||||
}
|
||||
}
|
||||
|
||||
return maxIdx, watchChans
|
||||
}
|
||||
|
||||
// ConnectServiceNodes returns the nodes associated with a Connect
|
||||
// compatible destination for the given service name. This will include
|
||||
// both proxies and native integrations.
|
||||
|
@ -1040,7 +1065,7 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
|
|||
var idx uint64
|
||||
if connect {
|
||||
// Look up gateway nodes associated with the service
|
||||
gwIdx, nodes, chs, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
|
||||
gwIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
|
||||
}
|
||||
|
@ -1048,9 +1073,15 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
|
|||
idx = gwIdx
|
||||
}
|
||||
|
||||
for _, ch := range chs {
|
||||
// Watch for index changes to the gateway nodes
|
||||
svcIdx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta)
|
||||
if svcIdx > idx {
|
||||
idx = svcIdx
|
||||
}
|
||||
for _, ch := range chans {
|
||||
ws.Add(ch)
|
||||
}
|
||||
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
results = append(results, nodes[i])
|
||||
}
|
||||
|
@ -1963,15 +1994,19 @@ func (s *Store) CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string,
|
|||
func (s *Store) CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
maxIdx, nodes, watchChs, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindIngressGateway, entMeta)
|
||||
|
||||
maxIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindIngressGateway, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
|
||||
}
|
||||
|
||||
// TODO(ingress) : Deal with incorporating index from mapping table
|
||||
|
||||
// Watch list of gateway nodes for changes
|
||||
for _, ch := range watchChs {
|
||||
// Watch for index changes to the gateway nodes
|
||||
idx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta)
|
||||
if idx > maxIdx {
|
||||
maxIdx = idx
|
||||
}
|
||||
for _, ch := range chans {
|
||||
ws.Add(ch)
|
||||
}
|
||||
|
||||
|
@ -2045,7 +2080,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
|
|||
var idx uint64
|
||||
if connect {
|
||||
// Look up gateway nodes associated with the service
|
||||
gwIdx, nodes, _, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
|
||||
gwIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err)
|
||||
}
|
||||
|
@ -2100,7 +2135,10 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
|
|||
// use target serviceName here but it actually doesn't matter. No chan will
|
||||
// be returned as we can't use the optimization in this case (and don't need
|
||||
// to as there is only one chan to watch anyway).
|
||||
idx, _ = s.maxIndexAndWatchChForService(tx, serviceName, false, true, entMeta)
|
||||
svcIdx, _ := s.maxIndexAndWatchChForService(tx, serviceName, false, true, entMeta)
|
||||
if idx < svcIdx {
|
||||
idx = svcIdx
|
||||
}
|
||||
}
|
||||
|
||||
// Create a nil watchset to pass below, we'll only pass the real one if we
|
||||
|
@ -2664,11 +2702,11 @@ func (s *Store) gatewayServices(tx *memdb.Txn, name string, entMeta *structs.Ent
|
|||
// TODO(ingress): How to handle index rolling back when a config entry is
|
||||
// deleted that references a service?
|
||||
// We might need something like the service_last_extinction index?
|
||||
func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, []<-chan struct{}, error) {
|
||||
func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
|
||||
// Look up gateway name associated with the service
|
||||
gws, err := s.serviceGateways(tx, service, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, nil, fmt.Errorf("failed gateway lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed gateway lookup: %s", err)
|
||||
}
|
||||
|
||||
// Adding this channel to the WatchSet means that the watch will fire if a config entry targeting the service is added.
|
||||
|
@ -2676,7 +2714,6 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st
|
|||
ws.Add(gws.WatchCh())
|
||||
|
||||
var ret structs.ServiceNodes
|
||||
var watchChans []<-chan struct{}
|
||||
var maxIdx uint64
|
||||
|
||||
for gateway := gws.Next(); gateway != nil; gateway = gws.Next() {
|
||||
|
@ -2693,7 +2730,7 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st
|
|||
// Look up nodes for gateway
|
||||
gwServices, err := s.catalogServiceNodeList(tx, mapping.Gateway.ID, "service", &mapping.Gateway.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return 0, nil, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
||||
var exists bool
|
||||
|
@ -2711,7 +2748,10 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st
|
|||
maxIdx = svcIdx
|
||||
}
|
||||
|
||||
watchChans = append(watchChans, gwServices.WatchCh())
|
||||
// Ensure that blocking queries wake up if the gateway-service mapping exists, but the gateway does not exist yet
|
||||
if !exists {
|
||||
ws.Add(gwServices.WatchCh())
|
||||
}
|
||||
}
|
||||
return maxIdx, ret, watchChans, nil
|
||||
return maxIdx, ret, nil
|
||||
}
|
||||
|
|
|
@ -2151,6 +2151,11 @@ func TestStateStore_ConnectServiceNodes_Gateways(t *testing.T) {
|
|||
assert.Nil(s.EnsureService(15, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000}))
|
||||
assert.True(watchFired(ws))
|
||||
|
||||
// Reset WatchSet to ensure watch fires when associating db with gateway
|
||||
ws = memdb.NewWatchSet()
|
||||
_, _, err = s.ConnectServiceNodes(ws, "db", nil)
|
||||
assert.Nil(err)
|
||||
|
||||
// Associate gateway with db
|
||||
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
|
||||
assert.Nil(s.EnsureConfigEntry(17, &structs.TerminatingGatewayConfigEntry{
|
||||
|
@ -2190,10 +2195,16 @@ func TestStateStore_ConnectServiceNodes_Gateways(t *testing.T) {
|
|||
assert.Nil(s.EnsureService(18, "foo", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway-2", Service: "gateway", Port: 443}))
|
||||
assert.True(watchFired(ws))
|
||||
|
||||
// Reset WatchSet to ensure watch fires when deregistering gateway
|
||||
ws = memdb.NewWatchSet()
|
||||
_, _, err = s.ConnectServiceNodes(ws, "db", nil)
|
||||
assert.Nil(err)
|
||||
|
||||
// Watch should fire when a gateway instance is de-registered
|
||||
assert.Nil(s.DeleteService(19, "bar", "gateway", nil))
|
||||
assert.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, nodes, err = s.ConnectServiceNodes(ws, "db", nil)
|
||||
assert.Nil(err)
|
||||
assert.Equal(idx, uint64(19))
|
||||
|
@ -3600,6 +3611,12 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) {
|
|||
}, nil))
|
||||
assert.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil)
|
||||
assert.Nil(err)
|
||||
assert.Equal(idx, uint64(18))
|
||||
assert.Len(nodes, 0)
|
||||
|
||||
// Watch should fire when a gateway is added
|
||||
assert.Nil(s.EnsureService(19, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
|
||||
assert.True(watchFired(ws))
|
||||
|
@ -3638,6 +3655,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) {
|
|||
assert.Nil(s.EnsureService(22, "foo", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway-2", Service: "gateway", Port: 443}))
|
||||
assert.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil)
|
||||
assert.Nil(err)
|
||||
assert.Equal(idx, uint64(22))
|
||||
|
@ -3647,6 +3665,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) {
|
|||
assert.Nil(s.DeleteService(23, "bar", "gateway", nil))
|
||||
assert.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil)
|
||||
assert.Nil(err)
|
||||
assert.Equal(idx, uint64(23))
|
||||
|
@ -4574,6 +4593,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
|
|||
assert.Nil(t, s.EnsureService(23, "bar", &structs.NodeService{ID: "redis", Service: "redis", Tags: nil, Address: "", Port: 6379}))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, out, err = s.GatewayServices(ws, "gateway", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(23))
|
||||
|
@ -4623,6 +4643,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
|
|||
assert.Nil(t, s.DeleteService(24, "bar", "redis", nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, out, err = s.GatewayServices(ws, "gateway", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(24))
|
||||
|
@ -4696,6 +4717,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
|
|||
},
|
||||
}, nil))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, out, err = s.GatewayServices(ws, "gateway2", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(26))
|
||||
|
@ -4738,13 +4760,6 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
|
|||
func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Listing with no results returns an empty list.
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, nodes, err := s.GatewayServices(ws, "gateway", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(0))
|
||||
assert.Len(t, nodes, 0)
|
||||
|
||||
// Create some nodes
|
||||
assert.Nil(t, s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
||||
assert.Nil(t, s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
|
||||
|
@ -4758,6 +4773,13 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
|
|||
assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443}))
|
||||
assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443}))
|
||||
|
||||
// Listing with no results returns an empty list.
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, nodes, err := s.GatewayServices(ws, "gateway", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(0))
|
||||
assert.Len(t, nodes, 0)
|
||||
|
||||
// Associate the first gateway with db
|
||||
assert.Nil(t, s.EnsureConfigEntry(19, &structs.TerminatingGatewayConfigEntry{
|
||||
Kind: "terminating-gateway",
|
||||
|
@ -4771,7 +4793,14 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
|
|||
}, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
// Associate the other gateway with a wildcard
|
||||
// Listing with no results returns an empty list.
|
||||
otherWS := memdb.NewWatchSet()
|
||||
idx, _, err = s.GatewayServices(otherWS, "other-gateway", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(19))
|
||||
assert.Len(t, nodes, 0)
|
||||
|
||||
// Associate the second gateway with wildcard
|
||||
assert.Nil(t, s.EnsureConfigEntry(20, &structs.TerminatingGatewayConfigEntry{
|
||||
Kind: "terminating-gateway",
|
||||
Name: "other-gateway",
|
||||
|
@ -4805,7 +4834,7 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) {
|
|||
assert.Equal(t, expect, out)
|
||||
|
||||
// Read everything back for other gateway.
|
||||
otherWS := memdb.NewWatchSet()
|
||||
otherWS = memdb.NewWatchSet()
|
||||
idx, out, err = s.GatewayServices(otherWS, "other-gateway", nil)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, idx, uint64(20))
|
||||
|
@ -4925,6 +4954,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("check service3 ingress gateway", func(t *testing.T) {
|
||||
ws := memdb.NewWatchSet()
|
||||
idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(11), idx)
|
||||
|
@ -4935,6 +4965,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) {
|
|||
t.Run("delete a wildcard entry", func(t *testing.T) {
|
||||
require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(13), idx)
|
||||
|
@ -4990,6 +5021,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("wildcard gateway services", func(t *testing.T) {
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(14), idx)
|
||||
|
@ -5008,6 +5040,8 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
|
|||
t.Run("deregistering a service", func(t *testing.T) {
|
||||
require.Nil(s.DeleteService(18, "node1", "service1", nil))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(18), idx)
|
||||
|
@ -5018,6 +5052,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
|
|||
// bug in DeleteService where we delete are entries associated
|
||||
// to a service, not just an entry created by a wildcard.
|
||||
// t.Run("check ingress2 gateway services again", func(t *testing.T) {
|
||||
// ws = memdb.NewWatchSet()
|
||||
// idx, results, err := s.GatewayServices(ws, "ingress2", nil)
|
||||
// require.NoError(err)
|
||||
// require.Equal(uint64(18), idx)
|
||||
|
@ -5030,6 +5065,8 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
|
|||
t.Run("deleting a wildcard config entry", func(t *testing.T) {
|
||||
require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(19), idx)
|
||||
|
@ -5044,6 +5081,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) {
|
|||
}
|
||||
require.Nil(s.EnsureConfigEntry(20, ingress1, nil))
|
||||
require.True(watchFired(ws))
|
||||
|
||||
idx, results, err := s.GatewayServices(ws, "ingress1", nil)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(20), idx)
|
||||
|
@ -5130,7 +5168,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
|
|||
testRegisterService(t, s, 10, "node2", "service3")
|
||||
|
||||
// Register some ingress config entries.
|
||||
|
||||
wildcardIngress := &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
Name: "wildcardIngress",
|
||||
|
@ -5148,7 +5185,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
|
|||
}
|
||||
assert.NoError(t, s.EnsureConfigEntry(11, wildcardIngress, nil))
|
||||
|
||||
assert.True(t, watchFired(ws))
|
||||
ingress1 := &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
Name: "ingress1",
|
||||
|
@ -5174,7 +5210,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
|
|||
},
|
||||
}
|
||||
assert.NoError(t, s.EnsureConfigEntry(12, ingress1, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
ingress2 := &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
|
@ -5192,7 +5227,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
|
|||
},
|
||||
}
|
||||
assert.NoError(t, s.EnsureConfigEntry(13, ingress2, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
nothingIngress := &structs.IngressGatewayConfigEntry{
|
||||
Kind: "ingress-gateway",
|
||||
|
@ -5200,7 +5234,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet {
|
|||
Listeners: []structs.IngressListener{},
|
||||
}
|
||||
assert.NoError(t, s.EnsureConfigEntry(14, nothingIngress, nil))
|
||||
assert.True(t, watchFired(ws))
|
||||
|
||||
return ws
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue