subscribe: add integration test for filtering events by acl

This commit is contained in:
Daniel Nephin 2020-09-28 17:11:51 -04:00
parent 083f4e8f57
commit 39beed0af6

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
@ -29,7 +30,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
backend, err := newTestBackend() backend, err := newTestBackend()
require.NoError(t, err) require.NoError(t, err)
srv := &Server{Backend: backend, Logger: hclog.New(nil)} srv := &Server{Backend: backend, Logger: hclog.New(nil)}
addr := newTestServer(t, srv) addr := newTestServer(t, srv)
ids := newCounter() ids := newCounter()
@ -273,12 +273,12 @@ func assertDeepEqual(t *testing.T, x, y interface{}) {
type testBackend struct { type testBackend struct {
store *state.Store store *state.Store
authorizer acl.Authorizer authorizer func(token string) acl.Authorizer
forwardConn *gogrpc.ClientConn forwardConn *gogrpc.ClientConn
} }
func (b testBackend) ResolveToken(_ string) (acl.Authorizer, error) { func (b testBackend) ResolveToken(token string) (acl.Authorizer, error) {
return b.authorizer, nil return b.authorizer(token), nil
} }
func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) { func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) {
@ -301,7 +301,10 @@ func newTestBackend() (*testBackend, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &testBackend{store: store, authorizer: acl.AllowAll()}, nil allowAll := func(_ string) acl.Authorizer {
return acl.AllowAll()
}
return &testBackend{store: store, authorizer: allowAll}, nil
} }
var _ Backend = (*testBackend)(nil) var _ Backend = (*testBackend)(nil)
@ -395,7 +398,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
Port: 9000, Port: 9000,
}, },
} }
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("req1"), req)) require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg1"), req))
} }
{ {
req := &structs.RegisterRequest{ req := &structs.RegisterRequest{
@ -577,185 +580,83 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
assertDeepEqual(t, expectedEvent, event) assertDeepEqual(t, expectedEvent, event)
} }
/* TODO // TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events
func TestStreaming_Subscribe_SkipSnapshot(t *testing.T) {
t.Parallel()
require := require.New(t) func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) {
dir1, server := testServerWithConfig(t, func(c *Config) { if testing.Short() {
c.Datacenter = "dc1" t.Skip("too slow for -short run")
c.Bootstrap = true
c.GRPCEnabled = true
})
defer os.RemoveAll(dir1)
defer server.Shutdown()
codec := rpcClient(t, server)
defer codec.Close()
dir2, client := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.GRPCEnabled = true
})
defer os.RemoveAll(dir2)
defer client.Shutdown()
// Try to join
testrpc.WaitForLeader(t, server.RPC, "dc1")
joinLAN(t, client, server)
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
// Register a dummy node with our service on it.
{
req := &structs.RegisterRequest{
Node: "node1",
Address: "3.4.5.6",
Datacenter: "dc1",
Service: &structs.NodeService{
ID: "redis1",
Service: "redis",
Address: "3.4.5.6",
Port: 8080,
},
}
var out struct{}
require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out))
} }
// Start a Subscribe call to our streaming endpoint. backend, err := newTestBackend()
conn, err := client.GRPCConn() require.NoError(t, err)
require.NoError(err) srv := &Server{Backend: backend, Logger: hclog.New(nil)}
addr := newTestServer(t, srv)
streamClient := pbsubscribe.NewConsulClient(conn)
var index uint64
{
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"})
require.NoError(err)
// Start a goroutine to read updates off the pbsubscribe.
eventCh := make(chan *pbsubscribe.Event, 0)
go recvEvents(t, eventCh, streamHandle)
var snapshotEvents []*pbsubscribe.Event
for i := 0; i < 2; i++ {
select {
case event := <-eventCh:
snapshotEvents = append(snapshotEvents, event)
case <-time.After(3 * time.Second):
t.Fatalf("did not receive events past %d", len(snapshotEvents))
}
}
// Save the index from the event
index = snapshotEvents[0].Index
}
// Start another Subscribe call passing the index from the last event.
{
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis",
Index: index,
})
require.NoError(err)
// Start a goroutine to read updates off the pbsubscribe.
eventCh := make(chan *pbsubscribe.Event, 0)
go recvEvents(t, eventCh, streamHandle)
// We should get no snapshot and the first event should be "resume stream"
select {
case event := <-eventCh:
require.True(event.GetResumeStream())
case <-time.After(500 * time.Millisecond):
t.Fatalf("never got event")
}
// Wait and make sure there aren't any events coming. The server shouldn't send
// a snapshot and we haven't made any updates to the catalog that would trigger
// more events.
select {
case event := <-eventCh:
t.Fatalf("got another event: %v", event)
case <-time.After(500 * time.Millisecond):
}
}
}
func TestStreaming_Subscribe_FilterACL(t *testing.T) {
t.Parallel()
require := require.New(t)
dir, _, server, codec := testACLFilterServerWithConfigFn(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = true
c.GRPCEnabled = true
})
defer os.RemoveAll(dir)
defer server.Shutdown()
defer codec.Close()
dir2, client := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.GRPCEnabled = true
})
defer os.RemoveAll(dir2)
defer client.Shutdown()
// Try to join
testrpc.WaitForLeader(t, server.RPC, "dc1")
joinLAN(t, client, server)
testrpc.WaitForTestAgent(t, client.RPC, "dc1", testrpc.WithToken("root"))
// Create a policy for the test token. // Create a policy for the test token.
policyReq := structs.ACLPolicySetRequest{ rules := `
Datacenter: "dc1",
Policy: structs.ACLPolicy{
Description: "foobar",
Name: "baz",
Rules: fmt.Sprintf(`
service "foo" { service "foo" {
policy = "write" policy = "write"
} }
node "%s" { node "node1" {
policy = "write" policy = "write"
} }
`, server.config.NodeName), `
}, authorizer, err := acl.NewAuthorizerFromRules(
WriteRequest: structs.WriteRequest{Token: "root"}, "1", 0, rules, acl.SyntaxCurrent,
} &acl.Config{WildcardName: structs.WildcardSpecifier},
resp := structs.ACLPolicy{} nil)
require.NoError(msgpackrpc.CallWithCodec(codec, "ACL.PolicySet", &policyReq, &resp)) require.NoError(t, err)
authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()})
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
// Create a new token that only has access to one node. // TODO: is there any easy way to do this with the acl package?
var token structs.ACLToken token := "this-token-is-good"
arg := structs.ACLTokenSetRequest{ backend.authorizer = func(tok string) acl.Authorizer {
if tok == token {
return authorizer
}
return acl.DenyAll()
}
ids := newCounter()
{
req := &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
ACLToken: structs.ACLToken{ Node: "node1",
Policies: []structs.ACLTokenPolicyLink{ Address: "127.0.0.1",
structs.ACLTokenPolicyLink{ Service: &structs.NodeService{
ID: resp.ID, ID: "foo",
Service: "foo",
}, },
Check: &structs.HealthCheck{
CheckID: "service:foo",
Name: "service:foo",
Node: "node1",
ServiceID: "foo",
Status: api.HealthPassing,
}, },
},
WriteRequest: structs.WriteRequest{Token: "root"},
} }
require.NoError(msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &arg, &token)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg1"), req))
auth, err := server.ResolveToken(token.SecretID)
require.NoError(err)
require.Equal(auth.NodeRead("denied", nil), acl.Deny)
// Register another instance of service foo on a fake node the token doesn't have access to. // Register a service which should be denied
regArg := structs.RegisterRequest{ req = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "bar",
Service: "bar",
},
Check: &structs.HealthCheck{
CheckID: "service:bar",
Name: "service:bar",
Node: "node1",
ServiceID: "bar",
},
}
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req))
req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "denied", Node: "denied",
Address: "127.0.0.1", Address: "127.0.0.1",
@ -763,50 +664,44 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) {
ID: "foo", ID: "foo",
Service: "foo", Service: "foo",
}, },
WriteRequest: structs.WriteRequest{Token: "root"},
} }
require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &regArg, nil)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
}
// Set up the gRPC client. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
conn, err := client.GRPCConn() t.Cleanup(cancel)
require.NoError(err)
streamClient := pbsubscribe.NewConsulClient(conn) conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
require.NoError(t, err)
t.Cleanup(logError(t, conn.Close))
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
// Start a Subscribe call to our streaming endpoint for the service we have access to. // Start a Subscribe call to our streaming endpoint for the service we have access to.
{ {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "foo", Key: "foo",
Token: token.SecretID, Token: token,
}) })
require.NoError(err) require.NoError(t, err)
// Start a goroutine to read updates off the pbsubscribe. chEvents := make(chan eventOrError, 0)
eventCh := make(chan *pbsubscribe.Event, 0) go recvEvents(chEvents, streamHandle)
go recvEvents(t, eventCh, streamHandle)
// Read events off the pbsubscribe. We should not see any events for the filtered node.
var snapshotEvents []*pbsubscribe.Event var snapshotEvents []*pbsubscribe.Event
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
select { snapshotEvents = append(snapshotEvents, getEvent(t, chEvents))
case event := <-eventCh:
snapshotEvents = append(snapshotEvents, event)
case <-time.After(5 * time.Second):
t.Fatalf("did not receive events past %d", len(snapshotEvents))
} }
}
require.Len(snapshotEvents, 2) require.Len(t, snapshotEvents, 2)
require.Equal("foo", snapshotEvents[0].GetServiceHealth().CheckServiceNode.Service.Service) require.Equal(t, "foo", snapshotEvents[0].GetServiceHealth().CheckServiceNode.Service.Service)
require.Equal(server.config.NodeName, snapshotEvents[0].GetServiceHealth().CheckServiceNode.Node.Node) require.Equal(t, "node1", snapshotEvents[0].GetServiceHealth().CheckServiceNode.Node.Node)
require.True(snapshotEvents[1].GetEndOfSnapshot()) require.True(t, snapshotEvents[1].GetEndOfSnapshot())
// Update the service with a new port to trigger a new event. // Update the service with a new port to trigger a new event.
regArg := structs.RegisterRequest{ req := &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: server.config.NodeName, Node: "node1",
Address: "127.0.0.1", Address: "127.0.0.1",
Service: &structs.NodeService{ Service: &structs.NodeService{
ID: "foo", ID: "foo",
@ -818,22 +713,19 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) {
Name: "service:foo", Name: "service:foo",
ServiceID: "foo", ServiceID: "foo",
Status: api.HealthPassing, Status: api.HealthPassing,
Node: "node1",
}, },
WriteRequest: structs.WriteRequest{Token: "root"}, WriteRequest: structs.WriteRequest{Token: "root"},
} }
require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &regArg, nil)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg4"), req))
select { event := getEvent(t, chEvents)
case event := <-eventCh:
service := event.GetServiceHealth().CheckServiceNode.Service service := event.GetServiceHealth().CheckServiceNode.Service
require.Equal("foo", service.Service) require.Equal(t, "foo", service.Service)
require.Equal(1234, service.Port) require.Equal(t, int32(1234), service.Port)
case <-time.After(5 * time.Second):
t.Fatalf("did not receive events past %d", len(snapshotEvents))
}
// Now update the service on the denied node and make sure we don't see an event. // Now update the service on the denied node and make sure we don't see an event.
regArg = structs.RegisterRequest{ req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "denied", Node: "denied",
Address: "127.0.0.1", Address: "127.0.0.1",
@ -847,13 +739,14 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) {
Name: "service:foo", Name: "service:foo",
ServiceID: "foo", ServiceID: "foo",
Status: api.HealthPassing, Status: api.HealthPassing,
Node: "denied",
}, },
WriteRequest: structs.WriteRequest{Token: "root"}, WriteRequest: structs.WriteRequest{Token: "root"},
} }
require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &regArg, nil)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg5"), req))
select { select {
case event := <-eventCh: case event := <-chEvents:
t.Fatalf("should not have received event: %v", event) t.Fatalf("should not have received event: %v", event)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
} }
@ -861,30 +754,22 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) {
// Start another subscribe call for bar, which the token shouldn't have access to. // Start another subscribe call for bar, which the token shouldn't have access to.
{ {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "bar", Key: "bar",
Token: token.SecretID, Token: token,
}) })
require.NoError(err) require.NoError(t, err)
// Start a goroutine to read updates off the pbsubscribe. chEvents := make(chan eventOrError, 0)
eventCh := make(chan *pbsubscribe.Event, 0) go recvEvents(chEvents, streamHandle)
go recvEvents(t, eventCh, streamHandle)
select { require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
case event := <-eventCh:
require.True(event.GetEndOfSnapshot())
case <-time.After(3 * time.Second):
t.Fatal("did not receive event")
}
// Update the service and make sure we don't get a new event. // Update the service and make sure we don't get a new event.
regArg := structs.RegisterRequest{ req := &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: server.config.NodeName, Node: "node1",
Address: "127.0.0.1", Address: "127.0.0.1",
Service: &structs.NodeService{ Service: &structs.NodeService{
ID: "bar", ID: "bar",
@ -895,19 +780,21 @@ func TestStreaming_Subscribe_FilterACL(t *testing.T) {
CheckID: "service:bar", CheckID: "service:bar",
Name: "service:bar", Name: "service:bar",
ServiceID: "bar", ServiceID: "bar",
Node: "node1",
}, },
WriteRequest: structs.WriteRequest{Token: "root"}, WriteRequest: structs.WriteRequest{Token: "root"},
} }
require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &regArg, nil)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg6"), req))
select { select {
case event := <-eventCh: case event := <-chEvents:
t.Fatalf("should not have received event: %v", event) t.Fatalf("should not have received event: %v", event)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
} }
} }
} }
/*
func TestStreaming_Subscribe_ACLUpdate(t *testing.T) { func TestStreaming_Subscribe_ACLUpdate(t *testing.T) {
t.Parallel() t.Parallel()