diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index b3dd31c58d..cd01e69aa2 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1000,6 +1000,31 @@ func (s *Store) maxIndexAndWatchChForService(tx *memdb.Txn, serviceName string, return s.catalogMaxIndex(tx, entMeta, checks), nil } +// Wrapper for maxIndexAndWatchChForService that operates on a list of ServiceNodes +func (s *Store) maxIndexAndWatchChsForServiceNodes(tx *memdb.Txn, + nodes structs.ServiceNodes, watchChecks bool, entMeta *structs.EnterpriseMeta) (uint64, []<-chan struct{}) { + + var watchChans []<-chan struct{} + var maxIdx uint64 + + seen := make(map[string]bool) + for i := 0; i < len(nodes); i++ { + svc := nodes[i].ServiceName + if ok := seen[svc]; !ok { + idx, svcCh := s.maxIndexAndWatchChForService(tx, svc, true, watchChecks, entMeta) + if idx > maxIdx { + maxIdx = idx + } + if svcCh != nil { + watchChans = append(watchChans, svcCh) + } + seen[svc] = true + } + } + + return maxIdx, watchChans +} + // ConnectServiceNodes returns the nodes associated with a Connect // compatible destination for the given service name. This will include // both proxies and native integrations. @@ -1040,7 +1065,7 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool var idx uint64 if connect { // Look up gateway nodes associated with the service - gwIdx, nodes, chs, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta) + gwIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err) } @@ -1048,9 +1073,15 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool idx = gwIdx } - for _, ch := range chs { + // Watch for index changes to the gateway nodes + svcIdx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta) + if svcIdx > idx { + idx = svcIdx + } + for _, ch := range chans { ws.Add(ch) } + for i := 0; i < len(nodes); i++ { results = append(results, nodes[i]) } @@ -1963,15 +1994,19 @@ func (s *Store) CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string, func (s *Store) CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() - maxIdx, nodes, watchChs, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindIngressGateway, entMeta) + + maxIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindIngressGateway, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err) } // TODO(ingress) : Deal with incorporating index from mapping table - - // Watch list of gateway nodes for changes - for _, ch := range watchChs { + // Watch for index changes to the gateway nodes + idx, chans := s.maxIndexAndWatchChsForServiceNodes(tx, nodes, false, entMeta) + if idx > maxIdx { + maxIdx = idx + } + for _, ch := range chans { ws.Add(ch) } @@ -2045,7 +2080,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa var idx uint64 if connect { // Look up gateway nodes associated with the service - gwIdx, nodes, _, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta) + gwIdx, nodes, err := s.serviceGatewayNodes(tx, ws, serviceName, structs.ServiceKindTerminatingGateway, entMeta) if err != nil { return 0, nil, fmt.Errorf("failed gateway nodes lookup: %v", err) } @@ -2100,7 +2135,10 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa // use target serviceName here but it actually doesn't matter. No chan will // be returned as we can't use the optimization in this case (and don't need // to as there is only one chan to watch anyway). - idx, _ = s.maxIndexAndWatchChForService(tx, serviceName, false, true, entMeta) + svcIdx, _ := s.maxIndexAndWatchChForService(tx, serviceName, false, true, entMeta) + if idx < svcIdx { + idx = svcIdx + } } // Create a nil watchset to pass below, we'll only pass the real one if we @@ -2664,11 +2702,11 @@ func (s *Store) gatewayServices(tx *memdb.Txn, name string, entMeta *structs.Ent // TODO(ingress): How to handle index rolling back when a config entry is // deleted that references a service? // We might need something like the service_last_extinction index? -func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, []<-chan struct{}, error) { +func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { // Look up gateway name associated with the service gws, err := s.serviceGateways(tx, service, entMeta) if err != nil { - return 0, nil, nil, fmt.Errorf("failed gateway lookup: %s", err) + return 0, nil, fmt.Errorf("failed gateway lookup: %s", err) } // Adding this channel to the WatchSet means that the watch will fire if a config entry targeting the service is added. @@ -2676,7 +2714,6 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st ws.Add(gws.WatchCh()) var ret structs.ServiceNodes - var watchChans []<-chan struct{} var maxIdx uint64 for gateway := gws.Next(); gateway != nil; gateway = gws.Next() { @@ -2693,7 +2730,7 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st // Look up nodes for gateway gwServices, err := s.catalogServiceNodeList(tx, mapping.Gateway.ID, "service", &mapping.Gateway.EnterpriseMeta) if err != nil { - return 0, nil, nil, fmt.Errorf("failed service lookup: %s", err) + return 0, nil, fmt.Errorf("failed service lookup: %s", err) } var exists bool @@ -2711,7 +2748,10 @@ func (s *Store) serviceGatewayNodes(tx *memdb.Txn, ws memdb.WatchSet, service st maxIdx = svcIdx } - watchChans = append(watchChans, gwServices.WatchCh()) + // Ensure that blocking queries wake up if the gateway-service mapping exists, but the gateway does not exist yet + if !exists { + ws.Add(gwServices.WatchCh()) + } } - return maxIdx, ret, watchChans, nil + return maxIdx, ret, nil } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 2b184c9570..4da987b93d 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2151,6 +2151,11 @@ func TestStateStore_ConnectServiceNodes_Gateways(t *testing.T) { assert.Nil(s.EnsureService(15, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000})) assert.True(watchFired(ws)) + // Reset WatchSet to ensure watch fires when associating db with gateway + ws = memdb.NewWatchSet() + _, _, err = s.ConnectServiceNodes(ws, "db", nil) + assert.Nil(err) + // Associate gateway with db assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) assert.Nil(s.EnsureConfigEntry(17, &structs.TerminatingGatewayConfigEntry{ @@ -2190,10 +2195,16 @@ func TestStateStore_ConnectServiceNodes_Gateways(t *testing.T) { assert.Nil(s.EnsureService(18, "foo", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway-2", Service: "gateway", Port: 443})) assert.True(watchFired(ws)) + // Reset WatchSet to ensure watch fires when deregistering gateway + ws = memdb.NewWatchSet() + _, _, err = s.ConnectServiceNodes(ws, "db", nil) + assert.Nil(err) + // Watch should fire when a gateway instance is de-registered assert.Nil(s.DeleteService(19, "bar", "gateway", nil)) assert.True(watchFired(ws)) + ws = memdb.NewWatchSet() idx, nodes, err = s.ConnectServiceNodes(ws, "db", nil) assert.Nil(err) assert.Equal(idx, uint64(19)) @@ -3600,6 +3611,12 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) { }, nil)) assert.True(watchFired(ws)) + ws = memdb.NewWatchSet() + idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil) + assert.Nil(err) + assert.Equal(idx, uint64(18)) + assert.Len(nodes, 0) + // Watch should fire when a gateway is added assert.Nil(s.EnsureService(19, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) assert.True(watchFired(ws)) @@ -3638,6 +3655,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) { assert.Nil(s.EnsureService(22, "foo", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway-2", Service: "gateway", Port: 443})) assert.True(watchFired(ws)) + ws = memdb.NewWatchSet() idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil) assert.Nil(err) assert.Equal(idx, uint64(22)) @@ -3647,6 +3665,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) { assert.Nil(s.DeleteService(23, "bar", "gateway", nil)) assert.True(watchFired(ws)) + ws = memdb.NewWatchSet() idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil) assert.Nil(err) assert.Equal(idx, uint64(23)) @@ -4574,6 +4593,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) { assert.Nil(t, s.EnsureService(23, "bar", &structs.NodeService{ID: "redis", Service: "redis", Tags: nil, Address: "", Port: 6379})) assert.True(t, watchFired(ws)) + ws = memdb.NewWatchSet() idx, out, err = s.GatewayServices(ws, "gateway", nil) assert.Nil(t, err) assert.Equal(t, idx, uint64(23)) @@ -4623,6 +4643,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) { assert.Nil(t, s.DeleteService(24, "bar", "redis", nil)) assert.True(t, watchFired(ws)) + ws = memdb.NewWatchSet() idx, out, err = s.GatewayServices(ws, "gateway", nil) assert.Nil(t, err) assert.Equal(t, idx, uint64(24)) @@ -4696,6 +4717,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) { }, }, nil)) + ws = memdb.NewWatchSet() idx, out, err = s.GatewayServices(ws, "gateway2", nil) assert.Nil(t, err) assert.Equal(t, idx, uint64(26)) @@ -4738,13 +4760,6 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) { func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { s := testStateStore(t) - // Listing with no results returns an empty list. - ws := memdb.NewWatchSet() - idx, nodes, err := s.GatewayServices(ws, "gateway", nil) - assert.Nil(t, err) - assert.Equal(t, idx, uint64(0)) - assert.Len(t, nodes, 0) - // Create some nodes assert.Nil(t, s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"})) assert.Nil(t, s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"})) @@ -4758,6 +4773,13 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { assert.Nil(t, s.EnsureService(17, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) assert.Nil(t, s.EnsureService(18, "baz", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "other-gateway", Service: "other-gateway", Port: 443})) + // Listing with no results returns an empty list. + ws := memdb.NewWatchSet() + idx, nodes, err := s.GatewayServices(ws, "gateway", nil) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(0)) + assert.Len(t, nodes, 0) + // Associate the first gateway with db assert.Nil(t, s.EnsureConfigEntry(19, &structs.TerminatingGatewayConfigEntry{ Kind: "terminating-gateway", @@ -4771,7 +4793,14 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { }, nil)) assert.True(t, watchFired(ws)) - // Associate the other gateway with a wildcard + // Listing with no results returns an empty list. + otherWS := memdb.NewWatchSet() + idx, _, err = s.GatewayServices(otherWS, "other-gateway", nil) + assert.Nil(t, err) + assert.Equal(t, idx, uint64(19)) + assert.Len(t, nodes, 0) + + // Associate the second gateway with wildcard assert.Nil(t, s.EnsureConfigEntry(20, &structs.TerminatingGatewayConfigEntry{ Kind: "terminating-gateway", Name: "other-gateway", @@ -4805,7 +4834,7 @@ func TestStateStore_GatewayServices_ServiceDeletion(t *testing.T) { assert.Equal(t, expect, out) // Read everything back for other gateway. - otherWS := memdb.NewWatchSet() + otherWS = memdb.NewWatchSet() idx, out, err = s.GatewayServices(otherWS, "other-gateway", nil) assert.Nil(t, err) assert.Equal(t, idx, uint64(20)) @@ -4925,6 +4954,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { }) t.Run("check service3 ingress gateway", func(t *testing.T) { + ws := memdb.NewWatchSet() idx, results, err := s.CheckIngressServiceNodes(ws, "service3", nil) require.NoError(err) require.Equal(uint64(11), idx) @@ -4935,6 +4965,7 @@ func TestStateStore_CheckIngressServiceNodes(t *testing.T) { t.Run("delete a wildcard entry", func(t *testing.T) { require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil)) require.True(watchFired(ws)) + idx, results, err := s.CheckIngressServiceNodes(ws, "service1", nil) require.NoError(err) require.Equal(uint64(13), idx) @@ -4990,6 +5021,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) { }) t.Run("wildcard gateway services", func(t *testing.T) { + ws = memdb.NewWatchSet() idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil) require.NoError(err) require.Equal(uint64(14), idx) @@ -5008,6 +5040,8 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) { t.Run("deregistering a service", func(t *testing.T) { require.Nil(s.DeleteService(18, "node1", "service1", nil)) require.True(watchFired(ws)) + + ws = memdb.NewWatchSet() idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil) require.NoError(err) require.Equal(uint64(18), idx) @@ -5018,6 +5052,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) { // bug in DeleteService where we delete are entries associated // to a service, not just an entry created by a wildcard. // t.Run("check ingress2 gateway services again", func(t *testing.T) { + // ws = memdb.NewWatchSet() // idx, results, err := s.GatewayServices(ws, "ingress2", nil) // require.NoError(err) // require.Equal(uint64(18), idx) @@ -5030,6 +5065,8 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) { t.Run("deleting a wildcard config entry", func(t *testing.T) { require.Nil(s.DeleteConfigEntry(19, "ingress-gateway", "wildcardIngress", nil)) require.True(watchFired(ws)) + + ws = memdb.NewWatchSet() idx, results, err := s.GatewayServices(ws, "wildcardIngress", nil) require.NoError(err) require.Equal(uint64(19), idx) @@ -5044,6 +5081,7 @@ func TestStateStore_GatewayServices_Ingress(t *testing.T) { } require.Nil(s.EnsureConfigEntry(20, ingress1, nil)) require.True(watchFired(ws)) + idx, results, err := s.GatewayServices(ws, "ingress1", nil) require.NoError(err) require.Equal(uint64(20), idx) @@ -5130,7 +5168,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { testRegisterService(t, s, 10, "node2", "service3") // Register some ingress config entries. - wildcardIngress := &structs.IngressGatewayConfigEntry{ Kind: "ingress-gateway", Name: "wildcardIngress", @@ -5148,7 +5185,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { } assert.NoError(t, s.EnsureConfigEntry(11, wildcardIngress, nil)) - assert.True(t, watchFired(ws)) ingress1 := &structs.IngressGatewayConfigEntry{ Kind: "ingress-gateway", Name: "ingress1", @@ -5174,7 +5210,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { }, } assert.NoError(t, s.EnsureConfigEntry(12, ingress1, nil)) - assert.True(t, watchFired(ws)) ingress2 := &structs.IngressGatewayConfigEntry{ Kind: "ingress-gateway", @@ -5192,7 +5227,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { }, } assert.NoError(t, s.EnsureConfigEntry(13, ingress2, nil)) - assert.True(t, watchFired(ws)) nothingIngress := &structs.IngressGatewayConfigEntry{ Kind: "ingress-gateway", @@ -5200,7 +5234,6 @@ func setupIngressState(t *testing.T, s *Store) memdb.WatchSet { Listeners: []structs.IngressListener{}, } assert.NoError(t, s.EnsureConfigEntry(14, nothingIngress, nil)) - assert.True(t, watchFired(ws)) return ws }