consul/agent/proxycfg/testing.go

1060 lines
33 KiB
Go
Raw Normal View History

package proxycfg
import (
"context"
"fmt"
"io/ioutil"
"path"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
Support Incremental xDS mode (#9855) This adds support for the Incremental xDS protocol when using xDS v3. This is best reviewed commit-by-commit and will not be squashed when merged. Union of all commit messages follows to give an overarching summary: xds: exclusively support incremental xDS when using xDS v3 Attempts to use SoTW via v3 will fail, much like attempts to use incremental via v2 will fail. Work around a strange older envoy behavior involving empty CDS responses over incremental xDS. xds: various cleanups and refactors that don't strictly concern the addition of incremental xDS support Dissolve the connectionInfo struct in favor of per-connection ResourceGenerators instead. Do a better job of ensuring the xds code uses a well configured logger that accurately describes the connected client. xds: pull out checkStreamACLs method in advance of a later commit xds: rewrite SoTW xDS protocol tests to use protobufs rather than hand-rolled json strings In the test we very lightly reuse some of the more boring protobuf construction helper code that is also technically under test. The important thing of the protocol tests is testing the protocol. The actual inputs and outputs are largely already handled by the xds golden output tests now so these protocol tests don't have to do double-duty. This also updates the SoTW protocol test to exclusively use xDS v2 which is the only variant of SoTW that will be supported in Consul 1.10. xds: default xds.Server.AuthCheckFrequency at use-time instead of construction-time
2021-04-29 18:54:05 +00:00
"github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
)
// TestPeerTrustBundles returns a canned trust bundle listing with one bundle
// for "peer-a" and one for "peer-b". Each bundle carries its own trust domain
// and a single PEM-encoded root certificate.
func TestPeerTrustBundles(t testing.T) *pbpeering.TrustBundleListByServiceResponse {
	t.Helper()

	// The PEM bodies must stay flush-left: they live in raw string literals,
	// so any indentation would become part of the certificate text.
	const (
		peerARootPEM = `-----BEGIN CERTIFICATE-----
MIICczCCAdwCCQC3BLnEmLCrSjANBgkqhkiG9w0BAQsFADB+MQswCQYDVQQGEwJV
UzELMAkGA1UECAwCQVoxEjAQBgNVBAcMCUZsYWdzdGFmZjEMMAoGA1UECgwDRm9v
MRAwDgYDVQQLDAdleGFtcGxlMQ8wDQYDVQQDDAZwZWVyLWExHTAbBgkqhkiG9w0B
CQEWDmZvb0BwZWVyLWEuY29tMB4XDTIyMDUyNjAxMDQ0NFoXDTIzMDUyNjAxMDQ0
NFowfjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkFaMRIwEAYDVQQHDAlGbGFnc3Rh
ZmYxDDAKBgNVBAoMA0ZvbzEQMA4GA1UECwwHZXhhbXBsZTEPMA0GA1UEAwwGcGVl
ci1hMR0wGwYJKoZIhvcNAQkBFg5mb29AcGVlci1hLmNvbTCBnzANBgkqhkiG9w0B
AQEFAAOBjQAwgYkCgYEA2zFYGTbXDAntT5pLTpZ2+VTiqx4J63VRJH1kdu11f0FV
c2jl1pqCuYDbQXknDU0Pv1Q5y0+nSAihD2KqGS571r+vHQiPtKYPYRqPEe9FzAhR
2KhWH6v/tk5DG1HqOjV9/zWRKB12gdFNZZqnw/e7NjLNq3wZ2UAwxXip5uJ8uwMC
AwEAATANBgkqhkiG9w0BAQsFAAOBgQC/CJ9Syf4aL91wZizKTejwouRYoWv4gRAk
yto45ZcNMHfJ0G2z+XAMl9ZbQsLgXmzAx4IM6y5Jckq8pKC4PEijCjlKTktLHlEy
0ggmFxtNB1tid2NC8dOzcQ3l45+gDjDqdILhAvLDjlAIebdkqVqb2CfFNW/I2CQH
ZAuKN1aoKA==
-----END CERTIFICATE-----`

		peerBRootPEM = `-----BEGIN CERTIFICATE-----
MIICcTCCAdoCCQDyGxC08cD0BDANBgkqhkiG9w0BAQsFADB9MQswCQYDVQQGEwJV
UzELMAkGA1UECAwCQ0ExETAPBgNVBAcMCENhcmxzYmFkMQwwCgYDVQQKDANGb28x
EDAOBgNVBAsMB2V4YW1wbGUxDzANBgNVBAMMBnBlZXItYjEdMBsGCSqGSIb3DQEJ
ARYOZm9vQHBlZXItYi5jb20wHhcNMjIwNTI2MDExNjE2WhcNMjMwNTI2MDExNjE2
WjB9MQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExETAPBgNVBAcMCENhcmxzYmFk
MQwwCgYDVQQKDANGb28xEDAOBgNVBAsMB2V4YW1wbGUxDzANBgNVBAMMBnBlZXIt
YjEdMBsGCSqGSIb3DQEJARYOZm9vQHBlZXItYi5jb20wgZ8wDQYJKoZIhvcNAQEB
BQADgY0AMIGJAoGBAL4i5erdZ5vKk3mzW9Qt6Wvw/WN/IpMDlL0a28wz9oDCtMLN
cD/XQB9yT5jUwb2s4mD1lCDZtee8MHeD8zygICozufWVB+u2KvMaoA50T9GMQD0E
z/0nz/Z703I4q13VHeTpltmEpYcfxw/7nJ3leKA34+Nj3zteJ70iqvD/TNBBAgMB
AAEwDQYJKoZIhvcNAQELBQADgYEAbL04gicH+EIznDNhZJEb1guMBtBBJ8kujPyU
ao8xhlUuorDTLwhLpkKsOhD8619oSS8KynjEBichidQRkwxIaze0a2mrGT+tGBMf
pVz6UeCkqpde6bSJ/ozEe/2seQzKqYvRT1oUjLwYvY7OIh2DzYibOAxh6fewYAmU
5j5qNLc=
-----END CERTIFICATE-----`
	)

	peerA := &pbpeering.PeeringTrustBundle{
		PeerName:    "peer-a",
		TrustDomain: "1c053652-8512-4373-90cf-5a7f6263a994.consul",
		RootPEMs:    []string{peerARootPEM},
	}
	peerB := &pbpeering.PeeringTrustBundle{
		PeerName:    "peer-b",
		TrustDomain: "d89ac423-e95a-475d-94f2-1c557c57bf31.consul",
		RootPEMs:    []string{peerBRootPEM},
	}

	return &pbpeering.TrustBundleListByServiceResponse{
		Bundles: []*pbpeering.PeeringTrustBundle{peerA, peerB},
	}
}
// TestCerts creates a test CA via connect.TestCA and a leaf signed by it via
// TestLeafForCA. The pair is suitable for returning as mock CA root/leaf
// cache responses.
//
// The returned roots list contains only the new CA, which is also marked as
// the active root.
func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) {
	t.Helper()

	root := connect.TestCA(t, nil)

	var indexed structs.IndexedCARoots
	indexed.ActiveRootID = root.ID
	indexed.TrustDomain = fmt.Sprintf("%s.consul", connect.TestClusterID)
	indexed.Roots = []*structs.CARoot{root}

	leaf := TestLeafForCA(t, root)
	return &indexed, leaf
}
// TestLeafForCA generates a new leaf certificate for the "web" service, signed
// by the supplied CA, suitable for returning as a mock CA leaf cache response.
//
// The test is failed through require if the generated PEM cannot be parsed or
// if the parsed certificate carries no URIs.
func TestLeafForCA(t testing.T, ca *structs.CARoot) *structs.IssuedCert {
	// Attribute failures to the calling test, like the sibling helpers do.
	t.Helper()

	leafPEM, pkPEM := connect.TestLeaf(t, "web", ca)

	leafCert, err := connect.ParseCert(leafPEM)
	require.NoError(t, err)
	// Guard the URIs[0] access below so a malformed leaf fails the test with a
	// clear message rather than an index-out-of-range panic.
	require.NotEmpty(t, leafCert.URIs)

	return &structs.IssuedCert{
		SerialNumber:  connect.EncodeSerialNumber(leafCert.SerialNumber),
		CertPEM:       leafPEM,
		PrivateKeyPEM: pkPEM,
		Service:       "web",
		ServiceURI:    leafCert.URIs[0].String(),
		ValidAfter:    leafCert.NotBefore,
		ValidBefore:   leafCert.NotAfter,
	}
}
// TestIntentions returns a sample intentions match result useful for mocking
// service discovery cache results. The result holds a single match group with
// one intention allowing default/billing to reach default/web.
func TestIntentions() *structs.IndexedIntentionMatches {
	billingToWeb := &structs.Intention{
		ID:              "foo",
		SourceNS:        "default",
		SourceName:      "billing",
		DestinationNS:   "default",
		DestinationName: "web",
		Action:          structs.IntentionActionAllow,
	}

	var result structs.IndexedIntentionMatches
	result.Matches = []structs.Intentions{{billingToWeb}}
	return &result
}
// TestUpstreamNodes returns a sample service discovery result useful for
// mocking service discovery cache results.
//
// The result has two dc1 entries, "test1" at 10.10.1.1 and "test2" at
// 10.10.1.2. Each takes its partition from
// structs.NodeEnterpriseMetaInDefaultPartition and its service instance from
// structs.TestNodeServiceWithName(t, service).
func TestUpstreamNodes(t testing.T, service string) structs.CheckServiceNodes {
	first := structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test1",
			Node:       "test1",
			Address:    "10.10.1.1",
			Datacenter: "dc1",
			Partition:  structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
		},
		Service: structs.TestNodeServiceWithName(t, service),
	}

	second := structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test2",
			Node:       "test2",
			Address:    "10.10.1.2",
			Datacenter: "dc1",
			Partition:  structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
		},
		Service: structs.TestNodeServiceWithName(t, service),
	}

	return structs.CheckServiceNodes{first, second}
}
2021-08-19 22:26:58 +00:00
// TestPreparedQueryNodes returns instances of a service spread across two datacenters.
// The service instance names use a "-target" suffix to ensure we don't use the
// prepared query's name for SAN validation.
// The name of prepared queries won't always match the name of the service they target.
func TestPreparedQueryNodes(t testing.T, query string) structs.CheckServiceNodes {
nodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test1",
Node: "test1",
Address: "10.10.1.1",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
2021-08-19 22:26:58 +00:00
Service: query + "-sidecar-proxy",
Port: 8080,
Proxy: structs.ConnectProxyConfig{
2021-08-19 22:26:58 +00:00
DestinationServiceName: query + "-target",
},
},
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test2",
Node: "test2",
Address: "10.20.1.2",
Datacenter: "dc2",
},
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
2021-08-19 22:26:58 +00:00
Service: query + "-target",
Port: 8080,
Connect: structs.ServiceConnect{Native: true},
},
},
}
return nodes
}
connect: fix failover through a mesh gateway to a remote datacenter (#6259) Failover is pushed entirely down to the data plane by creating envoy clusters and putting each successive destination in a different load assignment priority band. For example this shows that normally requests go to 1.2.3.4:8080 but when that fails they go to 6.7.8.9:8080: - name: foo load_assignment: cluster_name: foo policy: overprovisioning_factor: 100000 endpoints: - priority: 0 lb_endpoints: - endpoint: address: socket_address: address: 1.2.3.4 port_value: 8080 - priority: 1 lb_endpoints: - endpoint: address: socket_address: address: 6.7.8.9 port_value: 8080 Mesh gateways route requests based solely on the SNI header tacked onto the TLS layer. Envoy currently only lets you configure the outbound SNI header at the cluster layer. If you try to failover through a mesh gateway you ideally would configure the SNI value per endpoint, but that's not possible in envoy today. This PR introduces a simpler way around the problem for now: 1. We identify any target of failover that will use mesh gateway mode local or remote and then further isolate any resolver node in the compiled discovery chain that has a failover destination set to one of those targets. 2. For each of these resolvers we will perform a small measurement of comparative healths of the endpoints that come back from the health API for the set of primary target and serial failover targets. We walk the list of targets in order and if any endpoint is healthy we return that target, otherwise we move on to the next target. 3. The CDS and EDS endpoints both perform the measurements in (2) for the affected resolver nodes. 4. For CDS this measurement selects which TLS SNI field to use for the cluster (note the cluster is always going to be named for the primary target) 5. For EDS this measurement selects which set of endpoints will populate the cluster. Priority tiered failover is ignored. 
One of the big downsides to this approach to failover is that the failover detection and correction is going to be controlled by consul rather than deferring that entirely to the data plane as with the prior version. This also means that we are bound to only failover using official health signals and cannot make use of data plane signals like outlier detection to affect failover. In this specific scenario the lack of data plane signals is ok because the effectiveness is already muted by the fact that the ultimate destination endpoints will have their data plane signals scrambled when they pass through the mesh gateway wrapper anyway so we're not losing much. Another related fix is that we now use the endpoint health from the underlying service, not the health of the gateway (regardless of failover mode).
2019-08-05 18:30:35 +00:00
// TestUpstreamNodesInStatus returns two dc1 entries, "test1" at 10.10.1.1 and
// "test2" at 10.10.1.2. Each runs structs.TestNodeService(t) and carries a
// single "force" health check for service "web" set to the given status.
func TestUpstreamNodesInStatus(t testing.T, status string) structs.CheckServiceNodes {
	// forcedCheck builds the one health check attached to the named node.
	forcedCheck := func(node string) structs.HealthChecks {
		return structs.HealthChecks{
			&structs.HealthCheck{
				Node:        node,
				ServiceName: "web",
				Name:        "force",
				Status:      status,
			},
		}
	}

	first := structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test1",
			Node:       "test1",
			Address:    "10.10.1.1",
			Datacenter: "dc1",
		},
		Service: structs.TestNodeService(t),
		Checks:  forcedCheck("test1"),
	}
	second := structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test2",
			Node:       "test2",
			Address:    "10.10.1.2",
			Datacenter: "dc1",
		},
		Service: structs.TestNodeService(t),
		Checks:  forcedCheck("test2"),
	}

	return structs.CheckServiceNodes{first, second}
}
// TestUpstreamNodesDC2 returns two dc2 entries, "test1" at 10.20.1.1 and
// "test2" at 10.20.1.2, each running structs.TestNodeService(t). No health
// checks are attached.
func TestUpstreamNodesDC2(t testing.T) structs.CheckServiceNodes {
	result := make(structs.CheckServiceNodes, 0, 2)

	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test1",
			Node:       "test1",
			Address:    "10.20.1.1",
			Datacenter: "dc2",
		},
		Service: structs.TestNodeService(t),
	})
	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test2",
			Node:       "test2",
			Address:    "10.20.1.2",
			Datacenter: "dc2",
		},
		Service: structs.TestNodeService(t),
	})

	return result
}
connect: fix failover through a mesh gateway to a remote datacenter (#6259) Failover is pushed entirely down to the data plane by creating envoy clusters and putting each successive destination in a different load assignment priority band. For example this shows that normally requests go to 1.2.3.4:8080 but when that fails they go to 6.7.8.9:8080: - name: foo load_assignment: cluster_name: foo policy: overprovisioning_factor: 100000 endpoints: - priority: 0 lb_endpoints: - endpoint: address: socket_address: address: 1.2.3.4 port_value: 8080 - priority: 1 lb_endpoints: - endpoint: address: socket_address: address: 6.7.8.9 port_value: 8080 Mesh gateways route requests based solely on the SNI header tacked onto the TLS layer. Envoy currently only lets you configure the outbound SNI header at the cluster layer. If you try to failover through a mesh gateway you ideally would configure the SNI value per endpoint, but that's not possible in envoy today. This PR introduces a simpler way around the problem for now: 1. We identify any target of failover that will use mesh gateway mode local or remote and then further isolate any resolver node in the compiled discovery chain that has a failover destination set to one of those targets. 2. For each of these resolvers we will perform a small measurement of comparative healths of the endpoints that come back from the health API for the set of primary target and serial failover targets. We walk the list of targets in order and if any endpoint is healthy we return that target, otherwise we move on to the next target. 3. The CDS and EDS endpoints both perform the measurements in (2) for the affected resolver nodes. 4. For CDS this measurement selects which TLS SNI field to use for the cluster (note the cluster is always going to be named for the primary target) 5. For EDS this measurement selects which set of endpoints will populate the cluster. Priority tiered failover is ignored. 
One of the big downsides to this approach to failover is that the failover detection and correction is going to be controlled by consul rather than deferring that entirely to the data plane as with the prior version. This also means that we are bound to only failover using official health signals and cannot make use of data plane signals like outlier detection to affect failover. In this specific scenario the lack of data plane signals is ok because the effectiveness is already muted by the fact that the ultimate destination endpoints will have their data plane signals scrambled when they pass through the mesh gateway wrapper anyway so we're not losing much. Another related fix is that we now use the endpoint health from the underlying service, not the health of the gateway (regardless of failover mode).
2019-08-05 18:30:35 +00:00
// TestUpstreamNodesInStatusDC2 returns two dc2 entries, "test1" at 10.20.1.1
// and "test2" at 10.20.1.2. Each runs structs.TestNodeService(t) and carries a
// single "force" health check for service "web" set to the given status.
func TestUpstreamNodesInStatusDC2(t testing.T, status string) structs.CheckServiceNodes {
	// forcedCheck builds the one health check attached to the named node.
	forcedCheck := func(node string) structs.HealthChecks {
		return structs.HealthChecks{
			&structs.HealthCheck{
				Node:        node,
				ServiceName: "web",
				Name:        "force",
				Status:      status,
			},
		}
	}

	first := structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test1",
			Node:       "test1",
			Address:    "10.20.1.1",
			Datacenter: "dc2",
		},
		Service: structs.TestNodeService(t),
		Checks:  forcedCheck("test1"),
	}
	second := structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "test2",
			Node:       "test2",
			Address:    "10.20.1.2",
			Datacenter: "dc2",
		},
		Service: structs.TestNodeService(t),
		Checks:  forcedCheck("test2"),
	}

	return structs.CheckServiceNodes{first, second}
}
// TestUpstreamNodesAlternate returns two dc1 entries on alternate nodes,
// "alt-test1" at 10.20.1.1 and "alt-test2" at 10.20.1.2, each running
// structs.TestNodeService(t). No health checks are attached.
func TestUpstreamNodesAlternate(t testing.T) structs.CheckServiceNodes {
	result := make(structs.CheckServiceNodes, 0, 2)

	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "alt-test1",
			Node:       "alt-test1",
			Address:    "10.20.1.1",
			Datacenter: "dc1",
		},
		Service: structs.TestNodeService(t),
	})
	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "alt-test2",
			Node:       "alt-test2",
			Address:    "10.20.1.2",
			Datacenter: "dc1",
		},
		Service: structs.TestNodeService(t),
	})

	return result
}
connect: fix failover through a mesh gateway to a remote datacenter (#6259) Failover is pushed entirely down to the data plane by creating envoy clusters and putting each successive destination in a different load assignment priority band. For example this shows that normally requests go to 1.2.3.4:8080 but when that fails they go to 6.7.8.9:8080: - name: foo load_assignment: cluster_name: foo policy: overprovisioning_factor: 100000 endpoints: - priority: 0 lb_endpoints: - endpoint: address: socket_address: address: 1.2.3.4 port_value: 8080 - priority: 1 lb_endpoints: - endpoint: address: socket_address: address: 6.7.8.9 port_value: 8080 Mesh gateways route requests based solely on the SNI header tacked onto the TLS layer. Envoy currently only lets you configure the outbound SNI header at the cluster layer. If you try to failover through a mesh gateway you ideally would configure the SNI value per endpoint, but that's not possible in envoy today. This PR introduces a simpler way around the problem for now: 1. We identify any target of failover that will use mesh gateway mode local or remote and then further isolate any resolver node in the compiled discovery chain that has a failover destination set to one of those targets. 2. For each of these resolvers we will perform a small measurement of comparative healths of the endpoints that come back from the health API for the set of primary target and serial failover targets. We walk the list of targets in order and if any endpoint is healthy we return that target, otherwise we move on to the next target. 3. The CDS and EDS endpoints both perform the measurements in (2) for the affected resolver nodes. 4. For CDS this measurement selects which TLS SNI field to use for the cluster (note the cluster is always going to be named for the primary target) 5. For EDS this measurement selects which set of endpoints will populate the cluster. Priority tiered failover is ignored. 
One of the big downsides to this approach to failover is that the failover detection and correction is going to be controlled by consul rather than deferring that entirely to the data plane as with the prior version. This also means that we are bound to only failover using official health signals and cannot make use of data plane signals like outlier detection to affect failover. In this specific scenario the lack of data plane signals is ok because the effectiveness is already muted by the fact that the ultimate destination endpoints will have their data plane signals scrambled when they pass through the mesh gateway wrapper anyway so we're not losing much. Another related fix is that we now use the endpoint health from the underlying service, not the health of the gateway (regardless of failover mode).
2019-08-05 18:30:35 +00:00
// TestGatewayNodesDC1 returns two mesh gateway entries for dc1. Both nodes are
// named "mesh-gateway" (IDs "mesh-gateway-1" and "mesh-gateway-2"). Each
// service is built by structs.TestNodeServiceMeshGatewayWithAddrs on port 8443
// with two extra structs.ServiceAddress values.
//
// NOTE(review): the second gateway's first ServiceAddress is 10.0.1.2 while
// its node address is 10.10.1.2. This is preserved as-is because fixture
// consumers may depend on it — confirm whether it is intentional.
func TestGatewayNodesDC1(t testing.T) structs.CheckServiceNodes {
	gatewayOne := structs.TestNodeServiceMeshGatewayWithAddrs(t,
		"10.10.1.1", 8443,
		structs.ServiceAddress{Address: "10.10.1.1", Port: 8443},
		structs.ServiceAddress{Address: "198.118.1.1", Port: 443})
	gatewayTwo := structs.TestNodeServiceMeshGatewayWithAddrs(t,
		"10.10.1.2", 8443,
		structs.ServiceAddress{Address: "10.0.1.2", Port: 8443},
		structs.ServiceAddress{Address: "198.118.1.2", Port: 443})

	return structs.CheckServiceNodes{
		{
			Node: &structs.Node{
				ID:         "mesh-gateway-1",
				Node:       "mesh-gateway",
				Address:    "10.10.1.1",
				Datacenter: "dc1",
			},
			Service: gatewayOne,
		},
		{
			Node: &structs.Node{
				ID:         "mesh-gateway-2",
				Node:       "mesh-gateway",
				Address:    "10.10.1.2",
				Datacenter: "dc1",
			},
			Service: gatewayTwo,
		},
	}
}
// TestGatewayNodesDC2 returns two mesh gateway entries for dc2. Both nodes are
// named "mesh-gateway" (IDs "mesh-gateway-1" and "mesh-gateway-2") at 10.0.1.1
// and 10.0.1.2. Each service is built by
// structs.TestNodeServiceMeshGatewayWithAddrs on port 8443 with two extra
// structs.ServiceAddress values.
func TestGatewayNodesDC2(t testing.T) structs.CheckServiceNodes {
	gatewayOne := structs.TestNodeServiceMeshGatewayWithAddrs(t,
		"10.0.1.1", 8443,
		structs.ServiceAddress{Address: "10.0.1.1", Port: 8443},
		structs.ServiceAddress{Address: "198.18.1.1", Port: 443})
	gatewayTwo := structs.TestNodeServiceMeshGatewayWithAddrs(t,
		"10.0.1.2", 8443,
		structs.ServiceAddress{Address: "10.0.1.2", Port: 8443},
		structs.ServiceAddress{Address: "198.18.1.2", Port: 443})

	return structs.CheckServiceNodes{
		{
			Node: &structs.Node{
				ID:         "mesh-gateway-1",
				Node:       "mesh-gateway",
				Address:    "10.0.1.1",
				Datacenter: "dc2",
			},
			Service: gatewayOne,
		},
		{
			Node: &structs.Node{
				ID:         "mesh-gateway-2",
				Node:       "mesh-gateway",
				Address:    "10.0.1.2",
				Datacenter: "dc2",
			},
			Service: gatewayTwo,
		},
	}
}
connect: fix failover through a mesh gateway to a remote datacenter (#6259) Failover is pushed entirely down to the data plane by creating envoy clusters and putting each successive destination in a different load assignment priority band. For example this shows that normally requests go to 1.2.3.4:8080 but when that fails they go to 6.7.8.9:8080: - name: foo load_assignment: cluster_name: foo policy: overprovisioning_factor: 100000 endpoints: - priority: 0 lb_endpoints: - endpoint: address: socket_address: address: 1.2.3.4 port_value: 8080 - priority: 1 lb_endpoints: - endpoint: address: socket_address: address: 6.7.8.9 port_value: 8080 Mesh gateways route requests based solely on the SNI header tacked onto the TLS layer. Envoy currently only lets you configure the outbound SNI header at the cluster layer. If you try to failover through a mesh gateway you ideally would configure the SNI value per endpoint, but that's not possible in envoy today. This PR introduces a simpler way around the problem for now: 1. We identify any target of failover that will use mesh gateway mode local or remote and then further isolate any resolver node in the compiled discovery chain that has a failover destination set to one of those targets. 2. For each of these resolvers we will perform a small measurement of comparative healths of the endpoints that come back from the health API for the set of primary target and serial failover targets. We walk the list of targets in order and if any endpoint is healthy we return that target, otherwise we move on to the next target. 3. The CDS and EDS endpoints both perform the measurements in (2) for the affected resolver nodes. 4. For CDS this measurement selects which TLS SNI field to use for the cluster (note the cluster is always going to be named for the primary target) 5. For EDS this measurement selects which set of endpoints will populate the cluster. Priority tiered failover is ignored. 
One of the big downsides to this approach to failover is that the failover detection and correction is going to be controlled by consul rather than deferring that entirely to the data plane as with the prior version. This also means that we are bound to only failover using official health signals and cannot make use of data plane signals like outlier detection to affect failover. In this specific scenario the lack of data plane signals is ok because the effectiveness is already muted by the fact that the ultimate destination endpoints will have their data plane signals scrambled when they pass through the mesh gateway wrapper anyway so we're not losing much. Another related fix is that we now use the endpoint health from the underlying service, not the health of the gateway (regardless of failover mode).
2019-08-05 18:30:35 +00:00
// TestGatewayNodesDC3 returns two mesh gateway entries for dc3. Both nodes are
// named "mesh-gateway" (IDs "mesh-gateway-1" and "mesh-gateway-2") at
// 10.30.1.1 and 10.30.1.2. Each service is built by
// structs.TestNodeServiceMeshGatewayWithAddrs on port 8443 with two extra
// structs.ServiceAddress values.
//
// NOTE(review): the first gateway's first ServiceAddress is 10.0.1.1 while its
// node address is 10.30.1.1. This is preserved as-is because fixture consumers
// may depend on it — confirm whether it is intentional.
func TestGatewayNodesDC3(t testing.T) structs.CheckServiceNodes {
	gatewayOne := structs.TestNodeServiceMeshGatewayWithAddrs(t,
		"10.30.1.1", 8443,
		structs.ServiceAddress{Address: "10.0.1.1", Port: 8443},
		structs.ServiceAddress{Address: "198.38.1.1", Port: 443})
	gatewayTwo := structs.TestNodeServiceMeshGatewayWithAddrs(t,
		"10.30.1.2", 8443,
		structs.ServiceAddress{Address: "10.30.1.2", Port: 8443},
		structs.ServiceAddress{Address: "198.38.1.2", Port: 443})

	return structs.CheckServiceNodes{
		{
			Node: &structs.Node{
				ID:         "mesh-gateway-1",
				Node:       "mesh-gateway",
				Address:    "10.30.1.1",
				Datacenter: "dc3",
			},
			Service: gatewayOne,
		},
		{
			Node: &structs.Node{
				ID:         "mesh-gateway-2",
				Node:       "mesh-gateway",
				Address:    "10.30.1.2",
				Datacenter: "dc3",
			},
			Service: gatewayTwo,
		},
	}
}
// TestGatewayNodesDC4Hostname returns three mesh gateway entries for dc4. The
// second ServiceAddress of the first two gateways is a hostname
// (*.us-west-2.elb.notaws.com), while the third gateway uses the IP
// 198.38.1.1. No health checks are attached.
func TestGatewayNodesDC4Hostname(t testing.T) structs.CheckServiceNodes {
	const datacenter = "dc4"

	result := make(structs.CheckServiceNodes, 0, 3)

	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "mesh-gateway-1",
			Node:       "mesh-gateway",
			Address:    "10.30.1.1",
			Datacenter: datacenter,
		},
		Service: structs.TestNodeServiceMeshGatewayWithAddrs(t,
			"10.30.1.1", 8443,
			structs.ServiceAddress{Address: "10.0.1.1", Port: 8443},
			structs.ServiceAddress{Address: "123.us-west-2.elb.notaws.com", Port: 443}),
	})
	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "mesh-gateway-2",
			Node:       "mesh-gateway",
			Address:    "10.30.1.2",
			Datacenter: datacenter,
		},
		Service: structs.TestNodeServiceMeshGatewayWithAddrs(t,
			"10.30.1.2", 8443,
			structs.ServiceAddress{Address: "10.30.1.2", Port: 8443},
			structs.ServiceAddress{Address: "456.us-west-2.elb.notaws.com", Port: 443}),
	})
	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "mesh-gateway-3",
			Node:       "mesh-gateway",
			Address:    "10.30.1.3",
			Datacenter: datacenter,
		},
		Service: structs.TestNodeServiceMeshGatewayWithAddrs(t,
			"10.30.1.3", 8443,
			structs.ServiceAddress{Address: "10.30.1.3", Port: 8443},
			structs.ServiceAddress{Address: "198.38.1.1", Port: 443}),
	})

	return result
}
// TestGatewayNodesDC5Hostname returns three mesh gateway entries for dc5. The
// second ServiceAddress of the first two gateways is a hostname
// (*.us-west-2.elb.notaws.com), while the third gateway uses the IP
// 198.38.1.1. No health checks are attached.
func TestGatewayNodesDC5Hostname(t testing.T) structs.CheckServiceNodes {
	const datacenter = "dc5"

	result := make(structs.CheckServiceNodes, 0, 3)

	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "mesh-gateway-1",
			Node:       "mesh-gateway",
			Address:    "10.30.1.1",
			Datacenter: datacenter,
		},
		Service: structs.TestNodeServiceMeshGatewayWithAddrs(t,
			"10.30.1.1", 8443,
			structs.ServiceAddress{Address: "10.0.1.1", Port: 8443},
			structs.ServiceAddress{Address: "123.us-west-2.elb.notaws.com", Port: 443}),
	})
	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "mesh-gateway-2",
			Node:       "mesh-gateway",
			Address:    "10.30.1.2",
			Datacenter: datacenter,
		},
		Service: structs.TestNodeServiceMeshGatewayWithAddrs(t,
			"10.30.1.2", 8443,
			structs.ServiceAddress{Address: "10.30.1.2", Port: 8443},
			structs.ServiceAddress{Address: "456.us-west-2.elb.notaws.com", Port: 443}),
	})
	result = append(result, structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "mesh-gateway-3",
			Node:       "mesh-gateway",
			Address:    "10.30.1.3",
			Datacenter: datacenter,
		},
		Service: structs.TestNodeServiceMeshGatewayWithAddrs(t,
			"10.30.1.3", 8443,
			structs.ServiceAddress{Address: "10.30.1.3", Port: 8443},
			structs.ServiceAddress{Address: "198.38.1.1", Port: 443}),
	})

	return result
}
// TestGatewayNodesDC6Hostname returns a single mesh gateway entry for dc6. Its
// second ServiceAddress is the hostname 123.us-east-1.elb.notaws.com, and its
// only health check has status api.HealthCritical.
func TestGatewayNodesDC6Hostname(t testing.T) structs.CheckServiceNodes {
	gateway := structs.TestNodeServiceMeshGatewayWithAddrs(t,
		"10.30.1.1", 8443,
		structs.ServiceAddress{Address: "10.0.1.1", Port: 8443},
		structs.ServiceAddress{Address: "123.us-east-1.elb.notaws.com", Port: 443})

	critical := &structs.HealthCheck{Status: api.HealthCritical}

	entry := structs.CheckServiceNode{
		Node: &structs.Node{
			ID:         "mesh-gateway-1",
			Node:       "mesh-gateway",
			Address:    "10.30.1.1",
			Datacenter: "dc6",
		},
		Service: gateway,
		Checks:  structs.HealthChecks{critical},
	}

	return structs.CheckServiceNodes{entry}
}
// TestGatewayServiceGroupBarDC1 returns three dc1 "bar-sidecar-proxy" connect
// proxy instances for destination service "bar", on nodes bar-node-1 through
// bar-node-3. The first two carry Meta version "1" and the third carries
// version "2". No health checks are attached.
func TestGatewayServiceGroupBarDC1(t testing.T) structs.CheckServiceNodes {
	// barProxy builds one sidecar instance; only address and version vary.
	barProxy := func(address, version string) *structs.NodeService {
		return &structs.NodeService{
			Kind:    structs.ServiceKindConnectProxy,
			Service: "bar-sidecar-proxy",
			Address: address,
			Port:    2222,
			Meta: map[string]string{
				"version": version,
			},
			Proxy: structs.ConnectProxyConfig{
				DestinationServiceName: "bar",
				Upstreams:              structs.TestUpstreams(t),
			},
		}
	}

	return structs.CheckServiceNodes{
		{
			Node: &structs.Node{
				ID:         "bar-node-1",
				Node:       "bar-node-1",
				Address:    "10.1.1.4",
				Datacenter: "dc1",
			},
			Service: barProxy("172.16.1.6", "1"),
		},
		{
			Node: &structs.Node{
				ID:         "bar-node-2",
				Node:       "bar-node-2",
				Address:    "10.1.1.5",
				Datacenter: "dc1",
			},
			Service: barProxy("172.16.1.7", "1"),
		},
		{
			Node: &structs.Node{
				ID:         "bar-node-3",
				Node:       "bar-node-3",
				Address:    "10.1.1.6",
				Datacenter: "dc1",
			},
			Service: barProxy("172.16.1.8", "2"),
		},
	}
}
// TestGatewayServiceGroupFooDC1 returns four dc1 "foo-sidecar-proxy" connect
// proxy instances for destination service "foo", on nodes foo-node-1 through
// foo-node-4. The first two carry Meta version "1" and the last two carry
// version "2". Only foo-node-4 has a health check: "proxy-alive" with status
// "warning".
func TestGatewayServiceGroupFooDC1(t testing.T) structs.CheckServiceNodes {
	// fooProxy builds one sidecar instance; only address and version vary.
	fooProxy := func(address, version string) *structs.NodeService {
		return &structs.NodeService{
			Kind:    structs.ServiceKindConnectProxy,
			Service: "foo-sidecar-proxy",
			Address: address,
			Port:    2222,
			Meta: map[string]string{
				"version": version,
			},
			Proxy: structs.ConnectProxyConfig{
				DestinationServiceName: "foo",
				Upstreams:              structs.TestUpstreams(t),
			},
		}
	}

	return structs.CheckServiceNodes{
		{
			Node: &structs.Node{
				ID:         "foo-node-1",
				Node:       "foo-node-1",
				Address:    "10.1.1.1",
				Datacenter: "dc1",
			},
			Service: fooProxy("172.16.1.3", "1"),
		},
		{
			Node: &structs.Node{
				ID:         "foo-node-2",
				Node:       "foo-node-2",
				Address:    "10.1.1.2",
				Datacenter: "dc1",
			},
			Service: fooProxy("172.16.1.4", "1"),
		},
		{
			Node: &structs.Node{
				ID:         "foo-node-3",
				Node:       "foo-node-3",
				Address:    "10.1.1.3",
				Datacenter: "dc1",
			},
			Service: fooProxy("172.16.1.5", "2"),
		},
		{
			Node: &structs.Node{
				ID:         "foo-node-4",
				Node:       "foo-node-4",
				Address:    "10.1.1.7",
				Datacenter: "dc1",
			},
			Service: fooProxy("172.16.1.9", "2"),
			Checks: structs.HealthChecks{
				&structs.HealthCheck{
					Node:        "foo-node-4",
					ServiceName: "foo-sidecar-proxy",
					Name:        "proxy-alive",
					Status:      "warning",
				},
			},
		},
	}
}
// noopDataSource is a stateless stand-in for a data source of any request
// type. Its Notify accepts every request and does nothing with it.
type noopDataSource[ReqType any] struct{}

