subscribe: set the request namespace

This commit is contained in:
Daniel Nephin 2020-10-21 16:08:33 -04:00
parent a5dd2001cf
commit c42fe5ae43
7 changed files with 51 additions and 34 deletions

View File

@ -73,7 +73,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
Token: srvReq.Token, Token: srvReq.Token,
Datacenter: srvReq.Datacenter, Datacenter: srvReq.Datacenter,
Index: index, Index: index,
// TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace Namespace: srvReq.EnterpriseMeta.GetNamespace(),
} }
if srvReq.Connect { if srvReq.Connect {
req.Topic = pbsubscribe.Topic_ServiceHealthConnect req.Topic = pbsubscribe.Topic_ServiceHealthConnect

View File

@ -1,6 +1,7 @@
package consul package consul
import ( import (
"github.com/hashicorp/consul/agent/structs"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
@ -16,8 +17,12 @@ type subscribeBackend struct {
// TODO: refactor Resolve methods to an ACLBackend that can be used by all // TODO: refactor Resolve methods to an ACLBackend that can be used by all
// the endpoints. // the endpoints.
func (s subscribeBackend) ResolveToken(token string) (acl.Authorizer, error) { func (s subscribeBackend) ResolveTokenAndDefaultMeta(
return s.srv.ResolveToken(token) token string,
entMeta *structs.EnterpriseMeta,
authzContext *acl.AuthorizerContext,
) (acl.Authorizer, error) {
return s.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzContext)
} }
var _ subscribe.Backend = (*subscribeBackend)(nil) var _ subscribe.Backend = (*subscribeBackend)(nil)

View File

@ -37,11 +37,12 @@ func (s *streamID) String() string {
return s.id return s.id
} }
func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger { func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger {
return h.Logger.With( return l.With(
"topic", req.Topic.String(), "topic", req.Topic.String(),
"dc", req.Datacenter, "dc", req.Datacenter,
"key", req.Key, "key", req.Key,
"namespace", req.Namespace,
"index", req.Index, "index", req.Index,
"stream_id", &streamID{}) "stream_id", &streamID{})
} }

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
) )
@ -35,15 +36,13 @@ type Logger interface {
var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil) var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil)
type Backend interface { type Backend interface {
// TODO(streaming): Use ResolveTokenAndDefaultMeta instead once SubscribeRequest ResolveTokenAndDefaultMeta(token string, entMeta *structs.EnterpriseMeta, authzContext *acl.AuthorizerContext) (acl.Authorizer, error)
// has an EnterpriseMeta.
ResolveToken(token string) (acl.Authorizer, error)
Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error)
Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
} }
func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error { func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error {
logger := h.newLoggerForRequest(req) logger := newLoggerForRequest(h.Logger, req)
handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger)) handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger))
if handled || err != nil { if handled || err != nil {
return err return err
@ -52,13 +51,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
logger.Trace("new subscription") logger.Trace("new subscription")
defer logger.Trace("subscription closed") defer logger.Trace("subscription closed")
// Resolve the token and create the ACL filter. entMeta := structs.EnterpriseMetaInitializer(req.Namespace)
authz, err := h.Backend.ResolveToken(req.Token) authz, err := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil)
if err != nil { if err != nil {
return err return err
} }
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req)) sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta))
if err != nil { if err != nil {
return err return err
} }
@ -90,13 +89,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
} }
} }
// TODO: can be replaced by mog conversion func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest {
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.SubscribeRequest {
return &stream.SubscribeRequest{ return &stream.SubscribeRequest{
Topic: req.Topic, Topic: req.Topic,
Key: req.Key, Key: req.Key,
Token: req.Token, Token: req.Token,
Index: req.Index, Index: req.Index,
Namespace: entMeta.GetNamespace(),
} }
} }

View File

