From 7581305523b1898a24cc4f7a22beae547da7973e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 21 Jul 2020 21:02:22 -0400 Subject: [PATCH] state: Remove unused args and return values Also rename some functions to identify them as constructors for events --- agent/consul/state/catalog_events.go | 105 +++++++++------------------ 1 file changed, 35 insertions(+), 70 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 731a95d973..a08499b11f 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -96,8 +96,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event // the service is already marked. If the caller is just marking the service // dirty without an node change, don't overwrite any existing node change we // know about. - ch := serviceChanges[k] - if ch == nil { + if serviceChanges[k] == nil { serviceChanges[k] = svcChange } } @@ -115,7 +114,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event case "services": sn := changeObject(change).(*structs.ServiceNode) - changeCopy := change + changeCopy := change // TODO: why does the change need to be copied? markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy) case "checks": @@ -145,22 +144,13 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } } - case change.Deleted(): - before := change.Before.(*structs.HealthCheck) - if before.ServiceID == "" { + case change.Deleted(), change.Created(): + obj := changeObject(change).(*structs.HealthCheck) + if obj.ServiceID == "" { // Node level check - markNode(before.Node, changeIndirect) + markNode(obj.Node, changeIndirect) } else { - markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil) - } - - case change.Created(): - after := change.After.(*structs.HealthCheck) - if after.ServiceID == "" { - // Node level check - markNode(after.Node, changeIndirect) - } else { - markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil) + markService(obj.Node, obj.ServiceID, obj.EnterpriseMeta, nil) } } } @@ -175,7 +165,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event continue } // Rebuild events for all services on this node - es, err := serviceHealthEventsForNode(tx, changes.Index, node) + es, err := newServiceHealthEventsForNode(tx, changes.Index, node) if err != nil { return nil, err } @@ -187,13 +177,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event // like a change to checks but it didn't actually change the service // record itself. if change != nil && change.Deleted() { - // Generate delete event for the service instance and append it sn := change.Before.(*structs.ServiceNode) - es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, sn, &tuple.EntMeta) - if err != nil { - return nil, err - } - events = append(events, es...) + e := newServiceHealthEventDeregister(changes.Index, sn) + events = append(events, e) continue } @@ -209,11 +195,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event // go out to subscribers to the new service name topic key, but we need // to fix up subscribers that were watching the old name by sending // deregistrations. - es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta) - if err != nil { - return nil, err - } - events = append(events, es...) + e := newServiceHealthEventDeregister(changes.Index, before) + events = append(events, e) } if before.ServiceKind == structs.ServiceKindConnectProxy && @@ -223,16 +206,10 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event // to deregister this sidecar service though as it still exists and // didn't change its name (or if it did that was caught just above). But // our mechanism for connect events is to convert them so we generate - // the regular one, convert it to Connect topic and then discar the + // the regular one, convert it to Connect topic and then discard the // original. - es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta) - if err != nil { - return nil, err - } - // Don't append es per comment above, but convert it to connect topic - // events. - es = serviceHealthToConnectEvents(es) - events = append(events, es...) + e := newServiceHealthEventDeregister(changes.Index, before) + events = append(events, serviceHealthToConnectEvents(e)...) } } @@ -242,16 +219,16 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event continue } // Build service event and append it - es, err := serviceHealthEventsForServiceInstance(tx, changes.Index, tuple) + e, err := newServiceHealthEventForService(tx, changes.Index, tuple) if err != nil { return nil, err } - events = append(events, es...) + events = append(events, e) } // Duplicate any events that affected connect-enabled instances (proxies or // native apps) to the relevant Connect topic. - events = append(events, serviceHealthToConnectEvents(events)...) + events = append(events, serviceHealthToConnectEvents(events...)...) return events, nil } @@ -284,7 +261,7 @@ func changeTypeFromChange(change memdb.Change) changeType { // enabled and so of no interest to those subscribers but also involves // switching connection details to be the proxy instead of the actual instance // in case of a sidecar. -func serviceHealthToConnectEvents(events []stream.Event) []stream.Event { +func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { serviceHealthConnectEvents := make([]stream.Event, 0, len(events)) for _, event := range events { if event.Topic != TopicServiceHealth { @@ -292,6 +269,7 @@ func serviceHealthToConnectEvents(events []stream.Event) []stream.Event { continue } node := getPayloadCheckServiceNode(event.Payload) + // TODO: do we need to handle gateways here as well? if node.Service == nil || (node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) { // Event is not a service instance (i.e. just a node registration) @@ -325,11 +303,11 @@ func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { return csn } -// serviceHealthEventsForNode returns health events for all services on the +// newServiceHealthEventsForNode returns health events for all services on the // given node. This mirrors some of the the logic in the oddly-named // parseCheckServiceNodes but is more efficient since we know they are all on // the same node. -func serviceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) { +func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) { // TODO(namespace-streaming): figure out the right EntMeta and mystery arg. services, err := catalogServiceListByNode(tx, node, nil, false) if err != nil { @@ -345,13 +323,8 @@ func serviceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.E for service := services.Next(); service != nil; service = services.Next() { sn := service.(*structs.ServiceNode) - es, err := serviceHealthEventsForServiceNodeInternal(idx, n, sn, nodeChecks, svcChecks) - if err != nil { - return nil, err - } - - // Append to the results. - events = append(events, es...) + event := newServiceHealthEventRegister(idx, n, sn, nodeChecks, svcChecks) + events = append(events, event) } return events, nil @@ -396,31 +369,31 @@ func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, return n, nodeChecks, svcChecks, nil } -func serviceHealthEventsForServiceInstance(tx ReadTxn, idx uint64, tuple nodeServiceTuple) ([]stream.Event, error) { +func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTuple) (stream.Event, error) { n, nodeChecks, svcChecks, err := getNodeAndChecks(tx, tuple.Node) if err != nil { - return nil, err + return stream.Event{}, err } svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID) if err != nil { - return nil, err + return stream.Event{}, err } sn := svc.Next() if sn == nil { - return nil, ErrMissingService + return stream.Event{}, ErrMissingService } - return serviceHealthEventsForServiceNodeInternal(idx, n, sn.(*structs.ServiceNode), nodeChecks, svcChecks) + return newServiceHealthEventRegister(idx, n, sn.(*structs.ServiceNode), nodeChecks, svcChecks), nil } -func serviceHealthEventsForServiceNodeInternal(idx uint64, +func newServiceHealthEventRegister(idx uint64, node *structs.Node, sn *structs.ServiceNode, nodeChecks structs.HealthChecks, - svcChecks map[string]structs.HealthChecks) ([]stream.Event, error) { - + svcChecks map[string]structs.HealthChecks, +) stream.Event { // Start with a copy of the node checks. checks := nodeChecks for _, check := range svcChecks[sn.ServiceID] { @@ -432,7 +405,7 @@ func serviceHealthEventsForServiceNodeInternal(idx uint64, Service: sn.ToNodeService(), Checks: checks, } - e := stream.Event{ + return stream.Event{ Topic: TopicServiceHealth, Key: sn.ServiceName, Index: idx, @@ -441,16 +414,9 @@ func serviceHealthEventsForServiceNodeInternal(idx uint64, Obj: csn, }, } - - // See if we also need to emit a connect event (i.e. if this instance is a - // connect proxy or connect native app). - - return []stream.Event{e}, nil } -func serviceHealthDeregEventsForServiceInstance(idx uint64, - sn *structs.ServiceNode, entMeta *structs.EnterpriseMeta) ([]stream.Event, error) { - +func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream.Event { // We actually only need the node name populated in the node part as it's only // used as a key to know which service was deregistered so don't bother looking // up the node in the DB. Note that while the ServiceNode does have NodeID @@ -466,7 +432,7 @@ func serviceHealthDeregEventsForServiceInstance(idx uint64, Service: sn.ToNodeService(), } - e := stream.Event{ + return stream.Event{ Topic: TopicServiceHealth, Key: sn.ServiceName, Index: idx, @@ -475,5 +441,4 @@ func serviceHealthDeregEventsForServiceInstance(idx uint64, Obj: csn, }, } - return []stream.Event{e}, nil }