mirror of https://github.com/status-im/consul.git
Merge pull request #5237 from hashicorp/term-grpc-stream-on-token-failure
Check ACLs more often for xDS endpoints.
This commit is contained in:
commit
db8a871309
|
@ -522,6 +522,8 @@ func (a *Agent) listenAndServeGRPC() error {
|
||||||
Authz: a,
|
Authz: a,
|
||||||
ResolveToken: a.resolveToken,
|
ResolveToken: a.resolveToken,
|
||||||
}
|
}
|
||||||
|
a.xdsServer.Initialize()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
|
a.grpcServer, err = a.xdsServer.GRPCServer(a.config.CertFile, a.config.KeyFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
@ -56,6 +57,10 @@ const (
|
||||||
// LocalAgentClusterName is the name we give the local agent "cluster" in
|
// LocalAgentClusterName is the name we give the local agent "cluster" in
|
||||||
// Envoy config.
|
// Envoy config.
|
||||||
LocalAgentClusterName = "local_agent"
|
LocalAgentClusterName = "local_agent"
|
||||||
|
|
||||||
|
// DefaultAuthCheckFrequency is the default value for
|
||||||
|
// Server.AuthCheckFrequency to use when the zero value is provided.
|
||||||
|
DefaultAuthCheckFrequency = 5 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
|
// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
|
||||||
|
@ -90,6 +95,18 @@ type Server struct {
|
||||||
CfgMgr ConfigManager
|
CfgMgr ConfigManager
|
||||||
Authz ConnectAuthz
|
Authz ConnectAuthz
|
||||||
ResolveToken ACLResolverFunc
|
ResolveToken ACLResolverFunc
|
||||||
|
// AuthCheckFrequency is how often we should re-check the credentials used
|
||||||
|
// during a long-lived gRPC Stream after it has been initially established.
|
||||||
|
// This is only used during idle periods of stream interactions (i.e. when
|
||||||
|
// there has been no recent DiscoveryRequest).
|
||||||
|
AuthCheckFrequency time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize will finish configuring the Server for first use.
|
||||||
|
func (s *Server) Initialize() {
|
||||||
|
if s.AuthCheckFrequency == 0 {
|
||||||
|
s.AuthCheckFrequency = DefaultAuthCheckFrequency
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// StreamAggregatedResources implements
|
// StreamAggregatedResources implements
|
||||||
|
@ -126,7 +143,7 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {
|
||||||
|
|
||||||
const (
|
const (
|
||||||
stateInit int = iota
|
stateInit int = iota
|
||||||
statePendingAuth
|
statePendingInitialConfig
|
||||||
stateRunning
|
stateRunning
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -176,8 +193,44 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var authTimer <-chan time.Time
|
||||||
|
extendAuthTimer := func() {
|
||||||
|
authTimer = time.After(s.AuthCheckFrequency)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
|
||||||
|
if cfgSnap == nil {
|
||||||
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
|
||||||
|
}
|
||||||
|
|
||||||
|
token := tokenFromStream(stream)
|
||||||
|
rule, err := s.ResolveToken(token)
|
||||||
|
|
||||||
|
if acl.IsErrNotFound(err) {
|
||||||
|
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
|
||||||
|
} else if acl.IsErrPermissionDenied(err) {
|
||||||
|
return status.Errorf(codes.PermissionDenied, "permission denied: %v", err)
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
|
||||||
|
return status.Errorf(codes.PermissionDenied, "permission denied")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Authed OK!
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-authTimer:
|
||||||
|
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
|
||||||
|
if err := checkStreamACLs(cfgSnap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
extendAuthTimer()
|
||||||
|
|
||||||
case req, ok = <-reqCh:
|
case req, ok = <-reqCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
// reqCh is closed when stream.Recv errors which is how we detect client
|
// reqCh is closed when stream.Recv errors which is how we detect client
|
||||||
|
@ -218,27 +271,27 @@ func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest)
|
||||||
defer watchCancel()
|
defer watchCancel()
|
||||||
|
|
||||||
// Now wait for the config so we can check ACL
|
// Now wait for the config so we can check ACL
|
||||||
state = statePendingAuth
|
state = statePendingInitialConfig
|
||||||
case statePendingAuth:
|
case statePendingInitialConfig:
|
||||||
if cfgSnap == nil {
|
if cfgSnap == nil {
|
||||||
// Nothing we can do until we get the initial config
|
// Nothing we can do until we get the initial config
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Got config, try to authenticate
|
|
||||||
token := tokenFromStream(stream)
|
// Got config, try to authenticate next.
|
||||||
rule, err := s.ResolveToken(token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) {
|
|
||||||
return status.Errorf(codes.PermissionDenied, "permission denied")
|
|
||||||
}
|
|
||||||
// Authed OK!
|
|
||||||
state = stateRunning
|
state = stateRunning
|
||||||
|
|
||||||
// Lets actually process the config we just got or we'll mis responding
|
// Lets actually process the config we just got or we'll mis responding
|
||||||
fallthrough
|
fallthrough
|
||||||
case stateRunning:
|
case stateRunning:
|
||||||
|
// Check ACLs on every Discovery{Request,Response}.
|
||||||
|
if err := checkStreamACLs(cfgSnap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// For the first time through the state machine, this is when the
|
||||||
|
// timer is first started.
|
||||||
|
extendAuthTimer()
|
||||||
|
|
||||||
// See if any handlers need to have the current (possibly new) config
|
// See if any handlers need to have the current (possibly new) config
|
||||||
// sent. Note the order here is actually significant so we can't just
|
// sent. Note the order here is actually significant so we can't just
|
||||||
// range the map which has no determined order. It's important because:
|
// range the map which has no determined order. It's important because:
|
||||||
|
@ -364,12 +417,12 @@ func (s *Server) Check(ctx context.Context, r *envoyauthz.CheckRequest) (*envoya
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Treat this as an auth error since Envoy has sent something it considers
|
// Treat this as an auth error since Envoy has sent something it considers
|
||||||
// valid, it's just not an identity we trust.
|
// valid, it's just not an identity we trust.
|
||||||
return deniedResponse("Destination Principal is not a valid Connect identitiy")
|
return deniedResponse("Destination Principal is not a valid Connect identity")
|
||||||
}
|
}
|
||||||
|
|
||||||
destID, ok := dest.(*connect.SpiffeIDService)
|
destID, ok := dest.(*connect.SpiffeIDService)
|
||||||
if !ok {
|
if !ok {
|
||||||
return deniedResponse("Destination Principal is not a valid Service identitiy")
|
return deniedResponse("Destination Principal is not a valid Service identity")
|
||||||
}
|
}
|
||||||
|
|
||||||
// For now we don't validate the trust domain of the _destination_ at all -
|
// For now we don't validate the trust domain of the _destination_ at all -
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
@ -82,7 +83,7 @@ func (m *testManager) Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, pr
|
||||||
// AssertWatchCancelled checks that the most recent call to a Watch cancel func
|
// AssertWatchCancelled checks that the most recent call to a Watch cancel func
|
||||||
// was from the specified proxyID and that one is made in a short time. This
|
// was from the specified proxyID and that one is made in a short time. This
|
||||||
// probably won't work if you are running multiple Watches in parallel on
|
// probably won't work if you are running multiple Watches in parallel on
|
||||||
// multiple proxyIDS due to timing/ordering issues but I dont think we need to
|
// multiple proxyIDS due to timing/ordering issues but I don't think we need to
|
||||||
// do that.
|
// do that.
|
||||||
func (m *testManager) AssertWatchCancelled(t *testing.T, proxyID string) {
|
func (m *testManager) AssertWatchCancelled(t *testing.T, proxyID string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
@ -115,7 +116,13 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
|
||||||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", "")
|
envoy := NewTestEnvoy(t, "web-sidecar-proxy", "")
|
||||||
defer envoy.Close()
|
defer envoy.Close()
|
||||||
|
|
||||||
s := Server{logger, mgr, mgr, aclResolve}
|
s := Server{
|
||||||
|
Logger: logger,
|
||||||
|
CfgMgr: mgr,
|
||||||
|
Authz: mgr,
|
||||||
|
ResolveToken: aclResolve,
|
||||||
|
}
|
||||||
|
s.Initialize()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := s.StreamAggregatedResources(envoy.stream)
|
err := s.StreamAggregatedResources(envoy.stream)
|
||||||
|
@ -162,11 +169,11 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
|
||||||
// And should get a response immediately.
|
// And should get a response immediately.
|
||||||
assertResponseSent(t, envoy.stream.sendCh, expectListenerJSON(t, snap, "", 1, 3))
|
assertResponseSent(t, envoy.stream.sendCh, expectListenerJSON(t, snap, "", 1, 3))
|
||||||
|
|
||||||
// Now send Route request along with next listner one
|
// Now send Route request along with next listener one
|
||||||
envoy.SendReq(t, RouteType, 0, 0)
|
envoy.SendReq(t, RouteType, 0, 0)
|
||||||
envoy.SendReq(t, ListenerType, 1, 3)
|
envoy.SendReq(t, ListenerType, 1, 3)
|
||||||
|
|
||||||
// We don't serve routes yet so this shoould block with no response
|
// We don't serve routes yet so this should block with no response
|
||||||
assertChanBlocked(t, envoy.stream.sendCh)
|
assertChanBlocked(t, envoy.stream.sendCh)
|
||||||
|
|
||||||
// WOOP! Envoy now has full connect config. Lets verify that if we update it,
|
// WOOP! Envoy now has full connect config. Lets verify that if we update it,
|
||||||
|
@ -179,7 +186,7 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
|
||||||
mgr.DeliverConfig(t, "web-sidecar-proxy", snap)
|
mgr.DeliverConfig(t, "web-sidecar-proxy", snap)
|
||||||
|
|
||||||
// All 3 response that have something to return should return with new version
|
// All 3 response that have something to return should return with new version
|
||||||
// note that the ordering is not determinisic in general. Trying to make this
|
// note that the ordering is not deterministic in general. Trying to make this
|
||||||
// test order-agnostic though is a massive pain since we are comparing
|
// test order-agnostic though is a massive pain since we are comparing
|
||||||
// non-identical JSON strings (so can simply sort by anything) and because we
|
// non-identical JSON strings (so can simply sort by anything) and because we
|
||||||
// don't know the order the nonces will be assigned. For now we rely and
|
// don't know the order the nonces will be assigned. For now we rely and
|
||||||
|
@ -189,21 +196,21 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
|
||||||
assertResponseSent(t, envoy.stream.sendCh, expectEndpointsJSON(t, snap, "", 2, 5))
|
assertResponseSent(t, envoy.stream.sendCh, expectEndpointsJSON(t, snap, "", 2, 5))
|
||||||
assertResponseSent(t, envoy.stream.sendCh, expectListenerJSON(t, snap, "", 2, 6))
|
assertResponseSent(t, envoy.stream.sendCh, expectListenerJSON(t, snap, "", 2, 6))
|
||||||
|
|
||||||
// Let's pretent that Envoy doesn't like that new listener config. It will ACK
|
// Let's pretend that Envoy doesn't like that new listener config. It will ACK
|
||||||
// all the others (same version) but NACK the listener. This is the most
|
// all the others (same version) but NACK the listener. This is the most
|
||||||
// subtle part of xDS and the server implementation so I'll elaborate. A full
|
// subtle part of xDS and the server implementation so I'll elaborate. A full
|
||||||
// description of the protocol can be found at
|
// description of the protocol can be found at
|
||||||
// https://github.com/envoyproxy/data-plane-api/blob/master/XDS_PROTOCOL.md.
|
// https://github.com/envoyproxy/data-plane-api/blob/master/XDS_PROTOCOL.md.
|
||||||
// Envoy delays making a followup reqeest for a type until after it has
|
// Envoy delays making a followup request for a type until after it has
|
||||||
// processed and applied the last response. The next request then will include
|
// processed and applied the last response. The next request then will include
|
||||||
// the nonce in the last response which acknowledges _recieving_ and handling
|
// the nonce in the last response which acknowledges _receiving_ and handling
|
||||||
// that response. It also includes the currently applied version. If all is
|
// that response. It also includes the currently applied version. If all is
|
||||||
// good and it successfully applies the config, then the version in the next
|
// good and it successfully applies the config, then the version in the next
|
||||||
// response will be the same version just sent. This is considered to be an
|
// response will be the same version just sent. This is considered to be an
|
||||||
// ACK of that version for that type. If envoy fails to apply the config for
|
// ACK of that version for that type. If envoy fails to apply the config for
|
||||||
// some reason, it will still acknowledge that it received it (still return
|
// some reason, it will still acknowledge that it received it (still return
|
||||||
// the responses nonce), but will show the previous version it's still using.
|
// the responses nonce), but will show the previous version it's still using.
|
||||||
// This is considered a NACK. It's impotant that the server pay attention to
|
// This is considered a NACK. It's important that the server pay attention to
|
||||||
// the _nonce_ and not the version when deciding what to send otherwise a bad
|
// the _nonce_ and not the version when deciding what to send otherwise a bad
|
||||||
// version that can't be applied in Envoy will cause a busy loop.
|
// version that can't be applied in Envoy will cause a busy loop.
|
||||||
//
|
//
|
||||||
|
@ -489,7 +496,7 @@ func assertChanBlocked(t *testing.T, ch chan *envoy.DiscoveryResponse) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
select {
|
select {
|
||||||
case r := <-ch:
|
case r := <-ch:
|
||||||
t.Fatalf("chan should block but recieved: %v", r)
|
t.Fatalf("chan should block but received: %v", r)
|
||||||
case <-time.After(10 * time.Millisecond):
|
case <-time.After(10 * time.Millisecond):
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -501,12 +508,12 @@ func assertResponseSent(t *testing.T, ch chan *envoy.DiscoveryResponse, wantJSON
|
||||||
case r := <-ch:
|
case r := <-ch:
|
||||||
assertResponse(t, r, wantJSON)
|
assertResponse(t, r, wantJSON)
|
||||||
case <-time.After(50 * time.Millisecond):
|
case <-time.After(50 * time.Millisecond):
|
||||||
t.Fatalf("no response recieved after 50ms")
|
t.Fatalf("no response received after 50ms")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// assertResponse is a helper to test a envoy.DiscoveryResponse matches the
|
// assertResponse is a helper to test a envoy.DiscoveryResponse matches the
|
||||||
// JSON representaion we expect. We use JSON because the responses use protobuf
|
// JSON representation we expect. We use JSON because the responses use protobuf
|
||||||
// Any type which includes binary protobuf encoding and would make creating
|
// Any type which includes binary protobuf encoding and would make creating
|
||||||
// expected structs require the same code that is under test!
|
// expected structs require the same code that is under test!
|
||||||
func assertResponse(t *testing.T, r *envoy.DiscoveryResponse, wantJSON string) {
|
func assertResponse(t *testing.T, r *envoy.DiscoveryResponse, wantJSON string) {
|
||||||
|
@ -519,7 +526,7 @@ func assertResponse(t *testing.T, r *envoy.DiscoveryResponse, wantJSON string) {
|
||||||
require.JSONEqf(t, wantJSON, gotJSON, "got:\n%s", gotJSON)
|
require.JSONEqf(t, wantJSON, gotJSON, "got:\n%s", gotJSON)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_StreamAggregatedResources_ACLEnforcment(t *testing.T) {
|
func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -529,7 +536,7 @@ func TestServer_StreamAggregatedResources_ACLEnforcment(t *testing.T) {
|
||||||
wantDenied bool
|
wantDenied bool
|
||||||
}{
|
}{
|
||||||
// Note that although we've stubbed actual ACL checks in the testManager
|
// Note that although we've stubbed actual ACL checks in the testManager
|
||||||
// ConnectAuthorize mock, by asserting against specifc reason strings here
|
// ConnectAuthorize mock, by asserting against specific reason strings here
|
||||||
// even in the happy case which can't match the default one returned by the
|
// even in the happy case which can't match the default one returned by the
|
||||||
// mock we are implicitly validating that the implementation used the
|
// mock we are implicitly validating that the implementation used the
|
||||||
// correct token from the context.
|
// correct token from the context.
|
||||||
|
@ -589,7 +596,13 @@ func TestServer_StreamAggregatedResources_ACLEnforcment(t *testing.T) {
|
||||||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", tt.token)
|
envoy := NewTestEnvoy(t, "web-sidecar-proxy", tt.token)
|
||||||
defer envoy.Close()
|
defer envoy.Close()
|
||||||
|
|
||||||
s := Server{logger, mgr, mgr, aclResolve}
|
s := Server{
|
||||||
|
Logger: logger,
|
||||||
|
CfgMgr: mgr,
|
||||||
|
Authz: mgr,
|
||||||
|
ResolveToken: aclResolve,
|
||||||
|
}
|
||||||
|
s.Initialize()
|
||||||
|
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -632,6 +645,196 @@ func TestServer_StreamAggregatedResources_ACLEnforcment(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuringDiscoveryRequest(t *testing.T) {
|
||||||
|
aclRules := `service "web" { policy = "write" }`
|
||||||
|
token := "service-write-on-web"
|
||||||
|
|
||||||
|
policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var validToken atomic.Value
|
||||||
|
validToken.Store(token)
|
||||||
|
|
||||||
|
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||||
|
mgr := newTestManager(t)
|
||||||
|
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||||
|
if token := validToken.Load(); token == nil || id != token.(string) {
|
||||||
|
return nil, acl.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
|
||||||
|
}
|
||||||
|
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
|
||||||
|
defer envoy.Close()
|
||||||
|
|
||||||
|
s := Server{
|
||||||
|
Logger: logger,
|
||||||
|
CfgMgr: mgr,
|
||||||
|
Authz: mgr,
|
||||||
|
ResolveToken: aclResolve,
|
||||||
|
AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in
|
||||||
|
}
|
||||||
|
s.Initialize()
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
errCh <- s.StreamAggregatedResources(envoy.stream)
|
||||||
|
}()
|
||||||
|
|
||||||
|
getError := func() (gotErr error, ok bool) {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
return err, true
|
||||||
|
default:
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the proxy to create state needed to Watch() on
|
||||||
|
mgr.RegisterProxy(t, "web-sidecar-proxy")
|
||||||
|
|
||||||
|
// Send initial cluster discover (OK)
|
||||||
|
envoy.SendReq(t, ClusterType, 0, 0)
|
||||||
|
{
|
||||||
|
err, ok := getError()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check no response sent yet
|
||||||
|
assertChanBlocked(t, envoy.stream.sendCh)
|
||||||
|
{
|
||||||
|
err, ok := getError()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deliver a new snapshot
|
||||||
|
snap := proxycfg.TestConfigSnapshot(t)
|
||||||
|
mgr.DeliverConfig(t, "web-sidecar-proxy", snap)
|
||||||
|
|
||||||
|
assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1))
|
||||||
|
|
||||||
|
// Now nuke the ACL token.
|
||||||
|
validToken.Store("")
|
||||||
|
|
||||||
|
// It also (in parallel) issues the next cluster request (which acts as an ACK
|
||||||
|
// of the version we sent)
|
||||||
|
envoy.SendReq(t, ClusterType, 1, 1)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.Error(t, err)
|
||||||
|
gerr, ok := status.FromError(err)
|
||||||
|
require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
|
||||||
|
require.Equal(t, codes.Unauthenticated, gerr.Code())
|
||||||
|
require.Equal(t, "unauthenticated: ACL not found", gerr.Message())
|
||||||
|
|
||||||
|
mgr.AssertWatchCancelled(t, "web-sidecar-proxy")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
t.Fatalf("timed out waiting for handler to finish")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBackground(t *testing.T) {
|
||||||
|
aclRules := `service "web" { policy = "write" }`
|
||||||
|
token := "service-write-on-web"
|
||||||
|
|
||||||
|
policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var validToken atomic.Value
|
||||||
|
validToken.Store(token)
|
||||||
|
|
||||||
|
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||||
|
mgr := newTestManager(t)
|
||||||
|
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||||
|
if token := validToken.Load(); token == nil || id != token.(string) {
|
||||||
|
return nil, acl.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return acl.NewPolicyAuthorizer(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
|
||||||
|
}
|
||||||
|
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
|
||||||
|
defer envoy.Close()
|
||||||
|
|
||||||
|
s := Server{
|
||||||
|
Logger: logger,
|
||||||
|
CfgMgr: mgr,
|
||||||
|
Authz: mgr,
|
||||||
|
ResolveToken: aclResolve,
|
||||||
|
AuthCheckFrequency: 100 * time.Millisecond, // Make this short.
|
||||||
|
}
|
||||||
|
s.Initialize()
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
errCh <- s.StreamAggregatedResources(envoy.stream)
|
||||||
|
}()
|
||||||
|
|
||||||
|
getError := func() (gotErr error, ok bool) {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
return err, true
|
||||||
|
default:
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the proxy to create state needed to Watch() on
|
||||||
|
mgr.RegisterProxy(t, "web-sidecar-proxy")
|
||||||
|
|
||||||
|
// Send initial cluster discover (OK)
|
||||||
|
envoy.SendReq(t, ClusterType, 0, 0)
|
||||||
|
{
|
||||||
|
err, ok := getError()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check no response sent yet
|
||||||
|
assertChanBlocked(t, envoy.stream.sendCh)
|
||||||
|
{
|
||||||
|
err, ok := getError()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deliver a new snapshot
|
||||||
|
snap := proxycfg.TestConfigSnapshot(t)
|
||||||
|
mgr.DeliverConfig(t, "web-sidecar-proxy", snap)
|
||||||
|
|
||||||
|
assertResponseSent(t, envoy.stream.sendCh, expectClustersJSON(t, snap, token, 1, 1))
|
||||||
|
|
||||||
|
// It also (in parallel) issues the next cluster request (which acts as an ACK
|
||||||
|
// of the version we sent)
|
||||||
|
envoy.SendReq(t, ClusterType, 1, 1)
|
||||||
|
|
||||||
|
// Check no response sent yet
|
||||||
|
assertChanBlocked(t, envoy.stream.sendCh)
|
||||||
|
{
|
||||||
|
err, ok := getError()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now nuke the ACL token while there's no activity.
|
||||||
|
validToken.Store("")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.Error(t, err)
|
||||||
|
gerr, ok := status.FromError(err)
|
||||||
|
require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
|
||||||
|
require.Equal(t, codes.Unauthenticated, gerr.Code())
|
||||||
|
require.Equal(t, "unauthenticated: ACL not found", gerr.Message())
|
||||||
|
|
||||||
|
mgr.AssertWatchCancelled(t, "web-sidecar-proxy")
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
t.Fatalf("timed out waiting for handler to finish")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// This tests the ext_authz service method that implements connect authz.
|
// This tests the ext_authz service method that implements connect authz.
|
||||||
func TestServer_Check(t *testing.T) {
|
func TestServer_Check(t *testing.T) {
|
||||||
|
|
||||||
|
@ -685,7 +888,7 @@ func TestServer_Check(t *testing.T) {
|
||||||
destPrincipal: "not-a-spiffe-id",
|
destPrincipal: "not-a-spiffe-id",
|
||||||
// Should never make it to authz call.
|
// Should never make it to authz call.
|
||||||
wantDenied: true,
|
wantDenied: true,
|
||||||
wantReason: "Destination Principal is not a valid Connect identitiy",
|
wantReason: "Destination Principal is not a valid Connect identity",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "dest not a service URI",
|
name: "dest not a service URI",
|
||||||
|
@ -693,7 +896,7 @@ func TestServer_Check(t *testing.T) {
|
||||||
destPrincipal: "spiffe://trust-domain.consul",
|
destPrincipal: "spiffe://trust-domain.consul",
|
||||||
// Should never make it to authz call.
|
// Should never make it to authz call.
|
||||||
wantDenied: true,
|
wantDenied: true,
|
||||||
wantReason: "Destination Principal is not a valid Service identitiy",
|
wantReason: "Destination Principal is not a valid Service identity",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "ACL not got permission for authz call",
|
name: "ACL not got permission for authz call",
|
||||||
|
@ -729,7 +932,13 @@ func TestServer_Check(t *testing.T) {
|
||||||
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
|
envoy := NewTestEnvoy(t, "web-sidecar-proxy", token)
|
||||||
defer envoy.Close()
|
defer envoy.Close()
|
||||||
|
|
||||||
s := Server{logger, mgr, mgr, aclResolve}
|
s := Server{
|
||||||
|
Logger: logger,
|
||||||
|
CfgMgr: mgr,
|
||||||
|
Authz: mgr,
|
||||||
|
ResolveToken: aclResolve,
|
||||||
|
}
|
||||||
|
s.Initialize()
|
||||||
|
|
||||||
// Create a context with the correct token
|
// Create a context with the correct token
|
||||||
ctx := metadata.NewIncomingContext(context.Background(),
|
ctx := metadata.NewIncomingContext(context.Background(),
|
||||||
|
|
Loading…
Reference in New Issue