diff --git a/agent/rpc/peering/replication.go b/agent/rpc/peering/replication.go
index e6ac639a34..14513b01bb 100644
--- a/agent/rpc/peering/replication.go
+++ b/agent/rpc/peering/replication.go
@@ -46,7 +46,12 @@ func pushServiceResponse(
 		return nil
 	}
 
-	serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
+	var serviceName string
+	if strings.HasPrefix(update.CorrelationID, subExportedService) {
+		serviceName = strings.TrimPrefix(update.CorrelationID, subExportedService)
+	} else {
+		serviceName = strings.TrimPrefix(update.CorrelationID, subExportedProxyService) + syntheticProxyNameSuffix
+	}
 
 	// If no nodes are present then it's due to one of:
 	// 1. The service is newly registered or exported and yielded a transient empty update.
diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go
index 57343efb59..d38d80ed5f 100644
--- a/agent/rpc/peering/service.go
+++ b/agent/rpc/peering/service.go
@@ -50,14 +50,19 @@ type Service struct {
 	Backend Backend
 	logger  hclog.Logger
 	streams *streamTracker
+
+	// TODO(peering): remove this when we're ready
+	DisableMeshGatewayMode bool
 }
 
 func NewService(logger hclog.Logger, backend Backend) *Service {
-	return &Service{
+	srv := &Service{
 		Backend: backend,
 		logger:  logger,
 		streams: newStreamTracker(),
 	}
+	srv.DisableMeshGatewayMode = true
+	return srv
 }
 
 var _ pbpeering.PeeringServiceServer = (*Service)(nil)
@@ -517,6 +522,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
 	defer s.streams.disconnected(req.LocalID)
 
 	mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
+	mgr.DisableMeshGatewayMode = s.DisableMeshGatewayMode
 	subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition)
 
 	sub := &pbpeering.ReplicationMessage{
@@ -653,7 +659,8 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
 
 		case update := <-subCh:
 			switch {
-			case strings.HasPrefix(update.CorrelationID, subExportedService):
+			case strings.HasPrefix(update.CorrelationID, subExportedService),
+				strings.HasPrefix(update.CorrelationID, subExportedProxyService):
 				if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
 					return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
 				}
diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go
index 04c4c89a1c..0ebdadf6f1 100644
--- a/agent/rpc/peering/stream_test.go
+++ b/agent/rpc/peering/stream_test.go
@@ -617,6 +617,201 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
 		store: store,
 		pub:   publisher,
 	})
+	srv.DisableMeshGatewayMode = false
+
+	client := NewMockClient(context.Background())
+
+	errCh := make(chan error, 1)
+	client.ErrCh = errCh
+
+	go func() {
+		// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
+		// This matches gRPC's behavior when an error is returned by a server.
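+		// (The mock stream is in-process, so a server-side error is not delivered to the client automatically.)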
+		if err := srv.StreamResources(client.ReplicationStream); err != nil {
+			errCh <- err
+		}
+	}()
+
+	// Issue a services subscription to server
+	init := &pbpeering.ReplicationMessage{
+		Payload: &pbpeering.ReplicationMessage_Request_{
+			Request: &pbpeering.ReplicationMessage_Request{
+				PeerID:      peerID,
+				ResourceURL: pbpeering.TypeURLService,
+			},
+		},
+	}
+	require.NoError(t, client.Send(init))
+
+	// Receive a services subscription from server
+	receivedSub, err := client.Recv()
+	require.NoError(t, err)
+
+	expect := &pbpeering.ReplicationMessage{
+		Payload: &pbpeering.ReplicationMessage_Request_{
+			Request: &pbpeering.ReplicationMessage_Request{
+				ResourceURL: pbpeering.TypeURLService,
+				PeerID:      remotePeerID,
+			},
+		},
+	}
+	prototest.AssertDeepEqual(t, expect, receivedSub)
+
+	// Register a service that is not yet exported
+	mysql := &structs.CheckServiceNode{
+		Node:    &structs.Node{Node: "foo", Address: "10.0.0.1"},
+		Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
+	}
+
+	lastIdx++
+	require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
+
+	lastIdx++
+	require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
+
+	var (
+		mongoSN      = structs.NewServiceName("mongo", nil).String()
+		mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
+		mysqlSN      = structs.NewServiceName("mysql", nil).String()
+		mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
+	)
+
+	testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
+		entry := &structs.ExportedServicesConfigEntry{
+			Name: "default",
+			Services: []structs.ExportedService{
+				{
+					Name: "mysql",
+					Consumers: []structs.ServiceConsumer{
+						{PeerName: "my-peering"},
+					},
+				},
+				{
+					// Mongo does not get pushed because it does not have instances registered.
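+					// Instead, DELETE events for mongo and its synthetic sidecar proxy are expected below.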
+					Name: "mongo",
+					Consumers: []structs.ServiceConsumer{
+						{PeerName: "my-peering"},
+					},
+				},
+			},
+		}
+		lastIdx++
+		require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
+
+		expectReplEvents(t, client,
+			func(t *testing.T, msg *pbpeering.ReplicationMessage) {
+				require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
+				require.Equal(t, mongoSN, msg.GetResponse().ResourceID)
+				require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
+				require.Nil(t, msg.GetResponse().Resource)
+			},
+			func(t *testing.T, msg *pbpeering.ReplicationMessage) {
+				require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
+				require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
+				require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
+				require.Nil(t, msg.GetResponse().Resource)
+			},
+			func(t *testing.T, msg *pbpeering.ReplicationMessage) {
+				require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
+				require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
+				require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
+
+				var nodes pbservice.IndexedCheckServiceNodes
+				require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
+				require.Len(t, nodes.Nodes, 1)
+			},
+			func(t *testing.T, msg *pbpeering.ReplicationMessage) {
+				require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
+				require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
+				require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
+				require.Nil(t, msg.GetResponse().Resource)
+			},
+		)
+	})
+
+	mongo := &structs.CheckServiceNode{
+		Node:    &structs.Node{Node: "zip", Address: "10.0.0.3"},
+		Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
+	}
+
+	testutil.RunStep(t, "registering mongo instance leads to an UPSERT event", func(t *testing.T) {
+		lastIdx++
+		require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
+
+		lastIdx++
+		require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
+
+		retry.Run(t, func(r *retry.R) {
+			msg, err := client.RecvWithTimeout(100 * time.Millisecond)
+			require.NoError(r, err)
+			require.Equal(r, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
+			require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
+
+			var nodes pbservice.IndexedCheckServiceNodes
+			require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
+			require.Len(r, nodes.Nodes, 1)
+		})
+	})
+
+	testutil.RunStep(t, "un-exporting mysql leads to a DELETE event for mysql", func(t *testing.T) {
+		entry := &structs.ExportedServicesConfigEntry{
+			Name: "default",
+			Services: []structs.ExportedService{
+				{
+					Name: "mongo",
+					Consumers: []structs.ServiceConsumer{
+						{
+							PeerName: "my-peering",
+						},
+					},
+				},
+			},
+		}
+		lastIdx++
+		err = store.EnsureConfigEntry(lastIdx, entry)
+		require.NoError(t, err)
+
+		retry.Run(t, func(r *retry.R) {
+			msg, err := client.RecvWithTimeout(100 * time.Millisecond)
+			require.NoError(r, err)
+			require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
+			require.Equal(r, mysql.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
+			require.Nil(r, msg.GetResponse().Resource)
+		})
+	})
+
+	testutil.RunStep(t, "deleting the config entry leads to a DELETE event for mongo", func(t *testing.T) {
+		lastIdx++
+		err = store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", nil)
+		require.NoError(t, err)
+
+		retry.Run(t, func(r *retry.R) {
+			msg, err := client.RecvWithTimeout(100 * time.Millisecond)
+			require.NoError(r, err)
+			require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
+			require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
+			require.Nil(r, msg.GetResponse().Resource)
+		})
+	})
+}
+
+func TestStreamResources_Server_ServiceUpdates_DisableMeshGatewayMode(t *testing.T) {
+	publisher := stream.NewEventPublisher(10 * time.Second)
+	store := newStateStore(t, publisher)
+
+	// Create a peering
+	var lastIdx uint64 = 1
+	p := writeInitiatedPeering(t, store, lastIdx, "my-peering")
+	var (
+		peerID       = p.ID     // for Send
+		remotePeerID = p.PeerID // for Recv
+	)
+
+	srv := NewService(testutil.Logger(t), &testStreamBackend{
+		store: store,
+		pub:   publisher,
+	})
+	srv.DisableMeshGatewayMode = true
 
 	client := NewMockClient(context.Background())
 
diff --git a/agent/rpc/peering/subscription_manager.go b/agent/rpc/peering/subscription_manager.go
index d198635fb9..928d32e549 100644
--- a/agent/rpc/peering/subscription_manager.go
+++ b/agent/rpc/peering/subscription_manager.go
@@ -32,6 +32,9 @@ type subscriptionManager struct {
 	logger    hclog.Logger
 	viewStore MaterializedViewStore
 	backend   SubscriptionBackend
+
+	// TODO(peering): remove this when we're ready
+	DisableMeshGatewayMode bool
 }
 
 // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
@@ -60,7 +63,9 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition s
 
 	// Wrap our bare state store queries in goroutines that emit events.
 	go m.notifyExportedServicesForPeerID(ctx, state, peerID)
-	go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
+	if !m.DisableMeshGatewayMode {
+		go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
+	}
 
 	// This goroutine is the only one allowed to manipulate protected
 	// subscriptionManager fields.
@@ -107,7 +112,11 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
 
 		pending := &pendingPayload{}
 		m.syncNormalServices(ctx, state, pending, evt.Services)
-		m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
+		if m.DisableMeshGatewayMode {
+			m.syncProxyServices(ctx, state, pending, evt.Services)
+		} else {
+			m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
+		}
 		state.sendPendingEvents(ctx, m.logger, pending)
 
 		// cleanup event versions too
@@ -124,19 +133,21 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
 		// Clear this raft index before exporting.
 		csn.Index = 0
 
-		// Ensure that connect things are scrubbed so we don't mix-and-match
-		// with the synthetic entries that point to mesh gateways.
-		filterConnectReferences(csn)
+		if !m.DisableMeshGatewayMode {
+			// Ensure that connect things are scrubbed so we don't mix-and-match
+			// with the synthetic entries that point to mesh gateways.
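+			// (With DisableMeshGatewayMode set, raw registrations are instead ferried
+			// along unmodified via the exported-proxy-service subscription below.)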
+			filterConnectReferences(csn)
 
-		// Flatten health checks
-		for _, instance := range csn.Nodes {
-			instance.Checks = flattenChecks(
-				instance.Node.Node,
-				instance.Service.ID,
-				instance.Service.Service,
-				instance.Service.EnterpriseMeta,
-				instance.Checks,
-			)
+			// Flatten health checks
+			for _, instance := range csn.Nodes {
+				instance.Checks = flattenChecks(
+					instance.Node.Node,
+					instance.Service.ID,
+					instance.Service.Service,
+					instance.Service.EnterpriseMeta,
+					instance.Checks,
+				)
+			}
 		}
 
 		id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
@@ -148,6 +159,39 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
 		}
 		state.sendPendingEvents(ctx, m.logger, pending)
 
+	case strings.HasPrefix(u.CorrelationID, subExportedProxyService):
+		csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
+		if !ok {
+			return fmt.Errorf("invalid type for response: %T", u.Result)
+		}
+
+		if !m.DisableMeshGatewayMode {
+			return nil // ignore event
+		}
+
+		// Clear this raft index before exporting.
+		csn.Index = 0
+
+		// // Flatten health checks
+		// for _, instance := range csn.Nodes {
+		// 	instance.Checks = flattenChecks(
+		// 		instance.Node.Node,
+		// 		instance.Service.ID,
+		// 		instance.Service.Service,
+		// 		instance.Service.EnterpriseMeta,
+		// 		instance.Checks,
+		// 	)
+		// }
+
+		id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService)
+
+		// Just ferry this one directly along to the destination.
+		pending := &pendingPayload{}
+		if err := pending.Add(id, u.CorrelationID, csn); err != nil {
+			return err
+		}
+		state.sendPendingEvents(ctx, m.logger, pending)
+
 	case strings.HasPrefix(u.CorrelationID, subMeshGateway):
 		csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
 		if !ok {
@@ -156,6 +200,10 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
 
 		partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
 
+		if m.DisableMeshGatewayMode {
+			return nil // ignore event
+		}
+
 		if !acl.EqualPartitions(partition, state.partition) {
 			return nil // ignore event
 		}
@@ -266,6 +314,57 @@ func (m *subscriptionManager) syncNormalServices(
 	}
 }
 
+// TODO(peering): remove
+func (m *subscriptionManager) syncProxyServices(
+	ctx context.Context,
+	state *subscriptionState,
+	pending *pendingPayload,
+	services []structs.ServiceName,
+) {
+	// seen contains the set of exported service names and is used to reconcile the list of watched services.
+	seen := make(map[structs.ServiceName]struct{})
+
+	// Ensure there is a subscription for each service exported to the peer.
+	for _, svc := range services {
+		seen[svc] = struct{}{}
+
+		if _, ok := state.watchedProxyServices[svc]; ok {
+			// Exported service is already being watched, nothing to do.
+			continue
+		}
+
+		notifyCtx, cancel := context.WithCancel(ctx)
+		if err := m.NotifyConnectProxyService(notifyCtx, svc, state.updateCh); err != nil {
+			cancel()
+			m.logger.Error("failed to subscribe to proxy service", "service", svc.String())
+			continue
+		}
+
+		state.watchedProxyServices[svc] = cancel
+	}
+
+	// For every subscription without an exported service, call the associated cancel fn.
+	for svc, cancel := range state.watchedProxyServices {
+		if _, ok := seen[svc]; !ok {
+			cancel()
+
+			delete(state.watchedProxyServices, svc)
+
+			// Send an empty event to the stream handler to trigger sending a DELETE message.
+			// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
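+			// (An empty set of nodes is what pushServiceResponse interprets as a DELETE.)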
+			err := pending.Add(
+				proxyServicePayloadIDPrefix+svc.String(),
+				subExportedProxyService+svc.String(),
+				&pbservice.IndexedCheckServiceNodes{},
+			)
+			if err != nil {
+				m.logger.Error("failed to send event for proxy service", "service", svc.String(), "error", err)
+				continue
+			}
+		}
+	}
+}
+
 func (m *subscriptionManager) syncDiscoveryChains(
 	ctx context.Context,
 	state *subscriptionState,
@@ -422,9 +521,10 @@ func flattenChecks(
 }
 
 const (
-	subExportedServiceList = "exported-service-list"
-	subExportedService     = "exported-service:"
-	subMeshGateway         = "mesh-gateway:"
+	subExportedServiceList  = "exported-service-list"
+	subExportedService      = "exported-service:"
+	subExportedProxyService = "exported-proxy-service:"
+	subMeshGateway          = "mesh-gateway:"
 )
 
 // NotifyStandardService will notify the given channel when there are updates
@@ -437,6 +537,14 @@ func (m *subscriptionManager) NotifyStandardService(
 	sr := newExportedStandardServiceRequest(m.logger, svc, m.backend)
 	return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
 }
+func (m *subscriptionManager) NotifyConnectProxyService(
+	ctx context.Context,
+	svc structs.ServiceName,
+	updateCh chan<- cache.UpdateEvent,
+) error {
+	sr := newExportedConnectProxyServiceRequest(m.logger, svc, m.backend)
+	return m.viewStore.Notify(ctx, sr, subExportedProxyService+svc.String(), updateCh)
+}
 
 // syntheticProxyNameSuffix is the suffix to add to synthetic proxies we
 // replicate to route traffic to an exported discovery chain through the mesh
diff --git a/agent/rpc/peering/subscription_state.go b/agent/rpc/peering/subscription_state.go
index c17fbc4d6f..55f1fda00c 100644
--- a/agent/rpc/peering/subscription_state.go
+++ b/agent/rpc/peering/subscription_state.go
@@ -23,8 +23,9 @@ type subscriptionState struct {
 	// plain data
 	exportList *structs.ExportedServiceList
 
-	watchedServices map[structs.ServiceName]context.CancelFunc
-	connectServices map[structs.ServiceName]struct{}
+	watchedServices      map[structs.ServiceName]context.CancelFunc
+	watchedProxyServices map[structs.ServiceName]context.CancelFunc // TODO(peering): remove
+	connectServices      map[structs.ServiceName]struct{}
 
 	// eventVersions is a duplicate event suppression system keyed by the "id"
 	// not the "correlationID"
@@ -43,10 +44,11 @@ type subscriptionState struct {
 
 func newSubscriptionState(partition string) *subscriptionState {
 	return &subscriptionState{
-		partition:       partition,
-		watchedServices: make(map[structs.ServiceName]context.CancelFunc),
-		connectServices: make(map[structs.ServiceName]struct{}),
-		eventVersions:   make(map[string]string),
+		partition:            partition,
+		watchedServices:      make(map[structs.ServiceName]context.CancelFunc),
+		watchedProxyServices: make(map[structs.ServiceName]context.CancelFunc),
+		connectServices:      make(map[structs.ServiceName]struct{}),
+		eventVersions:        make(map[string]string),
 	}
 }
 
@@ -95,6 +97,14 @@ func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
 				keep = true
 			}
 
+		case strings.HasPrefix(id, proxyServicePayloadIDPrefix):
+			name := strings.TrimPrefix(id, proxyServicePayloadIDPrefix)
+			sn := structs.ServiceNameFromString(name)
+
+			if _, ok := s.watchedProxyServices[sn]; ok {
+				keep = true
+			}
+
 		case strings.HasPrefix(id, discoveryChainPayloadIDPrefix):
 			name := strings.TrimPrefix(id, discoveryChainPayloadIDPrefix)
 			sn := structs.ServiceNameFromString(name)
@@ -125,6 +135,7 @@ type pendingEvent struct {
 
 const (
 	meshGatewayPayloadID          = "mesh-gateway"
 	servicePayloadIDPrefix        = "service:"
+	proxyServicePayloadIDPrefix   = "proxy-service:" // TODO(peering): remove
 	discoveryChainPayloadIDPrefix = "chain:"
 )
diff --git a/agent/rpc/peering/subscription_view.go b/agent/rpc/peering/subscription_view.go
index f009872a25..2cd2a6ab2d 100644
--- a/agent/rpc/peering/subscription_view.go
+++ b/agent/rpc/peering/subscription_view.go
@@ -37,20 +37,42 @@ func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceN
 	}
 }
 
+// TODO(peering): remove
+func newExportedConnectProxyServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
+	req := structs.ServiceSpecificRequest{
+		ServiceName:    svc.Name,
+		Connect:        true,
+		EnterpriseMeta: svc.EnterpriseMeta,
+	}
+	return &exportedServiceRequest{
+		logger: logger,
+		req:    req,
+		sub:    sub,
+	}
+}
+
 // CacheInfo implements submatview.Request
 func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
 	return e.req.CacheInfo()
 }
 
+func (e *exportedServiceRequest) getTopic() pbsubscribe.Topic {
+	if e.req.Connect {
+		return pbsubscribe.Topic_ServiceHealthConnect
+	}
+	// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
+	return pbsubscribe.Topic_ServiceHealth
+}
+
 // NewMaterializer implements submatview.Request
 func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
-	if e.req.Connect {
-		return nil, fmt.Errorf("connect views are not supported")
-	}
+	// TODO(peering): reinstate this
+	// if e.req.Connect {
+	// 	return nil, fmt.Errorf("connect views are not supported")
+	// }
 	reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
 		return &pbsubscribe.SubscribeRequest{
-			// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
-			Topic:      pbsubscribe.Topic_ServiceHealth,
+			Topic:      e.getTopic(),
 			Key:        e.req.ServiceName,
 			Token:      e.req.Token,
 			Datacenter: e.req.Datacenter,