stream: remove Event.Key

Makes Payload a type with FilterByKey so that Payloads can implement
filtering by key. With this approach we don't need to expose a Namespace
field on Event, and we don't need to invest micro formats or require a
bunch of code to be aware of exactly how the key field is encoded.
This commit is contained in:
Daniel Nephin 2020-10-27 14:40:06 -04:00
parent 1c094da40d
commit a5dd2001cf
13 changed files with 161 additions and 91 deletions

View File

@ -13,6 +13,26 @@ import (
type EventPayloadCheckServiceNode struct {
Op pbsubscribe.CatalogOp
Value *structs.CheckServiceNode
// key is used to override the key used to filter the payload. It is set for
// events in the connect topic to specify the name of the underlying service
// when the change event is for a sidecar or gateway.
key string
}
func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool {
if key == "" && namespace == "" {
return true
}
if e.Value.Service == nil {
return false
}
name := e.Value.Service.Service
if e.key != "" {
name = e.key
}
return key == name && namespace == e.Value.Service.EnterpriseMeta.GetNamespace()
}
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
@ -42,10 +62,6 @@ func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc {
},
}
if n.Service != nil {
event.Key = n.Service.Service
}
// append each event as a separate item so that they can be serialized
// separately, to prevent the encoding of one massive message.
buf.Append([]stream.Event{event})
@ -252,7 +268,9 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S
e := newServiceHealthEventDeregister(idx, before)
e.Topic = topicServiceHealthConnect
e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = payload.Value.Service.Proxy.DestinationServiceName
e.Payload = payload
return e, true
}
@ -304,7 +322,9 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
result = append(result, connectEvent)
case node.Service.Kind == structs.ServiceKindConnectProxy:
connectEvent.Key = node.Service.Proxy.DestinationServiceName
payload := event.Payload.(EventPayloadCheckServiceNode)
payload.key = node.Service.Proxy.DestinationServiceName
connectEvent.Payload = payload
result = append(result, connectEvent)
default:
@ -316,7 +336,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
return result
}
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode {
ep, ok := payload.(EventPayloadCheckServiceNode)
if !ok {
return nil
@ -431,7 +451,6 @@ func newServiceHealthEventRegister(
}
return stream.Event{
Topic: topicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -458,7 +477,6 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream
return stream.Event{
Topic: topicServiceHealth,
Key: sn.ServiceName,
Index: idx,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,

View File

@ -822,6 +822,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
return nil
},
WantEvents: []stream.Event{
// We should see:
// - service dereg for web and proxy on node2
@ -832,29 +833,15 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
// - connect reg for api on node2
testServiceHealthDeregistrationEvent(t, "web", evNode2),
testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar),
testServiceHealthDeregistrationEvent(t, "web",
evConnectTopic,
evNode2,
evSidecar,
),
testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar),
testServiceHealthEvent(t, "web", evNodeUnchanged),
testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged),
testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged),
testServiceHealthEvent(t, "api",
evNode2,
evConnectNative,
evNodeUnchanged,
),
testServiceHealthEvent(t, "api",
evNode2,
evConnectTopic,
evConnectNative,
evNodeUnchanged,
),
testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged),
testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged),
},
WantErr: false,
},
}
@ -1192,10 +1179,10 @@ func evSidecar(e *stream.Event) error {
csn.Checks[1].ServiceName = svc + "_sidecar_proxy"
}
// Update event key to be the proxy service name, but only if this is not
// already in the connect topic
if e.Topic != topicServiceHealthConnect {
e.Key = csn.Service.Service
if e.Topic == topicServiceHealthConnect {
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = svc
e.Payload = payload
}
return nil
}
@ -1264,15 +1251,13 @@ func evChecksUnchanged(e *stream.Event) error {
// name but not ID simulating an in-place service rename.
func evRenameService(e *stream.Event) error {
csn := getPayloadCheckServiceNode(e.Payload)
isSidecar := csn.Service.Kind == structs.ServiceKindConnectProxy
if !isSidecar {
if csn.Service.Kind != structs.ServiceKindConnectProxy {
csn.Service.Service += "_changed"
// Update service checks
if len(csn.Checks) >= 2 {
csn.Checks[1].ServiceName += "_changed"
}
e.Key += "_changed"
return nil
}
// This is a sidecar, it's not really realistic but lets only update the
@ -1280,12 +1265,13 @@ func evRenameService(e *stream.Event) error {
// we get the right result. This is certainly possible if not likely so a
// valid case.
// We don't need to update out own details, only the name of the destination
// We don't need to update our own details, only the name of the destination
csn.Service.Proxy.DestinationServiceName += "_changed"
// If this is the connect topic we need to change the key too
if e.Topic == topicServiceHealthConnect {
e.Key += "_changed"
payload := e.Payload.(EventPayloadCheckServiceNode)
payload.key = csn.Service.Proxy.DestinationServiceName
e.Payload = payload
}
return nil
}
@ -1373,7 +1359,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
return stream.Event{
Topic: topicServiceHealth,
Key: svc,
Index: index,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
@ -1444,7 +1429,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st
func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event {
return stream.Event{
Topic: topicServiceHealth,
Key: svc,
Index: index,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,

View File

@ -395,9 +395,8 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
for _, node := range nodes {
event := stream.Event{
Topic: req.Topic,
Key: req.Key,
Index: node.ModifyIndex,
Payload: node,
Payload: nodePayload{node: node, key: req.Key},
}
snap.Append([]stream.Event{event})
}
@ -406,6 +405,15 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers {
}
}
type nodePayload struct {
key string
node *structs.ServiceNode
}
func (p nodePayload) FilterByKey(key, _ string) bool {
return p.key == key
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",

View File

@ -14,15 +14,23 @@ type Topic fmt.Stringer
// EventPublisher and returned to Subscribers.
type Event struct {
Topic Topic
Key string
Index uint64
Payload interface{}
Payload Payload
}
type Payload interface {
// FilterByKey must return true if the Payload should be included in a subscription
// requested with the key and namespace.
// Generally this means that the payload matches the key and namespace or
// the payload is a special framing event that should be returned to every
// subscription.
FilterByKey(key, namespace string) bool
}
// Len returns the number of events contained within this event. If the Payload
// is a []Event, the length of that slice is returned. Otherwise 1 is returned.
func (e Event) Len() int {
if batch, ok := e.Payload.([]Event); ok {
if batch, ok := e.Payload.(PayloadEvents); ok {
return len(batch)
}
return 1
@ -31,7 +39,7 @@ func (e Event) Len() int {
// Filter returns an Event filtered to only those Events where f returns true.
// If the second return value is false, every Event was removed by the filter.
func (e Event) Filter(f func(Event) bool) (Event, bool) {
batch, ok := e.Payload.([]Event)
batch, ok := e.Payload.(PayloadEvents)
if !ok {
return e, f(e)
}
@ -50,7 +58,7 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) {
return e, size != 0
}
filtered := make([]Event, 0, size)
filtered := make(PayloadEvents, 0, size)
for idx := range batch {
event := batch[idx]
if f(event) {
@ -64,6 +72,20 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) {
return e, true
}
// PayloadEvents is an Payload which contains multiple Events.
type PayloadEvents []Event
// TODO: this method is not called, but needs to exist so that we can store
// a slice of events as a payload. In the future we should be able to refactor
// Event.Filter so that this FilterByKey includes the re-slicing.
func (e PayloadEvents) FilterByKey(_, _ string) bool {
return true
}
func (e PayloadEvents) Events() []Event {
return e
}
// IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Subsequent events from Subscription.Next will be
// streamed as they occur.
@ -80,12 +102,24 @@ func (e Event) IsNewSnapshotToFollow() bool {
type endOfSnapshot struct{}
func (endOfSnapshot) FilterByKey(string, string) bool {
return true
}
type newSnapshotToFollow struct{}
func (newSnapshotToFollow) FilterByKey(string, string) bool {
return true
}
type closeSubscriptionPayload struct {
tokensSecretIDs []string
}
func (closeSubscriptionPayload) FilterByKey(string, string) bool {
return true
}
// NewCloseSubscriptionEvent returns a special Event that is handled by the
// stream package, and is never sent to subscribers. EventProcessor handles
// these events, and closes any subscriptions which were created using a token

View File

@ -185,7 +185,6 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
if req.Index > 0 {
snap.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Payload: newSnapshotToFollow{},
}})

View File

@ -43,24 +43,37 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
}}
publisher.Publish(events)
// Subscriber should see the published event
next = getNextEvent(t, eventCh)
expected := Event{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}
expected := Event{
Topic: testTopic,
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
}
require.Equal(t, expected, next)
}
var testSnapshotEvent = Event{
Topic: testTopic,
Payload: "snapshot-event-payload",
Key: "sub-key",
Payload: simplePayload{key: "sub-key", value: "snapshot-event-payload"},
Index: 1,
}
type simplePayload struct {
key string
value string
}
func (p simplePayload) FilterByKey(key, _ string) bool {
if key == "" {
return true
}
return p.key == key
}
func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
@ -193,8 +206,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
expected := Event{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
Index: 3,
}
publisher.Publish([]Event{expected})
@ -243,9 +255,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
expected := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
Payload: simplePayload{key: "sub-key", value: "event-3"},
}
publisher.publishEvent([]Event{expected})
@ -284,9 +295,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
Payload: simplePayload{key: "sub-key", value: "event-3"},
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
@ -341,9 +351,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
Payload: simplePayload{key: "sub-key", value: "event-3"},
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {

View File

@ -37,7 +37,6 @@ func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, t
}
s.buffer.Append([]Event{{
Topic: req.Topic,
Key: req.Key,
Index: idx,
Payload: endOfSnapshot{},
}})

View File

@ -129,9 +129,9 @@ func TestEventSnapshot(t *testing.T) {
e := curItem.Events[0]
switch {
case snapDone:
payload, ok := e.Payload.(string)
payload, ok := e.Payload.(simplePayload)
require.True(t, ok, "want health event got: %#v", e.Payload)
updateIDs = append(updateIDs, payload)
updateIDs = append(updateIDs, payload.value)
if len(updateIDs) == tc.updatesAfterSnap {
// We're done!
break RECV
@ -139,9 +139,9 @@ func TestEventSnapshot(t *testing.T) {
case e.IsEndOfSnapshot():
snapDone = true
default:
payload, ok := e.Payload.(string)
payload, ok := e.Payload.(simplePayload)
require.True(t, ok, "want health event got: %#v", e.Payload)
snapIDs = append(snapIDs, payload)
snapIDs = append(snapIDs, payload.value)
}
}
@ -176,6 +176,6 @@ func newDefaultHealthEvent(index uint64, n int) Event {
return Event{
Index: index,
Topic: testTopic,
Payload: fmt.Sprintf("test-event-%03d", n),
Payload: simplePayload{value: fmt.Sprintf("test-event-%03d", n)},
}
}

View File

@ -53,9 +53,21 @@ type Subscription struct {
// SubscribeRequest identifies the types of events the subscriber would like to
// receiver. Topic and Token are required.
type SubscribeRequest struct {
// Topic to subscribe to
Topic Topic
Key string
// Key used to filter events in the topic. Only events matching the key will
// be returned by the subscription. A blank key will return all events. Key
// is generally the name of the resource.
Key string
// Namespace used to filter events in the topic. Only events matching the
// namespace will be returned by the subscription.
Namespace string
// Token that was used to authenticate the request. If any ACL policy
// changes impact the token the subscription will be forcefully closed.
Token string
// Index is the last index the client received. If non-zero the
// subscription will be resumed from this index. If the index is out-of-date
// a NewSnapshotToFollow event will be sent.
Index uint64
}
@ -115,9 +127,8 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event {
}
return Event{
Topic: req.Topic,
Key: req.Key,
Index: first.Index,
Payload: events,
Payload: PayloadEvents(events),
}
}
@ -128,7 +139,7 @@ func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
}
fn := func(e Event) bool {
return req.Key == e.Key
return e.Payload.FilterByKey(req.Key, req.Namespace)
}
return event.Filter(fn)
}

View File

@ -69,7 +69,7 @@ func TestSubscription(t *testing.T) {
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Equal(t, index, got.Index)
require.Equal(t, "test", got.Key)
require.Equal(t, "test", got.Payload.(simplePayload).key)
// Cancelling the subscription context should unblock Next
start = time.Now()
@ -130,20 +130,17 @@ func TestSubscription_Close(t *testing.T) {
}
func publishTestEvent(index uint64, b *eventBuffer, key string) {
// Don't care about the event payload for now just the semantics of publishing
// something. This is not a valid stream in the end-to-end streaming protocol
// but enough to test subscription mechanics.
e := Event{
Index: index,
Topic: testTopic,
Key: key,
Index: index,
Topic: testTopic,
Payload: simplePayload{key: key},
}
b.Append([]Event{e})
}
func TestFilter_NoKey(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "One", Index: 102}, Event{Key: "Two"})
events := make(PayloadEvents, 0, 5)
events = append(events, newSimpleEvent("One", 102), newSimpleEvent("Two", 102))
req := SubscribeRequest{Topic: testTopic}
actual, ok := filterByKey(req, events)
@ -151,26 +148,33 @@ func TestFilter_NoKey(t *testing.T) {
require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual)
// test that a new array was not allocated
require.Equal(t, cap(actual.Payload.([]Event)), 5)
require.Equal(t, cap(actual.Payload.(PayloadEvents)), 5)
}
func newSimpleEvent(key string, index uint64) Event {
return Event{Index: index, Payload: simplePayload{key: key}}
}
func TestFilter_WithKey_AllEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same", Index: 103}, Event{Key: "Same"})
events := make(PayloadEvents, 0, 5)
events = append(events, newSimpleEvent("Same", 103), newSimpleEvent("Same", 103))
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
actual, ok := filterByKey(req, events)
require.True(t, ok)
expected := Event{Topic: testTopic, Index: 103, Key: "Same", Payload: events}
expected := Event{Topic: testTopic, Index: 103, Payload: events}
require.Equal(t, expected, actual)
// test that a new array was not allocated
require.Equal(t, 5, cap(actual.Payload.([]Event)))
require.Equal(t, 5, cap(actual.Payload.(PayloadEvents)))
}
func TestFilter_WithKey_SomeEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same", Index: 104}, Event{Key: "Other"}, Event{Key: "Same"})
events = append(events,
newSimpleEvent("Same", 104),
newSimpleEvent("Other", 0),
newSimpleEvent("Same", 0))
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
actual, ok := filterByKey(req, events)
@ -178,18 +182,17 @@ func TestFilter_WithKey_SomeEventsMatch(t *testing.T) {
expected := Event{
Topic: testTopic,
Index: 104,
Key: "Same",
Payload: []Event{{Key: "Same", Index: 104}, {Key: "Same"}},
Payload: PayloadEvents{newSimpleEvent("Same", 104), newSimpleEvent("Same", 0)},
}
require.Equal(t, expected, actual)
// test that a new array was allocated with the correct size
require.Equal(t, cap(actual.Payload.([]Event)), 2)
require.Equal(t, cap(actual.Payload.(PayloadEvents)), 2)
}
func TestFilter_WithKey_NoEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Same"})
events = append(events, newSimpleEvent("Same", 0), newSimpleEvent("Same", 0))
req := SubscribeRequest{Topic: testTopic, Key: "Other"}
_, ok := filterByKey(req, events)

View File

@ -153,9 +153,9 @@ func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
return e
}
func setPayload(e *pbsubscribe.Event, payload interface{}) {
func setPayload(e *pbsubscribe.Event, payload stream.Payload) {
switch p := payload.(type) {
case []stream.Event:
case stream.PayloadEvents:
e.Payload = &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: batchEventsFromEventSlice(p),

View File

@ -915,7 +915,7 @@ func TestNewEventFromSteamEvent(t *testing.T) {
name: "event batch",
event: stream.Event{
Index: 2002,
Payload: []stream.Event{
Payload: stream.PayloadEvents{
{
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{

View File

@ -74,6 +74,11 @@ func (_ *EnterpriseMeta) FillAuthzContext(_ *acl.AuthorizerContext) {}
func (_ *EnterpriseMeta) Normalize() {}
// GetNamespace always returns the empty string.
func (_ *EnterpriseMeta) GetNamespace() string {
return ""
}
// FillAuthzContext stub
func (_ *DirEntry) FillAuthzContext(_ *acl.AuthorizerContext) {}