peering: disable requirement for mesh gateways initially (#13213)

R.B. Boyer 2022-05-25 10:13:23 -05:00 committed by GitHub
parent 0ed9ff8ef7
commit be631ebdce
6 changed files with 379 additions and 31 deletions


@@ -46,7 +46,12 @@ func pushServiceResponse(
return nil
}
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
var serviceName string
if strings.HasPrefix(update.CorrelationID, subExportedService) {
serviceName = strings.TrimPrefix(update.CorrelationID, subExportedService)
} else {
serviceName = strings.TrimPrefix(update.CorrelationID, subExportedProxyService) + syntheticProxyNameSuffix
}
// If no nodes are present then it's due to one of:
// 1. The service is newly registered or exported and yielded a transient empty update.

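For context on the hunk above: the correlation ID encodes both the subscription kind and the exported service name, and pushServiceResponse now recovers the name by trimming whichever prefix matches, re-appending syntheticProxyNameSuffix for proxy-service updates so they are stored under a distinct name on the importing side. A minimal sketch of that dispatch, assuming the package's subExportedService, subExportedProxyService, and syntheticProxyNameSuffix constants; serviceNameFromCorrelationID itself is a hypothetical helper, not part of the change:

// serviceNameFromCorrelationID illustrates how the prefix decides the resource name.
// Proxy-service updates get the synthetic suffix appended (e.g. "mysql" becomes
// "mysql-sidecar-proxy" in the tests below).
func serviceNameFromCorrelationID(correlationID string) string {
    if strings.HasPrefix(correlationID, subExportedService) {
        return strings.TrimPrefix(correlationID, subExportedService)
    }
    // Anything else reaching this handler carries the proxy-service prefix.
    return strings.TrimPrefix(correlationID, subExportedProxyService) + syntheticProxyNameSuffix
}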

@@ -50,14 +50,19 @@ type Service struct {
Backend Backend
logger hclog.Logger
streams *streamTracker
// TODO(peering): remove this when we're ready
DisableMeshGatewayMode bool
}
func NewService(logger hclog.Logger, backend Backend) *Service {
return &Service{
srv := &Service{
Backend: backend,
logger: logger,
streams: newStreamTracker(),
}
srv.DisableMeshGatewayMode = true
return srv
}
var _ pbpeering.PeeringServiceServer = (*Service)(nil)
@@ -517,6 +522,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
defer s.streams.disconnected(req.LocalID)
mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
mgr.DisableMeshGatewayMode = s.DisableMeshGatewayMode
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID, req.Partition)
sub := &pbpeering.ReplicationMessage{
@@ -653,7 +659,8 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
case update := <-subCh:
switch {
case strings.HasPrefix(update.CorrelationID, subExportedService):
case strings.HasPrefix(update.CorrelationID, subExportedService),
strings.HasPrefix(update.CorrelationID, subExportedProxyService):
if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
}

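Taken together, these hunks make gateway-free replication the default: NewService now sets DisableMeshGatewayMode to true, HandleStream copies the flag onto each subscriptionManager it creates, and proxy-service updates are pushed through the same response path as plain service updates. A sketch of how a caller opts back into gateway-based replication, mirroring what the updated tests do; newPeeringServiceForTest is a hypothetical helper, not part of this change:

func newPeeringServiceForTest(logger hclog.Logger, backend Backend) *Service {
    srv := NewService(logger, backend) // mesh gateway mode is disabled by default for now
    srv.DisableMeshGatewayMode = false // opt back into gateway-based replication, as the first test below does
    return srv
}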

@@ -617,6 +617,201 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
store: store,
pub: publisher,
})
srv.DisableMeshGatewayMode = false
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err
}
}()
// Issue a services subscription to server
init := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: peerID,
ResourceURL: pbpeering.TypeURLService,
},
},
}
require.NoError(t, client.Send(init))
// Receive a services subscription from server
receivedSub, err := client.Recv()
require.NoError(t, err)
expect := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService,
PeerID: remotePeerID,
},
},
}
prototest.AssertDeepEqual(t, expect, receivedSub)
// Register a service that is not yet exported
mysql := &structs.CheckServiceNode{
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"},
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000},
}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
var (
mongoSN = structs.NewServiceName("mongo", nil).String()
mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
mysqlSN = structs.NewServiceName("mysql", nil).String()
mysqlProxySN = structs.NewServiceName("mysql-sidecar-proxy", nil).String()
)
testutil.RunStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "mysql",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
{
// Mongo does not get pushed because it does not have instances registered.
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{PeerName: "my-peering"},
},
},
},
}
lastIdx++
require.NoError(t, store.EnsureConfigEntry(lastIdx, entry))
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
)
})
mongo := &structs.CheckServiceNode{
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"},
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000},
}
testutil.RunStep(t, "registering mongo instance leads to an UPSERT event", func(t *testing.T) {
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service))
retry.Run(t, func(r *retry.R) {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
require.NoError(r, err)
require.Equal(r, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(r, nodes.Nodes, 1)
})
})
testutil.RunStep(t, "un-exporting mysql leads to a DELETE event for mysql", func(t *testing.T) {
entry := &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "mongo",
Consumers: []structs.ServiceConsumer{
{
PeerName: "my-peering",
},
},
},
},
}
lastIdx++
err = store.EnsureConfigEntry(lastIdx, entry)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
require.NoError(r, err)
require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Equal(r, mysql.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
require.Nil(r, msg.GetResponse().Resource)
})
})
testutil.RunStep(t, "deleting the config entry leads to a DELETE event for mongo", func(t *testing.T) {
lastIdx++
err = store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", nil)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
require.NoError(r, err)
require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID)
require.Nil(r, msg.GetResponse().Resource)
})
})
}
func TestStreamResources_Server_ServiceUpdates_DisableMeshGatewayMode(t *testing.T) {
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
// Create a peering
var lastIdx uint64 = 1
p := writeInitiatedPeering(t, store, lastIdx, "my-peering")
var (
peerID = p.ID // for Send
remotePeerID = p.PeerID // for Recv
)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
})
srv.DisableMeshGatewayMode = true
client := NewMockClient(context.Background())


@@ -32,6 +32,9 @@ type subscriptionManager struct {
logger hclog.Logger
viewStore MaterializedViewStore
backend SubscriptionBackend
// TODO(peering): remove this when we're ready
DisableMeshGatewayMode bool
}
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
@@ -60,7 +63,9 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, partition s
// Wrap our bare state store queries in goroutines that emit events.
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
if !m.DisableMeshGatewayMode {
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
}
// This goroutine is the only one allowed to manipulate protected
// subscriptionManager fields.
@@ -107,7 +112,11 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
pending := &pendingPayload{}
m.syncNormalServices(ctx, state, pending, evt.Services)
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
if m.DisableMeshGatewayMode {
m.syncProxyServices(ctx, state, pending, evt.Services)
} else {
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
}
state.sendPendingEvents(ctx, m.logger, pending)
// cleanup event versions too
@@ -124,19 +133,21 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
// Clear this raft index before exporting.
csn.Index = 0
// Ensure that connect things are scrubbed so we don't mix-and-match
// with the synthetic entries that point to mesh gateways.
filterConnectReferences(csn)
if !m.DisableMeshGatewayMode {
// Ensure that connect things are scrubbed so we don't mix-and-match
// with the synthetic entries that point to mesh gateways.
filterConnectReferences(csn)
// Flatten health checks
for _, instance := range csn.Nodes {
instance.Checks = flattenChecks(
instance.Node.Node,
instance.Service.ID,
instance.Service.Service,
instance.Service.EnterpriseMeta,
instance.Checks,
)
// Flatten health checks
for _, instance := range csn.Nodes {
instance.Checks = flattenChecks(
instance.Node.Node,
instance.Service.ID,
instance.Service.Service,
instance.Service.EnterpriseMeta,
instance.Checks,
)
}
}
id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)
@@ -148,6 +159,39 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
}
state.sendPendingEvents(ctx, m.logger, pending)
case strings.HasPrefix(u.CorrelationID, subExportedProxyService):
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
if !m.DisableMeshGatewayMode {
return nil // ignore event
}
// Clear this raft index before exporting.
csn.Index = 0
// // Flatten health checks
// for _, instance := range csn.Nodes {
// instance.Checks = flattenChecks(
// instance.Node.Node,
// instance.Service.ID,
// instance.Service.Service,
// instance.Service.EnterpriseMeta,
// instance.Checks,
// )
// }
id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService)
// Just ferry this one directly along to the destination.
pending := &pendingPayload{}
if err := pending.Add(id, u.CorrelationID, csn); err != nil {
return err
}
state.sendPendingEvents(ctx, m.logger, pending)
case strings.HasPrefix(u.CorrelationID, subMeshGateway):
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
@@ -156,6 +200,10 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
if m.DisableMeshGatewayMode {
return nil // ignore event
}
if !acl.EqualPartitions(partition, state.partition) {
return nil // ignore event
}
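Because old and new lines are interleaved above, the resulting dispatch is easier to see consolidated. The sketch below is a hypothetical, simplified restatement of handleEvent's routing after this change; the real handling builds pendingPayloads, scrubs or flattens instances, and returns errors, all of which is elided here:

func routeEvent(m *subscriptionManager, correlationID string) string {
    switch {
    case strings.HasPrefix(correlationID, subExportedService):
        // Always replicated. With mesh gateways enabled the instances are scrubbed of
        // connect references and their checks flattened; with them disabled they pass through as-is.
        return "replicate exported service instances"
    case strings.HasPrefix(correlationID, subExportedProxyService):
        if !m.DisableMeshGatewayMode {
            return "ignore" // proxy-service subscriptions only matter in the disabled mode
        }
        return "replicate raw sidecar-proxy instances"
    case strings.HasPrefix(correlationID, subMeshGateway):
        if m.DisableMeshGatewayMode {
            return "ignore" // no gateway addresses are exported while the mode is disabled
        }
        return "replicate mesh gateway instances for the local partition"
    default:
        return "unhandled correlation ID"
    }
}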
@@ -266,6 +314,57 @@ func (m *subscriptionManager) syncNormalServices(
}
}
// TODO(peering): remove
func (m *subscriptionManager) syncProxyServices(
ctx context.Context,
state *subscriptionState,
pending *pendingPayload,
services []structs.ServiceName,
) {
// seen contains the set of exported service names and is used to reconcile the list of watched services.
seen := make(map[structs.ServiceName]struct{})
// Ensure there is a subscription for each service exported to the peer.
for _, svc := range services {
seen[svc] = struct{}{}
if _, ok := state.watchedProxyServices[svc]; ok {
// Exported service is already being watched, nothing to do.
continue
}
notifyCtx, cancel := context.WithCancel(ctx)
if err := m.NotifyConnectProxyService(notifyCtx, svc, state.updateCh); err != nil {
cancel()
m.logger.Error("failed to subscribe to proxy service", "service", svc.String())
continue
}
state.watchedProxyServices[svc] = cancel
}
// For every subscription without an exported service, call the associated cancel fn.
for svc, cancel := range state.watchedProxyServices {
if _, ok := seen[svc]; !ok {
cancel()
delete(state.watchedProxyServices, svc)
// Send an empty event to the stream handler to trigger sending a DELETE message.
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
err := pending.Add(
proxyServicePayloadIDPrefix+svc.String(),
subExportedProxyService+svc.String(),
&pbservice.IndexedCheckServiceNodes{},
)
if err != nil {
m.logger.Error("failed to send event for proxy service", "service", svc.String(), "error", err)
continue
}
}
}
}
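syncProxyServices follows the same reconcile-by-seen-set shape as syncNormalServices: start a watch for every newly exported service, then cancel and emit an empty tombstone event for services that are no longer exported. The pattern in miniature, with Consul's types swapped for plain strings; reconcileWatches is a hypothetical illustration (assuming only the standard library context package), not code from this change:

func reconcileWatches(
    exported []string,
    watched map[string]context.CancelFunc,
    start func(name string) (context.CancelFunc, error),
    tombstone func(name string),
) {
    seen := make(map[string]struct{}, len(exported))
    for _, name := range exported {
        seen[name] = struct{}{}
        if _, ok := watched[name]; ok {
            continue // already watching this export
        }
        cancel, err := start(name)
        if err != nil {
            continue // logged and retried on the next export-list event
        }
        watched[name] = cancel
    }
    for name, cancel := range watched {
        if _, ok := seen[name]; !ok {
            cancel()
            delete(watched, name)
            tombstone(name) // empty payload so the peer ends up sending a DELETE
        }
    }
}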
func (m *subscriptionManager) syncDiscoveryChains(
ctx context.Context,
state *subscriptionState,
@@ -422,9 +521,10 @@ func flattenChecks(
}
const (
subExportedServiceList = "exported-service-list"
subExportedService = "exported-service:"
subMeshGateway = "mesh-gateway:"
subExportedServiceList = "exported-service-list"
subExportedService = "exported-service:"
subExportedProxyService = "exported-proxy-service:"
subMeshGateway = "mesh-gateway:"
)
// NotifyStandardService will notify the given channel when there are updates
@@ -437,6 +537,14 @@ func (m *subscriptionManager) NotifyStandardService(
sr := newExportedStandardServiceRequest(m.logger, svc, m.backend)
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
}
func (m *subscriptionManager) NotifyConnectProxyService(
ctx context.Context,
svc structs.ServiceName,
updateCh chan<- cache.UpdateEvent,
) error {
sr := newExportedConnectProxyServiceRequest(m.logger, svc, m.backend)
return m.viewStore.Notify(ctx, sr, subExportedProxyService+svc.String(), updateCh)
}
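NotifyConnectProxyService keys the materialized view by the new subExportedProxyService prefix, which is what ties together the identifiers used elsewhere in this change. The hypothetical helper below just spells out that round trip for a single exported service, reusing only identifiers that appear in these diffs:

func correlationIDsForProxyExport(svc structs.ServiceName) (correlationID, payloadID, resourceID string) {
    correlationID = subExportedProxyService + svc.String()  // registered here and matched in handleEvent
    payloadID = proxyServicePayloadIDPrefix + svc.String()  // used by pendingPayload/eventVersions bookkeeping
    resourceID = svc.String() + syntheticProxyNameSuffix    // what pushServiceResponse sends to the peer, e.g. "mysql-sidecar-proxy" in the tests
    return correlationID, payloadID, resourceID
}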
// syntheticProxyNameSuffix is the suffix to add to synthetic proxies we
// replicate to route traffic to an exported discovery chain through the mesh


@@ -23,8 +23,9 @@ type subscriptionState struct {
// plain data
exportList *structs.ExportedServiceList
watchedServices map[structs.ServiceName]context.CancelFunc
connectServices map[structs.ServiceName]struct{}
watchedServices map[structs.ServiceName]context.CancelFunc
watchedProxyServices map[structs.ServiceName]context.CancelFunc // TODO(peering): remove
connectServices map[structs.ServiceName]struct{}
// eventVersions is a duplicate event suppression system keyed by the "id"
// not the "correlationID"
@@ -43,10 +44,11 @@ func newSubscriptionState(partition string) *subscriptionState {
func newSubscriptionState(partition string) *subscriptionState {
return &subscriptionState{
partition: partition,
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
connectServices: make(map[structs.ServiceName]struct{}),
eventVersions: make(map[string]string),
partition: partition,
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
watchedProxyServices: make(map[structs.ServiceName]context.CancelFunc),
connectServices: make(map[structs.ServiceName]struct{}),
eventVersions: make(map[string]string),
}
}
@@ -95,6 +97,14 @@ func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
keep = true
}
case strings.HasPrefix(id, proxyServicePayloadIDPrefix):
name := strings.TrimPrefix(id, proxyServicePayloadIDPrefix)
sn := structs.ServiceNameFromString(name)
if _, ok := s.watchedProxyServices[sn]; ok {
keep = true
}
case strings.HasPrefix(id, discoveryChainPayloadIDPrefix):
name := strings.TrimPrefix(id, discoveryChainPayloadIDPrefix)
sn := structs.ServiceNameFromString(name)
@@ -125,6 +135,7 @@ type pendingEvent struct {
const (
meshGatewayPayloadID = "mesh-gateway"
servicePayloadIDPrefix = "service:"
proxyServicePayloadIDPrefix = "proxy-service:" // TODO(peering): remove
discoveryChainPayloadIDPrefix = "chain:"
)


@@ -37,20 +37,42 @@ func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceN
}
}
// TODO(peering): remove
func newExportedConnectProxyServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
req := structs.ServiceSpecificRequest{
ServiceName: svc.Name,
Connect: true,
EnterpriseMeta: svc.EnterpriseMeta,
}
return &exportedServiceRequest{
logger: logger,
req: req,
sub: sub,
}
}
// CacheInfo implements submatview.Request
func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
return e.req.CacheInfo()
}
func (e *exportedServiceRequest) getTopic() pbsubscribe.Topic {
if e.req.Connect {
return pbsubscribe.Topic_ServiceHealthConnect
}
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
return pbsubscribe.Topic_ServiceHealth
}
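getTopic is what makes the two constructors above differ in behavior: a request built with Connect set subscribes on the connect topic, so the view yields the sidecar-proxy instances registered for the service rather than the plain service instances. A hypothetical illustration of that difference, reusing only identifiers shown in this diff:

func topicsForService(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) (standard, proxy pbsubscribe.Topic) {
    standard = newExportedStandardServiceRequest(logger, svc, sub).getTopic()     // pbsubscribe.Topic_ServiceHealth
    proxy = newExportedConnectProxyServiceRequest(logger, svc, sub).getTopic()    // pbsubscribe.Topic_ServiceHealthConnect
    return standard, proxy
}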
// NewMaterializer implements submatview.Request
func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
if e.req.Connect {
return nil, fmt.Errorf("connect views are not supported")
}
// TODO(peering): reinstate this
// if e.req.Connect {
// return nil, fmt.Errorf("connect views are not supported")
// }
reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
return &pbsubscribe.SubscribeRequest{
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
Topic: pbsubscribe.Topic_ServiceHealth,
Topic: e.getTopic(),
Key: e.req.ServiceName,
Token: e.req.Token,
Datacenter: e.req.Datacenter,