peering: mesh gateways are required for cross-peer service mesh communication (#13410)

Require the use of mesh gateways for service mesh data plane
traffic to flow between peers.

This also adds plumbing for Envoy integration tests involving peers,
and one starter peering test.
R.B. Boyer 2022-06-09 11:05:18 -05:00 committed by GitHub
parent 6bd2d760af
commit bba3eb8cdd
28 changed files with 443 additions and 488 deletions
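
In effect, the peering subsystem drops its temporary DisableMeshGatewayMode escape hatch: mesh gateway mode is always on, and sidecar proxies are only replicated to a peer once a mesh gateway is registered. A minimal sketch of the resulting config shape, per the diffs below:

    // Sketch of the peering service config after this commit; see the
    // Config diff below. DisableMeshGatewayMode no longer exists.
    type Config struct {
        Datacenter     string
        ConnectEnabled bool
    }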

View File

@ -41,8 +41,10 @@ func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger {
return l.With(
"topic", req.Topic.String(),
"dc", req.Datacenter,
"peer", req.PeerName,
"key", req.Key,
"namespace", req.Namespace,
"partition", req.Partition,
"request_index", req.Index,
"stream_id", &streamID{})
}

View File

@ -44,12 +44,7 @@ func makeServiceResponse(
return nil
}
var serviceName string
if strings.HasPrefix(update.CorrelationID, subExportedService) {
serviceName = strings.TrimPrefix(update.CorrelationID, subExportedService)
} else {
serviceName = strings.TrimPrefix(update.CorrelationID, subExportedProxyService) + syntheticProxyNameSuffix
}
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
// If no nodes are present then it's due to one of:
// 1. The service is newly registered or exported and yielded a transient empty update.
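
With the proxy-service subscription removed, every exported-service update carries a single correlation-ID prefix. A minimal sketch of the convention, using the subExportedService constant defined later in this commit (the helper name is hypothetical):

    import "strings"

    const subExportedService = "exported-service:"

    // serviceNameFromCorrelationID recovers the exported service name from a
    // subscription correlation ID, or reports false for other topics.
    func serviceNameFromCorrelationID(id string) (string, bool) {
        if !strings.HasPrefix(id, subExportedService) {
            return "", false
        }
        return strings.TrimPrefix(id, subExportedService), true
    }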

View File

@ -48,8 +48,6 @@ func (e *errPeeringInvalidServerAddress) Error() string {
type Config struct {
Datacenter string
ConnectEnabled bool
// TODO(peering): remove this when we're ready
DisableMeshGatewayMode bool
}
// Service implements pbpeering.PeeringService to provide RPC operations for
@ -62,7 +60,6 @@ type Service struct {
}
func NewService(logger hclog.Logger, cfg Config, backend Backend) *Service {
cfg.DisableMeshGatewayMode = true
return &Service{
Backend: backend,
logger: logger,
@ -735,8 +732,7 @@ func (s *Service) HandleStream(req HandleStreamRequest) error {
case update := <-subCh:
var resp *pbpeering.ReplicationMessage
switch {
case strings.HasPrefix(update.CorrelationID, subExportedService),
strings.HasPrefix(update.CorrelationID, subExportedProxyService):
case strings.HasPrefix(update.CorrelationID, subExportedService):
resp = makeServiceResponse(logger, update)
case strings.HasPrefix(update.CorrelationID, subMeshGateway):

View File

@ -619,12 +619,6 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}
func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
testStreamResources_Server_ServiceUpdates(t, true)
}
func TestStreamResources_Server_ServiceUpdates_EnableMeshGateways(t *testing.T) {
testStreamResources_Server_ServiceUpdates(t, false)
}
func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways bool) {
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
@ -640,7 +634,6 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways
Config{
Datacenter: "dc1",
ConnectEnabled: true,
DisableMeshGatewayMode: disableMeshGateways,
}, &testStreamBackend{
store: store,
pub: publisher,
@ -659,15 +652,6 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "foo", &structs.NodeService{
ID: "mysql-sidecar-proxy",
Service: "mysql-sidecar-proxy",
Kind: structs.ServiceKindConnectProxy,
Port: 5000,
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "mysql"},
}))
var (
mongoSN = structs.NewServiceName("mongo", nil).String()
mongoProxySN = structs.NewServiceName("mongo-sidecar-proxy", nil).String()
@ -703,12 +687,14 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways
// Roots tested in TestStreamResources_Server_CARootUpdates
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
// no mongo instances exist
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoSN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
// proxies can't export because no mesh gateway exists yet
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
@ -723,6 +709,41 @@ func testStreamResources_Server_ServiceUpdates(t *testing.T, disableMeshGateways
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
// proxies can't export because no mesh gateway exists yet
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation)
require.Nil(t, msg.GetResponse().Resource)
},
)
})
testutil.RunStep(t, "register mesh gateway to send proxy updates", func(t *testing.T) {
gateway := &structs.CheckServiceNode{Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
// TODO: checks
}
lastIdx++
require.NoError(t, store.EnsureNode(lastIdx, gateway.Node))
lastIdx++
require.NoError(t, store.EnsureService(lastIdx, "mgw", gateway.Service))
expectReplEvents(t, client,
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mongoProxySN, msg.GetResponse().ResourceID)
require.Equal(t, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation)
var nodes pbservice.IndexedCheckServiceNodes
require.NoError(t, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes))
require.Len(t, nodes.Nodes, 1)
svid := "spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo"
require.Equal(t, []string{svid}, nodes.Nodes[0].Service.Connect.PeerMeta.SpiffeID)
},
func(t *testing.T, msg *pbpeering.ReplicationMessage) {
require.Equal(t, pbpeering.TypeURLService, msg.GetResponse().ResourceURL)
require.Equal(t, mysqlProxySN, msg.GetResponse().ResourceID)
@ -1196,9 +1217,11 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test
}
}
const timeout = 10 * time.Second
var out []*pbpeering.ReplicationMessage
for len(out) < num {
msg, err := client.RecvWithTimeout(100 * time.Millisecond)
msg, err := client.RecvWithTimeout(timeout)
if err == io.EOF && msg == nil {
t.Fatalf("timed out with %d of %d events", len(out), num)
}
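
The SVID asserted in the mesh gateway step above follows the scheme that connect.SpiffeIDService builds elsewhere in this commit. A sketch with the values from this test (default partition, so no /ap/ segment appears in the URI):

    id := connect.SpiffeIDService{
        Host:       "11111111-2222-3333-4444-555555555555.consul",
        Partition:  "default",
        Namespace:  "default",
        Datacenter: "dc1",
        Service:    "mongo",
    }
    // id.URI().String() should match the SVID asserted above:
    // spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/mongo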

View File

@ -75,12 +75,9 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p
// Wrap our bare state store queries in goroutines that emit events.
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
if !m.config.DisableMeshGatewayMode && m.config.ConnectEnabled {
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
}
// If connect is enabled, watch for updates to CA roots.
if m.config.ConnectEnabled {
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
// If connect is enabled, watch for updates to CA roots.
go m.notifyRootCAUpdates(ctx, state.updateCh)
}
@ -129,13 +126,9 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
pending := &pendingPayload{}
m.syncNormalServices(ctx, state, pending, evt.Services)
if m.config.DisableMeshGatewayMode {
m.syncProxyServices(ctx, state, pending, evt.Services)
} else {
if m.config.ConnectEnabled {
m.syncDiscoveryChains(ctx, state, pending, evt.ListAllDiscoveryChains())
}
}
state.sendPendingEvents(ctx, m.logger, pending)
// cleanup event versions too
@ -152,7 +145,6 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
// Clear this raft index before exporting.
csn.Index = 0
if !m.config.DisableMeshGatewayMode {
// Ensure that connect things are scrubbed so we don't mix-and-match
// with the synthetic entries that point to mesh gateways.
filterConnectReferences(csn)
@ -167,17 +159,11 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
instance.Checks,
)
}
}
// Scrub raft indexes
for _, instance := range csn.Nodes {
instance.Node.RaftIndex = nil
instance.Service.RaftIndex = nil
if m.config.DisableMeshGatewayMode {
for _, chk := range instance.Checks {
chk.RaftIndex = nil
}
}
// skip checks since we just generated one from scratch
}
@ -197,61 +183,6 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
}
state.sendPendingEvents(ctx, m.logger, pending)
case strings.HasPrefix(u.CorrelationID, subExportedProxyService):
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
if !m.config.DisableMeshGatewayMode {
return nil // ignore event
}
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, subExportedProxyService))
spiffeID := connect.SpiffeIDService{
Host: m.trustDomain,
Partition: sn.PartitionOrDefault(),
Namespace: sn.NamespaceOrDefault(),
Datacenter: m.config.Datacenter,
Service: sn.Name,
}
sni := connect.PeeredServiceSNI(
sn.Name,
sn.NamespaceOrDefault(),
sn.PartitionOrDefault(),
state.peerName,
m.trustDomain,
)
peerMeta := &pbservice.PeeringServiceMeta{
SNI: []string{sni},
SpiffeID: []string{spiffeID.URI().String()},
Protocol: "tcp",
}
// skip checks since we just generated one from scratch
// Set peerMeta on all instances and scrub the raft indexes.
for _, instance := range csn.Nodes {
instance.Service.Connect.PeerMeta = peerMeta
instance.Node.RaftIndex = nil
instance.Service.RaftIndex = nil
if m.config.DisableMeshGatewayMode {
for _, chk := range instance.Checks {
chk.RaftIndex = nil
}
}
}
csn.Index = 0
id := proxyServicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedProxyService)
// Just ferry this one directly along to the destination.
pending := &pendingPayload{}
if err := pending.Add(id, u.CorrelationID, csn); err != nil {
return err
}
state.sendPendingEvents(ctx, m.logger, pending)
case strings.HasPrefix(u.CorrelationID, subMeshGateway):
csn, ok := u.Result.(*pbservice.IndexedCheckServiceNodes)
if !ok {
@ -260,7 +191,7 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
partition := strings.TrimPrefix(u.CorrelationID, subMeshGateway)
if m.config.DisableMeshGatewayMode || !m.config.ConnectEnabled {
if !m.config.ConnectEnabled {
return nil // ignore event
}
@ -510,57 +441,6 @@ func (m *subscriptionManager) syncNormalServices(
}
}
// TODO(peering): remove
func (m *subscriptionManager) syncProxyServices(
ctx context.Context,
state *subscriptionState,
pending *pendingPayload,
services []structs.ServiceName,
) {
// seen contains the set of exported service names and is used to reconcile the list of watched services.
seen := make(map[structs.ServiceName]struct{})
// Ensure there is a subscription for each service exported to the peer.
for _, svc := range services {
seen[svc] = struct{}{}
if _, ok := state.watchedProxyServices[svc]; ok {
// Exported service is already being watched, nothing to do.
continue
}
notifyCtx, cancel := context.WithCancel(ctx)
if err := m.NotifyConnectProxyService(notifyCtx, svc, state.updateCh); err != nil {
cancel()
m.logger.Error("failed to subscribe to proxy service", "service", svc.String())
continue
}
state.watchedProxyServices[svc] = cancel
}
// For every subscription without an exported service, call the associated cancel fn.
for svc, cancel := range state.watchedProxyServices {
if _, ok := seen[svc]; !ok {
cancel()
delete(state.watchedProxyServices, svc)
// Send an empty event to the stream handler to trigger sending a DELETE message.
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
err := pending.Add(
proxyServicePayloadIDPrefix+svc.String(),
subExportedProxyService+svc.String(),
&pbservice.IndexedCheckServiceNodes{},
)
if err != nil {
m.logger.Error("failed to send event for proxy service", "service", svc.String(), "error", err)
continue
}
}
}
}
func (m *subscriptionManager) syncDiscoveryChains(
ctx context.Context,
state *subscriptionState,
@ -763,7 +643,6 @@ func flattenChecks(
const (
subExportedServiceList = "exported-service-list"
subExportedService = "exported-service:"
subExportedProxyService = "exported-proxy-service:"
subMeshGateway = "mesh-gateway:"
)
@ -777,14 +656,6 @@ func (m *subscriptionManager) NotifyStandardService(
sr := newExportedStandardServiceRequest(m.logger, svc, m.backend)
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
}
func (m *subscriptionManager) NotifyConnectProxyService(
ctx context.Context,
svc structs.ServiceName,
updateCh chan<- cache.UpdateEvent,
) error {
sr := newExportedConnectProxyServiceRequest(m.logger, svc, m.backend)
return m.viewStore.Notify(ctx, sr, subExportedProxyService+svc.String(), updateCh)
}
// syntheticProxyNameSuffix is the suffix to add to synthetic proxies we
// replicate to route traffic to an exported discovery chain through the mesh
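
Net of these removals, the subscription wiring reduces to one unconditional watch on exported services plus, when Connect is enabled, watches on mesh gateways and CA roots. A condensed sketch of the post-change subscribe path (method names from the diff above):

    // Wrap bare state store queries in goroutines that emit events.
    go m.notifyExportedServicesForPeerID(ctx, state, peerID)
    if m.config.ConnectEnabled {
        go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
        // If connect is enabled, watch for updates to CA roots.
        go m.notifyRootCAUpdates(ctx, state.updateCh)
    }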

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
@ -23,12 +22,6 @@ import (
)
func TestSubscriptionManager_RegisterDeregister(t *testing.T) {
testSubscriptionManager_RegisterDeregister(t, true)
}
func TestSubscriptionManager_RegisterDeregister_EnableMeshGateways(t *testing.T) {
testSubscriptionManager_RegisterDeregister(t, false)
}
func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateways bool) {
backend := newTestSubscriptionBackend(t)
// initialCatalogIdx := backend.lastIdx
@ -42,7 +35,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1",
ConnectEnabled: true,
DisableMeshGatewayMode: disableMeshGateways,
}, connect.TestTrustDomain, backend)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
@ -52,18 +44,12 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
mysqlCorrID = subExportedService + structs.NewServiceName("mysql", nil).String()
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String()
)
if disableMeshGateways {
expectEvents(t, subCh)
} else {
// Expect just the empty mesh gateway event to replicate.
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
})
}
testutil.RunStep(t, "initial export syncs empty instance lists", func(t *testing.T) {
backend.ensureConfigEntry(t, &structs.ExportedServicesConfigEntry{
@ -84,16 +70,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
},
})
if disableMeshGateways {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID_temp, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlCorrID, 0)
},
)
} else {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlCorrID, 0)
@ -102,7 +78,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
checkEvent(t, got, mysqlProxyCorrID, 0)
},
)
}
})
mysql1 := &structs.CheckServiceNode{
@ -125,17 +100,10 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
require.Len(t, res.Nodes, 1)
if disableMeshGateways {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
}, res.Nodes[0])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService("", "mysql-1", "mysql", 5000, nil),
}, res.Nodes[0])
}
})
backend.ensureCheck(t, mysql1.Checks[0])
@ -148,15 +116,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
require.Len(t, res.Nodes, 1)
if disableMeshGateways {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil),
},
}, res.Nodes[0])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService("", "mysql-1", "mysql", 5000, nil),
@ -164,7 +123,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
pbCheck("foo", "mysql-1", "mysql", "critical", nil),
},
}, res.Nodes[0])
}
})
})
@ -188,19 +146,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
require.Len(t, res.Nodes, 2)
if disableMeshGateways {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService_temp("", "mysql-2", "mysql", 5000, nil),
}, res.Nodes[0])
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil),
},
}, res.Nodes[1])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService("", "mysql-2", "mysql", 5000, nil),
@ -212,7 +157,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
pbCheck("foo", "mysql-1", "mysql", "critical", nil),
},
}, res.Nodes[1])
}
})
backend.ensureCheck(t, mysql2.Checks[0])
@ -224,22 +168,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 2)
if disableMeshGateways {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService_temp("", "mysql-2", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil),
},
}, res.Nodes[0])
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("foo", "10.0.0.1", partition),
Service: pbService_temp("", "mysql-1", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck_temp("foo", "mysql-1", "mysql", "mysql-check", "critical", nil),
},
}, res.Nodes[1])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService("", "mysql-2", "mysql", 5000, nil),
@ -254,7 +182,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
pbCheck("foo", "mysql-1", "mysql", "critical", nil),
},
}, res.Nodes[1])
}
})
})
@ -284,15 +211,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
require.Equal(t, uint64(0), res.Index)
require.Len(t, res.Nodes, 1)
if disableMeshGateways {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService_temp("", "mysql-2", "mysql", 5000, nil),
Checks: []*pbservice.HealthCheck{
pbCheck_temp("bar", "mysql-2", "mysql", "mysql-2-check", "critical", nil),
},
}, res.Nodes[0])
} else {
prototest.AssertDeepEqual(t, &pbservice.CheckServiceNode{
Node: pbNode("bar", "10.0.0.2", partition),
Service: pbService("", "mysql-2", "mysql", 5000, nil),
@ -300,15 +218,10 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
pbCheck("bar", "mysql-2", "mysql", "critical", nil),
},
}, res.Nodes[0])
}
})
})
testutil.RunStep(t, "register mesh gateway to send proxy updates", func(t *testing.T) {
if disableMeshGateways {
t.Skip()
return
}
gateway := &structs.CheckServiceNode{
Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
@ -381,10 +294,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
})
testutil.RunStep(t, "deregister mesh gateway to send proxy removals", func(t *testing.T) {
if disableMeshGateways {
t.Skip()
return
}
backend.deleteService(t, "mgw", "gateway-1")
expectEvents(t, subCh,
@ -407,12 +316,6 @@ func testSubscriptionManager_RegisterDeregister(t *testing.T, disableMeshGateway
}
func TestSubscriptionManager_InitialSnapshot(t *testing.T) {
testSubscriptionManager_InitialSnapshot(t, true)
}
func TestSubscriptionManager_InitialSnapshot_EnableMeshGateways(t *testing.T) {
testSubscriptionManager_InitialSnapshot(t, false)
}
func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways bool) {
backend := newTestSubscriptionBackend(t)
// initialCatalogIdx := backend.lastIdx
@ -426,7 +329,6 @@ func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways b
mgr := newSubscriptionManager(ctx, testutil.Logger(t), Config{
Datacenter: "dc1",
ConnectEnabled: true,
DisableMeshGatewayMode: disableMeshGateways,
}, connect.TestTrustDomain, backend)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
@ -455,20 +357,12 @@ func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways b
mysqlProxyCorrID = subExportedService + structs.NewServiceName("mysql-sidecar-proxy", nil).String()
mongoProxyCorrID = subExportedService + structs.NewServiceName("mongo-sidecar-proxy", nil).String()
chainProxyCorrID = subExportedService + structs.NewServiceName("chain-sidecar-proxy", nil).String()
mysqlProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mysql", nil).String()
mongoProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("mongo", nil).String()
chainProxyCorrID_temp = subExportedProxyService + structs.NewServiceName("chain", nil).String()
)
if disableMeshGateways {
expectEvents(t, subCh)
} else {
// Expect just the empty mesh gateway event to replicate.
expectEvents(t, subCh, func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, gatewayCorrID, 0)
})
}
// At this point in time we'll have a mesh-gateway notification with no
// content stored and handled.
@ -497,28 +391,6 @@ func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways b
},
})
if disableMeshGateways {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainProxyCorrID_temp, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mongoProxyCorrID_temp, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlProxyCorrID_temp, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainCorrID, 0)
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mongoCorrID, 1, "mongo", string(structs.ServiceKindTypical))
},
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, mysqlCorrID, 1, "mysql", string(structs.ServiceKindTypical))
},
)
} else {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
checkEvent(t, got, chainCorrID, 0)
@ -539,14 +411,9 @@ func testSubscriptionManager_InitialSnapshot(t *testing.T, disableMeshGateways b
checkEvent(t, got, mysqlProxyCorrID, 0)
},
)
}
})
testutil.RunStep(t, "registering a mesh gateway triggers connect replies", func(t *testing.T) {
if disableMeshGateways {
t.Skip()
return
}
gateway := &structs.CheckServiceNode{
Node: &structs.Node{Node: "mgw", Address: "10.1.1.1"},
Service: &structs.NodeService{ID: "gateway-1", Kind: structs.ServiceKindMeshGateway, Service: "gateway", Port: 8443},
@ -850,29 +717,6 @@ func pbService(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMe
}
}
func pbService_temp(kind, id, name string, port int32, entMeta *pbcommon.EnterpriseMeta) *pbservice.NodeService {
if entMeta == nil {
entMeta = pbcommon.DefaultEnterpriseMeta
}
return &pbservice.NodeService{
ID: id,
Kind: kind,
Service: name,
Port: port,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: entMeta,
Connect: &pbservice.ServiceConnect{},
Proxy: &pbservice.ConnectProxyConfig{
MeshGateway: &pbservice.MeshGatewayConfig{},
Expose: &pbservice.ExposeConfig{},
TransparentProxy: &pbservice.TransparentProxyConfig{},
},
}
}
func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck {
if entMeta == nil {
entMeta = pbcommon.DefaultEnterpriseMeta
@ -887,23 +731,3 @@ func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMe
EnterpriseMeta: entMeta,
}
}
func pbCheck_temp(node, svcID, svcName, checkID, status string, entMeta *pbcommon.EnterpriseMeta) *pbservice.HealthCheck {
if entMeta == nil {
entMeta = pbcommon.DefaultEnterpriseMeta
}
return &pbservice.HealthCheck{
Node: node,
CheckID: checkID,
Status: status,
ServiceID: svcID,
ServiceName: svcName,
EnterpriseMeta: entMeta,
Definition: &pbservice.HealthCheckDefinition{
DeregisterCriticalServiceAfter: durationpb.New(0),
Interval: durationpb.New(0),
TTL: durationpb.New(0),
Timeout: durationpb.New(0),
},
}
}

View File

@ -26,7 +26,6 @@ type subscriptionState struct {
exportList *structs.ExportedServiceList
watchedServices map[structs.ServiceName]context.CancelFunc
watchedProxyServices map[structs.ServiceName]context.CancelFunc // TODO(peering): remove
connectServices map[structs.ServiceName]string // value:protocol
// eventVersions is a duplicate event suppression system keyed by the "id"
@ -49,7 +48,6 @@ func newSubscriptionState(peerName, partition string) *subscriptionState {
peerName: peerName,
partition: partition,
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
watchedProxyServices: make(map[structs.ServiceName]context.CancelFunc),
connectServices: make(map[structs.ServiceName]string),
eventVersions: make(map[string]string),
}
@ -103,14 +101,6 @@ func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
keep = true
}
case strings.HasPrefix(id, proxyServicePayloadIDPrefix):
name := strings.TrimPrefix(id, proxyServicePayloadIDPrefix)
sn := structs.ServiceNameFromString(name)
if _, ok := s.watchedProxyServices[sn]; ok {
keep = true
}
case strings.HasPrefix(id, discoveryChainPayloadIDPrefix):
name := strings.TrimPrefix(id, discoveryChainPayloadIDPrefix)
sn := structs.ServiceNameFromString(name)
@ -142,7 +132,6 @@ const (
caRootsPayloadID = "roots"
meshGatewayPayloadID = "mesh-gateway"
servicePayloadIDPrefix = "service:"
proxyServicePayloadIDPrefix = "proxy-service:" // TODO(peering): remove
discoveryChainPayloadIDPrefix = "chain:"
)

View File

@ -37,20 +37,6 @@ func newExportedStandardServiceRequest(logger hclog.Logger, svc structs.ServiceN
}
}
// TODO(peering): remove
func newExportedConnectProxyServiceRequest(logger hclog.Logger, svc structs.ServiceName, sub Subscriber) *exportedServiceRequest {
req := structs.ServiceSpecificRequest{
ServiceName: svc.Name,
Connect: true,
EnterpriseMeta: svc.EnterpriseMeta,
}
return &exportedServiceRequest{
logger: logger,
req: req,
sub: sub,
}
}
// CacheInfo implements submatview.Request
func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
return e.req.CacheInfo()

View File

@ -1,4 +1,4 @@
FROM alpine:latest
FROM alpine:3.12
RUN apk add --no-cache tcpdump
VOLUME [ "/data" ]

View File

@ -0,0 +1,2 @@
primary_datacenter = "alpha"
log_level = "trace"

View File

@ -0,0 +1,26 @@
config_entries {
bootstrap = [
{
kind = "proxy-defaults"
name = "global"
config {
protocol = "tcp"
}
},
{
kind = "exported-services"
name = "default"
services = [
{
name = "s2"
consumers = [
{
peer_name = "alpha-to-primary"
}
]
}
]
}
]
}

View File

@ -0,0 +1,5 @@
services {
name = "mesh-gateway"
kind = "mesh-gateway"
port = 4432
}

View File

@ -0,0 +1 @@
# We don't want an s1 service in this peer

View File

@ -0,0 +1,7 @@
services {
name = "s2"
port = 8181
connect {
sidecar_service {}
}
}

View File

@ -0,0 +1,11 @@
#!/bin/bash
set -euo pipefail
register_services alpha
gen_envoy_bootstrap s2 19002 alpha
gen_envoy_bootstrap mesh-gateway 19003 alpha true
wait_for_config_entry proxy-defaults global alpha
wait_for_config_entry exported-services default alpha

View File

@ -0,0 +1,31 @@
#!/usr/bin/env bats
load helpers
@test "s2 proxy is running correct version" {
assert_envoy_version 19002
}
@test "s2 proxy admin is up on :19002" {
retry_default curl -f -s localhost:19002/stats -o /dev/null
}
@test "gateway-alpha proxy admin is up on :19003" {
retry_default curl -f -s localhost:19003/stats -o /dev/null
}
@test "s2 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s2 alpha
}
@test "s2 proxy should be healthy" {
assert_service_has_healthy_instances s2 1 alpha
}
@test "gateway-alpha should be up and listening" {
retry_long nc -z consul-alpha:4432
}
@test "s2 proxies should be healthy" {
assert_service_has_healthy_instances s2 1 alpha
}

View File

@ -0,0 +1,2 @@
bind_addr = "0.0.0.0"
advertise_addr = "{{ GetInterfaceIP \"eth0\" }}"

View File

@ -0,0 +1,6 @@
#!/bin/bash
snapshot_envoy_admin localhost:19000 s1 primary || true
snapshot_envoy_admin localhost:19001 mesh-gateway primary || true
snapshot_envoy_admin localhost:19002 s2 alpha || true
snapshot_envoy_admin localhost:19003 mesh-gateway alpha || true

View File

@ -0,0 +1,12 @@
config_entries {
bootstrap = [
{
kind = "proxy-defaults"
name = "global"
config {
protocol = "tcp"
}
}
]
}

View File

@ -0,0 +1,5 @@
services {
name = "mesh-gateway"
kind = "mesh-gateway"
port = 4431
}

View File

@ -0,0 +1,17 @@
services {
name = "s1"
port = 8080
connect {
sidecar_service {
proxy {
upstreams = [
{
destination_name = "s2"
destination_peer = "primary-to-alpha"
local_bind_port = 5000
}
]
}
}
}
}

View File

@ -0,0 +1 @@
# We don't want an s2 service in the primary dc

View File

@ -0,0 +1,10 @@
#!/bin/bash
set -euo pipefail
register_services primary
gen_envoy_bootstrap s1 19000 primary
gen_envoy_bootstrap mesh-gateway 19001 primary true
wait_for_config_entry proxy-defaults global

View File

@ -0,0 +1,57 @@
#!/usr/bin/env bats
load helpers
@test "s1 proxy is running correct version" {
assert_envoy_version 19000
}
@test "s1 proxy admin is up on :19000" {
retry_default curl -f -s localhost:19000/stats -o /dev/null
}
@test "gateway-primary proxy admin is up on :19001" {
retry_default curl -f -s localhost:19001/stats -o /dev/null
}
@test "s1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s1
}
@test "s2 proxies should be healthy in alpha" {
assert_service_has_healthy_instances s2 1 alpha
}
@test "gateway-primary should be up and listening" {
retry_long nc -z consul-primary:4431
}
@test "gateway-alpha should be up and listening" {
retry_long nc -z consul-alpha:4432
}
@test "peer the two clusters together" {
create_peering primary alpha
}
@test "s2 alpha proxies should be healthy in primary" {
assert_service_has_healthy_instances s2 1 primary "" "" primary-to-alpha
}
@test "gateway-alpha should have healthy endpoints for s2" {
assert_upstream_has_endpoints_in_status consul-alpha:19003 s2.default.alpha HEALTHY 1
}
@test "s1 upstream should have healthy endpoints for s2" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2.default.default.primary-to-alpha.external HEALTHY 1
}
@test "s1 upstream should be able to connect to s2" {
run retry_default curl -s -f -d hello localhost:5000
[ "$status" -eq 0 ]
[ "$output" = "hello" ]
}
@test "s1 upstream made 1 connection to s2" {
assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.s2.default.default.primary-to-alpha.external.*cx_total" 1
}

View File

@ -0,0 +1,4 @@
#!/bin/bash
export REQUIRED_SERVICES="s1 s1-sidecar-proxy gateway-primary s2-alpha s2-sidecar-proxy-alpha gateway-alpha tcpdump-primary tcpdump-alpha"
export REQUIRE_PEERS=1

View File

@ -4,3 +4,4 @@ export DEFAULT_REQUIRED_SERVICES="s1 s1-sidecar-proxy s2 s2-sidecar-proxy"
export REQUIRED_SERVICES="${DEFAULT_REQUIRED_SERVICES}"
export REQUIRE_SECONDARY=0
export REQUIRE_PARTITIONS=0
export REQUIRE_PEERS=0

View File

@ -357,7 +357,6 @@ function get_upstream_endpoint_in_status_count {
local HEALTH_STATUS=$3
run curl -s -f "http://${HOSTPORT}/clusters?format=json"
[ "$status" -eq 0 ]
# echo "$output" >&3
echo "$output" | jq --raw-output "
.cluster_statuses[]
| select(.name|startswith(\"${CLUSTER_NAME}\"))
@ -477,8 +476,11 @@ function get_healthy_service_count {
local SERVICE_NAME=$1
local DC=$2
local NS=$3
local AP=$4
local PEER_NAME=$5
run curl -s -f ${HEADERS} "consul-${DC}:8500/v1/health/connect/${SERVICE_NAME}?passing&ns=${NS}&partition=${AP}&peer=${PEER_NAME}"
run curl -s -f ${HEADERS} "127.0.0.1:8500/v1/health/connect/${SERVICE_NAME}?dc=${DC}&passing&ns=${NS}"
[ "$status" -eq 0 ]
echo "$output" | jq --raw-output '. | length'
}
@ -508,9 +510,11 @@ function assert_service_has_healthy_instances_once {
local SERVICE_NAME=$1
local EXPECT_COUNT=$2
local DC=${3:-primary}
local NS=$4
local NS=${4:-}
local AP=${5:-}
local PEER_NAME=${6:-}
GOT_COUNT=$(get_healthy_service_count "$SERVICE_NAME" "$DC" "$NS")
GOT_COUNT=$(get_healthy_service_count "$SERVICE_NAME" "$DC" "$NS" "$AP" "$PEER_NAME")
[ "$GOT_COUNT" -eq $EXPECT_COUNT ]
}
@ -519,9 +523,11 @@ function assert_service_has_healthy_instances {
local SERVICE_NAME=$1
local EXPECT_COUNT=$2
local DC=${3:-primary}
local NS=$4
local NS=${4:-}
local AP=${5:-}
local PEER_NAME=${6:-}
run retry_long assert_service_has_healthy_instances_once "$SERVICE_NAME" "$EXPECT_COUNT" "$DC" "$NS"
run retry_long assert_service_has_healthy_instances_once "$SERVICE_NAME" "$EXPECT_COUNT" "$DC" "$NS" "$AP" "$PEER_NAME"
[ "$status" -eq 0 ]
}
@ -941,3 +947,20 @@ function assert_expected_fortio_host_header {
return 1
fi
}
function create_peering {
local GENERATE_PEER=$1
local INITIATE_PEER=$2
run curl -sL -XPOST "http://consul-${GENERATE_PEER}:8500/v1/peering/token" -d"{ \"PeerName\" : \"${GENERATE_PEER}-to-${INITIATE_PEER}\" }"
# echo "$output" >&3
[ "$status" == 0 ]
local token
token="$(echo "$output" | jq -r .PeeringToken)"
[ -n "$token" ]
run curl -sLv -XPOST "http://consul-${INITIATE_PEER}:8500/v1/peering/initiate" -d"{ \"PeerName\" : \"${INITIATE_PEER}-to-${GENERATE_PEER}\", \"PeeringToken\" : \"${token}\" }"
# echo "$output" >&3
[ "$status" == 0 ]
}
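
For reference, the same two-step handshake that create_peering performs, sketched in Go against the agent HTTP API (endpoints and payload fields as in the curl calls above; the addresses reuse the test's container names):

    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "net/http"
    )

    // createPeering asks the accepting cluster for a peering token, then hands
    // that token to the dialing cluster, mirroring the bash helper above.
    func createPeering(generateAddr, initiateAddr, genPeer, initPeer string) error {
        tokenReq, _ := json.Marshal(map[string]string{"PeerName": genPeer + "-to-" + initPeer})
        resp, err := http.Post("http://"+generateAddr+"/v1/peering/token", "application/json", bytes.NewReader(tokenReq))
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        var out struct{ PeeringToken string }
        if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
            return err
        }
        if out.PeeringToken == "" {
            return fmt.Errorf("no peering token returned")
        }
        initReq, _ := json.Marshal(map[string]string{
            "PeerName":     initPeer + "-to-" + genPeer,
            "PeeringToken": out.PeeringToken,
        })
        resp2, err := http.Post("http://"+initiateAddr+"/v1/peering/initiate", "application/json", bytes.NewReader(initReq))
        if err != nil {
            return err
        }
        defer resp2.Body.Close()
        if resp2.StatusCode != http.StatusOK {
            return fmt.Errorf("initiate failed: %s", resp2.Status)
        }
        return nil
    }

    func main() {
        if err := createPeering("consul-primary:8500", "consul-alpha:8500", "primary", "alpha"); err != nil {
            fmt.Println("peering failed:", err)
        }
    }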

View File

@ -127,12 +127,20 @@ function start_consul {
'-p=9411:9411'
'-p=16686:16686'
)
if [[ $DC == 'secondary' ]]; then
case "$DC" in
secondary)
ports=(
'-p=9500:8500'
'-p=9502:8502'
)
fi
;;
alpha)
ports=(
'-p=9510:8500'
'-p=9512:8502'
)
;;
esac
license="${CONSUL_LICENSE:-}"
# load the consul license so we can pass it into the consul
@ -269,7 +277,10 @@ function capture_logs {
then
services="$services consul-ap1"
fi
if is_set $REQUIRE_PEERS
then
services="$services consul-alpha"
fi
if [ -f "${CASE_DIR}/capture.sh" ]
then
@ -289,7 +300,7 @@ function stop_services {
# Teardown
docker_kill_rm $REQUIRED_SERVICES
docker_kill_rm consul-primary consul-secondary consul-ap1
docker_kill_rm consul-primary consul-secondary consul-ap1 consul-alpha
}
function init_vars {
@ -332,6 +343,10 @@ function run_tests {
then
init_workdir ap1
fi
if is_set $REQUIRE_PEERS
then
init_workdir alpha
fi
global_setup
@ -357,6 +372,9 @@ function run_tests {
docker_consul "primary" consul partition create -name ap1 > /dev/null
start_partitioned_client ap1
fi
if is_set $REQUIRE_PEERS; then
start_consul alpha
fi
echo "Setting up the primary datacenter"
pre_service_setup primary
@ -369,6 +387,10 @@ function run_tests {
echo "Setting up the non-default partition"
pre_service_setup ap1
fi
if is_set $REQUIRE_PEERS; then
echo "Setting up the alpha peer"
pre_service_setup alpha
fi
echo "Starting services"
start_services
@ -381,6 +403,10 @@ function run_tests {
echo "Verifying the secondary datacenter"
verify secondary
fi
if is_set $REQUIRE_PEERS; then
echo "Verifying the alpha peer"
verify alpha
fi
}
function test_teardown {
@ -435,13 +461,13 @@ function suite_setup {
}
function suite_teardown {
docker_kill_rm verify-primary verify-secondary
docker_kill_rm verify-primary verify-secondary verify-alpha
# this is some hilarious magic
docker_kill_rm $(grep "^function run_container_" $self_name | \
sed 's/^function run_container_\(.*\) {/\1/g')
docker_kill_rm consul-primary consul-secondary consul-ap1
docker_kill_rm consul-primary consul-secondary consul-ap1 consul-alpha
if docker network inspect envoy-tests &>/dev/null ; then
echo -n "Deleting network 'envoy-tests'..."
@ -530,6 +556,14 @@ function run_container_s3-ap1 {
common_run_container_service s3 ap1 8580 8579
}
function run_container_s1-alpha {
common_run_container_service s1-alpha alpha 8080 8079
}
function run_container_s2-alpha {
common_run_container_service s2-alpha alpha 8181 8179
}
function common_run_container_sidecar_proxy {
local service="$1"
local CLUSTER="$2"
@ -544,7 +578,7 @@ function common_run_container_sidecar_proxy {
"${HASHICORP_DOCKER_PROXY}/envoyproxy/envoy:v${ENVOY_VERSION}" \
envoy \
-c /workdir/${CLUSTER}/envoy/${service}-bootstrap.json \
-l debug \
-l trace \
--disable-hot-restart \
--drain-time-s 1 >/dev/null
}
@ -564,7 +598,7 @@ function run_container_s1-sidecar-proxy-consul-exec {
consul connect envoy -sidecar-for s1 \
-envoy-version ${ENVOY_VERSION} \
-- \
-l debug >/dev/null
-l trace >/dev/null
}
function run_container_s2-sidecar-proxy {
@ -606,6 +640,13 @@ function run_container_s3-ap1-sidecar-proxy {
common_run_container_sidecar_proxy s3 ap1
}
function run_container_s1-sidecar-proxy-alpha {
common_run_container_sidecar_proxy s1 alpha
}
function run_container_s2-sidecar-proxy-alpha {
common_run_container_sidecar_proxy s2 alpha
}
function common_run_container_gateway {
local name="$1"
local DC="$2"
@ -620,7 +661,7 @@ function common_run_container_gateway {
"${HASHICORP_DOCKER_PROXY}/envoyproxy/envoy:v${ENVOY_VERSION}" \
envoy \
-c /workdir/${DC}/envoy/${name}-bootstrap.json \
-l debug \
-l trace \
--disable-hot-restart \
--drain-time-s 1 >/dev/null
}
@ -631,6 +672,9 @@ function run_container_gateway-primary {
function run_container_gateway-secondary {
common_run_container_gateway mesh-gateway secondary
}
function run_container_gateway-alpha {
common_run_container_gateway mesh-gateway alpha
}
function run_container_ingress-gateway-primary {
common_run_container_gateway ingress-gateway primary
@ -699,6 +743,10 @@ function run_container_tcpdump-secondary {
# To use add "tcpdump-secondary" to REQUIRED_SERVICES
common_run_container_tcpdump secondary
}
function run_container_tcpdump-alpha {
# To use add "tcpdump-alpha" to REQUIRED_SERVICES
common_run_container_tcpdump alpha
}
function common_run_container_tcpdump {
local DC="$1"