diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 82fff12cd6..f1d85e2628 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -3,14 +3,15 @@ package agent import ( "encoding/json" "fmt" - "github.com/hashicorp/go-memdb" - "github.com/mitchellh/hashstructure" "log" "net/http" "path/filepath" "strconv" "strings" + "github.com/hashicorp/go-memdb" + "github.com/mitchellh/hashstructure" + "github.com/hashicorp/consul/acl" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/debug" @@ -1188,12 +1189,19 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in // Figure out the target token. target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/") + triggerAntiEntropySync := false switch target { case "acl_token", "default": - s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI) + changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI) + if changed { + triggerAntiEntropySync = true + } case "acl_agent_token", "agent": - s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI) + changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI) + if changed { + triggerAntiEntropySync = true + } case "acl_agent_master_token", "agent_master": s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI) @@ -1207,6 +1215,10 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in return nil, nil } + if triggerAntiEntropySync { + s.agent.sync.SyncFull.Trigger() + } + if s.agent.config.ACLEnableTokenPersistence { tokens := persistedTokens{} diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 1de613085e..39a6cb44ca 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/debug" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/token" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" @@ -4153,6 +4154,113 @@ func TestAgent_Monitor_ACLDeny(t *testing.T) { // here. } +func TestAgent_TokenTriggersFullSync(t *testing.T) { + t.Parallel() + + body := func(token string) io.Reader { + return jsonReader(&api.AgentToken{Token: token}) + } + + createNodePolicy := func(t *testing.T, a *TestAgent, policyName string) *structs.ACLPolicy { + policy := &structs.ACLPolicy{ + Name: policyName, + Rules: `node_prefix "" { policy = "write" }`, + } + + req, err := http.NewRequest("PUT", "/v1/acl/policy?token=root", jsonBody(policy)) + require.NoError(t, err) + + resp := httptest.NewRecorder() + obj, err := a.srv.ACLPolicyCreate(resp, req) + require.NoError(t, err) + + policy, ok := obj.(*structs.ACLPolicy) + require.True(t, ok) + return policy + } + + createNodeToken := func(t *testing.T, a *TestAgent, policyName string) *structs.ACLToken { + createNodePolicy(t, a, policyName) + + token := &structs.ACLToken{ + Description: "test", + Policies: []structs.ACLTokenPolicyLink{ + structs.ACLTokenPolicyLink{Name: policyName}, + }, + } + + req, err := http.NewRequest("PUT", "/v1/acl/token?token=root", jsonBody(token)) + require.NoError(t, err) + + resp := httptest.NewRecorder() + obj, err := a.srv.ACLTokenCreate(resp, req) + require.NoError(t, err) + + token, ok := obj.(*structs.ACLToken) + require.True(t, ok) + return token + } + + cases := []struct { + path string + tokenGetFn func(*token.Store) string + }{ + { + path: "acl_agent_token", + tokenGetFn: (*token.Store).AgentToken, + }, + { + path: "agent", + tokenGetFn: (*token.Store).AgentToken, + }, + { + path: "acl_token", + tokenGetFn: (*token.Store).UserToken, + }, + { + path: "default", + tokenGetFn: (*token.Store).UserToken, + }, + } + + for _, tt := range cases { + tt := tt + t.Run(tt.path, func(t *testing.T) { + url := fmt.Sprintf("/v1/agent/token/%s?token=root", tt.path) + + a := NewTestAgent(t, t.Name(), TestACLConfig()+` + acl { + tokens { + default = "" + agent = "" + agent_master = "" + replication = "" + } + } + `) + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // create node policy and token + token := createNodeToken(t, a, "test") + + req, err := http.NewRequest("PUT", url, body(token.SecretID)) + require.NoError(t, err) + + resp := httptest.NewRecorder() + _, err = a.srv.AgentToken(resp, req) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, resp.Code) + require.Equal(t, token.SecretID, tt.tokenGetFn(a.tokens)) + + testrpc.WaitForTestAgent(t, a.RPC, "dc1", + testrpc.WithToken("root"), + testrpc.WaitForAntiEntropySync()) + }) + } +} + func TestAgent_Token(t *testing.T) { t.Parallel() diff --git a/agent/token/store.go b/agent/token/store.go index 3c62906264..e450a028ef 100644 --- a/agent/token/store.go +++ b/agent/token/store.go @@ -52,35 +52,47 @@ type Store struct { } // UpdateUserToken replaces the current user token in the store. -func (t *Store) UpdateUserToken(token string, source TokenSource) { +// Returns true if it was changed. +func (t *Store) UpdateUserToken(token string, source TokenSource) bool { t.l.Lock() + changed := (t.userToken != token || t.userTokenSource != source) t.userToken = token t.userTokenSource = source t.l.Unlock() + return changed } // UpdateAgentToken replaces the current agent token in the store. -func (t *Store) UpdateAgentToken(token string, source TokenSource) { +// Returns true if it was changed. +func (t *Store) UpdateAgentToken(token string, source TokenSource) bool { t.l.Lock() + changed := (t.agentToken != token || t.agentTokenSource != source) t.agentToken = token t.agentTokenSource = source t.l.Unlock() + return changed } // UpdateAgentMasterToken replaces the current agent master token in the store. -func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) { +// Returns true if it was changed. +func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool { t.l.Lock() + changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source) t.agentMasterToken = token t.agentMasterTokenSource = source t.l.Unlock() + return changed } // UpdateReplicationToken replaces the current replication token in the store. -func (t *Store) UpdateReplicationToken(token string, source TokenSource) { +// Returns true if it was changed. +func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool { t.l.Lock() + changed := (t.replicationToken != token || t.replicationTokenSource != source) t.replicationToken = token t.replicationTokenSource = source t.l.Unlock() + return changed } // UserToken returns the best token to use for user operations. diff --git a/agent/token/store_test.go b/agent/token/store_test.go index 18beb4597f..7470b04467 100644 --- a/agent/token/store_test.go +++ b/agent/token/store_test.go @@ -92,10 +92,16 @@ func TestStore_RegularTokens(t *testing.T) { t.Parallel() s := new(Store) - s.UpdateUserToken(tt.set.user, tt.set.userSource) - s.UpdateAgentToken(tt.set.agent, tt.set.agentSource) - s.UpdateReplicationToken(tt.set.repl, tt.set.replSource) - s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource) + require.True(t, s.UpdateUserToken(tt.set.user, tt.set.userSource)) + require.True(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource)) + require.True(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource)) + require.True(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource)) + + // If they don't change then they return false. + require.False(t, s.UpdateUserToken(tt.set.user, tt.set.userSource)) + require.False(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource)) + require.False(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource)) + require.False(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource)) require.Equal(t, tt.effective.user, s.UserToken()) require.Equal(t, tt.effective.agent, s.AgentToken()) diff --git a/testrpc/wait.go b/testrpc/wait.go index f8a31cb476..496fc43523 100644 --- a/testrpc/wait.go +++ b/testrpc/wait.go @@ -41,24 +41,34 @@ func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string) { } type waitOption struct { - Token string + Token string + WaitForAntiEntropySync bool } func WithToken(token string) waitOption { return waitOption{Token: token} } +func WaitForAntiEntropySync() waitOption { + return waitOption{WaitForAntiEntropySync: true} +} + // WaitForTestAgent ensures we have a node with serfHealth check registered func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) { var nodes structs.IndexedNodes var checks structs.IndexedHealthChecks - // first extra arg is an optional acl token - var token string + var ( + token string + waitForAntiEntropySync bool + ) for _, opt := range options { if opt.Token != "" { token = opt.Token } + if opt.WaitForAntiEntropySync { + waitForAntiEntropySync = true + } } retry.Run(t, func(r *retry.R) { @@ -73,6 +83,12 @@ func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) r.Fatalf("No registered nodes") } + if waitForAntiEntropySync { + if len(nodes.Nodes[0].TaggedAddresses) == 0 { + r.Fatalf("Not synced via anti entropy yet") + } + } + // This assumes that there is a single agent per dc, typically a TestAgent nodeReq := &structs.NodeSpecificRequest{ Datacenter: dc,