diff --git a/agent/token/store.go b/agent/token/store.go index 1d83cfa7e2..56ab7d806a 100644 --- a/agent/token/store.go +++ b/agent/token/store.go @@ -13,6 +13,28 @@ const ( TokenSourceAPI TokenSource = true ) +type TokenKind int + +const ( + TokenKindAgent TokenKind = iota + TokenKindAgentMaster + TokenKindUser + TokenKindReplication +) + +type watcher struct { + kind TokenKind + ch chan<- struct{} +} + +// Notifier holds the channel used to notify a watcher +// of token updates as well as some internal tracking +// information to allow for deregistering the notifier. +type Notifier struct { + id int + Ch <-chan struct{} +} + // Store is used to hold the special ACL tokens used by Consul agents. It is // designed to update the tokens on the fly, so the token store itself should be // plumbed around and used to get tokens at runtime, don't save the resulting @@ -52,10 +74,86 @@ type Store struct { // replicationTokenSource indicates where this token originated from replicationTokenSource TokenSource + watchers map[int]watcher + watcherIndex int + // enterpriseTokens contains tokens only used in consul-enterprise enterpriseTokens } +// Notify will set up a watch for when tokens of the desired kind is changed +func (t *Store) Notify(kind TokenKind) Notifier { + // buffering ensures that notifications aren't missed if the watcher + // isn't already in a select and that our notifications don't + // block returning from the Update* methods. + ch := make(chan struct{}, 1) + + w := watcher{ + kind: kind, + ch: ch, + } + + t.l.Lock() + defer t.l.Unlock() + if t.watchers == nil { + t.watchers = make(map[int]watcher) + } + // we specifically want to avoid the zero-value to prevent accidental stop-notification requests + t.watcherIndex += 1 + t.watchers[t.watcherIndex] = w + + return Notifier{id: t.watcherIndex, Ch: ch} +} + +// StopNotify stops the token store from sending notifications to the specified notifiers chan +func (t *Store) StopNotify(n Notifier) { + t.l.Lock() + defer t.l.Unlock() + delete(t.watchers, n.id) +} + +// anyKindAllowed returns true if any of the kinds in the `check` list are +// set to be allowed in the `allowed` map. +// +// Note: this is mostly just a convenience to simplify the code in +// sendNotificationLocked and prevent more nested looping with breaks/continues +// and other state tracking. +func anyKindAllowed(allowed TokenKind, check []TokenKind) bool { + for _, kind := range check { + if allowed == kind { + return true + } + } + return false +} + +// sendNotificationLocked will iterate through all watchers and notify them that a +// token they are watching has been updated. +// +// NOTE: this function explicitly does not attempt to send the kind or new token value +// along through the channel. With that approach watchers could potentially miss updates +// if the buffered chan fills up. Instead with this approach we just notify that any +// token they care about has been udpated and its up to the caller to retrieve the +// new value (after receiving from the chan). With this approach its entirely possible +// for the watcher to be notified twice before actually retrieving the token after the first +// read from the chan. This is better behavior than missing events. It can cause some +// churn temporarily but in common cases its not expected that these tokens would be updated +// frequently enough to cause this to happen. +func (t *Store) sendNotificationLocked(kinds ...TokenKind) { + for _, watcher := range t.watchers { + if !anyKindAllowed(watcher.kind, kinds) { + // ignore this watcher as it doesn't want events for these kinds of token + continue + } + + select { + case watcher.ch <- struct{}{}: + default: + // its already pending a notification + } + } +} + // UpdateUserToken replaces the current user token in the store. // Returns true if it was changed. func (t *Store) UpdateUserToken(token string, source TokenSource) bool { @@ -63,6 +161,9 @@ func (t *Store) UpdateUserToken(token string, source TokenSource) bool { changed := (t.userToken != token || t.userTokenSource != source) t.userToken = token t.userTokenSource = source + if changed { + t.sendNotificationLocked(TokenKindUser) + } t.l.Unlock() return changed } @@ -74,6 +175,9 @@ func (t *Store) UpdateAgentToken(token string, source TokenSource) bool { changed := (t.agentToken != token || t.agentTokenSource != source) t.agentToken = token t.agentTokenSource = source + if changed { + t.sendNotificationLocked(TokenKindAgent) + } t.l.Unlock() return changed } @@ -85,6 +189,9 @@ func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool { changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source) t.agentMasterToken = token t.agentMasterTokenSource = source + if changed { + t.sendNotificationLocked(TokenKindAgentMaster) + } t.l.Unlock() return changed } @@ -96,6 +203,9 @@ func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool { changed := (t.replicationToken != token || t.replicationTokenSource != source) t.replicationToken = token t.replicationTokenSource = source + if changed { + t.sendNotificationLocked(TokenKindReplication) + } t.l.Unlock() return changed } diff --git a/agent/token/store_test.go b/agent/token/store_test.go index 7470b04467..f46fcc3a98 100644 --- a/agent/token/store_test.go +++ b/agent/token/store_test.go @@ -150,3 +150,94 @@ func TestStore_AgentMasterToken(t *testing.T) { s.UpdateAgentMasterToken("", TokenSourceConfig) verify(false, "", "nope", "master", "another") } + +func TestStore_Notify(t *testing.T) { + t.Parallel() + s := new(Store) + + newNotification := func(t *testing.T, s *Store, kind TokenKind) Notifier { + n := s.Notify(kind) + require.NotNil(t, n.Ch) + return n + } + + requireNotNotified := func(t *testing.T, ch <-chan struct{}) { + require.Empty(t, ch) + } + + requireNotifiedOnce := func(t *testing.T, ch <-chan struct{}) { + require.Len(t, ch, 1) + // drain the channel + <-ch + // just to be safe + require.Empty(t, ch) + } + + agentNotifier := newNotification(t, s, TokenKindAgent) + userNotifier := newNotification(t, s, TokenKindUser) + agentMasterNotifier := newNotification(t, s, TokenKindAgentMaster) + replicationNotifier := newNotification(t, s, TokenKindReplication) + replicationNotifier2 := newNotification(t, s, TokenKindReplication) + + // perform an update of the user token + require.True(t, s.UpdateUserToken("edcae2a2-3b51-4864-b412-c7a568f49cb1", TokenSourceConfig)) + // do it again to ensure it doesn't block even though nothing has read from the 1 buffered chan yet + require.True(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI)) + + // ensure notifications were sent to the user and all notifiers + requireNotNotified(t, agentNotifier.Ch) + requireNotifiedOnce(t, userNotifier.Ch) + requireNotNotified(t, replicationNotifier.Ch) + requireNotNotified(t, agentMasterNotifier.Ch) + requireNotNotified(t, replicationNotifier2.Ch) + + // now update the agent token which should send notificaitons to the agent and all notifier + require.True(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI)) + + requireNotifiedOnce(t, agentNotifier.Ch) + requireNotNotified(t, userNotifier.Ch) + requireNotNotified(t, replicationNotifier.Ch) + requireNotNotified(t, agentMasterNotifier.Ch) + requireNotNotified(t, replicationNotifier2.Ch) + + // now update the agent master token which should send notificaitons to the agent master and all notifier + require.True(t, s.UpdateAgentMasterToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI)) + + requireNotNotified(t, agentNotifier.Ch) + requireNotNotified(t, userNotifier.Ch) + requireNotNotified(t, replicationNotifier.Ch) + requireNotifiedOnce(t, agentMasterNotifier.Ch) + requireNotNotified(t, replicationNotifier2.Ch) + + // now update the replication token which should send notificaitons to the replication and all notifier + require.True(t, s.UpdateReplicationToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI)) + + requireNotNotified(t, agentNotifier.Ch) + requireNotNotified(t, userNotifier.Ch) + requireNotifiedOnce(t, replicationNotifier.Ch) + requireNotNotified(t, agentMasterNotifier.Ch) + requireNotifiedOnce(t, replicationNotifier2.Ch) + + s.StopNotify(replicationNotifier2) + + // now update the replication token which should send notificaitons to the replication and all notifier + require.True(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI)) + + requireNotNotified(t, agentNotifier.Ch) + requireNotNotified(t, userNotifier.Ch) + requireNotifiedOnce(t, replicationNotifier.Ch) + requireNotNotified(t, agentMasterNotifier.Ch) + requireNotNotified(t, replicationNotifier2.Ch) + + // request updates but that are not changes + require.False(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI)) + require.False(t, s.UpdateAgentMasterToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI)) + require.False(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI)) + require.False(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI)) + + // ensure that notifications were not sent + requireNotNotified(t, agentNotifier.Ch) + requireNotNotified(t, userNotifier.Ch) + requireNotNotified(t, replicationNotifier.Ch) + requireNotNotified(t, agentMasterNotifier.Ch) +}