diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 71a31d2725..43655ffc16 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -900,18 +900,19 @@ func maxIndexAndWatchChsForServiceNodes(tx ReadTxn, // compatible destination for the given service name. This will include // both proxies and native integrations. func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { - return s.serviceNodes(ws, serviceName, true, entMeta) + tx := s.db.ReadTxn() + defer tx.Abort() + return serviceNodesTxn(tx, ws, serviceName, true, entMeta) } // ServiceNodes returns the nodes associated with a given service name. func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { - return s.serviceNodes(ws, serviceName, false, entMeta) + tx := s.db.ReadTxn() + defer tx.Abort() + return serviceNodesTxn(tx, ws, serviceName, false, entMeta) } -func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { - tx := s.db.Txn(false) - defer tx.Abort() - +func serviceNodesTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, connect bool, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) { // Function for lookup index := "service" if connect { diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index b7fc6118d0..4860141445 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -210,38 +210,29 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change} + if termGatewayChanges == nil { termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange) } - gatewayChanges, ok := termGatewayChanges[gs.Gateway] + _, ok := termGatewayChanges[gs.Gateway] if !ok { termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{} } - prevChange, ok := gatewayChanges[gs.Service] - if !ok { + switch gsChange.changeType { + case changeUpdate: + after := gsChange.change.After.(*structs.GatewayService) + if gsChange.change.Before.(*structs.GatewayService).IsSame(after) { + continue + } termGatewayChanges[gs.Gateway][gs.Service] = gsChange - continue - } - - if changeTypeFromChange(change) == changeDelete { + case changeDelete, changeCreate: termGatewayChanges[gs.Gateway][gs.Service] = gsChange - continue - } - - prevGs := changeObject(prevChange.change).(*structs.GatewayService) - if !gs.IsSame(prevGs) { - gsChange.changeType = changeUpdate - termGatewayChanges[gs.Gateway][gs.Service] = gsChange - } else { - delete(termGatewayChanges[gs.Gateway], gs.Service) } } } - //fmt.Printf("term gateway map: %v", termGatewayChanges) - // Now act on those marked nodes/services for node, changeType := range nodeChanges { if changeType == changeDelete { @@ -304,7 +295,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event for serviceName, gsChange := range serviceChanges { gs := changeObject(gsChange.change).(*structs.GatewayService) - _, nodes, err := serviceGatewayNodes(tx, nil, serviceName.Name, gs.GatewayKind, &gatewayName.EnterpriseMeta) + _, nodes, err := serviceNodesTxn(tx, nil, gs.Gateway.Name, false, &gatewayName.EnterpriseMeta) if err != nil { return nil, err } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index e2f776c856..e4b3b52c37 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1037,8 +1037,15 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) }, Mutate: func(s *Store, tx *txn) error { - return s.ensureRegistrationTxn(tx, tx.Index, false, - testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + if err := s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false, + ); err != nil { + return err + } + return s.ensureRegistrationTxn( + tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway, regNode2), false) }, WantEvents: []stream.Event{ testServiceHealthEvent(t, @@ -1052,6 +1059,20 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { "tgate1", evConnectTopic, evServiceTermingGateway("srv2")), + testServiceHealthEvent(t, + "tgate1", + evServiceTermingGateway("tgate1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1"), + evNode2), + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evNode2), }, }, { @@ -1091,9 +1112,100 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { evServiceIndex(setupIndex)), }, }, - // terminating gateway with 2 instances - // changing config entry to add a linked service - // changing config entry to remove a linked service + { + Name: "change the terminating gateway config entry to add a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv2"), + evServiceIndex(setupIndex)), + }, + }, + { + Name: "change the terminating gateway config entry to remove a linked service", + Setup: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv1", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + err := ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + if err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "tgate1", regTerminatingGateway), false) + }, + Mutate: func(s *Store, tx *txn) error { + configEntry := &structs.TerminatingGatewayConfigEntry{ + Kind: structs.TerminatingGateway, + Name: "tgate1", + Services: []structs.LinkedService{ + { + Name: "srv2", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } + return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta()) + }, + WantEvents: []stream.Event{ + testServiceHealthDeregistrationEvent(t, + "tgate1", + evConnectTopic, + evServiceTermingGateway("srv1")), + }, + }, + // change the terminating gateway config entry to update a linked service (new SNI/CAFile/etc) // deleting a config entry // deregistering a service behind a terminating gateway (should send no term gateway events) } @@ -1127,15 +1239,13 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { } require.NoError(t, err) - assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents) + assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents, cmpopts.EquateEmpty()) }) } } func regTerminatingGateway(req *structs.RegisterRequest) error { - req.Service.Service = "tgate1" req.Service.Kind = structs.ServiceKindTerminatingGateway - req.Service.ID = "tgate1" req.Service.Port = 22000 return nil }