From 85a08bf8ed2b5f005ca3ed2782b5b1893b881a04 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Tue, 10 Mar 2020 11:15:22 -0500 Subject: [PATCH] server: strip local ACL tokens from RPCs during forwarding if crossing datacenters (#7419) Fixes #7414 --- agent/agentpb/common.go | 8 +++ agent/consul/acl.go | 4 ++ agent/consul/helper_test.go | 3 + agent/consul/rpc.go | 18 +++++ agent/consul/rpc_test.go | 134 ++++++++++++++++++++++++++++++++++++ agent/structs/acl.go | 5 ++ agent/structs/structs.go | 9 +++ 7 files changed, 181 insertions(+) diff --git a/agent/agentpb/common.go b/agent/agentpb/common.go index b8ab3a846b..ff7dd955ce 100644 --- a/agent/agentpb/common.go +++ b/agent/agentpb/common.go @@ -18,6 +18,10 @@ func (q *QueryOptions) TokenSecret() string { return q.Token } +func (q *QueryOptions) SetTokenSecret(s string) { + q.Token = s +} + // SetToken is needed to implement the structs.QueryOptionsCompat interface func (q *QueryOptions) SetToken(token string) { q.Token = token @@ -102,6 +106,10 @@ func (w WriteRequest) TokenSecret() string { return w.Token } +func (w *WriteRequest) SetTokenSecret(s string) { + w.Token = s +} + // AllowStaleRead returns whether a stale read should be allowed func (w WriteRequest) AllowStaleRead() bool { return false diff --git a/agent/consul/acl.go b/agent/consul/acl.go index c823674c6c..9e256e95d1 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -103,6 +103,10 @@ func (id *missingIdentity) IsExpired(asOf time.Time) bool { return false } +func (id *missingIdentity) IsLocal() bool { + return false +} + func (id *missingIdentity) EnterpriseMetadata() *structs.EnterpriseMeta { return structs.DefaultEnterpriseMeta() } diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 6234702ff8..08955c86b9 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -161,6 +161,8 @@ func joinWAN(t *testing.T, member, leader *Server) { } func waitForNewACLs(t *testing.T, server *Server) { + t.Helper() + retry.Run(t, func(r *retry.R) { require.False(r, server.UseLegacyACLs(), "Server cannot use new ACLs") }) @@ -169,6 +171,7 @@ func waitForNewACLs(t *testing.T, server *Server) { } func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationType structs.ACLReplicationType, minPolicyIndex, minTokenIndex, minRoleIndex uint64) { + t.Helper() retry.Run(t, func(r *retry.R) { status := server.getACLReplicationStatus() require.Equal(r, expectedReplicationType, status.ReplicationType, "Server not running new replicator yet") diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 5958dfc5c2..a7a40442e5 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -12,6 +12,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/wanfed" "github.com/hashicorp/consul/agent/metadata" @@ -470,6 +471,23 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, // Handle DC forwarding dc := info.RequestDatacenter() if dc != s.config.Datacenter { + // Local tokens only work within the current datacenter. Check to see + // if we are attempting to forward one to a remote datacenter and strip + // it, falling back on the anonymous token on the other end. + if token := info.TokenSecret(); token != "" { + done, ident, err := s.ResolveIdentityFromToken(token) + if done { + if err != nil && !acl.IsErrNotFound(err) { + return false, err + } + if ident != nil && ident.IsLocal() { + // Strip it from the request. + info.SetTokenSecret("") + defer info.SetTokenSecret(token) + } + } + } + err := s.forwardDC(method, dc, args, reply) return true, err } diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 184e04395d..fdd37705c9 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -11,9 +11,12 @@ import ( "testing" "time" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/structs" + tokenStore "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/go-memdb" @@ -732,3 +735,134 @@ func TestRPC_readUint32(t *testing.T) { }) } } + +func TestRPC_LocalTokenStrippedOnForward(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLDefaultPolicy = "deny" + c.ACLMasterToken = "root" + c.ACLEnforceVersion8 = true + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLDefaultPolicy = "deny" + c.ACLTokenReplication = true + c.ACLEnforceVersion8 = true + c.ACLReplicationRate = 100 + c.ACLReplicationBurst = 100 + c.ACLReplicationApplyLimit = 1000000 + }) + s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + defer os.RemoveAll(dir2) + defer s2.Shutdown() + codec2 := rpcClient(t, s2) + defer codec2.Close() + + // Try to join. + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s1.RPC, "dc2") + + // Wait for legacy acls to be disabled so we are clear that + // legacy replication isn't meddling. + waitForNewACLs(t, s1) + waitForNewACLs(t, s2) + waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) + + // create simple kv policy + kvPolicy, err := upsertTestPolicyWithRules(codec, "root", "dc1", ` + key_prefix "" { policy = "write" } + `) + require.NoError(t, err) + + // Wait for it to replicate + retry.Run(t, func(r *retry.R) { + _, p, err := s2.fsm.State().ACLPolicyGetByID(nil, kvPolicy.ID, &structs.EnterpriseMeta{}) + require.Nil(r, err) + require.NotNil(r, p) + }) + + // create local token that only works in DC2 + localToken2, err := upsertTestToken(codec, "root", "dc2", func(token *structs.ACLToken) { + token.Local = true + token.Policies = []structs.ACLTokenPolicyLink{ + {ID: kvPolicy.ID}, + } + }) + require.NoError(t, err) + + // Try to use it locally (it should work) + arg := structs.KVSRequest{ + Datacenter: "dc2", + Op: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + }, + WriteRequest: structs.WriteRequest{Token: localToken2.SecretID}, + } + var out bool + err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out) + require.NoError(t, err) + require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped") + + // Try to use it remotely + arg = structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + }, + WriteRequest: structs.WriteRequest{Token: localToken2.SecretID}, + } + err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // Update the anon token to also be able to write to kv + { + tokenUpsertReq := structs.ACLTokenSetRequest{ + Datacenter: "dc1", + ACLToken: structs.ACLToken{ + AccessorID: structs.ACLTokenAnonymousID, + Policies: []structs.ACLTokenPolicyLink{ + structs.ACLTokenPolicyLink{ + ID: kvPolicy.ID, + }, + }, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + token := structs.ACLToken{} + err = msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &tokenUpsertReq, &token) + require.NoError(t, err) + require.NotEmpty(t, token.SecretID) + } + + // Try to use it remotely again, but this time it should fallback to anon + arg = structs.KVSRequest{ + Datacenter: "dc1", + Op: api.KVSet, + DirEnt: structs.DirEntry{ + Key: "foo", + Value: []byte("bar"), + }, + WriteRequest: structs.WriteRequest{Token: localToken2.SecretID}, + } + err = msgpackrpc.CallWithCodec(codec2, "KVS.Apply", &arg, &out) + require.NoError(t, err) + require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped") +} diff --git a/agent/structs/acl.go b/agent/structs/acl.go index 353ce59966..793125c0c1 100644 --- a/agent/structs/acl.go +++ b/agent/structs/acl.go @@ -119,6 +119,7 @@ type ACLIdentity interface { EmbeddedPolicy() *ACLPolicy ServiceIdentityList() []*ACLServiceIdentity IsExpired(asOf time.Time) bool + IsLocal() bool EnterpriseMetadata() *EnterpriseMeta } @@ -369,6 +370,10 @@ func (t *ACLToken) IsExpired(asOf time.Time) bool { return t.ExpirationTime.Before(asOf) } +func (t *ACLToken) IsLocal() bool { + return t.Local +} + func (t *ACLToken) HasExpirationTime() bool { return t.ExpirationTime != nil && !t.ExpirationTime.IsZero() } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index f1872b6527..92ea6fde66 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -148,6 +148,7 @@ type RPCInfo interface { IsRead() bool AllowStaleRead() bool TokenSecret() string + SetTokenSecret(string) } // QueryOptions is used to specify various flags for read queries @@ -237,6 +238,10 @@ func (q QueryOptions) TokenSecret() string { return q.Token } +func (q *QueryOptions) SetTokenSecret(s string) { + q.Token = s +} + type WriteRequest struct { // Token is the ACL token ID. If not provided, the 'anonymous' // token is assumed for backwards compatibility. @@ -256,6 +261,10 @@ func (w WriteRequest) TokenSecret() string { return w.Token } +func (w *WriteRequest) SetTokenSecret(s string) { + w.Token = s +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct {