stream.Subscription unexport fields and additiona docstrings

This commit is contained in:
Daniel Nephin 2020-06-19 16:34:50 -04:00
parent 17b833b4c9
commit a709ed1ab5
3 changed files with 23 additions and 21 deletions

View File

@ -176,7 +176,7 @@ func (s *subscriptions) handleACLUpdate(tx ReadTxn, event stream.Event) error {
case stream.Topic_ACLTokens: case stream.Topic_ACLTokens:
token := event.Payload.(*structs.ACLToken) token := event.Payload.(*structs.ACLToken)
for _, sub := range s.byToken[token.SecretID] { for _, sub := range s.byToken[token.SecretID] {
sub.ForceReload() sub.Close()
} }
case stream.Topic_ACLPolicies: case stream.Topic_ACLPolicies:
@ -220,7 +220,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokens memdb.ResultIterator)
token := token.(*structs.ACLToken) token := token.(*structs.ACLToken)
if subs, ok := s.byToken[token.SecretID]; ok { if subs, ok := s.byToken[token.SecretID]; ok {
for _, sub := range subs { for _, sub := range subs {
sub.ForceReload() sub.Close()
} }
} }
} }

View File

@ -1,18 +1,20 @@
package stream package stream
import ( import (
context "context" "context"
"errors" "errors"
"sync/atomic" "sync/atomic"
) )
const ( const (
// SubscriptionStateOpen is the default state of a subscription // subscriptionStateOpen is the default state of a subscription. An open
SubscriptionStateOpen uint32 = 0 // subscription may receive new events.
subscriptionStateOpen uint32 = 0
// SubscriptionStateCloseReload signals that the subscription was closed by // subscriptionStateClosed indicates that the subscription was closed, possibly
// server and client should retry. // as a result of a change to an ACL token, and will not receive new events.
SubscriptionStateCloseReload uint32 = 1 // The subscriber must issue a new Subscribe request.
subscriptionStateClosed uint32 = 1
) )
var ( var (
@ -48,6 +50,8 @@ type Subscription struct {
Unsubscribe func() Unsubscribe func()
} }
// SubscribeRequest identifies the types of events the subscriber would like to
// receiver. Topic and Token are required.
type SubscribeRequest struct { type SubscribeRequest struct {
Topic Topic Topic Topic
Key string Key string
@ -55,7 +59,8 @@ type SubscribeRequest struct {
Index uint64 Index uint64
} }
// NewSubscription return a new subscription. // NewSubscription return a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources.
func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription { func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription {
subCtx, cancel := context.WithCancel(ctx) subCtx, cancel := context.WithCancel(ctx)
return &Subscription{ return &Subscription{
@ -69,8 +74,7 @@ func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferIte
// Next returns the next set of events to deliver. It must only be called from a // Next returns the next set of events to deliver. It must only be called from a
// single goroutine concurrently as it mutates the Subscription. // single goroutine concurrently as it mutates the Subscription.
func (s *Subscription) Next() ([]Event, error) { func (s *Subscription) Next() ([]Event, error) {
state := atomic.LoadUint32(&s.state) if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
if state == SubscriptionStateCloseReload {
return nil, ErrSubscriptionReload return nil, ErrSubscriptionReload
} }
@ -78,8 +82,7 @@ func (s *Subscription) Next() ([]Event, error) {
next, err := s.currentItem.Next(s.ctx) next, err := s.currentItem.Next(s.ctx)
if err != nil { if err != nil {
// Check we didn't return because of a state change cancelling the context // Check we didn't return because of a state change cancelling the context
state := atomic.LoadUint32(&s.state) if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
if state == SubscriptionStateCloseReload {
return nil, ErrSubscriptionReload return nil, ErrSubscriptionReload
} }
return nil, err return nil, err
@ -120,12 +123,11 @@ func (s *Subscription) Next() ([]Event, error) {
} }
} }
// ForceReload closes the stream and signals that the subscriber should reload. // Close the subscription. Subscribers will receive an error when they call Next,
// and will need to perform a new Subscribe request.
// It is safe to call from any goroutine. // It is safe to call from any goroutine.
func (s *Subscription) ForceReload() { func (s *Subscription) Close() {
swapped := atomic.CompareAndSwapUint32(&s.state, SubscriptionStateOpen, swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
SubscriptionStateCloseReload)
if swapped { if swapped {
s.cancelFn() s.cancelFn()
} }

View File

@ -88,7 +88,7 @@ func TestSubscription(t *testing.T) {
"Event should have been delivered after short time, took %s", elapsed) "Event should have been delivered after short time, took %s", elapsed)
} }
func TestSubscriptionCloseReload(t *testing.T) { func TestSubscription_Close(t *testing.T) {
eb := NewEventBuffer() eb := NewEventBuffer()
index := uint64(100) index := uint64(100)
@ -118,11 +118,11 @@ func TestSubscriptionCloseReload(t *testing.T) {
require.Len(t, got, 1) require.Len(t, got, 1)
require.Equal(t, index, got[0].Index) require.Equal(t, index, got[0].Index)
// Schedule a ForceReload simulating the server deciding this subscroption // Schedule a Close simulating the server deciding this subscroption
// needs to reset (e.g. on ACL perm change). // needs to reset (e.g. on ACL perm change).
start = time.Now() start = time.Now()
time.AfterFunc(200*time.Millisecond, func() { time.AfterFunc(200*time.Millisecond, func() {
sub.ForceReload() sub.Close()
}) })
_, err = sub.Next() _, err = sub.Next()