mirror of https://github.com/status-im/consul.git
agent: updates to the agent token trigger anti-entropy full syncs (#6577)
This commit is contained in:
parent
d65bbbfd4e
commit
06ca8cd2d7
|
@ -3,14 +3,15 @@ package agent
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
"log"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/debug"
|
||||
|
@ -1188,12 +1189,19 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in
|
|||
|
||||
// Figure out the target token.
|
||||
target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/")
|
||||
triggerAntiEntropySync := false
|
||||
switch target {
|
||||
case "acl_token", "default":
|
||||
s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
|
||||
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
|
||||
if changed {
|
||||
triggerAntiEntropySync = true
|
||||
}
|
||||
|
||||
case "acl_agent_token", "agent":
|
||||
s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
|
||||
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
|
||||
if changed {
|
||||
triggerAntiEntropySync = true
|
||||
}
|
||||
|
||||
case "acl_agent_master_token", "agent_master":
|
||||
s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI)
|
||||
|
@ -1207,6 +1215,10 @@ func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (in
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
if triggerAntiEntropySync {
|
||||
s.agent.sync.SyncFull.Trigger()
|
||||
}
|
||||
|
||||
if s.agent.config.ACLEnableTokenPersistence {
|
||||
tokens := persistedTokens{}
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/debug"
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
tokenStore "github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
|
@ -4153,6 +4154,113 @@ func TestAgent_Monitor_ACLDeny(t *testing.T) {
|
|||
// here.
|
||||
}
|
||||
|
||||
func TestAgent_TokenTriggersFullSync(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
body := func(token string) io.Reader {
|
||||
return jsonReader(&api.AgentToken{Token: token})
|
||||
}
|
||||
|
||||
createNodePolicy := func(t *testing.T, a *TestAgent, policyName string) *structs.ACLPolicy {
|
||||
policy := &structs.ACLPolicy{
|
||||
Name: policyName,
|
||||
Rules: `node_prefix "" { policy = "write" }`,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("PUT", "/v1/acl/policy?token=root", jsonBody(policy))
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.ACLPolicyCreate(resp, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
policy, ok := obj.(*structs.ACLPolicy)
|
||||
require.True(t, ok)
|
||||
return policy
|
||||
}
|
||||
|
||||
createNodeToken := func(t *testing.T, a *TestAgent, policyName string) *structs.ACLToken {
|
||||
createNodePolicy(t, a, policyName)
|
||||
|
||||
token := &structs.ACLToken{
|
||||
Description: "test",
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{Name: policyName},
|
||||
},
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("PUT", "/v1/acl/token?token=root", jsonBody(token))
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.ACLTokenCreate(resp, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
token, ok := obj.(*structs.ACLToken)
|
||||
require.True(t, ok)
|
||||
return token
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
path string
|
||||
tokenGetFn func(*token.Store) string
|
||||
}{
|
||||
{
|
||||
path: "acl_agent_token",
|
||||
tokenGetFn: (*token.Store).AgentToken,
|
||||
},
|
||||
{
|
||||
path: "agent",
|
||||
tokenGetFn: (*token.Store).AgentToken,
|
||||
},
|
||||
{
|
||||
path: "acl_token",
|
||||
tokenGetFn: (*token.Store).UserToken,
|
||||
},
|
||||
{
|
||||
path: "default",
|
||||
tokenGetFn: (*token.Store).UserToken,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range cases {
|
||||
tt := tt
|
||||
t.Run(tt.path, func(t *testing.T) {
|
||||
url := fmt.Sprintf("/v1/agent/token/%s?token=root", tt.path)
|
||||
|
||||
a := NewTestAgent(t, t.Name(), TestACLConfig()+`
|
||||
acl {
|
||||
tokens {
|
||||
default = ""
|
||||
agent = ""
|
||||
agent_master = ""
|
||||
replication = ""
|
||||
}
|
||||
}
|
||||
`)
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||
|
||||
// create node policy and token
|
||||
token := createNodeToken(t, a, "test")
|
||||
|
||||
req, err := http.NewRequest("PUT", url, body(token.SecretID))
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
_, err = a.srv.AgentToken(resp, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, http.StatusOK, resp.Code)
|
||||
require.Equal(t, token.SecretID, tt.tokenGetFn(a.tokens))
|
||||
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1",
|
||||
testrpc.WithToken("root"),
|
||||
testrpc.WaitForAntiEntropySync())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_Token(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -52,35 +52,47 @@ type Store struct {
|
|||
}
|
||||
|
||||
// UpdateUserToken replaces the current user token in the store.
|
||||
func (t *Store) UpdateUserToken(token string, source TokenSource) {
|
||||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.userToken != token || t.userTokenSource != source)
|
||||
t.userToken = token
|
||||
t.userTokenSource = source
|
||||
t.l.Unlock()
|
||||
return changed
|
||||
}
|
||||
|
||||
// UpdateAgentToken replaces the current agent token in the store.
|
||||
func (t *Store) UpdateAgentToken(token string, source TokenSource) {
|
||||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.agentToken != token || t.agentTokenSource != source)
|
||||
t.agentToken = token
|
||||
t.agentTokenSource = source
|
||||
t.l.Unlock()
|
||||
return changed
|
||||
}
|
||||
|
||||
// UpdateAgentMasterToken replaces the current agent master token in the store.
|
||||
func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) {
|
||||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source)
|
||||
t.agentMasterToken = token
|
||||
t.agentMasterTokenSource = source
|
||||
t.l.Unlock()
|
||||
return changed
|
||||
}
|
||||
|
||||
// UpdateReplicationToken replaces the current replication token in the store.
|
||||
func (t *Store) UpdateReplicationToken(token string, source TokenSource) {
|
||||
// Returns true if it was changed.
|
||||
func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
|
||||
t.l.Lock()
|
||||
changed := (t.replicationToken != token || t.replicationTokenSource != source)
|
||||
t.replicationToken = token
|
||||
t.replicationTokenSource = source
|
||||
t.l.Unlock()
|
||||
return changed
|
||||
}
|
||||
|
||||
// UserToken returns the best token to use for user operations.
|
||||
|
|
|
@ -92,10 +92,16 @@ func TestStore_RegularTokens(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
s := new(Store)
|
||||
s.UpdateUserToken(tt.set.user, tt.set.userSource)
|
||||
s.UpdateAgentToken(tt.set.agent, tt.set.agentSource)
|
||||
s.UpdateReplicationToken(tt.set.repl, tt.set.replSource)
|
||||
s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource)
|
||||
require.True(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
|
||||
require.True(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource))
|
||||
require.True(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource))
|
||||
require.True(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource))
|
||||
|
||||
// If they don't change then they return false.
|
||||
require.False(t, s.UpdateUserToken(tt.set.user, tt.set.userSource))
|
||||
require.False(t, s.UpdateAgentToken(tt.set.agent, tt.set.agentSource))
|
||||
require.False(t, s.UpdateReplicationToken(tt.set.repl, tt.set.replSource))
|
||||
require.False(t, s.UpdateAgentMasterToken(tt.set.master, tt.set.masterSource))
|
||||
|
||||
require.Equal(t, tt.effective.user, s.UserToken())
|
||||
require.Equal(t, tt.effective.agent, s.AgentToken())
|
||||
|
|
|
@ -41,24 +41,34 @@ func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string) {
|
|||
}
|
||||
|
||||
type waitOption struct {
|
||||
Token string
|
||||
Token string
|
||||
WaitForAntiEntropySync bool
|
||||
}
|
||||
|
||||
func WithToken(token string) waitOption {
|
||||
return waitOption{Token: token}
|
||||
}
|
||||
|
||||
func WaitForAntiEntropySync() waitOption {
|
||||
return waitOption{WaitForAntiEntropySync: true}
|
||||
}
|
||||
|
||||
// WaitForTestAgent ensures we have a node with serfHealth check registered
|
||||
func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) {
|
||||
var nodes structs.IndexedNodes
|
||||
var checks structs.IndexedHealthChecks
|
||||
|
||||
// first extra arg is an optional acl token
|
||||
var token string
|
||||
var (
|
||||
token string
|
||||
waitForAntiEntropySync bool
|
||||
)
|
||||
for _, opt := range options {
|
||||
if opt.Token != "" {
|
||||
token = opt.Token
|
||||
}
|
||||
if opt.WaitForAntiEntropySync {
|
||||
waitForAntiEntropySync = true
|
||||
}
|
||||
}
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -73,6 +83,12 @@ func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption)
|
|||
r.Fatalf("No registered nodes")
|
||||
}
|
||||
|
||||
if waitForAntiEntropySync {
|
||||
if len(nodes.Nodes[0].TaggedAddresses) == 0 {
|
||||
r.Fatalf("Not synced via anti entropy yet")
|
||||
}
|
||||
}
|
||||
|
||||
// This assumes that there is a single agent per dc, typically a TestAgent
|
||||
nodeReq := &structs.NodeSpecificRequest{
|
||||
Datacenter: dc,
|
||||
|
|
Loading…
Reference in New Issue