state: use pbsubscribe.Topic for topic values

This commit is contained in:
Daniel Nephin 2020-09-15 15:04:33 -04:00
parent 7b1534ef05
commit 0fb2a5b992
4 changed files with 28 additions and 28 deletions

View File

@ -1,10 +1,11 @@
package state package state
import ( import (
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
memdb "github.com/hashicorp/go-memdb"
) )
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to // EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
@ -18,19 +19,20 @@ type EventPayloadCheckServiceNode struct {
// of stream.Events that describe the current state of a service health query. // of stream.Events that describe the current state of a service health query.
// //
// TODO: no tests for this yet // TODO: no tests for this yet
func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc {
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
connect := topic == TopicServiceHealthConnect connect := topic == topicServiceHealthConnect
// TODO(namespace-streaming): plumb entMeta through from SubscribeRequest // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil) idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil)
if err != nil { if err != nil {
return 0, err return 0, err
} }
for _, n := range nodes { for i := range nodes {
n := nodes[i]
event := stream.Event{ event := stream.Event{
Index: idx, Index: idx,
Topic: topic, Topic: topic,
@ -249,7 +251,7 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S
} }
e := newServiceHealthEventDeregister(idx, before) e := newServiceHealthEventDeregister(idx, before)
e.Topic = TopicServiceHealthConnect e.Topic = topicServiceHealthConnect
e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName
return e, true return e, true
} }
@ -285,7 +287,7 @@ func changeTypeFromChange(change memdb.Change) changeType {
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
var result []stream.Event var result []stream.Event
for _, event := range events { for _, event := range events {
if event.Topic != TopicServiceHealth { if event.Topic != topicServiceHealth {
// Skip non-health or any events already emitted to Connect topic // Skip non-health or any events already emitted to Connect topic
continue continue
} }
@ -295,7 +297,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
} }
connectEvent := event connectEvent := event
connectEvent.Topic = TopicServiceHealthConnect connectEvent.Topic = topicServiceHealthConnect
switch { switch {
case node.Service.Connect.Native: case node.Service.Connect.Native:
@ -428,7 +430,7 @@ func newServiceHealthEventRegister(
Checks: checks, Checks: checks,
} }
return stream.Event{ return stream.Event{
Topic: TopicServiceHealth, Topic: topicServiceHealth,
Key: sn.ServiceName, Key: sn.ServiceName,
Index: idx, Index: idx,
Payload: EventPayloadCheckServiceNode{ Payload: EventPayloadCheckServiceNode{
@ -455,7 +457,7 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream
} }
return stream.Event{ return stream.Event{
Topic: TopicServiceHealth, Topic: topicServiceHealth,
Key: sn.ServiceName, Key: sn.ServiceName,
Index: idx, Index: idx,
Payload: EventPayloadCheckServiceNode{ Payload: EventPayloadCheckServiceNode{

View File

@ -1138,7 +1138,7 @@ func evConnectNative(e *stream.Event) error {
// depending on which topic they are published to and they determin this from // depending on which topic they are published to and they determin this from
// the event. // the event.
func evConnectTopic(e *stream.Event) error { func evConnectTopic(e *stream.Event) error {
e.Topic = TopicServiceHealthConnect e.Topic = topicServiceHealthConnect
return nil return nil
} }
@ -1172,7 +1172,7 @@ func evSidecar(e *stream.Event) error {
// Update event key to be the proxy service name, but only if this is not // Update event key to be the proxy service name, but only if this is not
// already in the connect topic // already in the connect topic
if e.Topic != TopicServiceHealthConnect { if e.Topic != topicServiceHealthConnect {
e.Key = csn.Service.Service e.Key = csn.Service.Service
} }
return nil return nil
@ -1262,7 +1262,7 @@ func evRenameService(e *stream.Event) error {
csn.Service.Proxy.DestinationServiceName += "_changed" csn.Service.Proxy.DestinationServiceName += "_changed"
// If this is the connect topic we need to change the key too // If this is the connect topic we need to change the key too
if e.Topic == TopicServiceHealthConnect { if e.Topic == topicServiceHealthConnect {
e.Key += "_changed" e.Key += "_changed"
} }
return nil return nil
@ -1392,7 +1392,7 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return stream.Event{ return stream.Event{
Topic: TopicServiceHealth, Topic: topicServiceHealth,
Key: svc, Key: svc,
Index: index, Index: index,
Payload: EventPayloadCheckServiceNode{ Payload: EventPayloadCheckServiceNode{
@ -1460,7 +1460,7 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
// adding too many options to callers. // adding too many options to callers.
func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event { func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event {
return stream.Event{ return stream.Event{
Topic: TopicServiceHealth, Topic: topicServiceHealth,
Key: svc, Key: svc,
Index: index, Index: index,
Payload: EventPayloadCheckServiceNode{ Payload: EventPayloadCheckServiceNode{

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
@ -158,18 +159,9 @@ func (tx *txn) Commit() error {
return nil return nil
} }
// TODO: may be replaced by a gRPC type.
type topic string
func (t topic) String() string {
return string(t)
}
var ( var (
// TopicServiceHealth contains events for all registered service instances. topicServiceHealth = pbsubscribe.Topic_ServiceHealth
TopicServiceHealth topic = "topic-service-health" topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect
// TopicServiceHealthConnect contains events for connect-enabled service instances.
TopicServiceHealthConnect topic = "topic-service-health-connect"
) )
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
@ -191,7 +183,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{ return stream.SnapshotHandlers{
TopicServiceHealth: serviceHealthSnapshot(s, TopicServiceHealth), topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth),
TopicServiceHealthConnect: serviceHealthSnapshot(s, TopicServiceHealthConnect), topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect),
} }
} }

View File

@ -372,7 +372,13 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
} }
} }
var topicService stream.Topic = topic("test-topic-service") type topic string
func (t topic) String() string {
return string(t)
}
var topicService topic = "test-topic-service"
func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
return stream.SnapshotHandlers{ return stream.SnapshotHandlers{