From a70e1886c94d0fc609363ff224d41481420bd95c Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Tue, 5 Apr 2022 15:26:14 +0100 Subject: [PATCH] WatchRoots gRPC endpoint (#12678) Adds a new gRPC streaming endpoint (WatchRoots) that dataplane clients will use to fetch the current list of active Connect CA roots and receive new lists whenever the roots are rotated. --- .changelog/12678.txt | 3 + agent/consul/acl.go | 20 + agent/consul/leader_connect_ca_test.go | 3 +- agent/consul/leader_test.go | 3 +- agent/consul/server.go | 8 + agent/consul/server_test.go | 3 +- agent/consul/state/catalog_events.go | 64 ++- agent/consul/state/catalog_events_test.go | 69 +-- agent/consul/state/connect_ca_events.go | 17 +- agent/consul/state/connect_ca_events_test.go | 23 + agent/consul/state/state_store.go | 5 +- agent/consul/state/store_integration_test.go | 56 ++- agent/consul/stream/event.go | 14 +- agent/consul/stream/event_publisher.go | 9 +- agent/consul/stream/event_publisher_test.go | 40 +- agent/consul/stream/subscription.go | 36 +- agent/consul/stream/subscription_test.go | 30 +- .../private/services/subscribe/subscribe.go | 12 +- .../services/subscribe/subscribe_test.go | 6 +- .../public/services/connectca/acl_test.go | 27 + .../services/connectca/mock_ACLResolver.go | 38 ++ .../grpc/public/services/connectca/server.go | 42 ++ .../public/services/connectca/server_test.go | 52 ++ .../public/services/connectca/watch_roots.go | 202 ++++++++ .../services/connectca/watch_roots_test.go | 280 +++++++++++ agent/grpc/public/token.go | 28 ++ agent/submatview/store_integration_test.go | 10 +- agent/xds/server.go | 16 +- proto-public/pbconnectca/ca.pb.binary.go | 28 ++ proto-public/pbconnectca/ca.pb.go | 473 ++++++++++++++++++ proto-public/pbconnectca/ca.proto | 72 +++ 31 files changed, 1473 insertions(+), 216 deletions(-) create mode 100644 .changelog/12678.txt create mode 100644 agent/grpc/public/services/connectca/acl_test.go create mode 100644 agent/grpc/public/services/connectca/mock_ACLResolver.go create mode 100644 agent/grpc/public/services/connectca/server.go create mode 100644 agent/grpc/public/services/connectca/server_test.go create mode 100644 agent/grpc/public/services/connectca/watch_roots.go create mode 100644 agent/grpc/public/services/connectca/watch_roots_test.go create mode 100644 agent/grpc/public/token.go create mode 100644 proto-public/pbconnectca/ca.pb.binary.go create mode 100644 proto-public/pbconnectca/ca.pb.go create mode 100644 proto-public/pbconnectca/ca.proto diff --git a/.changelog/12678.txt b/.changelog/12678.txt new file mode 100644 index 0000000000..3758a06a10 --- /dev/null +++ b/.changelog/12678.txt @@ -0,0 +1,3 @@ +```release-note:feature +ca: Root certificates can now be consumed from a gRPC streaming endpoint: `WatchRoots` +``` diff --git a/agent/consul/acl.go b/agent/consul/acl.go index bd84857b65..8b3d4e55ed 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -664,6 +664,26 @@ func (r *ACLResolver) synthesizePoliciesForNodeIdentities(nodeIdentities []*stru return syntheticPolicies } +// plainACLResolver wraps ACLResolver so that it can be used in other packages +// that cannot import agent/consul wholesale (e.g. because of import cycles). +// +// TODO(agentless): this pattern was copied from subscribeBackend for expediency +// but we should really refactor ACLResolver so it can be passed as a dependency +// to other packages. +type plainACLResolver struct { + resolver *ACLResolver +} + +func (r plainACLResolver) ResolveTokenAndDefaultMeta( + token string, + entMeta *structs.EnterpriseMeta, + authzContext *acl.AuthorizerContext, +) (acl.Authorizer, error) { + // ACLResolver.ResolveTokenAndDefaultMeta returns a ACLResolveResult which + // can't be used in other packages, but it embeds acl.Authorizer which can. + return r.resolver.ResolveTokenAndDefaultMeta(token, entMeta, authzContext) +} + func dedupeServiceIdentities(in []*structs.ACLServiceIdentity) []*structs.ACLServiceIdentity { // From: https://github.com/golang/go/wiki/SliceTricks#in-place-deduplicate-comparable diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go index 2ebbda0a67..1f2c964b8a 100644 --- a/agent/consul/leader_connect_ca_test.go +++ b/agent/consul/leader_connect_ca_test.go @@ -19,6 +19,7 @@ import ( vaultapi "github.com/hashicorp/vault/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/consul-net-rpc/net/rpc" @@ -550,7 +551,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) { deps := newDefaultDeps(t, conf1) deps.Logger = logger - s1, err := NewServer(conf1, deps, nil) + s1, err := NewServer(conf1, deps, grpc.NewServer()) require.NoError(t, err) defer s1.Shutdown() testrpc.WaitForLeader(t, s1.RPC, "dc1") diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 189c058b9c..cb767acf04 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" + "google.golang.org/grpc" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" @@ -1528,7 +1529,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) { deps := newDefaultDeps(t, config) deps.Logger = logger - srv, err := NewServer(config, deps, nil) + srv, err := NewServer(config, deps, grpc.NewServer()) require.NoError(t, err) defer srv.Shutdown() diff --git a/agent/consul/server.go b/agent/consul/server.go index da821ebc8b..ccfa3044a4 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -43,6 +43,7 @@ import ( "github.com/hashicorp/consul/agent/consul/wanfed" agentgrpc "github.com/hashicorp/consul/agent/grpc/private" "github.com/hashicorp/consul/agent/grpc/private/services/subscribe" + "github.com/hashicorp/consul/agent/grpc/public/services/connectca" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -632,6 +633,13 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve // since it can fire events when leadership is obtained. go s.monitorLeadership() + // Initialize public gRPC server. + connectca.NewServer(connectca.Config{ + GetStore: func() connectca.StateStore { return s.FSM().State() }, + Logger: logger.Named("grpc-api.connect-ca"), + ACLResolver: plainACLResolver{s.ACLResolver}, + }).Register(s.publicGRPCServer) + // Start listening for RPC requests. go func() { if err := s.grpcHandler.Run(); err != nil { diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index bf7ff0ab6b..6f953dd1c7 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/tcpproxy" "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" + "google.golang.org/grpc" "github.com/hashicorp/consul/ipaddr" @@ -263,7 +264,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) { } } - srv, err := NewServer(c, newDefaultDeps(t, c), nil) + srv, err := NewServer(c, newDefaultDeps(t, c), grpc.NewServer()) if err != nil { return nil, err } diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 3c72db0bd8..eaca440a8e 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -1,6 +1,7 @@ package state import ( + "fmt" "strings" memdb "github.com/hashicorp/go-memdb" @@ -11,6 +12,38 @@ import ( "github.com/hashicorp/consul/proto/pbsubscribe" ) +// EventSubjectService is a stream.Subject used to route and receive events for +// a specific service. +type EventSubjectService struct { + Key string + EnterpriseMeta structs.EnterpriseMeta + + overrideKey string + overrideNamespace string + overridePartition string +} + +// String satisfies the stream.Subject interface. +func (s EventSubjectService) String() string { + partition := s.EnterpriseMeta.PartitionOrDefault() + if v := s.overridePartition; v != "" { + partition = strings.ToLower(v) + } + + namespace := s.EnterpriseMeta.NamespaceOrDefault() + if v := s.overrideNamespace; v != "" { + namespace = strings.ToLower(v) + } + + key := s.Key + if v := s.overrideKey; v != "" { + key = v + } + key = strings.ToLower(key) + + return partition + "/" + namespace + "/" + key +} + // EventPayloadCheckServiceNode is used as the Payload for a stream.Event to // indicates changes to a CheckServiceNode for service health. // @@ -33,25 +66,14 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo } func (e EventPayloadCheckServiceNode) Subject() stream.Subject { - partition := e.Value.Service.PartitionOrDefault() - if e.overridePartition != "" { - partition = e.overridePartition - } - partition = strings.ToLower(partition) + return EventSubjectService{ + Key: e.Value.Service.Service, + EnterpriseMeta: e.Value.Service.EnterpriseMeta, - namespace := e.Value.Service.NamespaceOrDefault() - if e.overrideNamespace != "" { - namespace = e.overrideNamespace + overrideKey: e.overrideKey, + overrideNamespace: e.overrideNamespace, + overridePartition: e.overridePartition, } - namespace = strings.ToLower(namespace) - - key := e.Value.Service.Service - if e.overrideKey != "" { - key = e.overrideKey - } - key = strings.ToLower(key) - - return stream.Subject(partition + "/" + namespace + "/" + key) } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -62,7 +84,13 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { defer tx.Abort() connect := topic == topicServiceHealthConnect - idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &req.EnterpriseMeta) + + subject, ok := req.Subject.(EventSubjectService) + if !ok { + return 0, fmt.Errorf("expected SubscribeRequest.Subject to be a: state.EventSubjectService, was a: %T", req.Subject) + } + + idx, nodes, err := checkServiceNodesTxn(tx, nil, subject.Key, connect, &subject.EnterpriseMeta) if err != nil { return 0, err } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index bb17dae106..b85ea5f76d 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -16,11 +16,10 @@ import ( "github.com/hashicorp/consul/types" ) -func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) { - // Matches. +func TestEventPayloadCheckServiceNode_Subject(t *testing.T) { for desc, tc := range map[string]struct { evt EventPayloadCheckServiceNode - req stream.SubscribeRequest + sub string }{ "default partition and namespace": { EventPayloadCheckServiceNode{ @@ -30,10 +29,7 @@ func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) { }, }, }, - stream.SubscribeRequest{ - Key: "foo", - EnterpriseMeta: structs.EnterpriseMeta{}, - }, + "default/default/foo", }, "mixed casing": { EventPayloadCheckServiceNode{ @@ -43,7 +39,7 @@ func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) { }, }, }, - stream.SubscribeRequest{Key: "foo"}, + "default/default/foo", }, "override key": { EventPayloadCheckServiceNode{ @@ -54,60 +50,11 @@ func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) { }, overrideKey: "bar", }, - stream.SubscribeRequest{Key: "bar"}, + "default/default/bar", }, } { t.Run(desc, func(t *testing.T) { - require.Equal(t, tc.req.Subject(), tc.evt.Subject()) - }) - } - - // Non-matches. - for desc, tc := range map[string]struct { - evt EventPayloadCheckServiceNode - req stream.SubscribeRequest - }{ - "different key": { - EventPayloadCheckServiceNode{ - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - Service: "foo", - }, - }, - }, - stream.SubscribeRequest{ - Key: "bar", - }, - }, - "different partition": { - EventPayloadCheckServiceNode{ - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - Service: "foo", - }, - }, - overridePartition: "bar", - }, - stream.SubscribeRequest{ - Key: "foo", - }, - }, - "different namespace": { - EventPayloadCheckServiceNode{ - Value: &structs.CheckServiceNode{ - Service: &structs.NodeService{ - Service: "foo", - }, - }, - overrideNamespace: "bar", - }, - stream.SubscribeRequest{ - Key: "foo", - }, - }, - } { - t.Run(desc, func(t *testing.T) { - require.NotEqual(t, tc.req.Subject(), tc.evt.Subject()) + require.Equal(t, tc.sub, tc.evt.Subject().String()) }) } } @@ -125,7 +72,7 @@ func TestServiceHealthSnapshot(t *testing.T) { fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth) buf := &snapshotAppender{} - req := stream.SubscribeRequest{Key: "web"} + req := stream.SubscribeRequest{Subject: EventSubjectService{Key: "web"}} idx, err := fn(req, buf) require.NoError(t, err) @@ -202,7 +149,7 @@ func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) { fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect) buf := &snapshotAppender{} - req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect} + req := stream.SubscribeRequest{Subject: EventSubjectService{Key: "web"}, Topic: topicServiceHealthConnect} idx, err := fn(req, buf) require.NoError(t, err) diff --git a/agent/consul/state/connect_ca_events.go b/agent/consul/state/connect_ca_events.go index e73c206b5d..c6bd135be0 100644 --- a/agent/consul/state/connect_ca_events.go +++ b/agent/consul/state/connect_ca_events.go @@ -12,11 +12,13 @@ import ( // // Note: topics are ordinarily defined in subscribe.proto, but this one isn't // currently available via the Subscribe endpoint. -const EventTopicCARoots stringTopic = "CARoots" +const EventTopicCARoots stringer = "CARoots" -type stringTopic string +// stringer is a convenience type to turn a regular string into a fmt.Stringer +// so that it can be used as a stream.Topic or stream.Subject. +type stringer string -func (s stringTopic) String() string { return string(s) } +func (s stringer) String() string { return string(s) } type EventPayloadCARoots struct { CARoots structs.CARoots @@ -25,9 +27,12 @@ type EventPayloadCARoots struct { func (e EventPayloadCARoots) Subject() stream.Subject { return stream.SubjectNone } func (e EventPayloadCARoots) HasReadPermission(authz acl.Authorizer) bool { - // TODO(agentless): implement this method once the Authorizer exposes a method - // to check for `service:write` on any service. - panic("EventPayloadCARoots does not implement HasReadPermission") + // Require `service:write` on any service in any partition and namespace. + var authzContext acl.AuthorizerContext + structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier). + FillAuthzContext(&authzContext) + + return authz.ServiceWriteAny(&authzContext) == acl.Allow } // caRootsChangeEvents returns an event on EventTopicCARoots whenever the list diff --git a/agent/consul/state/connect_ca_events_test.go b/agent/consul/state/connect_ca_events_test.go index 9e91343674..9651e2a470 100644 --- a/agent/consul/state/connect_ca_events_test.go +++ b/agent/consul/state/connect_ca_events_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" @@ -93,3 +94,25 @@ func TestCARootsSnapshot(t *testing.T) { }) }) } + +func TestEventPayloadCARoots_HasReadPermission(t *testing.T) { + t.Run("no service:write", func(t *testing.T) { + hasRead := EventPayloadCARoots{}.HasReadPermission(acl.DenyAll()) + require.False(t, hasRead) + }) + + t.Run("has service:write", func(t *testing.T) { + policy, err := acl.NewPolicyFromSource(` + service "foo" { + policy = "write" + } + `, acl.SyntaxCurrent, nil, nil) + require.NoError(t, err) + + authz, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil) + require.NoError(t, err) + + hasRead := EventPayloadCARoots{}.HasReadPermission(authz) + require.True(t, hasRead) + }) +} diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index 82dc8d3567..2689ac1420 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -276,8 +276,11 @@ func (s *Store) AbandonCh() <-chan struct{} { // Abandon is used to signal that the given state store has been abandoned. // Calling this more than one time will panic. func (s *Store) Abandon() { - s.stopEventPublisher() + // Note: the order of these operations matters. Subscribers may receive on + // abandonCh to determine whether their subscription was closed because the + // store was abandoned, therefore it's important abandonCh is closed first. close(s.abandonCh) + s.stopEventPublisher() } // maxIndex is a helper used to retrieve the highest known index diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index edd0513896..55c3059ce9 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -25,9 +25,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -71,9 +71,9 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } sub2, err := publisher.Subscribe(subscription2) require.NoError(t, err) @@ -112,9 +112,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -162,9 +162,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } sub, err = publisher.Subscribe(subscription2) require.NoError(t, err) @@ -191,9 +191,9 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Register another subscription. subscription3 := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } sub, err = publisher.Subscribe(subscription3) require.NoError(t, err) @@ -233,9 +233,9 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -278,9 +278,9 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } sub, err = publisher.Subscribe(subscription2) require.NoError(t, err) @@ -396,7 +396,9 @@ var topicService topic = "test-topic-service" func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ topicService: func(req stream.SubscribeRequest, snap stream.SnapshotAppender) (uint64, error) { - idx, nodes, err := s.ServiceNodes(nil, req.Key, nil) + key := req.Subject.String() + + idx, nodes, err := s.ServiceNodes(nil, key, nil) if err != nil { return idx, err } @@ -405,7 +407,7 @@ func newTestSnapshotHandlers(s *Store) stream.SnapshotHandlers { event := stream.Event{ Topic: req.Topic, Index: node.ModifyIndex, - Payload: nodePayload{node: node, key: req.Key}, + Payload: nodePayload{node: node, key: key}, } snap.Append([]stream.Event{event}) } @@ -424,7 +426,7 @@ func (p nodePayload) HasReadPermission(acl.Authorizer) bool { } func (p nodePayload) Subject() stream.Subject { - return stream.Subject(p.node.PartitionOrDefault() + "/" + p.node.NamespaceOrDefault() + "/" + p.key) + return stringer(p.key) } func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { @@ -451,9 +453,9 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo // so we know the initial token write event has been sent out before // continuing... req := &stream.SubscribeRequest{ - Topic: topicService, - Key: "nope", - Token: token.SecretID, + Topic: topicService, + Subject: stringer("nope"), + Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 78e41bc375..b3936a49b7 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -17,12 +17,16 @@ type Topic fmt.Stringer // Subject identifies a portion of a topic for which a subscriber wishes to // receive events (e.g. health events for a particular service) usually the // normalized resource name (including partition and namespace if applicable). -type Subject string +type Subject fmt.Stringer // SubjectNone is used when all events on a given topic are "global" and not // further partitioned by subject. For example: the "CA Roots" topic which is // used to notify subscribers when the global set CA root certificates changes. -const SubjectNone Subject = "none" +const SubjectNone stringer = "none" + +type stringer string + +func (s stringer) String() string { return string(s) } // Event is a structure with identifiers and a payload. Events are Published to // EventPublisher and returned to Subscribers. @@ -123,6 +127,12 @@ func (e Event) IsNewSnapshotToFollow() bool { return e.Payload == newSnapshotToFollow{} } +// IsFramingEvent returns true if this is a framing event (e.g. EndOfSnapshot +// or NewSnapshotToFollow). +func (e Event) IsFramingEvent() bool { + return e.IsEndOfSnapshot() || e.IsNewSnapshotToFollow() +} + type framingEvent struct{} func (framingEvent) HasReadPermission(acl.Authorizer) bool { diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 0941013557..06b7b03a27 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -44,8 +44,8 @@ type EventPublisher struct { // topicSubject is used as a map key when accessing topic buffers and cached // snapshots. type topicSubject struct { - Topic Topic - Subject Subject + Topic string + Subject string } type subscriptions struct { @@ -138,7 +138,10 @@ func (e *EventPublisher) publishEvent(events []Event) { continue } - groupKey := topicSubject{event.Topic, event.Payload.Subject()} + groupKey := topicSubject{ + Topic: event.Topic.String(), + Subject: event.Payload.Subject().String(), + } groupedEvents[groupKey] = append(groupedEvents[groupKey], event) } diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index f90af0b1b0..c718d58538 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -21,8 +21,8 @@ var testTopic Topic = intTopic(999) func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", + Topic: testTopic, + Subject: stringer("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -81,7 +81,7 @@ func (p simplePayload) HasReadPermission(acl.Authorizer) bool { return !p.noReadPerm } -func (p simplePayload) Subject() Subject { return Subject("default/default/" + p.key) } +func (p simplePayload) Subject() Subject { return stringer(p.key) } func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ @@ -153,11 +153,11 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { publisher := NewEventPublisher(handlers, time.Second) go publisher.Run(ctx) - sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)}) + sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22), Subject: SubjectNone}) require.NoError(t, err) defer sub1.Unsubscribe() - sub2, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(33)}) + sub2, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(33), Subject: SubjectNone}) require.NoError(t, err) defer sub2.Unsubscribe() @@ -184,8 +184,8 @@ func consumeSub(ctx context.Context, sub *Subscription) error { func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", + Topic: testTopic, + Subject: stringer("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -229,8 +229,8 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", + Topic: testTopic, + Subject: stringer("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -282,8 +282,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", + Topic: testTopic, + Subject: stringer("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -338,8 +338,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", + Topic: testTopic, + Subject: stringer("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -406,9 +406,9 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", - Index: 1, + Topic: testTopic, + Subject: stringer("sub-key"), + Index: 1, } nextEvent := Event{ @@ -492,8 +492,8 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", + Topic: testTopic, + Subject: stringer("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -514,8 +514,8 @@ func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) { func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) { req := &SubscribeRequest{ - Topic: testTopic, - Key: "sub-key", + Topic: testTopic, + Subject: stringer("sub-key"), } publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 0a42947154..28ca50c3a1 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -4,10 +4,7 @@ import ( "context" "errors" "fmt" - "strings" "sync/atomic" - - "github.com/hashicorp/consul/agent/structs" ) const ( @@ -54,37 +51,32 @@ type Subscription struct { } // SubscribeRequest identifies the types of events the subscriber would like to -// receiver. Topic and Token are required. +// receive. Topic, Subject, and Token are required. type SubscribeRequest struct { - // Topic to subscribe to + // Topic to subscribe to (e.g. service health). Topic Topic - // Key used to filter events in the topic. Only events matching the key will - // be returned by the subscription. A blank key will return all events. Key - // is generally the name of the resource. - Key string - // EnterpriseMeta is used to filter events in the topic. Only events matching - // the partition and namespace will be returned by the subscription. - EnterpriseMeta structs.EnterpriseMeta + + // Subject identifies the subset of Topic events the subscriber wishes to + // receive (e.g. events for a specific service). SubjectNone may be provided + // if all events on the given topic are "global" and not further partitioned + // by subject. + Subject Subject + // Token that was used to authenticate the request. If any ACL policy // changes impact the token the subscription will be forcefully closed. Token string + // Index is the last index the client received. If non-zero the // subscription will be resumed from this index. If the index is out-of-date // a NewSnapshotToFollow event will be sent. Index uint64 } -func (req SubscribeRequest) Subject() Subject { - var ( - partition = req.EnterpriseMeta.PartitionOrDefault() - namespace = req.EnterpriseMeta.NamespaceOrDefault() - key = strings.ToLower(req.Key) - ) - return Subject(partition + "/" + namespace + "/" + key) -} - func (req SubscribeRequest) topicSubject() topicSubject { - return topicSubject{req.Topic, req.Subject()} + return topicSubject{ + Topic: req.Topic.String(), + Subject: req.Subject.String(), + } } // newSubscription return a new subscription. The caller is responsible for diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index cf3be6393a..b6e0f1a5fe 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -6,32 +6,10 @@ import ( time "time" "github.com/stretchr/testify/require" - - "github.com/hashicorp/consul/agent/structs" ) func noopUnSub() {} -func TestSubscription_Subject(t *testing.T) { - for desc, tc := range map[string]struct { - req SubscribeRequest - sub Subject - }{ - "default partition and namespace": { - SubscribeRequest{Key: "foo", EnterpriseMeta: structs.EnterpriseMeta{}}, - "default/default/foo", - }, - "mixed casing": { - SubscribeRequest{Key: "BaZ"}, - "default/default/baz", - }, - } { - t.Run(desc, func(t *testing.T) { - require.Equal(t, tc.sub, tc.req.Subject()) - }) - } -} - func TestSubscription(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -50,8 +28,8 @@ func TestSubscription(t *testing.T) { defer cancel() req := SubscribeRequest{ - Topic: testTopic, - Key: "test", + Topic: testTopic, + Subject: stringer("test"), } sub := newSubscription(req, startHead, noopUnSub) @@ -124,8 +102,8 @@ func TestSubscription_Close(t *testing.T) { defer cancel() req := SubscribeRequest{ - Topic: testTopic, - Key: "test", + Topic: testTopic, + Subject: stringer("test"), } sub := newSubscription(req, startHead, noopUnSub) diff --git a/agent/grpc/private/services/subscribe/subscribe.go b/agent/grpc/private/services/subscribe/subscribe.go index 1a9d0031ac..18372b2002 100644 --- a/agent/grpc/private/services/subscribe/subscribe.go +++ b/agent/grpc/private/services/subscribe/subscribe.go @@ -93,11 +93,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest { return &stream.SubscribeRequest{ - Topic: req.Topic, - Key: req.Key, - EnterpriseMeta: entMeta, - Token: req.Token, - Index: req.Index, + Topic: req.Topic, + Subject: state.EventSubjectService{ + Key: req.Key, + EnterpriseMeta: entMeta, + }, + Token: req.Token, + Index: req.Index, } } diff --git a/agent/grpc/private/services/subscribe/subscribe_test.go b/agent/grpc/private/services/subscribe/subscribe_test.go index 95df5fb13d..a5a47a0770 100644 --- a/agent/grpc/private/services/subscribe/subscribe_test.go +++ b/agent/grpc/private/services/subscribe/subscribe_test.go @@ -3,13 +3,12 @@ package subscribe import ( "context" "errors" - "github.com/golang/protobuf/ptypes/duration" - "github.com/hashicorp/consul/proto/pbcommon" "io" "net" "testing" "time" + "github.com/golang/protobuf/ptypes/duration" "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" @@ -25,6 +24,7 @@ import ( grpc "github.com/hashicorp/consul/agent/grpc/private" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/prototest" @@ -1106,7 +1106,7 @@ func newEventFromSubscription(t *testing.T, index uint64) stream.Event { }, } ep := stream.NewEventPublisher(handlers, 0) - req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Index: index} + req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Subject: stream.SubjectNone, Index: index} sub, err := ep.Subscribe(req) require.NoError(t, err) diff --git a/agent/grpc/public/services/connectca/acl_test.go b/agent/grpc/public/services/connectca/acl_test.go new file mode 100644 index 0000000000..bac0e342e6 --- /dev/null +++ b/agent/grpc/public/services/connectca/acl_test.go @@ -0,0 +1,27 @@ +package connectca + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" +) + +// testAuthorizer returns an ACL policy authorizer with `service:write` on an +// arbitrary service. +func testAuthorizer(t *testing.T) acl.Authorizer { + t.Helper() + + policy, err := acl.NewPolicyFromSource(` + service "foo" { + policy = "write" + } + `, acl.SyntaxCurrent, nil, nil) + require.NoError(t, err) + + authz, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil) + require.NoError(t, err) + + return authz +} diff --git a/agent/grpc/public/services/connectca/mock_ACLResolver.go b/agent/grpc/public/services/connectca/mock_ACLResolver.go new file mode 100644 index 0000000000..bbc462c444 --- /dev/null +++ b/agent/grpc/public/services/connectca/mock_ACLResolver.go @@ -0,0 +1,38 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package connectca + +import ( + acl "github.com/hashicorp/consul/acl" + mock "github.com/stretchr/testify/mock" + + structs "github.com/hashicorp/consul/agent/structs" +) + +// MockACLResolver is an autogenerated mock type for the ACLResolver type +type MockACLResolver struct { + mock.Mock +} + +// ResolveTokenAndDefaultMeta provides a mock function with given fields: _a0, _a1, _a2 +func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *structs.EnterpriseMeta, _a2 *acl.AuthorizerContext) (acl.Authorizer, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 acl.Authorizer + if rf, ok := ret.Get(0).(func(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) acl.Authorizer); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(acl.Authorizer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/agent/grpc/public/services/connectca/server.go b/agent/grpc/public/services/connectca/server.go new file mode 100644 index 0000000000..64bced2dda --- /dev/null +++ b/agent/grpc/public/services/connectca/server.go @@ -0,0 +1,42 @@ +package connectca + +import ( + "google.golang.org/grpc" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto-public/pbconnectca" +) + +type Server struct { + Config +} + +type Config struct { + GetStore func() StateStore + Logger hclog.Logger + ACLResolver ACLResolver +} + +type StateStore interface { + EventPublisher() state.EventPublisher + CAConfig(memdb.WatchSet) (uint64, *structs.CAConfiguration, error) + AbandonCh() <-chan struct{} +} + +//go:generate mockery -name ACLResolver -inpkg +type ACLResolver interface { + ResolveTokenAndDefaultMeta(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) (acl.Authorizer, error) +} + +func NewServer(cfg Config) *Server { + return &Server{cfg} +} + +func (s *Server) Register(grpcServer *grpc.Server) { + pbconnectca.RegisterConnectCAServiceServer(grpcServer, s) +} diff --git a/agent/grpc/public/services/connectca/server_test.go b/agent/grpc/public/services/connectca/server_test.go new file mode 100644 index 0000000000..6a4d42fa0f --- /dev/null +++ b/agent/grpc/public/services/connectca/server_test.go @@ -0,0 +1,52 @@ +package connectca + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/proto-public/pbconnectca" +) + +func testStateStore(t *testing.T) *state.Store { + t.Helper() + + gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) + require.NoError(t, err) + + return state.NewStateStoreWithEventPublisher(gc) +} + +func testClient(t *testing.T, server *Server) pbconnectca.ConnectCAServiceClient { + t.Helper() + + addr := runTestServer(t, server) + + conn, err := grpc.DialContext(context.Background(), addr.String(), grpc.WithInsecure()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn.Close()) + }) + + return pbconnectca.NewConnectCAServiceClient(conn) +} + +func runTestServer(t *testing.T, server *Server) net.Addr { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + grpcServer := grpc.NewServer() + server.Register(grpcServer) + + go grpcServer.Serve(lis) + t.Cleanup(grpcServer.Stop) + + return lis.Addr() +} diff --git a/agent/grpc/public/services/connectca/watch_roots.go b/agent/grpc/public/services/connectca/watch_roots.go new file mode 100644 index 0000000000..eeaf2d8c8c --- /dev/null +++ b/agent/grpc/public/services/connectca/watch_roots.go @@ -0,0 +1,202 @@ +package connectca + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-uuid" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc/public" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto-public/pbconnectca" +) + +// WatchRoots provides a stream on which you can receive the list of active +// Connect CA roots. Current roots are sent immediately at the start of the +// stream, and new lists will be sent whenever the roots are rotated. +func (s *Server) WatchRoots(_ *emptypb.Empty, serverStream pbconnectca.ConnectCAService_WatchRootsServer) error { + logger := s.Logger.Named("watch-roots").With("stream_id", streamID()) + + logger.Trace("starting stream") + defer logger.Trace("stream closed") + + token := public.TokenFromContext(serverStream.Context()) + + // Serve the roots from an EventPublisher subscription. If the subscription is + // closed due to an ACL change, we'll attempt to re-authorize and resume it to + // prevent unnecessarily terminating the stream. + var idx uint64 + for { + var err error + idx, err = s.serveRoots(token, idx, serverStream, logger) + if errors.Is(err, stream.ErrSubForceClosed) { + logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") + } else { + return err + } + } +} + +func (s *Server) serveRoots( + token string, + idx uint64, + serverStream pbconnectca.ConnectCAService_WatchRootsServer, + logger hclog.Logger, +) (uint64, error) { + if err := s.authorize(token); err != nil { + return 0, err + } + + store := s.GetStore() + + // Read the TrustDomain up front - we do not allow users to change the ClusterID + // so reading it once at the beginning of the stream is sufficient. + trustDomain, err := getTrustDomain(store, logger) + if err != nil { + return 0, err + } + + // Start the subscription. + sub, err := store.EventPublisher().Subscribe(&stream.SubscribeRequest{ + Topic: state.EventTopicCARoots, + Subject: stream.SubjectNone, + Token: token, + Index: idx, + }) + if err != nil { + logger.Error("failed to subscribe to CA Roots events", "error", err) + return 0, status.Error(codes.Internal, "failed to subscribe to CA Roots events") + } + defer sub.Unsubscribe() + + for { + event, err := sub.Next(serverStream.Context()) + switch { + case errors.Is(err, stream.ErrSubForceClosed): + // If the subscription was closed because the state store was abandoned (e.g. + // following a snapshot restore) reset idx to ensure we don't skip over the + // new store's events. + select { + case <-store.AbandonCh(): + idx = 0 + default: + } + return idx, err + case errors.Is(err, context.Canceled): + return 0, nil + case err != nil: + logger.Error("failed to read next event", "error", err) + return idx, status.Error(codes.Internal, err.Error()) + } + + // Note: this check isn't strictly necessary because the event publishing + // machinery will ensure the index increases monotonically, but it can be + // tricky to faithfully reproduce this in tests (e.g. the EventPublisher + // garbage collects topic buffers and snapshots aggressively when streams + // disconnect) so this avoids a bunch of confusing setup code. + if event.Index <= idx { + continue + } + + idx = event.Index + + // We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow) + // because we send a full list of roots on every event, rather than expecting + // clients to maintain a state-machine in the way they do for service health. + if event.IsFramingEvent() { + continue + } + + rsp, err := eventToResponse(event, trustDomain) + if err != nil { + logger.Error("failed to convert event to response", "error", err) + return idx, status.Error(codes.Internal, err.Error()) + } + if err := serverStream.Send(rsp); err != nil { + logger.Error("failed to send response", "error", err) + return idx, err + } + } +} + +func eventToResponse(event stream.Event, trustDomain string) (*pbconnectca.WatchRootsResponse, error) { + payload, ok := event.Payload.(state.EventPayloadCARoots) + if !ok { + return nil, fmt.Errorf("unexpected event payload type: %T", payload) + } + + var active string + roots := make([]*pbconnectca.CARoot, 0) + + for _, root := range payload.CARoots { + if root.Active { + active = root.ID + } + + roots = append(roots, &pbconnectca.CARoot{ + Id: root.ID, + Name: root.Name, + SerialNumber: root.SerialNumber, + SigningKeyId: root.SigningKeyID, + RootCert: root.RootCert, + IntermediateCerts: root.IntermediateCerts, + Active: root.Active, + RotatedOutAt: timestamppb.New(root.RotatedOutAt), + }) + } + + return &pbconnectca.WatchRootsResponse{ + TrustDomain: trustDomain, + ActiveRootId: active, + Roots: roots, + }, nil +} + +func (s *Server) authorize(token string) error { + // Require the given ACL token to have `service:write` on any service (in any + // partition and namespace). + var authzContext acl.AuthorizerContext + entMeta := structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier) + authz, err := s.ACLResolver.ResolveTokenAndDefaultMeta(token, entMeta, &authzContext) + if err != nil { + return status.Error(codes.Unauthenticated, err.Error()) + } + if err := authz.ToAllowAuthorizer().ServiceWriteAnyAllowed(&authzContext); err != nil { + return status.Error(codes.PermissionDenied, err.Error()) + } + return nil +} + +// We tag logs with a unique identifier to ease debugging. In the future this +// should probably be an Open Telemetry trace ID. +func streamID() string { + id, err := uuid.GenerateUUID() + if err != nil { + return "" + } + return id +} + +func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) { + _, cfg, err := store.CAConfig(nil) + switch { + case err != nil: + logger.Error("failed to read Connect CA Config", "error", err) + return "", status.Error(codes.Internal, "failed to read Connect CA Config") + case cfg == nil: + logger.Warn("cannot begin stream because Connect CA is not yet initialized") + return "", status.Error(codes.FailedPrecondition, "Connect CA is not yet initialized") + } + return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil +} diff --git a/agent/grpc/public/services/connectca/watch_roots_test.go b/agent/grpc/public/services/connectca/watch_roots_test.go new file mode 100644 index 0000000000..efd022d902 --- /dev/null +++ b/agent/grpc/public/services/connectca/watch_roots_test.go @@ -0,0 +1,280 @@ +package connectca + +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-uuid" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/grpc/public" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto-public/pbconnectca" +) + +const testACLToken = "acl-token" + +func TestWatchRoots_Success(t *testing.T) { + store := testStateStore(t) + + // Set the initial roots and CA configuration. + rootA := connect.TestCA(t, nil) + _, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA}) + require.NoError(t, err) + + err = store.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-id"}) + require.NoError(t, err) + + // Mock the ACL Resolver to return an authorizer with `service:write`. + aclResolver := &MockACLResolver{} + aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). + Return(testAuthorizer(t), nil) + + ctx := public.ContextWithToken(context.Background(), testACLToken) + + server := NewServer(Config{ + GetStore: func() StateStore { return store }, + Logger: hclog.NewNullLogger(), + ACLResolver: aclResolver, + }) + + // Begin the stream. + client := testClient(t, server) + stream, err := client.WatchRoots(ctx, &emptypb.Empty{}) + require.NoError(t, err) + rspCh := handleRootsStream(t, stream) + + // Expect an initial message containing current roots (provided by the snapshot). + roots := mustGetRoots(t, rspCh) + require.Equal(t, "cluster-id.consul", roots.TrustDomain) + require.Equal(t, rootA.ID, roots.ActiveRootId) + require.Len(t, roots.Roots, 1) + require.Equal(t, rootA.ID, roots.Roots[0].Id) + + // Rotate the roots. + rootB := connect.TestCA(t, nil) + _, err = store.CARootSetCAS(2, 1, structs.CARoots{rootB}) + require.NoError(t, err) + + // Expect another event containing the new roots. + roots = mustGetRoots(t, rspCh) + require.Equal(t, "cluster-id.consul", roots.TrustDomain) + require.Equal(t, rootB.ID, roots.ActiveRootId) + require.Len(t, roots.Roots, 1) + require.Equal(t, rootB.ID, roots.Roots[0].Id) +} + +func TestWatchRoots_InvalidACLToken(t *testing.T) { + store := testStateStore(t) + + // Set the initial CA configuration. + err := store.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-id"}) + require.NoError(t, err) + + // Mock the ACL resolver to return ErrNotFound. + aclResolver := &MockACLResolver{} + aclResolver.On("ResolveTokenAndDefaultMeta", mock.Anything, mock.Anything, mock.Anything). + Return(nil, acl.ErrNotFound) + + ctx := public.ContextWithToken(context.Background(), testACLToken) + + server := NewServer(Config{ + GetStore: func() StateStore { return store }, + Logger: hclog.NewNullLogger(), + ACLResolver: aclResolver, + }) + + // Start the stream. + client := testClient(t, server) + stream, err := client.WatchRoots(ctx, &emptypb.Empty{}) + require.NoError(t, err) + rspCh := handleRootsStream(t, stream) + + // Expect to get an Unauthenticated error immediately. + err = mustGetError(t, rspCh) + require.Equal(t, codes.Unauthenticated.String(), status.Code(err).String()) +} + +func TestWatchRoots_ACLTokenInvalidated(t *testing.T) { + store := testStateStore(t) + + // Set the initial roots and CA configuration. + rootA := connect.TestCA(t, nil) + _, err := store.CARootSetCAS(1, 0, structs.CARoots{rootA}) + require.NoError(t, err) + + err = store.CASetConfig(2, &structs.CAConfiguration{ClusterID: "cluster-id"}) + require.NoError(t, err) + + // Mock the ACL Resolver to return an authorizer with `service:write` the + // first two times it is called (initial connect and first re-auth). + aclResolver := &MockACLResolver{} + aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). + Return(testAuthorizer(t), nil).Twice() + + ctx := public.ContextWithToken(context.Background(), testACLToken) + + server := NewServer(Config{ + GetStore: func() StateStore { return store }, + Logger: hclog.NewNullLogger(), + ACLResolver: aclResolver, + }) + + // Start the stream. + client := testClient(t, server) + stream, err := client.WatchRoots(ctx, &emptypb.Empty{}) + require.NoError(t, err) + rspCh := handleRootsStream(t, stream) + + // Consume the initial response. + mustGetRoots(t, rspCh) + + // Update the ACL token to cause the subscription to be force-closed. + accessorID, err := uuid.GenerateUUID() + require.NoError(t, err) + err = store.ACLTokenSet(1, &structs.ACLToken{ + AccessorID: accessorID, + SecretID: testACLToken, + }) + require.NoError(t, err) + + // Update the roots. + rootB := connect.TestCA(t, nil) + _, err = store.CARootSetCAS(3, 1, structs.CARoots{rootB}) + require.NoError(t, err) + + // Expect the stream to remain open and to receive the new roots. + mustGetRoots(t, rspCh) + + // Simulate removing the `service:write` permission. + aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). + Return(acl.DenyAll(), nil) + + // Update the ACL token to cause the subscription to be force-closed. + err = store.ACLTokenSet(1, &structs.ACLToken{ + AccessorID: accessorID, + SecretID: testACLToken, + }) + require.NoError(t, err) + + // Expect the stream to be terminated. + err = mustGetError(t, rspCh) + require.Equal(t, codes.PermissionDenied.String(), status.Code(err).String()) +} + +func TestWatchRoots_StateStoreAbandoned(t *testing.T) { + storeA := testStateStore(t) + + // Set the initial roots and CA configuration. + rootA := connect.TestCA(t, nil) + _, err := storeA.CARootSetCAS(1, 0, structs.CARoots{rootA}) + require.NoError(t, err) + + err = storeA.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-a"}) + require.NoError(t, err) + + // Mock the ACL Resolver to return an authorizer with `service:write`. + aclResolver := &MockACLResolver{} + aclResolver.On("ResolveTokenAndDefaultMeta", testACLToken, mock.Anything, mock.Anything). + Return(testAuthorizer(t), nil) + + ctx := public.ContextWithToken(context.Background(), testACLToken) + + server := NewServer(Config{ + GetStore: func() StateStore { return storeA }, + Logger: hclog.NewNullLogger(), + ACLResolver: aclResolver, + }) + + // Begin the stream. + client := testClient(t, server) + stream, err := client.WatchRoots(ctx, &emptypb.Empty{}) + require.NoError(t, err) + rspCh := handleRootsStream(t, stream) + + // Consume the initial roots. + mustGetRoots(t, rspCh) + + // Simulate a snapshot restore. + storeB := testStateStore(t) + + rootB := connect.TestCA(t, nil) + _, err = storeB.CARootSetCAS(1, 0, structs.CARoots{rootB}) + require.NoError(t, err) + + err = storeB.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-b"}) + require.NoError(t, err) + + server.GetStore = func() StateStore { return storeB } + + storeA.Abandon() + + // Expect to get the new store's roots. + newRoots := mustGetRoots(t, rspCh) + require.Equal(t, "cluster-b.consul", newRoots.TrustDomain) + require.Len(t, newRoots.Roots, 1) + require.Equal(t, rootB.ID, newRoots.ActiveRootId) +} + +func mustGetRoots(t *testing.T, ch <-chan rootsOrError) *pbconnectca.WatchRootsResponse { + t.Helper() + + select { + case rsp := <-ch: + require.NoError(t, rsp.err) + return rsp.rsp + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for WatchRootsResponse") + return nil + } +} + +func mustGetError(t *testing.T, ch <-chan rootsOrError) error { + t.Helper() + + select { + case rsp := <-ch: + require.Error(t, rsp.err) + return rsp.err + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for WatchRootsResponse") + return nil + } +} + +func handleRootsStream(t *testing.T, stream pbconnectca.ConnectCAService_WatchRootsClient) <-chan rootsOrError { + t.Helper() + + rspCh := make(chan rootsOrError) + go func() { + for { + rsp, err := stream.Recv() + if errors.Is(err, io.EOF) || + errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) { + return + } + rspCh <- rootsOrError{ + rsp: rsp, + err: err, + } + } + }() + return rspCh +} + +type rootsOrError struct { + rsp *pbconnectca.WatchRootsResponse + err error +} diff --git a/agent/grpc/public/token.go b/agent/grpc/public/token.go new file mode 100644 index 0000000000..237317ee4c --- /dev/null +++ b/agent/grpc/public/token.go @@ -0,0 +1,28 @@ +package public + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +const metadataKeyToken = "x-consul-token" + +// TokenFromContext returns the ACL token in the gRPC metadata attached to the +// given context. +func TokenFromContext(ctx context.Context) string { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "" + } + toks, ok := md[metadataKeyToken] + if ok && len(toks) > 0 { + return toks[0] + } + return "" +} + +// ContextWithToken returns a context with the given ACL token attached. +func ContextWithToken(ctx context.Context, token string) context.Context { + return metadata.AppendToOutgoingContext(ctx, metadataKeyToken, token) +} diff --git a/agent/submatview/store_integration_test.go b/agent/submatview/store_integration_test.go index b6e6295438..69dab7cfcd 100644 --- a/agent/submatview/store_integration_test.go +++ b/agent/submatview/store_integration_test.go @@ -37,9 +37,9 @@ func TestStore_IntegrationWithBackend(t *testing.T) { var maxIndex uint64 = 200 count := &counter{latest: 3} producers := map[string]*eventProducer{ - "srv1": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv1", count, maxIndex), - "srv2": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv2", count, maxIndex), - "srv3": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv3", count, maxIndex), + state.EventSubjectService{Key: "srv1"}.String(): newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv1", count, maxIndex), + state.EventSubjectService{Key: "srv2"}.String(): newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv2", count, maxIndex), + state.EventSubjectService{Key: "srv3"}.String(): newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv3", count, maxIndex), } sh := snapshotHandler{producers: producers} @@ -88,7 +88,7 @@ func TestStore_IntegrationWithBackend(t *testing.T) { t.Run(fmt.Sprintf("consumer %d", i), func(t *testing.T) { require.True(t, len(consumer.states) > 2, "expected more than %d events", len(consumer.states)) - expected := producers[consumer.srvName].nodesByIndex + expected := producers[state.EventSubjectService{Key: consumer.srvName}.String()].nodesByIndex for idx, nodes := range consumer.states { assertDeepEqual(t, idx, expected[idx], nodes) } @@ -348,7 +348,7 @@ type snapshotHandler struct { } func (s *snapshotHandler) Snapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { - producer := s.producers[req.Key] + producer := s.producers[req.Subject.String()] producer.nodesLock.Lock() defer producer.nodesLock.Unlock() diff --git a/agent/xds/server.go b/agent/xds/server.go index d385ac8638..88419547a6 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -13,10 +13,10 @@ import ( "github.com/hashicorp/go-hclog" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/grpc/public" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/xdscommon" @@ -189,18 +189,6 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error { return errors.New("not implemented") } -func tokenFromContext(ctx context.Context) string { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "" - } - toks, ok := md["x-consul-token"] - if ok && len(toks) > 0 { - return toks[0] - } - return "" -} - // Register the XDS server handlers to the given gRPC server. func (s *Server) Register(srv *grpc.Server) { envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s) @@ -221,7 +209,7 @@ func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") } - authz, err := s.ResolveToken(tokenFromContext(ctx)) + authz, err := s.ResolveToken(public.TokenFromContext(ctx)) if acl.IsErrNotFound(err) { return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) } else if acl.IsErrPermissionDenied(err) { diff --git a/proto-public/pbconnectca/ca.pb.binary.go b/proto-public/pbconnectca/ca.pb.binary.go new file mode 100644 index 0000000000..e373db9b50 --- /dev/null +++ b/proto-public/pbconnectca/ca.pb.binary.go @@ -0,0 +1,28 @@ +// Code generated by protoc-gen-go-binary. DO NOT EDIT. +// source: proto-public/pbconnectca/ca.proto + +package pbconnectca + +import ( + "github.com/golang/protobuf/proto" +) + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *WatchRootsResponse) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *WatchRootsResponse) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *CARoot) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *CARoot) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} diff --git a/proto-public/pbconnectca/ca.pb.go b/proto-public/pbconnectca/ca.pb.go new file mode 100644 index 0000000000..bb966a4dec --- /dev/null +++ b/proto-public/pbconnectca/ca.pb.go @@ -0,0 +1,473 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.23.0 +// protoc v3.15.8 +// source: proto-public/pbconnectca/ca.proto + +package pbconnectca + +import ( + context "context" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type WatchRootsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // active_root_id is the ID of a root in Roots that is the active CA root. + // Other roots are still valid if they're in the Roots list but are in the + // process of being rotated out. + ActiveRootId string `protobuf:"bytes,1,opt,name=active_root_id,json=activeRootId,proto3" json:"active_root_id,omitempty"` + // trust_domain is the identification root for this Consul cluster. All + // certificates signed by the cluster's CA must have their identifying URI + // in this domain. + // + // This does not include the protocol (currently spiffe://) since we may + // implement other protocols in future with equivalent semantics. It should + // be compared against the "authority" section of a URI (i.e. host:port). + TrustDomain string `protobuf:"bytes,2,opt,name=trust_domain,json=trustDomain,proto3" json:"trust_domain,omitempty"` + // roots is a list of root CA certs to trust. + Roots []*CARoot `protobuf:"bytes,3,rep,name=roots,proto3" json:"roots,omitempty"` +} + +func (x *WatchRootsResponse) Reset() { + *x = WatchRootsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WatchRootsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchRootsResponse) ProtoMessage() {} + +func (x *WatchRootsResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchRootsResponse.ProtoReflect.Descriptor instead. +func (*WatchRootsResponse) Descriptor() ([]byte, []int) { + return file_proto_public_pbconnectca_ca_proto_rawDescGZIP(), []int{0} +} + +func (x *WatchRootsResponse) GetActiveRootId() string { + if x != nil { + return x.ActiveRootId + } + return "" +} + +func (x *WatchRootsResponse) GetTrustDomain() string { + if x != nil { + return x.TrustDomain + } + return "" +} + +func (x *WatchRootsResponse) GetRoots() []*CARoot { + if x != nil { + return x.Roots + } + return nil +} + +type CARoot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // id is a globally unique ID (UUID) representing this CA root. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // name is a human-friendly name for this CA root. This value is opaque to + // Consul and is not used for anything internally. + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + // serial_number is the x509 serial number of the certificate. + SerialNumber uint64 `protobuf:"varint,3,opt,name=serial_number,json=serialNumber,proto3" json:"serial_number,omitempty"` + // signing_key_id is the connect.HexString encoded id of the public key that + // corresponds to the private key used to sign leaf certificates in the + // local datacenter. + // + // The value comes from x509.Certificate.SubjectKeyId of the local leaf + // signing cert. + // + // See https://www.rfc-editor.org/rfc/rfc3280#section-4.2.1.1 for more detail. + SigningKeyId string `protobuf:"bytes,4,opt,name=signing_key_id,json=signingKeyId,proto3" json:"signing_key_id,omitempty"` + // root_cert is the PEM-encoded public certificate. + RootCert string `protobuf:"bytes,5,opt,name=root_cert,json=rootCert,proto3" json:"root_cert,omitempty"` + // intermediate_certs is a list of PEM-encoded intermediate certs to + // attach to any leaf certs signed by this CA. + IntermediateCerts []string `protobuf:"bytes,6,rep,name=intermediate_certs,json=intermediateCerts,proto3" json:"intermediate_certs,omitempty"` + // active is true if this is the current active CA. This must only + // be true for exactly one CA. + Active bool `protobuf:"varint,7,opt,name=active,proto3" json:"active,omitempty"` + // rotated_out_at is the time at which this CA was removed from the state. + // This will only be set on roots that have been rotated out from being the + // active root. + RotatedOutAt *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=rotated_out_at,json=rotatedOutAt,proto3" json:"rotated_out_at,omitempty"` +} + +func (x *CARoot) Reset() { + *x = CARoot{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CARoot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CARoot) ProtoMessage() {} + +func (x *CARoot) ProtoReflect() protoreflect.Message { + mi := &file_proto_public_pbconnectca_ca_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CARoot.ProtoReflect.Descriptor instead. +func (*CARoot) Descriptor() ([]byte, []int) { + return file_proto_public_pbconnectca_ca_proto_rawDescGZIP(), []int{1} +} + +func (x *CARoot) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *CARoot) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *CARoot) GetSerialNumber() uint64 { + if x != nil { + return x.SerialNumber + } + return 0 +} + +func (x *CARoot) GetSigningKeyId() string { + if x != nil { + return x.SigningKeyId + } + return "" +} + +func (x *CARoot) GetRootCert() string { + if x != nil { + return x.RootCert + } + return "" +} + +func (x *CARoot) GetIntermediateCerts() []string { + if x != nil { + return x.IntermediateCerts + } + return nil +} + +func (x *CARoot) GetActive() bool { + if x != nil { + return x.Active + } + return false +} + +func (x *CARoot) GetRotatedOutAt() *timestamppb.Timestamp { + if x != nil { + return x.RotatedOutAt + } + return nil +} + +var File_proto_public_pbconnectca_ca_proto protoreflect.FileDescriptor + +var file_proto_public_pbconnectca_ca_proto_rawDesc = []byte{ + 0x0a, 0x21, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x2f, 0x70, + 0x62, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x2f, 0x63, 0x61, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x1a, 0x1b, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x86, 0x01, 0x0a, + 0x12, 0x57, 0x61, 0x74, 0x63, 0x68, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x72, 0x6f, + 0x6f, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x63, 0x74, + 0x69, 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x74, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x72, 0x75, + 0x73, 0x74, 0x5f, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0b, 0x74, 0x72, 0x75, 0x73, 0x74, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x12, 0x27, 0x0a, 0x05, + 0x72, 0x6f, 0x6f, 0x74, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x63, 0x6f, + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x2e, 0x43, 0x41, 0x52, 0x6f, 0x6f, 0x74, 0x52, 0x05, + 0x72, 0x6f, 0x6f, 0x74, 0x73, 0x22, 0x9d, 0x02, 0x0a, 0x06, 0x43, 0x41, 0x52, 0x6f, 0x6f, 0x74, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x5f, 0x6e, + 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x73, 0x65, 0x72, + 0x69, 0x61, 0x6c, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x69, 0x67, + 0x6e, 0x69, 0x6e, 0x67, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x73, 0x69, 0x67, 0x6e, 0x69, 0x6e, 0x67, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x12, + 0x1b, 0x0a, 0x09, 0x72, 0x6f, 0x6f, 0x74, 0x5f, 0x63, 0x65, 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x74, 0x43, 0x65, 0x72, 0x74, 0x12, 0x2d, 0x0a, 0x12, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x5f, 0x63, 0x65, 0x72, + 0x74, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x11, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6d, + 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x43, 0x65, 0x72, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, + 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x61, 0x63, 0x74, + 0x69, 0x76, 0x65, 0x12, 0x40, 0x0a, 0x0e, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x6f, + 0x75, 0x74, 0x5f, 0x61, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0c, 0x72, 0x6f, 0x74, 0x61, 0x74, 0x65, 0x64, + 0x4f, 0x75, 0x74, 0x41, 0x74, 0x32, 0x5b, 0x0a, 0x10, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x43, 0x41, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x47, 0x0a, 0x0a, 0x57, 0x61, 0x74, + 0x63, 0x68, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, + 0x1d, 0x2e, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x2e, 0x57, 0x61, 0x74, 0x63, + 0x68, 0x52, 0x6f, 0x6f, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x30, 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, + 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2d, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x2f, 0x70, + 0x62, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x63, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_proto_public_pbconnectca_ca_proto_rawDescOnce sync.Once + file_proto_public_pbconnectca_ca_proto_rawDescData = file_proto_public_pbconnectca_ca_proto_rawDesc +) + +func file_proto_public_pbconnectca_ca_proto_rawDescGZIP() []byte { + file_proto_public_pbconnectca_ca_proto_rawDescOnce.Do(func() { + file_proto_public_pbconnectca_ca_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_public_pbconnectca_ca_proto_rawDescData) + }) + return file_proto_public_pbconnectca_ca_proto_rawDescData +} + +var file_proto_public_pbconnectca_ca_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_public_pbconnectca_ca_proto_goTypes = []interface{}{ + (*WatchRootsResponse)(nil), // 0: connectca.WatchRootsResponse + (*CARoot)(nil), // 1: connectca.CARoot + (*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 3: google.protobuf.Empty +} +var file_proto_public_pbconnectca_ca_proto_depIdxs = []int32{ + 1, // 0: connectca.WatchRootsResponse.roots:type_name -> connectca.CARoot + 2, // 1: connectca.CARoot.rotated_out_at:type_name -> google.protobuf.Timestamp + 3, // 2: connectca.ConnectCAService.WatchRoots:input_type -> google.protobuf.Empty + 0, // 3: connectca.ConnectCAService.WatchRoots:output_type -> connectca.WatchRootsResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_proto_public_pbconnectca_ca_proto_init() } +func file_proto_public_pbconnectca_ca_proto_init() { + if File_proto_public_pbconnectca_ca_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_public_pbconnectca_ca_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WatchRootsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_public_pbconnectca_ca_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CARoot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_public_pbconnectca_ca_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_public_pbconnectca_ca_proto_goTypes, + DependencyIndexes: file_proto_public_pbconnectca_ca_proto_depIdxs, + MessageInfos: file_proto_public_pbconnectca_ca_proto_msgTypes, + }.Build() + File_proto_public_pbconnectca_ca_proto = out.File + file_proto_public_pbconnectca_ca_proto_rawDesc = nil + file_proto_public_pbconnectca_ca_proto_goTypes = nil + file_proto_public_pbconnectca_ca_proto_depIdxs = nil +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConnInterface + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion6 + +// ConnectCAServiceClient is the client API for ConnectCAService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ConnectCAServiceClient interface { + // WatchRoots provides a stream on which you can receive the list of active + // Connect CA roots. Current roots are sent immediately at the start of the + // stream, and new lists will be sent whenever the roots are rotated. + WatchRoots(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (ConnectCAService_WatchRootsClient, error) +} + +type connectCAServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewConnectCAServiceClient(cc grpc.ClientConnInterface) ConnectCAServiceClient { + return &connectCAServiceClient{cc} +} + +func (c *connectCAServiceClient) WatchRoots(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (ConnectCAService_WatchRootsClient, error) { + stream, err := c.cc.NewStream(ctx, &_ConnectCAService_serviceDesc.Streams[0], "/connectca.ConnectCAService/WatchRoots", opts...) + if err != nil { + return nil, err + } + x := &connectCAServiceWatchRootsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ConnectCAService_WatchRootsClient interface { + Recv() (*WatchRootsResponse, error) + grpc.ClientStream +} + +type connectCAServiceWatchRootsClient struct { + grpc.ClientStream +} + +func (x *connectCAServiceWatchRootsClient) Recv() (*WatchRootsResponse, error) { + m := new(WatchRootsResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ConnectCAServiceServer is the server API for ConnectCAService service. +type ConnectCAServiceServer interface { + // WatchRoots provides a stream on which you can receive the list of active + // Connect CA roots. Current roots are sent immediately at the start of the + // stream, and new lists will be sent whenever the roots are rotated. + WatchRoots(*emptypb.Empty, ConnectCAService_WatchRootsServer) error +} + +// UnimplementedConnectCAServiceServer can be embedded to have forward compatible implementations. +type UnimplementedConnectCAServiceServer struct { +} + +func (*UnimplementedConnectCAServiceServer) WatchRoots(*emptypb.Empty, ConnectCAService_WatchRootsServer) error { + return status.Errorf(codes.Unimplemented, "method WatchRoots not implemented") +} + +func RegisterConnectCAServiceServer(s *grpc.Server, srv ConnectCAServiceServer) { + s.RegisterService(&_ConnectCAService_serviceDesc, srv) +} + +func _ConnectCAService_WatchRoots_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ConnectCAServiceServer).WatchRoots(m, &connectCAServiceWatchRootsServer{stream}) +} + +type ConnectCAService_WatchRootsServer interface { + Send(*WatchRootsResponse) error + grpc.ServerStream +} + +type connectCAServiceWatchRootsServer struct { + grpc.ServerStream +} + +func (x *connectCAServiceWatchRootsServer) Send(m *WatchRootsResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _ConnectCAService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "connectca.ConnectCAService", + HandlerType: (*ConnectCAServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "WatchRoots", + Handler: _ConnectCAService_WatchRoots_Handler, + ServerStreams: true, + }, + }, + Metadata: "proto-public/pbconnectca/ca.proto", +} diff --git a/proto-public/pbconnectca/ca.proto b/proto-public/pbconnectca/ca.proto new file mode 100644 index 0000000000..fef15fbc1c --- /dev/null +++ b/proto-public/pbconnectca/ca.proto @@ -0,0 +1,72 @@ +syntax = "proto3"; + +package connectca; + +option go_package = "github.com/hashicorp/consul/proto-public/pbconnectca"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +service ConnectCAService { + // WatchRoots provides a stream on which you can receive the list of active + // Connect CA roots. Current roots are sent immediately at the start of the + // stream, and new lists will be sent whenever the roots are rotated. + rpc WatchRoots(google.protobuf.Empty) returns (stream WatchRootsResponse) {}; +} + +message WatchRootsResponse { + // active_root_id is the ID of a root in Roots that is the active CA root. + // Other roots are still valid if they're in the Roots list but are in the + // process of being rotated out. + string active_root_id = 1; + + // trust_domain is the identification root for this Consul cluster. All + // certificates signed by the cluster's CA must have their identifying URI + // in this domain. + // + // This does not include the protocol (currently spiffe://) since we may + // implement other protocols in future with equivalent semantics. It should + // be compared against the "authority" section of a URI (i.e. host:port). + string trust_domain = 2; + + // roots is a list of root CA certs to trust. + repeated CARoot roots = 3; +} + +message CARoot { + // id is a globally unique ID (UUID) representing this CA root. + string id = 1; + + // name is a human-friendly name for this CA root. This value is opaque to + // Consul and is not used for anything internally. + string name = 2; + + // serial_number is the x509 serial number of the certificate. + uint64 serial_number = 3; + + // signing_key_id is the connect.HexString encoded id of the public key that + // corresponds to the private key used to sign leaf certificates in the + // local datacenter. + // + // The value comes from x509.Certificate.SubjectKeyId of the local leaf + // signing cert. + // + // See https://www.rfc-editor.org/rfc/rfc3280#section-4.2.1.1 for more detail. + string signing_key_id = 4; + + // root_cert is the PEM-encoded public certificate. + string root_cert = 5; + + // intermediate_certs is a list of PEM-encoded intermediate certs to + // attach to any leaf certs signed by this CA. + repeated string intermediate_certs = 6; + + // active is true if this is the current active CA. This must only + // be true for exactly one CA. + bool active = 7; + + // rotated_out_at is the time at which this CA was removed from the state. + // This will only be set on roots that have been rotated out from being the + // active root. + google.protobuf.Timestamp rotated_out_at = 8; +}