state: rename and export EventPayload

The subscribe endpoint needs to be able to inspect the payload to filter
events, and convert them into the protobuf types.

Use the protobuf CatalogOp type for the operation field, for now. In the
future if we end up with multiple interfaces we should be able to remove
the protobuf dependency by changing this to an int32 and adding a test
for the mapping between the values.

Make the value of the payload a concrete type instead of interface{}. We
can create other payloads for other event types.
This commit is contained in:
Daniel Nephin 2020-09-08 18:13:24 -04:00
parent 97d39505ef
commit 7b1534ef05
2 changed files with 24 additions and 32 deletions

View File

@ -3,20 +3,15 @@ package state
import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbsubscribe"
memdb "github.com/hashicorp/go-memdb"
)
type changeOp int
const (
OpDelete changeOp = iota
OpCreate
OpUpdate
)
type eventPayload struct {
Op changeOp
Obj interface{}
// EventPayloadCheckServiceNode is used as the Payload for a stream.Event to
// indicates changes to a CheckServiceNode for service health.
type EventPayloadCheckServiceNode struct {
Op pbsubscribe.CatalogOp
Value *structs.CheckServiceNode
}
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
@ -39,9 +34,9 @@ func serviceHealthSnapshot(s *Store, topic topic) stream.SnapshotFunc {
event := stream.Event{
Index: idx,
Topic: topic,
Payload: eventPayload{
Op: OpCreate,
Obj: &n,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
},
}
@ -320,15 +315,11 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
}
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
ep, ok := payload.(eventPayload)
ep, ok := payload.(EventPayloadCheckServiceNode)
if !ok {
return nil
}
csn, ok := ep.Obj.(*structs.CheckServiceNode)
if !ok {
return nil
}
return csn
return ep.Value
}
// newServiceHealthEventsForNode returns health events for all services on the
@ -440,9 +431,9 @@ func newServiceHealthEventRegister(
Topic: TopicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: eventPayload{
Op: OpCreate,
Obj: csn,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: csn,
},
}
}
@ -467,9 +458,9 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream
Topic: TopicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: eventPayload{
Op: OpDelete,
Obj: csn,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
Value: csn,
},
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
)
@ -1394,9 +1395,9 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
Topic: TopicServiceHealth,
Key: svc,
Index: index,
Payload: eventPayload{
Op: OpCreate,
Obj: &structs.CheckServiceNode{
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Node: &structs.Node{
ID: nodeID,
Node: node,
@ -1462,9 +1463,9 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string)
Topic: TopicServiceHealth,
Key: svc,
Index: index,
Payload: eventPayload{
Op: OpDelete,
Obj: &structs.CheckServiceNode{
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
Value: &structs.CheckServiceNode{
Node: &structs.Node{
Node: fmt.Sprintf("node%d", nodeNum),
},