stream: fix overallocation in filter

And add tests
This commit is contained in:
Daniel Nephin 2020-07-15 17:03:39 -04:00
parent edfdcd3d67
commit a99a4103bd
2 changed files with 61 additions and 15 deletions

View File

@ -82,7 +82,7 @@ func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
} }
s.currentItem = next s.currentItem = next
events := s.filter(next.Events) events := filter(s.req.Key, next.Events)
if len(events) == 0 { if len(events) == 0 {
continue continue
} }
@ -90,34 +90,34 @@ func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
} }
} }
// TODO: test cases for this method // filter events to only those that match the key exactly.
func (s *Subscription) filter(events []Event) []Event { func filter(key string, events []Event) []Event {
if s.req.Key == "" || len(events) == 0 { if key == "" || len(events) == 0 {
return events return events
} }
allMatch := true var count int
for _, e := range events { for _, e := range events {
if s.req.Key != e.Key { if key == e.Key {
allMatch = false count++
break
} }
} }
// Only allocate a new slice if some events need to be filtered out // Only allocate a new slice if some events need to be filtered out
if allMatch { switch count {
case 0:
return nil
case len(events):
return events return events
} }
// FIXME: this will over-allocate. We could get a count from the previous range result := make([]Event, 0, count)
// over events.
events = make([]Event, 0, len(events))
for _, e := range events { for _, e := range events {
if s.req.Key == e.Key { if key == e.Key {
events = append(events, e) result = append(result, e)
} }
} }
return events return result
} }
// Close the subscription. Subscribers will receive an error when they call Next, // Close the subscription. Subscribers will receive an error when they call Next,

View File

@ -148,3 +148,49 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
} }
b.Append([]Event{e}) b.Append([]Event{e})
} }
func TestFilter_NoKey(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "One"}, Event{Key: "Two"})
actual := filter("", events)
require.Equal(t, events, actual)
// test that a new array was not allocated
require.Equal(t, cap(actual), 5)
}
func TestFilter_WithKey_AllEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Same"})
actual := filter("Same", events)
require.Equal(t, events, actual)
// test that a new array was not allocated
require.Equal(t, cap(actual), 5)
}
func TestFilter_WithKey_SomeEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Other"}, Event{Key: "Same"})
actual := filter("Same", events)
expected := []Event{{Key: "Same"}, {Key: "Same"}}
require.Equal(t, expected, actual)
// test that a new array was allocated with the correct size
require.Equal(t, cap(actual), 2)
}
func TestFilter_WithKey_NoEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Same"})
actual := filter("Other", events)
var expected []Event
require.Equal(t, expected, actual)
// test that no array was allocated
require.Equal(t, cap(actual), 0)
}