mirror of https://github.com/status-im/consul.git
agent/token: Move token persistence out of agent
And into token.Store. This change isolates any awareness of token persistence in a single place. It is a small step in allowing Agent.New to accept its dependencies.
This commit is contained in:
parent
a80de898ea
commit
330be5b740
|
@ -184,7 +184,9 @@ func TestACL_AgentMasterToken(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
a := NewTestACLAgent(t, t.Name(), TestACLConfig(), nil, nil)
|
||||
a.loadTokens(a.config)
|
||||
err := a.tokens.Load(a.config.ACLTokens, a.logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
authz, err := a.resolveToken("towel")
|
||||
require.NotNil(t, authz)
|
||||
require.Nil(t, err)
|
||||
|
|
103
agent/agent.go
103
agent/agent.go
|
@ -19,6 +19,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/dns"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
|
@ -39,7 +40,6 @@ import (
|
|||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/systemd"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/agent/xds"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/api/watch"
|
||||
|
@ -67,9 +67,6 @@ const (
|
|||
checksDir = "checks"
|
||||
checkStateDir = "checks/state"
|
||||
|
||||
// Name of the file tokens will be persisted within
|
||||
tokensPath = "acl-tokens.json"
|
||||
|
||||
// Default reasons for node/service maintenance mode
|
||||
defaultNodeMaintReason = "Maintenance mode is enabled for this node, " +
|
||||
"but no reason was provided. This is a default message."
|
||||
|
@ -292,11 +289,6 @@ type Agent struct {
|
|||
// based on the current consul configuration.
|
||||
tlsConfigurator *tlsutil.Configurator
|
||||
|
||||
// persistedTokensLock is used to synchronize access to the persisted token
|
||||
// store within the data directory. This will prevent loading while writing as
|
||||
// well as multiple concurrent writes.
|
||||
persistedTokensLock sync.RWMutex
|
||||
|
||||
// httpConnLimiter is used to limit connections to the HTTP server by client
|
||||
// IP.
|
||||
httpConnLimiter connlimit.Limiter
|
||||
|
@ -370,10 +362,8 @@ func New(bd BaseDeps) (*Agent, error) {
|
|||
// pass the agent itself so its safe to move here.
|
||||
a.registerCache()
|
||||
|
||||
// TODO: move to newBaseDeps
|
||||
// TODO: handle error
|
||||
a.loadTokens(a.config)
|
||||
a.loadEnterpriseTokens(a.config)
|
||||
// TODO: why do we ignore failure to load persisted tokens?
|
||||
_ = a.tokens.Load(bd.RuntimeConfig.ACLTokens, a.logger)
|
||||
|
||||
return &a, nil
|
||||
}
|
||||
|
@ -3387,90 +3377,6 @@ func (a *Agent) unloadChecks() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type persistedTokens struct {
|
||||
Replication string `json:"replication,omitempty"`
|
||||
AgentMaster string `json:"agent_master,omitempty"`
|
||||
Default string `json:"default,omitempty"`
|
||||
Agent string `json:"agent,omitempty"`
|
||||
}
|
||||
|
||||
func (a *Agent) getPersistedTokens() (*persistedTokens, error) {
|
||||
persistedTokens := &persistedTokens{}
|
||||
if !a.config.ACLEnableTokenPersistence {
|
||||
return persistedTokens, nil
|
||||
}
|
||||
|
||||
a.persistedTokensLock.RLock()
|
||||
defer a.persistedTokensLock.RUnlock()
|
||||
|
||||
tokensFullPath := filepath.Join(a.config.DataDir, tokensPath)
|
||||
|
||||
buf, err := ioutil.ReadFile(tokensFullPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
// non-existence is not an error we care about
|
||||
return persistedTokens, nil
|
||||
}
|
||||
return persistedTokens, fmt.Errorf("failed reading tokens file %q: %s", tokensFullPath, err)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(buf, persistedTokens); err != nil {
|
||||
return persistedTokens, fmt.Errorf("failed to decode tokens file %q: %s", tokensFullPath, err)
|
||||
}
|
||||
|
||||
return persistedTokens, nil
|
||||
}
|
||||
|
||||
func (a *Agent) loadTokens(conf *config.RuntimeConfig) error {
|
||||
persistedTokens, persistenceErr := a.getPersistedTokens()
|
||||
|
||||
if persistenceErr != nil {
|
||||
a.logger.Warn("unable to load persisted tokens", "error", persistenceErr)
|
||||
}
|
||||
|
||||
if persistedTokens.Default != "" {
|
||||
a.tokens.UpdateUserToken(persistedTokens.Default, token.TokenSourceAPI)
|
||||
|
||||
if conf.ACLToken != "" {
|
||||
a.logger.Warn("\"default\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
a.tokens.UpdateUserToken(conf.ACLToken, token.TokenSourceConfig)
|
||||
}
|
||||
|
||||
if persistedTokens.Agent != "" {
|
||||
a.tokens.UpdateAgentToken(persistedTokens.Agent, token.TokenSourceAPI)
|
||||
|
||||
if conf.ACLAgentToken != "" {
|
||||
a.logger.Warn("\"agent\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
a.tokens.UpdateAgentToken(conf.ACLAgentToken, token.TokenSourceConfig)
|
||||
}
|
||||
|
||||
if persistedTokens.AgentMaster != "" {
|
||||
a.tokens.UpdateAgentMasterToken(persistedTokens.AgentMaster, token.TokenSourceAPI)
|
||||
|
||||
if conf.ACLAgentMasterToken != "" {
|
||||
a.logger.Warn("\"agent_master\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
a.tokens.UpdateAgentMasterToken(conf.ACLAgentMasterToken, token.TokenSourceConfig)
|
||||
}
|
||||
|
||||
if persistedTokens.Replication != "" {
|
||||
a.tokens.UpdateReplicationToken(persistedTokens.Replication, token.TokenSourceAPI)
|
||||
|
||||
if conf.ACLReplicationToken != "" {
|
||||
a.logger.Warn("\"replication\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
a.tokens.UpdateReplicationToken(conf.ACLReplicationToken, token.TokenSourceConfig)
|
||||
}
|
||||
|
||||
return persistenceErr
|
||||
}
|
||||
|
||||
// snapshotCheckState is used to snapshot the current state of the health
|
||||
// checks. This is done before we reload our checks, so that we can properly
|
||||
// restore into the same state.
|
||||
|
@ -3650,8 +3556,7 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
|
|||
// Reload tokens - should be done before all the other loading
|
||||
// to ensure the correct tokens are available for attaching to
|
||||
// the checks and service registrations.
|
||||
a.loadTokens(newCfg)
|
||||
a.loadEnterpriseTokens(newCfg)
|
||||
a.tokens.Load(newCfg.ACLTokens, a.logger)
|
||||
|
||||
if err := a.tlsConfigurator.Update(newCfg.ToTLSUtilConfig()); err != nil {
|
||||
return fmt.Errorf("Failed reloading tls configuration: %s", err)
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
|
@ -21,7 +19,6 @@ import (
|
|||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/file"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/logging/monitor"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
@ -1233,79 +1230,42 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
if s.agent.config.ACLEnableTokenPersistence {
|
||||
// we hold the lock around updating the internal token store
|
||||
// as well as persisting the tokens because we don't want to write
|
||||
// into the store to have something else wipe it out before we can
|
||||
// persist everything (like an agent config reload). The token store
|
||||
// lock is only held for those operations so other go routines that
|
||||
// just need to read some token out of the store will not be impacted
|
||||
// any more than they would be without token persistence.
|
||||
s.agent.persistedTokensLock.Lock()
|
||||
defer s.agent.persistedTokensLock.Unlock()
|
||||
}
|
||||
|
||||
// Figure out the target token.
|
||||
target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/")
|
||||
triggerAntiEntropySync := false
|
||||
switch target {
|
||||
case "acl_token", "default":
|
||||
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
|
||||
if changed {
|
||||
triggerAntiEntropySync = true
|
||||
|
||||
err = s.agent.tokens.WithPersistenceLock(func() error {
|
||||
triggerAntiEntropySync := false
|
||||
switch target {
|
||||
case "acl_token", "default":
|
||||
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
|
||||
if changed {
|
||||
triggerAntiEntropySync = true
|
||||
}
|
||||
|
||||
case "acl_agent_token", "agent":
|
||||
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
|
||||
if changed {
|
||||
triggerAntiEntropySync = true
|
||||
}
|
||||
|
||||
case "acl_agent_master_token", "agent_master":
|
||||
s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI)
|
||||
|
||||
case "acl_replication_token", "replication":
|
||||
s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI)
|
||||
|
||||
default:
|
||||
return NotFoundError{Reason: fmt.Sprintf("Token %q is unknown", target)}
|
||||
}
|
||||
|
||||
case "acl_agent_token", "agent":
|
||||
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
|
||||
if changed {
|
||||
triggerAntiEntropySync = true
|
||||
}
|
||||
|
||||
case "acl_agent_master_token", "agent_master":
|
||||
s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI)
|
||||
|
||||
case "acl_replication_token", "replication":
|
||||
s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI)
|
||||
|
||||
default:
|
||||
resp.WriteHeader(http.StatusNotFound)
|
||||
fmt.Fprintf(resp, "Token %q is unknown", target)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if triggerAntiEntropySync {
|
||||
s.agent.sync.SyncFull.Trigger()
|
||||
}
|
||||
|
||||
if s.agent.config.ACLEnableTokenPersistence {
|
||||
tokens := persistedTokens{}
|
||||
|
||||
if tok, source := s.agent.tokens.UserTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
||||
tokens.Default = tok
|
||||
}
|
||||
|
||||
if tok, source := s.agent.tokens.AgentTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
||||
tokens.Agent = tok
|
||||
}
|
||||
|
||||
if tok, source := s.agent.tokens.AgentMasterTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
||||
tokens.AgentMaster = tok
|
||||
}
|
||||
|
||||
if tok, source := s.agent.tokens.ReplicationTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
||||
tokens.Replication = tok
|
||||
}
|
||||
|
||||
data, err := json.Marshal(tokens)
|
||||
if err != nil {
|
||||
s.agent.logger.Warn("failed to persist tokens", "error", err)
|
||||
return nil, fmt.Errorf("Failed to marshal tokens for persistence: %v", err)
|
||||
}
|
||||
|
||||
if err := file.WriteAtomicWithPerms(filepath.Join(s.agent.config.DataDir, tokensPath), data, 0700, 0600); err != nil {
|
||||
s.agent.logger.Warn("failed to persist tokens", "error", err)
|
||||
return nil, fmt.Errorf("Failed to persist tokens - %v", err)
|
||||
// TODO: is it safe to move this out of WithPersistenceLock?
|
||||
if triggerAntiEntropySync {
|
||||
s.agent.sync.SyncFull.Trigger()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.agent.logger.Info("Updated agent's ACL token", "token", target)
|
||||
|
|
|
@ -4774,13 +4774,14 @@ func TestAgent_Token(t *testing.T) {
|
|||
init tokens
|
||||
raw tokens
|
||||
effective tokens
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "bad token name",
|
||||
method: "PUT",
|
||||
url: "nope?token=root",
|
||||
body: body("X"),
|
||||
code: http.StatusNotFound,
|
||||
name: "bad token name",
|
||||
method: "PUT",
|
||||
url: "nope?token=root",
|
||||
body: body("X"),
|
||||
expectedErr: NotFoundError{Reason: `Token "nope" is unknown`},
|
||||
},
|
||||
{
|
||||
name: "bad JSON",
|
||||
|
@ -4942,7 +4943,12 @@ func TestAgent_Token(t *testing.T) {
|
|||
url := fmt.Sprintf("/v1/agent/token/%s", tt.url)
|
||||
resp := httptest.NewRecorder()
|
||||
req, _ := http.NewRequest(tt.method, url, tt.body)
|
||||
|
||||
_, err := a.srv.AgentToken(resp, req)
|
||||
if tt.expectedErr != nil {
|
||||
require.Equal(t, tt.expectedErr, err)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.code, resp.Code)
|
||||
require.Equal(t, tt.effective.user, a.tokens.UserToken())
|
||||
|
|
|
@ -23,10 +23,6 @@ func (a *Agent) initEnterprise(consulCfg *consul.Config) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// loadEnterpriseTokens is a noop stub for the func defined agent_ent.go
|
||||
func (a *Agent) loadEnterpriseTokens(conf *config.RuntimeConfig) {
|
||||
}
|
||||
|
||||
// reloadEnterprise is a noop stub for the func defined agent_ent.go
|
||||
func (a *Agent) reloadEnterprise(conf *config.RuntimeConfig) error {
|
||||
return nil
|
||||
|
|
|
@ -3345,163 +3345,6 @@ func TestAgent_reloadWatchesHTTPS(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAgent_loadTokens(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t, `
|
||||
acl = {
|
||||
enabled = true
|
||||
tokens = {
|
||||
agent = "alfa"
|
||||
agent_master = "bravo",
|
||||
default = "charlie"
|
||||
replication = "delta"
|
||||
}
|
||||
}
|
||||
|
||||
`)
|
||||
defer a.Shutdown()
|
||||
require := require.New(t)
|
||||
|
||||
tokensFullPath := filepath.Join(a.config.DataDir, tokensPath)
|
||||
|
||||
t.Run("original-configuration", func(t *testing.T) {
|
||||
require.Equal("alfa", a.tokens.AgentToken())
|
||||
require.Equal("bravo", a.tokens.AgentMasterToken())
|
||||
require.Equal("charlie", a.tokens.UserToken())
|
||||
require.Equal("delta", a.tokens.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("updated-configuration", func(t *testing.T) {
|
||||
cfg := &config.RuntimeConfig{
|
||||
ACLToken: "echo",
|
||||
ACLAgentToken: "foxtrot",
|
||||
ACLAgentMasterToken: "golf",
|
||||
ACLReplicationToken: "hotel",
|
||||
}
|
||||
// ensures no error for missing persisted tokens file
|
||||
require.NoError(a.loadTokens(cfg))
|
||||
require.Equal("echo", a.tokens.UserToken())
|
||||
require.Equal("foxtrot", a.tokens.AgentToken())
|
||||
require.Equal("golf", a.tokens.AgentMasterToken())
|
||||
require.Equal("hotel", a.tokens.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("persisted-tokens", func(t *testing.T) {
|
||||
cfg := &config.RuntimeConfig{
|
||||
ACLToken: "echo",
|
||||
ACLAgentToken: "foxtrot",
|
||||
ACLAgentMasterToken: "golf",
|
||||
ACLReplicationToken: "hotel",
|
||||
}
|
||||
|
||||
tokens := `{
|
||||
"agent" : "india",
|
||||
"agent_master" : "juliett",
|
||||
"default": "kilo",
|
||||
"replication" : "lima"
|
||||
}`
|
||||
|
||||
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
|
||||
require.NoError(a.loadTokens(cfg))
|
||||
|
||||
// no updates since token persistence is not enabled
|
||||
require.Equal("echo", a.tokens.UserToken())
|
||||
require.Equal("foxtrot", a.tokens.AgentToken())
|
||||
require.Equal("golf", a.tokens.AgentMasterToken())
|
||||
require.Equal("hotel", a.tokens.ReplicationToken())
|
||||
|
||||
a.config.ACLEnableTokenPersistence = true
|
||||
require.NoError(a.loadTokens(cfg))
|
||||
|
||||
require.Equal("india", a.tokens.AgentToken())
|
||||
require.Equal("juliett", a.tokens.AgentMasterToken())
|
||||
require.Equal("kilo", a.tokens.UserToken())
|
||||
require.Equal("lima", a.tokens.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("persisted-tokens-override", func(t *testing.T) {
|
||||
tokens := `{
|
||||
"agent" : "mike",
|
||||
"agent_master" : "november",
|
||||
"default": "oscar",
|
||||
"replication" : "papa"
|
||||
}`
|
||||
|
||||
cfg := &config.RuntimeConfig{
|
||||
ACLToken: "quebec",
|
||||
ACLAgentToken: "romeo",
|
||||
ACLAgentMasterToken: "sierra",
|
||||
ACLReplicationToken: "tango",
|
||||
}
|
||||
|
||||
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
|
||||
require.NoError(a.loadTokens(cfg))
|
||||
|
||||
require.Equal("mike", a.tokens.AgentToken())
|
||||
require.Equal("november", a.tokens.AgentMasterToken())
|
||||
require.Equal("oscar", a.tokens.UserToken())
|
||||
require.Equal("papa", a.tokens.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("partial-persisted", func(t *testing.T) {
|
||||
tokens := `{
|
||||
"agent" : "uniform",
|
||||
"agent_master" : "victor"
|
||||
}`
|
||||
|
||||
cfg := &config.RuntimeConfig{
|
||||
ACLToken: "whiskey",
|
||||
ACLAgentToken: "xray",
|
||||
ACLAgentMasterToken: "yankee",
|
||||
ACLReplicationToken: "zulu",
|
||||
}
|
||||
|
||||
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
|
||||
require.NoError(a.loadTokens(cfg))
|
||||
|
||||
require.Equal("uniform", a.tokens.AgentToken())
|
||||
require.Equal("victor", a.tokens.AgentMasterToken())
|
||||
require.Equal("whiskey", a.tokens.UserToken())
|
||||
require.Equal("zulu", a.tokens.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("persistence-error-not-json", func(t *testing.T) {
|
||||
cfg := &config.RuntimeConfig{
|
||||
ACLToken: "one",
|
||||
ACLAgentToken: "two",
|
||||
ACLAgentMasterToken: "three",
|
||||
ACLReplicationToken: "four",
|
||||
}
|
||||
|
||||
require.NoError(ioutil.WriteFile(tokensFullPath, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, 0600))
|
||||
err := a.loadTokens(cfg)
|
||||
require.Error(err)
|
||||
|
||||
require.Equal("one", a.tokens.UserToken())
|
||||
require.Equal("two", a.tokens.AgentToken())
|
||||
require.Equal("three", a.tokens.AgentMasterToken())
|
||||
require.Equal("four", a.tokens.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("persistence-error-wrong-top-level", func(t *testing.T) {
|
||||
cfg := &config.RuntimeConfig{
|
||||
ACLToken: "alfa",
|
||||
ACLAgentToken: "bravo",
|
||||
ACLAgentMasterToken: "charlie",
|
||||
ACLReplicationToken: "foxtrot",
|
||||
}
|
||||
|
||||
require.NoError(ioutil.WriteFile(tokensFullPath, []byte("[1,2,3]"), 0600))
|
||||
err := a.loadTokens(cfg)
|
||||
require.Error(err)
|
||||
|
||||
require.Equal("alfa", a.tokens.UserToken())
|
||||
require.Equal("bravo", a.tokens.AgentToken())
|
||||
require.Equal("charlie", a.tokens.AgentMasterToken())
|
||||
require.Equal("foxtrot", a.tokens.ReplicationToken())
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgent_SecurityChecks(t *testing.T) {
|
||||
t.Parallel()
|
||||
hcl := `
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
|
@ -63,6 +64,10 @@ type RuntimeConfig struct {
|
|||
// hcl: acl.enabled = boolean
|
||||
ACLsEnabled bool
|
||||
|
||||
// TODO: remove old fields
|
||||
// TODO: set DataDir as well
|
||||
ACLTokens token.Config
|
||||
|
||||
// ACLAgentMasterToken is a special token that has full read and write
|
||||
// privileges for this agent, and can be used to call agent endpoints
|
||||
// when no servers are available.
|
||||
|
|
|
@ -79,6 +79,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
|
||||
d.RuntimeConfig = cfg
|
||||
d.Tokens = new(token.Store)
|
||||
|
||||
// cache-types are not registered yet, but they won't be used until the components are started.
|
||||
d.Cache = cache.New(cfg.Cache)
|
||||
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
|
||||
|
|
|
@ -0,0 +1,190 @@
|
|||
package token
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hashicorp/consul/lib/file"
|
||||
)
|
||||
|
||||
// Logger used by Store.Load to report warnings.
|
||||
type Logger interface {
|
||||
Warn(msg string, args ...interface{})
|
||||
}
|
||||
|
||||
// Config used by Store.Load, which includes tokens and settings for persistence.
|
||||
type Config struct {
|
||||
EnablePersistence bool
|
||||
DataDir string
|
||||
ACLDefaultToken string
|
||||
ACLAgentToken string
|
||||
ACLAgentMasterToken string
|
||||
ACLReplicationToken string
|
||||
}
|
||||
|
||||
const tokensPath = "acl-tokens.json"
|
||||
|
||||
// Load tokens from Config and optionally from a persisted file in the cfg.DataDir.
|
||||
// If a token exists in both the persisted file and in the Config a warning will
|
||||
// be logged and the persisted token will be used.
|
||||
//
|
||||
// Failures to load the persisted file will result in loading tokens from the
|
||||
// config before returning the error.
|
||||
func (t *Store) Load(cfg Config, logger Logger) error {
|
||||
t.persistenceLock.RLock()
|
||||
if !cfg.EnablePersistence {
|
||||
t.persistence = nil
|
||||
t.persistenceLock.RUnlock()
|
||||
loadTokens(t, cfg, persistedTokens{}, logger)
|
||||
return nil
|
||||
}
|
||||
|
||||
defer t.persistenceLock.RUnlock()
|
||||
t.persistence = &fileStore{
|
||||
filename: filepath.Join(cfg.DataDir, tokensPath),
|
||||
logger: logger,
|
||||
}
|
||||
return t.persistence.load(t, cfg)
|
||||
}
|
||||
|
||||
// WithPersistenceLock executes f while hold a lock. If f returns a nil error,
|
||||
// the tokens in Store will be persisted to the tokens file. Otherwise no
|
||||
// tokens will be persisted, and the error from f will be returned.
|
||||
//
|
||||
// The lock is held so that the writes are persisted before some other thread
|
||||
// can change the value.
|
||||
func (t *Store) WithPersistenceLock(f func() error) error {
|
||||
t.persistenceLock.Lock()
|
||||
if t.persistence == nil {
|
||||
t.persistenceLock.Unlock()
|
||||
return f()
|
||||
}
|
||||
defer t.persistenceLock.Unlock()
|
||||
return t.persistence.withPersistenceLock(t, f)
|
||||
}
|
||||
|
||||
type persistedTokens struct {
|
||||
Replication string `json:"replication,omitempty"`
|
||||
AgentMaster string `json:"agent_master,omitempty"`
|
||||
Default string `json:"default,omitempty"`
|
||||
Agent string `json:"agent,omitempty"`
|
||||
}
|
||||
|
||||
type fileStore struct {
|
||||
filename string
|
||||
logger Logger
|
||||
}
|
||||
|
||||
func (p *fileStore) load(s *Store, cfg Config) error {
|
||||
tokens, err := readPersistedFromFile(p.filename)
|
||||
if err != nil {
|
||||
p.logger.Warn("unable to load persisted tokens", "error", err)
|
||||
}
|
||||
loadTokens(s, cfg, tokens, p.logger)
|
||||
return err
|
||||
}
|
||||
|
||||
func loadTokens(s *Store, cfg Config, tokens persistedTokens, logger Logger) {
|
||||
if tokens.Default != "" {
|
||||
s.UpdateUserToken(tokens.Default, TokenSourceAPI)
|
||||
|
||||
if cfg.ACLDefaultToken != "" {
|
||||
logger.Warn("\"default\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
s.UpdateUserToken(cfg.ACLDefaultToken, TokenSourceConfig)
|
||||
}
|
||||
|
||||
if tokens.Agent != "" {
|
||||
s.UpdateAgentToken(tokens.Agent, TokenSourceAPI)
|
||||
|
||||
if cfg.ACLAgentToken != "" {
|
||||
logger.Warn("\"agent\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
s.UpdateAgentToken(cfg.ACLAgentToken, TokenSourceConfig)
|
||||
}
|
||||
|
||||
if tokens.AgentMaster != "" {
|
||||
s.UpdateAgentMasterToken(tokens.AgentMaster, TokenSourceAPI)
|
||||
|
||||
if cfg.ACLAgentMasterToken != "" {
|
||||
logger.Warn("\"agent_master\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
s.UpdateAgentMasterToken(cfg.ACLAgentMasterToken, TokenSourceConfig)
|
||||
}
|
||||
|
||||
if tokens.Replication != "" {
|
||||
s.UpdateReplicationToken(tokens.Replication, TokenSourceAPI)
|
||||
|
||||
if cfg.ACLReplicationToken != "" {
|
||||
logger.Warn("\"replication\" token present in both the configuration and persisted token store, using the persisted token")
|
||||
}
|
||||
} else {
|
||||
s.UpdateReplicationToken(cfg.ACLReplicationToken, TokenSourceConfig)
|
||||
}
|
||||
|
||||
loadEnterpriseTokens(s, cfg)
|
||||
}
|
||||
|
||||
func readPersistedFromFile(filename string) (persistedTokens, error) {
|
||||
tokens := persistedTokens{}
|
||||
|
||||
buf, err := ioutil.ReadFile(filename)
|
||||
switch {
|
||||
case os.IsNotExist(err):
|
||||
// non-existence is not an error we care about
|
||||
return tokens, nil
|
||||
case err != nil:
|
||||
return tokens, fmt.Errorf("failed reading tokens file %q: %w", filename, err)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(buf, &tokens); err != nil {
|
||||
return tokens, fmt.Errorf("failed to decode tokens file %q: %w", filename, err)
|
||||
}
|
||||
|
||||
return tokens, nil
|
||||
}
|
||||
|
||||
func (p *fileStore) withPersistenceLock(s *Store, f func() error) error {
|
||||
if err := f(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.saveToFile(s)
|
||||
}
|
||||
|
||||
func (p *fileStore) saveToFile(s *Store) error {
|
||||
tokens := persistedTokens{}
|
||||
if tok, source := s.UserTokenAndSource(); tok != "" && source == TokenSourceAPI {
|
||||
tokens.Default = tok
|
||||
}
|
||||
|
||||
if tok, source := s.AgentTokenAndSource(); tok != "" && source == TokenSourceAPI {
|
||||
tokens.Agent = tok
|
||||
}
|
||||
|
||||
if tok, source := s.AgentMasterTokenAndSource(); tok != "" && source == TokenSourceAPI {
|
||||
tokens.AgentMaster = tok
|
||||
}
|
||||
|
||||
if tok, source := s.ReplicationTokenAndSource(); tok != "" && source == TokenSourceAPI {
|
||||
tokens.Replication = tok
|
||||
}
|
||||
|
||||
data, err := json.Marshal(tokens)
|
||||
if err != nil {
|
||||
p.logger.Warn("failed to persist tokens", "error", err)
|
||||
return fmt.Errorf("Failed to marshal tokens for persistence: %v", err)
|
||||
}
|
||||
|
||||
if err := file.WriteAtomicWithPerms(p.filename, data, 0700, 0600); err != nil {
|
||||
p.logger.Warn("failed to persist tokens", "error", err)
|
||||
return fmt.Errorf("Failed to persist tokens - %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,213 @@
|
|||
package token
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestStore_Load(t *testing.T) {
|
||||
dataDir := testutil.TempDir(t, "datadir")
|
||||
tokenFile := filepath.Join(dataDir, tokensPath)
|
||||
logger := hclog.New(nil)
|
||||
store := new(Store)
|
||||
|
||||
t.Run("with empty store", func(t *testing.T) {
|
||||
cfg := Config{
|
||||
DataDir: dataDir,
|
||||
ACLAgentToken: "alfa",
|
||||
ACLAgentMasterToken: "bravo",
|
||||
ACLDefaultToken: "charlie",
|
||||
ACLReplicationToken: "delta",
|
||||
}
|
||||
require.NoError(t, store.Load(cfg, logger))
|
||||
require.Equal(t, "alfa", store.AgentToken())
|
||||
require.Equal(t, "bravo", store.AgentMasterToken())
|
||||
require.Equal(t, "charlie", store.UserToken())
|
||||
require.Equal(t, "delta", store.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("updated from Config", func(t *testing.T) {
|
||||
cfg := Config{
|
||||
DataDir: dataDir,
|
||||
ACLDefaultToken: "echo",
|
||||
ACLAgentToken: "foxtrot",
|
||||
ACLAgentMasterToken: "golf",
|
||||
ACLReplicationToken: "hotel",
|
||||
}
|
||||
// ensures no error for missing persisted tokens file
|
||||
require.NoError(t, store.Load(cfg, logger))
|
||||
require.Equal(t, "echo", store.UserToken())
|
||||
require.Equal(t, "foxtrot", store.AgentToken())
|
||||
require.Equal(t, "golf", store.AgentMasterToken())
|
||||
require.Equal(t, "hotel", store.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("with persisted tokens", func(t *testing.T) {
|
||||
cfg := Config{
|
||||
DataDir: dataDir,
|
||||
ACLDefaultToken: "echo",
|
||||
ACLAgentToken: "foxtrot",
|
||||
ACLAgentMasterToken: "golf",
|
||||
ACLReplicationToken: "hotel",
|
||||
}
|
||||
|
||||
tokens := `{
|
||||
"agent" : "india",
|
||||
"agent_master" : "juliett",
|
||||
"default": "kilo",
|
||||
"replication" : "lima"
|
||||
}`
|
||||
|
||||
require.NoError(t, ioutil.WriteFile(tokenFile, []byte(tokens), 0600))
|
||||
require.NoError(t, store.Load(cfg, logger))
|
||||
|
||||
// no updates since token persistence is not enabled
|
||||
require.Equal(t, "echo", store.UserToken())
|
||||
require.Equal(t, "foxtrot", store.AgentToken())
|
||||
require.Equal(t, "golf", store.AgentMasterToken())
|
||||
require.Equal(t, "hotel", store.ReplicationToken())
|
||||
|
||||
cfg.EnablePersistence = true
|
||||
require.NoError(t, store.Load(cfg, logger))
|
||||
|
||||
require.Equal(t, "india", store.AgentToken())
|
||||
require.Equal(t, "juliett", store.AgentMasterToken())
|
||||
require.Equal(t, "kilo", store.UserToken())
|
||||
require.Equal(t, "lima", store.ReplicationToken())
|
||||
|
||||
// check store persistence was enabled
|
||||
require.NotNil(t, store.persistence)
|
||||
})
|
||||
|
||||
t.Run("with persisted tokens, persisted tokens override config", func(t *testing.T) {
|
||||
tokens := `{
|
||||
"agent" : "mike",
|
||||
"agent_master" : "november",
|
||||
"default": "oscar",
|
||||
"replication" : "papa"
|
||||
}`
|
||||
|
||||
cfg := Config{
|
||||
EnablePersistence: true,
|
||||
DataDir: dataDir,
|
||||
ACLDefaultToken: "quebec",
|
||||
ACLAgentToken: "romeo",
|
||||
ACLAgentMasterToken: "sierra",
|
||||
ACLReplicationToken: "tango",
|
||||
}
|
||||
|
||||
require.NoError(t, ioutil.WriteFile(tokenFile, []byte(tokens), 0600))
|
||||
require.NoError(t, store.Load(cfg, logger))
|
||||
|
||||
require.Equal(t, "mike", store.AgentToken())
|
||||
require.Equal(t, "november", store.AgentMasterToken())
|
||||
require.Equal(t, "oscar", store.UserToken())
|
||||
require.Equal(t, "papa", store.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("with some persisted tokens", func(t *testing.T) {
|
||||
tokens := `{
|
||||
"agent" : "uniform",
|
||||
"agent_master" : "victor"
|
||||
}`
|
||||
|
||||
cfg := Config{
|
||||
EnablePersistence: true,
|
||||
DataDir: dataDir,
|
||||
ACLDefaultToken: "whiskey",
|
||||
ACLAgentToken: "xray",
|
||||
ACLAgentMasterToken: "yankee",
|
||||
ACLReplicationToken: "zulu",
|
||||
}
|
||||
|
||||
require.NoError(t, ioutil.WriteFile(tokenFile, []byte(tokens), 0600))
|
||||
require.NoError(t, store.Load(cfg, logger))
|
||||
|
||||
require.Equal(t, "uniform", store.AgentToken())
|
||||
require.Equal(t, "victor", store.AgentMasterToken())
|
||||
require.Equal(t, "whiskey", store.UserToken())
|
||||
require.Equal(t, "zulu", store.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("persisted file contains invalid data", func(t *testing.T) {
|
||||
cfg := Config{
|
||||
EnablePersistence: true,
|
||||
DataDir: dataDir,
|
||||
ACLDefaultToken: "one",
|
||||
ACLAgentToken: "two",
|
||||
ACLAgentMasterToken: "three",
|
||||
ACLReplicationToken: "four",
|
||||
}
|
||||
|
||||
require.NoError(t, ioutil.WriteFile(tokenFile, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, 0600))
|
||||
err := store.Load(cfg, logger)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "failed to decode tokens file")
|
||||
|
||||
require.Equal(t, "one", store.UserToken())
|
||||
require.Equal(t, "two", store.AgentToken())
|
||||
require.Equal(t, "three", store.AgentMasterToken())
|
||||
require.Equal(t, "four", store.ReplicationToken())
|
||||
})
|
||||
|
||||
t.Run("persisted file contains invalid json", func(t *testing.T) {
|
||||
cfg := Config{
|
||||
EnablePersistence: true,
|
||||
DataDir: dataDir,
|
||||
ACLDefaultToken: "alfa",
|
||||
ACLAgentToken: "bravo",
|
||||
ACLAgentMasterToken: "charlie",
|
||||
ACLReplicationToken: "foxtrot",
|
||||
}
|
||||
|
||||
require.NoError(t, ioutil.WriteFile(tokenFile, []byte("[1,2,3]"), 0600))
|
||||
err := store.Load(cfg, logger)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "failed to decode tokens file")
|
||||
|
||||
require.Equal(t, "alfa", store.UserToken())
|
||||
require.Equal(t, "bravo", store.AgentToken())
|
||||
require.Equal(t, "charlie", store.AgentMasterToken())
|
||||
require.Equal(t, "foxtrot", store.ReplicationToken())
|
||||
})
|
||||
}
|
||||
|
||||
func TestStore_WithPersistenceLock(t *testing.T) {
|
||||
dataDir := testutil.TempDir(t, "datadir")
|
||||
store := new(Store)
|
||||
cfg := Config{
|
||||
EnablePersistence: true,
|
||||
DataDir: dataDir,
|
||||
ACLDefaultToken: "default-token",
|
||||
ACLAgentToken: "agent-token",
|
||||
ACLAgentMasterToken: "master-token",
|
||||
ACLReplicationToken: "replication-token",
|
||||
}
|
||||
err := store.Load(cfg, hclog.New(nil))
|
||||
require.NoError(t, err)
|
||||
|
||||
f := func() error {
|
||||
updated := store.UpdateUserToken("the-new-token", TokenSourceAPI)
|
||||
require.True(t, updated)
|
||||
|
||||
updated = store.UpdateAgentMasterToken("the-new-master-token", TokenSourceAPI)
|
||||
require.True(t, updated)
|
||||
return nil
|
||||
}
|
||||
|
||||
err = store.WithPersistenceLock(f)
|
||||
require.NoError(t, err)
|
||||
|
||||
tokens, err := readPersistedFromFile(filepath.Join(dataDir, tokensPath))
|
||||
require.NoError(t, err)
|
||||
expected := persistedTokens{
|
||||
Default: "the-new-token",
|
||||
AgentMaster: "the-new-master-token",
|
||||
}
|
||||
require.Equal(t, expected, tokens)
|
||||
}
|
|
@ -77,6 +77,12 @@ type Store struct {
|
|||
watchers map[int]watcher
|
||||
watcherIndex int
|
||||
|
||||
persistence *fileStore
|
||||
// persistenceLock is used to synchronize access to the persisted token store
|
||||
// within the data directory. This will prevent loading while writing as well as
|
||||
// multiple concurrent writes.
|
||||
persistenceLock sync.RWMutex
|
||||
|
||||
// enterpriseTokens contains tokens only used in consul-enterprise
|
||||
enterpriseTokens
|
||||
}
|
||||
|
@ -158,7 +164,7 @@ func (t *Store) sendNotificationLocked(kinds ...TokenKind) {
|
|||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.userToken != token || t.userTokenSource != source)
|
||||
changed := t.userToken != token || t.userTokenSource != source
|
||||
t.userToken = token
|
||||
t.userTokenSource = source
|
||||
if changed {
|
||||
|
@ -172,7 +178,7 @@ func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
|
|||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.agentToken != token || t.agentTokenSource != source)
|
||||
changed := t.agentToken != token || t.agentTokenSource != source
|
||||
t.agentToken = token
|
||||
t.agentTokenSource = source
|
||||
if changed {
|
||||
|
@ -186,7 +192,7 @@ func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
|
|||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source)
|
||||
changed := t.agentMasterToken != token || t.agentMasterTokenSource != source
|
||||
t.agentMasterToken = token
|
||||
t.agentMasterTokenSource = source
|
||||
if changed {
|
||||
|
@ -200,7 +206,7 @@ func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
|
|||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.replicationToken != token || t.replicationTokenSource != source)
|
||||
changed := t.replicationToken != token || t.replicationTokenSource != source
|
||||
t.replicationToken = token
|
||||
t.replicationTokenSource = source
|
||||
if changed {
|
||||
|
|
|
@ -7,6 +7,10 @@ type enterpriseTokens struct {
|
|||
}
|
||||
|
||||
// enterpriseAgentToken OSS stub
|
||||
func (s *Store) enterpriseAgentToken() string {
|
||||
func (t *Store) enterpriseAgentToken() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// loadEnterpriseTokens is a noop stub for the func defined agent_ent.go
|
||||
func loadEnterpriseTokens(_ *Store, _ Config) {
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue