diff --git a/agent/cache-types/streaming_events_test.go b/agent/cache-types/streaming_events_test.go index 05bc3649fd..272372754d 100644 --- a/agent/cache-types/streaming_events_test.go +++ b/agent/cache-types/streaming_events_test.go @@ -9,17 +9,15 @@ import ( "github.com/hashicorp/consul/types" ) -func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event { +func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event { return &pbsubscribe.Event{ - Topic: topic, Index: index, Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, } } -func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic) *pbsubscribe.Event { +func newNewSnapshotToFollowEvent() *pbsubscribe.Event { return &pbsubscribe.Event{ - Topic: topic, Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, } } @@ -37,8 +35,6 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) return &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: svc, Index: index, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -117,8 +113,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs node := fmt.Sprintf("node%d", nodeNum) return &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: svc, Index: index, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -164,7 +158,6 @@ func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event events[i+1] = evs[i] } return &pbsubscribe.Event{ - Topic: first.Topic, Index: first.Index, Payload: &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{Events: events}, diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 3e794611b6..5c963c7a8a 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -26,7 +26,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { // Initially there are no services registered. Server should send an // EndOfSnapshot message immediately with index of 1. - client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1)) + client.QueueEvents(newEndOfSnapshotEvent(1)) opts := cache.FetchOptions{ MinIndex: 0, @@ -230,7 +230,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { registerServiceWeb(5, 1), registerServiceWeb(5, 2), registerServiceWeb(5, 3), - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) + newEndOfSnapshotEvent(5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ @@ -301,7 +301,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { registerServiceWeb(50, 3), // overlap existing node registerServiceWeb(50, 4), registerServiceWeb(50, 5), - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50)) + newEndOfSnapshotEvent(50)) // Make another blocking query with THE SAME index. It should immediately // return the new snapshot. @@ -324,11 +324,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { client.QueueErr(tempError("temporary connection error")) client.QueueEvents( - newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth), + newNewSnapshotToFollowEvent(), registerServiceWeb(50, 3), // overlap existing node registerServiceWeb(50, 4), registerServiceWeb(50, 5), - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50)) + newEndOfSnapshotEvent(50)) start := time.Now() opts.MinIndex = 49 @@ -358,7 +358,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { newEventServiceHealthRegister(5, 3, "web")) client.QueueEvents( batchEv, - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) + newEndOfSnapshotEvent(5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ @@ -428,7 +428,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { newEventServiceHealthRegister(5, 3, "web")) client.QueueEvents( batchEv, - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) + newEndOfSnapshotEvent(5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 9e681bb46c..f6af8e019f 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -13,6 +13,26 @@ import ( type EventPayloadCheckServiceNode struct { Op pbsubscribe.CatalogOp Value *structs.CheckServiceNode + // key is used to override the key used to filter the payload. It is set for + // events in the connect topic to specify the name of the underlying service + // when the change event is for a sidecar or gateway. + key string +} + +func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { + if key == "" && namespace == "" { + return true + } + + if e.Value.Service == nil { + return false + } + + name := e.Value.Service.Service + if e.key != "" { + name = e.key + } + return key == name && namespace == e.Value.Service.EnterpriseMeta.GetNamespace() } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -42,10 +62,6 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { }, } - if n.Service != nil { - event.Key = n.Service.Service - } - // append each event as a separate item so that they can be serialized // separately, to prevent the encoding of one massive message. buf.Append([]stream.Event{event}) @@ -252,7 +268,9 @@ func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.S e := newServiceHealthEventDeregister(idx, before) e.Topic = topicServiceHealthConnect - e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = payload.Value.Service.Proxy.DestinationServiceName + e.Payload = payload return e, true } @@ -304,7 +322,9 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { result = append(result, connectEvent) case node.Service.Kind == structs.ServiceKindConnectProxy: - connectEvent.Key = node.Service.Proxy.DestinationServiceName + payload := event.Payload.(EventPayloadCheckServiceNode) + payload.key = node.Service.Proxy.DestinationServiceName + connectEvent.Payload = payload result = append(result, connectEvent) default: @@ -316,7 +336,7 @@ func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event { return result } -func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode { +func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNode { ep, ok := payload.(EventPayloadCheckServiceNode) if !ok { return nil @@ -431,7 +451,6 @@ func newServiceHealthEventRegister( } return stream.Event{ Topic: topicServiceHealth, - Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -458,7 +477,6 @@ func newServiceHealthEventDeregister(idx uint64, sn *structs.ServiceNode) stream return stream.Event{ Topic: topicServiceHealth, - Key: sn.ServiceName, Index: idx, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 95d13c0df4..ac4a07d67e 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -4,12 +4,15 @@ import ( "fmt" "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/types" - "github.com/stretchr/testify/require" ) func TestServiceHealthEventsFromChanges(t *testing.T) { @@ -819,6 +822,7 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { return nil }, + WantEvents: []stream.Event{ // We should see: // - service dereg for web and proxy on node2 @@ -829,29 +833,15 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { // - connect reg for api on node2 testServiceHealthDeregistrationEvent(t, "web", evNode2), testServiceHealthDeregistrationEvent(t, "web", evNode2, evSidecar), - testServiceHealthDeregistrationEvent(t, "web", - evConnectTopic, - evNode2, - evSidecar, - ), + testServiceHealthDeregistrationEvent(t, "web", evConnectTopic, evNode2, evSidecar), testServiceHealthEvent(t, "web", evNodeUnchanged), testServiceHealthEvent(t, "web", evSidecar, evNodeUnchanged), testServiceHealthEvent(t, "web", evConnectTopic, evSidecar, evNodeUnchanged), - testServiceHealthEvent(t, "api", - evNode2, - evConnectNative, - evNodeUnchanged, - ), - testServiceHealthEvent(t, "api", - evNode2, - evConnectTopic, - evConnectNative, - evNodeUnchanged, - ), + testServiceHealthEvent(t, "api", evNode2, evConnectNative, evNodeUnchanged), + testServiceHealthEvent(t, "api", evNode2, evConnectTopic, evConnectNative, evNodeUnchanged), }, - WantErr: false, }, } @@ -884,17 +874,36 @@ func TestServiceHealthEventsFromChanges(t *testing.T) { } require.NoError(t, err) - // Make sure we have the right events, only taking ordering into account - // where it matters to account for non-determinism. - requireEventsInCorrectPartialOrder(t, tc.WantEvents, got, func(e stream.Event) string { - // We need events affecting unique registrations to be ordered, within a topic - csn := getPayloadCheckServiceNode(e.Payload) - return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service) - }) + assertDeepEqual(t, tc.WantEvents, got, cmpPartialOrderEvents) }) } } +func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { + t.Helper() + if diff := cmp.Diff(x, y, opts...); diff != "" { + t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) + } +} + +// cmpPartialOrderEvents returns a compare option which sorts events so that +// all events for a particular node/service are grouped together. The sort is +// stable so events with the same node/service retain their relative order. +var cmpPartialOrderEvents = cmp.Options{ + cmpopts.SortSlices(func(i, j stream.Event) bool { + key := func(e stream.Event) string { + csn := getPayloadCheckServiceNode(e.Payload) + return fmt.Sprintf("%s/%s/%s", e.Topic, csn.Node.Node, csn.Service.Service) + } + return key(i) < key(j) + }), + cmpEvents, +} + +var cmpEvents = cmp.Options{ + cmp.AllowUnexported(EventPayloadCheckServiceNode{}), +} + type regOption func(req *structs.RegisterRequest) error func testNodeRegistration(t *testing.T, opts ...regOption) *structs.RegisterRequest { @@ -1170,10 +1179,10 @@ func evSidecar(e *stream.Event) error { csn.Checks[1].ServiceName = svc + "_sidecar_proxy" } - // Update event key to be the proxy service name, but only if this is not - // already in the connect topic - if e.Topic != topicServiceHealthConnect { - e.Key = csn.Service.Service + if e.Topic == topicServiceHealthConnect { + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = svc + e.Payload = payload } return nil } @@ -1242,15 +1251,13 @@ func evChecksUnchanged(e *stream.Event) error { // name but not ID simulating an in-place service rename. func evRenameService(e *stream.Event) error { csn := getPayloadCheckServiceNode(e.Payload) - isSidecar := csn.Service.Kind == structs.ServiceKindConnectProxy - if !isSidecar { + if csn.Service.Kind != structs.ServiceKindConnectProxy { csn.Service.Service += "_changed" // Update service checks if len(csn.Checks) >= 2 { csn.Checks[1].ServiceName += "_changed" } - e.Key += "_changed" return nil } // This is a sidecar, it's not really realistic but lets only update the @@ -1258,12 +1265,13 @@ func evRenameService(e *stream.Event) error { // we get the right result. This is certainly possible if not likely so a // valid case. - // We don't need to update out own details, only the name of the destination + // We don't need to update our own details, only the name of the destination csn.Service.Proxy.DestinationServiceName += "_changed" - // If this is the connect topic we need to change the key too if e.Topic == topicServiceHealthConnect { - e.Key += "_changed" + payload := e.Payload.(EventPayloadCheckServiceNode) + payload.key = csn.Service.Proxy.DestinationServiceName + e.Payload = payload } return nil } @@ -1337,48 +1345,6 @@ func evServiceCheckDelete(e *stream.Event) error { return nil } -// requireEventsInCorrectPartialOrder compares that the expected set of events -// was emitted. It allows for _independent_ events to be emitted in any order - -// this can be important because even though the transaction processing is all -// strictly ordered up until the processing func, grouping multiple updates that -// affect the same logical entity may be necessary and may impose random -// ordering changes on the eventual events if a map is used. We only care that -// events _affecting the same topic and key_ are ordered correctly with respect -// to the "expected" set of events so this helper asserts that. -// -// The caller provides a func that can return a partition key for the given -// event types and we assert that all events with the same partition key are -// deliveries in the same order. Note that this is not necessarily the same as -// topic/key since for example in Catalog only events about a specific service -// _instance_ need to be ordered while topic and key are more general. -func requireEventsInCorrectPartialOrder(t *testing.T, want, got []stream.Event, - partKey func(stream.Event) string) { - t.Helper() - - // Partion both arrays by topic/key - wantParts := make(map[string][]stream.Event) - gotParts := make(map[string][]stream.Event) - - for _, e := range want { - k := partKey(e) - wantParts[k] = append(wantParts[k], e) - } - for _, e := range got { - k := partKey(e) - gotParts[k] = append(gotParts[k], e) - } - - for k, want := range wantParts { - require.Equal(t, want, gotParts[k], "got incorrect events for partition: %s", k) - } - - for k, got := range gotParts { - if _, ok := wantParts[k]; !ok { - require.Equal(t, nil, got, "got unwanted events for partition: %s", k) - } - } -} - // newTestEventServiceHealthRegister returns a realistically populated service // health registration event. The nodeNum is a // logical node and is used to create the node name ("node%d") but also change @@ -1393,7 +1359,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st return stream.Event{ Topic: topicServiceHealth, - Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -1464,7 +1429,6 @@ func newTestEventServiceHealthRegister(index uint64, nodeNum int, svc string) st func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) stream.Event { return stream.Event{ Topic: topicServiceHealth, - Key: svc, Index: index, Payload: EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index a16c635e1d..d75512195e 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -395,9 +395,8 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { for _, node := range nodes { event := stream.Event{ Topic: req.Topic, - Key: req.Key, Index: node.ModifyIndex, - Payload: node, + Payload: nodePayload{node: node, key: req.Key}, } snap.Append([]stream.Event{event}) } @@ -406,6 +405,15 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { } } +type nodePayload struct { + key string + node *structs.ServiceNode +} + +func (p nodePayload) FilterByKey(key, _ string) bool { + return p.key == key +} + func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index adbe0762c9..09d96ee6db 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -14,15 +14,23 @@ type Topic fmt.Stringer // EventPublisher and returned to Subscribers. type Event struct { Topic Topic - Key string Index uint64 - Payload interface{} + Payload Payload +} + +type Payload interface { + // FilterByKey must return true if the Payload should be included in a subscription + // requested with the key and namespace. + // Generally this means that the payload matches the key and namespace or + // the payload is a special framing event that should be returned to every + // subscription. + FilterByKey(key, namespace string) bool } // Len returns the number of events contained within this event. If the Payload // is a []Event, the length of that slice is returned. Otherwise 1 is returned. func (e Event) Len() int { - if batch, ok := e.Payload.([]Event); ok { + if batch, ok := e.Payload.(PayloadEvents); ok { return len(batch) } return 1 @@ -31,7 +39,7 @@ func (e Event) Len() int { // Filter returns an Event filtered to only those Events where f returns true. // If the second return value is false, every Event was removed by the filter. func (e Event) Filter(f func(Event) bool) (Event, bool) { - batch, ok := e.Payload.([]Event) + batch, ok := e.Payload.(PayloadEvents) if !ok { return e, f(e) } @@ -50,7 +58,7 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) { return e, size != 0 } - filtered := make([]Event, 0, size) + filtered := make(PayloadEvents, 0, size) for idx := range batch { event := batch[idx] if f(event) { @@ -64,6 +72,20 @@ func (e Event) Filter(f func(Event) bool) (Event, bool) { return e, true } +// PayloadEvents is an Payload which contains multiple Events. +type PayloadEvents []Event + +// TODO: this method is not called, but needs to exist so that we can store +// a slice of events as a payload. In the future we should be able to refactor +// Event.Filter so that this FilterByKey includes the re-slicing. +func (e PayloadEvents) FilterByKey(_, _ string) bool { + return true +} + +func (e PayloadEvents) Events() []Event { + return e +} + // IsEndOfSnapshot returns true if this is a framing event that indicates the // snapshot has completed. Subsequent events from Subscription.Next will be // streamed as they occur. @@ -80,12 +102,24 @@ func (e Event) IsNewSnapshotToFollow() bool { type endOfSnapshot struct{} +func (endOfSnapshot) FilterByKey(string, string) bool { + return true +} + type newSnapshotToFollow struct{} +func (newSnapshotToFollow) FilterByKey(string, string) bool { + return true +} + type closeSubscriptionPayload struct { tokensSecretIDs []string } +func (closeSubscriptionPayload) FilterByKey(string, string) bool { + return true +} + // NewCloseSubscriptionEvent returns a special Event that is handled by the // stream package, and is never sent to subscribers. EventProcessor handles // these events, and closes any subscriptions which were created using a token diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 6109ce2d9e..53a1bf8dd0 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -185,7 +185,6 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) if req.Index > 0 { snap.buffer.Append([]Event{{ Topic: req.Topic, - Key: req.Key, Payload: newSnapshotToFollow{}, }}) diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 0dec574960..f2a9e43a36 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -43,24 +43,37 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { events := []Event{{ Topic: testTopic, - Key: "sub-key", - Payload: "the-published-event-payload", + Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"}, }} publisher.Publish(events) // Subscriber should see the published event next = getNextEvent(t, eventCh) - expected := Event{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic} + expected := Event{ + Topic: testTopic, + Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"}, + } require.Equal(t, expected, next) } var testSnapshotEvent = Event{ Topic: testTopic, - Payload: "snapshot-event-payload", - Key: "sub-key", + Payload: simplePayload{key: "sub-key", value: "snapshot-event-payload"}, Index: 1, } +type simplePayload struct { + key string + value string +} + +func (p simplePayload) FilterByKey(key, _ string) bool { + if key == "" { + return true + } + return p.key == key +} + func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { @@ -193,8 +206,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { expected := Event{ Topic: testTopic, - Key: "sub-key", - Payload: "the-published-event-payload", + Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"}, Index: 3, } publisher.Publish([]Event{expected}) @@ -243,9 +255,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { expected := Event{ Topic: testTopic, - Key: "sub-key", Index: 3, - Payload: "event-3", + Payload: simplePayload{key: "sub-key", value: "event-3"}, } publisher.publishEvent([]Event{expected}) @@ -284,9 +295,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { nextEvent := Event{ Topic: testTopic, - Key: "sub-key", Index: 3, - Payload: "event-3", + Payload: simplePayload{key: "sub-key", value: "event-3"}, } runStep(t, "publish an event while unsubed", func(t *testing.T) { @@ -341,9 +351,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin nextEvent := Event{ Topic: testTopic, - Key: "sub-key", Index: 3, - Payload: "event-3", + Payload: simplePayload{key: "sub-key", value: "event-3"}, } runStep(t, "publish an event while unsubed", func(t *testing.T) { diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index 124cb32513..e20e53a4a1 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -37,7 +37,6 @@ func (s *eventSnapshot) appendAndSplice(req SubscribeRequest, fn SnapshotFunc, t } s.buffer.Append([]Event{{ Topic: req.Topic, - Key: req.Key, Index: idx, Payload: endOfSnapshot{}, }}) diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 98e3c683bc..7c8329e406 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -129,9 +129,9 @@ func TestEventSnapshot(t *testing.T) { e := curItem.Events[0] switch { case snapDone: - payload, ok := e.Payload.(string) + payload, ok := e.Payload.(simplePayload) require.True(t, ok, "want health event got: %#v", e.Payload) - updateIDs = append(updateIDs, payload) + updateIDs = append(updateIDs, payload.value) if len(updateIDs) == tc.updatesAfterSnap { // We're done! break RECV @@ -139,9 +139,9 @@ func TestEventSnapshot(t *testing.T) { case e.IsEndOfSnapshot(): snapDone = true default: - payload, ok := e.Payload.(string) + payload, ok := e.Payload.(simplePayload) require.True(t, ok, "want health event got: %#v", e.Payload) - snapIDs = append(snapIDs, payload) + snapIDs = append(snapIDs, payload.value) } } @@ -176,6 +176,6 @@ func newDefaultHealthEvent(index uint64, n int) Event { return Event{ Index: index, Topic: testTopic, - Payload: fmt.Sprintf("test-event-%03d", n), + Payload: simplePayload{value: fmt.Sprintf("test-event-%03d", n)}, } } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 012b410928..bcc76acef7 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -53,9 +53,21 @@ type Subscription struct { // SubscribeRequest identifies the types of events the subscriber would like to // receiver. Topic and Token are required. type SubscribeRequest struct { + // Topic to subscribe to Topic Topic - Key string + // Key used to filter events in the topic. Only events matching the key will + // be returned by the subscription. A blank key will return all events. Key + // is generally the name of the resource. + Key string + // Namespace used to filter events in the topic. Only events matching the + // namespace will be returned by the subscription. + Namespace string + // Token that was used to authenticate the request. If any ACL policy + // changes impact the token the subscription will be forcefully closed. Token string + // Index is the last index the client received. If non-zero the + // subscription will be resumed from this index. If the index is out-of-date + // a NewSnapshotToFollow event will be sent. Index uint64 } @@ -115,9 +127,8 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event { } return Event{ Topic: req.Topic, - Key: req.Key, Index: first.Index, - Payload: events, + Payload: PayloadEvents(events), } } @@ -128,7 +139,7 @@ func filterByKey(req SubscribeRequest, events []Event) (Event, bool) { } fn := func(e Event) bool { - return req.Key == e.Key + return e.Payload.FilterByKey(req.Key, req.Namespace) } return event.Filter(fn) } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index db15313f57..dc6c8a06c2 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -69,7 +69,7 @@ func TestSubscription(t *testing.T) { require.True(t, elapsed < 200*time.Millisecond, "Event should have been delivered immediately, took %s", elapsed) require.Equal(t, index, got.Index) - require.Equal(t, "test", got.Key) + require.Equal(t, "test", got.Payload.(simplePayload).key) // Cancelling the subscription context should unblock Next start = time.Now() @@ -130,20 +130,17 @@ func TestSubscription_Close(t *testing.T) { } func publishTestEvent(index uint64, b *eventBuffer, key string) { - // Don't care about the event payload for now just the semantics of publishing - // something. This is not a valid stream in the end-to-end streaming protocol - // but enough to test subscription mechanics. e := Event{ - Index: index, - Topic: testTopic, - Key: key, + Index: index, + Topic: testTopic, + Payload: simplePayload{key: key}, } b.Append([]Event{e}) } func TestFilter_NoKey(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Key: "One", Index: 102}, Event{Key: "Two"}) + events := make(PayloadEvents, 0, 5) + events = append(events, newSimpleEvent("One", 102), newSimpleEvent("Two", 102)) req := SubscribeRequest{Topic: testTopic} actual, ok := filterByKey(req, events) @@ -151,26 +148,33 @@ func TestFilter_NoKey(t *testing.T) { require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual) // test that a new array was not allocated - require.Equal(t, cap(actual.Payload.([]Event)), 5) + require.Equal(t, cap(actual.Payload.(PayloadEvents)), 5) +} + +func newSimpleEvent(key string, index uint64) Event { + return Event{Index: index, Payload: simplePayload{key: key}} } func TestFilter_WithKey_AllEventsMatch(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same", Index: 103}, Event{Key: "Same"}) + events := make(PayloadEvents, 0, 5) + events = append(events, newSimpleEvent("Same", 103), newSimpleEvent("Same", 103)) req := SubscribeRequest{Topic: testTopic, Key: "Same"} actual, ok := filterByKey(req, events) require.True(t, ok) - expected := Event{Topic: testTopic, Index: 103, Key: "Same", Payload: events} + expected := Event{Topic: testTopic, Index: 103, Payload: events} require.Equal(t, expected, actual) // test that a new array was not allocated - require.Equal(t, 5, cap(actual.Payload.([]Event))) + require.Equal(t, 5, cap(actual.Payload.(PayloadEvents))) } func TestFilter_WithKey_SomeEventsMatch(t *testing.T) { events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same", Index: 104}, Event{Key: "Other"}, Event{Key: "Same"}) + events = append(events, + newSimpleEvent("Same", 104), + newSimpleEvent("Other", 0), + newSimpleEvent("Same", 0)) req := SubscribeRequest{Topic: testTopic, Key: "Same"} actual, ok := filterByKey(req, events) @@ -178,18 +182,17 @@ func TestFilter_WithKey_SomeEventsMatch(t *testing.T) { expected := Event{ Topic: testTopic, Index: 104, - Key: "Same", - Payload: []Event{{Key: "Same", Index: 104}, {Key: "Same"}}, + Payload: PayloadEvents{newSimpleEvent("Same", 104), newSimpleEvent("Same", 0)}, } require.Equal(t, expected, actual) // test that a new array was allocated with the correct size - require.Equal(t, cap(actual.Payload.([]Event)), 2) + require.Equal(t, cap(actual.Payload.(PayloadEvents)), 2) } func TestFilter_WithKey_NoEventsMatch(t *testing.T) { events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same"}, Event{Key: "Same"}) + events = append(events, newSimpleEvent("Same", 0), newSimpleEvent("Same", 0)) req := SubscribeRequest{Topic: testTopic, Key: "Other"} _, ok := filterByKey(req, events) diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index b7eda488e4..93c5e65d6c 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub } elog.Trace(event) - e := newEventFromStreamEvent(req.Topic, event) + e := newEventFromStreamEvent(event) if err := serverStream.Send(e); err != nil { return err } @@ -139,12 +139,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) return event.Filter(fn) } -func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event { - e := &pbsubscribe.Event{ - Topic: topic, - Key: event.Key, - Index: event.Index, - } +func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { + e := &pbsubscribe.Event{Index: event.Index} switch { case event.IsEndOfSnapshot(): e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true} @@ -157,9 +153,9 @@ func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsub return e } -func setPayload(e *pbsubscribe.Event, payload interface{}) { +func setPayload(e *pbsubscribe.Event, payload stream.Payload) { switch p := payload.(type) { - case []stream.Event: + case stream.PayloadEvents: e.Payload = &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{ Events: batchEventsFromEventSlice(p), @@ -182,7 +178,7 @@ func batchEventsFromEventSlice(events []stream.Event) []*pbsubscribe.Event { result := make([]*pbsubscribe.Event, len(events)) for i := range events { event := events[i] - result[i] = &pbsubscribe.Event{Key: event.Key, Index: event.Index} + result[i] = &pbsubscribe.Event{Index: event.Index} setPayload(result[i], event.Payload) } return result diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 436687155f..cd5ebead82 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -107,8 +107,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) { expected := []*pbsubscribe.Event{ { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -139,8 +137,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -171,8 +167,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, }, @@ -192,8 +186,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { event := getEvent(t, chEvents) expectedEvent := &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -460,8 +452,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) { expected := []*pbsubscribe.Event{ { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -492,8 +482,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -524,8 +512,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, }, @@ -545,8 +531,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { event := getEvent(t, chEvents) expectedEvent := &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -902,11 +886,9 @@ func TestNewEventFromSteamEvent(t *testing.T) { expected pbsubscribe.Event } - testTopic := pbsubscribe.Topic_ServiceHealthConnect fn := func(t *testing.T, tc testCase) { expected := tc.expected - expected.Topic = testTopic - actual := newEventFromStreamEvent(testTopic, tc.event) + actual := newEventFromStreamEvent(tc.event) assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty()) } @@ -929,11 +911,9 @@ func TestNewEventFromSteamEvent(t *testing.T) { { name: "event batch", event: stream.Event{ - Key: "web1", Index: 2002, - Payload: []stream.Event{ + Payload: stream.PayloadEvents{ { - Key: "web1", Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -944,7 +924,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, { - Key: "web1", Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, @@ -957,13 +936,11 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, expected: pbsubscribe.Event{ - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{ Events: []*pbsubscribe.Event{ { - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -976,7 +953,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, { - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ @@ -996,7 +972,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { { name: "event payload CheckServiceNode", event: stream.Event{ - Key: "web1", Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -1007,7 +982,6 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, expected: pbsubscribe.Event{ - Key: "web1", Index: 2002, Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ diff --git a/agent/structs/structs_oss.go b/agent/structs/structs_oss.go index 522e2f0ffd..29a45520b4 100644 --- a/agent/structs/structs_oss.go +++ b/agent/structs/structs_oss.go @@ -74,6 +74,11 @@ func (_ *EnterpriseMeta) FillAuthzContext(_ *acl.AuthorizerContext) {} func (_ *EnterpriseMeta) Normalize() {} +// GetNamespace always returns the empty string. +func (_ *EnterpriseMeta) GetNamespace() string { + return "" +} + // FillAuthzContext stub func (_ *DirEntry) FillAuthzContext(_ *acl.AuthorizerContext) {} diff --git a/proto/pbsubscribe/subscribe.pb.go b/proto/pbsubscribe/subscribe.pb.go index 35122711ec..f7e2011d99 100644 --- a/proto/pbsubscribe/subscribe.pb.go +++ b/proto/pbsubscribe/subscribe.pb.go @@ -108,7 +108,12 @@ type SubscribeRequest struct { // If it's not the local DC the server will forward the request to // the remote DC and proxy the results back to the subscriber. An empty // string defaults to the local datacenter. - Datacenter string `protobuf:"bytes,5,opt,name=Datacenter,proto3" json:"Datacenter,omitempty"` + Datacenter string `protobuf:"bytes,5,opt,name=Datacenter,proto3" json:"Datacenter,omitempty"` + // Namespace which contains the resources. If Namespace is not specified the + // default namespace will be used. + // + // Namespace is an enterprise-only feature. + Namespace string `protobuf:"bytes,6,opt,name=Namespace,proto3" json:"Namespace,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -182,20 +187,23 @@ func (m *SubscribeRequest) GetDatacenter() string { return "" } +func (m *SubscribeRequest) GetNamespace() string { + if m != nil { + return m.Namespace + } + return "" +} + // Event describes a streaming update on a subscription. Events are used both to // describe the current "snapshot" of the result as well as ongoing mutations to // that snapshot. type Event struct { - // Topic the event was published to - Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"` - // Key is the logical identifier for the entity that was mutated. - Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` // Index is the raft index at which the mutation took place. At the top // level of a subscription there will always be at most one Event per index. // If multiple events are published to the same topic in a single raft // transaction then the batch of events will be encoded inside a single // top-level event to ensure they are delivered atomically to clients. - Index uint64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` + Index uint64 `protobuf:"varint,1,opt,name=Index,proto3" json:"Index,omitempty"` // Payload is the actual event content. // // Types that are valid to be assigned to Payload: @@ -249,13 +257,13 @@ type isEvent_Payload interface { } type Event_EndOfSnapshot struct { - EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"` + EndOfSnapshot bool `protobuf:"varint,2,opt,name=EndOfSnapshot,proto3,oneof"` } type Event_NewSnapshotToFollow struct { - NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"` + NewSnapshotToFollow bool `protobuf:"varint,3,opt,name=NewSnapshotToFollow,proto3,oneof"` } type Event_EventBatch struct { - EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"` + EventBatch *EventBatch `protobuf:"bytes,4,opt,name=EventBatch,proto3,oneof"` } type Event_ServiceHealth struct { ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"` @@ -273,20 +281,6 @@ func (m *Event) GetPayload() isEvent_Payload { return nil } -func (m *Event) GetTopic() Topic { - if m != nil { - return m.Topic - } - return Topic_Unknown -} - -func (m *Event) GetKey() string { - if m != nil { - return m.Key - } - return "" -} - func (m *Event) GetIndex() uint64 { if m != nil { return m.Index @@ -341,17 +335,17 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { if x.EndOfSnapshot { t = 1 } - _ = b.EncodeVarint(5<<3 | proto.WireVarint) + _ = b.EncodeVarint(2<<3 | proto.WireVarint) _ = b.EncodeVarint(t) case *Event_NewSnapshotToFollow: t := uint64(0) if x.NewSnapshotToFollow { t = 1 } - _ = b.EncodeVarint(6<<3 | proto.WireVarint) + _ = b.EncodeVarint(3<<3 | proto.WireVarint) _ = b.EncodeVarint(t) case *Event_EventBatch: - _ = b.EncodeVarint(7<<3 | proto.WireBytes) + _ = b.EncodeVarint(4<<3 | proto.WireBytes) if err := b.EncodeMessage(x.EventBatch); err != nil { return err } @@ -370,21 +364,21 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { m := msg.(*Event) switch tag { - case 5: // Payload.EndOfSnapshot + case 2: // Payload.EndOfSnapshot if wire != proto.WireVarint { return true, proto.ErrInternalBadWireType } x, err := b.DecodeVarint() m.Payload = &Event_EndOfSnapshot{x != 0} return true, err - case 6: // Payload.NewSnapshotToFollow + case 3: // Payload.NewSnapshotToFollow if wire != proto.WireVarint { return true, proto.ErrInternalBadWireType } x, err := b.DecodeVarint() m.Payload = &Event_NewSnapshotToFollow{x != 0} return true, err - case 7: // Payload.EventBatch + case 4: // Payload.EventBatch if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -546,40 +540,41 @@ func init() { func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) } var fileDescriptor_ab3eb8c810e315fb = []byte{ - // 526 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0x5f, 0x6f, 0xd2, 0x50, - 0x14, 0xef, 0x85, 0x01, 0xe3, 0xe0, 0x96, 0x7a, 0x87, 0xb1, 0x61, 0x49, 0x83, 0xc4, 0x2c, 0x75, - 0x89, 0xd4, 0x60, 0xa2, 0x6f, 0x1a, 0x61, 0x9b, 0x18, 0x93, 0x61, 0xca, 0xf6, 0xa0, 0x6f, 0x97, - 0xf6, 0x48, 0x1b, 0xea, 0xbd, 0xb5, 0xbd, 0x0c, 0xf7, 0xee, 0x87, 0xd8, 0xb7, 0xf1, 0xd5, 0x47, - 0x3f, 0x82, 0xc1, 0x2f, 0x62, 0xb8, 0x94, 0xae, 0xc0, 0xde, 0xf6, 0xd6, 0xf3, 0xfb, 0x73, 0xcf, - 0x2f, 0xe7, 0xf4, 0xc0, 0x93, 0x28, 0x16, 0x52, 0xd8, 0xd1, 0x28, 0x99, 0x8e, 0x12, 0x37, 0x0e, - 0x46, 0x68, 0x67, 0x5f, 0x6d, 0xc5, 0xd1, 0x6a, 0x06, 0x34, 0x1a, 0x99, 0x1a, 0xe3, 0xab, 0xc0, - 0x45, 0x9b, 0x0b, 0x2f, 0x95, 0xb5, 0x6e, 0x08, 0xe8, 0xc3, 0x95, 0xd2, 0xc1, 0xef, 0x53, 0x4c, - 0x24, 0x3d, 0x82, 0xd2, 0x85, 0x88, 0x02, 0xd7, 0x20, 0x4d, 0x62, 0xed, 0x77, 0xf4, 0xf6, 0xed, - 0xe3, 0x0a, 0x77, 0x96, 0x34, 0xd5, 0xa1, 0xf8, 0x11, 0xaf, 0x8d, 0x42, 0x93, 0x58, 0x55, 0x67, - 0xf1, 0x49, 0xeb, 0x0b, 0xe7, 0x04, 0xb9, 0x51, 0x54, 0xd8, 0xb2, 0x58, 0xa0, 0x1f, 0xb8, 0x87, - 0x3f, 0x8c, 0x9d, 0x26, 0xb1, 0x76, 0x9c, 0x65, 0x41, 0x4d, 0x80, 0x13, 0x26, 0x99, 0x8b, 0x5c, - 0x62, 0x6c, 0x94, 0x94, 0x21, 0x87, 0xb4, 0x7e, 0x15, 0xa0, 0x74, 0x7a, 0x85, 0xfc, 0x9e, 0x79, - 0x96, 0x9d, 0x8b, 0xf9, 0xce, 0x47, 0xb0, 0x77, 0xca, 0xbd, 0xc1, 0xd7, 0x21, 0x67, 0x51, 0xe2, - 0x0b, 0xa9, 0x9a, 0xef, 0xf6, 0x35, 0x67, 0x1d, 0xa6, 0x1d, 0x38, 0x38, 0xc7, 0xd9, 0xaa, 0xbc, - 0x10, 0x67, 0x22, 0x0c, 0xc5, 0xcc, 0x28, 0xa7, 0xea, 0xbb, 0x48, 0xfa, 0x1a, 0x40, 0x85, 0xee, - 0x32, 0xe9, 0xfa, 0x46, 0xa5, 0x49, 0xac, 0x5a, 0xe7, 0x51, 0x2e, 0xf0, 0x2d, 0xd9, 0xd7, 0x9c, - 0x9c, 0x94, 0x9e, 0xc1, 0xde, 0x70, 0xb9, 0x9f, 0x3e, 0xb2, 0x50, 0xfa, 0x06, 0x28, 0xaf, 0x99, - 0xf3, 0xae, 0xf1, 0x97, 0x91, 0xc7, 0x24, 0x2e, 0x42, 0xaf, 0xc1, 0xdd, 0x2a, 0x54, 0x3e, 0xb1, - 0xeb, 0x50, 0x30, 0xaf, 0xf5, 0x2a, 0x9f, 0x85, 0x5a, 0x50, 0x56, 0x55, 0x62, 0x90, 0x66, 0xd1, - 0xaa, 0xad, 0x8d, 0x51, 0x11, 0x4e, 0xca, 0xb7, 0x7e, 0x12, 0x38, 0xb8, 0xa3, 0x17, 0x7d, 0x0a, - 0x85, 0x41, 0x94, 0x2e, 0xa1, 0x9e, 0x73, 0xf7, 0x98, 0x64, 0xa1, 0x18, 0x0f, 0x22, 0xa7, 0x30, - 0x88, 0xe8, 0x7b, 0xd0, 0x7b, 0x3e, 0xba, 0x93, 0xf4, 0x85, 0x73, 0xe1, 0xa1, 0x5a, 0x49, 0xad, - 0x73, 0xd8, 0xce, 0xfe, 0xc1, 0xf6, 0xa6, 0xc4, 0xd9, 0x32, 0x1d, 0xbf, 0x4b, 0xd7, 0x4e, 0x6b, - 0x50, 0xb9, 0xe4, 0x13, 0x2e, 0x66, 0x5c, 0xd7, 0xe8, 0xc3, 0x8d, 0x39, 0xe9, 0x84, 0x1a, 0x50, - 0x5f, 0x83, 0x7a, 0x82, 0x73, 0x74, 0xa5, 0x5e, 0x38, 0x7e, 0x06, 0xd5, 0x2c, 0x1c, 0x7d, 0x00, - 0xbb, 0x0e, 0x8e, 0x83, 0x44, 0x62, 0xac, 0x6b, 0x74, 0x1f, 0xe0, 0x04, 0xe3, 0x55, 0x4d, 0x3a, - 0x9f, 0xe1, 0xf1, 0x50, 0x32, 0x89, 0x3d, 0x9f, 0xf1, 0x31, 0xa6, 0x37, 0x11, 0xc9, 0x40, 0x70, - 0xfa, 0x06, 0xaa, 0xd9, 0x8d, 0xd0, 0xc3, 0xfc, 0x42, 0x36, 0x2e, 0xa7, 0xb1, 0x35, 0xd3, 0x96, - 0xf6, 0x82, 0x74, 0xdf, 0xfe, 0x9e, 0x9b, 0xe4, 0xcf, 0xdc, 0x24, 0x7f, 0xe7, 0x26, 0xb9, 0xf9, - 0x67, 0x6a, 0x5f, 0x9e, 0x8f, 0x03, 0xe9, 0x4f, 0x47, 0x6d, 0x57, 0x7c, 0xb3, 0x7d, 0x96, 0xf8, - 0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, 0x4f, 0xa6, 0xa1, 0xbd, 0x75, 0xdc, 0xa3, 0xb2, 0x82, 0x5e, - 0xfe, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x44, 0xbc, 0x0a, 0xfb, 0xf8, 0x03, 0x00, 0x00, + // 536 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x5f, 0x6f, 0xd2, 0x50, + 0x14, 0xef, 0x65, 0x83, 0xad, 0x07, 0xb7, 0xd4, 0x3b, 0x8c, 0x0d, 0x33, 0x0d, 0x12, 0xb3, 0xe0, + 0x12, 0xa9, 0xc1, 0x44, 0xdf, 0x34, 0xc2, 0x36, 0x31, 0x26, 0x60, 0xca, 0xf6, 0xa0, 0x6f, 0x97, + 0xf6, 0x48, 0x1b, 0xba, 0x7b, 0x6b, 0x7b, 0x19, 0xee, 0x5d, 0xbf, 0x83, 0x9f, 0xc4, 0xcf, 0xe0, + 0xa3, 0x1f, 0xc1, 0xe0, 0x17, 0x31, 0x5c, 0x4a, 0x29, 0xb0, 0xb7, 0x9e, 0xdf, 0x9f, 0x73, 0x4f, + 0xcf, 0x1f, 0x78, 0x1c, 0xc5, 0x42, 0x0a, 0x3b, 0x1a, 0x26, 0x93, 0x61, 0xe2, 0xc6, 0xc1, 0x10, + 0xed, 0xec, 0xab, 0xa9, 0x38, 0xaa, 0x67, 0x40, 0xb5, 0x9a, 0xa9, 0x31, 0xbe, 0x09, 0x5c, 0xb4, + 0xb9, 0xf0, 0x52, 0x59, 0xfd, 0x17, 0x01, 0x63, 0xb0, 0x54, 0x3a, 0xf8, 0x75, 0x82, 0x89, 0xa4, + 0x27, 0x50, 0xbc, 0x14, 0x51, 0xe0, 0x9a, 0xa4, 0x46, 0x1a, 0x87, 0x2d, 0xa3, 0xb9, 0x4a, 0xae, + 0x70, 0x67, 0x41, 0x53, 0x03, 0x76, 0x3e, 0xe0, 0xad, 0x59, 0xa8, 0x91, 0x86, 0xee, 0xcc, 0x3f, + 0x69, 0x65, 0xee, 0x1c, 0x23, 0x37, 0x77, 0x14, 0xb6, 0x08, 0xe6, 0xe8, 0x7b, 0xee, 0xe1, 0x37, + 0x73, 0xb7, 0x46, 0x1a, 0xbb, 0xce, 0x22, 0xa0, 0x16, 0xc0, 0x19, 0x93, 0xcc, 0x45, 0x2e, 0x31, + 0x36, 0x8b, 0xca, 0x90, 0x43, 0xe8, 0x23, 0xd0, 0x7b, 0xec, 0x1a, 0x93, 0x88, 0xb9, 0x68, 0x96, + 0x14, 0xbd, 0x02, 0xea, 0x3f, 0x0a, 0x50, 0x3c, 0xbf, 0x41, 0x2e, 0x57, 0xd9, 0x49, 0x3e, 0xfb, + 0x09, 0x1c, 0x9c, 0x73, 0xaf, 0xff, 0x65, 0xc0, 0x59, 0x94, 0xf8, 0x42, 0xaa, 0x2a, 0xf7, 0xbb, + 0x9a, 0xb3, 0x0e, 0xd3, 0x16, 0x1c, 0xf5, 0x70, 0xba, 0x0c, 0x2f, 0xc5, 0x85, 0x08, 0x43, 0x31, + 0x55, 0xf5, 0xcf, 0xd5, 0x77, 0x91, 0xf4, 0x15, 0x80, 0x7a, 0xba, 0xcd, 0xa4, 0xeb, 0xab, 0x9f, + 0x2a, 0xb7, 0x1e, 0xe4, 0x9a, 0xb4, 0x22, 0xbb, 0x9a, 0x93, 0x93, 0xd2, 0x0b, 0x38, 0x18, 0x2c, + 0x66, 0xd0, 0x45, 0x16, 0x4a, 0xdf, 0x04, 0xe5, 0xb5, 0x72, 0xde, 0x35, 0xfe, 0x2a, 0xf2, 0x98, + 0xc4, 0x79, 0xd1, 0x6b, 0x70, 0x5b, 0x87, 0xbd, 0x8f, 0xec, 0x36, 0x14, 0xcc, 0xab, 0xbf, 0xcc, + 0xd7, 0x42, 0x1b, 0x50, 0x52, 0x51, 0x62, 0x92, 0xda, 0x4e, 0xa3, 0xbc, 0x36, 0x3a, 0x45, 0x38, + 0x29, 0x5f, 0xff, 0x4e, 0xe0, 0xe8, 0x8e, 0xb7, 0xe8, 0x13, 0x28, 0xf4, 0xa3, 0x74, 0xf0, 0x95, + 0x9c, 0xbb, 0xc3, 0x24, 0x0b, 0xc5, 0xa8, 0x1f, 0x39, 0x85, 0x7e, 0x44, 0xdf, 0x81, 0xd1, 0xf1, + 0xd1, 0x1d, 0xa7, 0x19, 0x7a, 0xc2, 0x43, 0xd5, 0xe0, 0x72, 0xeb, 0xb8, 0x99, 0xed, 0x59, 0x73, + 0x53, 0xe2, 0x6c, 0x99, 0x4e, 0xdf, 0xa6, 0xab, 0x46, 0xcb, 0xb0, 0x77, 0xc5, 0xc7, 0x5c, 0x4c, + 0xb9, 0xa1, 0xd1, 0xfb, 0x1b, 0x7d, 0x32, 0x08, 0x35, 0xa1, 0xb2, 0x06, 0x75, 0x04, 0xe7, 0xe8, + 0x4a, 0xa3, 0x70, 0xfa, 0x14, 0xf4, 0xac, 0x38, 0x7a, 0x0f, 0xf6, 0x1d, 0x1c, 0x05, 0x89, 0xc4, + 0xd8, 0xd0, 0xe8, 0x21, 0xc0, 0x19, 0xc6, 0xcb, 0x98, 0xb4, 0x3e, 0xc1, 0xc3, 0x81, 0x64, 0x12, + 0x3b, 0x3e, 0xe3, 0x23, 0x4c, 0xf7, 0x3e, 0x92, 0x81, 0xe0, 0xf4, 0x35, 0xe8, 0xd9, 0x1d, 0xd0, + 0xe3, 0xfc, 0x40, 0x36, 0xae, 0xa3, 0xba, 0xd5, 0xd3, 0xba, 0xf6, 0x9c, 0xb4, 0xdf, 0xfc, 0x9e, + 0x59, 0xe4, 0xcf, 0xcc, 0x22, 0x7f, 0x67, 0x16, 0xf9, 0xf9, 0xcf, 0xd2, 0x3e, 0x3f, 0x1b, 0x05, + 0xd2, 0x9f, 0x0c, 0x9b, 0xae, 0xb8, 0xb6, 0x7d, 0x96, 0xf8, 0x81, 0x2b, 0xe2, 0xc8, 0x76, 0x05, + 0x4f, 0x26, 0xa1, 0xbd, 0x75, 0xc0, 0xc3, 0x92, 0x82, 0x5e, 0xfc, 0x0f, 0x00, 0x00, 0xff, 0xff, + 0x8f, 0x56, 0x73, 0x78, 0xdc, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -751,6 +746,13 @@ func (m *SubscribeRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.Namespace) > 0 { + i -= len(m.Namespace) + copy(dAtA[i:], m.Namespace) + i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Namespace))) + i-- + dAtA[i] = 0x32 + } if len(m.Datacenter) > 0 { i -= len(m.Datacenter) copy(dAtA[i:], m.Datacenter) @@ -821,18 +823,6 @@ func (m *Event) MarshalToSizedBuffer(dAtA []byte) (int, error) { if m.Index != 0 { i = encodeVarintSubscribe(dAtA, i, uint64(m.Index)) i-- - dAtA[i] = 0x18 - } - if len(m.Key) > 0 { - i -= len(m.Key) - copy(dAtA[i:], m.Key) - i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Key))) - i-- - dAtA[i] = 0x12 - } - if m.Topic != 0 { - i = encodeVarintSubscribe(dAtA, i, uint64(m.Topic)) - i-- dAtA[i] = 0x8 } return len(dAtA) - i, nil @@ -851,7 +841,7 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0 } i-- - dAtA[i] = 0x28 + dAtA[i] = 0x10 return len(dAtA) - i, nil } func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) { @@ -867,7 +857,7 @@ func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, erro dAtA[i] = 0 } i-- - dAtA[i] = 0x30 + dAtA[i] = 0x18 return len(dAtA) - i, nil } func (m *Event_EventBatch) MarshalTo(dAtA []byte) (int, error) { @@ -886,7 +876,7 @@ func (m *Event_EventBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintSubscribe(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x3a + dAtA[i] = 0x22 } return len(dAtA) - i, nil } @@ -1030,6 +1020,10 @@ func (m *SubscribeRequest) Size() (n int) { if l > 0 { n += 1 + l + sovSubscribe(uint64(l)) } + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovSubscribe(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1042,13 +1036,6 @@ func (m *Event) Size() (n int) { } var l int _ = l - if m.Topic != 0 { - n += 1 + sovSubscribe(uint64(m.Topic)) - } - l = len(m.Key) - if l > 0 { - n += 1 + l + sovSubscribe(uint64(l)) - } if m.Index != 0 { n += 1 + sovSubscribe(uint64(m.Index)) } @@ -1309,6 +1296,38 @@ func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { } m.Datacenter = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSubscribe + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSubscribe + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSubscribe + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipSubscribe(dAtA[iNdEx:]) @@ -1364,57 +1383,6 @@ func (m *Event) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) - } - m.Topic = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowSubscribe - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Topic |= Topic(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowSubscribe - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthSubscribe - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthSubscribe - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Key = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) } @@ -1433,7 +1401,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { break } } - case 5: + case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field EndOfSnapshot", wireType) } @@ -1454,7 +1422,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { } b := bool(v != 0) m.Payload = &Event_EndOfSnapshot{b} - case 6: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType) } @@ -1475,7 +1443,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { } b := bool(v != 0) m.Payload = &Event_NewSnapshotToFollow{b} - case 7: + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType) } diff --git a/proto/pbsubscribe/subscribe.proto b/proto/pbsubscribe/subscribe.proto index c0efab87c3..37ae9e0206 100644 --- a/proto/pbsubscribe/subscribe.proto +++ b/proto/pbsubscribe/subscribe.proto @@ -73,43 +73,43 @@ message SubscribeRequest { // the remote DC and proxy the results back to the subscriber. An empty // string defaults to the local datacenter. string Datacenter = 5; + + // Namespace which contains the resources. If Namespace is not specified the + // default namespace will be used. + // + // Namespace is an enterprise-only feature. + string Namespace = 6; } // Event describes a streaming update on a subscription. Events are used both to // describe the current "snapshot" of the result as well as ongoing mutations to // that snapshot. message Event { - // Topic the event was published to - Topic Topic = 1; - - // Key is the logical identifier for the entity that was mutated. - string Key = 2; - // Index is the raft index at which the mutation took place. At the top // level of a subscription there will always be at most one Event per index. // If multiple events are published to the same topic in a single raft // transaction then the batch of events will be encoded inside a single // top-level event to ensure they are delivered atomically to clients. - uint64 Index = 3; + uint64 Index = 1; // Payload is the actual event content. oneof Payload { // EndOfSnapshot indicates the event stream for the initial snapshot has // ended. Subsequent Events delivered will be mutations to that result. - bool EndOfSnapshot = 5; + bool EndOfSnapshot = 2; // NewSnapshotToFollow indicates that the client view is stale. The client // must reset its view before handing any more events. Subsequent events // in the stream will be for a new snapshot until an EndOfSnapshot event // is received. - bool NewSnapshotToFollow = 6; + bool NewSnapshotToFollow = 3; // EventBatch is a set of events. This is typically used as the payload // type where multiple events are emitted in a single topic and raft // index (e.g. transactional updates). In this case the Topic and Index // values of all events will match and the whole set should be delivered // and consumed atomically. - EventBatch EventBatch = 7; + EventBatch EventBatch = 4; // ServiceHealth is used for ServiceHealth and ServiceHealthConnect // topics.