// Notify ignores all of its arguments and always returns nil. Nothing is ever
// sent on the supplied channel.
func (*noopDataSource[ReqType]) Notify(
	_ context.Context,
	_ ReqType,
	_ string,
	_ chan<- UpdateEvent,
) error {
	return nil
}
// testConfigSnapshotFixture helps you execute normal proxycfg event machinery
// to assemble a ConfigSnapshot via standard means to ensure test data used in
// any tests is actually a valid configuration.
//
// The provided ns argument will be manipulated by the nsFn callback if present
// before it is used.
//
// The events provided in the updates slice will be fed into the event
// machinery.
func testConfigSnapshotFixture(
t testing.T,
ns *structs.NodeService,
nsFn func(ns *structs.NodeService),
serverSNIFn ServerSNIFunc,
updates []UpdateEvent,
) *ConfigSnapshot {
const token = ""
2020-08-28 20:27:40 +00:00
if nsFn != nil {
nsFn(ns)
}
config := stateConfig{
logger: hclog.NewNullLogger(),
source: &structs.QuerySource{
Datacenter: "dc1",
},
dataSources: DataSources{
CARoots: &noopDataSource[*structs.DCSpecificRequest]{},
CompiledDiscoveryChain: &noopDataSource[*structs.DiscoveryChainRequest]{},
ConfigEntry: &noopDataSource[*structs.ConfigEntryQuery]{},
ConfigEntryList: &noopDataSource[*structs.ConfigEntryQuery]{},
Datacenters: &noopDataSource[*structs.DatacentersRequest]{},
FederationStateListMeshGateways: &noopDataSource[*structs.DCSpecificRequest]{},
GatewayServices: &noopDataSource[*structs.ServiceSpecificRequest]{},
Health: &noopDataSource[*structs.ServiceSpecificRequest]{},
HTTPChecks: &noopDataSource[*cachetype.ServiceHTTPChecksRequest]{},
Intentions: &noopDataSource[*structs.IntentionQueryRequest]{},
IntentionUpstreams: &noopDataSource[*structs.ServiceSpecificRequest]{},
InternalServiceDump: &noopDataSource[*structs.ServiceDumpRequest]{},
LeafCertificate: &noopDataSource[*cachetype.ConnectCALeafRequest]{},
PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{},
ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{},
ServiceList: &noopDataSource[*structs.DCSpecificRequest]{},
TrustBundle: &noopDataSource[*pbpeering.TrustBundleReadRequest]{},
},
dnsConfig: DNSConfig{ // TODO: make configurable
Domain: "consul",
AltDomain: "",
},
serverSNIFn: serverSNIFn,
intentionDefaultAllow: false, // TODO: make configurable
}
testConfigSnapshotFixtureEnterprise(&config)
s, err := newServiceInstanceFromNodeService(ProxyID{ServiceID: ns.CompoundServiceID()}, ns, token)
if err != nil {
t.Fatalf("err: %v", err)
return nil
}
handler, err := newKindHandler(config, s, nil) // NOTE: nil channel
if err != nil {
t.Fatalf("err: %v", err)
return nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snap, err := handler.initialize(ctx)
if err != nil {
t.Fatalf("err: %v", err)
return nil
}
for _, u := range updates {
if err := handler.handleUpdate(ctx, u, &snap); err != nil {
t.Fatalf("Failed to handle update from watch %q: %v", u.CorrelationID, err)
return nil
connect: fix failover through a mesh gateway to a remote datacenter (#6259) Failover is pushed entirely down to the data plane by creating envoy clusters and putting each successive destination in a different load assignment priority band. For example this shows that normally requests go to 1.2.3.4:8080 but when that fails they go to 6.7.8.9:8080: - name: foo load_assignment: cluster_name: foo policy: overprovisioning_factor: 100000 endpoints: - priority: 0 lb_endpoints: - endpoint: address: socket_address: address: 1.2.3.4 port_value: 8080 - priority: 1 lb_endpoints: - endpoint: address: socket_address: address: 6.7.8.9 port_value: 8080 Mesh gateways route requests based solely on the SNI header tacked onto the TLS layer. Envoy currently only lets you configure the outbound SNI header at the cluster layer. If you try to failover through a mesh gateway you ideally would configure the SNI value per endpoint, but that's not possible in envoy today. This PR introduces a simpler way around the problem for now: 1. We identify any target of failover that will use mesh gateway mode local or remote and then further isolate any resolver node in the compiled discovery chain that has a failover destination set to one of those targets. 2. For each of these resolvers we will perform a small measurement of comparative healths of the endpoints that come back from the health API for the set of primary target and serial failover targets. We walk the list of targets in order and if any endpoint is healthy we return that target, otherwise we move on to the next target. 3. The CDS and EDS endpoints both perform the measurements in (2) for the affected resolver nodes. 4. For CDS this measurement selects which TLS SNI field to use for the cluster (note the cluster is always going to be named for the primary target) 5. For EDS this measurement selects which set of endpoints will populate the cluster. Priority tiered failover is ignored. 
One of the big downsides to this approach to failover is that the failover detection and correction is going to be controlled by consul rather than deferring that entirely to the data plane as with the prior version. This also means that we are bound to only failover using official health signals and cannot make use of data plane signals like outlier detection to affect failover. In this specific scenario the lack of data plane signals is ok because the effectiveness is already muted by the fact that the ultimate destination endpoints will have their data plane signals scrambled when they pass through the mesh gateway wrapper anyway so we're not losing much. Another related fix is that we now use the endpoint health from the underlying service, not the health of the gateway (regardless of failover mode).
2019-08-05 18:30:35 +00:00
}
}
return &snap
}
func testSpliceEvents(base, extra []UpdateEvent) []UpdateEvent {
if len(extra) == 0 {
return base
}
var (
hasExtra = make(map[string]UpdateEvent)
completeExtra = make(map[string]struct{})
allEvents []UpdateEvent
)
for _, e := range extra {
hasExtra[e.CorrelationID] = e
}
// Override base events with extras if they share the same correlationID,
// then put the rest of the extras at the end.
for _, e := range base {
if extraEvt, ok := hasExtra[e.CorrelationID]; ok {
if extraEvt.Result != nil { // nil results are tombstones
allEvents = append(allEvents, extraEvt)
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
}
completeExtra[e.CorrelationID] = struct{}{}
} else {
allEvents = append(allEvents, e)
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. 
This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
}
}
for _, e := range extra {
if _, ok := completeExtra[e.CorrelationID]; !ok {
allEvents = append(allEvents, e)
}
}
return allEvents
}
func testSpliceNodeServiceFunc(prev, next func(ns *structs.NodeService)) func(ns *structs.NodeService) {
return func(ns *structs.NodeService) {
if prev != nil {
prev(ns)
2020-04-14 14:59:23 +00:00
}
next(ns)
}
}
// ControllableCacheType is a cache.Type that simulates a typical blocking RPC
// but lets us control the responses and when they are delivered easily.
type ControllableCacheType struct {
	// index is the index reported in fetch results. It is bumped by Set and is
	// read and written with sync/atomic only.
	index uint64

	// value holds the configured responses by key. A stored value that is an
	// error is returned from Fetch as the error rather than as the result.
	value sync.Map

	// Need a condvar to trigger all blocking requests (there might be multiple
	// for same type due to background refresh and timing issues) when values
	// change. Chans make it nondeterministic which one triggers or need extra
	// locking to coordinate replacing after close etc.
	triggerMu sync.Mutex

	// trigger is the condition variable (bound to triggerMu) that blocked
	// fetches wait on and that Set broadcasts to.
	trigger *sync.Cond

	// blocking controls whether Fetch may block on a matching MinIndex and
	// whether the type registers itself as refreshing/blocking.
	blocking bool

	// lastReq records the request passed to the most recent Fetch call.
	lastReq atomic.Value
}
// NewControllableCacheType builds a ControllableCacheType for use in tests.
// The returned type starts at index 5 with blocking behaviour enabled, and its
// condition variable is already bound to its mutex. The t argument is accepted
// for call-site convenience and is not used.
func NewControllableCacheType(t testing.T) *ControllableCacheType {
	ct := new(ControllableCacheType)
	ct.index = 5
	ct.blocking = true
	ct.trigger = sync.NewCond(&ct.triggerMu)
	return ct
}
// Set sets the response value to be returned from subsequent cache gets for the
// type, bumps the index and wakes every fetch currently blocked on the type.
//
// Storing an error as the value makes Fetch return that error for the key.
func (ct *ControllableCacheType) Set(key string, value interface{}) {
	// Publish the value before bumping the index: a concurrent Fetch that
	// observes the new index must also observe the new value, otherwise it
	// would hand back stale data tagged with the new index and then block on
	// that index without ever seeing this update.
	ct.value.Store(key, value)
	atomic.AddUint64(&ct.index, 1)

	ct.triggerMu.Lock()
	ct.trigger.Broadcast()
	ct.triggerMu.Unlock()
}
// Fetch implements cache.Type. It simulates blocking or non-blocking queries.
//
// The request is recorded as the last request seen. If the type is blocking
// and opts.MinIndex is non-zero and equal to the current index, the call waits
// for the next broadcast on the trigger before answering. The result carries
// the value stored for the request's key (Key joined with Datacenter) and the
// current index; if the stored value is an error, it is returned as the error
// with a nil Value.
func (ct *ControllableCacheType) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	ct.lastReq.Store(req)

	if ct.blocking && opts.MinIndex > 0 {
		// Wait for return to be triggered. We ignore timeouts based on opts.Timeout
		// since in practice they will always be way longer than our tests run for
		// and the caller can simulate timeout by triggering return without changing
		// index or value.
		//
		// The index is compared while holding triggerMu. Set bumps the index
		// and only then broadcasts under the same mutex, so either we see the
		// bumped index here and skip the wait, or we are already parked in
		// Wait when the broadcast fires. Comparing outside the lock would let
		// a Set slip in between the check and the Wait and lose the wakeup.
		ct.triggerMu.Lock()
		if atomic.LoadUint64(&ct.index) == opts.MinIndex {
			ct.trigger.Wait()
		}
		ct.triggerMu.Unlock()
	}

	info := req.CacheInfo()
	key := path.Join(info.Key, info.Datacenter) // omit token for testing purposes

	// Read the index only now: it has probably been bumped if we waited.
	index := atomic.LoadUint64(&ct.index)

	val, _ := ct.value.Load(key)
	if err, ok := val.(error); ok {
		return cache.FetchResult{
			Value: nil,
			Index: index,
		}, err
	}
	return cache.FetchResult{
		Value: val,
		Index: index,
	}, nil
}
// RegisterOptions reports the cache registration options for this type.
// Refresh and SupportsBlocking both follow the blocking setting, and the query
// timeout is fixed at ten minutes.
func (ct *ControllableCacheType) RegisterOptions() cache.RegisterOptions {
	var opts cache.RegisterOptions
	opts.Refresh = ct.blocking
	opts.SupportsBlocking = ct.blocking
	opts.QueryTimeout = 10 * time.Minute
	return opts
}
// golden reads the named golden file stored in consul/agent/xds/testdata and
// returns its contents as a string. The ".golden" extension is appended to
// name, and the read error is checked with require.NoError.
func golden(t testing.T, name string) string {
	t.Helper()

	goldenPath := filepath.Join(projectRoot(), "../", "/xds/testdata", name+".golden")
	contents, err := ioutil.ReadFile(goldenPath)
	require.NoError(t, err)

	return string(contents)
}
// projectRoot returns the directory that contains this source file, as
// reported by runtime.Caller for the current frame.
func projectRoot() string {
	_, thisFile, _, _ := runtime.Caller(0)
	dir := filepath.Dir(thisFile)
	return dir
}
// NewTestDataSources builds a TestDataSources with a fresh TestDataSource for
// every field, which tests can use to feed data to the Manager. The
// enterprise-specific sources are set up last via buildEnterpriseSources.
func NewTestDataSources() *TestDataSources {
	srcs := new(TestDataSources)

	srcs.CARoots = NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedCARoots]()
	srcs.CompiledDiscoveryChain = NewTestDataSource[*structs.DiscoveryChainRequest, *structs.DiscoveryChainResponse]()
	srcs.ConfigEntry = NewTestDataSource[*structs.ConfigEntryQuery, *structs.ConfigEntryResponse]()
	srcs.ConfigEntryList = NewTestDataSource[*structs.ConfigEntryQuery, *structs.IndexedConfigEntries]()
	srcs.Datacenters = NewTestDataSource[*structs.DatacentersRequest, *[]string]()
	srcs.FederationStateListMeshGateways = NewTestDataSource[*structs.DCSpecificRequest, *structs.DatacenterIndexedCheckServiceNodes]()
	srcs.GatewayServices = NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedGatewayServices]()
	srcs.Health = NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedCheckServiceNodes]()
	srcs.HTTPChecks = NewTestDataSource[*cachetype.ServiceHTTPChecksRequest, []structs.CheckType]()
	srcs.Intentions = NewTestDataSource[*structs.IntentionQueryRequest, *structs.IndexedIntentionMatches]()
	srcs.IntentionUpstreams = NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]()
	srcs.InternalServiceDump = NewTestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways]()
	srcs.LeafCertificate = NewTestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]()
	srcs.PreparedQuery = NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]()
	srcs.ResolvedServiceConfig = NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse]()
	srcs.ServiceList = NewTestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList]()
	srcs.TrustBundle = NewTestDataSource[*pbpeering.TrustBundleReadRequest, *pbpeering.TrustBundleReadResponse]()

	srcs.buildEnterpriseSources()
	return srcs
}
// TestDataSources groups one TestDataSource per kind of data, each typed with
// the request it accepts and the result it delivers. NewTestDataSources
// populates every field, and ToDataSources converts the set into a
// DataSources value.
type TestDataSources struct {
	CARoots *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedCARoots]
	CompiledDiscoveryChain *TestDataSource[*structs.DiscoveryChainRequest, *structs.DiscoveryChainResponse]
	ConfigEntry *TestDataSource[*structs.ConfigEntryQuery, *structs.ConfigEntryResponse]
	ConfigEntryList *TestDataSource[*structs.ConfigEntryQuery, *structs.IndexedConfigEntries]
	FederationStateListMeshGateways *TestDataSource[*structs.DCSpecificRequest, *structs.DatacenterIndexedCheckServiceNodes]
	Datacenters *TestDataSource[*structs.DatacentersRequest, *[]string]
	GatewayServices *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedGatewayServices]
	Health *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedCheckServiceNodes]
	HTTPChecks *TestDataSource[*cachetype.ServiceHTTPChecksRequest, []structs.CheckType]
	Intentions *TestDataSource[*structs.IntentionQueryRequest, *structs.IndexedIntentionMatches]
	IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
	InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways]
	LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]
	PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]
	ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse]
	ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList]
	TrustBundle *TestDataSource[*pbpeering.TrustBundleReadRequest, *pbpeering.TrustBundleReadResponse]
}
// ToDataSources converts the test sources into a DataSources value, wiring each
// field to the TestDataSource of the same name and then letting
// fillEnterpriseDataSources add the enterprise-specific ones.
func (t *TestDataSources) ToDataSources() DataSources {
	ds := DataSources{
		CARoots:                t.CARoots,
		CompiledDiscoveryChain: t.CompiledDiscoveryChain,
		ConfigEntry:            t.ConfigEntry,
		ConfigEntryList:        t.ConfigEntryList,
		Datacenters:            t.Datacenters,
		// This source is built by NewTestDataSources like all the others, so it
		// has to be handed over too; leaving it out would leave the
		// corresponding DataSources field nil.
		// NOTE(review): assumes DataSources declares a field with this name,
		// mirroring TestDataSources - confirm against its definition.
		FederationStateListMeshGateways: t.FederationStateListMeshGateways,
		GatewayServices:                 t.GatewayServices,
		Health:                          t.Health,
		HTTPChecks:                      t.HTTPChecks,
		Intentions:                      t.Intentions,
		IntentionUpstreams:              t.IntentionUpstreams,
		InternalServiceDump:             t.InternalServiceDump,
		LeafCertificate:                 t.LeafCertificate,
		PreparedQuery:                   t.PreparedQuery,
		ResolvedServiceConfig:           t.ResolvedServiceConfig,
		ServiceList:                     t.ServiceList,
		TrustBundle:                     t.TrustBundle,
	}
	t.fillEnterpriseDataSources(&ds)
	return ds
}
// NewTestDataSource returns an empty test data source whose Notify accepts
// requests of type ReqType and whose dispatched UpdateEvents carry results of
// type ValType.
//
// TODO(agentless): we still depend on cache.Request here because it provides the
// CacheInfo method used for hashing the request - this won't work when we extract
// this package into a shared library.
func NewTestDataSource[ReqType cache.Request, ValType any]() *TestDataSource[ReqType, ValType] {
	src := new(TestDataSource[ReqType, ValType])
	src.data = map[string]ValType{}
	src.trigger = make(chan struct{})
	return src
}
// TestDataSource is an in-memory data source for tests. Values are stored per
// request key with Set and delivered as UpdateEvents to the channels that were
// registered through Notify.
type TestDataSource[ReqType cache.Request, ValType any] struct {
	// mu guards data, lastReq and trigger.
	mu sync.Mutex

	// data holds the most recently set value for each request key.
	data map[string]ValType

	// lastReq is the request passed to the most recent Notify call.
	lastReq ReqType

	// Note: trigger is currently global for all requests of the given type, so
	// Manager may receive duplicate events - as the dispatch goroutine will be
	// woken up whenever *any* requested data changes.
	//
	// Set closes the current channel to wake the dispatch goroutines and
	// installs a fresh one for the next change.
	trigger chan struct{}
}
// Notify satisfies the interfaces used by Manager to subscribe to data. It
// records req as the last request seen and starts a goroutine that delivers
// events for the request's key on ch, tagged with correlationID, until ctx is
// done. It always returns nil.
func (t *TestDataSource[ReqType, ValType]) Notify(ctx context.Context, req ReqType, correlationID string, ch chan<- UpdateEvent) error {
	func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		t.lastReq = req
	}()

	key := t.reqKey(req)
	go t.dispatch(ctx, correlationID, key, ch)
	return nil
}
// dispatch is the per-subscription delivery loop started by Notify. On every
// pass it sends the value currently stored for key (if there is one) on ch as
// an UpdateEvent tagged with correlationID, then sleeps until the trigger it
// captured is closed by Set. It returns as soon as ctx is done.
func (t *TestDataSource[ReqType, ValType]) dispatch(ctx context.Context, correlationID, key string, ch chan<- UpdateEvent) {
	for {
		// Capture the value and the trigger in one critical section so a Set
		// that happens after this point closes exactly the channel we are
		// about to wait on and the change cannot be missed.
		t.mu.Lock()
		val, ok := t.data[key]
		trigger := t.trigger
		t.mu.Unlock()

		if ok {
			event := UpdateEvent{
				CorrelationID: correlationID,
				Result:        val,
			}

			select {
			case ch <- event:
			case <-ctx.Done():
				// Stop right away. Falling through to the wait below would let
				// select pick an already-closed trigger at random and attempt
				// another send after the subscription was cancelled.
				return
			}
		}

		select {
		case <-trigger:
		case <-ctx.Done():
			return
		}
	}
}
// reqKey returns the key under which values for req are stored: the Key field
// of the request's CacheInfo.
func (t *TestDataSource[ReqType, ValType]) reqKey(req ReqType) string {
	info := req.CacheInfo()
	return info.Key
}
// Set stores val for the given request and broadcasts the change: the current
// trigger channel is swapped for a fresh one and then closed, which wakes the
// dispatch goroutines so they deliver to consumers that subscribed with the
// request. It always returns nil.
func (t *TestDataSource[ReqType, ValType]) Set(req ReqType, val ValType) error {
	t.mu.Lock()
	key := t.reqKey(req)
	t.data[key] = val
	fired := t.trigger
	t.trigger = make(chan struct{})
	t.mu.Unlock()

	// Close outside the lock; woken goroutines take the lock themselves.
	close(fired)
	return nil
}
// LastReq returns the request that was passed to the most recent Notify call.
// Before any Notify call it returns the zero value of ReqType.
func (t *TestDataSource[ReqType, ValType]) LastReq() ReqType {
	t.mu.Lock()
	req := t.lastReq
	t.mu.Unlock()
	return req
}