connect: ensure intention replication continues to work when the replication ACL token changes (#6288)

This commit is contained in:
R.B. Boyer 2019-08-07 11:34:09 -05:00 committed by GitHub
parent 913784e1bf
commit 9bbbea1777
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 11 deletions

View File

@ -5314,6 +5314,18 @@ func retrieveTestToken(codec rpc.ClientCodec, masterToken string, datacenter str
return &out, nil return &out, nil
} }
func deleteTestToken(codec rpc.ClientCodec, masterToken string, datacenter string, tokenAccessor string) error {
arg := structs.ACLTokenDeleteRequest{
Datacenter: datacenter,
TokenID: tokenAccessor,
WriteRequest: structs.WriteRequest{Token: masterToken},
}
var ignored string
err := msgpackrpc.CallWithCodec(codec, "ACL.TokenDelete", &arg, &ignored)
return err
}
func deleteTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, policyID string) error { func deleteTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, policyID string) error {
arg := structs.ACLPolicyDeleteRequest{ arg := structs.ACLPolicyDeleteRequest{
Datacenter: datacenter, Datacenter: datacenter,

View File

@ -575,12 +575,14 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
func (s *Server) replicateIntentions(stopCh <-chan struct{}) { func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
args := structs.DCSpecificRequest{ args := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter, Datacenter: s.config.PrimaryDatacenter,
QueryOptions: structs.QueryOptions{Token: s.tokens.ReplicationToken()},
} }
s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter) s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter)
retryLoopBackoff(stopCh, func() error { retryLoopBackoff(stopCh, func() error {
// Always use the latest replication token value in case it changed while looping.
args.QueryOptions.Token = s.tokens.ReplicationToken()
var remote structs.IndexedIntentions var remote structs.IndexedIntentions
if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil { if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil {
return err return err

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
uuid "github.com/hashicorp/go-uuid" uuid "github.com/hashicorp/go-uuid"
@ -463,20 +464,47 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
require := require.New(t) require := require.New(t)
dir1, s1 := testServer(t) dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.PrimaryDatacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")
s1.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
// create some tokens
replToken1, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`)
require.NoError(err)
replToken2, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `acl = "read"`)
require.NoError(err)
// dc2 as a secondary DC // dc2 as a secondary DC
dir2, s2 := testServerWithConfig(t, func(c *Config) { dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2" c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1" c.PrimaryDatacenter = "dc1"
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLDefaultPolicy = "deny"
c.ACLTokenReplication = false
}) })
defer os.RemoveAll(dir2) defer os.RemoveAll(dir2)
defer s2.Shutdown() defer s2.Shutdown()
s2.tokens.UpdateAgentToken("root", tokenStore.TokenSourceConfig)
// start out with one token
s2.tokens.UpdateReplicationToken(replToken1.SecretID, tokenStore.TokenSourceConfig)
// Create the WAN link // Create the WAN link
joinWAN(t, s2, s1) joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s2.RPC, "dc2") testrpc.WaitForLeader(t, s2.RPC, "dc2")
@ -484,6 +512,7 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
// Create an intention in dc1 // Create an intention in dc1
ixn := structs.IntentionRequest{ ixn := structs.IntentionRequest{
Datacenter: "dc1", Datacenter: "dc1",
WriteRequest: structs.WriteRequest{Token: "root"},
Op: structs.IntentionOpCreate, Op: structs.IntentionOpCreate,
Intention: &structs.Intention{ Intention: &structs.Intention{
SourceNS: structs.IntentionDefaultNamespace, SourceNS: structs.IntentionDefaultNamespace,
@ -505,6 +534,7 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{ req := &structs.IntentionQueryRequest{
Datacenter: "dc2", Datacenter: "dc2",
QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID, IntentionID: ixn.Intention.ID,
} }
var resp structs.IndexedIntentions var resp structs.IndexedIntentions
@ -519,6 +549,12 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
// Sleep a bit so that the UpdatedAt field will definitely be different // Sleep a bit so that the UpdatedAt field will definitely be different
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
// delete underlying acl token being used for replication
require.NoError(deleteTestToken(codec, "root", "dc1", replToken1.AccessorID))
// switch to the other token
s2.tokens.UpdateReplicationToken(replToken2.SecretID, tokenStore.TokenSourceConfig)
// Update the intention in dc1 // Update the intention in dc1
ixn.Op = structs.IntentionOpUpdate ixn.Op = structs.IntentionOpUpdate
ixn.Intention.ID = reply ixn.Intention.ID = reply
@ -531,6 +567,7 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{ req := &structs.IntentionQueryRequest{
Datacenter: "dc2", Datacenter: "dc2",
QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID, IntentionID: ixn.Intention.ID,
} }
r.Check(s2.RPC("Intention.Get", req, &resp)) r.Check(s2.RPC("Intention.Get", req, &resp))
@ -560,6 +597,7 @@ func TestLeader_ReplicateIntentions(t *testing.T) {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
req := &structs.IntentionQueryRequest{ req := &structs.IntentionQueryRequest{
Datacenter: "dc2", Datacenter: "dc2",
QueryOptions: structs.QueryOptions{Token: "root"},
IntentionID: ixn.Intention.ID, IntentionID: ixn.Intention.ID,
} }
var resp structs.IndexedIntentions var resp structs.IndexedIntentions