From 0fb2a5b9921dc2ef7fa5c95afa789f506113c5ec Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 15 Sep 2020 15:04:33 -0400 Subject: [PATCH] state: use pbsubscribe.Topic for topic values --- agent/consul/state/catalog_events.go | 20 +++++++++++--------- agent/consul/state/catalog_events_test.go | 10 +++++----- agent/consul/state/memdb.go | 18 +++++------------- agent/consul/state/store_integration_test.go | 8 +++++++- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index d68180ed60..4b7ee11932 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -1,10 +1,11 @@ package state import ( + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbsubscribe" - memdb "github.com/hashicorp/go-memdb" ) // EventPayloadCheckServiceNode is used as the Payload for a stream.Event to @@ -18,19 +19,20 @@ type EventPayloadCheckServiceNode struct { // of stream.Events that describe the current state of a service health query. // // TODO: no tests for this yet -func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { +func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc { return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { tx := s.db.Txn(false) defer tx.Abort() - connect := topic == TopicServiceHealthConnect + connect := topic == topicServiceHealthConnect // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil) if err != nil { return 0, err } - for _, n := range nodes { + for i := range nodes { + n := nodes[i] event := stream.Event{ Index: idx, Topic: topic, @@ -249,7 +251,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S } e := newServiceHealthEventDeregister(idx, before) - e.Topic = TopicServiceHealthConnect + e.Topic = topicServiceHealthConnect e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName return e, true } @@ -285,7 +287,7 @@ func changeTypeFromChange(change memdb.Change) changeType { func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { var result []stream.Event for _, event := range events { - if event.Topic != TopicServiceHealth { + if event.Topic != topicServiceHealth { // Skip non-health or any events already emitted to Connect topic continue } @@ -295,7 +297,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { } connectEvent := event - connectEvent.Topic = TopicServiceHealthConnect + connectEvent.Topic = topicServiceHealthConnect switch { case node.Service.Connect.Native: @@ -428,7 +430,7 @@ func newServiceHealthEventRegister( Checks: checks, } return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ @@ -455,7 +457,7 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream } return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index d4d5416157..9d6d91d54f 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1138,7 +1138,7 @@ func evConnectNative(e *stream.Event) error { // depending on which topic they are published to and they determin this from // the event. func evConnectTopic(e *stream.Event) error { - e.Topic = TopicServiceHealthConnect + e.Topic = topicServiceHealthConnect return nil } @@ -1172,7 +1172,7 @@ func evSidecar(e *stream.Event) error { // Update event key to be the proxy service name, but only if this is not // already in the connect topic - if e.Topic != TopicServiceHealthConnect { + if e.Topic != topicServiceHealthConnect { e.Key = csn.Service.Service } return nil @@ -1262,7 +1262,7 @@ func evRenameService(e *stream.Event) error { csn.Service.Proxy.DestinationServiceName += "_changed" // If this is the connect topic we need to change the key too - if e.Topic == TopicServiceHealthConnect { + if e.Topic == topicServiceHealthConnect { e.Key += "_changed" } return nil @@ -1392,7 +1392,7 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ @@ -1460,7 +1460,7 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st // adding too many options to callers. func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event { return stream.Event{ - Topic: TopicServiceHealth, + Topic: topicServiceHealth, Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 3fd72dfaa7..beb46d62d8 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/go-memdb" ) @@ -158,18 +159,9 @@ func (tx *txn) Commit() error { return nil } -// TODO: may be replaced by a gRPC type. -type topic string - -func (t topic) String() string { - return string(t) -} - var ( - // TopicServiceHealth contains events for all registered service instances. - TopicServiceHealth topic = "topic-service-health" - // TopicServiceHealthConnect contains events for connect-enabled service instances. - TopicServiceHealthConnect topic = "topic-service-health-connect" + topicServiceHealth = pbsubscribe.Topic_ServiceHealth + topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect ) func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { @@ -191,7 +183,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth), - TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect), + topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), + topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), } } diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 6b2e9d1fe6..14d667ce39 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -372,7 +372,13 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } -var topicService stream.Topic = topic("test-topic-service") +type topic string + +func (t topic) String() string { + return string(t) +} + +var topicService topic = "test-topic-service" func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{