@ -93,8 +93,9 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -130,7 +131,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
Expose: pbservice.ExposeConfig{}, Expose: pbservice.ExposeConfig{},
}, },
RaftIndex: raftIndex(ids, "reg2", "reg2"), RaftIndex: raftIndex(ids, "reg2", "reg2"),
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
}, },
}, },
}, },
@ -160,7 +161,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
Expose: pbservice.ExposeConfig{}, Expose: pbservice.ExposeConfig{},
}, },
RaftIndex: raftIndex(ids, "reg3", "reg3"), RaftIndex: raftIndex(ids, "reg3", "reg3"),
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
}, },
}, },
}, },
@ -209,7 +210,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
Expose: pbservice.ExposeConfig{}, Expose: pbservice.ExposeConfig{},
}, },
RaftIndex: raftIndex(ids, "reg3", "reg3"), RaftIndex: raftIndex(ids, "reg3", "reg3"),
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
}, },
Checks: []*pbservice.HealthCheck{ Checks: []*pbservice.HealthCheck{
{ {
@ -220,7 +221,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
ServiceID: "redis1", ServiceID: "redis1",
ServiceName: "redis", ServiceName: "redis",
RaftIndex: raftIndex(ids, "update", "update"), RaftIndex: raftIndex(ids, "update", "update"),
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
}, },
}, },
}, },
@ -261,7 +262,7 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event {
case item := <-ch: case item := <-ch:
require.NoError(t, item.err) require.NoError(t, item.err)
return item.event return item.event
case <-time.After(10 * time.Second): case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting on event from server") t.Fatalf("timeout waiting on event from server")
} }
return nil return nil
@ -280,7 +281,11 @@ type testBackend struct {
forwardConn *gogrpc.ClientConn forwardConn *gogrpc.ClientConn
} }
func (b testBackend) ResolveToken(token string) (acl.Authorizer, error) { func (b testBackend) ResolveTokenAndDefaultMeta(
token string,
_ *structs.EnterpriseMeta,
_ *acl.AuthorizerContext,
) (acl.Authorizer, error) {
return b.authorizer(token), nil return b.authorizer(token), nil
} }
@ -443,6 +448,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Datacenter: "dc2", Datacenter: "dc2",
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
}) })
require.NoError(t, err) require.NoError(t, err)
go recvEvents(chEvents, streamHandle) go recvEvents(chEvents, streamHandle)
@ -477,7 +483,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
MeshGateway: pbservice.MeshGatewayConfig{}, MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{}, Expose: pbservice.ExposeConfig{},
}, },
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
RaftIndex: raftIndex(ids, "reg2", "reg2"), RaftIndex: raftIndex(ids, "reg2", "reg2"),
}, },
}, },
@ -507,7 +513,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
MeshGateway: pbservice.MeshGatewayConfig{}, MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{}, Expose: pbservice.ExposeConfig{},
}, },
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
RaftIndex: raftIndex(ids, "reg3", "reg3"), RaftIndex: raftIndex(ids, "reg3", "reg3"),
}, },
}, },
@ -557,7 +563,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
MeshGateway: pbservice.MeshGatewayConfig{}, MeshGateway: pbservice.MeshGatewayConfig{},
Expose: pbservice.ExposeConfig{}, Expose: pbservice.ExposeConfig{},
}, },
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
}, },
Checks: []*pbservice.HealthCheck{ Checks: []*pbservice.HealthCheck{
{ {
@ -568,7 +574,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
ServiceID: "redis1", ServiceID: "redis1",
ServiceName: "redis", ServiceName: "redis",
RaftIndex: raftIndex(ids, "update", "update"), RaftIndex: raftIndex(ids, "update", "update"),
EnterpriseMeta: pbcommon.EnterpriseMeta{}, EnterpriseMeta: pbcommon.DefaultEnterpriseMeta,
}, },
}, },
}, },
@ -679,9 +685,10 @@ node "node1" {
runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) { runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) {
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "foo", Key: "foo",
Token: token, Token: token,
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
}) })
require.NoError(t, err) require.NoError(t, err)

View File

@ -1628,7 +1628,7 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi
// TODO(streaming): add enterprise test that uses namespaces // TODO(streaming): add enterprise test that uses namespaces
authzContext := new(acl.AuthorizerContext) authzContext := new(acl.AuthorizerContext)
csn.Service.FillAuthzContext(authzContext) csn.Service.EnterpriseMeta.FillAuthzContext(authzContext)
if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow { if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow {
return acl.Deny return acl.Deny

View File

@ -0,0 +1,5 @@
// +build !consulent
package pbcommon
var DefaultEnterpriseMeta = EnterpriseMeta{}