From cf3e517d0e8887f12ec70ed1e133252935428639 Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 15 Mar 2022 10:07:40 -0400 Subject: [PATCH] Create and wire up the serverless patcher --- agent/agent.go | 1 + agent/proxycfg/testing.go | 8 +- agent/proxycfg/testing_terminating_gateway.go | 12 + agent/xds/delta.go | 9 + agent/xds/delta_test.go | 404 +++++++++--------- agent/xds/server.go | 19 +- .../xds/serverlessplugin/serverlessplugin.go | 9 + agent/xds/xds_protocol_helpers_test.go | 2 + agent/xds/xdscommon/xdscommon.go | 79 ++++ agent/xds/xdscommon/xdscommon_oss_test.go | 71 +++ 10 files changed, 407 insertions(+), 207 deletions(-) create mode 100644 agent/xds/serverlessplugin/serverlessplugin.go create mode 100644 agent/xds/xdscommon/xdscommon_oss_test.go diff --git a/agent/agent.go b/agent/agent.go index 15b439be0b..40151727d1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -707,6 +707,7 @@ func (a *Agent) listenAndServeGRPC() error { xdsServer := xds.NewServer( a.logger.Named(logging.Envoy), + a.config.ConnectServerlessPluginEnabled, a.proxyConfig, func(id string) (acl.Authorizer, error) { return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index f95ccfe374..af3a330615 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "path" "path/filepath" + "runtime" "sync" "sync/atomic" "time" @@ -874,9 +875,14 @@ func (ct *ControllableCacheType) RegisterOptions() cache.RegisterOptions { func golden(t testing.T, name string) string { t.Helper() - golden := filepath.Join("../xds/testdata", name+".golden") + golden := filepath.Join(projectRoot(), "../", "/xds/testdata", name+".golden") expected, err := ioutil.ReadFile(golden) require.NoError(t, err) return string(expected) } + +func projectRoot() string { + _, base, _, _ := runtime.Caller(0) + return filepath.Dir(base) +} diff --git a/agent/proxycfg/testing_terminating_gateway.go b/agent/proxycfg/testing_terminating_gateway.go index 155ae7c129..5b9889c85c 100644 --- a/agent/proxycfg/testing_terminating_gateway.go +++ b/agent/proxycfg/testing_terminating_gateway.go @@ -641,3 +641,15 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf }, }) } + +func TestConfigSnapshotTerminatingGatewayWithServiceDefaultsMeta(t testing.T) *ConfigSnapshot { + web := structs.NewServiceName("web", nil) + return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{ + { + CorrelationID: serviceConfigIDPrefix + web.String(), + Result: &structs.ServiceConfigResponse{ + Meta: map[string]string{"a": "b"}, + }, + }, + }) +} diff --git a/agent/xds/delta.go b/agent/xds/delta.go index cea41d453e..7b4aa5242a 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/xds/serverlessplugin" "github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/logging" ) @@ -212,6 +213,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove s.ResourceMapMutateFn(newResourceMap) } + if s.serverlessPluginEnabled { + newResourceMap, err = serverlessplugin.MutateIndexedResources(newResourceMap, xdscommon.MakePluginConfiguration(cfgSnap)) + + if err != nil { + generator.Logger.Warn("failed to patch xDS resources in the serverless plugin", "err", err) + } + } + if err := populateChildIndexMap(newResourceMap); err != nil { return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err) } diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index f74b05029f..c60f5fc021 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -2,6 +2,7 @@ package xds import ( "errors" + "fmt" "strings" "sync/atomic" "testing" @@ -27,202 +28,207 @@ import ( // Stick to very straightforward stuff in xds_protocol_helpers_test.go. func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { - aclResolve := func(id string) (acl.Authorizer, error) { - // Allow all - return acl.RootAuthorizer("manage"), nil - } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) - mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + for _, serverlessPluginEnabled := range []bool{false, true} { + t.Run(fmt.Sprintf("serverless patcher: %t", serverlessPluginEnabled), func(t *testing.T) { - sid := structs.NewServiceID("web-sidecar-proxy", nil) + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy - // Register the proxy to create state needed to Watch() on - mgr.RegisterProxy(t, sid) + sid := structs.NewServiceID("web-sidecar-proxy", nil) - var snap *proxycfg.ConfigSnapshot + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) - runStep(t, "initial setup", func(t *testing.T) { - snap = newTestSnapshot(t, nil, "") + var snap *proxycfg.ConfigSnapshot - // Send initial cluster discover. We'll assume we are testing a partial - // reconnect and include some initial resource versions that will be - // cleaned up. - envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - InitialResourceVersions: mustMakeVersionMap(t, - makeTestCluster(t, snap, "tcp:geo-cache"), - ), + runStep(t, "initial setup", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Send initial cluster discover. We'll assume we are testing a partial + // reconnect and include some initial resource versions that will be + // cleaned up. + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + InitialResourceVersions: mustMakeVersionMap(t, + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot (tcp with one tcp upstream) + mgr.DeliverConfig(t, sid, snap) + }) + + runStep(t, "first sync", func(t *testing.T) { + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + // SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Envoy then tries to discover endpoints for those clusters. + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + // We'll assume we are testing a partial "reconnect" + InitialResourceVersions: mustMakeVersionMap(t, + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + // "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + // + // Include "fake-endpoints" here to test subscribing to an unknown + // thing and have consul tell us there's no data for it. + "fake-endpoints", + }, + }) + + // It also (in parallel) issues the cluster ACK + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) + + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), + // SAME_AS_INITIAL_VERSION: "fake-endpoints", + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends listener request + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) + + // And should get a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ListenerType, + Nonce: hexString(3), + Resources: makeTestResources(t, + makeTestListener(t, snap, "tcp:public_listener"), + makeTestListener(t, snap, "tcp:db"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // ACKs the listener + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) + + // If we re-subscribe to something even if there are no changes we get a + // fresh copy. + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(4), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) { + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] = + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1] + } + + runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesUnsubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB + snap = newTestSnapshot(t, snap, "") + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") + mgr.DeliverConfig(t, sid, snap) + + // We never send an EDS reply about this change. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + runStep(t, "restore endpoint subscription", func(t *testing.T) { + // Fix the snapshot + snap = newTestSnapshot(t, snap, "") + mgr.DeliverConfig(t, sid, snap) + + // We never send an EDS reply about this change. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // and fix the subscription + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + }) + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.EndpointType, + Nonce: hexString(5), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + ), + }) + + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + // NOTE: this has to be the last subtest since it kills the stream + runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) { + // Force sends to fail + envoy.SetSendErr(errors.New("test error")) + + // Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment) + snap = newTestSnapshot(t, snap, "") + mgr.DeliverConfig(t, sid, snap) + + // We never send any replies about this change because we died. + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } }) - - // Check no response sent yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - requireProtocolVersionGauge(t, scenario, "v3", 1) - - // Deliver a new snapshot (tcp with one tcp upstream) - mgr.DeliverConfig(t, sid, snap) - }) - - runStep(t, "first sync", func(t *testing.T) { - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.ClusterType, - Nonce: hexString(1), - Resources: makeTestResources(t, - makeTestCluster(t, snap, "tcp:local_app"), - makeTestCluster(t, snap, "tcp:db"), - // SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"), - ), - }) - - // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - // We'll assume we are testing a partial "reconnect" - InitialResourceVersions: mustMakeVersionMap(t, - makeTestEndpoints(t, snap, "tcp:geo-cache"), - ), - ResourceNamesSubscribe: []string{ - "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", - // "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", - // - // Include "fake-endpoints" here to test subscribing to an unknown - // thing and have consul tell us there's no data for it. - "fake-endpoints", - }, - }) - - // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) - - // We should get a response immediately since the config is already present in - // the server for endpoints. Note that this should not be racy if the server - // is behaving well since the Cluster send above should be blocked until we - // deliver a new config version. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.EndpointType, - Nonce: hexString(2), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db"), - // SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"), - // SAME_AS_INITIAL_VERSION: "fake-endpoints", - ), - }) - - // And no other response yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // Envoy now sends listener request - envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) - - // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) - - // And should get a response immediately. - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.ListenerType, - Nonce: hexString(3), - Resources: makeTestResources(t, - makeTestListener(t, snap, "tcp:public_listener"), - makeTestListener(t, snap, "tcp:db"), - makeTestListener(t, snap, "tcp:geo-cache"), - ), - }) - - // And no other response yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // ACKs the listener - envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) - - // If we re-subscribe to something even if there are no changes we get a - // fresh copy. - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - ResourceNamesSubscribe: []string{ - "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", - }, - }) - - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.EndpointType, - Nonce: hexString(4), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:geo-cache"), - ), - }) - - envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) - - // And no other response yet - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) { - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] = - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1] - } - - runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - ResourceNamesUnsubscribe: []string{ - "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", - }, - }) - - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB - snap = newTestSnapshot(t, snap, "") - deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") - mgr.DeliverConfig(t, sid, snap) - - // We never send an EDS reply about this change. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - runStep(t, "restore endpoint subscription", func(t *testing.T) { - // Fix the snapshot - snap = newTestSnapshot(t, snap, "") - mgr.DeliverConfig(t, sid, snap) - - // We never send an EDS reply about this change. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - - // and fix the subscription - envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ - ResourceNamesSubscribe: []string{ - "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", - }, - }) - assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: xdscommon.EndpointType, - Nonce: hexString(5), - Resources: makeTestResources(t, - makeTestEndpoints(t, snap, "tcp:db"), - ), - }) - - envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5) - - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - // NOTE: this has to be the last subtest since it kills the stream - runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) { - // Force sends to fail - envoy.SetSendErr(errors.New("test error")) - - // Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment) - snap = newTestSnapshot(t, snap, "") - mgr.DeliverConfig(t, sid, snap) - - // We never send any replies about this change because we died. - assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) - }) - - envoy.Close() - select { - case err := <-errCh: - require.NoError(t, err) - case <-time.After(50 * time.Millisecond): - t.Fatalf("timed out waiting for handler to finish") } } @@ -231,7 +237,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -362,7 +368,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -515,7 +521,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy // This mutateFn causes any endpoint with a name containing "geo-cache" to be @@ -658,7 +664,7 @@ func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) _, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy envoy.EnvoyVersion = "1.18.0" @@ -722,7 +728,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -858,7 +864,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1118,7 +1124,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1195,6 +1201,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri } scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. + false, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1293,6 +1300,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa } scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. + false, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1373,7 +1381,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0) + scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("ingress-gateway", nil) diff --git a/agent/xds/server.go b/agent/xds/server.go index 78cd2d66b0..efab8923c3 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -131,7 +131,8 @@ type Server struct { // ResourceMapMutateFn exclusively exists for testing purposes. ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources) - activeStreams *activeStreamCounters + activeStreams *activeStreamCounters + serverlessPluginEnabled bool } // activeStreamCounters simply encapsulates two counters accessed atomically to @@ -166,19 +167,21 @@ func (c *activeStreamCounters) Increment(xdsVersion string) func() { func NewServer( logger hclog.Logger, + serverlessPluginEnabled bool, cfgMgr ConfigManager, resolveToken ACLResolverFunc, checkFetcher HTTPCheckFetcher, cfgFetcher ConfigFetcher, ) *Server { return &Server{ - Logger: logger, - CfgMgr: cfgMgr, - ResolveToken: resolveToken, - CheckFetcher: checkFetcher, - CfgFetcher: cfgFetcher, - AuthCheckFrequency: DefaultAuthCheckFrequency, - activeStreams: &activeStreamCounters{}, + Logger: logger, + CfgMgr: cfgMgr, + ResolveToken: resolveToken, + CheckFetcher: checkFetcher, + CfgFetcher: cfgFetcher, + AuthCheckFrequency: DefaultAuthCheckFrequency, + activeStreams: &activeStreamCounters{}, + serverlessPluginEnabled: serverlessPluginEnabled, } } diff --git a/agent/xds/serverlessplugin/serverlessplugin.go b/agent/xds/serverlessplugin/serverlessplugin.go new file mode 100644 index 0000000000..8ad6c62738 --- /dev/null +++ b/agent/xds/serverlessplugin/serverlessplugin.go @@ -0,0 +1,9 @@ +package serverlessplugin + +import ( + "github.com/hashicorp/consul/agent/xds/xdscommon" +) + +func MutateIndexedResources(resources *xdscommon.IndexedResources, config xdscommon.PluginConfiguration) (*xdscommon.IndexedResources, error) { + return resources, nil +} diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 3ffc6c831b..228cf543bb 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -131,6 +131,7 @@ func newTestServerDeltaScenario( proxyID string, token string, authCheckFrequency time.Duration, + serverlessPluginEnabled bool, ) *testServerScenario { mgr := newTestManager(t) envoy := NewTestEnvoy(t, proxyID, token) @@ -151,6 +152,7 @@ func newTestServerDeltaScenario( s := NewServer( testutil.Logger(t), + serverlessPluginEnabled, mgr, resolveToken, nil, /*checkFetcher HTTPCheckFetcher*/ diff --git a/agent/xds/xdscommon/xdscommon.go b/agent/xds/xdscommon/xdscommon.go index 2c373e2982..de856c355c 100644 --- a/agent/xds/xdscommon/xdscommon.go +++ b/agent/xds/xdscommon/xdscommon.go @@ -2,6 +2,11 @@ package xdscommon import ( "github.com/golang/protobuf/proto" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" ) const ( @@ -47,3 +52,77 @@ func EmptyIndexedResources() *IndexedResources { }, } } + +type ServiceConfig struct { + // Kind identifies the final proxy kind that will make the request to the + // destination service. + Kind api.ServiceKind + Meta map[string]string +} + +// PluginConfiguration is passed into Envoy plugins. It should depend on the +// API client rather than the structs package because the API client is meant +// to be public. +type PluginConfiguration struct { + // ServiceConfigs is a mapping from service names to the data Envoy plugins + // need to override the default Envoy configurations. + ServiceConfigs map[api.CompoundServiceName]ServiceConfig + + // SNIToServiceName is a mapping from SNIs to service names. This allows + // Envoy plugins to easily convert from an SNI Envoy resource name to the + // associated service's CompoundServiceName + SNIToServiceName map[string]api.CompoundServiceName + + // EnvoyIDToServiceName is a mapping from EnvoyIDs to service names. This allows + // Envoy plugins to easily convert from an EnvoyID Envoy resource name to the + // associated service's CompoundServiceName + EnvoyIDToServiceName map[string]api.CompoundServiceName + + // Kind is mode the local Envoy proxy is running in + Kind api.ServiceKind +} + +// MakePluginConfiguration generates the configuration that will be sent to +// Envoy plugins. +func MakePluginConfiguration(cfgSnap *proxycfg.ConfigSnapshot) PluginConfiguration { + serviceConfigs := make(map[api.CompoundServiceName]ServiceConfig) + sniMappings := make(map[string]api.CompoundServiceName) + envoyIDMappings := make(map[string]api.CompoundServiceName) + + trustDomain := "" + if cfgSnap.Roots != nil { + trustDomain = cfgSnap.Roots.TrustDomain + } + + switch cfgSnap.Kind { + case structs.ServiceKindTerminatingGateway: + for svc, c := range cfgSnap.TerminatingGateway.ServiceConfigs { + compoundServiceName := serviceNameToCompoundServiceName(svc) + serviceConfigs[compoundServiceName] = ServiceConfig{ + Meta: c.Meta, + Kind: api.ServiceKindTerminatingGateway, + } + + sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain) + sniMappings[sni] = compoundServiceName + + envoyID := proxycfg.NewUpstreamIDFromServiceName(svc) + envoyIDMappings[envoyID.EnvoyID()] = compoundServiceName + } + } + + return PluginConfiguration{ + ServiceConfigs: serviceConfigs, + SNIToServiceName: sniMappings, + EnvoyIDToServiceName: envoyIDMappings, + Kind: api.ServiceKind(cfgSnap.Kind), + } +} + +func serviceNameToCompoundServiceName(svc structs.ServiceName) api.CompoundServiceName { + return api.CompoundServiceName{ + Name: svc.Name, + Partition: svc.PartitionOrDefault(), + Namespace: svc.NamespaceOrDefault(), + } +} diff --git a/agent/xds/xdscommon/xdscommon_oss_test.go b/agent/xds/xdscommon/xdscommon_oss_test.go new file mode 100644 index 0000000000..d11c9b015c --- /dev/null +++ b/agent/xds/xdscommon/xdscommon_oss_test.go @@ -0,0 +1,71 @@ +//go:build !consulent +// +build !consulent + +package xdscommon + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/api" +) + +func TestMakePluginConfiguration_TerminatingGateway(t *testing.T) { + snap := proxycfg.TestConfigSnapshotTerminatingGatewayWithServiceDefaultsMeta(t) + + webService := api.CompoundServiceName{ + Name: "web", + Namespace: "default", + Partition: "default", + } + dbService := api.CompoundServiceName{ + Name: "db", + Namespace: "default", + Partition: "default", + } + cacheService := api.CompoundServiceName{ + Name: "cache", + Namespace: "default", + Partition: "default", + } + apiService := api.CompoundServiceName{ + Name: "api", + Namespace: "default", + Partition: "default", + } + + expected := PluginConfiguration{ + Kind: api.ServiceKindTerminatingGateway, + ServiceConfigs: map[api.CompoundServiceName]ServiceConfig{ + webService: { + Kind: api.ServiceKindTerminatingGateway, + Meta: map[string]string{"a": "b"}, + }, + apiService: { + Kind: api.ServiceKindTerminatingGateway, + }, + cacheService: { + Kind: api.ServiceKindTerminatingGateway, + }, + dbService: { + Kind: api.ServiceKindTerminatingGateway, + }, + }, + SNIToServiceName: map[string]api.CompoundServiceName{ + "api.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": apiService, + "cache.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": cacheService, + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": dbService, + "web.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": webService, + }, + EnvoyIDToServiceName: map[string]api.CompoundServiceName{ + "web": webService, + "db": dbService, + "cache": cacheService, + "api": apiService, + }, + } + + require.Equal(t, expected, MakePluginConfiguration(snap)) +}