From 7b1534ef0548a9c6b01953506c51569b7ad38851 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 8 Sep 2020 18:13:24 -0400 Subject: [PATCH] state: rename and export EventPayload The subscribe endpoint needs to be able to inspect the payload to filter events, and convert them into the protobuf types. Use the protobuf CatalogOp type for the operation field, for now. In the future if we end up with multiple interfaces we should be able to remove the protobuf dependency by changing this to an int32 and adding a test for the mapping between the values. Make the value of the payload a concrete type instead of interface{}. We can create other payloads for other event types. --- agent/consul/state/catalog_events.go | 43 +++++++++-------------- agent/consul/state/catalog_events_test.go | 13 +++---- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index b42d47fc64..d68180ed60 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -3,20 +3,15 @@ package state import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbsubscribe" memdb "github.com/hashicorp/go-memdb" ) -type changeOp int - -const ( - OpDelete changeOp = iota - OpCreate - OpUpdate -) - -type eventPayload struct { - Op changeOp - Obj interface{} +// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to +// indicates changes to a CheckServiceNode for service health. +type EventPayloadCheckServiceNode struct { + Op pbsubscribe.CatalogOp + Value *structs.CheckServiceNode } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -39,9 +34,9 @@ func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc { event := stream.Event{ Index: idx, Topic: topic, - Payload: eventPayload{ - Op: OpCreate, - Obj: &n, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &n, }, } @@ -320,15 +315,11 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { } func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { - ep, ok := payload.(eventPayload) + ep, ok := payload.(EventPayloadCheckServiceNode) if !ok { return nil } - csn, ok := ep.Obj.(*structs.CheckServiceNode) - if !ok { - return nil - } - return csn + return ep.Value } // newServiceHealthEventsForNode returns health events for all services on the @@ -440,9 +431,9 @@ func newServiceHealthEventRegister( Topic: TopicServiceHealth, Key: sn.ServiceName, Index: idx, - Payload: eventPayload{ - Op: OpCreate, - Obj: csn, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: csn, }, } } @@ -467,9 +458,9 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream Topic: TopicServiceHealth, Key: sn.ServiceName, Index: idx, - Payload: eventPayload{ - Op: OpDelete, - Obj: csn, + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: csn, }, } } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 5cf610604f..d4d5416157 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" "github.com/stretchr/testify/require" ) @@ -1394,9 +1395,9 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st Topic: TopicServiceHealth, Key: svc, Index: index, - Payload: eventPayload{ - Op: OpCreate, - Obj: &structs.CheckServiceNode{ + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ Node: &structs.Node{ ID: nodeID, Node: node, @@ -1462,9 +1463,9 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) Topic: TopicServiceHealth, Key: svc, Index: index, - Payload: eventPayload{ - Op: OpDelete, - Obj: &structs.CheckServiceNode{ + Payload: EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: &structs.CheckServiceNode{ Node: &structs.Node{ Node: fmt.Sprintf("node%d", nodeNum), },