diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 4b7ee11932..08ff3d1fa4 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -13,6 +13,26 @@ import ( type EventPayloadCheckServiceNode struct { Op pbsubscribe.CatalogOp Value *structs.CheckServiceNode + // key is used to override the key used to filter the payload. It is set for + // events in the connect topic to specify the name of the underlying service + // when the change event is for a sidecar or gateway. + key string +} + +func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { + if key == "" && namespace == "" { + return true + } + + if e.Value.Service == nil { + return false + } + + name := e.Value.Service.Service + if e.key != "" { + name = e.key + } + return key == name && namespace == e.Value.Service.EnterpriseMeta.GetNamespace() } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -42,10 +62,6 @@ func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc { }, } - if n.Service != nil { - event.Key = n.Service.Service - } - // append each event as a separate item so that they can be serialized // separately, to prevent the encoding of one massive message. buf.Append([]stream.Event{event}) @@ -252,7 +268,9 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S e := newServiceHealthEventDeregister(idx, before) e.Topic = topicServiceHealthConnect - e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = payload.Value.Service.Proxy.DestinationServiceName + e.Payload = payload return e, true } @@ -304,7 +322,9 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { result = append(result, connectEvent) case node.Service.Kind == structs.ServiceKindConnectProxy: - connectEvent.Key = node.Service.Proxy.DestinationServiceName + payload := event.Payload.(EventPayloadCheckServiceNode) + payload.key = node.Service.Proxy.DestinationServiceName + connectEvent.Payload = payload result = append(result, connectEvent) default: @@ -316,7 +336,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { return result } -func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { +func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode { ep, ok := payload.(EventPayloadCheckServiceNode) if !ok { return nil @@ -431,7 +451,6 @@ func newServiceHealthEventRegister( } return stream.Event{ Topic: topicServiceHealth, - Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -458,7 +477,6 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream return stream.Event{ Topic: topicServiceHealth, - Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 50d95e0134..ac4a07d67e 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -822,6 +822,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return nil }, + WantEvents: []stream.Event{ // We should see: // - service dereg for web and proxy on node2 @@ -832,29 +833,15 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { // - connect reg for api on node2 testServiceHealthDeregistrationEvent(t, "web", evNode2), testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), - testServiceHealthDeregistrationEvent(t, "web", - evConnectTopic, - evNode2, - evSidecar, - ), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar), testServiceHealthEvent(t, "web", evNodeUnchanged), testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), - testServiceHealthEvent(t, "api", - evNode2, - evConnectNative, - evNodeUnchanged, - ), - testServiceHealthEvent(t, "api", - evNode2, - evConnectTopic, - evConnectNative, - evNodeUnchanged, - ), + testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged), + testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), }, - WantErr: false, }, } @@ -1192,10 +1179,10 @@ func evSidecar(e *stream.Event) error { csn.Checks[1].ServiceName = svc + "_sidecar_proxy" } - // Update event key to be the proxy service name, but only if this is not - // already in the connect topic - if e.Topic != topicServiceHealthConnect { - e.Key = csn.Service.Service + if e.Topic == topicServiceHealthConnect { + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = svc + e.Payload = payload } return nil } @@ -1264,15 +1251,13 @@ func evChecksUnchanged(e *stream.Event) error { // name but not ID simulating an in-place service rename. func evRenameService(e *stream.Event) error { csn := getPayloadCheckServiceNode(e.Payload) - isSidecar := csn.Service.Kind == structs.ServiceKindConnectProxy - if !isSidecar { + if csn.Service.Kind != structs.ServiceKindConnectProxy { csn.Service.Service += "_changed" // Update service checks if len(csn.Checks) >= 2 { csn.Checks[1].ServiceName += "_changed" } - e.Key += "_changed" return nil } // This is a sidecar, it's not really realistic but lets only update the @@ -1280,12 +1265,13 @@ func evRenameService(e *stream.Event) error { // we get the right result. This is certainly possible if not likely so a // valid case. - // We don't need to update out own details, only the name of the destination + // We don't need to update our own details, only the name of the destination csn.Service.Proxy.DestinationServiceName += "_changed" - // If this is the connect topic we need to change the key too if e.Topic == topicServiceHealthConnect { - e.Key += "_changed" + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = csn.Service.Proxy.DestinationServiceName + e.Payload = payload } return nil } @@ -1373,7 +1359,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st return stream.Event{ Topic: topicServiceHealth, - Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -1444,7 +1429,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event { return stream.Event{ Topic: topicServiceHealth, - Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index a16c635e1d..d75512195e 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -395,9 +395,8 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { for _, node := range nodes { event := stream.Event{ Topic: req.Topic, - Key: req.Key, Index: node.ModifyIndex, - Payload: node, + Payload: nodePayload{node: node, key: req.Key}, } snap.Append([]stream.Event{event}) } @@ -406,6 +405,15 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { } } +type nodePayload struct { + key string + node *structs.ServiceNode +} + +func (p nodePayload) FilterByKey(key, _ string) bool { + return p.key == key +} + func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index adbe0762c9..09d96ee6db 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -14,15 +14,23 @@ type Topic fmt.Stringer // EventPublisher and returned to Subscribers. type Event struct { Topic Topic - Key string Index uint64 - Payload interface{} + Payload Payload +} + +type Payload interface { + // FilterByKey must return true if the Payload should be included in a subscription + // requested with the key and namespace. + // Generally this means that the payload matches the key and namespace or + // the payload is a special framing event that should be returned to every + // subscription. + FilterByKey(key, namespace string) bool } // Len returns the number of events contained within this event. If the Payload // is a []Event, the length of that slice is returned. Otherwise 1 is returned. func (e Event) Len() int { - if batch, ok := e.Payload.([]Event); ok { + if batch, ok := e.Payload.(PayloadEvents); ok { return len(batch) } return 1 @@ -31,7 +39,7 @@ func (e Event) Len() int { // Filter returns an Event filtered to only those Events where f returns true. // If the second return value is false, every Event was removed by the filter. func (e Event) Filter(f func(Event) bool) (Event, bool) { - batch, ok := e.Payload.([]Event) + batch, ok := e.Payload.(PayloadEvents) if !ok { return e, f(e) } @@ -50,7 +58,7 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) { return e, size != 0 } - filtered := make([]Event, 0, size) + filtered := make(PayloadEvents, 0, size) for idx := range batch { event := batch[idx] if f(event) { @@ -64,6 +72,20 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) { return e, true } +// PayloadEvents is an Payload which contains multiple Events. +type PayloadEvents []Event + +// TODO: this method is not called, but needs to exist so that we can store +// a slice of events as a payload. In the future we should be able to refactor +// Event.Filter so that this FilterByKey includes the re-slicing. +func (e PayloadEvents) FilterByKey(_, _ string) bool { + return true +} + +func (e PayloadEvents) Events() []Event { + return e +} + // IsEndOfSnapshot returns true if this is a framing event that indicates the // snapshot has completed. Subsequent events from Subscription.Next will be // streamed as they occur. @@ -80,12 +102,24 @@ func (e Event) IsNewSnapshotToFollow() bool { type endOfSnapshot struct{} +func (endOfSnapshot) FilterByKey(string, string) bool { + return true +} + type newSnapshotToFollow struct{} +func (newSnapshotToFollow) FilterByKey(string, string) bool { + return true +} + type closeSubscriptionPayload struct { tokensSecretIDs []string } +func (closeSubscriptionPayload) FilterByKey(string, string) bool { + return true +} + // NewCloseSubscriptionEvent returns a special Event that is handled by the // stream package, and is never sent to subscribers. EventProcessor handles // these events, and closes any subscriptions which were created using a token diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 6109ce2d9e..53a1bf8dd0 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -185,7 +185,6 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) if req.Index > 0 { snap.buffer.Append([]Event{{ Topic: req.Topic, - Key: req.Key, Payload: newSnapshotToFollow{}, }}) diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 0dec574960..f2a9e43a36 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -43,24 +43,37 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { events := []Event{{ Topic: testTopic, - Key: "sub-key", - Payload: "the-published-event-payload", + Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"}, }} publisher.Publish(events) // Subscriber should see the published event next = getNextEvent(t, eventCh) - expected := Event{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic} + expected := Event{ + Topic: testTopic, + Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"}, + } require.Equal(t, expected, next) } var testSnapshotEvent = Event{ Topic: testTopic, - Payload: "snapshot-event-payload", - Key: "sub-key", + Payload: simplePayload{key: "sub-key", value: "snapshot-event-payload"}, Index: 1, } +type simplePayload struct { + key string + value string +} + +func (p simplePayload) FilterByKey(key, _ string) bool { + if key == "" { + return true + } + return p.key == key +} + func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { @@ -193,8 +206,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { expected := Event{ Topic: testTopic, - Key: "sub-key", - Payload: "the-published-event-payload", + Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"}, Index: 3, } publisher.Publish([]Event{expected}) @@ -243,9 +255,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { expected := Event{ Topic: testTopic, - Key: "sub-key", Index: 3, - Payload: "event-3", + Payload: simplePayload{key: "sub-key", value: "event-3"}, } publisher.publishEvent([]Event{expected}) @@ -284,9 +295,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { nextEvent := Event{ Topic: testTopic, - Key: "sub-key", Index: 3, - Payload: "event-3", + Payload: simplePayload{key: "sub-key", value: "event-3"}, } runStep(t, "publish an event while unsubed", func(t *testing.T) { @@ -341,9 +351,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin nextEvent := Event{ Topic: testTopic, - Key: "sub-key", Index: 3, - Payload: "event-3", + Payload: simplePayload{key: "sub-key", value: "event-3"}, } runStep(t, "publish an event while unsubed", func(t *testing.T) { diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index 124cb32513..e20e53a4a1 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -37,7 +37,6 @@ func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, t } s.buffer.Append([]Event{{ Topic: req.Topic, - Key: req.Key, Index: idx, Payload: endOfSnapshot{}, }}) diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 98e3c683bc..7c8329e406 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -129,9 +129,9 @@ func TestEventSnapshot(t *testing.T) { e := curItem.Events[0] switch { case snapDone: - payload, ok := e.Payload.(string) + payload, ok := e.Payload.(simplePayload) require.True(t, ok, "want health event got: %#v", e.Payload) - updateIDs = append(updateIDs, payload) + updateIDs = append(updateIDs, payload.value) if len(updateIDs) == tc.updatesAfterSnap { // We're done! break RECV @@ -139,9 +139,9 @@ func TestEventSnapshot(t *testing.T) { case e.IsEndOfSnapshot(): snapDone = true default: - payload, ok := e.Payload.(string) + payload, ok := e.Payload.(simplePayload) require.True(t, ok, "want health event got: %#v", e.Payload) - snapIDs = append(snapIDs, payload) + snapIDs = append(snapIDs, payload.value) } } @@ -176,6 +176,6 @@ func newDefaultHealthEvent(index uint64, n int) Event { return Event{ Index: index, Topic: testTopic, - Payload: fmt.Sprintf("test-event-%03d", n), + Payload: simplePayload{value: fmt.Sprintf("test-event-%03d", n)}, } } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 012b410928..bcc76acef7 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -53,9 +53,21 @@ type Subscription struct { // SubscribeRequest identifies the types of events the subscriber would like to // receiver. Topic and Token are required. type SubscribeRequest struct { + // Topic to subscribe to Topic Topic - Key string + // Key used to filter events in the topic. Only events matching the key will + // be returned by the subscription. A blank key will return all events. Key + // is generally the name of the resource. + Key string + // Namespace used to filter events in the topic. Only events matching the + // namespace will be returned by the subscription. + Namespace string + // Token that was used to authenticate the request. If any ACL policy + // changes impact the token the subscription will be forcefully closed. Token string + // Index is the last index the client received. If non-zero the + // subscription will be resumed from this index. If the index is out-of-date + // a NewSnapshotToFollow event will be sent. Index uint64 } @@ -115,9 +127,8 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event { } return Event{ Topic: req.Topic, - Key: req.Key, Index: first.Index, - Payload: events, + Payload: PayloadEvents(events), } } @@ -128,7 +139,7 @@ func filterByKey(req SubscribeRequest, events []Event) (Event, bool) { } fn := func(e Event) bool { - return req.Key == e.Key + return e.Payload.FilterByKey(req.Key, req.Namespace) } return event.Filter(fn) } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index db15313f57..dc6c8a06c2 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -69,7 +69,7 @@ func TestSubscription(t *testing.T) { require.True(t, elapsed < 200*time.Millisecond, "Event should have been delivered immediately, took %s", elapsed) require.Equal(t, index, got.Index) - require.Equal(t, "test", got.Key) + require.Equal(t, "test", got.Payload.(simplePayload).key) // Cancelling the subscription context should unblock Next start = time.Now() @@ -130,20 +130,17 @@ func TestSubscription_Close(t *testing.T) { } func publishTestEvent(index uint64, b *eventBuffer, key string) { - // Don't care about the event payload for now just the semantics of publishing - // something. This is not a valid stream in the end-to-end streaming protocol - // but enough to test subscription mechanics. e := Event{ - Index: index, - Topic: testTopic, - Key: key, + Index: index, + Topic: testTopic, + Payload: simplePayload{key: key}, } b.Append([]Event{e}) } func TestFilter_NoKey(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Key: "One", Index: 102}, Event{Key: "Two"}) + events := make(PayloadEvents, 0, 5) + events = append(events, newSimpleEvent("One", 102), newSimpleEvent("Two", 102)) req := SubscribeRequest{Topic: testTopic} actual, ok := filterByKey(req, events) @@ -151,26 +148,33 @@ func TestFilter_NoKey(t *testing.T) { require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual) // test that a new array was not allocated - require.Equal(t, cap(actual.Payload.([]Event)), 5) + require.Equal(t, cap(actual.Payload.(PayloadEvents)), 5) +} + +func newSimpleEvent(key string, index uint64) Event { + return Event{Index: index, Payload: simplePayload{key: key}} } func TestFilter_WithKey_AllEventsMatch(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same", Index: 103}, Event{Key: "Same"}) + events := make(PayloadEvents, 0, 5) + events = append(events, newSimpleEvent("Same", 103), newSimpleEvent("Same", 103)) req := SubscribeRequest{Topic: testTopic, Key: "Same"} actual, ok := filterByKey(req, events) require.True(t, ok) - expected := Event{Topic: testTopic, Index: 103, Key: "Same", Payload: events} + expected := Event{Topic: testTopic, Index: 103, Payload: events} require.Equal(t, expected, actual) // test that a new array was not allocated - require.Equal(t, 5, cap(actual.Payload.([]Event))) + require.Equal(t, 5, cap(actual.Payload.(PayloadEvents))) } func TestFilter_WithKey_SomeEventsMatch(t *testing.T) { events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same", Index: 104}, Event{Key: "Other"}, Event{Key: "Same"}) + events = append(events, + newSimpleEvent("Same", 104), + newSimpleEvent("Other", 0), + newSimpleEvent("Same", 0)) req := SubscribeRequest{Topic: testTopic, Key: "Same"} actual, ok := filterByKey(req, events) @@ -178,18 +182,17 @@ func TestFilter_WithKey_SomeEventsMatch(t *testing.T) { expected := Event{ Topic: testTopic, Index: 104, - Key: "Same", - Payload: []Event{{Key: "Same", Index: 104}, {Key: "Same"}}, + Payload: PayloadEvents{newSimpleEvent("Same", 104), newSimpleEvent("Same", 0)}, } require.Equal(t, expected, actual) // test that a new array was allocated with the correct size - require.Equal(t, cap(actual.Payload.([]Event)), 2) + require.Equal(t, cap(actual.Payload.(PayloadEvents)), 2) } func TestFilter_WithKey_NoEventsMatch(t *testing.T) { events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same"}, Event{Key: "Same"}) + events = append(events, newSimpleEvent("Same", 0), newSimpleEvent("Same", 0)) req := SubscribeRequest{Topic: testTopic, Key: "Other"} _, ok := filterByKey(req, events) diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 563880ef5b..93c5e65d6c 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -153,9 +153,9 @@ func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { return e } -func setPayload(e *pbsubscribe.Event, payload interface{}) { +func setPayload(e *pbsubscribe.Event, payload stream.Payload) { switch p := payload.(type) { - case []stream.Event: + case stream.PayloadEvents: e.Payload = &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{ Events: batchEventsFromEventSlice(p), diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 80f02d8cf4..d3187fdfc2 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -915,7 +915,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { name: "event batch", event: stream.Event{ Index: 2002, - Payload: []stream.Event{ + Payload: stream.PayloadEvents{ { Index: 2002, Payload: state.EventPayloadCheckServiceNode{ diff --git a/agent/structs/structs_oss.go b/agent/structs/structs_oss.go index 522e2f0ffd..29a45520b4 100644 --- a/agent/structs/structs_oss.go +++ b/agent/structs/structs_oss.go @@ -74,6 +74,11 @@ func (_ *EnterpriseMeta) FillAuthzContext(_ *acl.AuthorizerContext) {} func (_ *EnterpriseMeta) Normalize() {} +// GetNamespace always returns the empty string. +func (_ *EnterpriseMeta) GetNamespace() string { + return "" +} + // FillAuthzContext stub func (_ *DirEntry) FillAuthzContext(_ *acl.AuthorizerContext) {}