Create and wire up the serverless patcher

This commit is contained in:
Eric 2022-03-15 10:07:40 -04:00
parent d59364fa7f
commit cf3e517d0e
10 changed files with 407 additions and 207 deletions

View File

@ -707,6 +707,7 @@ func (a *Agent) listenAndServeGRPC() error {
xdsServer := xds.NewServer(
a.logger.Named(logging.Envoy),
a.config.ConnectServerlessPluginEnabled,
a.proxyConfig,
func(id string) (acl.Authorizer, error) {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"path"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
@ -874,9 +875,14 @@ func (ct *ControllableCacheType) RegisterOptions() cache.RegisterOptions {
func golden(t testing.T, name string) string {
t.Helper()
golden := filepath.Join("../xds/testdata", name+".golden")
golden := filepath.Join(projectRoot(), "../", "/xds/testdata", name+".golden")
expected, err := ioutil.ReadFile(golden)
require.NoError(t, err)
return string(expected)
}
func projectRoot() string {
_, base, _, _ := runtime.Caller(0)
return filepath.Dir(base)
}

View File

@ -641,3 +641,15 @@ func TestConfigSnapshotTerminatingGatewayIgnoreExtraResolvers(t testing.T) *Conf
},
})
}
func TestConfigSnapshotTerminatingGatewayWithServiceDefaultsMeta(t testing.T) *ConfigSnapshot {
web := structs.NewServiceName("web", nil)
return TestConfigSnapshotTerminatingGateway(t, true, nil, []agentcache.UpdateEvent{
{
CorrelationID: serviceConfigIDPrefix + web.String(),
Result: &structs.ServiceConfigResponse{
Meta: map[string]string{"a": "b"},
},
},
})
}

View File

@ -23,6 +23,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/serverlessplugin"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/logging"
)
@ -212,6 +213,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
s.ResourceMapMutateFn(newResourceMap)
}
if s.serverlessPluginEnabled {
newResourceMap, err = serverlessplugin.MutateIndexedResources(newResourceMap, xdscommon.MakePluginConfiguration(cfgSnap))
if err != nil {
generator.Logger.Warn("failed to patch xDS resources in the serverless plugin", "err", err)
}
}
if err := populateChildIndexMap(newResourceMap); err != nil {
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
}

View File

@ -2,6 +2,7 @@ package xds
import (
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
@ -27,202 +28,207 @@ import (
// Stick to very straightforward stuff in xds_protocol_helpers_test.go.
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
for _, serverlessPluginEnabled := range []bool{false, true} {
t.Run(fmt.Sprintf("serverless patcher: %t", serverlessPluginEnabled), func(t *testing.T) {
sid := structs.NewServiceID("web-sidecar-proxy", nil)
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
sid := structs.NewServiceID("web-sidecar-proxy", nil)
var snap *proxycfg.ConfigSnapshot
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
runStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")
var snap *proxycfg.ConfigSnapshot
// Send initial cluster discover. We'll assume we are testing a partial
// reconnect and include some initial resource versions that will be
// cleaned up.
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
runStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover. We'll assume we are testing a partial
// reconnect and include some initial resource versions that will be
// cleaned up.
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
})
runStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// We'll assume we are testing a partial "reconnect"
InitialResourceVersions: mustMakeVersionMap(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
// "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
//
// Include "fake-endpoints" here to test subscribing to an unknown
// thing and have consul tell us there's no data for it.
"fake-endpoints",
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
// SAME_AS_INITIAL_VERSION: "fake-endpoints",
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// If we re-subscribe to something even if there are no changes we get a
// fresh copy.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] =
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1]
}
runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesUnsubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
runStep(t, "restore endpoint subscription", func(t *testing.T) {
// Fix the snapshot
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// and fix the subscription
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// NOTE: this has to be the last subtest since it kills the stream
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))
// Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment)
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We never send any replies about this change because we died.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
})
runStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// We'll assume we are testing a partial "reconnect"
InitialResourceVersions: mustMakeVersionMap(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
// "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
//
// Include "fake-endpoints" here to test subscribing to an unknown
// thing and have consul tell us there's no data for it.
"fake-endpoints",
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
// SAME_AS_INITIAL_VERSION: "fake-endpoints",
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// If we re-subscribe to something even if there are no changes we get a
// fresh copy.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] =
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1]
}
runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesUnsubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
runStep(t, "restore endpoint subscription", func(t *testing.T) {
// Fix the snapshot
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// and fix the subscription
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
),
})
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// NOTE: this has to be the last subtest since it kills the stream
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))
// Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment)
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We never send any replies about this change because we died.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
@ -231,7 +237,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -362,7 +368,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -515,7 +521,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
@ -658,7 +664,7 @@ func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
_, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
envoy.EnvoyVersion = "1.18.0"
@ -722,7 +728,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -858,7 +864,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1118,7 +1124,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1195,6 +1201,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
false,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1293,6 +1300,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
false,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1373,7 +1381,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0)
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("ingress-gateway", nil)

