stream: full test coverage for EventPublisher.Subscribe

This commit is contained in:
Daniel Nephin 2020-10-01 16:34:00 -04:00
parent b88f817bc5
commit 1c6be5ac75
2 changed files with 268 additions and 39 deletions

View File

@ -36,7 +36,7 @@ type EventPublisher struct {
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan changeEvents
publishCh chan []Event
snapshotHandlers SnapshotHandlers
}
@ -54,10 +54,6 @@ type subscriptions struct {
byToken map[string]map[*SubscribeRequest]*Subscription
}
type changeEvents struct {
events []Event
}
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used.
@ -84,7 +80,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer),
snapCache: make(map[Topic]map[string]*eventSnapshot),
publishCh: make(chan changeEvents, 64),
publishCh: make(chan []Event, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
@ -97,7 +93,7 @@ func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *E
// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(events []Event) {
if len(events) > 0 {
e.publishCh <- changeEvents{events: events}
e.publishCh <- events
}
}
@ -110,16 +106,16 @@ func (e *EventPublisher) Run(ctx context.Context) {
e.subscriptions.closeAll()
return
case update := <-e.publishCh:
e.sendEvents(update)
e.publishEvent(update)
}
}
}
// sendEvents sends the given events to any applicable topic listeners, as well
// as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update changeEvents) {
// publishEvent appends the events to any applicable topic buffers. It handles
// any closeSubscriptionPayload events by closing associated subscriptions.
func (e *EventPublisher) publishEvent(events []Event) {
eventsByTopic := make(map[Topic][]Event)
for _, event := range update.events {
for _, event := range events {
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
continue
@ -183,7 +179,6 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
}
snap := newEventSnapshot()
// TODO: testcase for this case, especially the from-cache-splice case
// if the request has an Index the client view is stale and must be reset
// with a NewSnapshotToFollow event.
if req.Index > 0 {

View File

@ -17,8 +17,8 @@ func (i intTopic) String() string {
var testTopic Topic = intTopic(999)
func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
subscription := &SubscribeRequest{
func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
@ -28,20 +28,18 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx)
sub, err := publisher.Subscribe(subscription)
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
eventCh := runSubscription(ctx, sub)
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []Event{{Payload: "snapshot-event-payload", Key: "sub-key"}}
require.Equal(t, expected, result.Events)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
result = nextResult(t, eventCh)
require.Len(t, result.Events, 1)
require.True(t, result.Events[0].IsEndOfSnapshot())
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
@ -52,10 +50,16 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) {
publisher.Publish(events)
// Subscriber should see the published event
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
next = getNextEvents(t, eventCh)
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
require.Equal(t, expected, result.Events)
require.Equal(t, expected, next)
}
var testSnapshotEvent = Event{
Topic: testTopic,
Payload: "snapshot-event-payload",
Key: "sub-key",
Index: 1,
}
func newTestSnapshotHandlers() SnapshotHandlers {
@ -64,18 +68,18 @@ func newTestSnapshotHandlers() SnapshotHandlers {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}
buf.Append([]Event{{Payload: "snapshot-event-payload", Key: "sub-key"}})
buf.Append([]Event{testSnapshotEvent})
return 1, nil
},
}
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr {
eventCh := make(chan eventOrErr, 1)
go func() {
for {
es, err := sub.Next(ctx)
eventCh <- subNextResult{
eventCh <- eventOrErr{
Events: es,
Err: err,
}
@ -87,30 +91,31 @@ func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextR
return eventCh
}
type subNextResult struct {
type eventOrErr struct {
Events []Event
Err error
}
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
func getNextEvents(t *testing.T, eventCh <-chan eventOrErr) []Event {
t.Helper()
select {
case next := <-eventCh:
return next
require.NoError(t, next.Err)
return next.Events
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
t.Fatalf("timeout waiting for event from subscription")
return nil
}
return subNextResult{}
}
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
func assertNoResult(t *testing.T, eventCh <-chan eventOrErr) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
case <-time.After(25 * time.Millisecond):
}
}
@ -156,3 +161,232 @@ func consumeSub(ctx context.Context, sub *Subscription) error {
}
}
}
func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
_, err := publisher.Subscribe(req)
require.NoError(t, err)
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Index: 3,
}}
publisher.Publish(events)
// Subscriber should see the published event
next = getNextEvents(t, eventCh)
expected = []Event{events[0]}
require.Equal(t, expected, next)
}
func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
assertNoResult(t, eventCh)
expected := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
publisher.publishEvent([]Event{expected})
next := getNextEvents(t, eventCh)
require.Equal(t, []Event{expected}, next)
})
}
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), 0)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent})
})
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
})
}
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
})
nextEvent := Event{
Topic: testTopic,
Key: "sub-key",
Index: 3,
Payload: "event-3",
}
runStep(t, "publish an event while unsubed", func(t *testing.T) {
publisher.publishEvent([]Event{nextEvent})
})
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}
runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
next = getNextEvents(t, eventCh)
require.Equal(t, nextEvent, next[0])
})
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
if !t.Run(name, fn) {
t.FailNow()
}
}