diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 2e6148f73a..b42d47fc64 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -218,17 +218,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event events = append(events, e) } - if before.ServiceKind == structs.ServiceKindConnectProxy && - before.ServiceProxy.DestinationServiceName != after.ServiceProxy.DestinationServiceName { - // Connect proxy changed the service it is representing, need to issue a - // dereg for the old service on the Connect topic. We don't actually need - // to deregister this sidecar service though as it still exists and - // didn't change its name (or if it did that was caught just above). But - // our mechanism for connect events is to convert them so we generate - // the regular one, convert it to Connect topic and then discard the - // original. - e := newServiceHealthEventDeregister(changes.Index, before) - events = append(events, serviceHealthToConnectEvents(e)...) + if e, ok := isConnectProxyDestinationServiceChange(changes.Index, before, after); ok { + events = append(events, e) } } @@ -252,6 +243,22 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event return events, nil } +// isConnectProxyDestinationServiceChange handles the case where a Connect proxy changed +// the service it is proxying. We need to issue a de-registration for the old +// service on the Connect topic. We don't actually need to deregister this sidecar +// service though as it still exists and didn't change its name. +func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.ServiceNode) (stream.Event, bool) { + if before.ServiceKind != structs.ServiceKindConnectProxy || + before.ServiceProxy.DestinationServiceName == after.ServiceProxy.DestinationServiceName { + return stream.Event{}, false + } + + e := newServiceHealthEventDeregister(idx, before) + e.Topic = TopicServiceHealthConnect + e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName + return e, true +} + type changeType uint8 const ( @@ -281,33 +288,35 @@ func changeTypeFromChange(change memdb.Change) changeType { // switching connection details to be the proxy instead of the actual instance // in case of a sidecar. func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { - var serviceHealthConnectEvents []stream.Event + var result []stream.Event for _, event := range events { if event.Topic != TopicServiceHealth { // Skip non-health or any events already emitted to Connect topic continue } node := getPayloadCheckServiceNode(event.Payload) - // TODO: do we need to handle gateways here as well? - if node.Service == nil || - (node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) { - // Event is not a service instance (i.e. just a node registration) - // or is not a service that is not connect-enabled in some way. + if node.Service == nil { continue } connectEvent := event connectEvent.Topic = TopicServiceHealthConnect - // If this is a proxy, set the key to the destination service name. - if node.Service.Kind == structs.ServiceKindConnectProxy { - connectEvent.Key = node.Service.Proxy.DestinationServiceName - } + switch { + case node.Service.Connect.Native: + result = append(result, connectEvent) - serviceHealthConnectEvents = append(serviceHealthConnectEvents, connectEvent) + case node.Service.Kind == structs.ServiceKindConnectProxy: + connectEvent.Key = node.Service.Proxy.DestinationServiceName + result = append(result, connectEvent) + + default: + // ServiceKindTerminatingGateway changes are handled separately. + // All other cases are not relevant to the connect topic + } } - return serviceHealthConnectEvents + return result } func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {