EventPublisher: handleACL changes synchronously

Use a separate lock for subscriptions.ByToken to allow it to happen synchronously
in the commit flow.
This removes the need to create a new txn for the goroutine, and removes
the need for EventPublisher to contain a reference to DB.
This commit is contained in:
Daniel Nephin 2020-06-18 18:11:42 -04:00
parent effab15131
commit 1622bb3a45
1 changed files with 68 additions and 68 deletions

View File

@ -29,7 +29,7 @@ type EventPublisher struct {
// seconds. // seconds.
snapCacheTTL time.Duration snapCacheTTL time.Duration
// This lock protects the topicBuffers, snapCache and subsByToken maps. // This lock protects the topicBuffers, and snapCache
lock sync.RWMutex lock sync.RWMutex
// topicBuffers stores the head of the linked-list buffer to publish events to // topicBuffers stores the head of the linked-list buffer to publish events to
@ -40,10 +40,7 @@ type EventPublisher struct {
// TODO: new struct for snapCache and snapFns and snapCacheTTL // TODO: new struct for snapCache and snapFns and snapCacheTTL
snapCache map[stream.Topic]map[string]*stream.EventSnapshot snapCache map[stream.Topic]map[string]*stream.EventSnapshot
// subsByToken stores a list of Subscription objects outstanding indexed by a subscriptions *subscriptions
// hash of the ACL token they used to subscribe so we can reload them if their
// ACL permissions change.
subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription
// publishCh is used to send messages from an active txn to a goroutine which // publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from // publishes events, so that publishing can happen asynchronously from
@ -53,8 +50,16 @@ type EventPublisher struct {
handlers map[stream.Topic]topicHandler handlers map[stream.Topic]topicHandler
} }
type subscriptions struct {
lock sync.RWMutex
// subsByToken stores a list of Subscription objects outstanding indexed by a
// hash of the ACL token they used to subscribe so we can reload them if their
// ACL permissions change.
subsByToken map[string]map[*stream.SubscribeRequest]*stream.Subscription
}
type commitUpdate struct { type commitUpdate struct {
tx ReadTxn
events []stream.Event events []stream.Event
} }
@ -64,6 +69,9 @@ func NewEventPublisher(handlers map[stream.Topic]topicHandler, snapCacheTTL time
topicBuffers: make(map[stream.Topic]*stream.EventBuffer), topicBuffers: make(map[stream.Topic]*stream.EventBuffer),
snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot), snapCache: make(map[stream.Topic]map[string]*stream.EventSnapshot),
publishCh: make(chan commitUpdate, 64), publishCh: make(chan commitUpdate, 64),
subscriptions: &subscriptions{
subsByToken: make(map[string]map[*stream.SubscribeRequest]*stream.Subscription),
},
handlers: handlers, handlers: handlers,
} }
@ -83,16 +91,29 @@ func (e *EventPublisher) PublishChanges(tx *txn, changes memdb.Changes) error {
events = append(events, es...) events = append(events, es...)
} }
} }
e.publishCh <- commitUpdate{
// TODO: document why it must be created here, and not in the new thread for _, event := range events {
// // If the event is an ACL update, treat it as a special case. Currently
// Create a new transaction since it's going to be used from a different // ACL update events are only used internally to recognize when a subscriber
// thread. Transactions aren't thread safe but it's OK to create it here // should reload its subscription.
// since we won't try to use it in this thread and pass it straight to the if event.Topic == stream.Topic_ACLTokens ||
// handler which will own it exclusively. event.Topic == stream.Topic_ACLPolicies ||
tx: e.db.Txn(false), event.Topic == stream.Topic_ACLRoles {
events: events,
if err := e.subscriptions.handleACLUpdate(tx, event); err != nil {
// This seems pretty drastic? What would be better. It's not super safe
// to continue since we might have missed some ACL update and so leak
// data to unauthorized clients but crashing whole server also seems
// bad. I wonder if we could send a "reset" to all subscribers instead
// and effectively re-start all subscriptions to be on the safe side
// without just crashing?
// TODO(banks): reset all instead of panic?
panic(err)
} }
}
}
e.publishCh <- commitUpdate{events: events}
return nil return nil
} }
@ -106,43 +127,13 @@ func (e *EventPublisher) handleUpdates() {
// sendEvents sends the given events to any applicable topic listeners, as well // sendEvents sends the given events to any applicable topic listeners, as well
// as any ACL update events to cause affected listeners to reset their stream. // as any ACL update events to cause affected listeners to reset their stream.
func (e *EventPublisher) sendEvents(update commitUpdate) { func (e *EventPublisher) sendEvents(update commitUpdate) {
e.lock.Lock()
defer e.lock.Unlock()
// Always abort the transaction. This is not strictly necessary with memDB
// because once we drop the reference to the Txn object, the radix nodes will
// be GCed anyway but it's hygienic incase memDB ever has a different
// implementation.
defer update.tx.Abort()
eventsByTopic := make(map[stream.Topic][]stream.Event) eventsByTopic := make(map[stream.Topic][]stream.Event)
for _, event := range update.events { for _, event := range update.events {
// If the event is an ACL update, treat it as a special case. Currently
// ACL update events are only used internally to recognize when a subscriber
// should reload its subscription.
if event.Topic == stream.Topic_ACLTokens ||
event.Topic == stream.Topic_ACLPolicies ||
event.Topic == stream.Topic_ACLRoles {
if err := e.handleACLUpdate(update.tx, event); err != nil {
// This seems pretty drastic? What would be better. It's not super safe
// to continue since we might have missed some ACL update and so leak
// data to unauthorized clients but crashing whole server also seems
// bad. I wonder if we could send a "reset" to all subscribers instead
// and effectively re-start all subscriptions to be on the safe side
// without just crashing?
// TODO(banks): reset all instead of panic?
panic(err)
}
continue
}
// Split events by topic to deliver.
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event) eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
} }
e.lock.Lock()
defer e.lock.Unlock()
for topic, events := range eventsByTopic { for topic, events := range eventsByTopic {
e.getTopicBuffer(topic).Append(events) e.getTopicBuffer(topic).Append(events)
} }
@ -161,13 +152,15 @@ func (e *EventPublisher) getTopicBuffer(topic stream.Topic) *stream.EventBuffer
return buf return buf
} }
// handleACLUpdate handles an ACL token/policy/role update. This method assumes // handleACLUpdate handles an ACL token/policy/role update.
// the lock is held. func (s *subscriptions) handleACLUpdate(tx ReadTxn, event stream.Event) error {
func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error { s.lock.RLock()
defer s.lock.RUnlock()
switch event.Topic { switch event.Topic {
case stream.Topic_ACLTokens: case stream.Topic_ACLTokens:
token := event.Payload.(*structs.ACLToken) token := event.Payload.(*structs.ACLToken)
for _, sub := range e.subsByToken[token.SecretID] { for _, sub := range s.subsByToken[token.SecretID] {
sub.CloseReload() sub.CloseReload()
} }
@ -177,7 +170,7 @@ func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error {
if err != nil { if err != nil {
return err return err
} }
e.closeSubscriptionsForTokens(tokens) s.closeSubscriptionsForTokens(tokens)
// Find any roles using this policy so tokens with those roles can be reloaded. // Find any roles using this policy so tokens with those roles can be reloaded.
roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta) roles, err := aclRoleListByPolicy(tx, policy.ID, &policy.EnterpriseMeta)
@ -191,7 +184,7 @@ func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error {
if err != nil { if err != nil {
return err return err
} }
e.closeSubscriptionsForTokens(tokens) s.closeSubscriptionsForTokens(tokens)
} }
case stream.Topic_ACLRoles: case stream.Topic_ACLRoles:
@ -200,17 +193,17 @@ func (e *EventPublisher) handleACLUpdate(tx ReadTxn, event stream.Event) error {
if err != nil { if err != nil {
return err return err
} }
e.closeSubscriptionsForTokens(tokens) s.closeSubscriptionsForTokens(tokens)
} }
return nil return nil
} }
// This method requires the EventPublisher.lock is held // This method requires the EventPublisher.lock is held
func (e *EventPublisher) closeSubscriptionsForTokens(tokens memdb.ResultIterator) { func (s *subscriptions) closeSubscriptionsForTokens(tokens memdb.ResultIterator) {
for token := tokens.Next(); token != nil; token = tokens.Next() { for token := tokens.Next(); token != nil; token = tokens.Next() {
token := token.(*structs.ACLToken) token := token.(*structs.ACLToken)
if subs, ok := e.subsByToken[token.SecretID]; ok { if subs, ok := s.subsByToken[token.SecretID]; ok {
for _, sub := range subs { for _, sub := range subs {
sub.CloseReload() sub.CloseReload()
} }
@ -284,30 +277,37 @@ func (e *EventPublisher) Subscribe(
sub = stream.NewSubscription(ctx, req, snap.Snap) sub = stream.NewSubscription(ctx, req, snap.Snap)
} }
subsByToken, ok := e.subsByToken[req.Token] e.subscriptions.add(req, sub)
if !ok {
subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription)
e.subsByToken[req.Token] = subsByToken
}
subsByToken[req] = sub
return sub, nil return sub, nil
} }
func (s *subscriptions) add(req *stream.SubscribeRequest, sub *stream.Subscription) {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.subsByToken[req.Token]
if !ok {
subsByToken = make(map[*stream.SubscribeRequest]*stream.Subscription)
s.subsByToken[req.Token] = subsByToken
}
subsByToken[req] = sub
}
// Unsubscribe must be called when a client is no longer interested in a // Unsubscribe must be called when a client is no longer interested in a
// subscription to free resources monitoring changes in it's ACL token. The same // subscription to free resources monitoring changes in it's ACL token. The same
// request object passed to Subscribe must be used. // request object passed to Subscribe must be used.
func (e *EventPublisher) Unsubscribe(req *stream.SubscribeRequest) { func (s *subscriptions) Unsubscribe(req *stream.SubscribeRequest) {
e.lock.Lock() s.lock.Lock()
defer e.lock.Unlock() defer s.lock.Unlock()
subsByToken, ok := e.subsByToken[req.Token] subsByToken, ok := s.subsByToken[req.Token]
if !ok { if !ok {
return return
} }
delete(subsByToken, req) delete(subsByToken, req)
if len(subsByToken) == 0 { if len(subsByToken) == 0 {
delete(e.subsByToken, req.Token) delete(s.subsByToken, req.Token)
} }
} }