diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go new file mode 100644 index 0000000000..b42d47fc64 --- /dev/null +++ b/agent/consul/state/catalog_events.go @@ -0,0 +1,475 @@ +package state + +import ( + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +type changeOp int + +const ( + OpDelete changeOp = iota + OpCreate + OpUpdate +) + +type eventPayload struct { + Op changeOp + Obj interface{} +} + +// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot +// of stream.Events that describe the current state of a service health query. +// +// TODO: no tests for this yet +func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { + return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { + tx := s.db.Txn(false) + defer tx.Abort() + + connect := topic == TopicServiceHealthConnect + // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest + idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil) + if err != nil { + return 0, err + } + + for _, n := range nodes { + event := stream.Event{ + Index: idx, + Topic: topic, + Payload: eventPayload{ + Op: OpCreate, + Obj: &n, + }, + } + + if n.Service != nil { + event.Key = n.Service.Service + } + + // append each event as a separate item so that they can be serialized + // separately, to prevent the encoding of one massive message. + buf.Append([]stream.Event{event}) + } + + return idx, err + } +} + +type nodeServiceTuple struct { + Node string + ServiceID string + EntMeta structs.EnterpriseMeta +} + +func newNodeServiceTupleFromServiceNode(sn *structs.ServiceNode) nodeServiceTuple { + return nodeServiceTuple{ + Node: sn.Node, + ServiceID: sn.ServiceID, + EntMeta: sn.EnterpriseMeta, + } +} + +func newNodeServiceTupleFromServiceHealthCheck(hc *structs.HealthCheck) nodeServiceTuple { + return nodeServiceTuple{ + Node: hc.Node, + ServiceID: hc.ServiceID, + EntMeta: hc.EnterpriseMeta, + } +} + +type serviceChange struct { + changeType changeType + change memdb.Change +} + +var serviceChangeIndirect = serviceChange{changeType: changeIndirect} + +// ServiceHealthEventsFromChanges returns all the service and Connect health +// events that should be emitted given a set of changes to the state store. +func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { + var events []stream.Event + + var nodeChanges map[string]changeType + var serviceChanges map[nodeServiceTuple]serviceChange + + markNode := func(node string, typ changeType) { + if nodeChanges == nil { + nodeChanges = make(map[string]changeType) + } + // If the caller has an actual node mutation ensure we store it even if the + // node is already marked. If the caller is just marking the node dirty + // without a node change, don't overwrite any existing node change we know + // about. + if nodeChanges[node] == changeIndirect { + nodeChanges[node] = typ + } + } + markService := func(key nodeServiceTuple, svcChange serviceChange) { + if serviceChanges == nil { + serviceChanges = make(map[nodeServiceTuple]serviceChange) + } + // If the caller has an actual service mutation ensure we store it even if + // the service is already marked. If the caller is just marking the service + // dirty without a service change, don't overwrite any existing service change we + // know about. 
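+ // In other words: an explicit create/update/delete for a service always + // wins over an indirect mark for the same key, which is why the guard + // below only overwrites entries still marked changeIndirect.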
+ if serviceChanges[key].changeType == changeIndirect { + serviceChanges[key] = svcChange + } + } + + for _, change := range changes.Changes { + switch change.Table { + case "nodes": + // Node changed in some way; if it's not a delete we'll need to + // re-deliver CheckServiceNode results for all services on that node, but + // we mark it anyway because if it _is_ a delete then we need to know that + // later to avoid trying to deliver events when node-level checks mark the + // node as "changed". + n := changeObject(change).(*structs.Node) + markNode(n.Node, changeTypeFromChange(change)) + + case "services": + sn := changeObject(change).(*structs.ServiceNode) + srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change} + markService(newNodeServiceTupleFromServiceNode(sn), srvChange) + + case "checks": + // For health we only care about the scope for now to know if it's just + // affecting a single service or every service on a node. There is a + // subtle edge case where the check with the same ID changes from being node + // scoped to service scoped or vice versa; in either case we need to treat + // it as affecting all services on the node. + switch { + case change.Updated(): + before := change.Before.(*structs.HealthCheck) + after := change.After.(*structs.HealthCheck) + if after.ServiceID == "" || before.ServiceID == "" { + // check before and/or after is node-scoped + markNode(after.Node, changeIndirect) + } else { + // The check changed, which means we just need to emit for the linked + // service. + markService(newNodeServiceTupleFromServiceHealthCheck(after), serviceChangeIndirect) + + // Edge case - if the check with the same ID was updated to link to a + // different service ID but the old service with the old ID still exists, + // then the old service instance needs updating too as it has one + // fewer checks now. + if before.ServiceID != after.ServiceID { + markService(newNodeServiceTupleFromServiceHealthCheck(before), serviceChangeIndirect) + } + } + + case change.Deleted(), change.Created(): + obj := changeObject(change).(*structs.HealthCheck) + if obj.ServiceID == "" { + // Node level check + markNode(obj.Node, changeIndirect) + } else { + markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect) + } + } + } + } + + // Now act on those marked nodes/services + for node, changeType := range nodeChanges { + if changeType == changeDelete { + // Node deletions are a no-op here since the state store transaction will + // have also removed all the service instances which will be handled in + // the loop below. + continue + } + // Rebuild events for all services on this node + es, err := newServiceHealthEventsForNode(tx, changes.Index, node) + if err != nil { + return nil, err + } + events = append(events, es...) + } + + for tuple, srvChange := range serviceChanges { + // change may be nil if there was a change that _affected_ the service + // like a change to checks but it didn't actually change the service + // record itself. + if srvChange.changeType == changeDelete { + sn := srvChange.change.Before.(*structs.ServiceNode) + e := newServiceHealthEventDeregister(changes.Index, sn) + events = append(events, e) + continue + } + + // Check if this was a service mutation that changed its name, which + // requires special handling even if the node changed and new events were + // already published.
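+ // Two rename-like cases get special treatment below: the service name + // itself changed (emit a deregistration under the old topic key), and a + // connect proxy changed its destination service (emit a deregistration + // on the Connect topic for the old destination).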
+ if srvChange.changeType == changeUpdate { + before := srvChange.change.Before.(*structs.ServiceNode) + after := srvChange.change.After.(*structs.ServiceNode) + + if before.ServiceName != after.ServiceName { + // Service was renamed, the code below will ensure the new registrations + // go out to subscribers to the new service name topic key, but we need + // to fix up subscribers that were watching the old name by sending + // deregistrations. + e := newServiceHealthEventDeregister(changes.Index, before) + events = append(events, e) + } + + if e, ok := isConnectProxyDestinationServiceChange(changes.Index, before, after); ok { + events = append(events, e) + } + } + + if _, ok := nodeChanges[tuple.Node]; ok { + // We already rebuilt events for everything on this node, no need to send + // a duplicate. + continue + } + // Build service event and append it + e, err := newServiceHealthEventForService(tx, changes.Index, tuple) + if err != nil { + return nil, err + } + events = append(events, e) + } + + // Duplicate any events that affected connect-enabled instances (proxies or + // native apps) to the relevant Connect topic. + events = append(events, serviceHealthToConnectEvents(events...)...) + + return events, nil +} + +// isConnectProxyDestinationServiceChange handles the case where a Connect proxy changed +// the service it is proxying. We need to issue a de-registration for the old +// service on the Connect topic. We don't actually need to deregister this sidecar +// service though as it still exists and didn't change its name. +func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.ServiceNode) (stream.Event, bool) { + if before.ServiceKind != structs.ServiceKindConnectProxy || + before.ServiceProxy.DestinationServiceName == after.ServiceProxy.DestinationServiceName { + return stream.Event{}, false + } + + e := newServiceHealthEventDeregister(idx, before) + e.Topic = TopicServiceHealthConnect + e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName + return e, true +} + +type changeType uint8 + +const ( + // changeIndirect indicates some other object changed which has implications + // for the target object. + changeIndirect changeType = iota + changeDelete + changeCreate + changeUpdate +) + +func changeTypeFromChange(change memdb.Change) changeType { + switch { + case change.Deleted(): + return changeDelete + case change.Created(): + return changeCreate + default: + return changeUpdate + } +} + +// serviceHealthToConnectEvents converts already formatted service health +// registration events into the ones needed to publish to the Connect topic. +// This essentially means filtering out any instances that are not Connect +// enabled and so of no interest to those subscribers but also involves +// switching connection details to be the proxy instead of the actual instance +// in case of a sidecar. 
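+ // + // For example, an event for the sidecar instance "web_sidecar_proxy" is + // duplicated onto TopicServiceHealthConnect but re-keyed to its + // destination "web", so Connect subscribers watching "web" receive the + // proxy's connection details.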
+func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { + var result []stream.Event + for _, event := range events { + if event.Topic != TopicServiceHealth { + // Skip non-health events and any events already emitted to the Connect topic + continue + } + node := getPayloadCheckServiceNode(event.Payload) + if node.Service == nil { + continue + } + + connectEvent := event + connectEvent.Topic = TopicServiceHealthConnect + + switch { + case node.Service.Connect.Native: + result = append(result, connectEvent) + + case node.Service.Kind == structs.ServiceKindConnectProxy: + connectEvent.Key = node.Service.Proxy.DestinationServiceName + result = append(result, connectEvent) + + default: + // ServiceKindTerminatingGateway changes are handled separately. + // All other cases are not relevant to the connect topic. + } + } + + return result +} + +func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { + ep, ok := payload.(eventPayload) + if !ok { + return nil + } + csn, ok := ep.Obj.(*structs.CheckServiceNode) + if !ok { + return nil + } + return csn +} + +// newServiceHealthEventsForNode returns health events for all services on the +// given node. This mirrors some of the logic in the oddly-named +// parseCheckServiceNodes but is more efficient since we know they are all on +// the same node. +func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) { + // TODO(namespace-streaming): figure out the right EntMeta and mystery arg. + services, err := catalogServiceListByNode(tx, node, nil, false) + if err != nil { + return nil, err + } + + n, checksFunc, err := getNodeAndChecks(tx, node) + if err != nil { + return nil, err + } + + var events []stream.Event + for service := services.Next(); service != nil; service = services.Next() { + sn := service.(*structs.ServiceNode) + + event := newServiceHealthEventRegister(idx, n, sn, checksFunc(sn.ServiceID)) + events = append(events, event) + } + + return events, nil +} + +// getNodeAndChecks returns the node structure and a function that returns +// the full list of checks for a specific service on that node. +func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc, error) { + // Fetch the node + nodeRaw, err := tx.First("nodes", "id", node) + if err != nil { + return nil, nil, err + } + if nodeRaw == nil { + return nil, nil, ErrMissingNode + } + n := nodeRaw.(*structs.Node) + + // TODO(namespace-streaming): work out what EntMeta is needed here, wildcard? + iter, err := catalogListChecksByNode(tx, node, nil) + if err != nil { + return nil, nil, err + } + + var nodeChecks structs.HealthChecks + var svcChecks map[string]structs.HealthChecks + + for check := iter.Next(); check != nil; check = iter.Next() { + check := check.(*structs.HealthCheck) + if check.ServiceID == "" { + nodeChecks = append(nodeChecks, check) + } else { + if svcChecks == nil { + svcChecks = make(map[string]structs.HealthChecks) + } + svcChecks[check.ServiceID] = append(svcChecks[check.ServiceID], check) + } + } + serviceChecks := func(serviceID string) structs.HealthChecks { + // Create a new slice so that append does not modify the array backing nodeChecks. + result := make(structs.HealthChecks, 0, len(nodeChecks)) + result = append(result, nodeChecks...)
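+ // Appending svcChecks entries directly to nodeChecks instead could grow + // its backing array in place, letting one service's checks leak into the + // slice returned for another service on a later call; the fresh slice + // above keeps each returned result independent.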
+ for _, check := range svcChecks[serviceID] { + result = append(result, check) + } + return result + } + return n, serviceChecks, nil +} + +type serviceChecksFunc func(serviceID string) structs.HealthChecks + +func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTuple) (stream.Event, error) { + n, checksFunc, err := getNodeAndChecks(tx, tuple.Node) + if err != nil { + return stream.Event{}, err + } + + svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID) + if err != nil { + return stream.Event{}, err + } + + raw := svc.Next() + if raw == nil { + return stream.Event{}, ErrMissingService + } + + sn := raw.(*structs.ServiceNode) + return newServiceHealthEventRegister(idx, n, sn, checksFunc(sn.ServiceID)), nil +} + +func newServiceHealthEventRegister( + idx uint64, + node *structs.Node, + sn *structs.ServiceNode, + checks structs.HealthChecks, +) stream.Event { + csn := &structs.CheckServiceNode{ + Node: node, + Service: sn.ToNodeService(), + Checks: checks, + } + return stream.Event{ + Topic: TopicServiceHealth, + Key: sn.ServiceName, + Index: idx, + Payload: eventPayload{ + Op: OpCreate, + Obj: csn, + }, + } +} + +func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream.Event { + // We actually only need the node name populated in the node part as it's only + // used as a key to know which service was deregistered so don't bother looking + // up the node in the DB. Note that while the ServiceNode does have NodeID + // etc. fields, they are never populated in memdb per the comment on that + // struct and only filled in when we return copies of the result to users. + // This is also important because if the service was deleted as part of a + // whole node deregistering then the node record won't actually exist now + // anyway and we'd have to plumb it through from the changeset above. 
+ csn := &structs.CheckServiceNode{ + Node: &structs.Node{ + Node: sn.Node, + }, + Service: sn.ToNodeService(), + } + + return stream.Event{ + Topic: TopicServiceHealth, + Key: sn.ServiceName, + Index: idx, + Payload: eventPayload{ + Op: OpDelete, + Obj: csn, + }, + } +} diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go new file mode 100644 index 0000000000..5cf610604f --- /dev/null +++ b/agent/consul/state/catalog_events_test.go @@ -0,0 +1,1492 @@ +package state + +import ( + "fmt" + "testing" + + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" + "github.com/stretchr/testify/require" +) + +func TestServiceHealthEventsFromChanges(t *testing.T) { + cases := []struct { + Name string + Setup func(s *Store, tx *txn) error + Mutate func(s *Store, tx *txn) error + WantEvents []stream.Event + WantErr bool + }{ + { + Name: "irrelevant events", + Mutate: func(s *Store, tx *txn) error { + return kvsSetTxn(tx, tx.Index, &structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + }, false) + }, + WantEvents: nil, + WantErr: false, + }, + { + Name: "service reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")) + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "web"), + }, + WantErr: false, + }, + { + Name: "service reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")) + }, + WantEvents: []stream.Event{ + // Should only publish new service + testServiceHealthEvent(t, "web", evNodeUnchanged), + }, + WantErr: false, + }, + { + Name: "service dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // Should only publish deregistration for that service + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }, + { + Name: "node dereg", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web")); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteNodeTxn(tx, tx.Index, "node1") + }, + WantEvents: []stream.Event{ + // Should publish deregistration events for all services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + }, + WantErr: false, + }, + { + Name: "connect native reg, new node", + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. 
+ testServiceHealthEvent(t, "web", evConnectNative), + testServiceHealthEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }, + { + Name: "connect native reg, existing node", + Setup: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db")) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event as well as a connect + // one. + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative), + testServiceHealthEvent(t, "web", + evNodeUnchanged, + evConnectNative, + evConnectTopic), + }, + WantErr: false, + }, + { + Name: "connect native dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "db")); err != nil { + return err + } + + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regConnectNative)) + }, + Mutate: func(s *Store, tx *txn) error { + return s.deleteServiceTxn(tx, tx.Index, "node1", "web", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evConnectNative), + testServiceHealthDeregistrationEvent(t, "web", evConnectNative, evConnectTopic), + }, + WantErr: false, + }, + { + Name: "connect sidecar reg, new node", + Mutate: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, testServiceRegistration(t, "web", regSidecar)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the web service + // another for the sidecar service and a connect event for web. + testServiceHealthEvent(t, "web"), + testServiceHealthEvent(t, "web", evSidecar), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }, + { + Name: "connect sidecar reg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")) + }, + Mutate: func(s *Store, tx *txn) error { + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + WantEvents: []stream.Event{ + // We should see both a regular service health event for the proxy + // service and a connect one for the target service. 
+ testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + }, + WantErr: false, + }, + { + Name: "connect sidecar dereg, existing node", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the sidecar + return s.deleteServiceTxn(tx, tx.Index, "node1", "web_sidecar_proxy", nil) + }, + WantEvents: []stream.Event{ + // We should see both a regular service dereg event and a connect one + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }, + { + Name: "connect sidecar mutate svc", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the target service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regMutatePort)) + }, + WantEvents: []stream.Event{ + // We should see the service topic update but not connect since proxy + // details didn't change. 
+ testServiceHealthEvent(t, "web", + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "connect sidecar mutate sidecar", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change port of the sidecar service instance + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regMutatePort)) + }, + WantEvents: []stream.Event{ + // We should see the proxy service topic update and a connect update + testServiceHealthEvent(t, "web", + evSidecar, + evMutatePort, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evMutatePort, + evServiceMutated, + evChecksUnchanged), + }, + WantErr: false, + }, + { + Name: "connect sidecar rename service", + Setup: func(s *Store, tx *txn) error { + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change service name but not ID, update proxy too + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameService)); err != nil { + return err + } + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService)) + }, + WantEvents: []stream.Event{ + // We should see events to deregister the old service instance and the + // old connect instance since we changed topic key for both. Then new + // service and connect registrations. The proxy instance should also + // change since it's not proxying a different service. + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthEvent(t, "web", + evRenameService, + evServiceMutated, + evNodeUnchanged, + evChecksMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "connect sidecar change destination service", + Setup: func(s *Store, tx *txn) error { + // Register a web_changed service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web_changed")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // And a sidecar initially for web, will be moved to target web_changed + // in Mutate. 
+ return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)) + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the destination service of the proxy without a service + // rename or deleting and recreating the proxy. This is far-fetched but + // still valid. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameService)) + }, + WantEvents: []stream.Event{ + // We should only see service health events for the sidecar service + // since the actual target services didn't change, but we should also see + // a Connect topic dereg for the old name to update existing subscribers + // for Connect/web. + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evSidecar, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameService, + evNodeUnchanged, + evServiceMutated, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeUnchanged, + evRenameService, + evServiceMutated, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node update", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node meta. + return s.ensureRegistrationTxn(tx, tx.Index, false, + testNodeRegistration(t, regNodeMeta)) + }, + WantEvents: []stream.Event{ + // We should see updates for all services and a connect update for the + // sidecar's destination. + testServiceHealthEvent(t, "db", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeMeta, + evNodeMutated, + evServiceUnchanged, + evChecksUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node rename", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node NAME but not its ID. We do it for every service + // though since this is effectively what client agent anti-entropy would + // do on a node rename. If we only rename the node it will have no + // services registered afterwards.
+ // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db", regRenameNode)); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regRenameNode)); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regRenameNode)); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Node rename is implemented internally as a node delete and new node + // insert after some renaming validation. So we should see full set of + // new events for health, then the deletions of old services, then the + // connect update and delete pair. + testServiceHealthEvent(t, "db", + evRenameNode, + // Although we delete and re-insert, we do maintain the CreatedIndex + // of the node record from the old one. + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evRenameNode, + evNodeMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evRenameNode, + evNodeMutated, + ), + // dereg events for old node name services + testServiceHealthDeregistrationEvent(t, "db"), + testServiceHealthDeregistrationEvent(t, "web"), + testServiceHealthDeregistrationEvent(t, "web", evSidecar), + // Connect topic updates are last due to the way we add them + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evRenameNode, + evNodeMutated, + ), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evSidecar), + }, + WantErr: false, + }, + { + Name: "multi-service node check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change only the node-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNodeCheckFail)); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + // Only the node check changed. 
This needs to come after evNodeUnchanged + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evNodeChecksMutated, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node service check failure", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Change the service-level check status + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regServiceCheckFail)); err != nil { + return err + } + // Also change the service-level check status for the proxy. This is + // analogous to what would happen with an alias check on the client side + // - the proxy's check would get updated at roughly the same time as the + // target service check updates. + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regServiceCheckFail)); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service, and hence the connect topic for that service.
+ testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evChecksMutated, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evChecksMutated, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evChecksMutated, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node node-level check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete only the node-level check + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "serf-health", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + testServiceHealthEvent(t, "db", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evNodeCheckDelete, + evNodeUnchanged, + evServiceUnchanged, + ), + }, + WantErr: false, + }, + { + Name: "multi-service node service check delete", + Setup: func(s *Store, tx *txn) error { + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // Delete the service-level check for the main service + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web", nil); err != nil { + return err + } + // Also delete for a proxy + if err := s.deleteCheckTxn(tx, tx.Index, "node1", "service:web_sidecar_proxy", nil); err != nil { + return err + } + return nil + }, + WantEvents: []stream.Event{ + // Should only see the events for that one service change, the sidecar + // service and hence the connect topic for that service. 
+ testServiceHealthEvent(t, "web", + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + testServiceHealthEvent(t, "web", + evConnectTopic, + evSidecar, + evServiceCheckFail, + evNodeUnchanged, + evServiceUnchanged, + evServiceCheckDelete, + ), + }, + WantErr: false, + }, + { + Name: "many services on many nodes in one TX", + Setup: func(s *Store, tx *txn) error { + // Node1 + + // Register a db service + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "db")); err != nil { + return err + } + + // Node2 + // Also a web + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regNode2)); err != nil { + return err + } + // With a connect sidecar + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar, regNode2)); err != nil { + return err + } + + return nil + }, + Mutate: func(s *Store, tx *txn) error { + // In one transaction the operator moves the web service and it's + // sidecar from node2 back to node1 and deletes them from node2 + + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web", nil); err != nil { + return err + } + if err := s.deleteServiceTxn(tx, tx.Index, "node2", "web_sidecar_proxy", nil); err != nil { + return err + } + + // Register those on node1 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web")); err != nil { + return err + } + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "web", regSidecar)); err != nil { + return err + } + + // And for good measure, add a new connect-native service to node2 + if err := s.ensureRegistrationTxn(tx, tx.Index, false, + testServiceRegistration(t, "api", regConnectNative, regNode2)); err != nil { + return err + } + + return nil + }, + WantEvents: []stream.Event{ + // We should see: + // - service dereg for web and proxy on node2 + // - connect dereg for web on node2 + // - service reg for web and proxy on node1 + // - connect reg for web on node1 + // - service reg for api on node2 + // - connect reg for api on node2 + testServiceHealthDeregistrationEvent(t, "web", evNode2), + testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), + testServiceHealthDeregistrationEvent(t, "web", + evConnectTopic, + evNode2, + evSidecar, + ), + + testServiceHealthEvent(t, "web", evNodeUnchanged), + testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), + testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), + + testServiceHealthEvent(t, "api", + evNode2, + evConnectNative, + evNodeUnchanged, + ), + testServiceHealthEvent(t, "api", + evNode2, + evConnectTopic, + evConnectNative, + evNodeUnchanged, + ), + }, + WantErr: false, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + s := testStateStore(t) + + if tc.Setup != nil { + // Bypass the publish mechanism for this test or we get into odd + // recursive stuff... + setupTx := s.db.WriteTxn(10) + require.NoError(t, tc.Setup(s, setupTx)) + // Commit the underlying transaction without using wrapped Commit so we + // avoid the whole event publishing system for setup here. It _should_ + // work but it makes debugging test hard as it will call the function + // under test for the setup data... 
+ setupTx.Txn.Commit() + } + + tx := s.db.WriteTxn(100) + require.NoError(t, tc.Mutate(s, tx)) + + // Note we call the func under test directly rather than publishChanges so + // we can test this in isolation. + got, err := ServiceHealthEventsFromChanges(tx, Changes{Changes: tx.Changes(), Index: 100}) + if tc.WantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + // Make sure we have the right events, only taking ordering into account + // where it matters to account for non-determinism. + requireEventsInCorrectPartialOrder(t, tc.WantEvents, got, func(e stream.Event) string { + // We need events affecting unique registrations to be ordered, within a topic + csn := getPayloadCheckServiceNode(e.Payload) + return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service) + }) + }) + } +} + +type regOption func(req *structs.RegisterRequest) error + +func testNodeRegistration(t *testing.T, opts ...regOption) *structs.RegisterRequest { + r := &structs.RegisterRequest{ + Datacenter: "dc1", + ID: "11111111-2222-3333-4444-555555555555", + Node: "node1", + Address: "10.10.10.10", + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + CheckID: "serf-health", + Name: "serf-health", + Node: "node1", + Status: api.HealthPassing, + }, + }, + } + for _, opt := range opts { + err := opt(r) + require.NoError(t, err) + } + return r +} + +func testServiceRegistration(t *testing.T, svc string, opts ...regOption) *structs.RegisterRequest { + // note: don't pass opts or they might get applied twice! + r := testNodeRegistration(t) + r.Service = &structs.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + } + r.Checks = append(r.Checks, + &structs.HealthCheck{ + CheckID: types.CheckID("service:" + svc), + Name: "service:" + svc, + Node: "node1", + ServiceID: svc, + ServiceName: svc, + Type: "ttl", + Status: api.HealthPassing, + }) + for _, opt := range opts { + err := opt(r) + require.NoError(t, err) + } + return r +} + +type eventOption func(e *stream.Event) error + +func testServiceHealthEvent(t *testing.T, svc string, opts ...eventOption) stream.Event { + e := newTestEventServiceHealthRegister(100, 1, svc) + + // Normalize a few things that are different in the generic event which was + // based on original code here but made more general. This means we don't have + // to change all the test loads... + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.ID = "11111111-2222-3333-4444-555555555555" + csn.Node.Address = "10.10.10.10" + + for _, opt := range opts { + err := opt(&e) + require.NoError(t, err) + } + return e +} + +func testServiceHealthDeregistrationEvent(t *testing.T, svc string, opts ...eventOption) stream.Event { + e := newTestEventServiceHealthDeregister(100, 1, svc) + for _, opt := range opts { + err := opt(&e) + require.NoError(t, err) + } + return e +} + +// regConnectNative option converts the base registration into a Connect-native +// one. +func regConnectNative(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + req.Service.Connect.Native = true + return nil +} + +// regSidecar option converts the base registration request +// into the registration for its sidecar service.
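+ // For the base "web" registration this produces service ID/name + // "web_sidecar_proxy" on port 28080 (20000 + the base 8080) with + // Proxy.DestinationServiceName set to "web".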
+func regSidecar(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + svc := req.Service.Service + + req.Service.Kind = structs.ServiceKindConnectProxy + req.Service.ID = svc + "_sidecar_proxy" + req.Service.Service = svc + "_sidecar_proxy" + req.Service.Port = 20000 + req.Service.Port + + req.Service.Proxy.DestinationServiceName = svc + req.Service.Proxy.DestinationServiceID = svc + + // Convert the check to point to the right ID now. This isn't totally + // realistic - sidecars should have alias checks etc but this is good enough + // to test this code path. + if len(req.Checks) >= 2 { + req.Checks[1].CheckID = types.CheckID("service:" + svc + "_sidecar_proxy") + req.Checks[1].ServiceID = svc + "_sidecar_proxy" + } + + return nil +} + +// regNodeCheckFail option converts the base registration request +// into a registration with the node-level health check failing +func regNodeCheckFail(req *structs.RegisterRequest) error { + req.Checks[0].Status = api.HealthCritical + return nil +} + +// regServiceCheckFail option converts the base registration request +// into a registration with the service-level health check failing +func regServiceCheckFail(req *structs.RegisterRequest) error { + req.Checks[1].Status = api.HealthCritical + return nil +} + +// regMutatePort option alters the base registration service port by a relative +// amount to simulate a service change. Can be used with regSidecar since it's a +// relative change (+10). +func regMutatePort(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + req.Service.Port += 10 + return nil +} + +// regRenameService option alters the base registration service name but not +// its ID, simulating a service being renamed while its ID is maintained +// separately, e.g. by a scheduler. This is an edge case but an important one as +// it changes which topic key events propagate. +func regRenameService(req *structs.RegisterRequest) error { + if req.Service == nil { + return nil + } + isSidecar := req.Service.Kind == structs.ServiceKindConnectProxy + + if !isSidecar { + req.Service.Service += "_changed" + // Update service checks + if len(req.Checks) >= 2 { + req.Checks[1].ServiceName += "_changed" + } + return nil + } + // This is a sidecar; it's not really realistic but let's only update the + // fields necessary to make it work again with the new service name to be sure + // we get the right result. This is certainly possible if not likely, so a + // valid case. + + // We don't need to update our own details, only the name of the destination + req.Service.Proxy.DestinationServiceName += "_changed" + + return nil +} + +// regRenameNode option alters the base registration node name by adding the +// _changed suffix. +func regRenameNode(req *structs.RegisterRequest) error { + req.Node += "_changed" + for i := range req.Checks { + req.Checks[i].Node = req.Node + } + return nil +} + +// regNode2 option alters the base registration to be on a different node. +func regNode2(req *structs.RegisterRequest) error { + req.Node = "node2" + req.ID = "22222222-2222-3333-4444-555555555555" + for i := range req.Checks { + req.Checks[i].Node = req.Node + } + return nil +} + +// regNodeMeta option alters the base registration node to add some meta data.
+func regNodeMeta(req *structs.RegisterRequest) error { + req.NodeMeta = map[string]string{"foo": "bar"} + return nil +} + +// evNodeUnchanged option converts the event to reset the node and node check +// raft indexes to the original value where we expect the node not to have been +// changed in the mutation. +func evNodeUnchanged(e *stream.Event) error { + // If the node wasn't touched, its modified index and its checks' modified + // indexes should be the original ones. + csn := getPayloadCheckServiceNode(e.Payload) + + // Check this isn't a dereg event with made up/placeholder node info + if csn.Node.CreateIndex == 0 { + return nil + } + csn.Node.CreateIndex = 10 + csn.Node.ModifyIndex = 10 + csn.Checks[0].CreateIndex = 10 + csn.Checks[0].ModifyIndex = 10 + return nil +} + +// evServiceUnchanged option converts the event to reset the service and service +// check raft indexes to the original value where we expect the service record +// not to have been changed in the mutation. +func evServiceUnchanged(e *stream.Event) error { + // If the service wasn't touched, its modified index and its check's modified + // indexes should be the original ones. + csn := getPayloadCheckServiceNode(e.Payload) + + csn.Service.CreateIndex = 10 + csn.Service.ModifyIndex = 10 + if len(csn.Checks) > 1 { + csn.Checks[1].CreateIndex = 10 + csn.Checks[1].ModifyIndex = 10 + } + return nil +} + +// evConnectNative option converts the base event to represent a connect-native +// service instance. +func evConnectNative(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Service.Connect.Native = true + return nil +} + +// evConnectTopic option converts the base event to the equivalent event that +// should be published to the connect topic. When needed it should be applied +// first as several other options (notably evSidecar) change behavior subtly +// depending on which topic they are published to and they determine this from +// the event. +func evConnectTopic(e *stream.Event) error { + e.Topic = TopicServiceHealthConnect + return nil +} + +// evSidecar option converts the base event to the health (not connect) event +// expected from the sidecar proxy registration for that service instead. When +// needed it should be applied after any option that changes topic (e.g. +// evConnectTopic) but before other options that might change behavior subtly +// depending on whether it's a sidecar or regular service event (e.g. +// evRenameService). +func evSidecar(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + + svc := csn.Service.Service + + csn.Service.Kind = structs.ServiceKindConnectProxy + csn.Service.ID = svc + "_sidecar_proxy" + csn.Service.Service = svc + "_sidecar_proxy" + csn.Service.Port = 20000 + csn.Service.Port + + csn.Service.Proxy.DestinationServiceName = svc + csn.Service.Proxy.DestinationServiceID = svc + + // Convert the check to point to the right ID now. This isn't totally + // realistic - sidecars should have alias checks etc but this is good enough + // to test this code path.
+ if len(csn.Checks) >= 2 { + csn.Checks[1].CheckID = types.CheckID("service:" + svc + "_sidecar_proxy") + csn.Checks[1].ServiceID = svc + "_sidecar_proxy" + csn.Checks[1].ServiceName = svc + "_sidecar_proxy" + } + + // Update event key to be the proxy service name, but only if this is not + // already in the connect topic + if e.Topic != TopicServiceHealthConnect { + e.Key = csn.Service.Service + } + return nil +} + +// evMutatePort option alters the base event service port by a relative +// amount to simulate a service change. Can be used with evSidecar since it's a +// relative change (+10). +func evMutatePort(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Service.Port += 10 + return nil +} + +// evNodeMutated option alters the base event node to set its CreateIndex +// (but not modify index) to the setup index. This expresses that we expect the +// node record originally created in setup to have been mutated during the +// update. +func evNodeMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Node.CreateIndex = 10 + return nil +} + +// evServiceMutated option alters the base event service to set its CreateIndex +// (but not modify index) to the setup index. This expresses that we expect the +// service record originally created in setup to have been mutated during the +// update. +func evServiceMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Service.CreateIndex = 10 + return nil +} + +// evChecksMutated option alters the base event service check to set its +// CreateIndex (but not modify index) to the setup index. This expresses that we +// expect the service check records originally created in setup to have been +// mutated during the update. NOTE: this must be sequenced after +// evServiceUnchanged if both are used. +func evChecksMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Checks[1].CreateIndex = 10 + getPayloadCheckServiceNode(e.Payload).Checks[1].ModifyIndex = 100 + return nil +} + +// evNodeChecksMutated option alters the base event node check to set its +// CreateIndex (but not modify index) to the setup index. This expresses that we +// expect the node check records originally created in setup to have been +// mutated during the update. NOTE: this must be sequenced after evNodeUnchanged +// if both are used. +func evNodeChecksMutated(e *stream.Event) error { + getPayloadCheckServiceNode(e.Payload).Checks[0].CreateIndex = 10 + getPayloadCheckServiceNode(e.Payload).Checks[0].ModifyIndex = 100 + return nil +} + +// evChecksUnchanged option alters the base event service to set all check raft +// indexes to the setup index. This expresses that we expect none of the checks +// to have changed in the update. +func evChecksUnchanged(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + for i := range csn.Checks { + csn.Checks[i].CreateIndex = 10 + csn.Checks[i].ModifyIndex = 10 + } + return nil +} + +// evRenameService option alters the base event service to change the service +// name but not ID, simulating an in-place service rename.
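+ // For a regular service event this rewrites "web" to "web_changed" (and + // the event key); for a sidecar event only Proxy.DestinationServiceName + // is renamed, plus the key when the event is on the Connect topic.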
+func evRenameService(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + isSidecar := csn.Service.Kind == structs.ServiceKindConnectProxy + + if !isSidecar { + csn.Service.Service += "_changed" + // Update service checks + if len(csn.Checks) >= 2 { + csn.Checks[1].ServiceName += "_changed" + } + e.Key += "_changed" + return nil + } + // This is a sidecar; it's not really realistic but let's only update the + // fields necessary to make it work again with the new service name to be sure + // we get the right result. This is certainly possible if not likely, so a + // valid case. + + // We don't need to update our own details, only the name of the destination + csn.Service.Proxy.DestinationServiceName += "_changed" + + // If this is the connect topic we need to change the key too + if e.Topic == TopicServiceHealthConnect { + e.Key += "_changed" + } + return nil +} + +// evNodeMeta option alters the base event node to add some meta data. +func evNodeMeta(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.Meta = map[string]string{"foo": "bar"} + return nil +} + +// evRenameNode option alters the base event node name. +func evRenameNode(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.Node += "_changed" + for i := range csn.Checks { + csn.Checks[i].Node = csn.Node.Node + } + return nil +} + +// evNode2 option alters the base event to refer to a different node +func evNode2(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Node.Node = "node2" + // Only change ID if it's set (e.g. it's not in a deregistration event) + if csn.Node.ID != "" { + csn.Node.ID = "22222222-2222-3333-4444-555555555555" + } + for i := range csn.Checks { + csn.Checks[i].Node = csn.Node.Node + } + return nil +} + +// evNodeCheckFail option alters the base event to set the node-level health +// check to be failing +func evNodeCheckFail(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Checks[0].Status = api.HealthCritical + return nil +} + +// evNodeCheckDelete option alters the base event to remove the node-level +// health check +func evNodeCheckDelete(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + // Ensure this is idempotent as we sometimes get called multiple times... + if len(csn.Checks) > 0 && csn.Checks[0].ServiceID == "" { + csn.Checks = csn.Checks[1:] + } + return nil +} + +// evServiceCheckFail option alters the base event to set the service-level health +// check to be failing +func evServiceCheckFail(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + csn.Checks[1].Status = api.HealthCritical + return nil +} + +// evServiceCheckDelete option alters the base event to remove the service-level +// health check +func evServiceCheckDelete(e *stream.Event) error { + csn := getPayloadCheckServiceNode(e.Payload) + // Ensure this is idempotent as we sometimes get called multiple times... + if len(csn.Checks) > 1 && csn.Checks[1].ServiceID != "" { + csn.Checks = csn.Checks[0:1] + } + return nil +} + +// requireEventsInCorrectPartialOrder compares that the expected set of events +// was emitted.
It allows for _independent_ events to be emitted in any order - +// this can be important because even though the transaction processing is all +// strictly ordered up until the processing func, grouping multiple updates that +// affect the same logical entity may be necessary and may impose random +// ordering changes on the eventual events if a map is used. We only care that +// events _affecting the same topic and key_ are ordered correctly with respect +// to the "expected" set of events so this helper asserts that. +// +// The caller provides a func that can return a partition key for the given +// event types and we assert that all events with the same partition key are +// delivered in the same order. Note that this is not necessarily the same as +// topic/key since for example in Catalog only events about a specific service +// _instance_ need to be ordered while topic and key are more general. +func requireEventsInCorrectPartialOrder(t *testing.T, want, got []stream.Event, + partKey func(stream.Event) string) { + t.Helper() + + // Partition both arrays by topic/key + wantParts := make(map[string][]stream.Event) + gotParts := make(map[string][]stream.Event) + + for _, e := range want { + k := partKey(e) + wantParts[k] = append(wantParts[k], e) + } + for _, e := range got { + k := partKey(e) + gotParts[k] = append(gotParts[k], e) + } + + for k, want := range wantParts { + require.Equal(t, want, gotParts[k], "got incorrect events for partition: %s", k) + } + + for k, got := range gotParts { + if _, ok := wantParts[k]; !ok { + require.Equal(t, nil, got, "got unwanted events for partition: %s", k) + } + } +} + +// newTestEventServiceHealthRegister returns a realistically populated service +// health registration event. The nodeNum is a +// logical node and is used to create the node name ("node%d") but also change +// the node ID and IP address to make it a little more realistic for cases that +// need that. nodeNum should be less than 64k to make the IP address look +// realistic. Any other changes can be made on the returned event to avoid +// adding too many options to callers. +func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) stream.Event { + node := fmt.Sprintf("node%d", nodeNum) + nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) + addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) + + return stream.Event{ + Topic: TopicServiceHealth, + Key: svc, + Index: index, + Payload: eventPayload{ + Op: OpCreate, + Obj: &structs.CheckServiceNode{ + Node: &structs.Node{ + ID: nodeID, + Node: node, + Address: addr, + Datacenter: "dc1", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Service: &structs.NodeService{ + ID: svc, + Service: svc, + Port: 8080, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + Checks: []*structs.HealthCheck{ + { + Node: node, + CheckID: "serf-health", + Name: "serf-health", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + { + Node: node, + CheckID: types.CheckID("service:" + svc), + Name: "service:" + svc, + ServiceID: svc, + ServiceName: svc, + Type: "ttl", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: index, + ModifyIndex: index, + }, + }, + }, + }, + }, + } +} + +// newTestEventServiceHealthDeregister returns a realistically populated service +// health deregistration event.
+
+// newTestEventServiceHealthDeregister returns a realistically populated
+// service health deregistration event. The nodeNum is a logical node and is
+// used to create the node name ("node%d"); unlike the register helper, only
+// the node name is populated since that is all a deregistration event carries.
+// Any other changes can be made on the returned event to avoid adding too many
+// options to callers.
+func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event {
+	return stream.Event{
+		Topic: TopicServiceHealth,
+		Key:   svc,
+		Index: index,
+		Payload: eventPayload{
+			Op: OpDelete,
+			Obj: &structs.CheckServiceNode{
+				Node: &structs.Node{
+					Node: fmt.Sprintf("node%d", nodeNum),
+				},
+				Service: &structs.NodeService{
+					ID:      svc,
+					Service: svc,
+					Port:    8080,
+					Weights: &structs.Weights{
+						Passing: 1,
+						Warning: 1,
+					},
+					RaftIndex: structs.RaftIndex{
+						// The original insertion index, since a delete doesn't update
+						// this. This magic value came from state store tests where we
+						// set up at index 10 and then mutate at index 100. It can be
+						// modified by the caller later, which is easier than adding
+						// yet another argument for the common case.
+						CreateIndex: 10,
+						ModifyIndex: 10,
+					},
+				},
+			},
+		},
+	}
+}
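Since the deregister helper bakes in that magic index, a caller whose test registers at a different index would override it on the returned event. A sketch using the helpers above, with 42 as an arbitrary illustrative index:

```go
e := newTestEventServiceHealthDeregister(100, 1, "web")
// This test set up at index 42 rather than 10, so fix up the Raft indexes
// on the service before comparing.
svc := getPayloadCheckServiceNode(e.Payload).Service
svc.RaftIndex.CreateIndex = 42
svc.RaftIndex.ModifyIndex = 42
```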
diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go
index 895da9e069..3fd72dfaa7 100644
--- a/agent/consul/state/memdb.go
+++ b/agent/consul/state/memdb.go
@@ -85,11 +85,8 @@ func (c *changeTrackerDB) WriteTxn(idx uint64) *txn {
 	return t
 }
 
-func (c *changeTrackerDB) publish(changes Changes) error {
-	readOnlyTx := c.db.Txn(false)
-	defer readOnlyTx.Abort()
-
-	events, err := c.processChanges(readOnlyTx, changes)
+func (c *changeTrackerDB) publish(tx ReadTxn, changes Changes) error {
+	events, err := c.processChanges(tx, changes)
 	if err != nil {
 		return fmt.Errorf("failed generating events from changes: %v", err)
 	}
@@ -127,7 +124,7 @@ type txn struct {
 	// Index is stored so that it may be passed along to any subscribers as part
 	// of a change event.
 	Index   uint64
-	publish func(changes Changes) error
+	publish func(tx ReadTxn, changes Changes) error
 }
 
 // Commit first pushes changes to EventPublisher, then calls Commit on the
@@ -152,7 +149,7 @@ func (tx *txn) Commit() error {
 	// In those cases changes should also be empty, and there will be nothing
 	// to publish.
 	if tx.publish != nil {
-		if err := tx.publish(changes); err != nil {
+		if err := tx.publish(tx.Txn, changes); err != nil {
 			return err
 		}
 	}
@@ -168,11 +165,33 @@ func (t topic) String() string {
 	return string(t)
 }
 
+var (
+	// TopicServiceHealth contains events for all registered service instances.
+	TopicServiceHealth topic = "topic-service-health"
+	// TopicServiceHealthConnect contains events for connect-enabled service instances.
+	TopicServiceHealthConnect topic = "topic-service-health-connect"
+)
+
 func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
-	// TODO: add other table handlers here.
-	return aclChangeUnsubscribeEvent(tx, changes)
+	var events []stream.Event
+	fns := []func(tx ReadTxn, changes Changes) ([]stream.Event, error){
+		aclChangeUnsubscribeEvent,
+		ServiceHealthEventsFromChanges,
+		// TODO: add other table handlers here.
+	}
+	for _, fn := range fns {
+		e, err := fn(tx, changes)
+		if err != nil {
+			return nil, err
+		}
+		events = append(events, e...)
+	}
+	return events, nil
 }
 
-func newSnapshotHandlers() stream.SnapshotHandlers {
-	return stream.SnapshotHandlers{}
+func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
+	return stream.SnapshotHandlers{
+		TopicServiceHealth:        serviceHealthSnapshot(s, TopicServiceHealth),
+		TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect),
+	}
 }
diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go
index d19922eece..3a7229607c 100644
--- a/agent/consul/state/state_store.go
+++ b/agent/consul/state/state_store.go
@@ -162,17 +162,17 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
 	ctx, cancel := context.WithCancel(context.TODO())
 
 	s := &Store{
-		schema:       schema,
-		abandonCh:    make(chan struct{}),
-		kvsGraveyard: NewGraveyard(gc),
-		lockDelay:    NewDelay(),
-		db: &changeTrackerDB{
-			db:             db,
-			publisher:      stream.NewEventPublisher(ctx, newSnapshotHandlers(), 10*time.Second),
-			processChanges: processDBChanges,
-		},
+		schema:             schema,
+		abandonCh:          make(chan struct{}),
+		kvsGraveyard:       NewGraveyard(gc),
+		lockDelay:          NewDelay(),
 		stopEventPublisher: cancel,
 	}
+	s.db = &changeTrackerDB{
+		db:             db,
+		publisher:      stream.NewEventPublisher(ctx, newSnapshotHandlers(s), 10*time.Second),
+		processChanges: processDBChanges,
+	}
 	return s, nil
 }
 
diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go
index 83a978bb0a..6b2e9d1fe6 100644
--- a/agent/consul/state/store_integration_test.go
+++ b/agent/consul/state/store_integration_test.go
@@ -376,7 +376,7 @@ var topicService stream.Topic = topic("test-topic-service")
 
 func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
 	return stream.SnapshotHandlers{
-		topicService: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
+		topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) {
 			idx, nodes, err := s.ServiceNodes(nil, req.Key, nil)
 			if err != nil {
 				return idx, err
diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go
index 9dfb8bf9e5..815a68a261 100644
--- a/agent/consul/stream/event_publisher.go
+++ b/agent/consul/stream/event_publisher.go
@@ -61,7 +61,11 @@ type changeEvents struct {
 // SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
 // of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
 // The nil Topic is reserved and should not be used.
-type SnapshotHandlers map[Topic]func(*SubscribeRequest, SnapshotAppender) (index uint64, err error)
+type SnapshotHandlers map[Topic]SnapshotFunc
+
+// SnapshotFunc builds a snapshot for the subscription request, and appends the
+// events to the Snapshot using SnapshotAppender.
+type SnapshotFunc func(SubscribeRequest, SnapshotAppender) (index uint64, err error)
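For reference, the smallest handler satisfying the new value-receiver SnapshotFunc signature might look like this (an illustrative sketch as if written in package stream, along the lines of the test handlers below; the single fabricated event and index 5 are arbitrary):

```go
var fn SnapshotFunc = func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
	// Real handlers append one item per event so each can be serialized
	// separately; a one-event snapshot needs only a single Append.
	buf.Append([]stream.Event{{Topic: req.Topic, Key: req.Key, Index: 5}})
	return 5, nil
}
```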
 
 // SnapshotAppender appends groups of events to create a Snapshot of state.
 type SnapshotAppender interface {
diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go
index 4deeb1503e..4448e68454 100644
--- a/agent/consul/stream/event_publisher_test.go
+++ b/agent/consul/stream/event_publisher_test.go
@@ -58,7 +58,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
 
 func newTestSnapshotHandlers() SnapshotHandlers {
 	return SnapshotHandlers{
-		testTopic: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
+		testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
 			if req.Topic != testTopic {
 				return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
 			}
@@ -117,7 +117,7 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
 	t.Cleanup(cancel)
 
 	handlers := newTestSnapshotHandlers()
-	fn := func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
+	fn := func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
 		return 0, nil
 	}
 	handlers[intTopic(22)] = fn
diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go
index 12a52ea37b..2f0d276f78 100644
--- a/agent/consul/stream/event_snapshot.go
+++ b/agent/consul/stream/event_snapshot.go
@@ -18,8 +18,6 @@ type eventSnapshot struct {
 	snapBuffer *eventBuffer
 }
 
-type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
-
 // newEventSnapshot creates a snapshot buffer based on the subscription request.
 // The current buffer head for the topic requested is passed so that once the
 // snapshot is complete and has been delivered into the buffer, any events
@@ -27,7 +25,7 @@ type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error)
 // missed. Once the snapshot is delivered the topic buffer is spliced onto the
 // snapshot buffer so that subscribers will naturally follow from the snapshot
 // to wait for any subsequent updates.
-func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot {
+func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn SnapshotFunc) *eventSnapshot {
 	buf := newEventBuffer()
 	s := &eventSnapshot{
 		Head:       buf.Head(),
@@ -35,7 +33,7 @@ func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn sna
 
 	go func() {
-		idx, err := fn(req, s.snapBuffer)
+		idx, err := fn(*req, s.snapBuffer)
 		if err != nil {
 			s.snapBuffer.AppendItem(&bufferItem{Err: err})
 			return
diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go
index 5e62e7f94f..c888e844ab 100644
--- a/agent/consul/stream/event_snapshot_test.go
+++ b/agent/consul/stream/event_snapshot_test.go
@@ -161,8 +161,8 @@ func genSequentialIDs(start, end int) []string {
 	return ids
 }
 
-func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc {
-	return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
+func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapshotFunc {
+	return func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
 		for i := 0; i < size; i++ {
 			// Event content is arbitrary; we are just using Health because it's the
 			// first type defined. We just want a set of things with consecutive