From b27068b72a324cd20242e301ff865be8c517bc12 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 5 Oct 2020 12:38:38 -0400 Subject: [PATCH] stream: Return a single event from a subscription.Next Handle batch events as a single event --- agent/consul/state/store_integration_test.go | 20 +-- agent/consul/stream/event.go | 45 +++++++ agent/consul/stream/event_publisher_test.go | 122 +++++++++---------- agent/consul/stream/subscription.go | 65 +++++----- agent/consul/stream/subscription_test.go | 61 +++++----- agent/rpc/subscribe/logger.go | 18 ++- agent/rpc/subscribe/subscribe.go | 79 +++++------- 7 files changed, 208 insertions(+), 202 deletions(-) diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index f9c978ed3c..7f2ae62ce4 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -5,10 +5,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" - "github.com/stretchr/testify/require" ) func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { @@ -294,8 +295,8 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { } type nextResult struct { - Events []stream.Event - Err error + Event stream.Event + Err error } func testRunSub(sub *stream.Subscription) <-chan nextResult { @@ -304,8 +305,8 @@ func testRunSub(sub *stream.Subscription) <-chan nextResult { for { es, err := sub.Next(context.TODO()) eventCh <- nextResult{ - Events: es, - Err: err, + Event: es, + Err: err, } if err != nil { return @@ -320,8 +321,8 @@ func assertNoEvent(t *testing.T, eventCh <-chan nextResult) { select { case next := <-eventCh: require.NoError(t, next.Err) - require.Len(t, next.Events, 1) - t.Fatalf("got unwanted event: %#v", next.Events[0].Payload) + require.Len(t, next.Event, 1) + t.Fatalf("got unwanted event: %#v", next.Event.Payload) case <-time.After(100 * time.Millisecond): } } @@ -331,8 +332,7 @@ func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event { select { case next := <-eventCh: require.NoError(t, next.Err) - require.Len(t, next.Events, 1) - return &next.Events[0] + return &next.Event case <-time.After(100 * time.Millisecond): t.Fatalf("no event after 100ms") } @@ -362,7 +362,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { select { case next := <-eventCh: if allowEOS { - if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() { + if next.Err == nil && next.Event.IsEndOfSnapshot() { continue } } diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 65a2edaf9a..adbe0762c9 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -19,6 +19,51 @@ type Event struct { Payload interface{} } +// Len returns the number of events contained within this event. If the Payload +// is a []Event, the length of that slice is returned. Otherwise 1 is returned. +func (e Event) Len() int { + if batch, ok := e.Payload.([]Event); ok { + return len(batch) + } + return 1 +} + +// Filter returns an Event filtered to only those Events where f returns true. +// If the second return value is false, every Event was removed by the filter. +func (e Event) Filter(f func(Event) bool) (Event, bool) { + batch, ok := e.Payload.([]Event) + if !ok { + return e, f(e) + } + + // To avoid extra allocations, iterate over the list of events first and + // get a count of the total desired size. This trades off some extra cpu + // time in the worse case (when not all items match the filter), for + // fewer memory allocations. + var size int + for idx := range batch { + if f(batch[idx]) { + size++ + } + } + if len(batch) == size || size == 0 { + return e, size != 0 + } + + filtered := make([]Event, 0, size) + for idx := range batch { + event := batch[idx] + if f(event) { + filtered = append(filtered, event) + } + } + if len(filtered) == 0 { + return e, false + } + e.Payload = filtered + return e, true +} + // IsEndOfSnapshot returns true if this is a framing event that indicates the // snapshot has completed. Subsequent events from Subscription.Next will be // streamed as they occur. diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 87f8729617..5bae4288b7 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -32,13 +32,11 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { require.NoError(t, err) eventCh := runSubscription(ctx, sub) - next := getNextEvents(t, eventCh) - expected := []Event{testSnapshotEvent} - require.Equal(t, expected, next) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) - next = getNextEvents(t, eventCh) - require.Len(t, next, 1) - require.True(t, next[0].IsEndOfSnapshot()) + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) assertNoResult(t, eventCh) @@ -50,8 +48,8 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { publisher.Publish(events) // Subscriber should see the published event - next = getNextEvents(t, eventCh) - expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}} + next = getNextEvent(t, eventCh) + expected := Event{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic} require.Equal(t, expected, next) } @@ -80,8 +78,8 @@ func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr { for { es, err := sub.Next(ctx) eventCh <- eventOrErr{ - Events: es, - Err: err, + Event: es, + Err: err, } if err != nil { return @@ -92,19 +90,19 @@ func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr { } type eventOrErr struct { - Events []Event - Err error + Event Event + Err error } -func getNextEvents(t *testing.T, eventCh <-chan eventOrErr) []Event { +func getNextEvent(t *testing.T, eventCh <-chan eventOrErr) Event { t.Helper() select { case next := <-eventCh: require.NoError(t, next.Err) - return next.Events + return next.Event case <-time.After(100 * time.Millisecond): t.Fatalf("timeout waiting for event from subscription") - return nil + return Event{} } } @@ -113,8 +111,7 @@ func assertNoResult(t *testing.T, eventCh <-chan eventOrErr) { select { case next := <-eventCh: require.NoError(t, next.Err) - require.Len(t, next.Events, 1) - t.Fatalf("received unexpected event: %#v", next.Events[0].Payload) + t.Fatalf("received unexpected event: %#v", next.Event.Payload) case <-time.After(25 * time.Millisecond): } } @@ -152,11 +149,11 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { func consumeSub(ctx context.Context, sub *Subscription) error { for { - events, err := sub.Next(ctx) + event, err := sub.Next(ctx) switch { case err != nil: return err - case len(events) == 1 && events[0].IsEndOfSnapshot(): + case event.IsEndOfSnapshot(): continue } } @@ -183,28 +180,25 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { require.NoError(t, err) eventCh := runSubscription(ctx, sub) - next := getNextEvents(t, eventCh) - expected := []Event{testSnapshotEvent} - require.Equal(t, expected, next) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) - next = getNextEvents(t, eventCh) - require.Len(t, next, 1) - require.True(t, next[0].IsEndOfSnapshot()) + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) // Now subscriber should block waiting for updates assertNoResult(t, eventCh) - events := []Event{{ + expected := Event{ Topic: testTopic, Key: "sub-key", Payload: "the-published-event-payload", Index: 3, - }} - publisher.Publish(events) + } + publisher.Publish([]Event{expected}) // Subscriber should see the published event - next = getNextEvents(t, eventCh) - expected = []Event{events[0]} + next = getNextEvent(t, eventCh) require.Equal(t, expected, next) } @@ -228,14 +222,12 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { eventCh := runSubscription(ctx, sub) - next := getNextEvents(t, eventCh) - expected := []Event{testSnapshotEvent} - require.Equal(t, expected, next) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) - next = getNextEvents(t, eventCh) - require.Len(t, next, 1) - require.True(t, next[0].IsEndOfSnapshot()) - require.Equal(t, uint64(1), next[0].Index) + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) + require.Equal(t, uint64(1), next.Index) }) runStep(t, "resume the subscription", func(t *testing.T) { @@ -255,8 +247,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { } publisher.publishEvent([]Event{expected}) - next := getNextEvents(t, eventCh) - require.Equal(t, []Event{expected}, next) + next := getNextEvent(t, eventCh) + require.Equal(t, expected, next) }) } @@ -280,14 +272,12 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { eventCh := runSubscription(ctx, sub) - next := getNextEvents(t, eventCh) - expected := []Event{testSnapshotEvent} - require.Equal(t, expected, next) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) - next = getNextEvents(t, eventCh) - require.Len(t, next, 1) - require.True(t, next[0].IsEndOfSnapshot()) - require.Equal(t, uint64(1), next[0].Index) + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) + require.Equal(t, uint64(1), next.Index) }) nextEvent := Event{ @@ -308,14 +298,14 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { require.NoError(t, err) eventCh := runSubscription(ctx, sub) - next := getNextEvents(t, eventCh) - require.True(t, next[0].IsNewSnapshotToFollow(), next) + next := getNextEvent(t, eventCh) + require.True(t, next.IsNewSnapshotToFollow(), next) - next = getNextEvents(t, eventCh) - require.Equal(t, testSnapshotEvent, next[0]) + next = getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) - next = getNextEvents(t, eventCh) - require.True(t, next[0].IsEndOfSnapshot()) + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) }) } @@ -339,14 +329,12 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin eventCh := runSubscription(ctx, sub) - next := getNextEvents(t, eventCh) - expected := []Event{testSnapshotEvent} - require.Equal(t, expected, next) + next := getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) - next = getNextEvents(t, eventCh) - require.Len(t, next, 1) - require.True(t, next[0].IsEndOfSnapshot()) - require.Equal(t, uint64(1), next[0].Index) + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) + require.Equal(t, uint64(1), next.Index) }) nextEvent := Event{ @@ -371,17 +359,17 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin require.NoError(t, err) eventCh := runSubscription(ctx, sub) - next := getNextEvents(t, eventCh) - require.True(t, next[0].IsNewSnapshotToFollow(), next) + next := getNextEvent(t, eventCh) + require.True(t, next.IsNewSnapshotToFollow(), next) - next = getNextEvents(t, eventCh) - require.Equal(t, testSnapshotEvent, next[0]) + next = getNextEvent(t, eventCh) + require.Equal(t, testSnapshotEvent, next) - next = getNextEvents(t, eventCh) - require.True(t, next[0].IsEndOfSnapshot()) + next = getNextEvent(t, eventCh) + require.True(t, next.IsEndOfSnapshot()) - next = getNextEvents(t, eventCh) - require.Equal(t, nextEvent, next[0]) + next = getNextEvent(t, eventCh) + require.Equal(t, nextEvent, next) }) } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index aa71d3f612..a602cad551 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -65,59 +65,56 @@ func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subs } } -// Next returns the next set of events to deliver. It must only be called from a +// Next returns the next Event to deliver. It must only be called from a // single goroutine concurrently as it mutates the Subscription. -func (s *Subscription) Next(ctx context.Context) ([]Event, error) { +func (s *Subscription) Next(ctx context.Context) (Event, error) { if atomic.LoadUint32(&s.state) == subscriptionStateClosed { - return nil, ErrSubscriptionClosed + return Event{}, ErrSubscriptionClosed } for { next, err := s.currentItem.Next(ctx, s.forceClosed) switch { case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed: - return nil, ErrSubscriptionClosed + return Event{}, ErrSubscriptionClosed case err != nil: - return nil, err + return Event{}, err } s.currentItem = next - - events := filter(s.req.Key, next.Events) - if len(events) == 0 { + if len(next.Events) == 0 { continue } - return events, nil + event, ok := filterByKey(s.req, next.Events) + if !ok { + continue + } + return event, nil } } -// filter events to only those that match the key exactly. -func filter(key string, events []Event) []Event { - if key == "" || len(events) == 0 { - return events +func newEventFromBatch(req SubscribeRequest, events []Event) Event { + first := events[0] + if len(events) == 1 { + return first + } + return Event{ + Topic: req.Topic, + Key: req.Key, + Index: first.Index, + Payload: events, + } +} + +func filterByKey(req SubscribeRequest, events []Event) (Event, bool) { + event := newEventFromBatch(req, events) + if req.Key == "" { + return event, true } - var count int - for _, e := range events { - if key == e.Key { - count++ - } + fn := func(e Event) bool { + return req.Key == e.Key } - - // Only allocate a new slice if some events need to be filtered out - switch count { - case 0: - return nil - case len(events): - return events - } - - result := make([]Event, 0, count) - for _, e := range events { - if key == e.Key { - result = append(result, e) - } - } - return result + return event.Filter(fn) } // Close the subscription. Subscribers will receive an error when they call Next, diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index 29582c2bde..a2f6fb106d 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -36,8 +36,7 @@ func TestSubscription(t *testing.T) { require.NoError(t, err) require.True(t, elapsed < 200*time.Millisecond, "Event should have been delivered immediately, took %s", elapsed) - require.Len(t, got, 1) - require.Equal(t, index, got[0].Index) + require.Equal(t, index, got.Index) // Schedule an event publish in a while index++ @@ -54,8 +53,7 @@ func TestSubscription(t *testing.T) { "Event should have been delivered after blocking 200ms, took %s", elapsed) require.True(t, elapsed < 2*time.Second, "Event should have been delivered after short time, took %s", elapsed) - require.Len(t, got, 1) - require.Equal(t, index, got[0].Index) + require.Equal(t, index, got.Index) // Event with wrong key should not be delivered. Deliver a good message right // so we don't have to block test thread forever or cancel func yet. @@ -70,9 +68,8 @@ func TestSubscription(t *testing.T) { require.NoError(t, err) require.True(t, elapsed < 200*time.Millisecond, "Event should have been delivered immediately, took %s", elapsed) - require.Len(t, got, 1) - require.Equal(t, index, got[0].Index) - require.Equal(t, "test", got[0].Key) + require.Equal(t, index, got.Index) + require.Equal(t, "test", got.Key) // Cancelling the subscription context should unblock Next start = time.Now() @@ -91,9 +88,7 @@ func TestSubscription(t *testing.T) { func TestSubscription_Close(t *testing.T) { eb := newEventBuffer() - index := uint64(100) - startHead := eb.Head() // Start with an event in the buffer @@ -115,8 +110,7 @@ func TestSubscription_Close(t *testing.T) { require.NoError(t, err) require.True(t, elapsed < 200*time.Millisecond, "Event should have been delivered immediately, took %s", elapsed) - require.Len(t, got, 1) - require.Equal(t, index, got[0].Index) + require.Equal(t, index, got.Index) // Schedule a Close simulating the server deciding this subscroption // needs to reset (e.g. on ACL perm change). @@ -149,46 +143,55 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) { func TestFilter_NoKey(t *testing.T) { events := make([]Event, 0, 5) - events = append(events, Event{Key: "One"}, Event{Key: "Two"}) + events = append(events, Event{Key: "One", Index: 102}, Event{Key: "Two"}) - actual := filter("", events) - require.Equal(t, events, actual) + req := SubscribeRequest{Topic: testTopic} + actual, ok := filterByKey(req, events) + require.True(t, ok) + require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual) // test that a new array was not allocated - require.Equal(t, cap(actual), 5) + require.Equal(t, cap(actual.Payload.([]Event)), 5) } func TestFilter_WithKey_AllEventsMatch(t *testing.T) { events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same"}, Event{Key: "Same"}) + events = append(events, Event{Key: "Same", Index: 103}, Event{Key: "Same"}) - actual := filter("Same", events) - require.Equal(t, events, actual) + req := SubscribeRequest{Topic: testTopic, Key: "Same"} + actual, ok := filterByKey(req, events) + require.True(t, ok) + expected := Event{Topic: testTopic, Index: 103, Key: "Same", Payload: events} + require.Equal(t, expected, actual) // test that a new array was not allocated - require.Equal(t, cap(actual), 5) + require.Equal(t, 5, cap(actual.Payload.([]Event))) } func TestFilter_WithKey_SomeEventsMatch(t *testing.T) { events := make([]Event, 0, 5) - events = append(events, Event{Key: "Same"}, Event{Key: "Other"}, Event{Key: "Same"}) + events = append(events, Event{Key: "Same", Index: 104}, Event{Key: "Other"}, Event{Key: "Same"}) - actual := filter("Same", events) - expected := []Event{{Key: "Same"}, {Key: "Same"}} + req := SubscribeRequest{Topic: testTopic, Key: "Same"} + actual, ok := filterByKey(req, events) + require.True(t, ok) + expected := Event{ + Topic: testTopic, + Index: 104, + Key: "Same", + Payload: []Event{{Key: "Same", Index: 104}, {Key: "Same"}}, + } require.Equal(t, expected, actual) // test that a new array was allocated with the correct size - require.Equal(t, cap(actual), 2) + require.Equal(t, cap(actual.Payload.([]Event)), 2) } func TestFilter_WithKey_NoEventsMatch(t *testing.T) { events := make([]Event, 0, 5) events = append(events, Event{Key: "Same"}, Event{Key: "Same"}) - actual := filter("Other", events) - var expected []Event - require.Equal(t, expected, actual) - - // test that no array was allocated - require.Equal(t, cap(actual), 0) + req := SubscribeRequest{Topic: testTopic, Key: "Other"} + _, ok := filterByKey(req, events) + require.False(t, ok) } diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go index 9aadf6a40e..8615fbd905 100644 --- a/agent/rpc/subscribe/logger.go +++ b/agent/rpc/subscribe/logger.go @@ -52,21 +52,17 @@ type eventLogger struct { count uint64 } -func (l *eventLogger) Trace(e []stream.Event) { - if len(e) == 0 { - return - } - - first := e[0] +func (l *eventLogger) Trace(e stream.Event) { switch { - case first.IsEndOfSnapshot(): + case e.IsEndOfSnapshot(): l.snapshotDone = true - l.logger.Trace("snapshot complete", "index", first.Index, "sent", l.count) - case first.IsNewSnapshotToFollow(): + l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count) + case e.IsNewSnapshotToFollow(): + l.logger.Trace("starting new snapshot", "sent", l.count) return case l.snapshotDone: - l.logger.Trace("sending events", "index", first.Index, "sent", l.count, "batch_size", len(e)) + l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", e.Len()) } - l.count += uint64(len(e)) + l.count += uint64(e.Len()) } diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 981c1714b0..5c7aedb36b 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -67,7 +67,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub elog := &eventLogger{logger: logger} for { - events, err := sub.Next(ctx) + event, err := sub.Next(ctx) switch { case errors.Is(err, stream.ErrSubscriptionClosed): logger.Trace("subscription reset by server") @@ -76,13 +76,14 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub return err } - events = filterStreamEvents(authz, events) - if len(events) == 0 { + var ok bool + event, ok = filterByAuth(authz, event) + if !ok { continue } - elog.Trace(events) - e := newEventFromStreamEvents(req, events) + elog.Trace(event) + e := newEventFromStreamEvent(req, event) if err := serverStream.Send(e); err != nil { return err } @@ -126,68 +127,44 @@ func forwardToDC( } } -// filterStreamEvents to only those allowed by the acl token. -func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event { +// filterByAuth to only those Events allowed by the acl token. +func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) { // authz will be nil when ACLs are disabled - if authz == nil || len(events) == 0 { - return events + if authz == nil { + return event, true } - - // Fast path for the common case of only 1 event since we can avoid slice - // allocation in the hot path of every single update event delivered in vast - // majority of cases with this. Note that this is called _per event/item_ when - // sending snapshots which is a lot worse than being called once on regular - // result. - if len(events) == 1 { - if enforceACL(authz, events[0]) == acl.Allow { - return events - } - return nil + fn := func(e stream.Event) bool { + return enforceACL(authz, e) == acl.Allow } - - var filtered []stream.Event - for idx := range events { - event := events[idx] - if enforceACL(authz, event) == acl.Allow { - filtered = append(filtered, event) - } - } - return filtered + return event.Filter(fn) } -func newEventFromStreamEvents(req *pbsubscribe.SubscribeRequest, events []stream.Event) *pbsubscribe.Event { +func newEventFromStreamEvent(req *pbsubscribe.SubscribeRequest, event stream.Event) *pbsubscribe.Event { e := &pbsubscribe.Event{ Topic: req.Topic, Key: req.Key, - Index: events[0].Index, + Index: event.Index, } - - if len(events) == 1 { - event := events[0] - // TODO: refactor so these are only checked once, instead of 3 times. - switch { - case event.IsEndOfSnapshot(): - e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true} - return e - case event.IsNewSnapshotToFollow(): - e.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true} - return e - } - - setPayload(e, event.Payload) + switch { + case event.IsEndOfSnapshot(): + e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true} + return e + case event.IsNewSnapshotToFollow(): + e.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true} return e } - - e.Payload = &pbsubscribe.Event_EventBatch{ - EventBatch: &pbsubscribe.EventBatch{ - Events: batchEventsFromEventSlice(events), - }, - } + setPayload(e, event.Payload) return e } func setPayload(e *pbsubscribe.Event, payload interface{}) { switch p := payload.(type) { + case []stream.Event: + e.Payload = &pbsubscribe.Event_EventBatch{ + EventBatch: &pbsubscribe.EventBatch{ + Events: batchEventsFromEventSlice(p), + }, + } case state.EventPayloadCheckServiceNode: e.Payload = &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{