diff --git a/agent/consul/state/acl_events.go b/agent/consul/state/acl_events.go index 6968643571..417962bb71 100644 --- a/agent/consul/state/acl_events.go +++ b/agent/consul/state/acl_events.go @@ -52,7 +52,7 @@ func aclChangeUnsubscribeEvent(tx db.ReadTxn, changes db.Changes) ([]stream.Even } } // TODO: should we remove duplicate IDs here, or rely on sub.Close() being idempotent - return []stream.Event{stream.NewUnsubscribeEvent(secretIDs)}, nil + return []stream.Event{stream.NewCloseSubscriptionEvent(secretIDs)}, nil } // changeObject returns the object before it was deleted if the change was a delete, diff --git a/agent/consul/state/acl_events_test.go b/agent/consul/state/acl_events_test.go index c2c7e1843a..550d7cacea 100644 --- a/agent/consul/state/acl_events_test.go +++ b/agent/consul/state/acl_events_test.go @@ -23,7 +23,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { Mutate: func(s *Store, tx *txn) error { return s.aclTokenSetTxn(tx, tx.Index, newACLToken(1), false, false, false, false) }, - expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)), }, { Name: "token update", @@ -37,7 +37,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { token.Policies = []structs.ACLTokenPolicyLink{{ID: "33333333-1111-1111-1111-111111111111"}} return s.aclTokenSetTxn(tx, tx.Index, token, false, true, false, false) }, - expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)), }, { Name: "token delete", @@ -48,13 +48,13 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { token := newACLToken(1) return s.aclTokenDeleteTxn(tx, tx.Index, token.AccessorID, "id", nil) }, - expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)), }, { Name: "policy create", Mutate: newACLPolicyWithSingleToken, // two identical tokens, because Mutate has two changes - expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)), }, { Name: "policy update", @@ -64,7 +64,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { policy.Rules = `operator = "write"` return s.aclPolicySetTxn(tx, tx.Index, policy) }, - expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)), }, { Name: "policy delete", @@ -73,13 +73,13 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { policy := newACLPolicy(1) return s.aclPolicyDeleteTxn(tx, tx.Index, policy.ID, s.aclPolicyGetByID, nil) }, - expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)), }, { Name: "role create", Mutate: newACLRoleWithSingleToken, // Two tokens with the same ID, because there are two changes in Mutate - expected: stream.NewUnsubscribeEvent(newSecretIDs(1, 1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1, 1)), }, { Name: "role update", @@ -93,7 +93,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { }) return s.aclRoleSetTxn(tx, tx.Index, role, true) }, - expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)), }, { Name: "role delete", @@ -102,7 +102,7 @@ func TestACLChangeUnsubscribeEvent(t *testing.T) { role := newACLRole(1, newACLRolePolicyLink(1)) return s.aclRoleDeleteTxn(tx, tx.Index, role.ID, s.aclRoleGetByID, nil) }, - expected: stream.NewUnsubscribeEvent(newSecretIDs(1)), + expected: stream.NewCloseSubscriptionEvent(newSecretIDs(1)), }, } diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 0dd56b61b1..641d853701 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -62,7 +62,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Ensure the reset event was sent. err = assertErr(t, eventCh) - require.Equal(stream.ErrSubscriptionReload, err) + require.Equal(stream.ErrSubscriptionClosed, err) // Register another subscription. subscription2 := &stream.SubscribeRequest{ @@ -90,7 +90,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Ensure the reset event was sent. err = assertErr(t, eventCh2) - require.Equal(stream.ErrSubscriptionReload, err) + require.Equal(stream.ErrSubscriptionClosed, err) } func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { @@ -175,7 +175,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Ensure the reload event was sent. err = assertErr(t, eventCh) - require.Equal(stream.ErrSubscriptionReload, err) + require.Equal(stream.ErrSubscriptionClosed, err) // Register another subscription. subscription3 := &stream.SubscribeRequest{ @@ -362,7 +362,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } require.Error(t, next.Err) - require.Equal(t, stream.ErrSubscriptionReload, next.Err) + require.Equal(t, stream.ErrSubscriptionClosed, next.Err) return case <-timeoutCh: t.Fatalf("no err after 100ms") @@ -390,7 +390,7 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler { } return events, nil }, - Snapshot: func(req *stream.SubscribeRequest, buffer *stream.EventBuffer) (uint64, error) { + Snapshot: func(req *stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) if err != nil { return idx, err @@ -403,7 +403,7 @@ func newTestTopicHandlers(s *Store) map[stream.Topic]stream.TopicHandler { Index: node.ModifyIndex, Payload: node, } - buffer.Append([]stream.Event{event}) + snap.Append([]stream.Event{event}) } return idx, nil }, diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index ef6265a565..06f40f1f3e 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -1,3 +1,7 @@ +/* +Package stream provides a publish/subscribe system for events produced by changes +to the state store. +*/ package stream type Topic int32 @@ -24,21 +28,21 @@ func (e Event) IsEndOfSnapshot() bool { } func (e Event) IsResumeStream() bool { - return e.Payload == ResumeStream{} + return e.Payload == resumeStream{} } type endOfSnapshot struct{} -type ResumeStream struct{} +type resumeStream struct{} -// TODO: unexport once EventPublisher is in stream package -type UnsubscribePayload struct { - TokensSecretIDs []string +type closeSubscriptionPayload struct { + tokensSecretIDs []string } -// NewUnsubscribeEvent returns a special Event that is handled by the -// stream package, and is never sent to subscribers. It results in any subscriptions -// which match any of the TokenSecretIDs to be unsubscribed. -func NewUnsubscribeEvent(tokenSecretIDs []string) Event { - return Event{Payload: UnsubscribePayload{TokensSecretIDs: tokenSecretIDs}} +// NewCloseSubscriptionEvent returns a special Event that is handled by the +// stream package, and is never sent to subscribers. EventProcessor handles +// these events, and closes any subscriptions which were created using a token +// which matches any of the tokenSecretIDs. +func NewCloseSubscriptionEvent(tokenSecretIDs []string) Event { + return Event{Payload: closeSubscriptionPayload{tokensSecretIDs: tokenSecretIDs}} } diff --git a/agent/consul/stream/event_buffer.go b/agent/consul/stream/event_buffer.go index f8a037e349..d104f689d3 100644 --- a/agent/consul/stream/event_buffer.go +++ b/agent/consul/stream/event_buffer.go @@ -6,7 +6,7 @@ import ( "sync/atomic" ) -// EventBuffer is a single-writer, multiple-reader, unlimited length concurrent +// eventBuffer is a single-writer, multiple-reader, unlimited length concurrent // buffer of events that have been published on a topic. The buffer is // effectively just the head of an atomically updated single-linked list. Atomic // accesses are usually to be suspected as premature optimization but this @@ -27,7 +27,7 @@ import ( // // The buffer is used to deliver all messages broadcast toa topic for active // subscribers to consume, but it is also an effective way to both deliver and -// optionally cache snapshots per topic and key. byt using an EventBuffer, +// optionally cache snapshots per topic and key. byt using an eventBuffer, // snapshot functions don't have to read the whole snapshot into memory before // delivery - they can stream from memdb. However simply by storing a pointer to // the first event in the buffer, we can cache the buffered events for future @@ -46,26 +46,26 @@ import ( // automatically keep the events we need to make that work for exactly the // optimal amount of time and no longer. // -// A new buffer is constructed with a sentinel "empty" BufferItem that has a nil +// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil // Events array. This enables subscribers to start watching for the next update // immediately. // -// The zero value EventBuffer is _not_ a usable type since it has not been +// The zero value eventBuffer is _not_ a usable type since it has not been // initialized with an empty bufferItem so can't be used to wait for the first -// published event. Call NewEventBuffer to construct a new buffer. +// published event. Call newEventBuffer to construct a new buffer. // // Calls to Append or AppendBuffer that mutate the head must be externally // synchronized. This allows systems that already serialize writes to append // without lock overhead (e.g. a snapshot goroutine appending thousands of // events). -type EventBuffer struct { +type eventBuffer struct { head atomic.Value } -// NewEventBuffer creates an EventBuffer ready for use. -func NewEventBuffer() *EventBuffer { - b := &EventBuffer{} - b.head.Store(NewBufferItem()) +// newEventBuffer creates an eventBuffer ready for use. +func newEventBuffer() *eventBuffer { + b := &eventBuffer{} + b.head.Store(newBufferItem()) return b } @@ -76,9 +76,9 @@ func NewEventBuffer() *EventBuffer { // mutations to the events as they may have been exposed to subscribers in other // goroutines. Append only supports a single concurrent caller and must be // externally synchronized with other Append, AppendBuffer or AppendErr calls. -func (b *EventBuffer) Append(events []Event) { +func (b *eventBuffer) Append(events []Event) { // Push events to the head - it := NewBufferItem() + it := newBufferItem() it.Events = events b.AppendBuffer(it) } @@ -92,7 +92,7 @@ func (b *EventBuffer) Append(events []Event) { // // AppendBuffer only supports a single concurrent caller and must be externally // synchronized with other Append, AppendBuffer or AppendErr calls. -func (b *EventBuffer) AppendBuffer(item *BufferItem) { +func (b *eventBuffer) AppendBuffer(item *bufferItem) { // First store it as the next node for the old head this ensures once it's // visible to new searchers the linked list is already valid. Not sure it // matters but this seems nicer. @@ -110,20 +110,20 @@ func (b *EventBuffer) AppendBuffer(item *BufferItem) { // streaming subscription and return the error. AppendErr only supports a // single concurrent caller and must be externally synchronized with other // Append, AppendBuffer or AppendErr calls. -func (b *EventBuffer) AppendErr(err error) { - b.AppendBuffer(&BufferItem{Err: err}) +func (b *eventBuffer) AppendErr(err error) { + b.AppendBuffer(&bufferItem{Err: err}) } // Head returns the current head of the buffer. It will always exist but it may // be a "sentinel" empty item with a nil Events slice to allow consumers to // watch for the next update. Consumers should always check for empty Events and -// treat them as no-ops. Will panic if EventBuffer was not initialized correctly -// with EventBuffer. -func (b *EventBuffer) Head() *BufferItem { - return b.head.Load().(*BufferItem) +// treat them as no-ops. Will panic if eventBuffer was not initialized correctly +// with eventBuffer. +func (b *eventBuffer) Head() *bufferItem { + return b.head.Load().(*bufferItem) } -// BufferItem represents a set of events published by a single raft operation. +// bufferItem represents a set of events published by a single raft operation. // The first item returned by a newly constructed buffer will have nil Events. // It is a sentinel value which is used to wait on the next events via Next. // @@ -135,9 +135,9 @@ func (b *EventBuffer) Head() *BufferItem { // they have been delivered except where it's intentional to maintain a cache or // trailing store of events for performance reasons. // -// Subscribers must not mutate the BufferItem or the Events or Encoded payloads +// Subscribers must not mutate the bufferItem or the Events or Encoded payloads // inside as these are shared between all readers. -type BufferItem struct { +type bufferItem struct { // Events is the set of events published at one raft index. This may be nil as // a sentinel value to allow watching for the first event in a buffer. Callers // should check and skip nil Events at any point in the buffer. It will also @@ -170,10 +170,10 @@ type bufferLink struct { ch chan struct{} } -// NewBufferItem returns a blank buffer item with a link and chan ready to have +// newBufferItem returns a blank buffer item with a link and chan ready to have // the fields set and be appended to a buffer. -func NewBufferItem() *BufferItem { - return &BufferItem{ +func newBufferItem() *bufferItem { + return &bufferItem{ link: &bufferLink{ ch: make(chan struct{}), }, @@ -182,7 +182,7 @@ func NewBufferItem() *BufferItem { // Next return the next buffer item in the buffer. It may block until ctx is // cancelled or until the next item is published. -func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) { +func (i *bufferItem) Next(ctx context.Context) (*bufferItem, error) { // See if there is already a next value, block if so. Note we don't rely on // state change (chan nil) as that's not threadsafe but detecting close is. select { @@ -197,7 +197,7 @@ func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) { // shouldn't be possible return nil, errors.New("invalid next item") } - next := nextRaw.(*BufferItem) + next := nextRaw.(*bufferItem) if next.Err != nil { return nil, next.Err } @@ -210,12 +210,12 @@ func (i *BufferItem) Next(ctx context.Context) (*BufferItem, error) { // NextNoBlock returns the next item in the buffer without blocking. If it // reaches the most recent item it will return nil and no error. -func (i *BufferItem) NextNoBlock() (*BufferItem, error) { +func (i *bufferItem) NextNoBlock() (*bufferItem, error) { nextRaw := i.link.next.Load() if nextRaw == nil { return nil, nil } - next := nextRaw.(*BufferItem) + next := nextRaw.(*bufferItem) if next.Err != nil { return nil, next.Err } @@ -230,14 +230,14 @@ func (i *BufferItem) NextNoBlock() (*BufferItem, error) { // one, or if not it returns an empty item (that will be ignored by subscribers) // that has the same link as the current buffer so that it will be notified of // future updates in the buffer without including the current item. -func (i *BufferItem) FollowAfter() (*BufferItem, error) { +func (i *bufferItem) FollowAfter() (*bufferItem, error) { next, err := i.NextNoBlock() if err != nil { return nil, err } if next == nil { // Return an empty item that can be followed to the next item published. - item := &BufferItem{} + item := &bufferItem{} item.link = i.link return item, nil } diff --git a/agent/consul/stream/event_buffer_test.go b/agent/consul/stream/event_buffer_test.go index cd2461c778..f8ca7b7b7d 100644 --- a/agent/consul/stream/event_buffer_test.go +++ b/agent/consul/stream/event_buffer_test.go @@ -20,7 +20,7 @@ func TestEventBufferFuzz(t *testing.T) { nReaders := 1000 nMessages := 1000 - b := NewEventBuffer() + b := newEventBuffer() // Start a write goroutine that will publish 10000 messages with sequential // indexes and some jitter in timing (to allow clients to "catch up" and block diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index e56612d350..fee2f29232 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -33,11 +33,11 @@ type EventPublisher struct { // topicBuffers stores the head of the linked-list buffer to publish events to // for a topic. - topicBuffers map[Topic]*EventBuffer + topicBuffers map[Topic]*eventBuffer // snapCache if a cache of EventSnapshots indexed by topic and key. // TODO: new struct for snapCache and snapFns and snapCacheTTL - snapCache map[Topic]map[string]*EventSnapshot + snapCache map[Topic]map[string]*eventSnapshot subscriptions *subscriptions @@ -69,13 +69,20 @@ type changeEvents struct { // TopicHandler provides functions which create stream.Events for a topic. type TopicHandler struct { // Snapshot creates the necessary events to reproduce the current state and - // appends them to the EventBuffer. - Snapshot func(*SubscribeRequest, *EventBuffer) (index uint64, err error) + // appends them to the eventBuffer. + Snapshot func(*SubscribeRequest, SnapshotAppender) (index uint64, err error) // ProcessChanges accepts a slice of Changes, and builds a slice of events for // those changes. ProcessChanges func(db.ReadTxn, db.Changes) ([]Event, error) } +// SnapshotAppender appends groups of events to create a Snapshot of state. +type SnapshotAppender interface { + // Append events to the snapshot. + // TODO: document why parameter is a slice instead of a single Event + Append(events []Event) +} + // NewEventPublisher returns an EventPublisher for publishing change events. // Handlers are used to convert the memDB changes into events. // A goroutine is run in the background to publish events to all subscribes. @@ -84,8 +91,8 @@ type TopicHandler struct { func NewEventPublisher(ctx context.Context, handlers map[Topic]TopicHandler, snapCacheTTL time.Duration) *EventPublisher { e := &EventPublisher{ snapCacheTTL: snapCacheTTL, - topicBuffers: make(map[Topic]*EventBuffer), - snapCache: make(map[Topic]map[string]*EventSnapshot), + topicBuffers: make(map[Topic]*eventBuffer), + snapCache: make(map[Topic]map[string]*eventSnapshot), publishCh: make(chan changeEvents, 64), subscriptions: &subscriptions{ byToken: make(map[string]map[*SubscribeRequest]*Subscription), @@ -136,8 +143,8 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) { // as any ACL update events to cause affected listeners to reset their stream. func (e *EventPublisher) sendEvents(update changeEvents) { for _, event := range update.events { - if unsubEvent, ok := event.Payload.(UnsubscribePayload); ok { - e.subscriptions.closeSubscriptionsForTokens(unsubEvent.TokensSecretIDs) + if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok { + e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs) } } @@ -160,10 +167,10 @@ func (e *EventPublisher) sendEvents(update changeEvents) { // already exist. // // EventPublisher.lock must be held to call this method. -func (e *EventPublisher) getTopicBuffer(topic Topic) *EventBuffer { +func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer { buf, ok := e.topicBuffers[topic] if !ok { - buf = NewEventBuffer() + buf = newEventBuffer() e.topicBuffers[topic] = buf } return buf @@ -207,10 +214,10 @@ func (e *EventPublisher) Subscribe( Index: req.Index, Topic: req.Topic, Key: req.Key, - Payload: ResumeStream{}, + Payload: resumeStream{}, } // Make a new buffer to send to the client containing the resume. - buf := NewEventBuffer() + buf := newEventBuffer() // Store the head of that buffer before we append to it to give as the // starting point for the subscription. @@ -226,13 +233,13 @@ func (e *EventPublisher) Subscribe( } buf.AppendBuffer(follow) - sub = NewSubscription(ctx, req, subHead) + sub = newSubscription(ctx, req, subHead) } else { snap, err := e.getSnapshotLocked(req, topicHead) if err != nil { return nil, err } - sub = NewSubscription(ctx, req, snap.Snap) + sub = newSubscription(ctx, req, snap.Snap) } e.subscriptions.add(req, sub) @@ -288,16 +295,16 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) { } } -func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *BufferItem) (*EventSnapshot, error) { +func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) { // See if there is a cached snapshot topicSnaps, ok := e.snapCache[req.Topic] if !ok { - topicSnaps = make(map[string]*EventSnapshot) + topicSnaps = make(map[string]*eventSnapshot) e.snapCache[req.Topic] = topicSnaps } snap, ok := topicSnaps[req.Key] - if ok && snap.Err() == nil { + if ok && snap.err() == nil { return snap, nil } @@ -307,7 +314,7 @@ func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *Buf return nil, fmt.Errorf("unknown topic %d", req.Topic) } - snap = NewEventSnapshot(req, topicHead, handler.Snapshot) + snap = newEventSnapshot(req, topicHead, handler.Snapshot) if e.snapCacheTTL > 0 { topicSnaps[req.Key] = snap diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 5f9df33bab..cdefc4f9db 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -51,7 +51,7 @@ func TestEventPublisher_PublishChangesAndSubscribe_WithSnapshot(t *testing.T) { func newTestTopicHandlers() map[Topic]TopicHandler { return map[Topic]TopicHandler{ testTopic: { - Snapshot: func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) { + Snapshot: func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { if req.Topic != testTopic { return 0, fmt.Errorf("unexpected topic: %v", req.Topic) } diff --git a/agent/consul/stream/event_snapshot.go b/agent/consul/stream/event_snapshot.go index 59359a925a..0744daf121 100644 --- a/agent/consul/stream/event_snapshot.go +++ b/agent/consul/stream/event_snapshot.go @@ -1,37 +1,35 @@ package stream -// EventSnapshot represents the state of memdb for a given topic and key at some +// eventSnapshot represents the state of memdb for a given topic and key at some // point in time. It is modelled as a buffer of events so that snapshots can be // streamed to possibly multiple subscribers concurrently, and can be trivially -// cached by retaining a reference to a Snapshot. Once the reference to EventSnapshot +// cached by retaining a reference to a Snapshot. Once the reference to eventSnapshot // is dropped from memory, any subscribers still reading from it may do so by following // their pointers. When the last subscribe unsubscribes the snapshot is garbage // collected automatically by Go's runtime. This simplifies snapshot and buffer // management dramatically. -type EventSnapshot struct { +type eventSnapshot struct { // Snap is the first item in the buffer containing the snapshot. Once the // snapshot is complete, subsequent BufferItems are appended to snapBuffer, // so that subscribers receive all the events from the same buffer. - Snap *BufferItem + Snap *bufferItem // snapBuffer is the Head of the snapshot buffer the fn should write to. - snapBuffer *EventBuffer + snapBuffer *eventBuffer } -// SnapFn is the type of function needed to generate a snapshot for a topic and -// key. -type SnapFn func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) +type snapFunc func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) -// NewEventSnapshot creates a snapshot buffer based on the subscription request. +// newEventSnapshot creates a snapshot buffer based on the subscription request. // The current buffer head for the topic in question is passed so that once the // snapshot is complete and has been delivered into the buffer, any events // published during snapshotting can be immediately appended and won't be // missed. Once the snapshot is delivered the topic buffer is spliced onto the // snapshot buffer so that subscribers will naturally follow from the snapshot // to wait for any subsequent updates. -func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn SnapFn) *EventSnapshot { - buf := NewEventBuffer() - s := &EventSnapshot{ +func newEventSnapshot(req *SubscribeRequest, topicBufferHead *bufferItem, fn snapFunc) *eventSnapshot { + buf := newEventBuffer() + s := &eventSnapshot{ Snap: buf.Head(), snapBuffer: buf, } @@ -54,7 +52,7 @@ func NewEventSnapshot(req *SubscribeRequest, topicBufferHead *BufferItem, fn Sna return s } -func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx uint64) { +func (s *eventSnapshot) spliceFromTopicBuffer(topicBufferHead *bufferItem, idx uint64) { // Now splice on the topic buffer. We need to iterate through the buffer to // find the first event after the current snapshot. item := topicBufferHead @@ -102,10 +100,10 @@ func (s *EventSnapshot) spliceFromTopicBuffer(topicBufferHead *BufferItem, idx u } } -// Err returns an error if the snapshot func has failed with an error or nil +// err returns an error if the snapshot func has failed with an error or nil // otherwise. Nil doesn't necessarily mean there won't be an error but there // hasn't been one yet. -func (s *EventSnapshot) Err() error { +func (s *eventSnapshot) err() error { // Fetch the head of the buffer, this is atomic. If the snapshot func errored // then the last event will be an error. head := s.snapBuffer.Head() diff --git a/agent/consul/stream/event_snapshot_test.go b/agent/consul/stream/event_snapshot_test.go index 21164491d3..8b2185de2b 100644 --- a/agent/consul/stream/event_snapshot_test.go +++ b/agent/consul/stream/event_snapshot_test.go @@ -70,10 +70,10 @@ func TestEventSnapshot(t *testing.T) { snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex) // Create a topic buffer for updates - tb := NewEventBuffer() + tb := newEventBuffer() // Capture the topic buffer head now so updatesBeforeSnap are "concurrent" - // and are seen by the EventSnapshot once it completes the snap. + // and are seen by the eventSnapshot once it completes the snap. tbHead := tb.Head() // Deliver any pre-snapshot events simulating updates that occur after the @@ -87,9 +87,9 @@ func TestEventSnapshot(t *testing.T) { tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)}) } - // Create EventSnapshot, (will call snFn in another goroutine). The - // Request is ignored by the SnapFn so doesn't matter for now. - es := NewEventSnapshot(&SubscribeRequest{}, tbHead, snFn) + // Create eventSnapshot, (will call snFn in another goroutine). The + // Request is ignored by the snapFunc so doesn't matter for now. + es := newEventSnapshot(&SubscribeRequest{}, tbHead, snFn) // Deliver any post-snapshot events simulating updates that occur // logically after snapshot. It doesn't matter that these might actually @@ -155,8 +155,8 @@ func genSequentialIDs(start, end int) []string { return ids } -func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapFn { - return func(req *SubscribeRequest, buf *EventBuffer) (uint64, error) { +func testHealthConsecutiveSnapshotFn(size int, index uint64) snapFunc { + return func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) { for i := 0; i < size; i++ { // Event content is arbitrary we are just using Health because it's the // first type defined. We just want a set of things with consecutive diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 36e4bc5e0f..c04f80d53d 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -17,11 +17,9 @@ const ( subscriptionStateClosed uint32 = 1 ) -var ( - // ErrSubscriptionReload is a error signalling reload event should be sent to - // the client and the server should close. - ErrSubscriptionReload = errors.New("subscription closed by server, client should retry") -) +// ErrSubscriptionClosed is a error signalling the subscription has been +// closed. The client should Unsubscribe, then re-Subscribe. +var ErrSubscriptionClosed = errors.New("subscription closed by server, client should unsub and retry") // Subscription holds state about a single Subscribe call. Subscribe clients // access their next event by calling Next(). This may initially include the @@ -35,7 +33,7 @@ type Subscription struct { // currentItem stores the current snapshot or topic buffer item we are on. It // is mutated by calls to Next. - currentItem *BufferItem + currentItem *bufferItem // ctx is the Subscription context that wraps the context of the streaming RPC // handler call. @@ -59,9 +57,9 @@ type SubscribeRequest struct { Index uint64 } -// NewSubscription return a new subscription. The caller is responsible for +// newSubscription return a new subscription. The caller is responsible for // calling Unsubscribe when it is done with the subscription, to free resources. -func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription { +func newSubscription(ctx context.Context, req *SubscribeRequest, item *bufferItem) *Subscription { subCtx, cancel := context.WithCancel(ctx) return &Subscription{ ctx: subCtx, @@ -75,7 +73,7 @@ func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferIte // single goroutine concurrently as it mutates the Subscription. func (s *Subscription) Next() ([]Event, error) { if atomic.LoadUint32(&s.state) == subscriptionStateClosed { - return nil, ErrSubscriptionReload + return nil, ErrSubscriptionClosed } for { @@ -83,7 +81,7 @@ func (s *Subscription) Next() ([]Event, error) { if err != nil { // Check we didn't return because of a state change cancelling the context if atomic.LoadUint32(&s.state) == subscriptionStateClosed { - return nil, ErrSubscriptionReload + return nil, ErrSubscriptionClosed } return nil, err } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index 615f7f30cb..2a5eb0654c 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -9,7 +9,7 @@ import ( ) func TestSubscription(t *testing.T) { - eb := NewEventBuffer() + eb := newEventBuffer() index := uint64(100) @@ -26,7 +26,7 @@ func TestSubscription(t *testing.T) { Topic: Topic_ServiceHealth, Key: "test", } - sub := NewSubscription(ctx, req, startHead) + sub := newSubscription(ctx, req, startHead) // First call to sub.Next should return our published event immediately start := time.Now() @@ -89,7 +89,7 @@ func TestSubscription(t *testing.T) { } func TestSubscription_Close(t *testing.T) { - eb := NewEventBuffer() + eb := newEventBuffer() index := uint64(100) @@ -106,7 +106,7 @@ func TestSubscription_Close(t *testing.T) { Topic: Topic_ServiceHealth, Key: "test", } - sub := NewSubscription(ctx, req, startHead) + sub := newSubscription(ctx, req, startHead) // First call to sub.Next should return our published event immediately start := time.Now() @@ -128,14 +128,14 @@ func TestSubscription_Close(t *testing.T) { _, err = sub.Next() elapsed = time.Since(start) require.Error(t, err) - require.Equal(t, ErrSubscriptionReload, err) + require.Equal(t, ErrSubscriptionClosed, err) require.True(t, elapsed > 200*time.Millisecond, "Reload should have happened after blocking 200ms, took %s", elapsed) require.True(t, elapsed < 2*time.Second, "Reload should have been delivered after short time, took %s", elapsed) } -func publishTestEvent(index uint64, b *EventBuffer, key string) { +func publishTestEvent(index uint64, b *eventBuffer, key string) { // Don't care about the event payload for now just the semantics of publishing // something. This is not a valid stream in the end-to-end streaming protocol // but enough to test subscription mechanics.