state: Remove unused args and return values

Also rename some functions to identify them as constructors for events
This commit is contained in:
Daniel Nephin 2020-07-21 21:02:22 -04:00
parent 27b02d391c
commit 7581305523
1 changed files with 35 additions and 70 deletions

View File

@ -96,8 +96,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
// the service is already marked. If the caller is just marking the service // the service is already marked. If the caller is just marking the service
// dirty without an node change, don't overwrite any existing node change we // dirty without an node change, don't overwrite any existing node change we
// know about. // know about.
ch := serviceChanges[k] if serviceChanges[k] == nil {
if ch == nil {
serviceChanges[k] = svcChange serviceChanges[k] = svcChange
} }
} }
@ -115,7 +114,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
case "services": case "services":
sn := changeObject(change).(*structs.ServiceNode) sn := changeObject(change).(*structs.ServiceNode)
changeCopy := change changeCopy := change // TODO: why does the change need to be copied?
markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy) markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy)
case "checks": case "checks":
@ -145,22 +144,13 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
} }
} }
case change.Deleted(): case change.Deleted(), change.Created():
before := change.Before.(*structs.HealthCheck) obj := changeObject(change).(*structs.HealthCheck)
if before.ServiceID == "" { if obj.ServiceID == "" {
// Node level check // Node level check
markNode(before.Node, changeIndirect) markNode(obj.Node, changeIndirect)
} else { } else {
markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil) markService(obj.Node, obj.ServiceID, obj.EnterpriseMeta, nil)
}
case change.Created():
after := change.After.(*structs.HealthCheck)
if after.ServiceID == "" {
// Node level check
markNode(after.Node, changeIndirect)
} else {
markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil)
} }
} }
} }
@ -175,7 +165,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
continue continue
} }
// Rebuild events for all services on this node // Rebuild events for all services on this node
es, err := serviceHealthEventsForNode(tx, changes.Index, node) es, err := newServiceHealthEventsForNode(tx, changes.Index, node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -187,13 +177,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
// like a change to checks but it didn't actually change the service // like a change to checks but it didn't actually change the service
// record itself. // record itself.
if change != nil && change.Deleted() { if change != nil && change.Deleted() {
// Generate delete event for the service instance and append it
sn := change.Before.(*structs.ServiceNode) sn := change.Before.(*structs.ServiceNode)
es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, sn, &tuple.EntMeta) e := newServiceHealthEventDeregister(changes.Index, sn)
if err != nil { events = append(events, e)
return nil, err
}
events = append(events, es...)
continue continue
} }
@ -209,11 +195,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
// go out to subscribers to the new service name topic key, but we need // go out to subscribers to the new service name topic key, but we need
// to fix up subscribers that were watching the old name by sending // to fix up subscribers that were watching the old name by sending
// deregistrations. // deregistrations.
es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta) e := newServiceHealthEventDeregister(changes.Index, before)
if err != nil { events = append(events, e)
return nil, err
}
events = append(events, es...)
} }
if before.ServiceKind == structs.ServiceKindConnectProxy && if before.ServiceKind == structs.ServiceKindConnectProxy &&
@ -223,16 +206,10 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
// to deregister this sidecar service though as it still exists and // to deregister this sidecar service though as it still exists and
// didn't change its name (or if it did that was caught just above). But // didn't change its name (or if it did that was caught just above). But
// our mechanism for connect events is to convert them so we generate // our mechanism for connect events is to convert them so we generate
// the regular one, convert it to Connect topic and then discar the // the regular one, convert it to Connect topic and then discard the
// original. // original.
es, err := serviceHealthDeregEventsForServiceInstance(changes.Index, before, &tuple.EntMeta) e := newServiceHealthEventDeregister(changes.Index, before)
if err != nil { events = append(events, serviceHealthToConnectEvents(e)...)
return nil, err
}
// Don't append es per comment above, but convert it to connect topic
// events.
es = serviceHealthToConnectEvents(es)
events = append(events, es...)
} }
} }
@ -242,16 +219,16 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
continue continue
} }
// Build service event and append it // Build service event and append it
es, err := serviceHealthEventsForServiceInstance(tx, changes.Index, tuple) e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
if err != nil { if err != nil {
return nil, err return nil, err
} }
events = append(events, es...) events = append(events, e)
} }
// Duplicate any events that affected connect-enabled instances (proxies or // Duplicate any events that affected connect-enabled instances (proxies or
// native apps) to the relevant Connect topic. // native apps) to the relevant Connect topic.
events = append(events, serviceHealthToConnectEvents(events)...) events = append(events, serviceHealthToConnectEvents(events...)...)
return events, nil return events, nil
} }
@ -284,7 +261,7 @@ func changeTypeFromChange(change memdb.Change) changeType {
// enabled and so of no interest to those subscribers but also involves // enabled and so of no interest to those subscribers but also involves
// switching connection details to be the proxy instead of the actual instance // switching connection details to be the proxy instead of the actual instance
// in case of a sidecar. // in case of a sidecar.
func serviceHealthToConnectEvents(events []stream.Event) []stream.Event { func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
serviceHealthConnectEvents := make([]stream.Event, 0, len(events)) serviceHealthConnectEvents := make([]stream.Event, 0, len(events))
for _, event := range events { for _, event := range events {
if event.Topic != TopicServiceHealth { if event.Topic != TopicServiceHealth {
@ -292,6 +269,7 @@ func serviceHealthToConnectEvents(events []stream.Event) []stream.Event {
continue continue
} }
node := getPayloadCheckServiceNode(event.Payload) node := getPayloadCheckServiceNode(event.Payload)
// TODO: do we need to handle gateways here as well?
if node.Service == nil || if node.Service == nil ||
(node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) { (node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) {
// Event is not a service instance (i.e. just a node registration) // Event is not a service instance (i.e. just a node registration)
@ -325,11 +303,11 @@ func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
return csn return csn
} }
// serviceHealthEventsForNode returns health events for all services on the // newServiceHealthEventsForNode returns health events for all services on the
// given node. This mirrors some of the the logic in the oddly-named // given node. This mirrors some of the the logic in the oddly-named
// parseCheckServiceNodes but is more efficient since we know they are all on // parseCheckServiceNodes but is more efficient since we know they are all on
// the same node. // the same node.
func serviceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) { func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) {
// TODO(namespace-streaming): figure out the right EntMeta and mystery arg. // TODO(namespace-streaming): figure out the right EntMeta and mystery arg.
services, err := catalogServiceListByNode(tx, node, nil, false) services, err := catalogServiceListByNode(tx, node, nil, false)
if err != nil { if err != nil {
@ -345,13 +323,8 @@ func serviceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.E
for service := services.Next(); service != nil; service = services.Next() { for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode) sn := service.(*structs.ServiceNode)
es, err := serviceHealthEventsForServiceNodeInternal(idx, n, sn, nodeChecks, svcChecks) event := newServiceHealthEventRegister(idx, n, sn, nodeChecks, svcChecks)
if err != nil { events = append(events, event)
return nil, err
}
// Append to the results.
events = append(events, es...)
} }
return events, nil return events, nil
@ -396,31 +369,31 @@ func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node,
return n, nodeChecks, svcChecks, nil return n, nodeChecks, svcChecks, nil
} }
func serviceHealthEventsForServiceInstance(tx ReadTxn, idx uint64, tuple nodeServiceTuple) ([]stream.Event, error) { func newServiceHealthEventForService(tx ReadTxn, idx uint64, tuple nodeServiceTuple) (stream.Event, error) {
n, nodeChecks, svcChecks, err := getNodeAndChecks(tx, tuple.Node) n, nodeChecks, svcChecks, err := getNodeAndChecks(tx, tuple.Node)
if err != nil { if err != nil {
return nil, err return stream.Event{}, err
} }
svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID) svc, err := getCompoundWithTxn(tx, "services", "id", &tuple.EntMeta, tuple.Node, tuple.ServiceID)
if err != nil { if err != nil {
return nil, err return stream.Event{}, err
} }
sn := svc.Next() sn := svc.Next()
if sn == nil { if sn == nil {
return nil, ErrMissingService return stream.Event{}, ErrMissingService
} }
return serviceHealthEventsForServiceNodeInternal(idx, n, sn.(*structs.ServiceNode), nodeChecks, svcChecks) return newServiceHealthEventRegister(idx, n, sn.(*structs.ServiceNode), nodeChecks, svcChecks), nil
} }
func serviceHealthEventsForServiceNodeInternal(idx uint64, func newServiceHealthEventRegister(idx uint64,
node *structs.Node, node *structs.Node,
sn *structs.ServiceNode, sn *structs.ServiceNode,
nodeChecks structs.HealthChecks, nodeChecks structs.HealthChecks,
svcChecks map[string]structs.HealthChecks) ([]stream.Event, error) { svcChecks map[string]structs.HealthChecks,
) stream.Event {
// Start with a copy of the node checks. // Start with a copy of the node checks.
checks := nodeChecks checks := nodeChecks
for _, check := range svcChecks[sn.ServiceID] { for _, check := range svcChecks[sn.ServiceID] {
@ -432,7 +405,7 @@ func serviceHealthEventsForServiceNodeInternal(idx uint64,
Service: sn.ToNodeService(), Service: sn.ToNodeService(),
Checks: checks, Checks: checks,
} }
e := stream.Event{ return stream.Event{
Topic: TopicServiceHealth, Topic: TopicServiceHealth,
Key: sn.ServiceName, Key: sn.ServiceName,
Index: idx, Index: idx,
@ -441,16 +414,9 @@ func serviceHealthEventsForServiceNodeInternal(idx uint64,
Obj: csn, Obj: csn,
}, },
} }
// See if we also need to emit a connect event (i.e. if this instance is a
// connect proxy or connect native app).
return []stream.Event{e}, nil
} }
func serviceHealthDeregEventsForServiceInstance(idx uint64, func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream.Event {
sn *structs.ServiceNode, entMeta *structs.EnterpriseMeta) ([]stream.Event, error) {
// We actually only need the node name populated in the node part as it's only // We actually only need the node name populated in the node part as it's only
// used as a key to know which service was deregistered so don't bother looking // used as a key to know which service was deregistered so don't bother looking
// up the node in the DB. Note that while the ServiceNode does have NodeID // up the node in the DB. Note that while the ServiceNode does have NodeID
@ -466,7 +432,7 @@ func serviceHealthDeregEventsForServiceInstance(idx uint64,
Service: sn.ToNodeService(), Service: sn.ToNodeService(),
} }
e := stream.Event{ return stream.Event{
Topic: TopicServiceHealth, Topic: TopicServiceHealth,
Key: sn.ServiceName, Key: sn.ServiceName,
Index: idx, Index: idx,
@ -475,5 +441,4 @@ func serviceHealthDeregEventsForServiceInstance(idx uint64,
Obj: csn, Obj: csn,
}, },
} }
return []stream.Event{e}, nil
} }