Merge pull request #8554 from hashicorp/dnephin/agent-setup-persisted-tokens

agent: move token persistence from agent into token.Store
This commit is contained in:
Daniel Nephin 2020-09-03 17:29:21 -04:00 committed by GitHub
commit 4c9ed41eab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 554 additions and 425 deletions

View File

@ -184,7 +184,9 @@ func TestACL_AgentMasterToken(t *testing.T) {
t.Parallel()
a := NewTestACLAgent(t, t.Name(), TestACLConfig(), nil, nil)
a.loadTokens(a.config)
err := a.tokens.Load(a.config.ACLTokens, a.logger)
require.NoError(t, err)
authz, err := a.resolveToken("towel")
require.NotNil(t, authz)
require.Nil(t, err)

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/agent/dns"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
@ -39,7 +40,6 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
@ -67,9 +67,6 @@ const (
checksDir = "checks"
checkStateDir = "checks/state"
// Name of the file tokens will be persisted within
tokensPath = "acl-tokens.json"
// Default reasons for node/service maintenance mode
defaultNodeMaintReason = "Maintenance mode is enabled for this node, " +
"but no reason was provided. This is a default message."
@ -294,11 +291,6 @@ type Agent struct {
// based on the current consul configuration.
tlsConfigurator *tlsutil.Configurator
// persistedTokensLock is used to synchronize access to the persisted token
// store within the data directory. This will prevent loading while writing as
// well as multiple concurrent writes.
persistedTokensLock sync.RWMutex
// httpConnLimiter is used to limit connections to the HTTP server by client
// IP.
httpConnLimiter connlimit.Limiter
@ -372,10 +364,8 @@ func New(bd BaseDeps) (*Agent, error) {
// pass the agent itself so its safe to move here.
a.registerCache()
// TODO: move to newBaseDeps
// TODO: handle error
a.loadTokens(a.config)
a.loadEnterpriseTokens(a.config)
// TODO: why do we ignore failure to load persisted tokens?
_ = a.tokens.Load(bd.RuntimeConfig.ACLTokens, a.logger)
// TODO: pass in a fully populated apiServers into Agent.New
a.apiServers = NewAPIServers(a.logger)
@ -3352,90 +3342,6 @@ func (a *Agent) unloadChecks() error {
return nil
}
type persistedTokens struct {
Replication string `json:"replication,omitempty"`
AgentMaster string `json:"agent_master,omitempty"`
Default string `json:"default,omitempty"`
Agent string `json:"agent,omitempty"`
}
func (a *Agent) getPersistedTokens() (*persistedTokens, error) {
persistedTokens := &persistedTokens{}
if !a.config.ACLEnableTokenPersistence {
return persistedTokens, nil
}
a.persistedTokensLock.RLock()
defer a.persistedTokensLock.RUnlock()
tokensFullPath := filepath.Join(a.config.DataDir, tokensPath)
buf, err := ioutil.ReadFile(tokensFullPath)
if err != nil {
if os.IsNotExist(err) {
// non-existence is not an error we care about
return persistedTokens, nil
}
return persistedTokens, fmt.Errorf("failed reading tokens file %q: %s", tokensFullPath, err)
}
if err := json.Unmarshal(buf, persistedTokens); err != nil {
return persistedTokens, fmt.Errorf("failed to decode tokens file %q: %s", tokensFullPath, err)
}
return persistedTokens, nil
}
func (a *Agent) loadTokens(conf *config.RuntimeConfig) error {
persistedTokens, persistenceErr := a.getPersistedTokens()
if persistenceErr != nil {
a.logger.Warn("unable to load persisted tokens", "error", persistenceErr)
}
if persistedTokens.Default != "" {
a.tokens.UpdateUserToken(persistedTokens.Default, token.TokenSourceAPI)
if conf.ACLToken != "" {
a.logger.Warn("\"default\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
a.tokens.UpdateUserToken(conf.ACLToken, token.TokenSourceConfig)
}
if persistedTokens.Agent != "" {
a.tokens.UpdateAgentToken(persistedTokens.Agent, token.TokenSourceAPI)
if conf.ACLAgentToken != "" {
a.logger.Warn("\"agent\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
a.tokens.UpdateAgentToken(conf.ACLAgentToken, token.TokenSourceConfig)
}
if persistedTokens.AgentMaster != "" {
a.tokens.UpdateAgentMasterToken(persistedTokens.AgentMaster, token.TokenSourceAPI)
if conf.ACLAgentMasterToken != "" {
a.logger.Warn("\"agent_master\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
a.tokens.UpdateAgentMasterToken(conf.ACLAgentMasterToken, token.TokenSourceConfig)
}
if persistedTokens.Replication != "" {
a.tokens.UpdateReplicationToken(persistedTokens.Replication, token.TokenSourceAPI)
if conf.ACLReplicationToken != "" {
a.logger.Warn("\"replication\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
a.tokens.UpdateReplicationToken(conf.ACLReplicationToken, token.TokenSourceConfig)
}
return persistenceErr
}
// snapshotCheckState is used to snapshot the current state of the health
// checks. This is done before we reload our checks, so that we can properly
// restore into the same state.
@ -3615,8 +3521,7 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
// Reload tokens - should be done before all the other loading
// to ensure the correct tokens are available for attaching to
// the checks and service registrations.
a.loadTokens(newCfg)
a.loadEnterpriseTokens(newCfg)
a.tokens.Load(newCfg.ACLTokens, a.logger)
if err := a.tlsConfigurator.Update(newCfg.ToTLSUtilConfig()); err != nil {
return fmt.Errorf("Failed reloading tls configuration: %s", err)

View File

@ -1,10 +1,8 @@
package agent
import (
"encoding/json"
"fmt"
"net/http"
"path/filepath"
"strconv"
"strings"
@ -21,7 +19,6 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/file"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/logging/monitor"
"github.com/hashicorp/consul/types"
@ -1233,79 +1230,42 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in
return nil, nil
}
if s.agent.config.ACLEnableTokenPersistence {
// we hold the lock around updating the internal token store
// as well as persisting the tokens because we don't want to write
// into the store to have something else wipe it out before we can
// persist everything (like an agent config reload). The token store
// lock is only held for those operations so other go routines that
// just need to read some token out of the store will not be impacted
// any more than they would be without token persistence.
s.agent.persistedTokensLock.Lock()
defer s.agent.persistedTokensLock.Unlock()
}
// Figure out the target token.
target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/")
triggerAntiEntropySync := false
switch target {
case "acl_token", "default":
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
err = s.agent.tokens.WithPersistenceLock(func() error {
triggerAntiEntropySync := false
switch target {
case "acl_token", "default":
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
}
case "acl_agent_token", "agent":
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
}
case "acl_agent_master_token", "agent_master":
s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI)
case "acl_replication_token", "replication":
s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI)
default:
return NotFoundError{Reason: fmt.Sprintf("Token %q is unknown", target)}
}
case "acl_agent_token", "agent":
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
}
case "acl_agent_master_token", "agent_master":
s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI)
case "acl_replication_token", "replication":
s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI)
default:
resp.WriteHeader(http.StatusNotFound)
fmt.Fprintf(resp, "Token %q is unknown", target)
return nil, nil
}
if triggerAntiEntropySync {
s.agent.sync.SyncFull.Trigger()
}
if s.agent.config.ACLEnableTokenPersistence {
tokens := persistedTokens{}
if tok, source := s.agent.tokens.UserTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
tokens.Default = tok
}
if tok, source := s.agent.tokens.AgentTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
tokens.Agent = tok
}
if tok, source := s.agent.tokens.AgentMasterTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
tokens.AgentMaster = tok
}
if tok, source := s.agent.tokens.ReplicationTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
tokens.Replication = tok
}
data, err := json.Marshal(tokens)
if err != nil {
s.agent.logger.Warn("failed to persist tokens", "error", err)
return nil, fmt.Errorf("Failed to marshal tokens for persistence: %v", err)
}
if err := file.WriteAtomicWithPerms(filepath.Join(s.agent.config.DataDir, tokensPath), data, 0700, 0600); err != nil {
s.agent.logger.Warn("failed to persist tokens", "error", err)
return nil, fmt.Errorf("Failed to persist tokens - %v", err)
// TODO: is it safe to move this out of WithPersistenceLock?
if triggerAntiEntropySync {
s.agent.sync.SyncFull.Trigger()
}
return nil
})
if err != nil {
return nil, err
}
s.agent.logger.Info("Updated agent's ACL token", "token", target)

View File

@ -4774,13 +4774,14 @@ func TestAgent_Token(t *testing.T) {
init tokens
raw tokens
effective tokens
expectedErr error
}{
{
name: "bad token name",
method: "PUT",
url: "nope?token=root",
body: body("X"),
code: http.StatusNotFound,
name: "bad token name",
method: "PUT",
url: "nope?token=root",
body: body("X"),
expectedErr: NotFoundError{Reason: `Token "nope" is unknown`},
},
{
name: "bad JSON",
@ -4942,7 +4943,12 @@ func TestAgent_Token(t *testing.T) {
url := fmt.Sprintf("/v1/agent/token/%s", tt.url)
resp := httptest.NewRecorder()
req, _ := http.NewRequest(tt.method, url, tt.body)
_, err := a.srv.AgentToken(resp, req)
if tt.expectedErr != nil {
require.Equal(t, tt.expectedErr, err)
return
}
require.NoError(t, err)
require.Equal(t, tt.code, resp.Code)
require.Equal(t, tt.effective.user, a.tokens.UserToken())

View File

@ -23,10 +23,6 @@ func (a *Agent) initEnterprise(consulCfg *consul.Config) error {
return nil
}
// loadEnterpriseTokens is a noop stub for the func defined agent_ent.go
func (a *Agent) loadEnterpriseTokens(conf *config.RuntimeConfig) {
}
// reloadEnterprise is a noop stub for the func defined agent_ent.go
func (a *Agent) reloadEnterprise(conf *config.RuntimeConfig) error {
return nil

View File

@ -3345,163 +3345,6 @@ func TestAgent_reloadWatchesHTTPS(t *testing.T) {
}
}
func TestAgent_loadTokens(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, `
acl = {
enabled = true
tokens = {
agent = "alfa"
agent_master = "bravo",
default = "charlie"
replication = "delta"
}
}
`)
defer a.Shutdown()
require := require.New(t)
tokensFullPath := filepath.Join(a.config.DataDir, tokensPath)
t.Run("original-configuration", func(t *testing.T) {
require.Equal("alfa", a.tokens.AgentToken())
require.Equal("bravo", a.tokens.AgentMasterToken())
require.Equal("charlie", a.tokens.UserToken())
require.Equal("delta", a.tokens.ReplicationToken())
})
t.Run("updated-configuration", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "echo",
ACLAgentToken: "foxtrot",
ACLAgentMasterToken: "golf",
ACLReplicationToken: "hotel",
}
// ensures no error for missing persisted tokens file
require.NoError(a.loadTokens(cfg))
require.Equal("echo", a.tokens.UserToken())
require.Equal("foxtrot", a.tokens.AgentToken())
require.Equal("golf", a.tokens.AgentMasterToken())
require.Equal("hotel", a.tokens.ReplicationToken())
})
t.Run("persisted-tokens", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "echo",
ACLAgentToken: "foxtrot",
ACLAgentMasterToken: "golf",
ACLReplicationToken: "hotel",
}
tokens := `{
"agent" : "india",
"agent_master" : "juliett",
"default": "kilo",
"replication" : "lima"
}`
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
require.NoError(a.loadTokens(cfg))
// no updates since token persistence is not enabled
require.Equal("echo", a.tokens.UserToken())
require.Equal("foxtrot", a.tokens.AgentToken())
require.Equal("golf", a.tokens.AgentMasterToken())
require.Equal("hotel", a.tokens.ReplicationToken())
a.config.ACLEnableTokenPersistence = true
require.NoError(a.loadTokens(cfg))
require.Equal("india", a.tokens.AgentToken())
require.Equal("juliett", a.tokens.AgentMasterToken())
require.Equal("kilo", a.tokens.UserToken())
require.Equal("lima", a.tokens.ReplicationToken())
})
t.Run("persisted-tokens-override", func(t *testing.T) {
tokens := `{
"agent" : "mike",
"agent_master" : "november",
"default": "oscar",
"replication" : "papa"
}`
cfg := &config.RuntimeConfig{
ACLToken: "quebec",
ACLAgentToken: "romeo",
ACLAgentMasterToken: "sierra",
ACLReplicationToken: "tango",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
require.NoError(a.loadTokens(cfg))
require.Equal("mike", a.tokens.AgentToken())
require.Equal("november", a.tokens.AgentMasterToken())
require.Equal("oscar", a.tokens.UserToken())
require.Equal("papa", a.tokens.ReplicationToken())
})
t.Run("partial-persisted", func(t *testing.T) {
tokens := `{
"agent" : "uniform",
"agent_master" : "victor"
}`
cfg := &config.RuntimeConfig{
ACLToken: "whiskey",
ACLAgentToken: "xray",
ACLAgentMasterToken: "yankee",
ACLReplicationToken: "zulu",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
require.NoError(a.loadTokens(cfg))
require.Equal("uniform", a.tokens.AgentToken())
require.Equal("victor", a.tokens.AgentMasterToken())
require.Equal("whiskey", a.tokens.UserToken())
require.Equal("zulu", a.tokens.ReplicationToken())
})
t.Run("persistence-error-not-json", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "one",
ACLAgentToken: "two",
ACLAgentMasterToken: "three",
ACLReplicationToken: "four",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, 0600))
err := a.loadTokens(cfg)
require.Error(err)
require.Equal("one", a.tokens.UserToken())
require.Equal("two", a.tokens.AgentToken())
require.Equal("three", a.tokens.AgentMasterToken())
require.Equal("four", a.tokens.ReplicationToken())
})
t.Run("persistence-error-wrong-top-level", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "alfa",
ACLAgentToken: "bravo",
ACLAgentMasterToken: "charlie",
ACLReplicationToken: "foxtrot",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte("[1,2,3]"), 0600))
err := a.loadTokens(cfg)
require.Error(err)
require.Equal("alfa", a.tokens.UserToken())
require.Equal("bravo", a.tokens.AgentToken())
require.Equal("charlie", a.tokens.AgentMasterToken())
require.Equal("foxtrot", a.tokens.ReplicationToken())
})
}
func TestAgent_SecurityChecks(t *testing.T) {
t.Parallel()
hcl := `

View File

@ -222,7 +222,7 @@ func (ac *AutoConfig) recordInitialConfiguration(resp *pbautoconf.AutoConfigResp
}
// ignoring the return value which would indicate a change in the token
_ = ac.acConfig.Tokens.UpdateAgentToken(config.ACLAgentToken, token.TokenSourceConfig)
_ = ac.acConfig.Tokens.UpdateAgentToken(config.ACLTokens.ACLAgentToken, token.TokenSourceConfig)
// extra a structs.SignedResponse from the AutoConfigResponse for use in cache prepopulation
signed, err := extractSignedResponse(resp)

View File

@ -22,6 +22,7 @@ import (
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
"github.com/hashicorp/consul/agent/dns"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
libtempl "github.com/hashicorp/consul/lib/template"
@ -799,6 +800,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
// ----------------------------------------------------------------
// build runtime config
//
dataDir := b.stringVal(c.DataDir)
rt = RuntimeConfig{
// non-user configurable values
ACLDisabledTTL: b.durationVal("acl.disabled_ttl", c.ACL.DisabledTTL),
@ -837,21 +839,25 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
GossipWANRetransmitMult: b.intVal(c.GossipWAN.RetransmitMult),
// ACL
ACLsEnabled: aclsEnabled,
ACLAgentMasterToken: b.stringValWithDefault(c.ACL.Tokens.AgentMaster, b.stringVal(c.ACLAgentMasterToken)),
ACLAgentToken: b.stringValWithDefault(c.ACL.Tokens.Agent, b.stringVal(c.ACLAgentToken)),
ACLDatacenter: primaryDatacenter,
ACLDefaultPolicy: b.stringValWithDefault(c.ACL.DefaultPolicy, b.stringVal(c.ACLDefaultPolicy)),
ACLDownPolicy: b.stringValWithDefault(c.ACL.DownPolicy, b.stringVal(c.ACLDownPolicy)),
ACLEnableKeyListPolicy: b.boolValWithDefault(c.ACL.EnableKeyListPolicy, b.boolVal(c.ACLEnableKeyListPolicy)),
ACLMasterToken: b.stringValWithDefault(c.ACL.Tokens.Master, b.stringVal(c.ACLMasterToken)),
ACLReplicationToken: b.stringValWithDefault(c.ACL.Tokens.Replication, b.stringVal(c.ACLReplicationToken)),
ACLTokenTTL: b.durationValWithDefault("acl.token_ttl", c.ACL.TokenTTL, b.durationVal("acl_ttl", c.ACLTTL)),
ACLPolicyTTL: b.durationVal("acl.policy_ttl", c.ACL.PolicyTTL),
ACLRoleTTL: b.durationVal("acl.role_ttl", c.ACL.RoleTTL),
ACLToken: b.stringValWithDefault(c.ACL.Tokens.Default, b.stringVal(c.ACLToken)),
ACLTokenReplication: b.boolValWithDefault(c.ACL.TokenReplication, b.boolValWithDefault(c.EnableACLReplication, enableTokenReplication)),
ACLEnableTokenPersistence: b.boolValWithDefault(c.ACL.EnableTokenPersistence, false),
ACLsEnabled: aclsEnabled,
ACLDatacenter: primaryDatacenter,
ACLDefaultPolicy: b.stringValWithDefault(c.ACL.DefaultPolicy, b.stringVal(c.ACLDefaultPolicy)),
ACLDownPolicy: b.stringValWithDefault(c.ACL.DownPolicy, b.stringVal(c.ACLDownPolicy)),
ACLEnableKeyListPolicy: b.boolValWithDefault(c.ACL.EnableKeyListPolicy, b.boolVal(c.ACLEnableKeyListPolicy)),
ACLMasterToken: b.stringValWithDefault(c.ACL.Tokens.Master, b.stringVal(c.ACLMasterToken)),
ACLTokenTTL: b.durationValWithDefault("acl.token_ttl", c.ACL.TokenTTL, b.durationVal("acl_ttl", c.ACLTTL)),
ACLPolicyTTL: b.durationVal("acl.policy_ttl", c.ACL.PolicyTTL),
ACLRoleTTL: b.durationVal("acl.role_ttl", c.ACL.RoleTTL),
ACLTokenReplication: b.boolValWithDefault(c.ACL.TokenReplication, b.boolValWithDefault(c.EnableACLReplication, enableTokenReplication)),
ACLTokens: token.Config{
DataDir: dataDir,
EnablePersistence: b.boolValWithDefault(c.ACL.EnableTokenPersistence, false),
ACLDefaultToken: b.stringValWithDefault(c.ACL.Tokens.Default, b.stringVal(c.ACLToken)),
ACLAgentToken: b.stringValWithDefault(c.ACL.Tokens.Agent, b.stringVal(c.ACLAgentToken)),
ACLAgentMasterToken: b.stringValWithDefault(c.ACL.Tokens.AgentMaster, b.stringVal(c.ACLAgentMasterToken)),
ACLReplicationToken: b.stringValWithDefault(c.ACL.Tokens.Replication, b.stringVal(c.ACLReplicationToken)),
},
// Autopilot
AutopilotCleanupDeadServers: b.boolVal(c.Autopilot.CleanupDeadServers),
@ -957,7 +963,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
ConnectTestCALeafRootChangeSpread: b.durationVal("connect.test_ca_leaf_root_change_spread", c.Connect.TestCALeafRootChangeSpread),
ExposeMinPort: exposeMinPort,
ExposeMaxPort: exposeMaxPort,
DataDir: b.stringVal(c.DataDir),
DataDir: dataDir,
Datacenter: datacenter,
DefaultQueryTime: b.durationVal("default_query_time", c.DefaultQueryTime),
DevMode: b.boolVal(b.devMode),
@ -1072,10 +1078,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_rate must be strictly positive, was: %v", rt.Cache.EntryFetchRate)
}
if entCfg, err := b.BuildEnterpriseRuntimeConfig(&c); err != nil {
return RuntimeConfig{}, err
} else {
rt.EnterpriseRuntimeConfig = entCfg
if err := b.BuildEnterpriseRuntimeConfig(&rt, &c); err != nil {
return rt, err
}
if rt.BootstrapExpect == 1 {
@ -1363,7 +1367,8 @@ func (b *Builder) Validate(rt RuntimeConfig) error {
b.warn(err.Error())
}
return nil
err := b.validateEnterpriseConfig(rt)
return err
}
// addrUnique checks if the given address is already in use for another

View File

@ -51,8 +51,12 @@ func (e enterpriseConfigKeyError) Error() string {
return fmt.Sprintf("%q is a Consul Enterprise configuration and will have no effect", e.key)
}
func (_ *Builder) BuildEnterpriseRuntimeConfig(_ *Config) (EnterpriseRuntimeConfig, error) {
return EnterpriseRuntimeConfig{}, nil
func (*Builder) BuildEnterpriseRuntimeConfig(_ *RuntimeConfig, _ *Config) error {
return nil
}
func (*Builder) validateEnterpriseConfig(_ RuntimeConfig) error {
return nil
}
// validateEnterpriseConfig is a function to validate the enterprise specific

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
@ -63,19 +64,7 @@ type RuntimeConfig struct {
// hcl: acl.enabled = boolean
ACLsEnabled bool
// ACLAgentMasterToken is a special token that has full read and write
// privileges for this agent, and can be used to call agent endpoints
// when no servers are available.
//
// hcl: acl.tokens.agent_master = string
ACLAgentMasterToken string
// ACLAgentToken is the default token used to make requests for the agent
// itself, such as for registering itself with the catalog. If not
// configured, the 'acl_token' will be used.
//
// hcl: acl.tokens.agent = string
ACLAgentToken string
ACLTokens token.Config
// ACLDatacenter is the central datacenter that holds authoritative
// ACL records. This must be the same for the entire cluster.
@ -123,16 +112,6 @@ type RuntimeConfig struct {
// hcl: acl.tokens.master = string
ACLMasterToken string
// ACLReplicationToken is used to replicate data locally from the
// PrimaryDatacenter. Replication is only available on servers in
// datacenters other than the PrimaryDatacenter
//
// DEPRECATED (ACL-Legacy-Compat): Setting this to a non-empty value
// also enables legacy ACL replication if ACLs are enabled and in legacy mode.
//
// hcl: acl.tokens.replication = string
ACLReplicationToken string
// ACLtokenReplication is used to indicate that both tokens and policies
// should be replicated instead of just policies
//
@ -157,16 +136,6 @@ type RuntimeConfig struct {
// hcl: acl.role_ttl = "duration"
ACLRoleTTL time.Duration
// ACLToken is the default token used to make requests if a per-request
// token is not provided. If not configured the 'anonymous' token is used.
//
// hcl: acl.tokens.default = string
ACLToken string
// ACLEnableTokenPersistence determines whether or not tokens set via the agent HTTP API
// should be persisted to disk and reloaded when an agent restarts.
ACLEnableTokenPersistence bool
// AutopilotCleanupDeadServers enables the automatic cleanup of dead servers when new ones
// are added to the peer list. Defaults to true.
//

View File

@ -6,11 +6,9 @@ var entMetaJSON = `{}`
var entRuntimeConfigSanitize = `{}`
var entFullDNSJSONConfig = ``
var entTokenConfigSanitize = `"EnterpriseConfig": {},`
var entFullDNSHCLConfig = ``
var entFullRuntimeConfig = EnterpriseRuntimeConfig{}
func entFullRuntimeConfig(rt *RuntimeConfig) {}
var enterpriseNonVotingServerWarnings []string = []string{enterpriseConfigKeyError{key: "non_voting_server"}.Error()}

View File

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/sdk/testutil"
@ -1613,7 +1614,7 @@ func TestBuilder_BuildAndValide_ConfigFlagsAndEdgecases(t *testing.T) {
json: []string{`{ "acl_replication_token": "a" }`},
hcl: []string{`acl_replication_token = "a"`},
patch: func(rt *RuntimeConfig) {
rt.ACLReplicationToken = "a"
rt.ACLTokens.ACLReplicationToken = "a"
rt.ACLTokenReplication = true
rt.DataDir = dataDir
},
@ -4386,6 +4387,13 @@ func testConfig(t *testing.T, tests []configTest, dataDir string) {
if tt.patch != nil {
tt.patch(&expected)
}
// both DataDir fields should always be the same, so test for the
// invariant, and than updated the expected, so that every test
// case does not need to set this field.
require.Equal(t, actual.DataDir, actual.ACLTokens.DataDir)
expected.ACLTokens.DataDir = actual.ACLTokens.DataDir
require.Equal(t, expected, actual)
})
}
@ -5879,20 +5887,24 @@ func TestFullConfig(t *testing.T) {
// user configurable values
ACLAgentMasterToken: "64fd0e08",
ACLAgentToken: "bed2377c",
ACLTokens: token.Config{
EnablePersistence: true,
DataDir: dataDir,
ACLDefaultToken: "418fdff1",
ACLAgentToken: "bed2377c",
ACLAgentMasterToken: "64fd0e08",
ACLReplicationToken: "5795983a",
},
ACLsEnabled: true,
ACLDatacenter: "ejtmd43d",
ACLDefaultPolicy: "72c2e7a0",
ACLDownPolicy: "03eb2aee",
ACLEnableKeyListPolicy: true,
ACLEnableTokenPersistence: true,
ACLMasterToken: "8a19ac27",
ACLReplicationToken: "5795983a",
ACLTokenTTL: 3321 * time.Second,
ACLPolicyTTL: 1123 * time.Second,
ACLRoleTTL: 9876 * time.Second,
ACLToken: "418fdff1",
ACLTokenReplication: true,
AdvertiseAddrLAN: ipAddr("17.99.29.16"),
AdvertiseAddrWAN: ipAddr("78.63.37.19"),
@ -6521,9 +6533,10 @@ func TestFullConfig(t *testing.T) {
"args": []interface{}{"dltjDJ2a", "flEa7C2d"},
},
},
EnterpriseRuntimeConfig: entFullRuntimeConfig,
}
entFullRuntimeConfig(&want)
warns := []string{
`The 'acl_datacenter' field is deprecated. Use the 'primary_datacenter' field instead.`,
`bootstrap_expect > 0: expecting 53 servers`,
@ -6840,21 +6853,25 @@ func TestSanitize(t *testing.T) {
}
rtJSON := `{
"ACLAgentMasterToken": "hidden",
"ACLAgentToken": "hidden",
"ACLTokens": {
` + entTokenConfigSanitize + `
"ACLAgentMasterToken": "hidden",
"ACLAgentToken": "hidden",
"ACLDefaultToken": "hidden",
"ACLReplicationToken": "hidden",
"DataDir": "",
"EnablePersistence": false
},
"ACLDatacenter": "",
"ACLDefaultPolicy": "",
"ACLDisabledTTL": "0s",
"ACLDownPolicy": "",
"ACLEnableKeyListPolicy": false,
"ACLEnableTokenPersistence": false,
"ACLMasterToken": "hidden",
"ACLPolicyTTL": "0s",
"ACLReplicationToken": "hidden",
"ACLRoleTTL": "0s",
"ACLTokenReplication": false,
"ACLTokenTTL": "0s",
"ACLToken": "hidden",
"ACLsEnabled": false,
"AEInterval": "0s",
"AdvertiseAddrLAN": "",

View File

@ -79,6 +79,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
d.RuntimeConfig = cfg
d.Tokens = new(token.Store)
// cache-types are not registered yet, but they won't be used until the components are started.
d.Cache = cache.New(cfg.Cache)
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)

192
agent/token/persistence.go Normal file
View File

@ -0,0 +1,192 @@
package token
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/hashicorp/consul/lib/file"
)
// Logger used by Store.Load to report warnings.
type Logger interface {
Warn(msg string, args ...interface{})
}
// Config used by Store.Load, which includes tokens and settings for persistence.
type Config struct {
EnablePersistence bool
DataDir string
ACLDefaultToken string
ACLAgentToken string
ACLAgentMasterToken string
ACLReplicationToken string
EnterpriseConfig
}
const tokensPath = "acl-tokens.json"
// Load tokens from Config and optionally from a persisted file in the cfg.DataDir.
// If a token exists in both the persisted file and in the Config a warning will
// be logged and the persisted token will be used.
//
// Failures to load the persisted file will result in loading tokens from the
// config before returning the error.
func (t *Store) Load(cfg Config, logger Logger) error {
t.persistenceLock.RLock()
if !cfg.EnablePersistence {
t.persistence = nil
t.persistenceLock.RUnlock()
loadTokens(t, cfg, persistedTokens{}, logger)
return nil
}
defer t.persistenceLock.RUnlock()
t.persistence = &fileStore{
filename: filepath.Join(cfg.DataDir, tokensPath),
logger: logger,
}
return t.persistence.load(t, cfg)
}
// WithPersistenceLock executes f while hold a lock. If f returns a nil error,
// the tokens in Store will be persisted to the tokens file. Otherwise no
// tokens will be persisted, and the error from f will be returned.
//
// The lock is held so that the writes are persisted before some other thread
// can change the value.
func (t *Store) WithPersistenceLock(f func() error) error {
t.persistenceLock.Lock()
if t.persistence == nil {
t.persistenceLock.Unlock()
return f()
}
defer t.persistenceLock.Unlock()
return t.persistence.withPersistenceLock(t, f)
}
type persistedTokens struct {
Replication string `json:"replication,omitempty"`
AgentMaster string `json:"agent_master,omitempty"`
Default string `json:"default,omitempty"`
Agent string `json:"agent,omitempty"`
}
type fileStore struct {
filename string
logger Logger
}
func (p *fileStore) load(s *Store, cfg Config) error {
tokens, err := readPersistedFromFile(p.filename)
if err != nil {
p.logger.Warn("unable to load persisted tokens", "error", err)
}
loadTokens(s, cfg, tokens, p.logger)
return err
}
func loadTokens(s *Store, cfg Config, tokens persistedTokens, logger Logger) {
if tokens.Default != "" {
s.UpdateUserToken(tokens.Default, TokenSourceAPI)
if cfg.ACLDefaultToken != "" {
logger.Warn("\"default\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
s.UpdateUserToken(cfg.ACLDefaultToken, TokenSourceConfig)
}
if tokens.Agent != "" {
s.UpdateAgentToken(tokens.Agent, TokenSourceAPI)
if cfg.ACLAgentToken != "" {
logger.Warn("\"agent\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
s.UpdateAgentToken(cfg.ACLAgentToken, TokenSourceConfig)
}
if tokens.AgentMaster != "" {
s.UpdateAgentMasterToken(tokens.AgentMaster, TokenSourceAPI)
if cfg.ACLAgentMasterToken != "" {
logger.Warn("\"agent_master\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
s.UpdateAgentMasterToken(cfg.ACLAgentMasterToken, TokenSourceConfig)
}
if tokens.Replication != "" {
s.UpdateReplicationToken(tokens.Replication, TokenSourceAPI)
if cfg.ACLReplicationToken != "" {
logger.Warn("\"replication\" token present in both the configuration and persisted token store, using the persisted token")
}
} else {
s.UpdateReplicationToken(cfg.ACLReplicationToken, TokenSourceConfig)
}
loadEnterpriseTokens(s, cfg)
}
func readPersistedFromFile(filename string) (persistedTokens, error) {
tokens := persistedTokens{}
buf, err := ioutil.ReadFile(filename)
switch {
case os.IsNotExist(err):
// non-existence is not an error we care about
return tokens, nil
case err != nil:
return tokens, fmt.Errorf("failed reading tokens file %q: %w", filename, err)
}
if err := json.Unmarshal(buf, &tokens); err != nil {
return tokens, fmt.Errorf("failed to decode tokens file %q: %w", filename, err)
}
return tokens, nil
}
func (p *fileStore) withPersistenceLock(s *Store, f func() error) error {
if err := f(); err != nil {
return err
}
return p.saveToFile(s)
}
func (p *fileStore) saveToFile(s *Store) error {
tokens := persistedTokens{}
if tok, source := s.UserTokenAndSource(); tok != "" && source == TokenSourceAPI {
tokens.Default = tok
}
if tok, source := s.AgentTokenAndSource(); tok != "" && source == TokenSourceAPI {
tokens.Agent = tok
}
if tok, source := s.AgentMasterTokenAndSource(); tok != "" && source == TokenSourceAPI {
tokens.AgentMaster = tok
}
if tok, source := s.ReplicationTokenAndSource(); tok != "" && source == TokenSourceAPI {
tokens.Replication = tok
}
data, err := json.Marshal(tokens)
if err != nil {
p.logger.Warn("failed to persist tokens", "error", err)
return fmt.Errorf("Failed to marshal tokens for persistence: %v", err)
}
if err := file.WriteAtomicWithPerms(p.filename, data, 0700, 0600); err != nil {
p.logger.Warn("failed to persist tokens", "error", err)
return fmt.Errorf("Failed to persist tokens - %v", err)
}
return nil
}

View File

@ -0,0 +1,213 @@
package token
import (
"io/ioutil"
"path/filepath"
"testing"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
func TestStore_Load(t *testing.T) {
dataDir := testutil.TempDir(t, "datadir")
tokenFile := filepath.Join(dataDir, tokensPath)
logger := hclog.New(nil)
store := new(Store)
t.Run("with empty store", func(t *testing.T) {
cfg := Config{
DataDir: dataDir,
ACLAgentToken: "alfa",
ACLAgentMasterToken: "bravo",
ACLDefaultToken: "charlie",
ACLReplicationToken: "delta",
}
require.NoError(t, store.Load(cfg, logger))
require.Equal(t, "alfa", store.AgentToken())
require.Equal(t, "bravo", store.AgentMasterToken())
require.Equal(t, "charlie", store.UserToken())
require.Equal(t, "delta", store.ReplicationToken())
})
t.Run("updated from Config", func(t *testing.T) {
cfg := Config{
DataDir: dataDir,
ACLDefaultToken: "echo",
ACLAgentToken: "foxtrot",
ACLAgentMasterToken: "golf",
ACLReplicationToken: "hotel",
}
// ensures no error for missing persisted tokens file
require.NoError(t, store.Load(cfg, logger))
require.Equal(t, "echo", store.UserToken())
require.Equal(t, "foxtrot", store.AgentToken())
require.Equal(t, "golf", store.AgentMasterToken())
require.Equal(t, "hotel", store.ReplicationToken())
})
t.Run("with persisted tokens", func(t *testing.T) {
cfg := Config{
DataDir: dataDir,
ACLDefaultToken: "echo",
ACLAgentToken: "foxtrot",
ACLAgentMasterToken: "golf",
ACLReplicationToken: "hotel",
}
tokens := `{
"agent" : "india",
"agent_master" : "juliett",
"default": "kilo",
"replication" : "lima"
}`
require.NoError(t, ioutil.WriteFile(tokenFile, []byte(tokens), 0600))
require.NoError(t, store.Load(cfg, logger))
// no updates since token persistence is not enabled
require.Equal(t, "echo", store.UserToken())
require.Equal(t, "foxtrot", store.AgentToken())
require.Equal(t, "golf", store.AgentMasterToken())
require.Equal(t, "hotel", store.ReplicationToken())
cfg.EnablePersistence = true
require.NoError(t, store.Load(cfg, logger))
require.Equal(t, "india", store.AgentToken())
require.Equal(t, "juliett", store.AgentMasterToken())
require.Equal(t, "kilo", store.UserToken())
require.Equal(t, "lima", store.ReplicationToken())
// check store persistence was enabled
require.NotNil(t, store.persistence)
})
t.Run("with persisted tokens, persisted tokens override config", func(t *testing.T) {
tokens := `{
"agent" : "mike",
"agent_master" : "november",
"default": "oscar",
"replication" : "papa"
}`
cfg := Config{
EnablePersistence: true,
DataDir: dataDir,
ACLDefaultToken: "quebec",
ACLAgentToken: "romeo",
ACLAgentMasterToken: "sierra",
ACLReplicationToken: "tango",
}
require.NoError(t, ioutil.WriteFile(tokenFile, []byte(tokens), 0600))
require.NoError(t, store.Load(cfg, logger))
require.Equal(t, "mike", store.AgentToken())
require.Equal(t, "november", store.AgentMasterToken())
require.Equal(t, "oscar", store.UserToken())
require.Equal(t, "papa", store.ReplicationToken())
})
t.Run("with some persisted tokens", func(t *testing.T) {
tokens := `{
"agent" : "uniform",
"agent_master" : "victor"
}`
cfg := Config{
EnablePersistence: true,
DataDir: dataDir,
ACLDefaultToken: "whiskey",
ACLAgentToken: "xray",
ACLAgentMasterToken: "yankee",
ACLReplicationToken: "zulu",
}
require.NoError(t, ioutil.WriteFile(tokenFile, []byte(tokens), 0600))
require.NoError(t, store.Load(cfg, logger))
require.Equal(t, "uniform", store.AgentToken())
require.Equal(t, "victor", store.AgentMasterToken())
require.Equal(t, "whiskey", store.UserToken())
require.Equal(t, "zulu", store.ReplicationToken())
})
t.Run("persisted file contains invalid data", func(t *testing.T) {
cfg := Config{
EnablePersistence: true,
DataDir: dataDir,
ACLDefaultToken: "one",
ACLAgentToken: "two",
ACLAgentMasterToken: "three",
ACLReplicationToken: "four",
}
require.NoError(t, ioutil.WriteFile(tokenFile, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, 0600))
err := store.Load(cfg, logger)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to decode tokens file")
require.Equal(t, "one", store.UserToken())
require.Equal(t, "two", store.AgentToken())
require.Equal(t, "three", store.AgentMasterToken())
require.Equal(t, "four", store.ReplicationToken())
})
t.Run("persisted file contains invalid json", func(t *testing.T) {
cfg := Config{
EnablePersistence: true,
DataDir: dataDir,
ACLDefaultToken: "alfa",
ACLAgentToken: "bravo",
ACLAgentMasterToken: "charlie",
ACLReplicationToken: "foxtrot",
}
require.NoError(t, ioutil.WriteFile(tokenFile, []byte("[1,2,3]"), 0600))
err := store.Load(cfg, logger)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to decode tokens file")
require.Equal(t, "alfa", store.UserToken())
require.Equal(t, "bravo", store.AgentToken())
require.Equal(t, "charlie", store.AgentMasterToken())
require.Equal(t, "foxtrot", store.ReplicationToken())
})
}
func TestStore_WithPersistenceLock(t *testing.T) {
dataDir := testutil.TempDir(t, "datadir")
store := new(Store)
cfg := Config{
EnablePersistence: true,
DataDir: dataDir,
ACLDefaultToken: "default-token",
ACLAgentToken: "agent-token",
ACLAgentMasterToken: "master-token",
ACLReplicationToken: "replication-token",
}
err := store.Load(cfg, hclog.New(nil))
require.NoError(t, err)
f := func() error {
updated := store.UpdateUserToken("the-new-token", TokenSourceAPI)
require.True(t, updated)
updated = store.UpdateAgentMasterToken("the-new-master-token", TokenSourceAPI)
require.True(t, updated)
return nil
}
err = store.WithPersistenceLock(f)
require.NoError(t, err)
tokens, err := readPersistedFromFile(filepath.Join(dataDir, tokensPath))
require.NoError(t, err)
expected := persistedTokens{
Default: "the-new-token",
AgentMaster: "the-new-master-token",
}
require.Equal(t, expected, tokens)
}

View File

@ -77,6 +77,12 @@ type Store struct {
watchers map[int]watcher
watcherIndex int
persistence *fileStore
// persistenceLock is used to synchronize access to the persisted token store
// within the data directory. This will prevent loading while writing as well as
// multiple concurrent writes.
persistenceLock sync.RWMutex
// enterpriseTokens contains tokens only used in consul-enterprise
enterpriseTokens
}
@ -158,7 +164,7 @@ func (t *Store) sendNotificationLocked(kinds ...TokenKind) {
// Returns true if it was changed.
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.userToken != token || t.userTokenSource != source)
changed := t.userToken != token || t.userTokenSource != source
t.userToken = token
t.userTokenSource = source
if changed {
@ -172,7 +178,7 @@ func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
// Returns true if it was changed.
func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.agentToken != token || t.agentTokenSource != source)
changed := t.agentToken != token || t.agentTokenSource != source
t.agentToken = token
t.agentTokenSource = source
if changed {
@ -186,7 +192,7 @@ func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
// Returns true if it was changed.
func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source)
changed := t.agentMasterToken != token || t.agentMasterTokenSource != source
t.agentMasterToken = token
t.agentMasterTokenSource = source
if changed {
@ -200,7 +206,7 @@ func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
// Returns true if it was changed.
func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.replicationToken != token || t.replicationTokenSource != source)
changed := t.replicationToken != token || t.replicationTokenSource != source
t.replicationToken = token
t.replicationTokenSource = source
if changed {

View File

@ -2,11 +2,18 @@
package token
type EnterpriseConfig struct {
}
// Stub for enterpriseTokens
type enterpriseTokens struct {
}
// enterpriseAgentToken OSS stub
func (s *Store) enterpriseAgentToken() string {
func (t *Store) enterpriseAgentToken() string {
return ""
}
// loadEnterpriseTokens is a noop stub for the func defined agent_ent.go
func loadEnterpriseTokens(_ *Store, _ Config) {
}

View File

@ -7,8 +7,6 @@ import (
)
func TestStore_RegularTokens(t *testing.T) {
t.Parallel()
type tokens struct {
userSource TokenSource
user string
@ -89,13 +87,22 @@ func TestStore_RegularTokens(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
s := new(Store)
require.True(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
require.True(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource))
require.True(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource))
require.True(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource))
if tt.set.user != "" {
require.True(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
}
if tt.set.agent != "" {
require.True(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource))
}
if tt.set.repl != "" {
require.True(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource))
}
if tt.set.master != "" {
require.True(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource))
}
// If they don't change then they return false.
require.False(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
@ -128,7 +135,6 @@ func TestStore_RegularTokens(t *testing.T) {
}
func TestStore_AgentMasterToken(t *testing.T) {
t.Parallel()
s := new(Store)
verify := func(want bool, toks ...string) {
@ -152,7 +158,6 @@ func TestStore_AgentMasterToken(t *testing.T) {
}
func TestStore_Notify(t *testing.T) {
t.Parallel()
s := new(Store)
newNotification := func(t *testing.T, s *Store, kind TokenKind) Notifier {