diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 3621f20472..09369920bf 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -24,62 +24,55 @@ type Payload interface { // Generally this means that the payload matches the key and namespace or // the payload is a special framing event that should be returned to every // subscription. + // TODO: rename to MatchesKey FilterByKey(key, namespace string) bool } -// Len returns the number of events contained within this event. If the Payload -// is a []Event, the length of that slice is returned. Otherwise 1 is returned. -func (e Event) Len() int { - if batch, ok := e.Payload.(PayloadEvents); ok { - return len(batch) - } - return 1 +// PayloadEvents is an Payload which contains multiple Events. +type PayloadEvents struct { + Items []Event } -// Filter returns an Event filtered to only those Events where f returns true. -// If the second return value is false, every Event was removed by the filter. -func (e Event) Filter(f func(Event) bool) (Event, bool) { - batch, ok := e.Payload.(PayloadEvents) - if !ok { - return e, f(e) - } +func NewPayloadEvents(items ...Event) *PayloadEvents { + return &PayloadEvents{Items: items} +} + +func (p *PayloadEvents) filter(f func(Event) bool) bool { + items := p.Items // To avoid extra allocations, iterate over the list of events first and // get a count of the total desired size. This trades off some extra cpu // time in the worse case (when not all items match the filter), for // fewer memory allocations. var size int - for idx := range batch { - if f(batch[idx]) { + for idx := range items { + if f(items[idx]) { size++ } } - if len(batch) == size || size == 0 { - return e, size != 0 + if len(items) == size || size == 0 { + return size != 0 } - filtered := make(PayloadEvents, 0, size) - for idx := range batch { - event := batch[idx] + filtered := make([]Event, 0, size) + for idx := range items { + event := items[idx] if f(event) { filtered = append(filtered, event) } } - if len(filtered) == 0 { - return e, false - } - e.Payload = filtered - return e, true + p.Items = filtered + return true } -// PayloadEvents is an Payload which contains multiple Events. -type PayloadEvents []Event +func (p *PayloadEvents) FilterByKey(key, namespace string) bool { + return p.filter(func(event Event) bool { + return event.Payload.FilterByKey(key, namespace) + }) +} -// TODO: this method is not called, but needs to exist so that we can store -// a slice of events as a payload. In the future we should be able to refactor -// Event.Filter so that this FilterByKey includes the re-slicing. -func (e PayloadEvents) FilterByKey(_, _ string) bool { - return true +func (p *PayloadEvents) Len() int { + return len(p.Items) } // IsEndOfSnapshot returns true if this is a framing event that indicates the diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index 182f0d5122..f3daba8021 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -15,3 +15,134 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) { require.False(t, e.IsEndOfSnapshot()) }) } + +func newSimpleEvent(key string, index uint64) Event { + return Event{Index: index, Payload: simplePayload{key: key}} +} + +func TestPayloadEvents_FilterByKey(t *testing.T) { + type testCase struct { + name string + req SubscribeRequest + events []Event + expectEvent bool + expected *PayloadEvents + expectedCap int + } + + fn := func(t *testing.T, tc testCase) { + events := make([]Event, 0, 5) + events = append(events, tc.events...) + + pe := &PayloadEvents{Items: events} + ok := pe.FilterByKey(tc.req.Key, tc.req.Namespace) + require.Equal(t, tc.expectEvent, ok) + if !tc.expectEvent { + return + } + + require.Equal(t, tc.expected, pe) + // test if there was a new array allocated or not + require.Equal(t, tc.expectedCap, cap(pe.Items)) + } + + var testCases = []testCase{ + { + name: "all events match, no key or namespace", + req: SubscribeRequest{Topic: testTopic}, + events: []Event{ + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)}, + expectEvent: true, + expected: NewPayloadEvents( + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)), + expectedCap: 5, + }, + { + name: "all events match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)}, + expectEvent: true, + expected: NewPayloadEvents( + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)), + expectedCap: 5, + }, + { + name: "all events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")}, + expectEvent: true, + expected: NewPayloadEvents( + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")), + expectedCap: 5, + }, + { + name: "some evens match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 104), + newSimpleEvent("Other", 104), + newSimpleEvent("Same", 104)}, + expectEvent: true, + expected: NewPayloadEvents( + newSimpleEvent("Same", 104), + newSimpleEvent("Same", 104)), + expectedCap: 2, + }, + { + name: "some events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "apps"), + newNSEvent("db1", "dbs"), + newNSEvent("app2", "apps")}, + expectEvent: true, + expected: NewPayloadEvents( + newNSEvent("app1", "apps"), + newNSEvent("app2", "apps")), + expectedCap: 2, + }, + { + name: "no events match key", + req: SubscribeRequest{Topic: testTopic, Key: "Other"}, + events: []Event{ + newSimpleEvent("Same", 0), + newSimpleEvent("Same", 0)}, + }, + { + name: "no events match namespace", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "group1"), + newNSEvent("app2", "group2")}, + expectEvent: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} + +func newNSEvent(key, namespace string) Event { + return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}} +} + +type nsPayload struct { + key string + namespace string + value string +} + +func (p nsPayload) FilterByKey(key, namespace string) bool { + return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) +} diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 472b0ce90d..5dc45efdfb 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -101,8 +101,8 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) { if len(next.Events) == 0 { continue } - event, ok := filterByKey(s.req, next.Events) - if !ok { + event := newEventFromBatch(s.req, next.Events) + if !event.Payload.FilterByKey(s.req.Key, s.req.Namespace) { continue } return event, nil @@ -128,22 +128,10 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event { return Event{ Topic: req.Topic, Index: first.Index, - Payload: PayloadEvents(events), + Payload: NewPayloadEvents(events...), } } -func filterByKey(req SubscribeRequest, events []Event) (Event, bool) { - event := newEventFromBatch(req, events) - if req.Key == "" && req.Namespace == "" { - return event, true - } - - fn := func(e Event) bool { - return e.Payload.FilterByKey(req.Key, req.Namespace) - } - return event.Filter(fn) -} - // Close the subscription. Subscribers will receive an error when they call Next, // and will need to perform a new Subscribe request. // It is safe to call from any goroutine. diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index 2c192b1840..bec2538cb4 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -138,147 +138,29 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) { b.Append([]Event{e}) } -func newSimpleEvent(key string, index uint64) Event { - return Event{Index: index, Payload: simplePayload{key: key}} -} - -func TestFilterByKey(t *testing.T) { - type testCase struct { - name string - req SubscribeRequest - events []Event - expectEvent bool - expected Event - expectedCap int - } - - fn := func(t *testing.T, tc testCase) { - events := make(PayloadEvents, 0, 5) - events = append(events, tc.events...) - - actual, ok := filterByKey(tc.req, events) - require.Equal(t, tc.expectEvent, ok) - if !tc.expectEvent { - return +func TestNewEventsFromBatch(t *testing.T) { + t.Run("single item", func(t *testing.T) { + first := Event{ + Topic: testTopic, + Index: 1234, + Payload: simplePayload{key: "key"}, } - - require.Equal(t, tc.expected, actual) - // test if there was a new array allocated or not - require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents))) - } - - var testCases = []testCase{ - { - name: "all events match, no key or namespace", - req: SubscribeRequest{Topic: testTopic}, - events: []Event{ - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 102, - Payload: PayloadEvents{ - newSimpleEvent("One", 102), - newSimpleEvent("Two", 102)}}, - expectedCap: 5, - }, - { - name: "all events match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 103, - Payload: PayloadEvents{ - newSimpleEvent("Same", 103), - newSimpleEvent("Same", 103)}}, - expectedCap: 5, - }, - { - name: "all events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 22, - Payload: PayloadEvents{ - newNSEvent("Something", "apps"), - newNSEvent("Other", "apps")}}, - expectedCap: 5, - }, - { - name: "some evens match, no namespace", - req: SubscribeRequest{Topic: testTopic, Key: "Same"}, - events: []Event{ - newSimpleEvent("Same", 104), - newSimpleEvent("Other", 104), - newSimpleEvent("Same", 104)}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 104, - Payload: PayloadEvents{ - newSimpleEvent("Same", 104), - newSimpleEvent("Same", 104)}}, - expectedCap: 2, - }, - { - name: "some events match, no key", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "apps"), - newNSEvent("db1", "dbs"), - newNSEvent("app2", "apps")}, - expectEvent: true, - expected: Event{ - Topic: testTopic, - Index: 22, - Payload: PayloadEvents{ - newNSEvent("app1", "apps"), - newNSEvent("app2", "apps")}}, - expectedCap: 2, - }, - { - name: "no events match key", - req: SubscribeRequest{Topic: testTopic, Key: "Other"}, - events: []Event{ - newSimpleEvent("Same", 0), - newSimpleEvent("Same", 0)}, - }, - { - name: "no events match namespace", - req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, - events: []Event{ - newNSEvent("app1", "group1"), - newNSEvent("app2", "group2")}, - expectEvent: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - fn(t, tc) - }) - } -} - -func newNSEvent(key, namespace string) Event { - return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}} -} - -type nsPayload struct { - key string - namespace string - value string -} - -func (p nsPayload) FilterByKey(key, namespace string) bool { - return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) + e := newEventFromBatch(SubscribeRequest{}, []Event{first}) + require.Equal(t, first, e) + }) + t.Run("many items", func(t *testing.T) { + events := []Event{ + newSimpleEvent("foo", 9999), + newSimpleEvent("foo", 9999), + newSimpleEvent("zee", 9999), + } + req := SubscribeRequest{Topic: testTopic} + e := newEventFromBatch(req, events) + expected := Event{ + Topic: testTopic, + Index: 9999, + Payload: NewPayloadEvents(events...), + } + require.Equal(t, expected, e) + }) }