From c42fe5ae43c56c497eebb038e293c3adf1d53d5b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 21 Oct 2020 16:08:33 -0400 Subject: [PATCH] subscribe: set the request namespace --- .../cache-types/streaming_health_services.go | 2 +- agent/consul/subscribe_backend.go | 9 ++++- agent/rpc/subscribe/logger.go | 5 ++- agent/rpc/subscribe/subscribe.go | 25 ++++++------- agent/rpc/subscribe/subscribe_test.go | 37 +++++++++++-------- agent/structs/structs.go | 2 +- proto/pbcommon/common_oss.go | 5 +++ 7 files changed, 51 insertions(+), 34 deletions(-) create mode 100644 proto/pbcommon/common_oss.go diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index e3d0adefd0..ac1e6e5285 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -73,7 +73,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque Token: srvReq.Token, Datacenter: srvReq.Datacenter, Index: index, - // TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace + Namespace: srvReq.EnterpriseMeta.GetNamespace(), } if srvReq.Connect { req.Topic = pbsubscribe.Topic_ServiceHealthConnect diff --git a/agent/consul/subscribe_backend.go b/agent/consul/subscribe_backend.go index 56f2bac01a..1f5e5a08e2 100644 --- a/agent/consul/subscribe_backend.go +++ b/agent/consul/subscribe_backend.go @@ -1,6 +1,7 @@ package consul import ( + "github.com/hashicorp/consul/agent/structs" "google.golang.org/grpc" "github.com/hashicorp/consul/acl" @@ -16,8 +17,12 @@ type subscribeBackend struct { // TODO: refactor Resolve methods to an ACLBackend that can be used by all // the endpoints. -func (s subscribeBackend) ResolveToken(token string) (acl.Authorizer, error) { - return s.srv.ResolveToken(token) +func (s subscribeBackend) ResolveTokenAndDefaultMeta( + token string, + entMeta *structs.EnterpriseMeta, + authzContext *acl.AuthorizerContext, +) (acl.Authorizer, error) { + return s.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzContext) } var _ subscribe.Backend = (*subscribeBackend)(nil) diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go index 8615fbd905..ddddb20ca5 100644 --- a/agent/rpc/subscribe/logger.go +++ b/agent/rpc/subscribe/logger.go @@ -37,11 +37,12 @@ func (s *streamID) String() string { return s.id } -func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger { - return h.Logger.With( +func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger { + return l.With( "topic", req.Topic.String(), "dc", req.Datacenter, "key", req.Key, + "namespace", req.Namespace, "index", req.Index, "stream_id", &streamID{}) } diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 93c5e65d6c..71919babab 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" ) @@ -35,15 +36,13 @@ type Logger interface { var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil) type Backend interface { - // TODO(streaming): Use ResolveTokenAndDefaultMeta instead once SubscribeRequest - // has an EnterpriseMeta. - ResolveToken(token string) (acl.Authorizer, error) + ResolveTokenAndDefaultMeta(token string, entMeta *structs.EnterpriseMeta, authzContext *acl.AuthorizerContext) (acl.Authorizer, error) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) } func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error { - logger := h.newLoggerForRequest(req) + logger := newLoggerForRequest(h.Logger, req) handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger)) if handled || err != nil { return err @@ -52,13 +51,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub logger.Trace("new subscription") defer logger.Trace("subscription closed") - // Resolve the token and create the ACL filter. - authz, err := h.Backend.ResolveToken(req.Token) + entMeta := structs.EnterpriseMetaInitializer(req.Namespace) + authz, err := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil) if err != nil { return err } - sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req)) + sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta)) if err != nil { return err } @@ -90,13 +89,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub } } -// TODO: can be replaced by mog conversion -func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.SubscribeRequest { +func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest { return &stream.SubscribeRequest{ - Topic: req.Topic, - Key: req.Key, - Token: req.Token, - Index: req.Index, + Topic: req.Topic, + Key: req.Key, + Token: req.Token, + Index: req.Index, + Namespace: entMeta.GetNamespace(), } } diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index d3187fdfc2..2f22846716 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -93,8 +93,9 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Namespace: pbcommon.DefaultEnterpriseMeta.Namespace, }) require.NoError(t, err) @@ -130,7 +131,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { Expose: pbservice.ExposeConfig{}, }, RaftIndex: raftIndex(ids, "reg2", "reg2"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -160,7 +161,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { Expose: pbservice.ExposeConfig{}, }, RaftIndex: raftIndex(ids, "reg3", "reg3"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -209,7 +210,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { Expose: pbservice.ExposeConfig{}, }, RaftIndex: raftIndex(ids, "reg3", "reg3"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, Checks: []*pbservice.HealthCheck{ { @@ -220,7 +221,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { ServiceID: "redis1", ServiceName: "redis", RaftIndex: raftIndex(ids, "update", "update"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -261,7 +262,7 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event { case item := <-ch: require.NoError(t, item.err) return item.event - case <-time.After(10 * time.Second): + case <-time.After(2 * time.Second): t.Fatalf("timeout waiting on event from server") } return nil @@ -280,7 +281,11 @@ type testBackend struct { forwardConn *gogrpc.ClientConn } -func (b testBackend) ResolveToken(token string) (acl.Authorizer, error) { +func (b testBackend) ResolveTokenAndDefaultMeta( + token string, + _ *structs.EnterpriseMeta, + _ *acl.AuthorizerContext, +) (acl.Authorizer, error) { return b.authorizer(token), nil } @@ -443,6 +448,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Datacenter: "dc2", + Namespace: pbcommon.DefaultEnterpriseMeta.Namespace, }) require.NoError(t, err) go recvEvents(chEvents, streamHandle) @@ -477,7 +483,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { MeshGateway: pbservice.MeshGatewayConfig{}, Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, RaftIndex: raftIndex(ids, "reg2", "reg2"), }, }, @@ -507,7 +513,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { MeshGateway: pbservice.MeshGatewayConfig{}, Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, RaftIndex: raftIndex(ids, "reg3", "reg3"), }, }, @@ -557,7 +563,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { MeshGateway: pbservice.MeshGatewayConfig{}, Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, Checks: []*pbservice.HealthCheck{ { @@ -568,7 +574,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { ServiceID: "redis1", ServiceName: "redis", RaftIndex: raftIndex(ids, "update", "update"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -679,9 +685,10 @@ node "node1" { runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) { streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "foo", - Token: token, + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "foo", + Token: token, + Namespace: pbcommon.DefaultEnterpriseMeta.Namespace, }) require.NoError(t, err) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 46a6ba3992..0a8d4faee5 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1628,7 +1628,7 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi // TODO(streaming): add enterprise test that uses namespaces authzContext := new(acl.AuthorizerContext) - csn.Service.FillAuthzContext(authzContext) + csn.Service.EnterpriseMeta.FillAuthzContext(authzContext) if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow { return acl.Deny diff --git a/proto/pbcommon/common_oss.go b/proto/pbcommon/common_oss.go new file mode 100644 index 0000000000..024f207faf --- /dev/null +++ b/proto/pbcommon/common_oss.go @@ -0,0 +1,5 @@ +// +build !consulent + +package pbcommon + +var DefaultEnterpriseMeta = EnterpriseMeta{}