View File

@ -131,7 +131,8 @@ type Server struct {
// ResourceMapMutateFn exclusively exists for testing purposes.
ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources)
activeStreams *activeStreamCounters
activeStreams *activeStreamCounters
serverlessPluginEnabled bool
}
// activeStreamCounters simply encapsulates two counters accessed atomically to
@ -166,19 +167,21 @@ func (c *activeStreamCounters) Increment(xdsVersion string) func() {
func NewServer(
logger hclog.Logger,
serverlessPluginEnabled bool,
cfgMgr ConfigManager,
resolveToken ACLResolverFunc,
checkFetcher HTTPCheckFetcher,
cfgFetcher ConfigFetcher,
) *Server {
return &Server{
Logger: logger,
CfgMgr: cfgMgr,
ResolveToken: resolveToken,
CheckFetcher: checkFetcher,
CfgFetcher: cfgFetcher,
AuthCheckFrequency: DefaultAuthCheckFrequency,
activeStreams: &activeStreamCounters{},
Logger: logger,
CfgMgr: cfgMgr,
ResolveToken: resolveToken,
CheckFetcher: checkFetcher,
CfgFetcher: cfgFetcher,
AuthCheckFrequency: DefaultAuthCheckFrequency,
activeStreams: &activeStreamCounters{},
serverlessPluginEnabled: serverlessPluginEnabled,
}
}

View File

@ -0,0 +1,9 @@
package serverlessplugin
import (
"github.com/hashicorp/consul/agent/xds/xdscommon"
)
func MutateIndexedResources(resources *xdscommon.IndexedResources, config xdscommon.PluginConfiguration) (*xdscommon.IndexedResources, error) {
return resources, nil
}

View File

@ -131,6 +131,7 @@ func newTestServerDeltaScenario(
proxyID string,
token string,
authCheckFrequency time.Duration,
serverlessPluginEnabled bool,
) *testServerScenario {
mgr := newTestManager(t)
envoy := NewTestEnvoy(t, proxyID, token)
@ -151,6 +152,7 @@ func newTestServerDeltaScenario(
s := NewServer(
testutil.Logger(t),
serverlessPluginEnabled,
mgr,
resolveToken,
nil, /*checkFetcher HTTPCheckFetcher*/

View File

@ -2,6 +2,11 @@ package xdscommon
import (
"github.com/golang/protobuf/proto"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
const (
@ -47,3 +52,77 @@ func EmptyIndexedResources() *IndexedResources {
},
}
}
type ServiceConfig struct {
// Kind identifies the final proxy kind that will make the request to the
// destination service.
Kind api.ServiceKind
Meta map[string]string
}
// PluginConfiguration is passed into Envoy plugins. It should depend on the
// API client rather than the structs package because the API client is meant
// to be public.
type PluginConfiguration struct {
// ServiceConfigs is a mapping from service names to the data Envoy plugins
// need to override the default Envoy configurations.
ServiceConfigs map[api.CompoundServiceName]ServiceConfig
// SNIToServiceName is a mapping from SNIs to service names. This allows
// Envoy plugins to easily convert from an SNI Envoy resource name to the
// associated service's CompoundServiceName
SNIToServiceName map[string]api.CompoundServiceName
// EnvoyIDToServiceName is a mapping from EnvoyIDs to service names. This allows
// Envoy plugins to easily convert from an EnvoyID Envoy resource name to the
// associated service's CompoundServiceName
EnvoyIDToServiceName map[string]api.CompoundServiceName
// Kind is mode the local Envoy proxy is running in
Kind api.ServiceKind
}
// MakePluginConfiguration generates the configuration that will be sent to
// Envoy plugins.
func MakePluginConfiguration(cfgSnap *proxycfg.ConfigSnapshot) PluginConfiguration {
serviceConfigs := make(map[api.CompoundServiceName]ServiceConfig)
sniMappings := make(map[string]api.CompoundServiceName)
envoyIDMappings := make(map[string]api.CompoundServiceName)
trustDomain := ""
if cfgSnap.Roots != nil {
trustDomain = cfgSnap.Roots.TrustDomain
}
switch cfgSnap.Kind {
case structs.ServiceKindTerminatingGateway:
for svc, c := range cfgSnap.TerminatingGateway.ServiceConfigs {
compoundServiceName := serviceNameToCompoundServiceName(svc)
serviceConfigs[compoundServiceName] = ServiceConfig{
Meta: c.Meta,
Kind: api.ServiceKindTerminatingGateway,
}
sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), cfgSnap.Datacenter, trustDomain)
sniMappings[sni] = compoundServiceName
envoyID := proxycfg.NewUpstreamIDFromServiceName(svc)
envoyIDMappings[envoyID.EnvoyID()] = compoundServiceName
}
}
return PluginConfiguration{
ServiceConfigs: serviceConfigs,
SNIToServiceName: sniMappings,
EnvoyIDToServiceName: envoyIDMappings,
Kind: api.ServiceKind(cfgSnap.Kind),
}
}
func serviceNameToCompoundServiceName(svc structs.ServiceName) api.CompoundServiceName {
return api.CompoundServiceName{
Name: svc.Name,
Partition: svc.PartitionOrDefault(),
Namespace: svc.NamespaceOrDefault(),
}
}

View File

@ -0,0 +1,71 @@
//go:build !consulent
// +build !consulent
package xdscommon
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/api"
)
func TestMakePluginConfiguration_TerminatingGateway(t *testing.T) {
snap := proxycfg.TestConfigSnapshotTerminatingGatewayWithServiceDefaultsMeta(t)
webService := api.CompoundServiceName{
Name: "web",
Namespace: "default",
Partition: "default",
}
dbService := api.CompoundServiceName{
Name: "db",
Namespace: "default",
Partition: "default",
}
cacheService := api.CompoundServiceName{
Name: "cache",
Namespace: "default",
Partition: "default",
}
apiService := api.CompoundServiceName{
Name: "api",
Namespace: "default",
Partition: "default",
}
expected := PluginConfiguration{
Kind: api.ServiceKindTerminatingGateway,
ServiceConfigs: map[api.CompoundServiceName]ServiceConfig{
webService: {
Kind: api.ServiceKindTerminatingGateway,
Meta: map[string]string{"a": "b"},
},
apiService: {
Kind: api.ServiceKindTerminatingGateway,
},
cacheService: {
Kind: api.ServiceKindTerminatingGateway,
},
dbService: {
Kind: api.ServiceKindTerminatingGateway,
},
},
SNIToServiceName: map[string]api.CompoundServiceName{
"api.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": apiService,
"cache.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": cacheService,
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": dbService,
"web.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": webService,
},
EnvoyIDToServiceName: map[string]api.CompoundServiceName{
"web": webService,
"db": dbService,
"cache": cacheService,
"api": apiService,
},
}
require.Equal(t, expected, MakePluginConfiguration(snap))
}