From 27b02d391c7c28119b753fcbbf948b94ad976504 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 21 Jul 2020 19:39:36 -0400 Subject: [PATCH] state: use an enum for tracking node changes --- agent/consul/state/catalog_events.go | 61 ++++++++++++++++------------ 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index f04f9f60f9..731a95d973 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -68,20 +68,19 @@ type nodeServiceTuple struct { func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { var events []stream.Event - var nodeChanges map[string]*memdb.Change + var nodeChanges map[string]changeType var serviceChanges map[nodeServiceTuple]*memdb.Change - markNode := func(node string, nodeChange *memdb.Change) { + markNode := func(node string, typ changeType) { if nodeChanges == nil { - nodeChanges = make(map[string]*memdb.Change) + nodeChanges = make(map[string]changeType) } // If the caller has an actual node mutation ensure we store it even if the // node is already marked. If the caller is just marking the node dirty // without an node change, don't overwrite any existing node change we know // about. - ch := nodeChanges[node] - if ch == nil { - nodeChanges[node] = nodeChange + if nodeChanges[node] == changeIndirect { + nodeChanges[node] = typ } } markService := func(node, service string, entMeta structs.EnterpriseMeta, svcChange *memdb.Change) { @@ -111,20 +110,11 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event // we mark it anyway because if it _is_ a delete then we need to know that // later to avoid trying to deliver events when node level checks mark the // node as "changed". - nRaw := change.After - if change.After == nil { - nRaw = change.Before - } - n := nRaw.(*structs.Node) - changeCopy := change - markNode(n.Node, &changeCopy) + n := changeObject(change).(*structs.Node) + markNode(n.Node, changeTypeFromChange(change)) case "services": - snRaw := change.After - if change.After == nil { - snRaw = change.Before - } - sn := snRaw.(*structs.ServiceNode) + sn := changeObject(change).(*structs.ServiceNode) changeCopy := change markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy) @@ -140,7 +130,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event after := change.After.(*structs.HealthCheck) if after.ServiceID == "" || before.ServiceID == "" { // Either changed from or to being node-scoped - markNode(after.Node, nil) + markNode(after.Node, changeIndirect) } else { // Check changed which means we just need to emit for the linked // service. @@ -159,7 +149,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event before := change.Before.(*structs.HealthCheck) if before.ServiceID == "" { // Node level check - markNode(before.Node, nil) + markNode(before.Node, changeIndirect) } else { markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil) } @@ -168,7 +158,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event after := change.After.(*structs.HealthCheck) if after.ServiceID == "" { // Node level check - markNode(after.Node, nil) + markNode(after.Node, changeIndirect) } else { markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil) } @@ -177,11 +167,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event } // Now act on those marked nodes/services - for node, change := range nodeChanges { - // change may be nil if there was a change that _affected_ the node - // like a change to checks but it didn't actually change the node - // record itself. - if change != nil && change.Deleted() { + for node, changeType := range nodeChanges { + if changeType == changeDelete { // Node deletions are a no-op here since the state store transaction will // have also removed all the service instances which will be handled in // the loop below. @@ -269,6 +256,28 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event return events, nil } +type changeType uint8 + +const ( + // changeIndirect indicates some other object changed which has implications + // for the target object. + changeIndirect changeType = iota + changeDelete + changeCreate + changeUpdate +) + +func changeTypeFromChange(change memdb.Change) changeType { + switch { + case change.Deleted(): + return changeDelete + case change.Created(): + return changeCreate + default: + return changeUpdate + } +} + // serviceHealthToConnectEvents converts already formatted service health // registration events into the ones needed to publish to the Connect topic. // This essentially means filtering out any instances that are not Connect