state: add first terminating catalog catalog event

Health of a terminating gateway instance changes
- Generate an event for creating/destroying this instance of the terminating gateway,
  duplicate it for each affected service

Co-Authored-By: Kyle Havlovitz <kylehav@gmail.com>
This commit is contained in:
Daniel Nephin 2020-12-17 17:46:24 -05:00
parent 3e882dde12
commit 28de159c14
2 changed files with 116 additions and 6 deletions

View File

@ -267,7 +267,11 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
// Duplicate any events that affected connect-enabled instances (proxies or // Duplicate any events that affected connect-enabled instances (proxies or
// native apps) to the relevant Connect topic. // native apps) to the relevant Connect topic.
events = append(events, serviceHealthToConnectEvents(events...)...) connectEvents, err := serviceHealthToConnectEvents(tx, events...)
if err != nil {
return nil, err
}
events = append(events, connectEvents...)
return events, nil return events, nil
} }
@ -318,10 +322,13 @@ func changeTypeFromChange(change memdb.Change) changeType {
// enabled and so of no interest to those subscribers but also involves // enabled and so of no interest to those subscribers but also involves
// switching connection details to be the proxy instead of the actual instance // switching connection details to be the proxy instead of the actual instance
// in case of a sidecar. // in case of a sidecar.
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { func serviceHealthToConnectEvents(
tx ReadTxn,
events ...stream.Event,
) ([]stream.Event, error) {
var result []stream.Event var result []stream.Event
for _, event := range events { for _, event := range events {
if event.Topic != topicServiceHealth { if event.Topic != topicServiceHealth { // event.Topic == topicServiceHealthConnect
// Skip non-health or any events already emitted to Connect topic // Skip non-health or any events already emitted to Connect topic
continue continue
} }
@ -343,13 +350,33 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
connectEvent.Payload = payload connectEvent.Payload = payload
result = append(result, connectEvent) result = append(result, connectEvent)
case node.Service.Kind == structs.ServiceKindTerminatingGateway:
iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta)
if err != nil {
return nil, err
}
// similar to checkServiceNodesTxn -> serviceGatewayNodes
for obj := iter.Next(); obj != nil; obj = iter.Next() {
result = append(result, copyEventForService(event, obj.(*structs.GatewayService).Service))
}
default: default:
// ServiceKindTerminatingGateway changes are handled separately.
// All other cases are not relevant to the connect topic // All other cases are not relevant to the connect topic
} }
} }
return result return result, nil
}
func copyEventForService(event stream.Event, service structs.ServiceName) stream.Event {
event.Topic = topicServiceHealthConnect
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.key = service.Name
event.Payload = payload
// FIXME: we need payload to have an override for namespace, so that it can be filtered
// properly by EventPayloadCheckServiceNode.MatchesKey
return event
} }
func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode { func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode {

View File

@ -1001,6 +1001,56 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged),
}, },
}, },
{
Name: "terminating gateway registered with no config entry",
Mutate: func(s *Store, tx *txn) error {
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1")),
},
},
{
Name: "terminating gateway registered after config entry exists",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "srv1",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Name: "srv2",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
return ensureConfigEntryTxn(tx, tx.Index, configEntry, structs.DefaultEnterpriseMeta())
},
Mutate: func(s *Store, tx *txn) error {
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evServiceTermingGateway("tgate1")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv2")),
},
},
} }
for _, tc := range cases { for _, tc := range cases {
@ -1037,6 +1087,39 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
} }
} }
func regTerminatingGateway(req *structs.RegisterRequest) error {
req.Service.Service = "tgate1"
req.Service.Kind = structs.ServiceKindTerminatingGateway
req.Service.ID = "tgate1"
req.Service.Port = 22000
return nil
}
func evServiceTermingGateway(name string) func(e *stream.Event) error {
return func(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
csn.Service.Kind = structs.ServiceKindTerminatingGateway
csn.Service.Port = 22000
// Convert the check to point to the right ID now. This isn't totally
// realistic - sidecars should have alias checks etc but this is good enough
// to test this code path.
//if len(csn.Checks) >= 2 {
// csn.Checks[1].CheckID = types.CheckID("service:" + svc + "_terminating_gateway")
// csn.Checks[1].ServiceID = svc + "_terminating_gateway"
// csn.Checks[1].ServiceName = svc + "_terminating_gateway"
//}
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = name
e.Payload = payload
}
return nil
}
}
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper() t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" { if diff := cmp.Diff(x, y, opts...); diff != "" {
@ -1302,7 +1385,7 @@ func evConnectNative(e *stream.Event) error {
// evConnectTopic option converts the base event to the equivalent event that // evConnectTopic option converts the base event to the equivalent event that
// should be published to the connect topic. When needed it should be applied // should be published to the connect topic. When needed it should be applied
// first as several other options (notable evSidecar) change behavior subtly // first as several other options (notable evSidecar) change behavior subtly
// depending on which topic they are published to and they determin this from // depending on which topic they are published to and they determine this from
// the event. // the event.
func evConnectTopic(e *stream.Event) error { func evConnectTopic(e *stream.Event) error {
e.Topic = topicServiceHealthConnect e.Topic = topicServiceHealthConnect