mirror of https://github.com/status-im/consul.git
296 lines
8.3 KiB
Go
296 lines
8.3 KiB
Go
package token
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"crypto/subtle"
|
|
)
|
|
|
|
type TokenSource bool
|
|
|
|
const (
|
|
TokenSourceConfig TokenSource = false
|
|
TokenSourceAPI TokenSource = true
|
|
)
|
|
|
|
type TokenKind int
|
|
|
|
const (
|
|
TokenKindAgent TokenKind = iota
|
|
TokenKindAgentRecovery
|
|
TokenKindUser
|
|
TokenKindReplication
|
|
)
|
|
|
|
type watcher struct {
|
|
kind TokenKind
|
|
ch chan<- struct{}
|
|
}
|
|
|
|
// Notifier holds the channel used to notify a watcher
|
|
// of token updates as well as some internal tracking
|
|
// information to allow for deregistering the notifier.
|
|
type Notifier struct {
|
|
id int
|
|
Ch <-chan struct{}
|
|
}
|
|
|
|
// Store is used to hold the special ACL tokens used by Consul agents. It is
|
|
// designed to update the tokens on the fly, so the token store itself should be
|
|
// plumbed around and used to get tokens at runtime, don't save the resulting
|
|
// tokens.
|
|
type Store struct {
|
|
// l synchronizes access to the token store.
|
|
l sync.RWMutex
|
|
|
|
// userToken is passed along for requests when the user didn't supply a
|
|
// token, and may be left blank to use the anonymous token. This will
|
|
// also be used for agent operations if the agent token isn't set.
|
|
userToken string
|
|
|
|
// userTokenSource indicates where this token originated from
|
|
userTokenSource TokenSource
|
|
|
|
// agentToken is used for internal agent operations like self-registering
|
|
// with the catalog and anti-entropy, but should never be used for
|
|
// user-initiated operations.
|
|
agentToken string
|
|
|
|
// agentTokenSource indicates where this token originated from
|
|
agentTokenSource TokenSource
|
|
|
|
// agentRecoveryToken is a special token that's only used locally for
|
|
// access to the /v1/agent utility operations if the servers aren't
|
|
// available.
|
|
agentRecoveryToken string
|
|
|
|
// agentRecoveryTokenSource indicates where this token originated from
|
|
agentRecoveryTokenSource TokenSource
|
|
|
|
// replicationToken is a special token that's used by servers to
|
|
// replicate data from the primary datacenter.
|
|
replicationToken string
|
|
|
|
// replicationTokenSource indicates where this token originated from
|
|
replicationTokenSource TokenSource
|
|
|
|
watchers map[int]watcher
|
|
watcherIndex int
|
|
|
|
persistence *fileStore
|
|
// persistenceLock is used to synchronize access to the persisted token store
|
|
// within the data directory. This will prevent loading while writing as well as
|
|
// multiple concurrent writes.
|
|
persistenceLock sync.RWMutex
|
|
|
|
// enterpriseTokens contains tokens only used in consul-enterprise
|
|
enterpriseTokens
|
|
}
|
|
|
|
// Notify will set up a watch for when tokens of the desired kind is changed
|
|
func (t *Store) Notify(kind TokenKind) Notifier {
|
|
// buffering ensures that notifications aren't missed if the watcher
|
|
// isn't already in a select and that our notifications don't
|
|
// block returning from the Update* methods.
|
|
ch := make(chan struct{}, 1)
|
|
|
|
w := watcher{
|
|
kind: kind,
|
|
ch: ch,
|
|
}
|
|
|
|
t.l.Lock()
|
|
defer t.l.Unlock()
|
|
if t.watchers == nil {
|
|
t.watchers = make(map[int]watcher)
|
|
}
|
|
// we specifically want to avoid the zero-value to prevent accidental stop-notification requests
|
|
t.watcherIndex += 1
|
|
t.watchers[t.watcherIndex] = w
|
|
|
|
return Notifier{id: t.watcherIndex, Ch: ch}
|
|
}
|
|
|
|
// StopNotify stops the token store from sending notifications to the specified notifiers chan
|
|
func (t *Store) StopNotify(n Notifier) {
|
|
t.l.Lock()
|
|
defer t.l.Unlock()
|
|
delete(t.watchers, n.id)
|
|
}
|
|
|
|
// anyKindAllowed returns true if any of the kinds in the `check` list are
|
|
// set to be allowed in the `allowed` map.
|
|
//
|
|
// Note: this is mostly just a convenience to simplify the code in
|
|
// sendNotificationLocked and prevent more nested looping with breaks/continues
|
|
// and other state tracking.
|
|
func anyKindAllowed(allowed TokenKind, check []TokenKind) bool {
|
|
for _, kind := range check {
|
|
if allowed == kind {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// sendNotificationLocked will iterate through all watchers and notify them that a
|
|
// token they are watching has been updated.
|
|
//
|
|
// NOTE: this function explicitly does not attempt to send the kind or new token value
|
|
// along through the channel. With that approach watchers could potentially miss updates
|
|
// if the buffered chan fills up. Instead with this approach we just notify that any
|
|
// token they care about has been udpated and its up to the caller to retrieve the
|
|
// new value (after receiving from the chan). With this approach its entirely possible
|
|
// for the watcher to be notified twice before actually retrieving the token after the first
|
|
// read from the chan. This is better behavior than missing events. It can cause some
|
|
// churn temporarily but in common cases its not expected that these tokens would be updated
|
|
// frequently enough to cause this to happen.
|
|
func (t *Store) sendNotificationLocked(kinds ...TokenKind) {
|
|
for _, watcher := range t.watchers {
|
|
if !anyKindAllowed(watcher.kind, kinds) {
|
|
// ignore this watcher as it doesn't want events for these kinds of token
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case watcher.ch <- struct{}{}:
|
|
default:
|
|
// its already pending a notification
|
|
}
|
|
}
|
|
}
|
|
|
|
// UpdateUserToken replaces the current user token in the store.
|
|
// Returns true if it was changed.
|
|
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
|
|
t.l.Lock()
|
|
changed := t.userToken != token || t.userTokenSource != source
|
|
t.userToken = token
|
|
t.userTokenSource = source
|
|
if changed {
|
|
t.sendNotificationLocked(TokenKindUser)
|
|
}
|
|
t.l.Unlock()
|
|
return changed
|
|
}
|
|
|
|
// UpdateAgentToken replaces the current agent token in the store.
|
|
// Returns true if it was changed.
|
|
func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
|
|
t.l.Lock()
|
|
changed := t.agentToken != token || t.agentTokenSource != source
|
|
t.agentToken = token
|
|
t.agentTokenSource = source
|
|
if changed {
|
|
t.sendNotificationLocked(TokenKindAgent)
|
|
}
|
|
t.l.Unlock()
|
|
return changed
|
|
}
|
|
|
|
// UpdateAgentRecoveryToken replaces the current agent recovery token in the store.
|
|
// Returns true if it was changed.
|
|
func (t *Store) UpdateAgentRecoveryToken(token string, source TokenSource) bool {
|
|
t.l.Lock()
|
|
changed := t.agentRecoveryToken != token || t.agentRecoveryTokenSource != source
|
|
t.agentRecoveryToken = token
|
|
t.agentRecoveryTokenSource = source
|
|
if changed {
|
|
t.sendNotificationLocked(TokenKindAgentRecovery)
|
|
}
|
|
t.l.Unlock()
|
|
return changed
|
|
}
|
|
|
|
// UpdateReplicationToken replaces the current replication token in the store.
|
|
// Returns true if it was changed.
|
|
func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
|
|
t.l.Lock()
|
|
changed := t.replicationToken != token || t.replicationTokenSource != source
|
|
t.replicationToken = token
|
|
t.replicationTokenSource = source
|
|
if changed {
|
|
t.sendNotificationLocked(TokenKindReplication)
|
|
}
|
|
t.l.Unlock()
|
|
return changed
|
|
}
|
|
|
|
// UserToken returns the best token to use for user operations.
|
|
func (t *Store) UserToken() string {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return t.userToken
|
|
}
|
|
|
|
// AgentToken returns the best token to use for internal agent operations.
|
|
func (t *Store) AgentToken() string {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
if tok := t.enterpriseAgentToken(); tok != "" {
|
|
return tok
|
|
}
|
|
|
|
if t.agentToken != "" {
|
|
return t.agentToken
|
|
}
|
|
return t.userToken
|
|
}
|
|
|
|
func (t *Store) AgentRecoveryToken() string {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return t.agentRecoveryToken
|
|
}
|
|
|
|
// ReplicationToken returns the replication token.
|
|
func (t *Store) ReplicationToken() string {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return t.replicationToken
|
|
}
|
|
|
|
// UserToken returns the best token to use for user operations.
|
|
func (t *Store) UserTokenAndSource() (string, TokenSource) {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return t.userToken, t.userTokenSource
|
|
}
|
|
|
|
// AgentToken returns the best token to use for internal agent operations.
|
|
func (t *Store) AgentTokenAndSource() (string, TokenSource) {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return t.agentToken, t.agentTokenSource
|
|
}
|
|
|
|
func (t *Store) AgentRecoveryTokenAndSource() (string, TokenSource) {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return t.agentRecoveryToken, t.agentRecoveryTokenSource
|
|
}
|
|
|
|
// ReplicationToken returns the replication token.
|
|
func (t *Store) ReplicationTokenAndSource() (string, TokenSource) {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return t.replicationToken, t.replicationTokenSource
|
|
}
|
|
|
|
// IsAgentRecoveryToken checks to see if a given token is the agent recovery token.
|
|
// This will never match an empty token for safety.
|
|
func (t *Store) IsAgentRecoveryToken(token string) bool {
|
|
t.l.RLock()
|
|
defer t.l.RUnlock()
|
|
|
|
return (token != "") && (subtle.ConstantTimeCompare([]byte(token), []byte(t.agentRecoveryToken)) == 1)
|
|
}
|