diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 4a088ed8fc..9be0a9daee 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -23,8 +23,8 @@ type EventPayloadCheckServiceNode struct { // key is used to override the key used to filter the payload. It is set for // events in the connect topic to specify the name of the underlying service // when the change event is for a sidecar or gateway. - key string - // FIXME: we need to be able to override the namespace for some terminating gateway events + overrideKey string + overrideNamespace string } func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool { @@ -41,11 +41,15 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool { } name := e.Value.Service.Service - if e.key != "" { - name = e.key + if e.overrideKey != "" { + name = e.overrideKey } ns := e.Value.Service.EnterpriseMeta.GetNamespace() - return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns) + if e.overrideNamespace != "" { + ns = e.overrideNamespace + } + return (key == "" || strings.EqualFold(key, name)) && + (namespace == "" || strings.EqualFold(namespace, ns)) } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -74,7 +78,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { } if connect && n.Service.Kind == structs.ServiceKindConnectProxy { - payload.key = n.Service.Proxy.DestinationServiceName + payload.overrideKey = n.Service.Proxy.DestinationServiceName } event.Payload = payload @@ -306,9 +310,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event e := newServiceHealthEventDeregister(changes.Index, sn) e.Topic = topicServiceHealthConnect - // todo(streaming): make namespace-aware in enterprise payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = serviceName.Name + payload.overrideKey = serviceName.Name + payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() e.Payload = payload events = append(events, e) @@ -328,9 +332,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } e.Topic = topicServiceHealthConnect - // todo(streaming): make namespace-aware in enterprise payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = serviceName.Name + payload.overrideKey = serviceName.Name + payload.overrideNamespace = serviceName.EnterpriseMeta.GetNamespace() e.Payload = payload events = append(events, e) @@ -362,7 +366,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S e := newServiceHealthEventDeregister(idx, before) e.Topic = topicServiceHealthConnect payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = payload.Value.Service.Proxy.DestinationServiceName + payload.overrideKey = payload.Value.Service.Proxy.DestinationServiceName e.Payload = payload return e, true } @@ -419,7 +423,7 @@ func serviceHealthToConnectEvents( case node.Service.Kind == structs.ServiceKindConnectProxy: payload := event.Payload.(EventPayloadCheckServiceNode) - payload.key = node.Service.Proxy.DestinationServiceName + payload.overrideKey = node.Service.Proxy.DestinationServiceName connectEvent.Payload = payload result = append(result, connectEvent) @@ -445,10 +449,8 @@ func serviceHealthToConnectEvents( func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event { event.Topic = topicServiceHealthConnect payload := event.Payload.(EventPayloadCheckServiceNode) - payload.key = service.Name - // FIXME: we need payload to have an override for namespace, so that it can be filtered - // properly by EventPayloadCheckServiceNode.MatchesKey - // payload.enterpriseMeta = service.EnterpriseMeta + payload.overrideKey = service.Name + payload.overrideNamespace = service.EnterpriseMeta.GetNamespace() event.Payload = payload return event } diff --git a/agent/consul/state/catalog_events_oss_test.go b/agent/consul/state/catalog_events_oss_test.go new file mode 100644 index 0000000000..bad6fa817c --- /dev/null +++ b/agent/consul/state/catalog_events_oss_test.go @@ -0,0 +1,7 @@ +// +build !consulent + +package state + +func withServiceHealthEnterpriseCases(cases []serviceHealthTestCase) []serviceHealthTestCase { + return cases +} diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 83d7cd184a..108f6eea73 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -98,7 +98,7 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error { e.Index = counter.Last() ep := e.Payload.(EventPayloadCheckServiceNode) - ep.key = "web" + ep.overrideKey = "web" e.Payload = ep csn := ep.Value csn.Node.CreateIndex = 1 @@ -116,7 +116,7 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error { e.Index = counter.Last() ep := e.Payload.(EventPayloadCheckServiceNode) - ep.key = "web" + ep.overrideKey = "web" e.Payload = ep csn := ep.Value csn.Node.CreateIndex = 4 @@ -173,17 +173,19 @@ func evIndexes(idx, create, modify uint64) func(e *stream.Event) error { } } +type serviceHealthTestCase struct { + Name string + Setup func(s *Store, tx *txn) error + Mutate func(s *Store, tx *txn) error + WantEvents []stream.Event + WantErr bool +} + func TestServiceHealthEventsFromChanges(t *testing.T) { setupIndex := uint64(10) mutateIndex := uint64(100) - cases := []struct { - Name string - Setup func(s *Store, tx *txn) error - Mutate func(s *Store, tx *txn) error - WantEvents []stream.Event - WantErr bool - }{ + cases := []serviceHealthTestCase{ { Name: "irrelevant events", Mutate: func(s *Store, tx *txn) error { @@ -1543,6 +1545,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { }, }, } + cases = withServiceHealthEnterpriseCases(cases) for _, tc := range cases { tc := tc @@ -1602,7 +1605,7 @@ func evServiceTermingGateway(name string) func(e *stream.Event) error { if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = name + payload.overrideKey = name e.Payload = payload } return nil @@ -1641,7 +1644,7 @@ var cmpPartialOrderEvents = cmp.Options{ key := func(e stream.Event) string { csn := getPayloadCheckServiceNode(e.Payload) // TODO: double check this sort key is correct. - return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service, e.Payload.(EventPayloadCheckServiceNode).key) + return fmt.Sprintf("%s/%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service, e.Payload.(EventPayloadCheckServiceNode).overrideKey) } return key(i) < key(j) }), @@ -1929,7 +1932,7 @@ func evSidecar(e *stream.Event) error { if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = svc + payload.overrideKey = svc e.Payload = payload } return nil @@ -2018,7 +2021,7 @@ func evRenameService(e *stream.Event) error { if e.Topic == topicServiceHealthConnect { payload := e.Payload.(EventPayloadCheckServiceNode) - payload.key = csn.Service.Proxy.DestinationServiceName + payload.overrideKey = csn.Service.Proxy.DestinationServiceName e.Payload = payload } return nil @@ -2268,14 +2271,42 @@ func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) { }, { name: "override key match", - payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv1"), + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""), key: "srv1", namespace: "ns1", expected: true, }, { - name: "override key match", - payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv2"), + name: "override key mismatch", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""), + key: "proxy", + namespace: "ns1", + expected: false, + }, + { + name: "override namespace match", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"), + key: "proxy", + namespace: "ns2", + expected: true, + }, + { + name: "override namespace mismatch", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"), + key: "proxy", + namespace: "ns1", + expected: false, + }, + { + name: "override both key and namespace match", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"), + key: "srv1", + namespace: "ns2", + expected: true, + }, + { + name: "override both key and namespace mismatch namespace", + payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"), key: "proxy", namespace: "ns1", expected: false, @@ -2300,7 +2331,8 @@ func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServ } } -func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayloadCheckServiceNode { +func newPayloadCheckServiceNodeWithOverride( + service, namespace, overrideKey, overrideNamespace string) EventPayloadCheckServiceNode { return EventPayloadCheckServiceNode{ Value: &structs.CheckServiceNode{ Service: &structs.NodeService{ @@ -2308,6 +2340,7 @@ func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayl EnterpriseMeta: structs.NewEnterpriseMeta(namespace), }, }, - key: key, + overrideKey: overrideKey, + overrideNamespace: overrideNamespace, } }