From 9b904de406967568ea3755128a07b9afbf762321 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 16 Nov 2020 14:20:07 -0500 Subject: [PATCH] Merge pull request #9114 from hashicorp/dnephin/filtering-in-stream stream: improve naming of Payload methods --- agent/consul/state/catalog_events.go | 11 +- agent/consul/state/catalog_events_test.go | 2 +- agent/consul/state/store_integration_test.go | 6 +- agent/consul/stream/event.go | 117 +++++++------ agent/consul/stream/event_publisher.go | 3 +- agent/consul/stream/event_publisher_test.go | 13 +- agent/consul/stream/event_test.go | 160 ++++++++++++++++++ agent/consul/stream/subscription.go | 18 +- agent/consul/stream/subscription_test.go | 166 +++---------------- agent/rpc/subscribe/auth.go | 22 --- agent/rpc/subscribe/logger.go | 14 +- agent/rpc/subscribe/subscribe.go | 10 +- agent/rpc/subscribe/subscribe_test.go | 13 +- 13 files changed, 308 insertions(+), 247 deletions(-) delete mode 100644 agent/rpc/subscribe/auth.go diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 0a11a94360..526eca3549 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -3,6 +3,7 @@ package state import ( memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbsubscribe" @@ -10,6 +11,10 @@ import ( // EventPayloadCheckServiceNode is used as the Payload for a stream.Event to // indicates changes to a CheckServiceNode for service health. +// +// The stream.Payload methods implemented by EventPayloadCheckServiceNode are +// do not mutate the payload, making it safe to use in an Event sent to +// stream.EventPublisher.Publish. type EventPayloadCheckServiceNode struct { Op pbsubscribe.CatalogOp Value *structs.CheckServiceNode @@ -19,7 +24,11 @@ type EventPayloadCheckServiceNode struct { key string } -func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { +func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool { + return e.Value.CanRead(authz) == acl.Allow +} + +func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool { if key == "" && namespace == "" { return true } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index 7019c9254f..f9a2e1af21 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -1476,7 +1476,7 @@ func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) { t.Skip("cant test namespace matching without namespace support") } - require.Equal(t, tc.expected, tc.payload.FilterByKey(tc.key, tc.namespace)) + require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace)) } var testCases = []testCase{ diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index d75512195e..fc4d05591e 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -410,10 +410,14 @@ type nodePayload struct { node *structs.ServiceNode } -func (p nodePayload) FilterByKey(key, _ string) bool { +func (p nodePayload) MatchesKey(key, _ string) bool { return p.key == key } +func (p nodePayload) HasReadPermission(acl.Authorizer) bool { + return true +} + func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 09d96ee6db..74df46b5e1 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -4,7 +4,11 @@ to the state store. */ package stream -import "fmt" +import ( + "fmt" + + "github.com/hashicorp/consul/acl" +) // Topic is an identifier that partitions events. A subscription will only receive // events which match the Topic. @@ -18,72 +22,81 @@ type Event struct { Payload Payload } +// A Payload contains the topic-specific data in an event. The payload methods +// should not modify the state of the payload if the Event is being submitted to +// EventPublisher.Publish. type Payload interface { - // FilterByKey must return true if the Payload should be included in a subscription + // MatchesKey must return true if the Payload should be included in a subscription // requested with the key and namespace. // Generally this means that the payload matches the key and namespace or // the payload is a special framing event that should be returned to every // subscription. - FilterByKey(key, namespace string) bool + MatchesKey(key, namespace string) bool + + // HasReadPermission uses the acl.Authorizer to determine if the items in the + // Payload are visible to the request. It returns true if the payload is + // authorized for Read, otherwise returns false. + HasReadPermission(authz acl.Authorizer) bool } -// Len returns the number of events contained within this event. If the Payload -// is a []Event, the length of that slice is returned. Otherwise 1 is returned. -func (e Event) Len() int { - if batch, ok := e.Payload.(PayloadEvents); ok { - return len(batch) - } - return 1 +// PayloadEvents is a Payload that may be returned by Subscription.Next when +// there are multiple events at an index. +// +// Note that unlike most other Payload, PayloadEvents is mutable and it is NOT +// safe to send to EventPublisher.Publish. +type PayloadEvents struct { + Items []Event } -// Filter returns an Event filtered to only those Events where f returns true. -// If the second return value is false, every Event was removed by the filter. -func (e Event) Filter(f func(Event) bool) (Event, bool) { - batch, ok := e.Payload.(PayloadEvents) - if !ok { - return e, f(e) - } +func newPayloadEvents(items ...Event) *PayloadEvents { + return &PayloadEvents{Items: items} +} + +func (p *PayloadEvents) filter(f func(Event) bool) bool { + items := p.Items // To avoid extra allocations, iterate over the list of events first and // get a count of the total desired size. This trades off some extra cpu // time in the worse case (when not all items match the filter), for // fewer memory allocations. var size int - for idx := range batch { - if f(batch[idx]) { + for idx := range items { + if f(items[idx]) { size++ } } - if len(batch) == size || size == 0 { - return e, size != 0 + if len(items) == size || size == 0 { + return size != 0 } - filtered := make(PayloadEvents, 0, size) - for idx := range batch { - event := batch[idx] + filtered := make([]Event, 0, size) + for idx := range items { + event := items[idx] if f(event) { filtered = append(filtered, event) } } - if len(filtered) == 0 { - return e, false - } - e.Payload = filtered - return e, true -} - -// PayloadEvents is an Payload which contains multiple Events. -type PayloadEvents []Event - -// TODO: this method is not called, but needs to exist so that we can store -// a slice of events as a payload. In the future we should be able to refactor -// Event.Filter so that this FilterByKey includes the re-slicing. -func (e PayloadEvents) FilterByKey(_, _ string) bool { + p.Items = filtered return true } -func (e PayloadEvents) Events() []Event { - return e +// MatchesKey filters the PayloadEvents to those which match the key and namespace. +func (p *PayloadEvents) MatchesKey(key, namespace string) bool { + return p.filter(func(event Event) bool { + return event.Payload.MatchesKey(key, namespace) + }) +} + +func (p *PayloadEvents) Len() int { + return len(p.Items) +} + +// HasReadPermission filters the PayloadEvents to those which are authorized +// for reading by authz. +func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool { + return p.filter(func(event Event) bool { + return event.Payload.HasReadPermission(authz) + }) } // IsEndOfSnapshot returns true if this is a framing event that indicates the @@ -100,24 +113,34 @@ func (e Event) IsNewSnapshotToFollow() bool { return e.Payload == newSnapshotToFollow{} } -type endOfSnapshot struct{} +type framingEvent struct{} -func (endOfSnapshot) FilterByKey(string, string) bool { +func (framingEvent) MatchesKey(string, string) bool { return true } -type newSnapshotToFollow struct{} - -func (newSnapshotToFollow) FilterByKey(string, string) bool { +func (framingEvent) HasReadPermission(acl.Authorizer) bool { return true } +type endOfSnapshot struct { + framingEvent +} + +type newSnapshotToFollow struct { + framingEvent +} + type closeSubscriptionPayload struct { tokensSecretIDs []string } -func (closeSubscriptionPayload) FilterByKey(string, string) bool { - return true +func (closeSubscriptionPayload) MatchesKey(string, string) bool { + return false +} + +func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool { + return false } // NewCloseSubscriptionEvent returns a special Event that is handled by the diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 379bfdfa8d..769e875d83 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -91,7 +91,8 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E return e } -// Publish events to all subscribers of the event Topic. +// Publish events to all subscribers of the event Topic. The events will be shared +// with all subscriptions, so the Payload used in Event.Payload must be immutable. func (e *EventPublisher) Publish(events []Event) { if len(events) > 0 { e.publishCh <- events diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index f2a9e43a36..576d4ccc35 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" ) type intTopic int @@ -63,17 +65,22 @@ var testSnapshotEvent = Event{ } type simplePayload struct { - key string - value string + key string + value string + noReadPerm bool } -func (p simplePayload) FilterByKey(key, _ string) bool { +func (p simplePayload) MatchesKey(key, _ string) bool { if key == "" { return true } return p.key == key } +func (p simplePayload) HasReadPermission(acl.Authorizer) bool { + return !p.noReadPerm +} + func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index 182f0d5122..8b36ee8d15 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -15,3 +15,163 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) { require.False(t, e.IsEndOfSnapshot()) }) } + +func newSimpleEvent(key string, index uint64) Event { + return Event{Index: index, Payload: simplePayload{key: key}} +} + +func TestPayloadEvents_FilterByKey(t *testing.T) { + type testCase struct { + name string + req SubscribeRequest + events []Event + expectEvent bool + expected *PayloadEvents + expectedCap int + } + + fn := func(t *testing.T, tc testCase) { + events := make([]Event, 0, 5) + events = append(events, tc.events...) + + pe := &PayloadEvents{Items: events} + ok := pe.MatchesKey(tc.req.Key, tc.req.Namespace) + require.Equal(t, tc.expectEvent, ok) + if !tc.expectEvent { + return + } + + require.Equal(t, tc.expected, pe) + // test if there was a new array allocated or not + require.Equal(t, tc.expectedCap, cap(pe.Items)) + } + + var testCases = []testCase{ + { + name: "all events match, no key or namespace", + req: SubscribeRequest{Topic: testTopic}, + events: []Event{ + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)}, + expectEvent: true, + expected: newPayloadEvents( + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)), + expectedCap: 5, + }, + { + name: "all events match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)}, + expectEvent: true, + expected: newPayloadEvents( + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)), + expectedCap: 5, + }, + { + name: "all events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")}, + expectEvent: true, + expected: newPayloadEvents( + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")), + expectedCap: 5, + }, + { + name: "some evens match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 104), + newSimpleEvent("Other", 104), + newSimpleEvent("Same", 104)}, + expectEvent: true, + expected: newPayloadEvents( + newSimpleEvent("Same", 104), + newSimpleEvent("Same", 104)), + expectedCap: 2, + }, + { + name: "some events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "apps"), + newNSEvent("db1", "dbs"), + newNSEvent("app2", "apps")}, + expectEvent: true, + expected: newPayloadEvents( + newNSEvent("app1", "apps"), + newNSEvent("app2", "apps")), + expectedCap: 2, + }, + { + name: "no events match key", + req: SubscribeRequest{Topic: testTopic, Key: "Other"}, + events: []Event{ + newSimpleEvent("Same", 0), + newSimpleEvent("Same", 0)}, + }, + { + name: "no events match namespace", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "group1"), + newNSEvent("app2", "group2")}, + expectEvent: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} + +func newNSEvent(key, namespace string) Event { + return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}} +} + +type nsPayload struct { + framingEvent + key string + namespace string + value string +} + +func (p nsPayload) MatchesKey(key, namespace string) bool { + return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) +} + +func TestPayloadEvents_HasReadPermission(t *testing.T) { + t.Run("some events filtered", func(t *testing.T) { + ep := newPayloadEvents( + Event{Payload: simplePayload{key: "one", noReadPerm: true}}, + Event{Payload: simplePayload{key: "two", noReadPerm: false}}, + Event{Payload: simplePayload{key: "three", noReadPerm: true}}, + Event{Payload: simplePayload{key: "four", noReadPerm: false}}) + + require.True(t, ep.HasReadPermission(nil)) + expected := []Event{ + {Payload: simplePayload{key: "two"}}, + {Payload: simplePayload{key: "four"}}, + } + require.Equal(t, expected, ep.Items) + }) + + t.Run("all events filtered", func(t *testing.T) { + ep := newPayloadEvents( + Event{Payload: simplePayload{key: "one", noReadPerm: true}}, + Event{Payload: simplePayload{key: "two", noReadPerm: true}}, + Event{Payload: simplePayload{key: "three", noReadPerm: true}}, + Event{Payload: simplePayload{key: "four", noReadPerm: true}}) + + require.False(t, ep.HasReadPermission(nil)) + }) + +} diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 472b0ce90d..03069ea931 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -101,8 +101,8 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) { if len(next.Events) == 0 { continue } - event, ok := filterByKey(s.req, next.Events) - if !ok { + event := newEventFromBatch(s.req, next.Events) + if !event.Payload.MatchesKey(s.req.Key, s.req.Namespace) { continue } return event, nil @@ -128,22 +128,10 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event { return Event{ Topic: req.Topic, Index: first.Index, - Payload: PayloadEvents(events), + Payload: newPayloadEvents(events...), } } -func filterByKey(req SubscribeRequest, events []Event) (Event, bool) { - event := newEventFromBatch(req, events) - if req.Key == "" && req.Namespace == "" { - return event, true - } - - fn := func(e Event) bool { - return e.Payload.FilterByKey(req.Key, req.Namespace) - } - return event.Filter(fn) -} - // Close the subscription. Subscribers will receive an error when they call Next, // and will need to perform a new Subscribe request. // It is safe to call from any goroutine. diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index 2c192b1840..02368f61d7 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -138,147 +138,29 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) { b.Append([]Event{e}) } -func newSimpleEvent(key string, index uint64) Event { - return Event{Index: index, Payload: simplePayload{key: key}} -} - -func TestFilterByKey(t *testing.T) { - type testCase struct { - name string - req SubscribeRequest - events []Event - expectEvent bool - expected Event - expectedCap int - } - - fn := func(t *testing.T, tc testCase) { - events := make(PayloadEvents, 0, 5) - events = append(events, tc.events...) - - actual, ok := filterByKey(tc.req, events) - require.Equal(t, tc.expectEvent, ok) - if !tc.expectEvent { - return +func TestNewEventsFromBatch(t *testing.T) { + t.Run("single item", func(t *testing.T) { + first := Event{ + Topic: testTopic, + Index: 1234, + Payload: simplePayload{key: "key"}, } - - require.Equal(t, tc.expected, actual) - // test if there was a new array allocated or not - require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents))) - } - - var testCases = []testCase{ - { - name: "all events match, no key or namespace", - req: SubscribeRequest{Topic: testTopic}, - events: []Event{ - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 102, - Payload: PayloadEvents{ - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)}}, - expectedCap: 5, - }, - { - name: "all events match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 103, - Payload: PayloadEvents{ - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)}}, - expectedCap: 5, - }, - { - name: "all events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 22, - Payload: PayloadEvents{ - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")}}, - expectedCap: 5, - }, - { - name: "some evens match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 104), - newSimpleEvent("Other", 104), - newSimpleEvent("Same", 104)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 104, - Payload: PayloadEvents{ - newSimpleEvent("Same", 104), - newSimpleEvent("Same", 104)}}, - expectedCap: 2, - }, - { - name: "some events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "apps"), - newNSEvent("db1", "dbs"), - newNSEvent("app2", "apps")}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 22, - Payload: PayloadEvents{ - newNSEvent("app1", "apps"), - newNSEvent("app2", "apps")}}, - expectedCap: 2, - }, - { - name: "no events match key", - req: SubscribeRequest{Topic: testTopic, Key: "Other"}, - events: []Event{ - newSimpleEvent("Same", 0), - newSimpleEvent("Same", 0)}, - }, - { - name: "no events match namespace", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "group1"), - newNSEvent("app2", "group2")}, - expectEvent: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - fn(t, tc) - }) - } -} - -func newNSEvent(key, namespace string) Event { - return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}} -} - -type nsPayload struct { - key string - namespace string - value string -} - -func (p nsPayload) FilterByKey(key, namespace string) bool { - return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) + e := newEventFromBatch(SubscribeRequest{}, []Event{first}) + require.Equal(t, first, e) + }) + t.Run("many items", func(t *testing.T) { + events := []Event{ + newSimpleEvent("foo", 9999), + newSimpleEvent("foo", 9999), + newSimpleEvent("zee", 9999), + } + req := SubscribeRequest{Topic: testTopic} + e := newEventFromBatch(req, events) + expected := Event{ + Topic: testTopic, + Index: 9999, + Payload: newPayloadEvents(events...), + } + require.Equal(t, expected, e) + }) } diff --git a/agent/rpc/subscribe/auth.go b/agent/rpc/subscribe/auth.go deleted file mode 100644 index b41b1fdc40..0000000000 --- a/agent/rpc/subscribe/auth.go +++ /dev/null @@ -1,22 +0,0 @@ -package subscribe - -import ( - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/consul/stream" -) - -// EnforceACL takes an acl.Authorizer and returns the decision for whether the -// event is allowed to be sent to this client or not. -func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision { - switch { - case e.IsEndOfSnapshot(), e.IsNewSnapshotToFollow(): - return acl.Allow - } - - switch p := e.Payload.(type) { - case state.EventPayloadCheckServiceNode: - return p.Value.CanRead(authz) - } - return acl.Deny -} diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go index ddddb20ca5..99394f5465 100644 --- a/agent/rpc/subscribe/logger.go +++ b/agent/rpc/subscribe/logger.go @@ -58,12 +58,20 @@ func (l *eventLogger) Trace(e stream.Event) { case e.IsEndOfSnapshot(): l.snapshotDone = true l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count) + return case e.IsNewSnapshotToFollow(): l.logger.Trace("starting new snapshot", "sent", l.count) return - case l.snapshotDone: - l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", e.Len()) } - l.count += uint64(e.Len()) + size := 1 + if l, ok := e.Payload.(length); ok { + size = l.Len() + } + l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", size) + l.count += uint64(size) +} + +type length interface { + Len() int } diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 71919babab..0e98893f92 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -132,10 +132,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) if authz == nil { return event, true } - fn := func(e stream.Event) bool { - return enforceACL(authz, e) == acl.Allow - } - return event.Filter(fn) + + return event, event.Payload.HasReadPermission(authz) } func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { @@ -154,10 +152,10 @@ func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { func setPayload(e *pbsubscribe.Event, payload stream.Payload) { switch p := payload.(type) { - case stream.PayloadEvents: + case *stream.PayloadEvents: e.Payload = &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{ - Events: batchEventsFromEventSlice(p), + Events: batchEventsFromEventSlice(p.Items), }, } case state.EventPayloadCheckServiceNode: diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index bc41ed1e88..cf37e75bbc 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -917,8 +917,8 @@ func TestNewEventFromSteamEvent(t *testing.T) { name: "event batch", event: stream.Event{ Index: 2002, - Payload: stream.PayloadEvents{ - { + Payload: newPayloadEvents( + stream.Event{ Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -928,7 +928,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, }, - { + stream.Event{ Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, @@ -937,8 +937,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { Service: &structs.NodeService{Service: "web1"}, }, }, - }, - }, + }), }, expected: pbsubscribe.Event{ Index: 2002, @@ -1008,6 +1007,10 @@ func TestNewEventFromSteamEvent(t *testing.T) { } } +func newPayloadEvents(items ...stream.Event) *stream.PayloadEvents { + return &stream.PayloadEvents{Items: items} +} + // newEventFromSubscription is used to return framing events. EndOfSnapshot and // NewSnapshotToFollow are not exported, but we can get them from a subscription. func newEventFromSubscription(t *testing.T, index uint64) stream.Event {