mirror of https://github.com/status-im/consul.git
upgrade test: splitter and resolver config entry in peered cluster (#16356)
This commit is contained in:
parent
0972697661
commit
de17c7c26f
|
@ -49,9 +49,17 @@ func CatalogNodeExists(t *testing.T, c *api.Client, nodeName string) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func HTTPServiceEchoes(t *testing.T, ip string, port int, path string) {
|
||||||
|
doHTTPServiceEchoes(t, ip, port, path, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func HTTPServiceEchoesResHeader(t *testing.T, ip string, port int, path string, expectedResHeader map[string]string) {
|
||||||
|
doHTTPServiceEchoes(t, ip, port, path, expectedResHeader)
|
||||||
|
}
|
||||||
|
|
||||||
// HTTPServiceEchoes verifies that a post to the given ip/port combination returns the data
|
// HTTPServiceEchoes verifies that a post to the given ip/port combination returns the data
|
||||||
// in the response body. Optional path can be provided to differentiate requests.
|
// in the response body. Optional path can be provided to differentiate requests.
|
||||||
func HTTPServiceEchoes(t *testing.T, ip string, port int, path string) {
|
func doHTTPServiceEchoes(t *testing.T, ip string, port int, path string, expectedResHeader map[string]string) {
|
||||||
const phrase = "hello"
|
const phrase = "hello"
|
||||||
|
|
||||||
failer := func() *retry.Timer {
|
failer := func() *retry.Timer {
|
||||||
|
@ -82,6 +90,24 @@ func HTTPServiceEchoes(t *testing.T, ip string, port int, path string) {
|
||||||
if !strings.Contains(string(body), phrase) {
|
if !strings.Contains(string(body), phrase) {
|
||||||
r.Fatal("received an incorrect response ", string(body))
|
r.Fatal("received an incorrect response ", string(body))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for k, v := range expectedResHeader {
|
||||||
|
if headerValues, ok := res.Header[k]; !ok {
|
||||||
|
r.Fatal("expected header not found", k)
|
||||||
|
} else {
|
||||||
|
found := false
|
||||||
|
for _, value := range headerValues {
|
||||||
|
if value == v {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
r.Fatalf("header %s value not match want %s got %s ", k, v, headerValues)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ const disableRYUKEnv = "TESTCONTAINERS_RYUK_DISABLED"
|
||||||
// Exposed ports info
|
// Exposed ports info
|
||||||
const MaxEnvoyOnNode = 10 // the max number of Envoy sidecar can run along with the agent, base is 19000
|
const MaxEnvoyOnNode = 10 // the max number of Envoy sidecar can run along with the agent, base is 19000
|
||||||
const ServiceUpstreamLocalBindPort = 5000 // local bind Port of service's upstream
|
const ServiceUpstreamLocalBindPort = 5000 // local bind Port of service's upstream
|
||||||
|
const ServiceUpstreamLocalBindPort2 = 5001 // local bind Port of service's upstream, for services with 2 upstreams
|
||||||
|
|
||||||
// consulContainerNode implements the Agent interface by running a Consul agent
|
// consulContainerNode implements the Agent interface by running a Consul agent
|
||||||
// in a container.
|
// in a container.
|
||||||
|
@ -530,6 +531,7 @@ func newContainerRequest(config Config, opts containerOpts) (podRequest, consulR
|
||||||
|
|
||||||
// Envoy upstream listener
|
// Envoy upstream listener
|
||||||
pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", ServiceUpstreamLocalBindPort))
|
pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", ServiceUpstreamLocalBindPort))
|
||||||
|
pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", ServiceUpstreamLocalBindPort2))
|
||||||
|
|
||||||
// Reserve the exposed ports for Envoy admin port, e.g., 19000 - 19009
|
// Reserve the exposed ports for Envoy admin port, e.g., 19000 - 19009
|
||||||
basePort := 19000
|
basePort := 19000
|
||||||
|
|
|
@ -14,7 +14,6 @@ import (
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
|
||||||
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
|
|
||||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -23,7 +22,7 @@ type ConnectContainer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
container testcontainers.Container
|
container testcontainers.Container
|
||||||
ip string
|
ip string
|
||||||
appPort int
|
appPort []int
|
||||||
externalAdminPort int
|
externalAdminPort int
|
||||||
internalAdminPort int
|
internalAdminPort int
|
||||||
mappedPublicPort int
|
mappedPublicPort int
|
||||||
|
@ -52,6 +51,10 @@ func (g ConnectContainer) Export(partition, peer string, client *api.Client) err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g ConnectContainer) GetAddr() (string, int) {
|
func (g ConnectContainer) GetAddr() (string, int) {
|
||||||
|
return g.ip, g.appPort[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g ConnectContainer) GetAddrs() (string, []int) {
|
||||||
return g.ip, g.appPort
|
return g.ip, g.appPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,7 +142,7 @@ func (g ConnectContainer) GetStatus() (string, error) {
|
||||||
// node. The container exposes port serviceBindPort and envoy admin port
|
// node. The container exposes port serviceBindPort and envoy admin port
|
||||||
// (19000) by mapping them onto host ports. The container's name has a prefix
|
// (19000) by mapping them onto host ports. The container's name has a prefix
|
||||||
// combining datacenter and name.
|
// combining datacenter and name.
|
||||||
func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) {
|
func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID string, serviceBindPorts []int, node cluster.Agent) (*ConnectContainer, error) {
|
||||||
nodeConfig := node.GetConfig()
|
nodeConfig := node.GetConfig()
|
||||||
if nodeConfig.ScratchDir == "" {
|
if nodeConfig.ScratchDir == "" {
|
||||||
return nil, fmt.Errorf("node ScratchDir is required")
|
return nil, fmt.Errorf("node ScratchDir is required")
|
||||||
|
@ -209,11 +212,19 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
appPortStr = strconv.Itoa(serviceBindPort)
|
appPortStrs []string
|
||||||
adminPortStr = strconv.Itoa(internalAdminPort)
|
adminPortStr = strconv.Itoa(internalAdminPort)
|
||||||
)
|
)
|
||||||
|
|
||||||
info, err := cluster.LaunchContainerOnNode(ctx, node, req, []string{appPortStr, adminPortStr})
|
for _, port := range serviceBindPorts {
|
||||||
|
appPortStrs = append(appPortStrs, strconv.Itoa(port))
|
||||||
|
}
|
||||||
|
|
||||||
|
// expose the app ports and the envoy adminPortStr on the agent container
|
||||||
|
exposedPorts := make([]string, len(appPortStrs))
|
||||||
|
copy(exposedPorts, appPortStrs)
|
||||||
|
exposedPorts = append(exposedPorts, adminPortStr)
|
||||||
|
info, err := cluster.LaunchContainerOnNode(ctx, node, req, exposedPorts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -222,14 +233,17 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
container: info.Container,
|
container: info.Container,
|
||||||
ip: info.IP,
|
ip: info.IP,
|
||||||
appPort: info.MappedPorts[appPortStr].Int(),
|
|
||||||
externalAdminPort: info.MappedPorts[adminPortStr].Int(),
|
externalAdminPort: info.MappedPorts[adminPortStr].Int(),
|
||||||
internalAdminPort: internalAdminPort,
|
internalAdminPort: internalAdminPort,
|
||||||
serviceName: sidecarServiceName,
|
serviceName: sidecarServiceName,
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %d\n",
|
for _, port := range appPortStrs {
|
||||||
serviceID, out.appPort, serviceBindPort)
|
out.appPort = append(out.appPort, info.MappedPorts[port].Int())
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %v\n",
|
||||||
|
serviceID, out.appPort, serviceBindPorts)
|
||||||
fmt.Printf("NewConnectService sidecar: name %s, mapped admin port %d, admin port %d\n",
|
fmt.Printf("NewConnectService sidecar: name %s, mapped admin port %d, admin port %d\n",
|
||||||
sidecarServiceName, out.externalAdminPort, internalAdminPort)
|
sidecarServiceName, out.externalAdminPort, internalAdminPort)
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,10 @@ func (g exampleContainer) GetAddr() (string, int) {
|
||||||
return g.ip, g.httpPort
|
return g.ip, g.httpPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g exampleContainer) GetAddrs() (string, []int) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g exampleContainer) Restart() error {
|
func (g exampleContainer) Restart() error {
|
||||||
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
|
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,10 @@ func (g gatewayContainer) GetAddr() (string, int) {
|
||||||
return g.ip, g.port
|
return g.ip, g.port
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g gatewayContainer) GetAddrs() (string, []int) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g gatewayContainer) GetLogs() (string, error) {
|
func (g gatewayContainer) GetLogs() (string, error) {
|
||||||
rc, err := g.container.Logs(context.Background())
|
rc, err := g.container.Logs(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -61,7 +61,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts
|
||||||
_ = serverService.Terminate()
|
_ = serverService.Terminate()
|
||||||
})
|
})
|
||||||
|
|
||||||
serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", serviceOpts.ID), serviceOpts.ID, serviceOpts.HTTPPort, node) // bindPort not used
|
serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", serviceOpts.ID), serviceOpts.ID, []int{serviceOpts.HTTPPort}, node) // bindPort not used
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ func CreateAndRegisterStaticClientSidecar(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a service and proxy instance
|
// Create a service and proxy instance
|
||||||
clientConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", StaticClientServiceName), StaticClientServiceName, libcluster.ServiceUpstreamLocalBindPort, node)
|
clientConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", StaticClientServiceName), StaticClientServiceName, []int{libcluster.ServiceUpstreamLocalBindPort}, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ type Service interface {
|
||||||
// Export a service to the peering cluster
|
// Export a service to the peering cluster
|
||||||
Export(partition, peer string, client *api.Client) error
|
Export(partition, peer string, client *api.Client) error
|
||||||
GetAddr() (string, int)
|
GetAddr() (string, int)
|
||||||
|
GetAddrs() (string, []int)
|
||||||
// GetAdminAddr returns the external admin address
|
// GetAdminAddr returns the external admin address
|
||||||
GetAdminAddr() (string, int)
|
GetAdminAddr() (string, int)
|
||||||
GetLogs() (string, error)
|
GetLogs() (string, error)
|
||||||
|
|
|
@ -24,7 +24,12 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
oldversion string
|
oldversion string
|
||||||
targetVersion string
|
targetVersion string
|
||||||
name string
|
name string
|
||||||
create func(*cluster.Cluster) (libservice.Service, error)
|
// create creates addtional resources in peered clusters depending on cases, e.g., static-client,
|
||||||
|
// static server, and config-entries. It returns the proxy services, an assertation function to
|
||||||
|
// be called to verify the resources.
|
||||||
|
create func(*cluster.Cluster, *cluster.Cluster) (libservice.Service, libservice.Service, func(), error)
|
||||||
|
// extraAssertion adds additional assertion function to the common resources across cases.
|
||||||
|
// common resources includes static-client in dialing cluster, and static-server in accepting cluster.
|
||||||
extraAssertion func(int)
|
extraAssertion func(int)
|
||||||
}
|
}
|
||||||
tcs := []testcase{
|
tcs := []testcase{
|
||||||
|
@ -38,8 +43,8 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
oldversion: "1.14",
|
oldversion: "1.14",
|
||||||
targetVersion: utils.TargetVersion,
|
targetVersion: utils.TargetVersion,
|
||||||
name: "basic",
|
name: "basic",
|
||||||
create: func(c *cluster.Cluster) (libservice.Service, error) {
|
create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) {
|
||||||
return nil, nil
|
return nil, nil, func() {}, nil
|
||||||
},
|
},
|
||||||
extraAssertion: func(clientUpstreamPort int) {},
|
extraAssertion: func(clientUpstreamPort int) {},
|
||||||
},
|
},
|
||||||
|
@ -49,7 +54,8 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
name: "http_router",
|
name: "http_router",
|
||||||
// Create a second static-service at the client agent of accepting cluster and
|
// Create a second static-service at the client agent of accepting cluster and
|
||||||
// a service-router that routes /static-server-2 to static-server-2
|
// a service-router that routes /static-server-2 to static-server-2
|
||||||
create: func(c *cluster.Cluster) (libservice.Service, error) {
|
create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) {
|
||||||
|
c := accepting
|
||||||
serviceOpts := &libservice.ServiceOpts{
|
serviceOpts := &libservice.ServiceOpts{
|
||||||
Name: libservice.StaticServer2ServiceName,
|
Name: libservice.StaticServer2ServiceName,
|
||||||
ID: "static-server-2",
|
ID: "static-server-2",
|
||||||
|
@ -60,7 +66,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(c.Clients()[0], serviceOpts)
|
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(c.Clients()[0], serviceOpts)
|
||||||
libassert.CatalogServiceExists(t, c.Clients()[0].GetClient(), libservice.StaticServer2ServiceName)
|
libassert.CatalogServiceExists(t, c.Clients()[0].GetClient(), libservice.StaticServer2ServiceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
err = c.ConfigEntryWrite(&api.ProxyConfigEntry{
|
err = c.ConfigEntryWrite(&api.ProxyConfigEntry{
|
||||||
Kind: api.ProxyDefaults,
|
Kind: api.ProxyDefaults,
|
||||||
|
@ -70,7 +76,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
routerConfigEntry := &api.ServiceRouterConfigEntry{
|
routerConfigEntry := &api.ServiceRouterConfigEntry{
|
||||||
Kind: api.ServiceRouter,
|
Kind: api.ServiceRouter,
|
||||||
|
@ -90,12 +96,127 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err = c.ConfigEntryWrite(routerConfigEntry)
|
err = c.ConfigEntryWrite(routerConfigEntry)
|
||||||
return serverConnectProxy, err
|
return serverConnectProxy, nil, func() {}, err
|
||||||
},
|
},
|
||||||
extraAssertion: func(clientUpstreamPort int) {
|
extraAssertion: func(clientUpstreamPort int) {
|
||||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", clientUpstreamPort), "static-server-2")
|
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", clientUpstreamPort), "static-server-2")
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
oldversion: "1.14",
|
||||||
|
targetVersion: utils.TargetVersion,
|
||||||
|
name: "http splitter and resolver",
|
||||||
|
// In addtional to the basic topology, this case provisions the following
|
||||||
|
// services in the dialing cluster:
|
||||||
|
//
|
||||||
|
// - a new static-client at server_0 that has two upstreams: split-static-server (5000)
|
||||||
|
// and peer-static-server (5001)
|
||||||
|
// - a local static-server service at client_0
|
||||||
|
// - service-splitter named split-static-server w/ 2 services: "local-static-server" and
|
||||||
|
// "peer-static-server".
|
||||||
|
// - service-resolved named local-static-server
|
||||||
|
// - service-resolved named peer-static-server
|
||||||
|
create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) {
|
||||||
|
err := dialing.ConfigEntryWrite(&api.ProxyConfigEntry{
|
||||||
|
Kind: api.ProxyDefaults,
|
||||||
|
Name: "global",
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
clientConnectProxy, err := createAndRegisterStaticClientSidecarWithSplittingUpstreams(dialing)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, fmt.Errorf("error creating client connect proxy in cluster %s", dialing.NetworkName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make a resolver for service peer-static-server
|
||||||
|
resolverConfigEntry := &api.ServiceResolverConfigEntry{
|
||||||
|
Kind: api.ServiceResolver,
|
||||||
|
Name: "peer-static-server",
|
||||||
|
Redirect: &api.ServiceResolverRedirect{
|
||||||
|
Service: libservice.StaticServerServiceName,
|
||||||
|
Peer: libtopology.DialingPeerName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = dialing.ConfigEntryWrite(resolverConfigEntry)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make a splitter for service split-static-server
|
||||||
|
splitter := &api.ServiceSplitterConfigEntry{
|
||||||
|
Kind: api.ServiceSplitter,
|
||||||
|
Name: "split-static-server",
|
||||||
|
Splits: []api.ServiceSplit{
|
||||||
|
{
|
||||||
|
Weight: 50,
|
||||||
|
Service: "local-static-server",
|
||||||
|
ResponseHeaders: &api.HTTPHeaderModifiers{
|
||||||
|
Set: map[string]string{
|
||||||
|
"x-test-split": "local",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Weight: 50,
|
||||||
|
Service: "peer-static-server",
|
||||||
|
ResponseHeaders: &api.HTTPHeaderModifiers{
|
||||||
|
Set: map[string]string{
|
||||||
|
"x-test-split": "peer",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = dialing.ConfigEntryWrite(splitter)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, fmt.Errorf("error writing splitter config entry for %s", splitter.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make a resolver for service local-static-server
|
||||||
|
resolverConfigEntry = &api.ServiceResolverConfigEntry{
|
||||||
|
Kind: api.ServiceResolver,
|
||||||
|
Name: "local-static-server",
|
||||||
|
Redirect: &api.ServiceResolverRedirect{
|
||||||
|
Service: libservice.StaticServerServiceName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err = dialing.ConfigEntryWrite(resolverConfigEntry)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a static-server in dialing cluster
|
||||||
|
serviceOpts := &libservice.ServiceOpts{
|
||||||
|
Name: libservice.StaticServerServiceName,
|
||||||
|
ID: "static-server",
|
||||||
|
HTTPPort: 8081,
|
||||||
|
GRPCPort: 8078,
|
||||||
|
}
|
||||||
|
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(dialing.Clients()[0], serviceOpts)
|
||||||
|
libassert.CatalogServiceExists(t, dialing.Clients()[0].GetClient(), libservice.StaticServerServiceName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, appPorts := clientConnectProxy.GetAddrs()
|
||||||
|
assertionFn := func() {
|
||||||
|
libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{
|
||||||
|
"X-Test-Split": "local",
|
||||||
|
})
|
||||||
|
libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{
|
||||||
|
"X-Test-Split": "peer",
|
||||||
|
})
|
||||||
|
libassert.HTTPServiceEchoes(t, "localhost", appPorts[0], "")
|
||||||
|
}
|
||||||
|
return serverConnectProxy, clientConnectProxy, assertionFn, nil
|
||||||
|
},
|
||||||
|
extraAssertion: func(clientUpstreamPort int) {},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
run := func(t *testing.T, tc testcase) {
|
run := func(t *testing.T, tc testcase) {
|
||||||
|
@ -115,7 +236,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
_, staticClientPort := dialing.Container.GetAddr()
|
_, staticClientPort := dialing.Container.GetAddr()
|
||||||
|
|
||||||
_, appPort := dialing.Container.GetAddr()
|
_, appPort := dialing.Container.GetAddr()
|
||||||
_, err = tc.create(acceptingCluster)
|
_, secondClientProxy, assertionAdditionalResources, err := tc.create(acceptingCluster, dialingCluster)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
tc.extraAssertion(appPort)
|
tc.extraAssertion(appPort)
|
||||||
|
|
||||||
|
@ -145,6 +266,12 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
require.NoError(t, accepting.Container.Restart())
|
require.NoError(t, accepting.Container.Restart())
|
||||||
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
|
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
|
||||||
|
|
||||||
|
// restart the secondClientProxy if exist
|
||||||
|
if secondClientProxy != nil {
|
||||||
|
require.NoError(t, secondClientProxy.Restart())
|
||||||
|
}
|
||||||
|
assertionAdditionalResources()
|
||||||
|
|
||||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
|
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, port := clientSidecarService.GetAddr()
|
_, port := clientSidecarService.GetAddr()
|
||||||
|
@ -165,3 +292,64 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
// time.Sleep(3 * time.Second)
|
// time.Sleep(3 * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createAndRegisterStaticClientSidecarWithSplittingUpstreams creates a static-client-1 that
|
||||||
|
// has two upstreams: split-static-server (5000) and peer-static-server (5001)
|
||||||
|
func createAndRegisterStaticClientSidecarWithSplittingUpstreams(c *cluster.Cluster) (*libservice.ConnectContainer, error) {
|
||||||
|
// Do some trickery to ensure that partial completion is correctly torn
|
||||||
|
// down, but successful execution is not.
|
||||||
|
var deferClean utils.ResettableDefer
|
||||||
|
defer deferClean.Execute()
|
||||||
|
|
||||||
|
node := c.Servers()[0]
|
||||||
|
mgwMode := api.MeshGatewayModeLocal
|
||||||
|
|
||||||
|
// Register the static-client service and sidecar first to prevent race with sidecar
|
||||||
|
// trying to get xDS before it's ready
|
||||||
|
req := &api.AgentServiceRegistration{
|
||||||
|
Name: libservice.StaticClientServiceName,
|
||||||
|
Port: 8080,
|
||||||
|
Connect: &api.AgentServiceConnect{
|
||||||
|
SidecarService: &api.AgentServiceRegistration{
|
||||||
|
Proxy: &api.AgentServiceConnectProxyConfig{
|
||||||
|
Upstreams: []api.Upstream{
|
||||||
|
{
|
||||||
|
DestinationName: "split-static-server",
|
||||||
|
LocalBindAddress: "0.0.0.0",
|
||||||
|
LocalBindPort: cluster.ServiceUpstreamLocalBindPort,
|
||||||
|
MeshGateway: api.MeshGatewayConfig{
|
||||||
|
Mode: mgwMode,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
DestinationName: "peer-static-server",
|
||||||
|
LocalBindAddress: "0.0.0.0",
|
||||||
|
LocalBindPort: cluster.ServiceUpstreamLocalBindPort2,
|
||||||
|
MeshGateway: api.MeshGatewayConfig{
|
||||||
|
Mode: mgwMode,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := node.GetClient().Agent().ServiceRegister(req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a service and proxy instance
|
||||||
|
clientConnectProxy, err := libservice.NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", libservice.StaticClientServiceName), libservice.StaticClientServiceName, []int{cluster.ServiceUpstreamLocalBindPort, cluster.ServiceUpstreamLocalBindPort2}, node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
deferClean.Add(func() {
|
||||||
|
_ = clientConnectProxy.Terminate()
|
||||||
|
})
|
||||||
|
|
||||||
|
// disable cleanup functions now that we have an object with a Terminate() function
|
||||||
|
deferClean.Reset()
|
||||||
|
|
||||||
|
return clientConnectProxy, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue